一、概述

  1. https://juejin.cn/user/4117007604398775/posts
  2. https://blog.csdn.net/naisongwen/article/details/122725934
  3. https://cloud.tencent.com/developer/article/2049105
  4. https://juejin.cn/post/6992227979526930468 (delete)
  5. https://blog.csdn.net/yiweiyi329/article/details/121842572
  6. https://blog.csdn.net/yiweiyi329/article/details/122012474
  7. https://baijiahao.baidu.com/s?id=1730162628824294760&wfr=spider&for=pc
  8. https://www.cnblogs.com/yunqishequ/p/14450603.html

二、集成

2.1. Catalog

Iceberg 提供了 FlinkCatalog,用于与 Flink 集成并管理 lceberg 表。FlinkCatalog 通过 Spark SQL 接口暴露 Iceberg表,使得用户可以使用 Spark SQL 查询和操作 Iceberg 表。

2.2.1. DataSet

  1. RichinputFormat 接口

这是 Flink 老版数据源接口哦😯~

FlinkSource 是 Iceberg 提供 的用于 Flink 读取 Iceberg 的封装类,其关键是 $build$ 方法,该方法会返回一个 FlinkInputFormat 对象,FlinkinputFormat 继承了 Flink 数据源 RichinputFormat 接口,提供了跟 Flink 集成时,操作 Iceberg 表的元数据和数据的方式~

  1. Source 接口
    Iceberg 也提供了对接 Flink 新版 Source 接口的实现~,不过还处于 Experimental 阶段~
    https://blog.csdn.net/yiweiyi329/article/details/126691684

2.2.2. Table/SQL

Iceberg Table Source 继承 Flink Connector API ScanTableSource 接口,提供了一个将 Iceberg 数据湖中的数据读取到 Flink Table 中进行处理和分析的实现。

三、创建表

FlinkSQL 从建表 SQL 语句中解析出表名,表的 Schema,表的属性等信息信息,调用 Catalog Api 进行建表。

1
2
3
4
5
CREATE TABLE local.iceberg_db.table_demo (
id bigint,
data string
) USING parquet
-- USING 语句用来指定数据文件的格式,支持的选项有 parquet, ocr, avro, iceberg,默认是 parquet

四、写入

由于 Iceberg 采用乐观锁的方式来实现 Transaction 的提交。

也就是说两个任务 task1, task2 同时提交更改事务到 Iceberg 时,后开始的一方 task2 会不断重试,等 task1 顺利提交之后,再重新读取 metadata 信息提交 transaction。

考虑到这一点,采用多个并发算子去提交 Transaction 是不合适的,容易造成大量事务冲突,导致重试。所以,Iceberg 将写入流程拆成了两个算子:

  1. IcebergStreamWriter 主要用来写入记录到对应的 avro、 parquet、orc 文件,生成一个对应的 Iceberg DataFile。并发送给下游算子
  2. IcebergFilesCommitter 主要用来在 checkpoint 到来时把所有的 DataFile 文件收集起来,并提交 Transaction 到 Iceberg,完成本次 checkpoint 的数据写入。

因此 Iceberg 只能做近实时的入湖:

  • Iceberg 提交 Transaction 时是以文件粒度来提交的,无法以秒为单位提交 Transaction,否则会造成文件数量膨胀
  • Flink 写入以 Checkpoint 为单位,物理数据写入 Iceberg 之后并不能直接查询,当触发 Checkpoint 之后才会写 Metadata 文件,这时数据由不可见变为可见,Checkpoint 每次执行也会需要一定时间。

https://blog.csdn.net/john1337/article/details/118887581

https://cloud.tencent.com/developer/article/2016584

在 Iceberg 中并没有其他任何其他第三方服务的依赖,而 Hudi 在某些方面做了一些 service 的抽象,如将 metastore 抽象为独立的 Timeline,这可能会依赖一些独立的索引甚至是其他的外部服务来完成。

4.1. 数据写入

IcebergStreamWriter 的设计比较简单,主要任务是把 Record 转换成 Iceberg DataFile。

4.2. 元数据提交

IcebergFilesCommitter 相对复杂一点,为每个 checkpointId维护了一个 DataFile 文件列表,即

这样即使中间有某个 checkpoint 的 transaction 提交失败了,它的 DataFile 文件仍然维护在 State 中,依然可以通过后续的 checkpoint 来提交数据到Iceberg 表中。

五、总结