一、概述

Blockmanager 读取数据时,能从本地获取,则从 Diskstore 或 Memorystore 读取,没有就用使用块传输服务在不同节点间进行数据块传输~

二、设计

2.1. 传输服务客户端 BlockStoreClient

BlockStoreClient 接口用于从 Executor 或外部服务读取 shuffle 文件和 RDD 块

2.1.1. BlockTransferService

BlockTransferService 用于在不同节点之间传输数据块,NettyBlockTransferService 是一个基于 netty 实现的数据传输服务,NettyBlockTransferService 在 SparkEnv 初始化时创建、在 BlockManager 中初始化。

  1. 初始化 $init()$

    NettyBlockTransferService 只有在其 $init()$ 方法被调用,即被初始化后才提供服务~

  2. 上传数据块

    $NettyBlockTransferService.uploadBlock()$ 方法利用 NettyBlockTransferService 中创建的 Netty 服务,上传 Block 到远程Executor

    NettyBlockTransferService 上传 Block 的步骤如下:

    1. 创建 Netty 服务的客户端,客户端连接的 hostname 和 port 是随机选择的 BlockManager 的 hostname 和 port。
    2. 将 Block 的存储级别 StorageLevel 序列化。
    3. 将 Block 的 ByteBuffer 转化为数组,便于序列化。
    4. 将 appId、execId、blockId、序列化的 StorageLevel、转换为数组的 Block 封装为 UploadBlock,并将 UploadBlock 序列化为字节数组。
    5. 最终调用 Netty 客户端的 $sendRpc()$ 方法将字节数组上传,回调函数 RpcResponseCallback 根据 RPC 的结果更改上传状态。
  3. 获取远程数据块

    $NettyBlockTransferService.fetchBlocks()$ 方法利用 NettyBlockTransferService 中创建的 Netty 服务,获取远程数据块。

2.1.2. ExternalBlockStoreClient

ExternalBlockStoreClient 用来从外部 service 上传下载 RDD/SHuffle Block 数据的服务~

NettyBlockTransferService 和 ExternalBlockStoreClient 都是 BlockStoreClient 的子类,因此功能上类似,只是连接的 Server 不一样而已~

2.2. NettyBlockRpcServer

NettyBlockTransferService 与 NettyRpcEnv 的最大区别是使用的 RpcHandler 的实现类不同,NettyRpcEnv 采用了 NettyRpcHandler,而NettyBlockTransferService 则采用了 NettyBlockRpcServer。

当 map 任务与reduce 任务处于不同节点时,reduce 任务需要从远端节点下载 map 任务的中间输出,因此 NettyBlockRpcServer 提供打开,即下载 Block 文件的功能:一些情况下,为了容错,需要将 Block 的数据备份到其他节点上,所以 NettyBlockRpcServer 还提供了上传 Block 文件的 RPC 服务