一、概述

Hudi 支持在写入时自动清理未成功提交的数据。 Apache Hudi 在写入时引入标记 $marker$ 机制来有效跟踪写入存储的数据文件。

ACID 四属性的关系可大概表述为:原子性是要求,一致性是目标,隔离性是手段,持久性是结果。如何做到原子性的隔离是实现事务的重中之重。在宏观上,实现事务特性是通过并发控制。在微观上,实现事务要靠隔离

二、设计

Hudi 中的 $marker$ 机制是一个表示存储中存在对应的数据文件的标签,Hudi 使用它在故障和回滚场景中自动清理未提交的数据。

每个标记条目由三部分组成:

  1. 数据文件名

  2. 标记扩展名 (.marker)

  3. 创建文件的 I/O 操作(CREATE: 插入、MERGE: 更新/删除 或 APPEND 两者之一)

例如标记 91245ce3-b682-4f9f-969e-343364159174-0_140-579-0_20210820173605.parquet.marker.CREATE 指示:

  1. 相应的数据文件是 91245ce3-b682-4f9f-969e-343364159174-0_140-579-0_20210820173605.parquet
  2. IO 类型是 CREATE

在写入每个数据文件之前,Hudi 写入客户端首先在存储中创建并持久化一个标记,在提交成功后会被写入容户端显式删除。

标记对于写客户端有效地执行不同的操作很有用,标记主要有如下两个作用:

  1. 删除重复/部分数据文件

    通过 Spark 写入Hudi 时会有多个 Executor 进行并发写入。一个 Executor 可能失败,留下部分数据文件写入,在这种情況下 Spark 会重试 Task,当启用 speculative.execution 时,可以有多次 attempts 成功将相同的数据写入不同的文件,但最终只有一次 attempt 会交给 Spark Driver 程序进程进行提交。
    标记有助于有效识别写入的部分数据文件,其中包含与后来成功写入的数据文件相比的重复数据,并在写入和提交完成之前清理这些重复的数据文件。

  2. 回滚失败的提交

    写入时可能在中间失败,留下部分写入的数据文件。在这种情况下,标记条目会在提交失败时保留在存储中。在接下来的写操作中,写客户端首先回滚失败的提交,通过标记识别这些提交中写入的数据文件并删除它们。

2.1. Direct marker 机制

现有的标记机制简单地创建与每个数据文件相对应的新标记文件,标记文件名如前面所述。每个 marker 文件被写入在相同的目录层次结构中,即提交即时分区路径,在Hudi表的基本路径下的临时文件夹 .hoodie/.temp 下。例如,下图显示了向 Hudi 表写入数据时创建的标记文件和相应数据文件的示例。在获取或删除所有marker文件路径时,该机制首先列出临时文件夹.hoodie/.temp/<commit_instant>下的所有路径,然后进行操作。

虽然扫描整个表以查找未提交的数据文件效率更高,但随着要写入的数据文件数量的增加,要创建的标记文件的数量也会增加。这可能会为 AWS S3 等云存储带来性能瓶颈。在 AWS S3 中,每个文件创建和删除调用都会触发一个 HTTP 请求,并且对存储桶中每个前缀每秒可以处理的请求数有速率限制。当并发写入的数据文件数量和 marker 文件数量巨大时,marker 文件的操作会成为写入性能的显着性能瓶颈。而在像 HDFS 这样的存储上,用户可能几乎不会注意到这一点,其中文件系统元数据被有效地缓存在内存中。

2.2. 基于时间线服务器的标记机制

利用时间线服务器的新标记机制优化了存储标记的相关延迟。Hudi 中的时间线服务器用作提供文件系统和时间线视图。如下图所示,新的基于时间线服务器的标记机制将标记创建和其他标记相关操作从各个执行器委托给时间线服务器进行集中处理。时间线服务器在内存中为相应的标记请求维护创建的标记,时间线服务器通过定期将内存标记刷新到存储中有限数量的底层文件来实现一致性。通过这种方式,即使数据文件数量庞大,也可以显着减少与标记相关的实际文件操作次数和延迟,从而提高写入性能。

为了提高处理标记创建请求的效率,我们设计了在时间线服务器上批量处理标记请求。每个标记创建请求在 Javalin 时间线服务器中异步处理,并在处理前排队。对于每个批处理间隔,例如 20 毫秒,调度线程从队列中拉出待处理的请求并将它们发送到工作线程进行处理。每个工作线程处理标记创建请求,并通过重写存储标记的底层文件。有多个工作线程并发运行,考虑到文件覆盖的时间比批处理时间长,每个工作线程写入一个不被其他线程触及的独占文件以保证一致性和正确性。批处理间隔和工作线程数都可以通过写入选项进行配置。

请注意工作线程始终通过将请求中的标记名称与时间线服务器上维护的所有标记的内存副本进行比较来检查标记是否已经创建。存储标记的底层文件仅在第一个标记请求(延迟加载)时读取。请求的响应只有在新标记刷新到文件后才会返回,以便在时间线服务器故障的情况下,时间线服务器可以恢复已经创建的标记。这些确保存储和内存中副本之间的一致性,并提高处理标记请求的性能。

三、实现

3.1. WriteMarkers

3.2. DirectWriteMarkers

3.3. TimelineServerBasedWriteMarkers