https://mp.weixin.qq.com/s/wDKB1IFOS4v1gUuiUDuLAA

https://mp.weixin.qq.com/s/UaVnI6yNAwIc2oPIedrFvg

https://mp.weixin.qq.com/s/IvgdE41TtkjnXHxQkj0ToQ

一、概述

当使用 BypassMergeSortShuffleWriter 的条件不满足时,SortShuffleManager 会继续考虑 UnsafeShuffleWriter,它是 Tungsten 支持的,与基本 SortShuffleWriter 相比内存效率高。

使用 UnsafeShuffleWriter 的条件:

  • Shuffle 依赖不带有聚合(aggregation)操作
  • 支持序列化值的重新定位,即使用 KryoSerializer 或者 SparkSQL 自定义的一些序列化方式
  • 分区数目必须小于 (2^24)

由于 UnsafeShuffleWriter 排序的是二进制的数据,不会进数据进行反序列,所以不能进行聚合操作,另一方面 PartitionId 是占用 24 位的数,所以要小于 16777216

二、实现

UnsafeShuffleWriter 的实现在一定程度上是 Tungsten 内存管理优化的的主要应用场景。其实现过程实际上和 SortShuffleWriter 是类似的,但是其中维护和执行的数据结构是不一样的~

三、初始化 Writer

四、Write

UnsafeShuffleWriter 具体的实现也是先从对应类的 $write()$ 函数中~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
// 使用 success 记录 write 是否成功,判断是 write 阶段的异常还是 clean 阶段
boolean success = false;
try {
while (records.hasNext()) {
// 遍历所有的数据插入 ShuffleExternalSorter
insertRecordIntoSorter(records.next());
}
// close 排序器使所有数据写出到磁盘,并将多个溢写文件合并到一起
closeAndWriteOutput();
success = true;
} finally {
if (sorter != null) {
try {
// 清除并释放资源
sorter.cleanupResources();
} catch (Exception e) {
// Only throw this error if we won't be masking another
// error.
if (success) {
throw e;
} else {
logger.error("In addition to a failure during writing, we failed during cleanup.", e);
}
}
}
}
}

3.1. 数据写入内存缓存区

https://mp.weixin.qq.com/s/ebRfhNQjlvu56TZ_3BePEQ

3.1.1. 聚合&排序

UnsafeShuffleWriter 底层使用 ShuffleExternalSorter 作为外部排序器,所以 UnsafeShuffleWriter 不具备 SortShuffleWriter 的聚合功能。UnsafeShuffleWriter 将使用 Tungsten 的内存作为缓存,以提高写入磁盘的性能。

https://mp.weixin.qq.com/s/eTbZcB5lHL1TDFCLD_C_nw