一、概述

MapOutputTracker 用于跟踪 Map 任务的输出状态,此状态便于 Reduce 任务定位 Map 任务输出结果所在的节点地址,进而获取中间输出结果。每个Map 任务或者 Reduce 任务都会有其唯一标识,分别为 mapId 和 reduceId。每个 Reduce 任务的输入可能是多个 Map 任务的输出,Reduce 会到各个 Map 任务所在的节点上拉取 Block(Shuffle)。每次 Shuffle 都有唯一的标识 shuffleId。

https://blog.csdn.net/LINBE_blazers/article/details/88919759

二、实现

https://segmentfault.com/a/1190000040586936

https://masterwangzx.com/2020/09/22/schedule-mapTrack/#mapoutputtrackermaster

其功能有三方面:

  1. DAGScheduler 使用 MapOutputTrackerMaster 来管理各个 ShuffleMapTask 的输出数据 MapStatus
  2. 根据各 ShuffleMapTask 结果的统计信息来进行尽可能的本地化 Scheduler
  3. ShuffleMapStage 使用 MapOutputTrackerMaster 来判断是否要进行 ShuffleMapTask 的计算
  4. ShuffleReduce 任务用来获取从哪些 executor 获取需要的 map 输出数据,读取数据进行处理。

2.1. MapOutputTracker

无论是 MapOutputTrackerMaster 还是 MapOutputTrackerWorker,它们都继承自抽象类 MapOutputTracker。MapOutputTracker 内部定义了任务输出跟踪器的规范。

2.2.1. 属性

  • trackerEndpoint: 用于持有 Driver 上 MapOutputTrackerMasterEndpoint 的 RpcEndpointRef

  • epoch: 用于 Executor 故障转移的同步标记。

    每个 Executor 在运行的时候会更新 epoch,潜在的附加动作将清空缓存;当 Executor 丢失后增加 epoch

  • epochLock: 用于保证 epoch 变量的线程安全性

2.2.2. 方法

  1. $askTracker()$

    向 MapOutputTrackerMasterEndpoint 发送消息,并期望在超时时间之内得到回复

    1
    2
    3
    4
    5
    6
    7
    protected def askTracker[T: ClassTag](message: Any): T = {
    try {
    trackerEndpoint.askSync[T](message)
    } catch {
    case e: Exception => ...
    }
    }
  2. $getMapSizesByExecutorId()$

    通过 shuffleId 和 reduceId 获取存储了 reduce 所需的 map 中间输出结果的 BlockManager 的BlockManagerId,以及 map 中间输出结果每个 Block 块的 BlockId 与大小

2.2. MapOutputTrackerMaster

2.2.1. 属性

  1. shuffleStatuses

    1
    val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala
  2. mapOutputTrackerMasterMessages

    1
    private val mapOutputTrackerMasterMessages = new LinkedBlockingQueue[MapOutputTrackerMasterMessage]
  3. pushBasedShuffleEnabled

    1
    private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(conf, isDriver = true)
  4. availableProcessors

    1
    private val availableProcessors = Runtime.getRuntime.availableProcessors()

2.2.2. 方法

  1. $getStatistics()$

    获取 shuffle 依赖的各个 map 输出 Block 大小的统计信息,

2.3. MapOutputTrackerWorker

2.3.1. 属性

  1. mapStatuses

    用于维护各个 map 任务的输出状态。类型为 Map[Int,Array[MapStatus]],其中 key 对应 shuffleId,Array 存储各个 map 任务对应的状态信息 MapStatus。

    各个 MapOutputTrackerWorker 会向 MapOutputTrackerMaster 不断汇报 map 任务的状态信息,因此 MapOutputTrackerMaster 的 mapStatues 中维护的信息是最新最全的。MapOutputTrackerWorker 的 mapStatuses 对于本节点 Executor 运行的 map 任务状态是及时更新的,而对于其它节点上的 map 任务状态则更像一个缓存,在 mapStatuses 不能命中时会向 Driver 的 MapOutputTrackerMaster 获取最新的任务状态信息。

    1
    val mapStatuses: Map[Int, Array[MapStatus]] = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
  2. mergeStatuses

    1
    val mergeStatuses: Map[Int, Array[MergeStatus]] = new ConcurrentHashMap[Int, Array[MergeStatus]]().asScala
  3. fetchMergeResult

    1
    private lazy val fetchMergeResult = Utils.isPushBasedShuffleEnabled(conf, isDriver = false)
  4. shufflePushMergerLocations

    1
    val shufflePushMergerLocations = new ConcurrentHashMap[Int, Seq[BlockManagerId]]().asScala
  5. fetchingLock:

2.3.2. 方法

  1. $updateEpoch()$

    Executor 运行出现故障时,Master 会再分配其它 Executor 运行任务,此时会调用 $updateEpoch$ 方法更新纪元,并且清空 mapStatuses

  2. $getStatuses()$

    根据 ShuffleId 获取 MapStatus 的数组

三、初始化

MapOutputTracker 是 SparkEnv 的主要组件之一

1
2
3
4
5
6
7
val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
} else {
new MapOutputTrackerWorker(conf)
}
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

针对当前实例是 Driver 还是 Executor,创建 mapOutputTracker 的方式有所不同:

  1. 如果当前应用程序是 Driver,则创建 MapOutputTrackerMaster,然后创建 MapOutputTrackerMasterEndpoint,并且注册到 Dispatcher中,注册名为 MapOutputTracker
  2. 如果当前应用程序是 Executor,则创建 MapOutputTrackerWorker,并从远端 Driver 实例的 NettyRpcEnv 的 Dispatcher 中查找MapOutputTrackerMasterEndpoint 的引用

无论是 Driver 还是 Executor,最后都由 mapOutputTracker 的属性 trackerEndpoint 持有 MapOutputTrackerMasterEndpoint 的引用

四、ShuffleWrite 端

  1. 当 map task 执行完成后,会将 task 的执行情况和磁盘小文件的地址封装到 MapStatus 对象中通过 MapOutputTrackerWorker 对象向 Driver 中的 MapOutputTrackerMaster 汇报。
  2. 在所有的 map task 执行完毕后,Driver 中就掌握了所有的磁盘小文件的地址。

五、ShuffleRead 端

5.1. 获取 MapStatus

在 $ShuffleRDD.compute()$ 方法中,会获取 BlockStoreShuffleReader,然后在 BlockStoreShuffleReader 中,调用$mapOutputTracker.getMapSizesByExecutorId()$ 方法获取一组二元组序列 Seq[(BlockManagerId, Seq[(BlockId, Long)])],第一项代表了BlockManagerId,第二项描述了存储于该 BlockManager 上的一组 shuffle blocks。

5.1.1. getMapSizesByExecutorId()

$getMapSizesByExecutorId()$ 通过 shuffleId 和 reduceId 获取存储了 reduce 所需的 map 中间输出结果的 BlockManager 的BlockManagerId,以及 map 中间输出结果每个 Block 块的 BlockId 与大小

5.1.2. getStatuses()

$getMapSizesByExecutorId()$ 调用 $getStatuses()$ 方法获取 MapStatus 集合,然后最后返回 MapStatus 集合。

由于多个线程可能会并发访问 fetching,因此使用 fetching 的锁进行同步控制,以确保程序在并发下的安全性。

  1. MapOutputTrackerWorker

    • MapOutputTrackerWorker 首先从当前 Executor 中的 MapOutputTracker 的 mapStatuses 缓存中,获取 MapStatus 数组,如果没有则向远端 Driver 中的 MapOutputTranckerMaster 去获取任务状态信息。

    • 如果 shuffle 获取集合(即 fetching)中已经存在要取的 shuffleId(说明已经有其它线程对此 shuffleId 的数据进行远程拉取 ,那么就等待其它线程获取。等待会一直持续,直到 fetching 中不存在要取的 shuffleId(说明其它线程对此 shuffleId 的数据进行远程拉取的操作已经结束),并再次从 mapStatuses 缓存中获取 MapStatus 数组,此时如果获取到 MapStatus 数组,则继续第5步

    • 如果 fetching 中不存在要取的 shuffleId,那么当前线程需要将 shuffleId 加入 fetching,以表示已经有线程对此 shuffleId 的数据进行远程拉取了

    • 调用 $askTracker$ 方法向 MapOutputTrackerMasterEndpoint 发送 GetMapOutputStatuses 消息,以获取 map 任务的状态信息。

  2. MapOutputTrackerMaster

    MapOutputTrackerMasterEndpoint 接收到 GetMapOutputStatuses 消息后,将 GetMapOutputStatuses 消息转换为GetMapOutputMessage 消息,放入 mapOutputRequests 队列。

    • 异步线程 MessageLoop 从队列中取出 GetMapOutputMessage
    • 从 shuffledIdLocks 数组中取出与当前 GetMapOutputMessage 携带的 shuffleId 相对应的锁
    • 从 cachedSerializedStatuses 缓存中获取 shuffleId 对应的序列化任务状态信息
    • 当 cachedSerializedStatuses 中没有 shuffleId 对应的序列化任务状态信息,则获取 mapStatuses 中缓存的 shuffleId 对应的任务状态数组
    • 将任务状态数组进行序列化,然后使用 BroadcastManager 对序列化的任务状态进行广播
    • 将序列化的任务状态放入 cachedSerializedStatuses 缓存中
    • 将广播对象放入 cachedSerializedBroadcast 缓存中
    • 将请求的 map 任务状态信息序列化后返回给请求方。
    • 将获得的序列化任务状态信息,通过回调 GetMapOutputMessage 消息携带的 RpcCallContext 的 $reply()$ 方法回复客户端

    客户端接收到 map 任务状态信息后,调用 MapOutputTracker 的 $deserializeMapStatuses()$ 方法对 map 任务状态进行反序列化操作,然后放入本地的 mapStatuses 缓存中。

    这次拉取可能成功,也可能失败,所以拉取结束后无论如何都需要将 shuffleId 从 fetching 中移除,并唤醒那些在 fetching 的锁上等待的线程,以便这些线程能够获取自己需要的 MapStatus 数组。

    返回得到的 MapStatus 数组。

  3. 最后根据执行的分区范围 [startPartition, endPartition] 将返回的结果 Array[MapStatus] 转换成 Seq[(BlockManagerId, Seq[(BlockId, Long)])]

  4. 拉取对应的 Block 块

    利用这个 Seq[(BlockManagerId, Seq[(BlockId, Long)])],去指定的 BlockManager 中去拉取对应的 Block 块的数据用来迭代计算