一、概述

Spark 的度量系统是由 Instance、Source(度量来源)、Metrics、Sink(度量目的地,每个 instance 可以设置一个或多个 Sink) 四个部分组成

二、实现

2.1. MetricsSystem

MetricsSystem 是 Spark 度量系统的中枢大脑,底层使用的是第三方的库 Metrics,MetricsSystem 里面维护了一个 MetricRegistry 的实例,Source 和 Sink 的都是通过这个 MetricRegistry 注册的。整体上 Spark 中的 MetricsSystem 就是基于 Metrics 做了一层封装~

2.1.1. 初始化

MeticsSystem 的初始化是在 SparkEnv 创建的过程中创建的~

1
2
3
4
5
6
7
8
val metricsSystem = if (isDriver) {
MetricsSystem.createMetricsSystem(MetricsSystemInstances.DRIVER, conf)
} else {
conf.set(EXECUTOR_ID, executorId)
val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.EXECUTOR, conf)
ms.start(conf.get(METRICS_STATIC_SOURCES_ENABLED))
ms
}

如果是 driver 端,需要等到 TaskScheduler 启动 app 之后才会启动 MetricsSystem (需要 appId),如果是 executor 端的话,在创建之后即可启动~

2.1.2. 启动

$MeticsSystem.start()$ 负责启动度量系统,内部调用了 $registerSources()$ 和 $registerSinks()$ ,是将 source 和 sink 注册到 MetricsSystem 内部对象 MetricRegistry 中。

2.2. Source

Spark 将度量来源抽象为 Source,Spark 内部实现的 instance 有: Master,Worker,Executor,Driver,Applications。Source 定义了 2 个方法,分别用来获取度量来源的名称 $sourceName()$,以及其对应的注册中心 $metricRegistry()$。目前,已经存在以下两种source:

  1. Spark 内部的 source,比如 MasterSource,WorkerSource,ExecutorSource,DAGSchedulerSource,BlockManagerSource,ApplicationSource。这些Source会收集 Spark 内部部件的状态。这些 source 都跟 instance 相关,在创建度量系统的时候会被加入。
  2. 公共的 source,比如 JVMSource,收集的是更加底层的状态,可以用配置文件配置并且是通过反射机制加载的。

2.2.1. 注册

MetricsSystem 提供了 $registerSource()$ 方法来注册单个度量来源~

$registerSource()$ 方法首先将度量来源加入缓存数组,调用 $buildRegistryName()$ 方法来构造 Source 的注册名称,然后调用 $MetricRegistry.register()$ 方法注册到度量仓库。

1
2
3
4
5
6
7
8
9
10
11
def registerSource(source: Source): Unit = {
sources.synchronized {
sources += source
}
try {
val regName = buildRegistryName(source)
registry.register(regName, source.metricRegistry)
} catch {
case e: IllegalArgumentException => logInfo("Metrics already registered", e)
}
}
  1. $buildRegistryName()$

    Source 的注册名称取决于度量的命名空间(由 spark.metrics.namespace 参数控制,默认值为 Application ID),以及 Executor ID。其默认注册名称则由 $MetricRegistry.name()$ 方法来生成。

  2. $registry.register()$

在 MetricsSystem 初始化时,会根据 MetricsConfig 来初始化所有对应的 Source~

调用 $MetricsConfig.getInstance()$ 方法取得实例名称下的配置,然后用 $MetricsConfig.subProperties()$ 方法,根据正则表达式 ^source\.(.+)\.(.+) 匹配出该实例所有与 Source 相关的参数,返回类型为 HashMap[String, Properties]。最后,根据配置的 class 属性,利用反射构造出 Source 实现类的对象实例,调用 $MetricsSystem.registerSource()$ 方法将 Source 注册到度量仓库。

2.3. Sink

Spark 将度量目的地抽象为 Sink,每个实例能够输出到 0 个到多个 Sinks。

相当于 Metric 体系中的 reporter

  • ConsoleSink: 将指标信息记录到控制台。

  • CSVSink: 将度量数据导出到 CSV 文件。

  • JmxSink: 注册指标到 JMX 控制台中查看的。

  • MetricsServlet: 可以利用 Spark UI 内置的 Jetty 服务将监控数据输出到浏览器页面。

    MetricsServlet 作为默认的 sink,只支持,master,worker,client driver,可以通过发送 http请求 /metrics/json ,可以以 json 的格式获取所有已经注册的指标数据

  • GraphiteSink: 将指标追加到 Graphite 节点。

  • Slf4jSink: 将度量标准作为日志条目发送到 slf4j。

  • GangliaSink: 向 Ganglia 节点或多播组发送指标。

2.3.1. 注册

MetricsSystem 并没有提供注册单个度量目的地的方法,只提供了 $registerSinks()$ 方法在初始化时批量注册度量目的地。

它前半部分的处理方式与 $registerSources()$ 方法一致,不改用了正则表达式 ^sink\.(.+)\.(.+) 匹配出该实例所有与 Sink 相关的参数。然后同样利用反射构造出 Sink 实现类的对象实例,如果度量实例名称为 servlet,说明是 Web UI 使用的 Sink,将它赋值给 metricsServlet 属性。否则,就将其加入 sinks 缓存数组。在MetricsSystem 初始化的最后,会调用 $Sink.start()$ 方法分别启动每个 Sink。