一、概述

SparkContext 初始化的最后一个组件: 上下文清理器 ContextCleaner。它扮演着 Spark Core 中垃圾收集器的角色。Spark 运行的时候,会产生一堆临时文件和数据,比如 Shuffle 的临时数据等,如果每次运行完,或者没有运行完杀掉了,不清理,会产生大量的无用数据,最终造成大数据集群崩溃而死。

二、设计

ContextCleaner 是 Spark 应用中的垃圾收集器,负责应用级别的 shuffles,RDDs,broadcasts,accumulators 及 checkpointedRDD 文件的清理,用于减少内存及磁盘存储的压力。

ContextCleaner 在 Driver 中运行,在 SparkContext 启动且 spark.cleaner.referenceTracking 配置为 true 时启动,在 SparkContext 停止的时候停止。

2.1. 属性

  1. referenceBuffer

    缓存 CleanupTaskWeakReference 的集合。CleanupTaskWeakReference 是 Java 自带 WeakReference 类的封装,其中保存有需要清理的Spark组件实例的弱引用。

    1
    private val referenceBuffer = Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)
  2. referenceQueue

    缓存弱引用实例的引用队列。对弱引用和软引用实例,当其被 GC 之后就会存入引用队列中,用户程序通过从队列中取得这些引用信息,就可以执行自定义的清理操作。

    1
    private val referenceQueue = new ReferenceQueue[AnyRef]
  3. listeners

    ContextCleaner 的监听器队列,目前只是在测试代码中用到,没有实际用途。

    1
    private val listeners = new ConcurrentLinkedQueue[CleanerListener]()
  4. cleaningThread

    执行具体清理工作的线程

    1
    private val cleaningThread = new Thread() { override def run(): Unit = keepCleaning() }
  5. periodicGCService

    一个单线程的调度线程池,用来周期性地执行 GC 操作。

    1
    2
    private val periodicGCService: ScheduledExecutorService =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")
  6. periodicGCInterval

    periodicGCService 执行 GC 的周期长度,由配置项 spark.cleaner.periodicGC.interval 控制,默认为 30 分钟。

    1
    private val periodicGCInterval = sc.conf.get(CLEANER_PERIODIC_GC_INTERVAL)
  7. blockOnCleanupTasks

    行清理任务的时候是否阻塞(不包含 Shuffle 数据的清理任务),由配置项 spark.cleaner.referenceTracking.blocking 控制,默认值 true。

    1
    private val blockOnCleanupTasks = sc.conf.get(CLEANER_REFERENCE_TRACKING_BLOCKING)
  8. blockOnShuffleCleanupTasks

    执行清理 Shuffle 数据的任务时是否阻塞,由配置项 spark.cleaner.referenceTracking.blocking.shuffle 控制,默认值 false。

    1
    private val blockOnShuffleCleanupTasks = sc.conf.get(CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE)
  9. stopped

    该 ContextCleaner 是否停止的标记。

2.2. 清理任务

ContextCleaner 中共有 5 种清理任务,分别对应 RDD、Shuffle、广播变量、累加器和检查点,继承自 CleanupTask

1
2
3
4
5
6
7
private sealed trait CleanupTask
private case class CleanRDD(rddId: Int) extends CleanupTask
private case class CleanShuffle(shuffleId: Int) extends CleanupTask
private case class CleanBroadcast(broadcastId: Long) extends CleanupTask
private case class CleanAccum(accId: Long) extends CleanupTask
private case class CleanCheckpoint(rddId: Int) extends CleanupTask
private case class CleanSparkListener(listener: SparkListener) extends CleanupTask

2.3. CleanupTaskWeakReference

CleanupTaskWeakReference 定义如下:

1
2
3
4
5
private class CleanupTaskWeakReference(
val task: CleanupTask,
referent: AnyRef,
referenceQueue: ReferenceQueue[AnyRef]
) extends WeakReference(referent, referenceQueue)

当其中的 referent 对象可达性变为弱可达时,对应的 CleanupTaskWeakReference 实例就会被加入 ReferenceQueue 中,用于执行清理任务。

三、实现

3.1. 启动

$ContextCleaner.start()$ 将清理线程 cleaningThread 设为守护线程并启动之,然后按照 periodicGCInterval 的间隔来调度执行 $System.gc()$ 方法,进而可能触发一次 GC。

1
2
3
4
5
6
def start(): Unit = {
cleaningThread.setDaemon(true) // 设置守护线程
cleaningThread.setName("Spark Context Cleaner")
cleaningThread.start() //启动线程
periodicGCService.scheduleAtFixedRate(() => System.gc(), periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
}

在 Spark Application 中指定 Driver 或 Executor 的 JVM 参数时,一定不要加上 -XX:-DisableExplicitGC,该参数会使 $System.gc()$ 的调用无效化。

cleaningThread 的 $run()$ 函数为 $keepCleaning()$ 的调用:

1
private val cleaningThread = new Thread() { override def run() { keepCleaning() }}

3.1.1. 注册清理事件

ContextCleaner 提供了 $registerForCleanup()$ 方法,用来将 CleanupTask 及其对应要清理的对象加入 referenceBuffer 集合中

1
2
3
4
5
6
7
8
9
10
11
12
/** Register an RDD for cleanup when it is garbage collected. */
def registerRDDForCleanup(rdd: RDD[_]): Unit = registerForCleanup(rdd, CleanRDD(rdd.id))
def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = registerForCleanup(a, CleanAccum(a.id))
/** Register a ShuffleDependency for cleanup when it is garbage collected. */
def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = {
registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
}
//...
/** Register a RDDCheckpointData for cleanup when it is garbage collected. */
def registerRDDCheckpointDataForCleanup[T](rdd: RDD[_], parentId: Int): Unit = {
registerForCleanup(rdd, CleanCheckpoint(parentId))
}

$registerForCleanup()$ 方法将 CleanupTask 及其对应要清理的对象封装为 CleanupTaskWeakReference 对象添加到 referenceBuffer 中

1
2
3
private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
}

3.1.2. 清理

需要被回收的对象将会在 ReferenceQueue 中,$keepCleaning()$ 方法从 ReferenceQueue 中取出 CleanupTaskWeakReference,然后将其包含的 CleanupTask 进行模式匹配,并对五种情况分别调用不同的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
synchronized {
reference.foreach { ref =>
referenceBuffer.remove(ref)
ref.task match {
case CleanRDD(rddId) => doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) => doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) => doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) => doCleanupAccum(accId, blocking = blockOnCleanupTasks)
case CleanCheckpoint(rddId) => doCleanCheckpoint(rddId)
case CleanSparkListener(listener) => doCleanSparkListener(listener)
}
}
}
} catch {
//...
}
}

3.2.1. 清理 RDD

清理 RDD 通过调用 $SparkContext.unpersistRDD()$ 方法来反持久化一个 RDD

3.2.2. 清理 Shuffle 数据

清理 Shuffle 则需要同时从 MapOutputTracker 与 BlockManager 中反注册 Shuffle。清理完毕后再调用各个监听器的监听方法进行记录。