Spark-源码系列-SparkCore-Shuffle-Shuffle 管理器-SortShuffleManager
一、概述
SortShuffleManager 管理基于排序的 Shuffle,输入的记录按照目标分区 ID 排序,然后输出到一个单独的 map 输出文件中。reduce 为了读出 map 输出,需要获取 map 输出文件的连续内容。当 map 的输出数据太大已经不适合放在内存中时,排序后的输出子集将被溢出到文件中,这些磁盘上的文件将被合并生成最终的输出文件。
二、注册 Shuffle
Driver 和每个 Executor 都会持有一个 ShuffleManager,这个 ShuffleManager 可以通过配置项 spark.shuffle.manager 指定,并且由 SparkEnv 创建。Driver 中的 ShuffleManager 负责注册 Shuffle 的元数据,比如 shuffleId、MapTask 的数量等。Executor 中的 ShuffleManager 则负责读和写 Shuffle 的数据。
三、获取 ShuffleWriter
ShuffleWriter 负责将 Map 任务的输出,写出到 Shuffle 系统的文件中。在基于排序的 Shuffle 框架中,ShuffleWriter 会合并文件,它为每个 Map Task生成一个数据文件和一个索引文件。
- https://blog.51cto.com/u_15067227/2573455
- https://blog.csdn.net/Christopher_L1n/article/details/122903200
抽象类 ShuffleWriter 定义了将 map 任务的中间结果输出到磁盘上的功能规范,包括将数据写入磁盘和关闭 ShuffleWriter。
ShuffleWriter 定义的 $write()$ 方法用于将 map 任务的结果写到磁盘,而 $stop()$ 方法可以关闭 ShuffleWriter。ShuffleWriter一共有三个子类,分别为SortShuffleWriter、UnsafeShuffleWriter 及 BypassMergeSortShuffleWriter。
![](https://note3.oss-cn-hangzhou.aliyuncs.com/source/%E6%88%AA%E5%B1%8F2023-07-26%2000.04.24.png)
https://weread.qq.com/web/reader/0c832fb05e12e40c8ca6190k2a3327002582a38a4a932bf
3.1. 概述
3.1.1. SortShuffleWriter
SortShuffleWriter 是 ShuffleWriter 的实现类之一,提供了对 Shuffle 数据的排序功能。SortShuffleWriter 使用 ExternalSorter 作为排序器,由于ExternalSorter 底层使用了 PartitionedAppendOnlyMap 和 PartitionedPairBuffer 两种缓存,因此 SortShuffleWriter 还支持对 Shuffle 数据的聚合功能。
SortShuffleWriter 主要委托 ExternalSorter 做数据插入,排序,归并(Merge),聚合 (Combine)以及最终写数据和索引文件的工作。
3.1.2. BypassMergeSortShuffleWriter
BypassMergeSortShuffleWriter 适用于 map 端不需要在持久化数据之前进行聚合、排序等操作的场景。
3.1.3. UnsafeShuffleWriter
UnsafeShuffleWriter 底层使用 ShuffleExternalSorte 作为外部排序器,所以 UnsafeShuffleWriter 不具备 SortShuffleWriter 的聚合功能。UnsafeShuffleWriter 将使用 Tungsten 的内存作为缓存,以提高写入磁盘的性能。
四、获取 ShuffleReader
BlockStoresShuffleReader 用于 Shuffle 执行过程中,reduce 任务从其他节点的 Block 文件中读取由起始分区(startPartition)和结束分区(endPartition)指定范围内的数据。
目前 ShuffleReader 只有一种实现 BlockStoreShuffleReader,BlockStoresShuffleReader 用于 Shuffle 执行过程中,ReduceTask 从其他节点的 Block 文件中读取由起始分区(startPartition)和结束分区(endPartition)指定范围内的数据。
![](https://note3.oss-cn-hangzhou.aliyuncs.com/source/image-20230902192351230.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,text_d3d3Lnp4Y2hvbWUuY29tCg==,size_36,g_center,color_FFFFFF,shadow_100,t_100,g_se,x_30,y_20)
获取数据解析 ShuffleBlockResolver
特质 ShuffleBlockResolver 定义了对 Shuffle Block 进行解析的规范,包括获取 Shuffle 数据文件、获取 Shuffle 索引文件、删除指定的 Shuffle 数据文件和索引文件、生成Shuffle 索引文件、获取 Shuffle 块的数据等。
- IndexShuffleBlockResolver