Spark-源码系列-SparkCore-Shuffle-ShuffleRead-Reader-BlockStoreShuffleReader
一、概述
https://blog.csdn.net/u011239443/article/details/56843264
ShuffleReader 实现了下游 Task 如何读取上游 ShuffleMapTask的Shuffle 输出的逻辑,通过 MapOutputTracker 获得数据的位置信息,如果数据在本地则调用BlockManager 的 $getBlockData()$ 读取本地数据。
Shuffle Read 的整体架构如图所示:
$ShuffledRDD.compute()$ 开始~
1 | override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { |
二、获取 ShuffleReader
通过调用 ShuffleManager 的 $getReader()$ 方法,获取到 ShuffleReader,Spark 2.x 之后 ShuffleReader 的实现只有 BlockStoreShuffleReader,然后调用 $BlockStoreShuffleReader.read()$ 方法进行读取~
三、读取数据
3.1. 数据位置获取
MapOutputTracker Map 输出跟踪器。在 Shuffle 过程中,Map 任务通过 ShuffleWrite 阶段产生了中间数据,Reduce 任务进行 Shuffle Read 时需要知道哪些数据位于哪个节点上,以及 Map 输出的状态等信息。MapOutputTracker 负责维护这些信息。
3.2. 初始化块数据迭代器 ShuffleBlockFetcherIterator
数据块的迭代器 ShuffleBlockFetcherIterator 可以从本地或远端获取数据块~
3.3. 聚合
Shuffle read 阶段的拉取过程是一边拉取一边进行聚合的。每个 shuffle read task 都会有一个自己的 buffer 缓冲,每次都只能拉取与 buffer 缓冲相同大小的数据,然后在内存进行聚合等操作。聚合完一批数据后,再拉取下一批数据,直到最后将所有数据到拉取完,并得到最终的结果。
1 | val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) { |
3.4. 排序
在 Shuffle 的 map 阶段会将所有数据进行排序,并将分区的数据写入同一个文件中,在创建数据文件的同时会产生索引文件,来记录分区的大小和偏移量。所以这里产生文件的数量和 reduce 分区就没有关系了,只会产生 2 * M
个临时文件。
Spark 中的外部排序器用于对 map 任务的输出数据在 map 端或 reduce 端进行排序。Spark 中有两个外部排序器,分别是 ExternalSorter 和 ShuffleExternalSorter
外部排序指的是大文件的排序,即待排序的记录存储在外存储器上,待排序的文件无法一次装入内存,需要在内存和外部存储器之间进行多次数据交换,以达到排序整个文件的目的。
1 | val sorter = new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer) |
$BlockStoreShuffleReader.read()$
2.2.1. 初始化 ShuffleBlockFetcherIterator
获取一个包装的迭代器 ShuffleBlockFetcherIterator,它迭代的元素是blockId和这个block对应的读取流,很显然这个类就是实现reduce阶段数据读取的关键
2.2.2. 将原始读取流转换成反序列化后的迭代器
2.2.3. 将迭代器转换成能够统计度量值的迭代器,这一系列的转换和java中对于流的各种装饰器很类似
2.2.4. 中断器
将迭代器包装成能够相应中断的迭代器。每读一条数据就会检查一下任务有没有被杀死,这种做法是为了尽量及时地响应杀死任务的请求,比如从driver端发来杀死任务的消息。
2.2.5. 聚合
利用聚合器对结果进行聚合。这里再次利用了AppendonlyMap这个数据结构,前面shuffle写阶段也用到这个数据结构,它的内部是一个以数组作为底层数据结构的,以线性探测法线性的hash表。
2.2.6. 排序
最后对结果进行排序。
$BlockStoreShuffleFetcher.fetch()$ 会获得数据,它首先会通过 $MapOutputTracker.getServerStatuses()$ 来获得数据的meta信息,这个过程有可能需要向org.apache.spark.MapOutputTrackerMasterActor 发送读请求,这个读请求是在org.apache.spark.MapOutputTracker#askTracker发出的。在获得了数据的meta信息后,它会将这些数据存入Seq[(BlockManagerId,Seq[(BlockId, Long)])]中,然后调用org.apache.spark.storage.ShuffleBlockFetcherIterator最终发起请求。ShuffleBlockFetcherIterator根据数据的本地性原则进行数据获取。如果数据在本地,那么会调用org.apache.spark.storage.BlockManager#getBlockData进行本地数据块的读取。
https://mp.weixin.qq.com/s/i0JmbjNvG2jitkpGLKyGJg
二、实现
目前 ShuffleReader 只有一种实现 BlockStoreShuffleReader,BlockStoresShuffleReader 用于 Shuffle 执行过程中,reduce 任务从其他节点的 Block 文件中读取由起始分区(startPartition)和结束分区(endPartition)指定范围内的数据。
ShuffleBlockFetcherIterator
ShuffleBlockFetcherIterator 是用于获取多个 Block 的迭代器。如果 Block 在本地,那么从本地的 BlockManager 获取;如果 Block 在远端,那么通过 ShuffleClient 请求远端节点上的 BlockTransferService 获取。
2.1.3. 获取数据解析 ShuffleBlockResolver
特质 ShuffleBlockResolver 定义了对 Shuffle Block 进行解析的规范,包括获取 Shuffle 数据文件、获取 Shuffle 索引文件、删除指定的 Shuffle 数据文件和索引文件、生成Shuffle 索引文件、获取 Shuffle 块的数据等。
- IndexShuffleBlockResolver
2.2. 实现
2.2.1. SortShuffleManager
SortShuffleManager 管理基于排序的 Shuffle,输入的记录按照目标分区 ID 排序,然后输出到一个单独的 map 输出文件中。reduce 为了读出 map 输出,需要获取 map 输出文件的连续内容。当 map 的输出数据太大已经不适合放在内存中时,排序后的输出子集将被溢出到文件中,这些磁盘上的文件将被合并生成最终的输出文件。