Spark-源码学习-SparkSession-SparkContext-ContextCleaner 设计
一、概述
SparkContext 初始化的最后一个组件: 上下文清理器 ContextCleaner。它扮演着 Spark Core 中垃圾收集器的角色。Spark 运行的时候,会产生一堆临时文件和数据,比如 Shuffle 的临时数据等,如果每次运行完,或者没有运行完杀掉了,不清理,会产生大量的无用数据,最终造成大数据集群崩溃而死。
二、设计
ContextCleaner 是 Spark 应用中的垃圾收集器,负责应用级别的 shuffles,RDDs,broadcasts,accumulators 及 checkpointedRDD 文件的清理,用于减少内存及磁盘存储的压力。
ContextCleaner 在 Driver 中运行,在 SparkContext 启动且 spark.cleaner.referenceTracking 配置为 true 时启动,在 SparkContext 停止的时候停止。
2.1. 属性
referenceBuffer
缓存 CleanupTaskWeakReference 的集合。CleanupTaskWeakReference 是 Java 自带 WeakReference 类的封装,其中保存有需要清理的Spark组件实例的弱引用。
1
private val referenceBuffer = Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)
referenceQueue
缓存弱引用实例的引用队列。对弱引用和软引用实例,当其被 GC 之后就会存入引用队列中,用户程序通过从队列中取得这些引用信息,就可以执行自定义的清理操作。
1
private val referenceQueue = new ReferenceQueue[AnyRef]
listeners
ContextCleaner 的监听器队列,目前只是在测试代码中用到,没有实际用途。
1
private val listeners = new ConcurrentLinkedQueue[CleanerListener]()
cleaningThread
执行具体清理工作的线程
1
private val cleaningThread = new Thread() { override def run(): Unit = keepCleaning() }
periodicGCService
一个单线程的调度线程池,用来周期性地执行 GC 操作。
1
2private val periodicGCService: ScheduledExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")periodicGCInterval
periodicGCService 执行 GC 的周期长度,由配置项 spark.cleaner.periodicGC.interval 控制,默认为 30 分钟。
1
private val periodicGCInterval = sc.conf.get(CLEANER_PERIODIC_GC_INTERVAL)
blockOnCleanupTasks
行清理任务的时候是否阻塞(不包含 Shuffle 数据的清理任务),由配置项 spark.cleaner.referenceTracking.blocking 控制,默认值 true。
1
private val blockOnCleanupTasks = sc.conf.get(CLEANER_REFERENCE_TRACKING_BLOCKING)
blockOnShuffleCleanupTasks
执行清理 Shuffle 数据的任务时是否阻塞,由配置项 spark.cleaner.referenceTracking.blocking.shuffle 控制,默认值 false。
1
private val blockOnShuffleCleanupTasks = sc.conf.get(CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE)
stopped
该 ContextCleaner 是否停止的标记。
2.2. 清理任务
ContextCleaner 中共有 5 种清理任务,分别对应 RDD、Shuffle、广播变量、累加器和检查点,继承自 CleanupTask
1 | private sealed trait CleanupTask |
2.3. CleanupTaskWeakReference
CleanupTaskWeakReference 定义如下:
1 | private class CleanupTaskWeakReference( |
当其中的 referent 对象可达性变为弱可达时,对应的 CleanupTaskWeakReference 实例就会被加入 ReferenceQueue 中,用于执行清理任务。
三、实现
3.1. 启动
$ContextCleaner.start()$ 将清理线程 cleaningThread
设为守护线程并启动之,然后按照 periodicGCInterval
的间隔来调度执行 $System.gc()$ 方法,进而可能触发一次 GC。
1 | def start(): Unit = { |
在 Spark Application 中指定 Driver 或 Executor 的 JVM 参数时,一定不要加上 -XX:-DisableExplicitGC,该参数会使 $System.gc()$ 的调用无效化。
cleaningThread 的 $run()$ 函数为 $keepCleaning()$ 的调用:
1 | private val cleaningThread = new Thread() { override def run() { keepCleaning() }} |
3.1.1. 注册清理事件
ContextCleaner 提供了 $registerForCleanup()$ 方法,用来将 CleanupTask 及其对应要清理的对象加入 referenceBuffer
集合中
1 | /** Register an RDD for cleanup when it is garbage collected. */ |
$registerForCleanup()$ 方法将 CleanupTask 及其对应要清理的对象封装为 CleanupTaskWeakReference 对象添加到 referenceBuffer 中
1 | private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = { |
3.1.2. 清理
需要被回收的对象将会在 ReferenceQueue 中,$keepCleaning()$ 方法从 ReferenceQueue 中取出 CleanupTaskWeakReference,然后将其包含的 CleanupTask 进行模式匹配,并对五种情况分别调用不同的方法。
1 | private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) { |
3.2.1. 清理 RDD
清理 RDD 通过调用 $SparkContext.unpersistRDD()$ 方法来反持久化一个 RDD
3.2.2. 清理 Shuffle 数据
清理 Shuffle 则需要同时从 MapOutputTracker 与 BlockManager 中反注册 Shuffle。清理完毕后再调用各个监听器的监听方法进行记录。