Spark-源码系列-SparkCore-度量系统
一、概述
Spark 的度量系统是由 Instance、Source(度量来源)、Metrics、Sink(度量目的地,每个 instance 可以设置一个或多个 Sink) 四个部分组成
二、实现
2.1. MetricsSystem
MetricsSystem 是 Spark 度量系统的中枢大脑,底层使用的是第三方的库 Metrics,MetricsSystem 里面维护了一个 MetricRegistry 的实例,Source 和 Sink 的都是通过这个 MetricRegistry 注册的。整体上 Spark 中的 MetricsSystem 就是基于 Metrics 做了一层封装~
2.1.1. 初始化
MeticsSystem 的初始化是在 SparkEnv 创建的过程中创建的~
1 | val metricsSystem = if (isDriver) { |
如果是 driver 端,需要等到 TaskScheduler 启动 app 之后才会启动 MetricsSystem (需要 appId),如果是 executor 端的话,在创建之后即可启动~
2.1.2. 启动
$MeticsSystem.start()$ 负责启动度量系统,内部调用了 $registerSources()$ 和 $registerSinks()$ ,是将 source 和 sink 注册到 MetricsSystem 内部对象 MetricRegistry 中。
2.2. Source
Spark 将度量来源抽象为 Source,Spark 内部实现的 instance 有: Master,Worker,Executor,Driver,Applications。Source 定义了 2 个方法,分别用来获取度量来源的名称 $sourceName()$,以及其对应的注册中心 $metricRegistry()$。目前,已经存在以下两种source:
- Spark 内部的 source,比如 MasterSource,WorkerSource,ExecutorSource,DAGSchedulerSource,BlockManagerSource,ApplicationSource。这些Source会收集 Spark 内部部件的状态。这些 source 都跟 instance 相关,在创建度量系统的时候会被加入。
- 公共的 source,比如 JVMSource,收集的是更加底层的状态,可以用配置文件配置并且是通过反射机制加载的。
2.2.1. 注册
MetricsSystem 提供了 $registerSource()$ 方法来注册单个度量来源~
$registerSource()$ 方法首先将度量来源加入缓存数组,调用 $buildRegistryName()$ 方法来构造 Source 的注册名称,然后调用 $MetricRegistry.register()$ 方法注册到度量仓库。
1 | def registerSource(source: Source): Unit = { |
$buildRegistryName()$
Source 的注册名称取决于度量的命名空间(由 spark.metrics.namespace 参数控制,默认值为 Application ID),以及 Executor ID。其默认注册名称则由 $MetricRegistry.name()$ 方法来生成。
$registry.register()$
在 MetricsSystem 初始化时,会根据 MetricsConfig 来初始化所有对应的 Source~
调用 $MetricsConfig.getInstance()$ 方法取得实例名称下的配置,然后用 $MetricsConfig.subProperties()$ 方法,根据正则表达式 ^source\.(.+)\.(.+)
匹配出该实例所有与 Source 相关的参数,返回类型为 HashMap[String, Properties]。最后,根据配置的 class 属性,利用反射构造出 Source 实现类的对象实例,调用 $MetricsSystem.registerSource()$ 方法将 Source 注册到度量仓库。
2.3. Sink
Spark 将度量目的地抽象为 Sink,每个实例能够输出到 0 个到多个 Sinks。
相当于 Metric 体系中的 reporter
ConsoleSink: 将指标信息记录到控制台。
CSVSink: 将度量数据导出到 CSV 文件。
JmxSink: 注册指标到 JMX 控制台中查看的。
MetricsServlet: 可以利用 Spark UI 内置的 Jetty 服务将监控数据输出到浏览器页面。
MetricsServlet 作为默认的 sink,只支持,master,worker,client driver,可以通过发送 http请求
/metrics/json
,可以以 json 的格式获取所有已经注册的指标数据GraphiteSink: 将指标追加到 Graphite 节点。
Slf4jSink: 将度量标准作为日志条目发送到 slf4j。
GangliaSink: 向 Ganglia 节点或多播组发送指标。
2.3.1. 注册
MetricsSystem 并没有提供注册单个度量目的地的方法,只提供了 $registerSinks()$ 方法在初始化时批量注册度量目的地。
它前半部分的处理方式与 $registerSources()$ 方法一致,不改用了正则表达式 ^sink\.(.+)\.(.+)
匹配出该实例所有与 Sink 相关的参数。然后同样利用反射构造出 Sink 实现类的对象实例,如果度量实例名称为 servlet,说明是 Web UI 使用的 Sink,将它赋值给 metricsServlet 属性。否则,就将其加入 sinks 缓存数组。在MetricsSystem 初始化的最后,会调用 $Sink.start()$ 方法分别启动每个 Sink。