Spark-源码学习-SparkCore-存储服务-块元数据管理 BlockInfoManager
一、概述
BlockInfoManager 是 BlockManager 内部的子组件之一,BlockInfoManager 对 BlockInfo 进行了一些简单的管理,但是 BlockInfoManager 主要对 Block 的锁资源进行管理
二、实现
2.1. 结构
2.1.1. 属性
blockInfoWrappers
BlockId 与 BlockInfoWrapper 之间映射关系的缓存。
1
private[this] val blockInfoWrappers = new ConcurrentHashMap[BlockId, BlockInfoWrapper]
invisibleRDDBlocks
RDDBlock 可见性
1
private[this] val invisibleRDDBlocks = new mutable.HashSet[RDDBlockId]
writeLocksByTask
每次任务执行尝试的标识 TaskAttemptId 与执行获取的 Block 的写锁之间的映射关系。TaskAttemptId 与写锁之间是一对多的关系,即一次任务尝试执行会获取零到多个 Block 的写锁。
1
private[this] val writeLocksByTask = new ConcurrentHashMap[TaskAttemptId, util.Set[BlockId]]
readLocksByTask
每次任务尝试执行的标识 TaskAttemptId 与执行获取的 Block 的读锁之间的映射关系。TaskAttemptId 与读锁之间是一对多的关系,即一次任务尝试执行会获取零到多个 Block 的读锁,并且会记录对于同一个 Block 的读锁的占用次数。
1
private[this] val readLocksByTask = new ConcurrentHashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]
三、初始化
在 BlockInfoManager 构造时,会调用 $registerTask()$ 方法注册任务,初始化 NON_TASK_WRITER
这个 TaskAttemptId 对应的 BlockId 集合。NON_TASK_WRITER
在 BlockInfo 伴生对象里定义,是一个特殊的标记(-1024),表示当前持有写锁的并非一个具体的 Task,而是其他线程。
1 | def registerTask(taskAttemptId: TaskAttemptId): Unit = { |
四、锁资源管理
BlockInfoManager 对 Block 的锁管理采用了共享锁与排他锁,其中读锁是共享锁,写锁是排他锁。一个 TaskAttempt 执行线程可以同时获得零到多个不同 Block 的写锁或零到多个不同 Block 的读锁,但不能同时获得同一个 Block 的读锁与写锁。读锁可重入,但写锁不能重入。
4.1. 加锁
4.1.1. 读锁
$lockForReading()$ 方法负责为一个 Block 加读锁:
- 根据块 ID 获取它对应的 BlockInfo,检查它的 writerTask 是否为
NO_WRITER
(值为 -1,表示该 BlockInfo 的写锁没有被占用) - 如果 Block 的写锁没有被其他任务尝试线程占用,自增 BlockInfo 结构中的 readerCount 计数,并将块 ID 加入 readLocksByTask 映射,视为加锁成功
- 如果允许阻塞(即 blocking 为 true),那么当前线程调用 $Object.wait()$ 方法等待,直到占用写锁的线程释放 Block 的写锁后被 $notify()/notifyAll()$ 方法唤醒当前线程。如果占有写锁的线程一直不释放写锁,那么当前线程将出现 “饥饿” 状况,即可能无限期等待下去。
4.1.2. 写锁
$lockForWriting()$ 方法负责为一个 Block 加写锁: 这个方法的执行流程与 $lockForReading()$ 方法相似,不过会将 BlockInfo 中的 writerTask 字段设为 Task 尝试 ID,将块 ID 加入writeLocksByTask 映射。
1 | if (acquire) { |
并且判断条件是没有读锁也没有写锁:
1 | val acquire = info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0 |
块的读锁和写锁、写锁和写锁之间是互斥的,而读锁和读锁之间是可以共享的,并且读锁可重入,写锁不可重入。
$lockForReading()$ 和 $lockForWriting()$ 两个方法共同实现了写锁与写锁、写锁与读锁之间的互斥性,同时也实现了读锁与读锁之间的共享。此外,这两个方法都提供了阻塞的方式,在读锁或写锁的争用较少或锁的持有时间都非常短暂,能够带来性能提升(如果获取锁的线程发现锁被占用,就立即失败,然而这个锁很快又被释放了,结果是获取锁的线程无法正常执行。如果获取锁的线程可以等待的话,很快它就发现自己能重新获得锁了,然后推进当前线程继续执行)
4.1.3. 写新 Block 时获得写锁
$lockNewBlockForWriting()$ 方法用来获取一个新块的写锁。该方法先试图持有 blockId 对应的块的读锁。如果能获取到,说明该块已经存在了,亦即已经有其他线程赢得竞争并写了这个块,没有必要再写,直接返回 false(表示返回读锁)。反之,就将这个新的块放入 infos 映射,然后获取其对应的写锁,并返回 true。
4.2. 释放锁
$unlock()$ 方法负责释放 BlockId 对应的 Block 上的锁
1 | def unlock(blockId: BlockId, taskAttemptIdOption: Option[TaskAttemptId] = None): Unit = { |
获取 BlockId 对应的BlockInfo
如果当前任务尝试线程已经获得了 Block 的写锁,则释放当前 Block 的写锁
如果当前任务尝试线程没有获得 Block 的写锁,则释放当前 Block 的读锁。释放读锁实际是减少当前任务尝试线程已经获取的 Block 的读锁次数。
newPinCountForTask 是当前尝试执行线程占有 BlockId 对应 Block 的读锁的次数与 1 的差值,如果大于等于 0,则说明当前尝试执行线程多次获得了这个读锁。
4.3. 锁降级
$lockForWriting()$ 方法负责为 Block 锁降级
1 | def downgradeLock(blockId: BlockId): Unit = { |
- 获取 BlockId 对应的 BlockInfo
- 调用 $unlock()$ 方法释放当前任务尝试线程从 BlockId 对应 Block 获取的写锁。
- 由于已经释放了 BlockId 对应 Block 的写锁,所以用非阻塞方式获取 BlockId 对应 Block 的读锁。