一、概述

BlockManager 是 Spark 存储体系中的核心组件,运行在每个节点(Driver和 Executors)上,提供接口用于读写本地和远程各种存储设备(内存、磁盘和 off-heap)

二、架构设计

在 Spark 的 Driver 以及所有的 Executor 上,都存在一个 BlockManager、BlockManagerMaster。BlockManager 提供了存储模块与其他模块的交互接口,而 BlockManagerMaster 则是 Block 管理的接口类,通过调用 BlockManagerMasterEndpoint 和 RpcEndpointRef 进行通信。

无论是 Driver 还是 Executor 节点都会创建自己的 BlockManagerMaster,不过 Driver 上的 BlockManagerMaster 会实例化并且注册 BlockManagerMasterEndpoint, 在 BlockManagerMasterEndpoint 中维护了注册的 BlockManager, BlockManager 和 Executor 的映射关系,数据块所在的位置等信息。

Driver 端 BlockManager 中的 BlockManagerMaster 与 Driver 通信时,不产生网络通信。而 Executor 节点中的 BlockManagerMaster 与 Driver 通信时, 会产生网络通信。BlockManagerMasterEndpoint 只存在于 Driver 上。Executor 上通过获取的它的引用,然后给它发消息实现和 Driver 交互

Driver 上的 BlockManagerMaster 对于存在与 Executor 上的 BlockManager 统一管理,比如 Executor 需要向 Driver 发送注册 BlockManager、更新 Executor 上的 Block 的最新信息、询问所需要的 Block 目前所在的位置以及当 Executor 运行结束需要将此 Executor 移除等。而 BlockManager 只是负责管理所在 Executor 上的 Block。

BlockManagerMaster 负责发送消息,BlockManagerMasterEndpoint 负责消息的接收与处理,BlockManagerStorageEndpoint 则接收 BlockManagerMasterEndpoint 下发的命令。

三、组件

3.1. 磁盘组件

DiskStore 负责 Spark 磁盘存储。依赖于 DiskBlockManager, 负责对 Block 的磁盘存储。DiskStore 中数据的存取本质上就是字节序列与磁盘文件之间的转换,它通过 $putBytes()$ 方法把字节序列存入磁盘文件,再通过 $getBytes()$ 方法将文件内容转换为数据块。

3.2. 内存组件

MemoryStore 负责 Spark 内存存储。依赖于 MemoryManager,负责对 Block 的内存存储。

3.3. 块传输组件

BlockTransferService 负责建立到远程其他节点 BlockManager 的连接,对远程其他节点的 BlockManager 的数据进行读写。

3.4. 块退役组件

BlockManagerDecommissioner 负责将退役的块管理器上的数据迁移到其他健康的节点上,以保证数据的可靠性和高可用性。确保集群在节点退役时能够正常运行。

3.5. 通信组件

3.5.1. BlockManagerStorageEndpoint

RpCEndpoint 来响应 Master 发来的各种命令,如删除 block,删除 shuffle 文件,删除 Broadcast 数据 等、

BlockManagerMasterEndpoint 存在于 Driver 端,主要对 Executor 和 BlockManager 的关系、BlockManager 和 Block 的关系等进行管理。并作为一个 RpcEndpoint 接受各类消息事件进行处理

3.6. BlockReplicationPolicy

Spark 中块复制的策略

3.6.1. RandomBlockReplicationPolicy

默认策略,将块的副本随机放到不同的节点上。

3.6.2. BasicBlockReplicationPolicy

如果是 hdfs,需要配置成 BasicBlockReplicationPolicy,一份 rank,一份 out of rank,它随机选择。

3.6.3. SortOnHostNameBlockReplicationPolicy

按服主机名的字母顺序对每个数据块的副本进行排序。排序的目的是为了尽量将副本分布在不同的主机上,以增加数据的容错性和可靠性。通过将副本分布在不同的主机上,当某个主机发生故障时,仍然可以从其他主机获取数据。

3.7. BlockInfoManager

BlocklnfoManager 负责管理块的的元数据,并提供读写锁功能。

3.8. MigratableResolver

实验性接口,允许 Spark 迁移 shuffle 块。

四、初始化

4.1. 初始化 BlockManagerMaster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
val blockManagerMaster = new BlockManagerMaster(
registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
new BlockManagerMasterEndpoint(
rpcEnv,
isLocal,
conf,
listenerBus,
if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
externalShuffleClient
} else {
None
}, blockManagerInfo,
mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
shuffleManager,
isDriver)
),
registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo)
),
conf,
isDriver
)

4.2. 初始化 BlockManager

1
2
3
4
5
6
7
8
9
10
11
12
13
val blockManager = new BlockManager(
executorId,
rpcEnv,
blockManagerMaster,
serializerManager,
conf,
memoryManager,
mapOutputTracker,
shuffleManager,
blockTransferService,
securityManager,
externalShuffleClient
)

五、通信

5.1. 注册

$BlockManager.reregister()$ 方法用于向 BlockManagerMaster 注册 BlockManager,并向 BlockManagerMaster 报告所有的 Block 信息

5.1.1. 注册 BlockManager

调用 BlockManagerMaster 的 $reportAllBlocks()$ 方法向 BlockManagerMaster 注册 BlockManager

5.1.2. 汇报块信息

调用 $reportAllBlocks()$ 方法报告所有的 Block 信息,$reportAllBlocks()$ 方法遍历 BlockInfoManager 管理的所有 BlockId 与
BlockInfo 的映射关系,并进行如下操作:

  1. 调用 $getCurrentBlockStatus()$ 方法,获取 Block 的状态信息 BlockStatus。

  2. 如果需要将 Block的 BlockStatus 汇报给 BlockManagerMaster, 则调用 $tryToReportBlockStatus()$ 方法,向 BlockManagerMaster 汇报此 Block 的状态信息。

    向 BlockManagerMaster 汇报 Block 的状态信息是通过调用 BlockManagerMaster 的 $updateBlockinfo()$ 方法完成的。BlockManagerMaster 的 $updateBlockInfo()$ 方法将向 BlockManagerMasterEndpoint 发送 UpdateBlockinfo 消息

六、管理数据块

6.1. 读取

使用 BlockManager 进行写操作时,如 RDD 运行过程中的中间数据,或者执行 persist 操作,会优先将数据写入内存中。如果内存大小不够,将内存中的部分数据写入磁盘;如果 persist 指定了要 replica 会使用 BlockTransferService 将数据复制一份到其他节点的 BlockManager 上去。使用 BlockManager 进行读操作时,如 Shuffle Read 操作,如果能从本地读取,就利用 DiskStore 或
MemoryStore 从本地读取数据:如果本地没有数据,就利用 BlockTransferService 从远程读取数据。

$getLocalBytes()$ 方法用于从存储体系获取 BlockId 所对应 Block 的数据,并封装为 ChunkedByteBuffer 后返回, $getLocalBytes()$ 的执行步骤如下:

  1. 如果当前 Block 是 ShuffleBlock, 那么调用 ShuffleManager 的 ShuffleBlockResolver 组件的 $getBlockData()$ 方法获取 Block 数据, 并封装为 ChunkedByteBuffer 返回。
  2. 如果当前 Block 不是 ShuffleBlock, 那么首先获取 Block 的读锁, 然后调用 $doGetLocalBytes()$ 方法获取 Block 数据。
    • 获取 Block 的存储级别
    • 如果 Block 的存储级别说明 Block 没有被序列化, 那么按照 DiskStore、 MemoryStore 的顺序, 获取 Block 数据
    • 如果 Block 存储级别说明 Block 被序列化了, 那么按照 MemoryStore、 DiskStore 的顺序, 获取 Block 数据

6.2. 写数据

6.2.1. putBytes()