一、概述

BlockInfoManager 是 BlockManager 内部的子组件之一,BlockInfoManager 对 BlockInfo 进行了一些简单的管理,但是 BlockInfoManager 主要对 Block 的锁资源进行管理

二、实现

2.1. 结构

2.1.1. 属性

  1. blockInfoWrappers

    BlockId 与 BlockInfoWrapper 之间映射关系的缓存。

    1
    private[this] val blockInfoWrappers = new ConcurrentHashMap[BlockId, BlockInfoWrapper]
  2. invisibleRDDBlocks

    RDDBlock 可见性

    1
    private[this] val invisibleRDDBlocks = new mutable.HashSet[RDDBlockId]
  3. writeLocksByTask

    每次任务执行尝试的标识 TaskAttemptId 与执行获取的 Block 的写锁之间的映射关系。TaskAttemptId 与写锁之间是一对多的关系,即一次任务尝试执行会获取零到多个 Block 的写锁。

    1
    private[this] val writeLocksByTask = new ConcurrentHashMap[TaskAttemptId, util.Set[BlockId]]
  4. readLocksByTask

    每次任务尝试执行的标识 TaskAttemptId 与执行获取的 Block 的读锁之间的映射关系。TaskAttemptId 与读锁之间是一对多的关系,即一次任务尝试执行会获取零到多个 Block 的读锁,并且会记录对于同一个 Block 的读锁的占用次数。

    1
    private[this] val readLocksByTask = new ConcurrentHashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]

三、初始化

在 BlockInfoManager 构造时,会调用 $registerTask()$ 方法注册任务,初始化 NON_TASK_WRITER 这个 TaskAttemptId 对应的 BlockId 集合。NON_TASK_WRITER 在 BlockInfo 伴生对象里定义,是一个特殊的标记(-1024),表示当前持有写锁的并非一个具体的 Task,而是其他线程。

1
2
3
4
def registerTask(taskAttemptId: TaskAttemptId): Unit = {
writeLocksByTask.putIfAbsent(taskAttemptId, util.Collections.synchronizedSet(new util.HashSet))
readLocksByTask.putIfAbsent(taskAttemptId, ConcurrentHashMultiset.create())
}

四、锁资源管理

BlockInfoManager 对 Block 的锁管理采用了共享锁与排他锁,其中读锁是共享锁,写锁是排他锁。一个 TaskAttempt 执行线程可以同时获得零到多个不同 Block 的写锁或零到多个不同 Block 的读锁,但不能同时获得同一个 Block 的读锁与写锁。读锁可重入,但写锁不能重入。

4.1. 加锁

4.1.1. 读锁

$lockForReading()$ 方法负责为一个 Block 加读锁:

  1. 根据块 ID 获取它对应的 BlockInfo,检查它的 writerTask 是否为 NO_WRITER(值为 -1,表示该 BlockInfo 的写锁没有被占用)
  2. 如果 Block 的写锁没有被其他任务尝试线程占用,自增 BlockInfo 结构中的 readerCount 计数,并将块 ID 加入 readLocksByTask 映射,视为加锁成功
  3. 如果允许阻塞(即 blocking 为 true),那么当前线程调用 $Object.wait()$ 方法等待,直到占用写锁的线程释放 Block 的写锁后被 $notify()/notifyAll()$ 方法唤醒当前线程。如果占有写锁的线程一直不释放写锁,那么当前线程将出现 “饥饿” 状况,即可能无限期等待下去。

4.1.2. 写锁

$lockForWriting()$ 方法负责为一个 Block 加写锁: 这个方法的执行流程与 $lockForReading()$ 方法相似,不过会将 BlockInfo 中的 writerTask 字段设为 Task 尝试 ID,将块 ID 加入writeLocksByTask 映射。

1
2
3
4
if (acquire) {
info.writerTask = taskAttemptId
writeLocksByTask.get(taskAttemptId).add(blockId)
}

并且判断条件是没有读锁也没有写锁:

1
val acquire = info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0

块的读锁和写锁、写锁和写锁之间是互斥的,而读锁和读锁之间是可以共享的,并且读锁可重入,写锁不可重入。

$lockForReading()$ 和 $lockForWriting()$ 两个方法共同实现了写锁与写锁、写锁与读锁之间的互斥性,同时也实现了读锁与读锁之间的共享。此外,这两个方法都提供了阻塞的方式,在读锁或写锁的争用较少或锁的持有时间都非常短暂,能够带来性能提升(如果获取锁的线程发现锁被占用,就立即失败,然而这个锁很快又被释放了,结果是获取锁的线程无法正常执行。如果获取锁的线程可以等待的话,很快它就发现自己能重新获得锁了,然后推进当前线程继续执行)

4.1.3. 写新 Block 时获得写锁

$lockNewBlockForWriting()$ 方法用来获取一个新块的写锁。该方法先试图持有 blockId 对应的块的读锁。如果能获取到,说明该块已经存在了,亦即已经有其他线程赢得竞争并写了这个块,没有必要再写,直接返回 false(表示返回读锁)。反之,就将这个新的块放入 infos 映射,然后获取其对应的写锁,并返回 true。

4.2. 释放锁

$unlock()$ 方法负责释放 BlockId 对应的 Block 上的锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def unlock(blockId: BlockId, taskAttemptIdOption: Option[TaskAttemptId] = None): Unit = {
val taskAttemptId = taskAttemptIdOption.getOrElse(currentTaskAttemptId)
blockInfo(blockId) { (info, condition) =>
if (info.writerTask != BlockInfo.NO_WRITER) {
info.writerTask = BlockInfo.NO_WRITER
writeLocksByTask.get(taskAttemptId).remove(blockId)
} else {
val countsForTask = readLocksByTask.get(taskAttemptId)
if (countsForTask != null) {
assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
info.readerCount -= 1
val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
assert(newPinCountForTask >= 0, s"Task $taskAttemptId release lock on block $blockId more times than it acquired it")
}
}
condition.signalAll()
}
}
  1. 获取 BlockId 对应的BlockInfo

  2. 如果当前任务尝试线程已经获得了 Block 的写锁,则释放当前 Block 的写锁

  3. 如果当前任务尝试线程没有获得 Block 的写锁,则释放当前 Block 的读锁。释放读锁实际是减少当前任务尝试线程已经获取的 Block 的读锁次数。

    newPinCountForTask 是当前尝试执行线程占有 BlockId 对应 Block 的读锁的次数与 1 的差值,如果大于等于 0,则说明当前尝试执行线程多次获得了这个读锁。

4.3. 锁降级

$lockForWriting()$ 方法负责为 Block 锁降级

1
2
3
4
5
6
7
8
def downgradeLock(blockId: BlockId): Unit = {
val taskAttemptId = currentTaskAttemptId
blockInfo(blockId) { (info, _) =>
unlock(blockId)
val lockOutcome = lockForReading(blockId, blocking = false)
assert(lockOutcome.isDefined)
}
}
  1. 获取 BlockId 对应的 BlockInfo
  2. 调用 $unlock()$ 方法释放当前任务尝试线程从 BlockId 对应 Block 获取的写锁。
  3. 由于已经释放了 BlockId 对应 Block 的写锁,所以用非阻塞方式获取 BlockId 对应 Block 的读锁。