一、概述

Worker 在 Local/Standalone 部署模式中对工作节点的资源和 Executor 进行管理。Worker 一方面向 Master 汇报自身所管理的资源信息,一方面接收 Master 的命令运行 Driver 或者为 Application 运行 Executor。

二、实现

2.1. 属性

  1. cores: 内核数

  2. memory: 内存大小

  3. masterRpcAddresses: Master 的 RpcEnv 地址(即 RpcAddress)的数组。

    由于一个集群为了可靠性和容错,需要多个 Master 节点,因此用数组来存储它们的 RpcEnv 地址

  4. workDirPath: Worker 工作目录

  5. host/port: Worker 的 RpcEnv 的 host/端口

  6. forwordMessageScheduler: 用于发送消息的调度执行器(ScheduledThreadPool-Executor)

    forwordMessageScheduler 只能调度执行一个线程,执行的线程以 worker-forward-message-scheduler 作为名称。

  7. cleanupThreadExecutor: 用于清理 Worker 的工作目录。

  8. HEARTBEAT_MILLIS: 向 Master 发送心跳的时间间隔,是 spark.worker.timeout 属性值的1/4,默认为15秒

  9. INITIAL_REGISTRATION_RETRIES: 固定为 6,代表连接 Master 的前六次尝试

  10. TOTAL_REGISTRATION_RETRIES: 固定为 16,代表连接 Master 最多有 16 次尝试

  11. FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND: 固定为 0.500,是为了确保尝试的时间间隔不至于过小的下边界。

  12. REGISTRATION_RETRY_FUZZ_MULTIPLIER: 0~1 范围的随机数与 FUZZ_MULTIP-LIER_INTERVAL_LOWER_BOUND 的和

    加入随机数是为了避免各个 Worker 在同一时间发送心跳。

  13. INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS: 代表前六次尝试的时间间隔

  14. PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS: 代表最后十次尝试的时间间隔

  15. CLEANUP_ENABLED: 是否对旧的 Application 产生的文件夹及文件进行清理。通过 spark.worker.cleanup.enabled 配置,默认为 false。

  16. CLEANUP_INTERVAL_MILLIS: 对旧的 Application 产生的文件夹及文件进行清理的时间间隔。通过 spark.worker.cleanup.interval 配置,默认为 1800 秒。

  17. APP_DATA_RETENTION_SECONDS: Application 产生的文件夹及文件保留的时间。可通过 spark.worker.cleanup.appDataTtl 属性配置,默认为 7 天

  18. activeMasterUrl: 处于激活(ALIVE)状态的 Master 的 URL

  19. activeMasterWebUiUrl: 处于激活(ALIVE)状态的 Master 的 WebUI 的 URL

  20. workerWebUiUrl: Worker 的 WebUI 的 URL

  21. workerUri: Worker 的 URL,格式为 spark://$name@${ host}:${ port}

  22. registered: 标记 Worker 是否已经注册到 Master

  23. connected: 标记 Worker 是否已经连接到 Master

  24. workDir: Worker 的工作目录

  25. finishedExecutors: 已经完成的 Executor 的身份标识与 ExecutorRunner 之间的映射关系

  26. drivers: driver id 与 DriverRunner 之间的映射关系

  27. executors: executor id 与 ExecutorRunner 之间的映射关系

  28. finishedDrivers: 已经完成的 Driver Id 与 DriverRunner 之间的映射关系

  29. appDirectories: Application ID 与对应的目录集合之间的映射关系

  30. finishedApps: 已经完成的 Application 的 ID 的集合

  31. retainedExecutors: 保留的 Executor 的数量。通过 spark.worker.ui.retainedExecutors 属性配置,默认为 1000

  32. retainedDrivers: 保留的 Driver 的数量。通过 spark.worker.ui.retainedDrivers 属性配置,默认为 1000

  33. shuffleService: 外部 Shuffle 服务

  34. publicAddress: Worker 的公共地址,如果设置了环境变量 SPARK_PUBLIC_DNS,则为环境变量 SPARK_PUBLIC_DNS 的值,否则为 host

  35. connectionAttemptCount: 连接尝试次数的计数器

  36. registerMasterFutures: 由于 Worker 向 Master 进行注册的过程是异步的,此变量保存线程返回的 java.util.concurrent.Future

  37. registrationRetryTimer:Worker向Master进行注册重试的定时器

  38. registerMasterThreadPool: Worker 向 Master 进行注册的线程池

    此线程池的大小与 Master 节点的数量一样,启动的线程以 worker-register-master-threadpool 为前缀。

  39. coresUsed: 当前 Worker 已经使用的内核数。

    Worker 提供了 coresFree 方法返回 cores 属性和 coresUsed 的差值,作为空闲的内核数。

  40. memoryUsed: 当前 Worker 已经使用的内存大小

    Worker 提供了 $memoryFree$ 方法返回 memory 属性和 memoryUsed 的差值,作为空闲的内存大小。

三、源码

3.1. 启动脚本

Spark 中各个组件是通过脚本来启动部署的。以脚本为入口点开始分析 Worker 的部署。每个组件对应提供了启动的脚本,同时也会提供停止的脚本。

3.2. main()

查看 Worker 伴生对象中的 main() 方法~, Worker伴生对象中的 main 方法、格式和 Master 基本一致。通过参数的类型WorkerArguments来解析命令行参数。

1
2
3
4
5
6
7
8
9
Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(exitOnUncaughtException = false))
Utils.initDaemon(log)
val conf = new SparkConf
val args = new WorkerArguments(argStrings, conf)
val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
args.memory, args.masters, args.workDir, conf = conf,
val externalShuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
val sparkWorkerInstances = scala.sys.env.getOrElse("SPARK_WORKER_INSTANCES", "1").toInt
rpcEnv.awaitTermination()

Worker 类的入口点处包含了对应的参数类 WorkerArguments。WorkerArguments 类包括 Spark 属性配置相美的一些解析。解析完 Worker 的参数后,调用 startRpcEnvAndEndpoint() 方法启动 RPC 通信环境以及 Worker 的 RPC 通信终端~

3.2.1. 创建 SecurityManager

SecurityManager 主要对账号、权限及身份认证进行设置和管理

3.2.2. 创建 RpcEnv

3.2.3. 注册 Worker 通信终端 OnStart()

  1. 实例化 Worker

  2. 注册 Worker

    将 Worker 注册 RpcEnv 中,并获得 Worker 的 RpcEndpointRef,Worker 继承了 ThreadSafeRpcEndpoint,实例化后首先会调用 $onStart()$ 方法