Spark-源码系列-SparkCore-Shuffle-ShuffleWrite-Writer-UnsafeShuffleWriter
https://mp.weixin.qq.com/s/wDKB1IFOS4v1gUuiUDuLAA
https://mp.weixin.qq.com/s/UaVnI6yNAwIc2oPIedrFvg
https://mp.weixin.qq.com/s/IvgdE41TtkjnXHxQkj0ToQ
一、概述
当使用 BypassMergeSortShuffleWriter 的条件不满足时,SortShuffleManager 会继续考虑 UnsafeShuffleWriter,它是 Tungsten 支持的,与基本 SortShuffleWriter 相比内存效率高。
使用 UnsafeShuffleWriter 的条件:
- Shuffle 依赖不带有聚合(aggregation)操作
- 支持序列化值的重新定位,即使用 KryoSerializer 或者 SparkSQL 自定义的一些序列化方式
- 分区数目必须小于 (2^24)
由于 UnsafeShuffleWriter 排序的是二进制的数据,不会进数据进行反序列,所以不能进行聚合操作,另一方面 PartitionId 是占用 24 位的数,所以要小于 16777216
二、实现
UnsafeShuffleWriter 的实现在一定程度上是 Tungsten 内存管理优化的的主要应用场景。其实现过程实际上和 SortShuffleWriter 是类似的,但是其中维护和执行的数据结构是不一样的~
三、初始化 Writer
四、Write
UnsafeShuffleWriter 具体的实现也是先从对应类的 $write()$ 函数中~
1 | public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException { |
3.1. 数据写入内存缓存区
https://mp.weixin.qq.com/s/ebRfhNQjlvu56TZ_3BePEQ
3.1.1. 聚合&排序
UnsafeShuffleWriter 底层使用 ShuffleExternalSorter 作为外部排序器,所以 UnsafeShuffleWriter 不具备 SortShuffleWriter 的聚合功能。UnsafeShuffleWriter 将使用 Tungsten 的内存作为缓存,以提高写入磁盘的性能。