一、概述

Spark 中的外部排序器用于对 map 任务的输出数据在 map 端或 reduce 端进行排序。

二、实现

Spark 中有两个外部排序器,分别是 ExternalSorter 和 ShuffleExternalSorter

2.1. ExternalSorter

ExternalSorter 是 SortShuffleManager 的底层组件,它提供了很多功能,包括将 map 任务的输出存储到 JVM 的堆中,如果指定了聚合函数,则还会对数据进行聚合;使用分区计算器首先将 Key 分组到各个分区中,然后使用自定义比较器对每个分区中的键进行可选的排序;可以将每个分区输出到单个文件的不同字节范围中,便于 reduce 端的 Shuffle 获取。

2.1.1. 设计

  1. 属性

  2. 方法

    1. map 端输出的缓存处理: $insertAll()$

      map 任务在执行结束后会将数据写入磁盘,等待 reduce 任务获取。但在写入磁盘之前,Spark 可能会对 map 任务的输出在内存中进行一些排序和聚合。ExternalSorter 的 $insertAll()$ 方法是这一过程的入口。

      • 如果用户指定了聚合器,则执行如下操作:

        1. 获取聚合器的 $mergeValue$ 函数: 用于将新的 value 合并到聚合的结果中
        2. 获取聚合器的 $createCombiner$ 函数(此函数用于创建聚合的初始值)。
        3. 定义偏函数 $update$: 是当有新的 value 时(即 hadValue 为 true),调用 $mergeValue$ 函数将新的 value 合并到之前聚合的结果中,否则说明刚刚开始聚合,此时调用 $createCombiner$ 函数以 value 作为聚合的初始值。
        4. 迭代输入的记录,首先调用父类 Spillable 的 $addElementsRead$ 方法增加已经读取的元素数,然后对每个 scala.Product2[K, V] 的 key通过调用 $getPartition$ 方法计算分区索引 (ID),并将分区索引与 key 作为调用 AppendOnlyMap 的 $changeValue$ 方法的参数 key,以偏函数 $update$ 作为 $changeValue$ 方法的参数 updateFunc,对由分区索引与 key 组成的对偶进行聚合。最后调用 $maybeSpillCollection$ 方法进行可能的磁盘溢出。
      • 如果用户没有指定聚合器,则对迭代器中的记录进行迭代,并在每次迭代过程中执行如下操作:

        1. 调用父类 Spillable 的 $addElementsRead$ 方法增加已经读取的元素数。
        2. 对每个 scala.Product2[K, V] 的 key 通过调用 $getPartition$ 方法计算分区索引(ID),并将分区索引、key 及 value 作为调用PartitionedPairBuffer 的 $insert$ 方法的参数。
        3. 调用 $maybeSpillCollection$ 方法进行可能的磁盘溢出。
    2. 缓存溢出: $maybeSpillCollection()$

      ExternalSorter 使用了 AppendOnlyMap 和 PartitionedPairBuffer

      ExternalSorter 的 map 属性的类型为 PartitionedAppendOnlyMap[K, C], buffer 属性的类型为 PartitionedPairBuffer[K, C]

      大数据场景下,数据全部都放入内存很容易引起系统的 OOM 问题。另一方面,map 任务的输出需要写入磁盘,磁盘写入频率过高会因为大量的磁盘 I/O 降低效率,那么何时才应该将内存中的数据写入到磁盘呢🤔️?

      Spark 为了解决这两个问题,提供了$maybeSpillCollection()$ 方法,以判断何时将内存中的数据写入磁盘。

      1. 如果 ExternalSorter 正在使用的数据结构是 PartitionedAppendOnlyMap,对 PartitionedAppendOnlyMap 的大小进行估算。调用$maybeSpill$ 方法时,如果的确将 PartitionedAppendOnlyMap中 的数据溢出到了磁盘上,那么重新创建 PartitionedAppendOnlyMap。
      2. 如果 ExternalSorter 正在使用的数据结构是 PartitionedPairBuffer,对 PartitionedPairBuffer 的大小进行估算。调用 $maybeSpill$ 方法时,如果的确将 PartitionedPairBuffer 中的数据溢出到了磁盘上,那么重新创建 PartitionedPairBuffer。
      3. 如果估算的大小超过了 ExternalSorter 已经使用的内存大小的峰值 _peakMemoryUsedBytes,那么将 ExternalSorter 的_peakMemoryUsedBytes 修改为估算的大小。
    3. $maybeSpill()$

      $maybeSpill$ 方法用于将 PartitionedAppendOnlyMap 或 PartitionedPairBuffer 底层的数据溢出到磁盘

      • 如果当前集合已经读取的元素数是32的倍数,并且集合当前的内存大小(current-Memory) 大于等于 myMemoryThreshold,那么执行以下操作:

        1. 调用 $acquireMemory$ 方法,为当前任务尝试获取 2 * currentMemory - myMemory - Threshold 大小的内存,并获得实际获得的内存大小granted
        2. 将 granted 累加到 myMemoryThreshold
        3. 判断是否应该进行溢出,即 currentMemory 是否大于等于 myMemoryThreshold。如果 currentMemory 还是大于等于 myMemoryThreshold,说明TaskMemoryManager 已经没有多余的内存可以分配了,此时应该进行溢出。
      • 如果需要溢出或者 _elementsRead 大于 numElementsForceSpillThreshold,那就需要溢出。溢出执行的操作如下:

        1. _spillCount 加一
        2. 调用 $spill$ 方法将集合中的数据溢出到磁盘。
        3. 溢出后处理,包括已读取元素数(_elementsRead)归零,已溢出内存字节数(_memoryBytesSpilled),增加当前集合的大小 currentMemory,释放 ExternalSorter 占用的内存。
      • 返回是否进行了溢出

    4. map 端输出持久化: $writePartitionedFile()$

      数据仅仅存放在内存中,存在着丢失的风险。ExternalSorter 的 $writePartitionedFile$ 方法用于持久化计算结果。

2.2. ShuffleExternalSorter

ShuffleExternalSorter 是专门用于对 Shuffle 数据进行排序的外部排序器,用于将 map 任务的输出存储到 Tungsten 中;在记录超过限制时,将数据溢出到磁盘。

与 ExternalSorter 不同,ShuffleExternalSorter 本身并没有实现数据的持久化功能,具体的持久化将由 ShuffleExternalSorter 的调用者UnsafeShuffleWriter 来实现。