一、概述

为了简化应用程序提交的复杂性,Spark提供了各种应用程序提交的统一入口,即 spark-submit 脚本,应用程序的提交都间接或直接地调用了该脚本。

二、源码~

Spark 任务的第一步: 如何提交用户编写的程序🤔️~

2.1. /bin/spark-submit

Spark 提供了各种应用程序 application 提交的统一入口, 即 spark-submit 脚本, 应用程序的提交都间接或直接地调用了该脚本

https://blog.csdn.net/qq_26838315/article/details/115175653

用户应用程序可以使用 bin/spark-submit 脚本来启动。spark-submit 脚本负责使用 Spark 及其依赖关系设置类路径,并可支持 Spark 支持的不同群集管理器和部署模式。

1
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

所有其他脚本最终都调用到的一个执行 Java 类的脚本

2.1.1. 循环读取 ARG 参数, 加入到 CMD 中

1
2
3
4
5
6
7
8
9
10
11
12
13
while IFS= read -d "$DELIM" -r ARG; do
if [ "$CMD_START_FLAG" == "true" ]; then
CMD+=("$ARG")
else
if [ "$ARG" == $'\0' ]; then
# After NULL character is consumed, change the delimiter and consume command string.
DELIM=''
CMD_START_FLAG="true"
elif [ "$ARG" != "" ]; then
echo "$ARG"
fi
fi
done < <(build_command "$@")

build_command() 中 spark-class 调用 org.apache.spark.launcher.Main

1
2
3
4
build_command() {
"$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
printf "%d\0" $?
}

org.apache.spark.launcher.Main Spark 内部脚本调用的工具类, 并不是真正的执行入口, 负责对参数进行解析, 并生成执行命令, 最后将命令返回给 spark-class 的 exec “${CMD[@]}” 执行。

2.1.2. org.apache.spark.launcher.Main

根据提交类型 spark-submit 和 spark-class(master、worker、historyserver 等等), 构建对应的命令解析对象SparkSubmitCommandBuilder 和 SparkClassCommandBuilder, 再通过 buildCommand() 方法构造执行命令。

2.1.3. 执行命令 exec

1
exec "${CMD[@]}"

2.2. SparkSubmit

启动 SparkSubmit 的 JVM 进程后,调用 SparkSubmit 伴生对象的 main() 方法~

2.2.1. 封装 Spark-submit 提交参数

SparkSubmitArguments 通过解析命令行、默认配置文件、env 中的参数,使用以 spark.开头的参数填充对象属性值, 保证重要参数不为空

2.2.2. doSubmit()

  1. 初始化日志记录

    同时标记日志记录是否需要在应用程序启动之前重置

  2. 调用 main() 方法中实例化 SparkSubmit 对象时重写的参数解析方法,解析并封装 Spark 提交参数

  3. 提交逻辑

    • SparkSubmitAction.SUBMIT

      • 定义包含针对代理者的 $runMain()$ 方法

      • $SparkSubmit.runMain()$

        • 准备提交环境和参数: $prepareSubmitEnvironment()$

          prepareSubmitEnvironment() 方法中做了如下操作:

          1. 根据参数中 master 和 delpoy-mode, 设置对应的 clusterManager 和 部署模式
          2. 再根据 args 中的其他参数, 设置相关 childArgs, childClasspath, sysProps, childMainolass 并返回结果

          返回值:

          • childArgs: 子进程参数

          • childClasspath: 子进程 classpath 列表

          • sparkConf: 系统属性 map

          • childMainClass: 子进程 main 方法

            在 SparkSubmit 中会根据不同的 ClusterManger 和 DeployMode 选择不同的 childMainClass

            在 Standalone 中 childMainClass: STANDALONE_CLUSTER_SUBMIT_CLASS

            1
            private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
        • 让主类在启动日志系统后重新初始化日志系统

        • 加载 jar 包

          1
          2
          3
          4
          val loader = getSubmitClassLoader(sparkConf)
          for (jar <- childClasspath) {
          addJarToClasspath(jar, loader)
          }
        • 通过反射实例化 mainClass 类

          1
          mainClass = Utils.classForName(childMainClass)
        • 根据 mainClass 类实例化 SparkApplication

          1
          2
          3
          4
          5
          val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
          mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
          } else {
          new JavaMainApplication(mainClass)
          }
        • 调用 $SparkApplication.start()$ 方法启动 SparkApplication

          1
          app.start(childArgs.toArray, sparkConf)

          在 Standalone 模式中为 $ClientApp.start()$~

          在 $ClientApp.start()$~中初始化 ClientEndpoint 对象,负责与 Master 通信交互

          一个 RpcEndpoint 的初始化会调用 $onStart()$。在 $ClientEndpoint.onStart()$ 中匹配到 launch 这个动作,并向 MasterEndpoint 发送 RequestSubmitDriver 消息。

    • SparkSubmitAction.KILL

    • SparkSubmitAction.REQUEST_STATUS

    • SparkSubmitAction.PRINT_VERSION

2.3. Master 接受 Application

1
2
3
4
5
6
7
8
9
10
11
12
13
case RequestSubmitDriver(description) =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. Can only accept driver submissions in ALIVE state."
context.reply(SubmitDriverResponse(self, false, None, msg))
} else {
logInfo("Driver submitted " + description.command.mainClass)
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
waitingDrivers += driver
drivers.add(driver)
schedule()
context.reply(SubmitDriverResponse(self, true, Some(driver.id), s"Driver successfully submitted as ${driver.id}"))
}

Master 在接受到 RequestSubmitDriver 后,将 Driver 信息封装成 DriverInfo,然后添加待调度列表 waitingDrivers 中,然后调用 $Master.schedule()$ 为处于待分配资源的 Application 分配资源。在每次有新的 Application 加入或者新的资源加入时都会调用 $schedule()$ 进行调度。