Spark-理论笔记-Shuffle 概述
一、概述
Shuffle 描述着数据从 map task 输出到 reduce task 输入的这段过程。是连接 Map 和 Reduce 之间的桥梁, Map 的输出要用到 Reduce 中必须经过 shuffle 这个环节.
shuffle 的性能高低直接影响了整个程序的性能和吞吐量。因为在分布式情况下,reduce task 需要跨节点去拉取其它节点上的 map task 结果。这一过程将会产生网络资源消耗和内存,磁盘 IO 的消耗。
1.1. 导致 Shuffle 操作算子
1.1.1. 重分区类的操作
重分区类算子一般会 shuffle,因为需要在整个集群中对之前所有的分区的数据进行随机,均匀的打乱,然后把数据放入下游新的指定数量的分区内。
比如 repartition
、repartitionAndSortWithinPartitions
等
1.1.2. byKey 类的操作
比如 reduceByKey
、groupByKey
、sortByKey
等,对一个 key 进行聚合操作时要保证集群中,所有节点上相同的 key 分配到同一个节点上进行处理
1.1.3. Join 类的操作
比如 join
、cogroup
等。两个 rdd 进行 join,就必须将相同 key 的数据,shuffle 到同一个节点上,然后进行相同 key 的两个 rdd 数据操作。
二、Shuffle 设计
通常 shuffle 分为两部分: Map阶段的数据准备和 Reduce 阶段的数据拷贝处理。
一般将在 map 端的 shuffle 称之为 Shuffle Write, 在Reduce 端的 Shuffle 称之为 Shuffle Read
ShuffleMapStage 的结束伴随着 shuffle 文件的写磁盘
ResultStage基本上对应代码中的 action 算子, 即将一个函数应用在 RDD 的各个 partition 的数据集上,意味着一个 job 的运行结束
在划分 stage 时, 最后一个 stage 称为 finalStage, 它本质上是一个 ResultStage
通常 shuffle 分为两部分: write 阶段的数据准备和 read 阶段的数据拷贝处理。
三、 Shuffle 实现
Spark 实现了多种 shuffle,通过 spark.shuffle.manager 来确定。暂时总共有三种: hash shuffle、sort shuffle 和 tungsten-sort shuffle
3.1. HashShuffle
3.1.1. 未优化的 HashShuffle
$Write$ 阶段
每个 task 将数据写入内存缓冲中,当内存缓冲填满之后将数据溢写到磁盘文件中去,在溢写过程中,将 key 经过 hash 之后相同的数据写入同一个磁盘文件中,而每一个磁盘文件都只属于下游 stage 的一个 task。
$Read$ 阶段
stage 的每一个 task 将上一个 stage 的计算结果中的所有相同 key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行 key 的聚合或连接等操作。
Shuffle read 阶段的拉取过程是一边拉取一边进行聚合的。每个 shuffle read task 都会有一个自己的 buffer 缓冲,每次都只能拉取与 buffer 缓冲相同大小的数据,然后在内存进行聚合等操作。聚合完一批数据后,再拉取下一批数据,直到最后将所有数据到拉取完,并得到最终的结果。
这种策略的不足在于,下游有几个 task,上游的每一个 task 都就都需要创建几个临时文件,导致了当下游的 task 任务过多的时候,上游会堆积大量的小文件.
- Shuffle 前在磁盘上会产生海量的小文件,此时会产生大量耗时低效的 IO 操作
- 内存不够用,由于内存中需要保存海量文件操作信息和临时信息,如果数据处理的规模比较庞大的话,会出现 OOM 等问题。
3.1.2. 优化之后的 HashShuffle
spark.shuffle.consolidateFiles 默认值为 false,将其设置为 true 即可开启优化机制。
开启之后,在 shuffle write 过程中,task 不为下游 stage 的每个 task 创建一个磁盘文件。第一批并行执行的每个 task 都会创建一个 ShuffleFileGroup,并将数据写入对应的磁盘文件内。当 Executor 接着执行下一批 task 时,下一批 task 就会复用之前已有的 ShuffleFileGroup,包括其中的磁盘文件。而不会写入新的磁盘文件中。
$consolidate$ 机制允许不同的 task 复用同一批磁盘文件,从而大幅度减少磁盘文件的数量,进而提升 shuffle write 的性能。
这个功能优点明显,但为什么 Spark 一直没有在基于 Hash Shuffle 的实现中将功能设置为默认选项呢?🤔️: 官方给出的说法是这个功能还欠稳定。
3.1.3. 总结
基于 Hash 的 Shuffle 机制的优缺点:
- 优点
- 避免了排序所需的内存开销。
缺点
生产的文件过多,会对文件系统造成压力。大量小文件的随机读写带来一定的磁盘开销。
数据块写入时所需的缓存空间也会随之增加,对内存造成压力。
3.2. SortShuffle
在 Spark2.0 中,抛弃了基于 Hash 的 Shuffle,只有基于排序的 Shuffle,使用基于排序的 Shuffle 主要解决了 Shuffle 过程中产生过多的文件和 Writer Handler 的缓存开销过大的问题。
在 Sort Based Shuffle 中,每个 Shuffle Map Task 不会为后续的每个任务创建单独的文件, 而是会将所有结果写到同一个文件中,对应生成一个 Index 文件进行索引。通过这种机制避免了大量文件的产生,一方面可以减轻文件系统管理众多文件的压力, 另一方面可以减少 Writer Handler 的缓存所占用的内存大小,节省了内存的同时避免了 GC 的风险和频率。
SortShuffleManager 的运行机制主要分成两种,一种是普通运行机制,另一种是 bypass 运 行 机 制。 当 shuffle read task 的 数量小于等于 spark.shuffle.sort.bypassMergeThreshold 参数的值时, 就会启用 bypass 机制。
3.2.1. 普通 SortShuffle
$Write$ 阶段
Task 将输出数据会先写入一个内存数据结构。
根据不同的 shuffle 算子,可能选用不同的数据结构。
- 聚合类 shuffle 算子: $reduceByKey$,选用 Map 数据结构,一边通过 Map 进行聚合,一边写入内存
- join 类 shuffle 算子,会选用 Array 数据结构,直接写入内存。
map task 的计算结果会写入到一个内存数据结构里面,内存数据结构默认是 5M
设置定时器,不定期的去估算这个内存结构的大小,当内存结构中的数据超过 5M 时,比如现在内存结构中的数据为5.01M,那么他会申请5.01*2-5=5.02M内存给内存数据结构。
每插入 32 次数据,就会检查一次内存大小,如果内存大小 size 超过 5M 就会申请
(size*2 - 5)m
的空间,如果申请成功不会进行溢写,如果申请不成功,这时候会发生溢写磁盘。1
elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold
如果申请成功不会进行溢写,如果申请不成功,这时候会发生溢写磁盘。
在溢写磁盘前,先根据 key 进行排序,排序过后的数据,会分批写入到磁盘文件中。默认批次为 10000 条,数据会以每批一万条写入到磁盘文件。写入磁盘文件通过缓冲区溢写的方式,每次溢写都会产生一个磁盘文件,一个 map task 溢写过程会产生多个临时文件。
map task 执行完成后,将所有的临时文件合并成一个大的磁盘文件 (merge),同时单独写一份索引文件,标识下游各个 task 的数据在文件中的索引: start offset 和 end offset。
$Read$ 阶段
reduce task 去 map 端拉取数据,首先解析索引文件,根据索引文件再去拉取对应的数据。
疑问?🤔️
ShuffleMapTask 产生的结果写到哪里去?
ShuffleMapTask 产生的结果一般写入到本地磁盘,数据存入 shuffle
{shuffleId}-{mapId}-{reduceId}.data
文件中, 然后再本地磁盘建立索引文件 shuffle{shuffleId}{mapId}{reduceId}.index
。- shuffledId 是对该 shuffle 的标识
- mapId 可以理解成该 stage 的 taskId
- reduceId 理解成下游 stage 的 taskId。
task 计算完毕后,会将 shuffleId 和 blockManagerId 注册到 driver 中的 MapOutputTracker,MapOutPutTracker 的作用是为每个 shuffle 准备其所需要的所有 map out,可以加速 map outs 传送给 shuffle 的速度。
ResultTask怎么知道去哪里读去数据?
ResultTask 会主动去向 driver 发送查询请求,通过 shuffleId 获取该 Seq[BlockManagerId],然后通过该 ResultTask 的 taskId,从各个 BlockManagerId 获取数据
3.2.2. bypass SortShuffle
Reducer 端任务数比较少的情况下,基于 Hash 的 Shuffle 实现比基于 Sort 的 Shuffle 实现机制要快,因此基于 Sort Shuffle 实现机制提供了一个回退方案,就是 bypass 运行机制。对于 Reducer 端任务数少于配置属性 spark.shuffle.sort.bypassMergeThreshold 设置的个数时,使用带 Hash 风格的回退计划。
此时 Task 会为每个下游 Task 都创建一个临时磁盘文件,并将数据按 key 进行 hash 然后根据 key 的hash 值,将 key 写入对应的磁盘文件之中。
该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,都要创建数量惊人的磁盘文件,但 bypass SortShuffle 在最后会做一个磁盘文件的合并,因此产生少量的最终磁盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。
而该机制与普通 SortShuffleManager 运行机制的不同在于
- 磁盘写机制不同
- 不会进行排序
启用该机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
触发条件
- shuffle map task 的数量小于 spark.shuffle.sort.bypassMergeThreshold 参数的值[默认200]
- 不是聚合类的 shuffle 算子,比如: $groupByKey$
3.2.3. 总结
基于 Sort 的 Shuffle 机制的优缺点:
优点
小文件的数量大量减少,Mapper 端的内存占用变少
Spark 不仅可以处理小规模的数据,即使处理大规模的数据,也不会很容易达到性能瓶颈。
缺点
如果 Mapper 中 Task 的数量过大,依旧会产生很多小文件,此时在 Shuffle 传数据的过程中到 Reducer 端, Reducer 会需要同时大量地记录进行反序列化,导致大量内存消耗和 GC 负担巨大,造成系统缓慢,甚至崩溃;
强制了在 Mapper 端必须要排序,即使数据本身并不需要排序;基于记录本身进行排序,这就是 Sort-Based Shuffle 最致命的性能消耗。
3.3. Tungsten-sort
Spark 在 1.4 以后可以通过 spark.shuffle.manager = tungsten-sort
开启 Tungsten-sort shuffle
Tungsten-sort 是对普通 sort 的一种优化,排序的不是内容本身,而是内容序列化后字节数组的指针(元数据),把数据的排序转变为了指针数组的排序,实现了直接对序列化后的二进制数据进行排序。由于直接基于二进制数据进行操作,所以在这里面没有序列化和反序列化的过程。内存的消耗大大降低,相应的,会极大的减少的gc的开销。
对应非基于 Tungsten Sort 时,通过 SortShuffleWriter.shouldBypassMergeSort
方法判断是否需要回退到 Hash 风格的 Shuffle 实现机制,当该方法返回的条件不满足时,则通过 SortShuffleManager.canUseSerializedShuffle
方法判断是否需要采用基于 Tungsten Sort Shuffle 实现机制,而当这两个方法返回都为 false,即都不满足对应的条件时,会自动采用普通运行机制。
因此,当设置了 spark.shuffle.manager=tungsten-sort
时,也不能保证就一定采用基于 Tungsten Sort 的 Shuffle 实现机制。
要实现 Tungsten Sort Shuffle 机制需要满足以下条件:
- Shuffle 依赖中不带聚合操作或没有对输出进行排序的要求。
- Shuffle 的序列化器支持序列化值的重定位(当前仅支持 KryoSerializer Spark SQL 框架自定义的序列化器)。
- Shuffle 过程中的输出分区个数少于 16777216 个。
实际上,使用过程中还有其他一些限制,如引入 Page 形式的内存管理模型后,内部单条记录的长度不能超过 128 MB。另外,分区个数的限制也是该内存模型导致的。
所以,目前使用基于 Tungsten Sort Shuffle 实现机制条件还是比较苛刻的。
直接在序列化的二进制数据上操作,不再需要反序列化。它用了不安全( sun.misc.Unsafe )内存拷贝函数来直接拷贝数据,它可以更方便在序列化数据上操作,因为数据只是字节数组而已。
使用 cache 高效的 ShuffleExternalSorter 排序器来对压缩记录指针和分区 id 数组排序。通过在排序数组中每条记录使用 8 字节空间可以提升 cache 效率。
因为记录不再反序列化,溢写(内存中存储的本身就是序列化数据,溢出操作时直接将内存中序列化数据写入文件)序列化数据可以直接操作。
当 shuffle 压缩编码方式支持序列化流级联时,自动使用额外的溢写合并优化。当前 Spark 的 LZF 序列化器支持,并且需要 shuffle.unsafe.fastMergeEnabled 被开启。
从 Spark-1.6.0 开始,把 Sort Shuffle 和 Tungsten-Sort Based Shuffle 全部统一到 Sort Shuffle 中,如果检测到满足 Tungsten-Sort Based Shuffle 条件会自动采用 Tungsten-Sort Based Shuffle,否则采用 Sort Shuffle。从Spark-2.0.0开始,Spark 把 Hash Shuffle 移除,可以说目前 Spark-2.0 中只有一种 Shuffle,即为 Sort Shuffle。
四、Spark Shuffle 调优
大多数 Spark 作业的性能主要就是消耗在了 shuffle 环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。因此,如果要让作业的性能更上一层楼,就有必要对 shuffle 过程进行调优。
4.1. Shuffle 相关参数调优
spark.shuffle.file.buffer
默认值:32k
参数说明:该参数用于设置 shuffle write task 的 BufferedOutputStream 的 buffer 缓冲大小。将数据写到磁盘文件之前,会先写入 buffer 缓冲中,待缓冲写满之后,才会溢写到磁盘。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。spark.reducer.maxSizeInFlight
默认值:48m
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。spark.shuffle.io.maxRetries
默认值:3
参数说明:shuffle read task 从 shuffle write task 所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
调优建议:对于那些包含了特别耗时的 shuffle 操作的作业,建议增加重试最大次数(比如60次),以避免由于 JVM 的 full gc 或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的 shuffle 过程,调节该参数可以大幅度提升稳定性。spark.shuffle.io.retryWait
默认值:5s
参数说明:该参数代表了每次重试拉取数据的等待间隔,默认是 5s。
调优建议:建议加大间隔时长(比如60s),以增加 shuffle 操作的稳定性。
五、总结
Spark 中是不要求在 reduce 端进行排序的,生成 Shuffle 的结果文件并不要求排序,但是因为 Spill 到文件中后,有可能相同的 Key 会分布在不同的文件中,所以需要对不同的文件进行相同的Key的值的计算。如果 Spill 到文件是乱序的,那代表在最后生成 Shuffle 结果的时候,还是要 Load 所有文件才能确定哪些 Key 是重复的需要做合并,这样依然面对着内存不够的情况。
Spark 假定大多数情况下 Shuffle 的数据不需要排序,例如 Word Count,强制排序反而会降低性能。因此不在 Shuffle Read 时做 Merge Sort,如果需要合并的操作的话,则会使用聚合(agggregator),即用了一个 HashMap (实际上是一个 AppendOnlyMap)来将数据进行合并。
5.1. Spark Shuffle vs MR Shuffle
Hadoop 的有一个 Map 完成,Reduce 便可以去 fetch 数据了,不必等到所有Map任务完成,而 Spark 的必须等到父stage完成,也就是父stage的map操作全部完成才能去fetch数据。
Spark一定是先完成Mapper端所有的Tasks才会进行Reducer端的Shuffle过程的。Spark的Job是按照stage线性执行的,前面的stage必须执行完成才能够执行后面的Reducer的Shuffle过程。为了更好的保证血缘关系,重新计算丢失数据
为了优化reduce的执行时间,hadoop中是等job的第一个map结束后,所有的reduce就开始尝试从完成的map中下载该reduce对应的partition部分数据,因此map和reduce是交叉进行的
Hadoop 的Shuffle是 sort-base 的,那么不管是 Map 的输出,还是 Reduce 的输出,都是 partion 内有序的,而 spark 不要求这一点
spark早期版本采用的是AppendOnlyMap来实现shuffle reduce阶段数据的聚合,当数据量不大时没什么问题,但当数据量很大时就会占用大量内存,最后可能OOM。所以从spark 0.9开始就引入了ExternalAppendOnlyMap来代替AppendOnlyMap。
Spark中Sorted-Based Shuffle在Mapper端是进行排序的,包括partition的排序和每个partition内部元素进行排序。但是在Reducer端没有进行排序,
所以job的结果默认情况下不是排序的。 Sorted-Based Shuffle 采用Tim-Sort排序算法,好处是可以极为高效的使用Mapper端的排序成果完成全局排序。
Hadoop的 Reduce 要等到 fetch 完全部数据,才将数据传入 reduce 函数进行聚合,而 spark 是一边 fetch 一边聚合。
Spark的Shuffle是边拉取数据边进行Aggregate操作的(Hadoop不是这样的),其实与Hadoop MapReduce相比其优势确实是在速度上