Spark Source Code Study - Architecture Design - DataSource System - Saving Data
1. Overview
```scala
df.write.format("hudi")
```
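For orientation, here is a minimal end-to-end usage sketch of the write path this article walks through. The input path, base path, table name, and the Hudi option keys shown are illustrative placeholders, not taken from the original text:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("hudi-write-demo")
  .master("local[*]")            // local demo only
  .getOrCreate()

val df = spark.read.json("/tmp/input")   // placeholder input path

// write() returns a DataFrameWriter; save() drives the lookup / plan / execute
// flow analyzed in the rest of this article
df.write
  .format("hudi")
  .option("hoodie.table.name", "demo_table")                // illustrative Hudi option
  .option("hoodie.datasource.write.recordkey.field", "id")  // illustrative Hudi option
  .mode(SaveMode.Append)
  .save("/tmp/hudi/demo_table")                             // placeholder base path
```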
2. write()
Initialize the DataFrameWriter
```scala
def write: DataFrameWriter[T] = {
```
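The quoted method is truncated above. Based on the Spark 2.4/3.x `Dataset` source, the body roughly does the following (a sketch, not guaranteed verbatim for every version): it rejects streaming Datasets and wraps `this` in a new DataFrameWriter.

```scala
// Sketch of Dataset.write (Spark 2.4/3.x flavor); may differ slightly across versions
def write: DataFrameWriter[T] = {
  if (isStreaming) {
    // batch write() is not allowed on a streaming Dataset/DataFrame
    logicalPlan.failAnalysis(
      "'write' can not be called on streaming Dataset/DataFrame")
  }
  new DataFrameWriter[T](this)
}
```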
3. save()
The save() method first adds the path parameter, then checks whether source is the Hive provider, and then looks up the DataSource class corresponding to hudi via DataSource#lookupDataSource().
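A condensed sketch of this entry point, loosely following the Spark 3.2.x DataFrameWriter.save()/saveInternal() (abridged and paraphrased, not verbatim; version-specific details such as the legacy path-option handling are omitted):

```scala
// Abridged sketch of DataFrameWriter.save()/saveInternal() (Spark 3.2.x flavor)
def save(path: String): Unit = saveInternal(Some(path))

private def saveInternal(path: Option[String]): Unit = {
  // Writing files directly through the Hive provider is rejected
  if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    throw new AnalysisException(
      "Hive data source can only be used with tables, you can not " +
        "write files of Hive data source directly.")
  }
  assertNotBucketed("save")

  // For "hudi" this resolves the DataSource / TableProvider implementation
  val maybeV2Provider = lookupV2Provider()
  if (maybeV2Provider.isDefined) {
    // DataSource V2 code path (section 3.2)
  } else {
    saveToV1Source(path)   // DataSource V1 code path (section 3.3)
  }
}
```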
3.1. lookupV2Provider()
We already touched on the Spark 3.2.1 version of lookupDataSource in the previous article when discussing isV2Provider; the Spark 2.4.4 version is much the same, so let's look at it again. Here provider1 = hudi and provider2 = hudi.DefaultSource. The method loads every META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file and collects the registered implementations; the Hudi-related ones include:
- org.apache.hudi.DefaultSource
- org.apache.hudi.Spark2DefaultSource
- org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat

It then filters for shortName = hudi; only Spark2DefaultSource qualifies, so Spark2DefaultSource is returned directly. The difference from Spark 3.2.1 is that Spark 2 maps to Spark2DefaultSource while Spark 3 maps to Spark3DefaultSource.
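The shortName filtering step can be sketched as follows. This is a simplification paraphrased from the Spark 2.4.x DataSource.lookupDataSource logic, not the actual method; `lookupByShortName` is a hypothetical helper name used only to isolate this step:

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

// Simplified sketch of the lookup-by-shortName step inside DataSource.lookupDataSource
// (paraphrased from Spark 2.4.x, not verbatim). For Hudi, provider1 = "hudi".
def lookupByShortName(provider1: String, loader: ClassLoader): Option[Class[_]] = {
  val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
  serviceLoader.asScala
    .filter(_.shortName().equalsIgnoreCase(provider1))
    .toList match {
      case Nil         => None                 // fall back to loading provider1 / provider2 as class names
      case head :: Nil => Some(head.getClass)  // e.g. org.apache.hudi.Spark2DefaultSource
      case multiple    =>
        throw new IllegalStateException(
          s"Multiple DataSourceRegister implementations found for short name '$provider1': " +
            multiple.map(_.getClass.getName).mkString(", "))
    }
}
```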
3.2. V2
3.2.1. Append/Overwrite
Obtain the tuple (Table, TableCatalog, Identifier) from the provider (see the sketch after this list):

- SupportsCatalogOptions
  - Identifier: SupportsCatalogOptions#extractIdentifier()
  - TableCatalog: CatalogV2Util#getTableProviderCatalog()
  - Table: TableCatalog#loadTable()
- TableProvider
  - BATCH_WRITE supported
    - Table: getTable()
    - TableCatalog: None
    - Identifier: None
  - STREAMING_WRITE only (no batch write support): fall back to the V1 write path

```scala
return saveToV1Source()
```
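As referenced above, this branch can be sketched from the Spark 3.2.x DataFrameWriter.saveInternal() (abridged and lightly paraphrased, not verbatim; provider, dsOptions, catalogManager, and path are already in scope inside saveInternal):

```scala
// Abridged sketch of the Append/Overwrite branch in DataFrameWriter.saveInternal (Spark 3.2.x)
val (table, catalog, ident) = provider match {
  case supportsExtract: SupportsCatalogOptions =>
    val ident = supportsExtract.extractIdentifier(dsOptions)
    val catalog = CatalogV2Util.getTableProviderCatalog(
      supportsExtract, catalogManager, dsOptions)
    (catalog.loadTable(ident), Some(catalog), Some(ident))

  case _: TableProvider =>
    val t = DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema = None)
    t match {
      case _: SupportsWrite if t.supports(TableCapability.BATCH_WRITE) =>
        (t, None, None)          // a plain TableProvider carries no catalog/identifier
      case _ =>
        // V2 source without batch write support (e.g. streaming-only): fall back to V1
        return saveToV1Source(path)
    }
}
```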
Initialize the DataSourceV2Relation

```scala
val relation = DataSourceV2Relation.create(table, catalog, ident, dsOptions)
```
Check that the requested partitioning matches the V2 table

```scala
checkPartitioningMatchesV2Table(table)
```
runCommand
```scala
private def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = {
  val qe = session.sessionState.executePlan(command)
  SQLExecution.withNewExecutionId(qe, Some(name))(qe.toRdd)
}
```

Append

```scala
runCommand(df.sparkSession, "save") {
  AppendData.byName(relation, df.logicalPlan, extraOptions.toMap)
}
```

Overwrite

```scala
runCommand(df.sparkSession, "save") {
  OverwriteByExpression.byName(
    relation, df.logicalPlan, Literal(true), extraOptions.toMap)
}
```
3.2.2. createMode
SupportsCatalogOptions
```scala
runCommand(df.sparkSession) {
  CreateTableAsSelect(
    catalog,
    ident,
    partitioningAsV2,
    df.queryExecution.analyzed,
    Map(TableCatalog.PROP_PROVIDER -> source) ++ location,
    extraOptions.toMap,
    ignoreIfExists = createMode == SaveMode.Ignore)
}
```

TableProvider
BATCH_WRITE
```scala
throw new AnalysisException(s"TableProvider implementation $source cannot be " +
  s"written with $createMode mode, please use Append or Overwrite " +
  "modes instead.")
```

STREAMING_WRITE

```scala
saveToV1Source()
```
3.3. V1
```scala
saveToV1Source()
```
The core of saveToV1Source() is the runCommand call at the end; let's first look at its argument, DataSource.planForWriting.
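A sketch of saveToV1Source() based on the Spark 2.4.x DataFrameWriter source (abridged; details such as partition-column option handling and the exact signature differ between versions):

```scala
// Abridged sketch of DataFrameWriter.saveToV1Source (Spark 2.4.x flavor)
private def saveToV1Source(): Unit = {
  // The command to run is produced by DataSource#planForWriting; for Hudi this
  // becomes a SaveIntoDataSourceCommand (see below)
  runCommand(df.sparkSession, "save") {
    DataSource(
      sparkSession = df.sparkSession,
      className = source,                                    // "hudi"
      partitionColumns = partitioningColumns.getOrElse(Nil),
      options = extraOptions.toMap
    ).planForWriting(mode, df.logicalPlan)
  }
}
```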
3.3.1. Obtain the write Command for the data source
3.3.1.1. Initialize the DataSource
3.3.1.2. DataSource#planForWriting()
DataSource.lookupDataSource returns Spark2DefaultSource, and its parent class DefaultSource implements CreatableRelationProvider, so planForWriting returns a SaveIntoDataSourceCommand.
```scala
def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
```
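The quoted method is truncated; based on the Spark 2.4.x DataSource source, the relevant branches look roughly like this (abridged, not verbatim):

```scala
// Abridged sketch of DataSource.planForWriting (Spark 2.4.x flavor)
def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
  providingClass.newInstance() match {
    // Spark2DefaultSource's parent DefaultSource implements CreatableRelationProvider,
    // so the Hudi write is wrapped into a SaveIntoDataSourceCommand
    case dataSource: CreatableRelationProvider =>
      SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
    // File-based sources (parquet, orc, ...) take the FileFormat write path instead
    case format: FileFormat =>
      DataSource.validateSchema(data.schema)
      planForWritingFileFormat(format, mode, data)
    case _ =>
      sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
  }
}
```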
3.3.2. Run the Command
```scala
private def runCommand(session: SparkSession)(command: LogicalPlan): Unit = {
```
Here executedPlan is an ExecutedCommandExec, because SaveIntoDataSourceCommand is a subclass of RunnableCommand and therefore of Command. As explained in my earlier article "Hudi Spark SQL Source Code Study Summary - Create Table", both df.logicalPlan and executedPlan trigger a full pass of the Spark SQL pipeline: parsing, analysis, optimization, and planning. During the planning phase, the planner.plan method iterates over the strategies and applies each one's apply method; one of them is BasicOperators, whose apply method is:

```scala
object BasicOperators extends Strategy {
```
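The fragment above is truncated; the case that matters here, sketched from the Spark 2.4.x SparkStrategies source (abridged, other operator cases omitted), turns any RunnableCommand into an ExecutedCommandExec physical node:

```scala
// Abridged sketch of the relevant case in SparkStrategies.BasicOperators (Spark 2.4.x)
object BasicOperators extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // SaveIntoDataSourceCommand is a RunnableCommand, so it is planned as ExecutedCommandExec
    case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
    // ... many other operators (project, filter, join, ...) are matched here
    case _ => Nil
  }
}
```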
Since SaveIntoDataSourceCommand is a subclass of RunnableCommand, the strategy returns an ExecutedCommandExec, which is a subclass of SparkPlan (how the parsing, analysis, optimization, and planning phases are triggered was covered above). ExecutedCommandExec's doExecute calls sideEffectResult, which in turn calls cmd.run; here cmd is the SaveIntoDataSourceCommand, and its run method calls the custom data source's createRelation() method.
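To close the loop, a sketch of SaveIntoDataSourceCommand.run based on the Spark 2.4.x source (abridged, not verbatim): it hands the query, mode, and options to the provider's createRelation, which for Hudi is where the actual write happens.

```scala
// Abridged sketch of SaveIntoDataSourceCommand.run (Spark 2.4.x flavor)
override def run(sparkSession: SparkSession): Seq[Row] = {
  // dataSource is the CreatableRelationProvider resolved earlier
  // (for Hudi: Spark2DefaultSource, whose parent DefaultSource implements it)
  dataSource.createRelation(
    sparkSession.sqlContext, mode, options, Dataset.ofRows(sparkSession, query))

  Seq.empty[Row]
}
```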