Spark-源码学习-SparkCore-提交作业
一、概述
为了简化应用程序提交的复杂性,Spark提供了各种应用程序提交的统一入口,即 spark-submit 脚本,应用程序的提交都间接或直接地调用了该脚本。
二、源码~
Spark 任务的第一步: 如何提交用户编写的程序🤔️~
2.1. /bin/spark-submit
Spark 提供了各种应用程序 application 提交的统一入口, 即 spark-submit 脚本, 应用程序的提交都间接或直接地调用了该脚本
https://blog.csdn.net/qq_26838315/article/details/115175653
用户应用程序可以使用 bin/spark-submit
脚本来启动。spark-submit
脚本负责使用 Spark 及其依赖关系设置类路径,并可支持 Spark 支持的不同群集管理器和部署模式。
1 | exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@" |
所有其他脚本最终都调用到的一个执行 Java 类的脚本
2.1.1. 循环读取 ARG 参数, 加入到 CMD 中
1 | while IFS= read -d "$DELIM" -r ARG; do |
build_command() 中 spark-class 调用 org.apache.spark.launcher.Main
1 | build_command() { |
org.apache.spark.launcher.Main
Spark 内部脚本调用的工具类, 并不是真正的执行入口, 负责对参数进行解析, 并生成执行命令, 最后将命令返回给 spark-class 的 exec “${CMD[@]}” 执行。
2.1.2. org.apache.spark.launcher.Main
根据提交类型 spark-submit 和 spark-class(master、worker、historyserver 等等), 构建对应的命令解析对象SparkSubmitCommandBuilder 和 SparkClassCommandBuilder, 再通过 buildCommand() 方法构造执行命令。
2.1.3. 执行命令 exec
1 | exec "${CMD[@]}" |
2.2. SparkSubmit
启动 SparkSubmit 的 JVM 进程后,调用 SparkSubmit 伴生对象的 main() 方法~
2.2.1. 封装 Spark-submit 提交参数
SparkSubmitArguments 通过解析命令行、默认配置文件、env 中的参数,使用以 spark.开头的参数填充对象属性值, 保证重要参数不为空
2.2.2. doSubmit()
初始化日志记录
同时标记日志记录是否需要在应用程序启动之前重置
调用 main() 方法中实例化 SparkSubmit 对象时重写的参数解析方法,解析并封装 Spark 提交参数
提交逻辑
SparkSubmitAction.SUBMIT
定义包含针对代理者的 $runMain()$ 方法
$SparkSubmit.runMain()$
准备提交环境和参数: $prepareSubmitEnvironment()$
prepareSubmitEnvironment() 方法中做了如下操作:
- 根据参数中 master 和 delpoy-mode, 设置对应的 clusterManager 和 部署模式
- 再根据 args 中的其他参数, 设置相关 childArgs, childClasspath, sysProps, childMainolass 并返回结果
返回值:
childArgs: 子进程参数
childClasspath: 子进程 classpath 列表
sparkConf: 系统属性 map
childMainClass: 子进程 main 方法
在 SparkSubmit 中会根据不同的 ClusterManger 和 DeployMode 选择不同的 childMainClass
在 Standalone 中 childMainClass: STANDALONE_CLUSTER_SUBMIT_CLASS
1
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
让主类在启动日志系统后重新初始化日志系统
加载 jar 包
1
2
3
4val loader = getSubmitClassLoader(sparkConf)
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}通过反射实例化 mainClass 类
1
mainClass = Utils.classForName(childMainClass)
根据 mainClass 类实例化 SparkApplication
1
2
3
4
5val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
new JavaMainApplication(mainClass)
}调用 $SparkApplication.start()$ 方法启动 SparkApplication
1
app.start(childArgs.toArray, sparkConf)
在 Standalone 模式中为 $ClientApp.start()$~
在 $ClientApp.start()$~中初始化 ClientEndpoint 对象,负责与 Master 通信交互
一个 RpcEndpoint 的初始化会调用 $onStart()$。在 $ClientEndpoint.onStart()$ 中匹配到
launch
这个动作,并向 MasterEndpoint 发送 RequestSubmitDriver 消息。
SparkSubmitAction.KILL
SparkSubmitAction.REQUEST_STATUS
SparkSubmitAction.PRINT_VERSION
2.3. Master 接受 Application
1 | case RequestSubmitDriver(description) => |
Master 在接受到 RequestSubmitDriver
后,将 Driver 信息封装成 DriverInfo,然后添加待调度列表 waitingDrivers 中,然后调用 $Master.schedule()$ 为处于待分配资源的 Application 分配资源。在每次有新的 Application 加入或者新的资源加入时都会调用 $schedule()$ 进行调度。