1. Overview

This article traces the Spark source code path of df.write...save() when writing to Hudi, starting from the following example:
df.write.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
.option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
.option(RECORDKEY_FIELD.key, "id")
.option(PRECOMBINE_FIELD.key, "ts")
.option(PARTITIONPATH_FIELD.key, "")
.option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
.option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
.option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
.mode(SaveMode.Overwrite)
.save(tmp.getCanonicalPath)

2. write()

write() initializes the DataFrameWriter:

def write: DataFrameWriter[T] = {
  if (isStreaming) {
    logicalPlan.failAnalysis(
      "'write' can not be called on streaming Dataset/DataFrame")
  }
  new DataFrameWriter[T](this)
}

3. save()

The save() method first adds the path option, then checks whether source equals hive, and finally looks up the DataSource class corresponding to hudi via DataSource#lookupDataSource().
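A condensed sketch of this entry point, simplified from Spark 2.4.4's DataFrameWriter (not verbatim source; the real method also handles bucketing checks and the V2 dispatch):

// A condensed sketch of DataFrameWriter.save, simplified from Spark 2.4.4.
def save(path: String): Unit = {
  // 1. record the path as a write option
  this.extraOptions += ("path" -> path)
  save()
}

def save(): Unit = {
  // 2. Hive cannot be written through this file-based path
  if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    throw new AnalysisException("Hive data source can only be used with tables, " +
      "you can not write files of Hive data source directly.")
  }
  // 3. resolve "hudi" to its DataSource class (see 3.1 below)
  val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
  ...
}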

3.1. lookupV2Provider()

We already touched on Spark 3.2.1's lookupDataSource method in the previous article when discussing isV2Provider, and Spark 2.4.4's version is much the same; let's take another look. Here provider1 = hudi and provider2 = hudi.DefaultSource. The method loads every META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file and collects the classes listed there; the Hudi-related ones include org.apache.hudi.DefaultSource, org.apache.hudi.Spark2DefaultSource, and org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat. It then filters for implementations whose shortName equals hudi; only Spark2DefaultSource matches, so Spark2DefaultSource is returned directly. The difference from Spark 3.2.1 is that Spark 2 resolves to Spark2DefaultSource while Spark 3 resolves to Spark3DefaultSource.
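A minimal sketch of the filtering logic just described, simplified from Spark 2.4.4's DataSource.lookupDataSource (error handling and the provider2 fallback chain trimmed):

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

// Simplified shape of DataSource.lookupDataSource: resolve a short name like
// "hudi" to the implementation registered via DataSourceRegister.
def lookupDataSourceSketch(provider: String, loader: ClassLoader): Class[_] = {
  val provider1 = provider                    // "hudi"
  val provider2 = s"$provider.DefaultSource"  // "hudi.DefaultSource", used as a fallback class name
  // Load everything registered under META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
  val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
  serviceLoader.asScala
    .filter(_.shortName().equalsIgnoreCase(provider1))
    .toList match {
    // only Spark2DefaultSource registers shortName "hudi" on Spark 2, so it wins here
    case head :: Nil => head.getClass
    // no registered match: fall back to loading provider1 (the real code also tries provider2)
    case Nil => loader.loadClass(provider1)
    case sources => throw new IllegalStateException(
      s"Multiple sources found for $provider1: ${sources.map(_.getClass.getName).mkString(", ")}")
  }
}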

3.2. V2

3.2.1. Append/Overwrite

  1. Obtain the (Table, TableCatalog, Identifier) tuple (the sketch after this list shows how the four steps chain together)

    • SupportsCatalogOptions

      • Identifier: SupportsCatalogOptions#extractIdentifier()

      • TableCatalog: CatalogV2Util#getTableProviderCatalog()

      • Table: TableCatalog#loadTable()

    • TableProvider

      • BATCH_WRITE

        • Table: getTable()
        • TableCatalog: None
        • Identifier: None
      • STREAMING_WRITE

        return saveToV1Source()
  2. Initialize the DataSourceV2Relation

    val relation = DataSourceV2Relation.create(table, catalog, ident, dsOptions)
  3. Check partitioning

    checkPartitioningMatchesV2Table(table)
  4. runCommand

    private def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = {
      val qe = session.sessionState.executePlan(command)
      SQLExecution.withNewExecutionId(qe, Some(name))(qe.toRdd)
    }
    • Append

      runCommand(df.sparkSession, "save") {
        AppendData.byName(relation, df.logicalPlan, extraOptions.toMap)
      }
    • Overwrite

      runCommand(df.sparkSession, "save") {
        OverwriteByExpression.byName(
          relation, df.logicalPlan, Literal(true), extraOptions.toMap)
      }
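Putting the four steps together, a condensed sketch of the Append/Overwrite branch (simplified; getTable() here is a stand-in for step 1's tuple resolution, not a real Spark method):

// A condensed sketch of the V2 Append/Overwrite flow in DataFrameWriter.save.
val (table, catalog, ident) = getTable()                                      // step 1 (stand-in)
val relation = DataSourceV2Relation.create(table, catalog, ident, dsOptions)  // step 2
checkPartitioningMatchesV2Table(table)                                        // step 3
mode match {                                                                  // step 4
  case SaveMode.Append =>
    runCommand(df.sparkSession, "save") {
      AppendData.byName(relation, df.logicalPlan, extraOptions.toMap)
    }
  case SaveMode.Overwrite =>
    runCommand(df.sparkSession, "save") {
      OverwriteByExpression.byName(
        relation, df.logicalPlan, Literal(true), extraOptions.toMap)
    }
}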

3.2.2. createMode

  1. SupportsCatalogOptions (see the combined sketch after this list)

    runCommand(df.sparkSession) {
      CreateTableAsSelect(
        catalog,
        ident,
        partitioningAsV2,
        df.queryExecution.analyzed,
        Map(TableCatalog.PROP_PROVIDER -> source) ++ location,
        extraOptions.toMap,
        ignoreIfExists = createMode == SaveMode.Ignore)
    }
  2. TableProvider

    • BATCH_WRITE

      throw new AnalysisException(s"TableProvider implementation $source cannot be " +
        s"written with $createMode mode, please use Append or Overwrite " +
        "modes instead.")
    • STREAMING_WRITE

      saveToV1Source()
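For the remaining SaveModes, a condensed sketch of the createMode branch (simplified from Spark 3.2.1's DataFrameWriter.save; dsOptions and catalogManager come from the enclosing method, and details vary between versions):

// Inside `mode match { ... }` of DataFrameWriter.save; a condensed sketch, not verbatim.
case createMode =>
  provider match {
    case supportsExtract: SupportsCatalogOptions =>
      val ident = supportsExtract.extractIdentifier(dsOptions)
      val catalog = CatalogV2Util.getTableProviderCatalog(
        supportsExtract, catalogManager, dsOptions)
      runCommand(df.sparkSession) {
        CreateTableAsSelect(catalog, ident, partitioningAsV2, df.queryExecution.analyzed,
          Map(TableCatalog.PROP_PROVIDER -> source) ++ location,
          extraOptions.toMap, ignoreIfExists = createMode == SaveMode.Ignore)
      }
    case tableProvider: TableProvider =>
      val table = DataSourceV2Utils.getTableFromProvider(
        tableProvider, dsOptions, userSpecifiedSchema = None)
      if (table.capabilities().contains(TableCapability.BATCH_WRITE)) {
        // a plain TableProvider cannot create tables, so ErrorIfExists/Ignore are rejected
        throw new AnalysisException(s"TableProvider implementation $source cannot be " +
          s"written with $createMode mode, please use Append or Overwrite modes instead.")
      } else {
        // streaming-only V2 sources fall back to the V1 write path
        saveToV1Source()
      }
  }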

3.3. V1

saveToV1Source()

The core of saveToV1Source() is the runCommand call at the end; let's first look at its argument, DataSource.planForWriting.

3.3.1. Obtain the data source write Command

3.3.1.1. Initialize the DataSource
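A condensed sketch of saveToV1Source(), simplified from Spark's DataFrameWriter, showing how the DataSource is constructed before planForWriting is invoked (treat this as a sketch, not verbatim source):

// A condensed sketch of saveToV1Source(): build a DataSource for "hudi",
// plan the write, then run the resulting command.
private def saveToV1Source(): Unit = {
  runCommand(df.sparkSession) {
    DataSource(
      sparkSession = df.sparkSession,
      className = source,                               // "hudi"
      partitionColumns = partitioningColumns.getOrElse(Nil),
      options = extraOptions.toMap
    ).planForWriting(mode, df.logicalPlan)
  }
}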

3.3.1.2. DataSource#planForWriting()

DataSource.lookupDataSource returns Spark2DefaultSource, and its parent class DefaultSource implements CreatableRelationProvider, so planForWriting returns a SaveIntoDataSourceCommand:

def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
  providingInstance() match {
    case dataSource: CreatableRelationProvider =>
      disallowWritingIntervals(data.schema.map(_.dataType), forbidAnsiIntervals = true)
      SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
    case format: FileFormat =>
      disallowWritingIntervals(data.schema.map(_.dataType), forbidAnsiIntervals = false)
      DataSource.validateSchema(data.schema, sparkSession.sessionState.conf)
      planForWritingFileFormat(format, mode, data)
    case _ => throw new IllegalStateException(
      s"${providingClass.getCanonicalName} does not allow create table as select.")
  }
}
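For reference, this is the CreatableRelationProvider trait from org.apache.spark.sql.sources that Hudi's DefaultSource implements; its createRelation() is exactly what SaveIntoDataSourceCommand.run invokes later:

// The CreatableRelationProvider trait (doc comments omitted).
trait CreatableRelationProvider {
  // Saves `data` to the data source and returns a relation over the saved data
  def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation
}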

3.3.2. Run the Command

private def runCommand(session: SparkSession)(command: LogicalPlan): Unit = {
  val qe = session.sessionState.executePlan(command)
  qe.assertCommandExecuted()
}

The executedPlan here is an ExecutedCommandExec, because SaveIntoDataSourceCommand is a RunnableCommand, which is itself a subclass of Command. As covered in my earlier article Hudi Spark SQL源码学习总结-Create Table, both df.logicalPlan and executedPlan trigger a full Spark SQL pass of parsing, analysis, optimization, and planning; in the planning phase, the planner.plan method iterates over the strategies and applies each one's apply method. One of those strategies is BasicOperators, whose apply method is:

object BasicOperators extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil
    case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
    ...
  }
}

Since SaveIntoDataSourceCommand is a subclass of RunnableCommand, an ExecutedCommandExec (a subclass of SparkPlan) is returned; for how the Spark SQL parsing, analysis, optimization, and planning phases are triggered, refer to the article mentioned above.

The SparkPlan subclass here is ExecutedCommandExec; its doExecute calls sideEffectResult, which in turn calls cmd.run, where cmd is the SaveIntoDataSourceCommand.
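A condensed sketch of ExecutedCommandExec, simplified from Spark's commands.scala (in older versions, session is written as sqlContext.sparkSession):

// A condensed sketch of ExecutedCommandExec; metrics, executeCollect, etc. omitted.
case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode {
  // The command runs exactly once, lazily, as a side effect of first access
  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
    cmd.run(session).map(converter(_).asInstanceOf[InternalRow])
  }
  override def output: Seq[Attribute] = cmd.output
  // doExecute just wraps the side-effect result in a one-partition RDD
  protected override def doExecute(): RDD[InternalRow] = {
    sparkContext.parallelize(sideEffectResult, 1)
  }
}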

Its run method then calls the custom data source's createRelation() method:
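A condensed sketch of SaveIntoDataSourceCommand.run, simplified from Spark; this is the hand-off into Hudi, where DefaultSource.createRelation performs the actual write:

// run() delegates straight to the CreatableRelationProvider; for Hudi this
// enters org.apache.hudi.DefaultSource (via its Spark2DefaultSource subclass).
override def run(sparkSession: SparkSession): Seq[Row] = {
  dataSource.createRelation(
    sparkSession.sqlContext, mode, options, Dataset.ofRows(sparkSession, query))
  Seq.empty[Row]
}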