一、概述

1
2
3
4
5
6
7
val df: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/siros?...")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("user", "root")
.option("password", "3494269")
.option("dbtable", "satcat")
.load()

二、read()

创建 DataFrameReader 对象,进行数据读取任务。

1
def read: DataFrameReader = new DataFrameReader(self)

DataFrameReader 用于从外部数据源加载数据到 DataFrame 中。它支持多种格式,如 JSON,CSV,Parquet,JDBC 等

三、load()

在 Spark 中调用 load() 触发加载数据源操作。

3.1. 配置项 pathOptionBehavior

spark.sql.legacy.pathOptionBehavior.enabled 用于控制在使用 spark.sql() 或 spark.catalog() 时,是否使用绝对路径或相对路径来指定表或视图。

该配置项的默认值是 true 意味着使用 spark.sql() 等 API 时,使用相对路径来指定表或视图(相对于当前数据库的路径)。当将此配置设置为 false 时,将使用绝对路径来指定表或视图(无论当前的数据库是什么)

1
2
3
4
5
6
7
8
9
10
11
12
def load(paths: String*): DataFrame = {
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw QueryCompilationErrors.cannotOperateOnHiveDataSourceFilesError("read")
}

val legacyPathOptionBehavior = sparkSession.sessionState.conf.legacyPathOptionBehavior
if (!legacyPathOptionBehavior &&
(extraOptions.contains("path") || extraOptions.contains("paths")) && paths.nonEmpty) {
throw QueryCompilationErrors.pathOptionNotSetCorrectlyWhenReadingError()
}
...
}

3.2. 查找数据源

3.2.1. lookupDataSource()

查找数据源时,首先从内置数据源中 backwardCompatibilityMap 进行查找,查找失败时,以输入的数据源类路径加类名 . DefaultSource 构建出数据源实例。

1
val cls = lookupDataSource(provider, conf)
  1. 创建 provider1

    从 backwardCompatibilityMap 获取 provider 所对应的默认的 DataSourceProvider 的全类名。如果 provider 为 jdbc、json、parquet、orc 等相关的全类名,则可直接返回对应的 DataSourceProvider 的全类名,否则返回 provider 本身。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    private val backwardCompatibilityMap: Map[String, String] = {
    val jdbc = classOf[JdbcRelationProvider].getCanonicalName
    val json = classOf[JsonFileFormat].getCanonicalName
    val parquet = classOf[ParquetFileFormat].getCanonicalName
    // ...
    Map(
    "org.apache.spark.sql.jdbc" -> jdbc,
    "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
    "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
    //...
    )
    }
  2. 创建 provider2

    1
    val provider2 = s"$provider1.DefaultSource" 

    以数据湖 Hudi 为例: provider1 = hudi; provider2 = hudi.DefaultSource

  3. 使用 ServiceLoader 加载所有继承 DataSourceRegister 的类,得到 serviceLoader

    1
    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)

  4. 获取 getContextOrSparkClassLoader

    1
    val loader = Utils.getContextOrSparkClassLoader
  5. 搜索 DataSource

    • provider1 不是 shortName,则使用 SparkClassLoader 加载 provider1,失败则加载 provider2

      1
      2
      3
      4
      5
      6
      7
      8
      case Nil =>
      try {
      Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
      case Success(dataSource) =>
      dataSource
      case Failure(error) =>
      ...
      }
    • provider1 是 shortName,则直接返回其对应的 DataSource Class。

      1
      2
      case head :: Nil =>
      head.getClass // org.apache.hudi.DefaultSource

3.2.2. lookupDataSourceV2()

  1. 获取 useV1Sources

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    val useV1Sources = conf.getConf(SQLConf.USE_V1_SOURCE_LIST).toLowerCase(Locale.ROOT).split(",").map(_.trim)

    val USE_V1_SOURCE_LIST = buildConf("spark.sql.sources.useV1SourceList")
    .internal()
    .doc("A comma-separated list of data source short names or fully qualified data source " +
    "implementation class names for which Data Source V2 code path is disabled. These data " +
    "sources will fallback to Data Source V1 code path.")
    .version("3.0.0")
    .stringConf
    .createWithDefault("avro,csv,json,kafka,orc,parquet,text")
  2. lookupDataSource()

    调用 lookupDataSource() 查找数据源 cls

  3. 校验 cls

    1
    2
    3
    4
    5
    cls.newInstance() match {
    case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => None
    case t: TableProvider if !useV1Sources.contains(cls.getCanonicalName.toLowerCase(Locale.ROOT)) => Some(t)
    case _ => None
    }

3.3. 加载数据源

为了保证代码的兼容性,Spark 会先尝试加载一次实现类,判断是否为 DataSourceV2,如果不是则使用 loadV1Source() 方法继续加载实现类。

3.3.1. V2

1
DataSourceV2Utils.loadV2Source(sparkSession, provider, userSpecifiedSchema, extraOptions,source, paths: _*)
  1. 获取 Tuple (Table, TableCatalog, Identifier)

    • SupportsCatalogOptions

      • userSpecifiedSchema.nonEmpty

        1
        2
        throw new IllegalArgumentException(
        s"$source does not support user specified schema. Please don't specify the schema.")
      • userSpecifiedSchema.empty

        • Identifier: extractIdentifier()

        • TableCatalog: CatalogV2Util#getTableProviderCatalog()

        • Table: CatalogV2Util.getTable()

    • other

      • Identifier: None
      • TableCatalog: None
      • Table: DataSourceV2Vtils#getTableFromProvider()
  2. 加载数据

    • SupportsRead & BATCH_READ: DataSet#ofRows()

      1
      Dataset.ofRows(sparkSession, DataSourceV2Relation.create(table, catalog, ident, dsOptions))
    • other: 加载数据源 V1

3.3.2. V1

Spark 在 loadV1Source() 中调用 DataSource 的 apply() 进行初始化,并调用其 resolveRelation() 方法创建 BaseRelation,然后通过 SparkSession 创建 DataFrame 返回。

1
loadV1Source(paths: _*)
  1. DataSource.apply()

    通过 apply() 初始化 DataSource。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    sparkSession.baseRelationToDataFrame(
    DataSource.apply(
    sparkSession,
    paths = finalPaths,
    userSpecifiedSchema = userSpecifiedSchema,
    className = source,
    options = finalOptions.originalMap
    ).resolveRelation()
    )
  2. 解析 Relation: DataSource#resolveRelation()

    • providingInstance()

      主要操作是调用 DataSource#lookupDataSource() 方法查找数据源,根据 SPI (再次)扫描 DataRegister 实现类

      1
      2
      3
      4
      5
      6
      7
      8
      private[sql] def providingInstance(): Any = providingClass.getConstructor().newInstance()  
      lazy val providingClass: Class[_] = {
      val cls = DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
      cls.newInstance() match {
      case f: FileDataSourceV2 => f.fallbackFileFormat
      case _ => cls
      }
      }
    • match{}

      DataSource#lookupDataSource() 返回的为 Spark32PlusDefaultSource,它的父类 DefaultSource 既实现了 SchemaRelationProvider 也实现了 RelationProvider, 但是这里的 userSpecifiedSchema 为 None

    • createRelation() 在 自定义实现的数据源中实现~

  3. SparkSession#baseRelationToDataFrame()

    SparkSession#baseRelationToDataFrame() 方法将 BaseRelation 转换为 DataFrame。

    1
    2
    3
    def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
    Dataset.ofRows(self, LogicalRelation(baseRelation))
    }