Spark-源码系列-SparkCore-Shuffle-ShuffleWrite
一、概述
https://mp.weixin.qq.com/s/wDKB1IFOS4v1gUuiUDuLAA
https://mp.weixin.qq.com/s/0zlqviF1lzUOrcBiVePLxA
https://www.ngui.cc/el/775149.html?action=onClick
https://maimai.cn/article/detail?fid=1752477164&efid=vgKPFrpp6HvV0pZtHvL3ug
Shuffle 发生与宽依赖的 Stage 间,由于 Stage 内的计算采用 $pipeline$ 的方式。Shuffle 发生的上一个 Stage 为 Map 节点,下游的stage为 Reduce 阶段。而 Shuffle 写的过程就发生在 Map 阶段,ShuffleWriter 的调用主要是在 ShuffleMapStage 中,每个 ShuffleMapStage 包含多个 ShuffleMapTask, mapTask 个数和分区数相关。
每个 ShuffleMapTask 都会在其 $runTask()$ 调用 ShuffleWriter 接口,其并非直接调用到具体的执行类。而是在划分宽依赖时想 ShuffleManager注册 shuffle 时,返回的 ShuffleHandler 决定的。
在 ShuffleMapTask 调用 Writer 时,是先调用了 ShuffleWriteProcessor ,主要控制了 ShuffleWriter 的生命周期
二、获取 ShuffleWriter
ShuffleWriter 负责将 Map 任务的输出,写出到 Shuffle 系统的文件中。在基于排序的 Shuffle 框架中,ShuffleWriter 会合并文件,它为每个 Map Task生成一个数据文件和一个索引文件。
三、Write
ShuffleWriter 定义的 $write()$ 方法用于将 map 任务的结果写到磁盘,而 $stop()$ 方法可以关闭 ShuffleWriter。ShuffleWriter一共有三个子类,分别为SortShuffleWriter、UnsafeShuffleWriter 及 BypassMergeSortShuffleWriter。
四、获取 MapStatus
ShuffleWriter 结束后,返回 MapStatus,MapStatus 封装了 ShuffleMapTask 计算后的数据存储位置。
1 | val mapStatus = writer.stop(success = true) |
Mapstatus 包括任务运行的块管理器地址和对应每个 reducer 的输出大小。如果partitions 的数量大于 2000,则用 HighlyCompressedMapStatus,否则用 CompressedMapStatus。
五、Push-based Shuffle
如果 ShuffleWriter 执行成功,初始化 push-based shuffle
如果发现该 Shuffle 阶段没有对应的 Merge server,从 MapOutputTracker 去获取对应的 ShufflePushMergerLocation