一、概述

AppStatusStore 用来存储 Application 的状态数据,Spark Web UI 及 REST API 需要的数据都取自它。

二、设计

AppStatusStore 的构造依赖于两个要素: 一为键值对存储 KVStore,二为 App 状态监听器 AppStatusListener

2.1. KVStore

InMemoryStore 是在内存中维护的键值对存储;LevelDB 则是借助 Google 开源的 KV 数据库来实现,可以持久化到磁盘。ElementTrackingStore 额外加上了跟踪元素个数的功能,可以根据元素个数阈值触发特定的操作,但它更多地是个包装类,需要依赖于 InMemoryStore 或者 LevelDB。

2.1.1. ElementTrackingStore

ElementTrackingStore 的初始化依赖于 InMemoryStore,它的多数方法都是直接代理了 InMemoryStore 的方法。为了实现跟踪元素数并触发操作的功能,其内部维护了一个类型与触发器(通过内部样例类 Trigger 定义)的映射关系,添加触发器的方法如下。

2.2. AppStatusListener

AppStatusListener 类继承自 SparkListener 类,因此实现了很多 SparkListener 中定义的监听事件处理方法

2.2.1. LiveEntity 缓存

LiveEntity,可以直译为”活动实体”,指在 Application 运行过程中状态在不断变化的 Spark 内部组件,比如 Job、Stage、Task、Executor、RDD,它们会向ElementTrackingStore 更新自己的状态数据。

LiveEntity 的实现类(比如 LiveJob、LiveTask等) 都包含了大量的监控信息和度量信息,监控信息来自 AppStatusListener,度量信息来自 MetricsSystem。并且它们都需要实现 $doUpdate()$ 方法,该方法负责将 LiveEntity 最新的状况反映给 ElementTrackingStore。

在 AppStatusListener 类中,也预先定义了各个 LiveEntity 的缓存。

1
2
3
4
5
6
7
8
private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]()
private val liveJobs = new HashMap[Int, LiveJob]()
private[spark] val liveExecutors = new HashMap[String, LiveExecutor]()
private[spark] val deadExecutors = new HashMap[String, LiveExecutor]()
private val liveTasks = new HashMap[Long, LiveTask]()
private val liveRDDs = new HashMap[Int, LiveRDD]()
private val liveResourceProfiles = new HashMap[Int, LiveResourceProfile]()
private[spark] val liveMiscellaneousProcess = new HashMap[String, LiveMiscellaneousProcess]()

2.2.2. 清理触发器

在 AppStatusListener 的构造方法中,调用了 $ElementTrackingStore.addTrigger()$ 方法添加触发器。

2.2.3. onJobStart()

在接收到表示 Job 启动的 SparkListenerJobStart 事件后,该方法的大致流程如下:

  1. 根据该 Job 的 Stage 信息,估算出一个大致的 Task 数目,获取其最后一个 Stage 的名称

    1
    2
    3
    4
    val numTasks = {
    val missingStages = event.stageInfos.filter(_.completionTime.isEmpty)
    missingStages.map(_.numTasks).sum
    }
  2. 封装 LiveJob 实例,将其放入缓存,并调用 $liveUpdate()$ 方法向 KVStore 更新状态。$liveUpdate()$ 方法最终调用 $LiveEntity.write()$ 方法

    1
    2
    3
    4
    val job = new LiveJob(event.jobId, jobName, description,
    if (event.time > 0) Some(new Date(event.time)) else None, event.stageIds, jobGroup, numTasks, sqlExecutionId)
    liveJobs.put(event.jobId, job)
    liveUpdate(job, now)
  3. 调用 $getOrCreateStage()$ 方法生成 LiveStage 实例,同样向 KVStore 更新状态

    1
    2
    3
    4
    5
    6
    event.stageInfos.foreach { stageInfo =>
    val stage = getOrCreateStage(stageInfo)
    stage.jobs :+= job
    stage.jobIds += event.jobId
    liveUpdate(stage, now)
    }
  4. 生成 RDD 的 DAG 表示,并写入 KVStore 中。

    1
    2
    3
    4
    5
    6
    event.stageInfos.foreach { stage =>
    val graph = RDDOperationGraph.makeOperationGraph(stage, maxGraphRootNodes)
    val uigraph = new RDDOperationGraphWrapper(stage.stageId, graph.edges, graph.outgoingEdges, graph.incomingEdges,
    newRDDOperationCluster(graph.rootCluster))
    kvstore.write(uigraph)
    }

AppStatusListener 监听的每个事件都会采用类似上面的逻辑来处理,将数据写入 KVStore 之后,就可以通过 AppStatusStore 将它们取出并且展示~。

三、实现

有了存储数据的 ElementTrackingStore 和监听并写入数据的 AppStatusListener,AppStatusStore 的实现就会非常简单,只需要调用 $read()$ 或 $view()$ 从ElementTrackingStore 中以一定的规则取出数据进行包装即可。

3.1. environmentInfo()

3.2. stageData()

通过调用 $stageData()$ 方法展示 Stage 相关的信息