数据湖-Hudi-源码学习-Query Engines-Flink-写数据
一、概述
- https://juejin.cn/user/4117007604398775/posts
- https://blog.csdn.net/naisongwen/article/details/122725934
- https://cloud.tencent.com/developer/article/2049105
- https://juejin.cn/post/6992227979526930468 (delete)
- https://blog.csdn.net/yiweiyi329/article/details/121842572
- https://blog.csdn.net/yiweiyi329/article/details/122012474
- https://baijiahao.baidu.com/s?id=1730162628824294760&wfr=spider&for=pc
- https://www.cnblogs.com/yunqishequ/p/14450603.html
二、集成
2.1. Catalog
Iceberg 提供了 FlinkCatalog,用于与 Flink 集成并管理 lceberg 表。FlinkCatalog 通过 Spark SQL 接口暴露 Iceberg表,使得用户可以使用 Spark SQL 查询和操作 Iceberg 表。
2.2. Flink Connector
2.2.1. DataSet
- RichinputFormat 接口
这是 Flink 老版数据源接口哦😯~
FlinkSource 是 Iceberg 提供 的用于 Flink 读取 Iceberg 的封装类,其关键是 $build$ 方法,该方法会返回一个 FlinkInputFormat 对象,FlinkinputFormat 继承了 Flink 数据源 RichinputFormat 接口,提供了跟 Flink 集成时,操作 Iceberg 表的元数据和数据的方式~
- Source 接口
Iceberg 也提供了对接 Flink 新版 Source 接口的实现~,不过还处于Experimental
阶段~
https://blog.csdn.net/yiweiyi329/article/details/126691684
2.2.2. Table/SQL
Iceberg Table Source 继承 Flink Connector API ScanTableSource 接口,提供了一个将 Iceberg 数据湖中的数据读取到 Flink Table 中进行处理和分析的实现。
三、创建表
FlinkSQL 从建表 SQL 语句中解析出表名,表的 Schema,表的属性等信息信息,调用 Catalog Api 进行建表。
1 | CREATE TABLE local.iceberg_db.table_demo ( |
四、写入
由于 Iceberg 采用乐观锁的方式来实现 Transaction 的提交。
也就是说两个任务 task1, task2 同时提交更改事务到 Iceberg 时,后开始的一方 task2 会不断重试,等 task1 顺利提交之后,再重新读取 metadata 信息提交 transaction。
考虑到这一点,采用多个并发算子去提交 Transaction 是不合适的,容易造成大量事务冲突,导致重试。所以,Iceberg 将写入流程拆成了两个算子:
- IcebergStreamWriter 主要用来写入记录到对应的 avro、 parquet、orc 文件,生成一个对应的 Iceberg DataFile。并发送给下游算子
- IcebergFilesCommitter 主要用来在 checkpoint 到来时把所有的 DataFile 文件收集起来,并提交 Transaction 到 Iceberg,完成本次 checkpoint 的数据写入。
因此 Iceberg 只能做近实时的入湖:
- Iceberg 提交 Transaction 时是以文件粒度来提交的,无法以秒为单位提交 Transaction,否则会造成文件数量膨胀
- Flink 写入以 Checkpoint 为单位,物理数据写入 Iceberg 之后并不能直接查询,当触发 Checkpoint 之后才会写 Metadata 文件,这时数据由不可见变为可见,Checkpoint 每次执行也会需要一定时间。
https://blog.csdn.net/john1337/article/details/118887581
https://cloud.tencent.com/developer/article/2016584
在 Iceberg 中并没有其他任何其他第三方服务的依赖,而 Hudi 在某些方面做了一些 service 的抽象,如将 metastore 抽象为独立的 Timeline,这可能会依赖一些独立的索引甚至是其他的外部服务来完成。
4.1. 数据写入
IcebergStreamWriter 的设计比较简单,主要任务是把 Record 转换成 Iceberg DataFile。
4.2. 元数据提交
IcebergFilesCommitter 相对复杂一点,为每个 checkpointId维护了一个 DataFile 文件列表,即
这样即使中间有某个 checkpoint 的 transaction 提交失败了,它的 DataFile 文件仍然维护在 State 中,依然可以通过后续的 checkpoint 来提交数据到Iceberg 表中。