一、概述

https://mp.weixin.qq.com/s/TVkuyBwCQmIfSR3VYFrGaQ

https://www.learntospark.com/2020/02/how-to-read-and-write-parquet-in-apache-spark.html

https://www.mianshigee.com/note/detail/221583thf/

https://www.sohu.com/a/507896675_120099884

https://mp.weixin.qq.com/s/HH8s6o7C50uaq2hs4axd0A

1.1. Parquet

二、ParquetFileFormat

2.1. supportBatch()

1
2
3
4
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
val conf = sparkSession.sessionState.conf
ParquetUtils.isBatchReadSupportedForSchema(conf, schema)
}

2.2. Schema Merging

似于 Protocol Buffer, Avro 和 Thrift。Parquet 也支持 Schema Evolution。用户能能从一个简单的 Schema 开始,逐渐添加更多的列到 Schema 中。最后,用户可以得到多种不同的复杂的 Parquet 文件,但是它们有着互相兼容的架构。Parquet 可以检测到这种情况,并合并这些 Schema。

https://mp.weixin.qq.com/s/g1xJGBOziIQQxPq1ocl9xQ

2.3. 初始化 reader

https://www.jianshu.com/p/709d88ca4497

三、数据结构

3.1. ColumnarBatch

3.2. ParquetColumnVector

四、文件读取

Spark SQL 中读取 Parquet 文件的过程入口点是 DataSourceScanExec,Spark SQL 通过三个主要 Reader 组件实现了矢量化 Parquet 读取: VectorizedParquetRecordReader、VectorizedColumnReader 和 VectorizedValuersReader。

4.1. DataSourceScanExec

真正读取数据是 DatasourceScanExec

DataSourceV2ScanExec 有 V2 版本,parauet, orc 使用的是 V1版 org.apache.spark.sql.execution.DataSourceScanExec.scala

$inputRDD()$ 方法利用 relation.fileFormat.buildReaderWithPartitionValues 作为 readFile 的方法,创建 BucketedReadRDD 或者 NonBucketedReadRDD。DataSourceStrategy 和 InMemoryScans 策路最后都会生成 RoWDataSourceScanExec, 最终会调用
CatalystScan\PrunedScanl TableScan的 buildScan 方法生成 RDD[Row], 再调用 toCatalystRDD 将 RDD[RoW] 转换为 RDDInterna[Row]。

$FileSourceScanExec.inputRDD$

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
lazy val inputRDD: RDD[InternalRow] = {
val options = relation.options +
(FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
dataSchema = relation.dataSchema,
partitionSchema = relation.partitionSchema,
requiredSchema = requiredSchema,
filters = pushedDownFilters,
options = options,
hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

val readRDD = if (bucketedScan) {
createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions)
} else {
createReadRDD(readFile, dynamicallySelectedPartitions)
}
sendDriverMetrics()
readRDD
}

4.1.1. 获取 PartitionedFile

4.1.2. readRDD

在 $ParquetFileFormat.buildReaderWithPartitionValues()$ 实现中, 会使用 split 来初始化 reader, 并且根据配置可以把 reader 分为否是 vectorized 的

4.2. Parquet Reader

例如对于 Parquet 文件,Spark 使用 PartquetRecordReader 或 VectorizedParquetRecordReader 类来读取 Parquet 文件,分别对应非向量化读和向量化的读取。

4.2.1. Vectorized Reader

  1. VectorizedParquetRecordReader

    在初始化 VectorizedParquetRecordReader 并执行 $initBatch()$ 方法创建 ColumnBatch 实例和相关 ColumnVector 实例后,就可以通过 $nextKeyValue()$ 方法逐批读取 parquet 文件。

  2. VectorizedColumnReader

    对于每个行组,都会创建一个新的 VectorizedColumnReader 数组,每个请求的列都有一个 VectorizedColumnReader 实例。VectorizedParquetRecordReader 中的 $nextBatch()$ 方法会在 VectorizedColumnReader 数组中循环,将批次数据逐列读入相应的 ColumnVectors 中。

  3. VectorizedValuesReader

    根据数据编码方法,VectorizedColumnReader 会创建相应的 VectorizedValuesReader 实现实例来读取和解码数据。

4.2.2. PartquetRecordReader

五、文件写入

5.1. Parquet Writer

https://blog.csdn.net/orisonchan/article/details/86603701

https://blog.csdn.net/weixin_33885676/article/details/89622568

https://blog.csdn.net/u013494310/article/details/44832857/

https://zhuanlan.zhihu.com/p/147887693?utm_source=wechat_session

5.1.1. Spark

  1. ParquetWriteSupport

    ParquetRecordWriter 其 overwrite 的写方法,就是调用 InternalParquetRecordWriter 的写方法,然后 InternalParquetRecordWriter 的写方法又调用 writeSupport 的写方法,在 Spark 的 ParquetWriteSupport 对象中,会发现最后调用的是 recordConsumer 的 addX 方法(如addBoolean(),addInteger()),而 recordConsumer 实际对象是 MessageColumnIORecordConsumer

  2. ParquetOutputWriter

  3. FileFormatDataWriter

5.1.2. Hadoop

  1. ParquetRecordWriter

六、总结