一、概述

Spark Datasource API 是一套连接外部数据源和 Spark 引擎的框架,提供一种快速读取外界数据的能力,它可以方便地把不同的数据格式通过 DataSource API 注册成 Spark 的表,然后通过 Spark SQL 直接读取。它可以充分利用 Spark 分布式的优点进行并发读取,通过 SparkSQL Catayst 优化引擎,能够加快任务的执行。

Spark Datasource API 同时提供了一套优化机制,如将列剪枝和过滤操作下推至数据源侧,减少数据读取数量,提高数据处理效率。

DataSource

二、设计

DataSource API 是 Apache Spark 中非常流行的功能。许多开发人员广泛使用它来将第三方应用程序连接到 Apache Spark。Spark DataSource API 目前有 V1 和 V2 两个版本,V1 在 2.3.x 之前就已经存在了,V2 API 在 Spark 2.3.0 中引入,并在 2.4.0 中进行了修改。在 3.0.0 中,Spark 对 V2 API 进行了重大更改,但是为了向后兼容,V1 被保留不变。

2.1. V1

2.1.1. 设计

V1 数据源包括以下接口:

  1. 数据源

    • BaseRelation: 代表一个抽象的数据源。

    • InsertableRelation: 用于插入数据的 BaseRelation

    • DataSourceRegister: 注册数据源的简称,在使用数据源时不用写数据源的全限定类名,而只需要写自定义的 shortName 即可

  2. RelationProvider 接口

    • RelationProvider: 从指定数据源中生成自定义的 relation, $createRelation$ 方法会基于给定的参数生成新的 relation

    • SchemaRelationProvider: 基于给定的参数和给定的 Schema 信息生成新的 relation

    • CreatableRelationProvider: 用于将 DataFrame 落地存储

    • StreamSourceProvider: 产生一个流式的数据源

    • StreamSinkProvider: 产生一个流式的落地sink

  3. 数据扫描接回

    • TableScan: 全表数据扫描
    • PrunedScan: 返回指定列数据,其他的列数据源不用返回
    • PrunedFilteredScan: 指定列的同时,附加一些过滤条件,只返回满足过滤条件的数据

通过实现 RelationProvider 接口,表明该类是一种新定义的数据源,可以供 Spark SQL 取数所用。传入 $createRelation()$ 方法的参数可以用来做初始化,如文件路径、权限信息等。BaseRelation 抽象类则用来定义数据源的表结构,它的来源可以是数据库、Parquet 文件等外部系统,也可以直接由用户指定。该类还必须实现某个
Scan 接口,Spark 会调用 $buildScan()$ 方法来获取数据源的 RDD。

2.1.2. 不足

  1. 其输入参数包括 DataFrame/SQLContext,因此 DataSource API 兼容性取决于这些上层的 API。

    $createRelation()$ 接收 SQLContext 作为参数; $buildScan()$ 方法返回的是 RDD[Row] 类型;而在实现写操作时,$insert()$ 方法会直接接收 DataFrame 类型的参数。

    1
    2
    3
    trait InsertableRelation {
    def insert (data: DataFrame, overwrite: Boolean): Unit
    }

    这些类型都属于较为上层的 Spark API,其中某些类已经发生了变化,如 SQLContext 已被 SparkSession 取代,而 DataFrame 也改为了 Dataset[Row] 类型的一个别称。这些改变不应该体现到底层的数据源 API 中。

    以看到高层次的 API 随着时间的推移而发展。较低层次的数据源 API 依赖于高层次的 API 不是一个好主意

  2. 可扩展性不好,并且算子的下推能力受限。

    除了 TableScan 接口,V1 API 还提供了 PrunedScan 接口,用来裁剪不需要的字段; PrunedFilteredScan 接口则可以将过滤条件下推到数据源中。如果想添加新的优化算子:如 LIMIT 子句,就需要引入一系列的 Scan 接口组合:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    trait LimitedScan {
    def buildScan(limit: Int): RDD [Row]
    }
    trait PrunedLimitedScan {
    def buildScan(requiredColumns: Array [String], Limit: Int):RDD [Row]
    }
    trait PrunedFilteredLimitedScan {
    def buildScan(requiredColumns: Array [String], filters: Array [Filter], limit: Int): RDD[Row]
    }
  3. 缺乏对列式存储读取的支持

    Spark V1 版本的 Data Source API 仅支持以行式的形式读取数据。即使 Spark 内部引擎支持列式数据表示,它也不会暴露给数据源。然而使用列式数据进行分析会提升性能,Spark 没必要读取列式数据的时候把其转换成行式,然后再在 Spark 内部引擎里面转换成列式进行分析~

  4. 缺乏分区和排序信息

    对于支持数据分区的数据源,如 HDFS、Kafka 等,V1 API 没有提供原生的支持,因而也不能利用数据局部性(Data Locality)

  5. 写操作不支持事务

    Spark 任务是有可能失败的,使用 V1 API 时就会留下部分写入的数据。对于 HDFS 可以用 _SUCCESS 来标记该次写操作是否执行成功。但这一逻辑需要最终用户来实现,而 V2 API 则提供了明确的接口来支持事务性的写操作。

  6. 不支持流处理

    越来越多的场景需要流式处理,但是 DataSource API V1 不支持这个功能,导致像 Kafka 这样的数据源必须调用一些专用的内部 API 或者独自实现。

2.1.3. 自定义

2.2. V2

Data Source V2 API 最早在 Spark 2.3 提出,V2 API 使用了一个标记性的 DataSourceV2 接口,实现接口的类还必须实现 ReadSupport 或 WriteSupport,用来表示自身支持读或写操作。ReadSupport 接口中的方法会被用来创建 DataSourceReader 类,同时接收到初始化参数;该类继而创建 DataReaderFactory 和 DataReader 类,后者负责真正的读操作,接口中定义的方法和迭代器类似。此外,DataSourceFeader 还可以实现各类 Support 接口,我明自己支持某些优化下推操作,如裁剪字段、过滤条件等。WriteSupport API 的层级结构与之类似。这些接口都是用 Java 语言编写,以获得更好的交互支持。

2.2.1. 设计

2.2.2. 自定义

2.3. V2 Improvement

Data Source V2 API 是 Spark3 引入的一个重要特性,最早在 Spark 2.3 提出,在 Spark 3.0 被重新设计,具有非常良好的扩展性,使得 API 可以一直进化,每个版本都新增了大量的 API。

下图用多种颜色标记不同的 Spark 版本提供的 Data Source V2 API:

2.2.1. 设计

DataSource API V2 版本旨在提供一个高性能的,易于维护的,易于扩展的外部数据源接口。

在 Spark 2.4.x 中,数据源 API 中的主要接口是 DatasourceV2,所有自定义数据源都需要实现它或其中一个专业化接口,如 ReadSupport 或 WriteSupport。在 3.0.x 中,此接口被删除。引入了一个新的 TableProvider 接口。它是所有不需要支持 DDL 的自定义数据源的基本接口。

2.2.2. 工作流程

2.2.3. 自定义

三、工作流程

3.1. 读取数据

  1. $format()$ 方法传入 source 字符串

    1
    spark.write.format("hudi")
  2. 根据 source 字符串 查找数据源

    $DataSource.lookupDataSource()$ 找到 source 对应的 DataSource 类(一般包括 FileFormat 和 RelationProvider 两类)

  3. $DataSource.resolveRelation()$ 会根据 DataSource 类型创建 BaseRelation(一般包括 HadoopFsRelation 和继承 BaseRelation 且实现以下接口的类: TableScan、PrunedScan、PrunedFilteredScan、InsertableRelation、CatalystScan )。

  4. $SparkSession.baseRelationToDataFrame()$ 将 BaseRelation 传入创建 LogicalRelation 逻辑计划,并利用 LogicalRelation 创建 DataSet。

  5. FileSourceStrategy、DataSourceStrategy、DataSourceV2Strategy、InMemoryScans 将 LogicalRelation 逻辑计划转换为物理计划,生成具体的 DataSourceRDD

  6. $compute()$ 函数实现真正的读取逻辑。

3.2. 写入数据

https://mp.weixin.qq.com/s/bFrQ-4c7cOaWDgszeBpLlQ