Spark-源码学习-集群启动-standalone-worker
一、概述
Worker 在 Local/Standalone 部署模式中对工作节点的资源和 Executor 进行管理。Worker 一方面向 Master 汇报自身所管理的资源信息,一方面接收 Master 的命令运行 Driver 或者为 Application 运行 Executor。
二、实现
2.1. 属性
cores: 内核数
memory: 内存大小
masterRpcAddresses: Master 的 RpcEnv 地址(即 RpcAddress)的数组。
由于一个集群为了可靠性和容错,需要多个 Master 节点,因此用数组来存储它们的 RpcEnv 地址
workDirPath: Worker 工作目录
host/port: Worker 的 RpcEnv 的 host/端口
forwordMessageScheduler: 用于发送消息的调度执行器(ScheduledThreadPool-Executor)
forwordMessageScheduler 只能调度执行一个线程,执行的线程以
worker-forward-message-scheduler
作为名称。cleanupThreadExecutor: 用于清理 Worker 的工作目录。
HEARTBEAT_MILLIS: 向 Master 发送心跳的时间间隔,是 spark.worker.timeout 属性值的1/4,默认为15秒
INITIAL_REGISTRATION_RETRIES: 固定为 6,代表连接 Master 的前六次尝试
TOTAL_REGISTRATION_RETRIES: 固定为 16,代表连接 Master 最多有 16 次尝试
FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND: 固定为 0.500,是为了确保尝试的时间间隔不至于过小的下边界。
REGISTRATION_RETRY_FUZZ_MULTIPLIER: 0~1 范围的随机数与 FUZZ_MULTIP-LIER_INTERVAL_LOWER_BOUND 的和
加入随机数是为了避免各个 Worker 在同一时间发送心跳。
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS: 代表前六次尝试的时间间隔
PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS: 代表最后十次尝试的时间间隔
CLEANUP_ENABLED: 是否对旧的 Application 产生的文件夹及文件进行清理。通过 spark.worker.cleanup.enabled 配置,默认为 false。
CLEANUP_INTERVAL_MILLIS: 对旧的 Application 产生的文件夹及文件进行清理的时间间隔。通过 spark.worker.cleanup.interval 配置,默认为 1800 秒。
APP_DATA_RETENTION_SECONDS: Application 产生的文件夹及文件保留的时间。可通过 spark.worker.cleanup.appDataTtl 属性配置,默认为 7 天
activeMasterUrl: 处于激活(ALIVE)状态的 Master 的 URL
activeMasterWebUiUrl: 处于激活(ALIVE)状态的 Master 的 WebUI 的 URL
workerWebUiUrl: Worker 的 WebUI 的 URL
workerUri: Worker 的 URL,格式为
spark://$name@${ host}:${ port}
。registered: 标记 Worker 是否已经注册到 Master
connected: 标记 Worker 是否已经连接到 Master
workDir: Worker 的工作目录
finishedExecutors: 已经完成的 Executor 的身份标识与 ExecutorRunner 之间的映射关系
drivers: driver id 与 DriverRunner 之间的映射关系
executors: executor id 与 ExecutorRunner 之间的映射关系
finishedDrivers: 已经完成的 Driver Id 与 DriverRunner 之间的映射关系
appDirectories: Application ID 与对应的目录集合之间的映射关系
finishedApps: 已经完成的 Application 的 ID 的集合
retainedExecutors: 保留的 Executor 的数量。通过 spark.worker.ui.retainedExecutors 属性配置,默认为 1000
retainedDrivers: 保留的 Driver 的数量。通过 spark.worker.ui.retainedDrivers 属性配置,默认为 1000
shuffleService: 外部 Shuffle 服务
publicAddress: Worker 的公共地址,如果设置了环境变量
SPARK_PUBLIC_DNS
,则为环境变量SPARK_PUBLIC_DNS
的值,否则为 hostconnectionAttemptCount: 连接尝试次数的计数器
registerMasterFutures: 由于 Worker 向 Master 进行注册的过程是异步的,此变量保存线程返回的 java.util.concurrent.Future
registrationRetryTimer:Worker向Master进行注册重试的定时器
registerMasterThreadPool: Worker 向 Master 进行注册的线程池
此线程池的大小与 Master 节点的数量一样,启动的线程以
worker-register-master-threadpool
为前缀。coresUsed: 当前 Worker 已经使用的内核数。
Worker 提供了 coresFree 方法返回 cores 属性和 coresUsed 的差值,作为空闲的内核数。
memoryUsed: 当前 Worker 已经使用的内存大小
Worker 提供了 $memoryFree$ 方法返回 memory 属性和 memoryUsed 的差值,作为空闲的内存大小。
三、源码
3.1. 启动脚本
Spark 中各个组件是通过脚本来启动部署的。以脚本为入口点开始分析 Worker 的部署。每个组件对应提供了启动的脚本,同时也会提供停止的脚本。
3.2. main()
查看 Worker 伴生对象中的 main() 方法~, Worker伴生对象中的 main 方法、格式和 Master 基本一致。通过参数的类型WorkerArguments来解析命令行参数。
1 | Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(exitOnUncaughtException = false)) |
Worker 类的入口点处包含了对应的参数类 WorkerArguments。WorkerArguments 类包括 Spark 属性配置相美的一些解析。解析完 Worker 的参数后,调用 startRpcEnvAndEndpoint() 方法启动 RPC 通信环境以及 Worker 的 RPC 通信终端~
3.2.1. 创建 SecurityManager
SecurityManager 主要对账号、权限及身份认证进行设置和管理
3.2.2. 创建 RpcEnv
3.2.3. 注册 Worker 通信终端 OnStart()
实例化 Worker
注册 Worker
将 Worker 注册 RpcEnv 中,并获得 Worker 的 RpcEndpointRef,Worker 继承了 ThreadSafeRpcEndpoint,实例化后首先会调用 $onStart()$ 方法
构建工作目录
$createWorkDir()$ 方法构建了该 Worker 节点上的工作目录,后续在该节点上执行的 Application 相关信息都会存放在该目录下。
启动 Shuffle 服务
如果配置了外部的 Shuffle 服务, 启动 shuffleService
1
startExternalShuffleService()
启动 Worker Web UI
1
2webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()引用本站文章Spark-源码系列-SparkCore-UIJoker注册
sendRegisterMessageToMaster(masterEndpoint) 向特定 Master 的 RPC 通信终端发送消息 RegisterWorker。Worker 接收到反馈消息后,进一步调用 handleRegisterResponse() 方法进行处理。
1
registerWithMaster()
引用本站文章Spark-源码学习-集群启动-standalone-worker-注册机制Joker启动度量系统
度量系统启动后, 将 Worker 度量的 Servlet 处理程序附加到 Web 用户界面
1
2
3metricsSystem.registerSource(workerSource)
metricsSystem.start()
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)引用本站文章Spark-源码系列-SparkCore-度量系统Joker