Dataflow 中的“正好一次”处理

Dataflow 支持对记录进行“正好一次”处理。本页面介绍了 Dataflow 如何实现“正好一次”处理,同时确保较短的延迟时间。

概览

批处理流水线始终使用“正好一次”处理。流处理流水线默认使用“正好一次”处理,但也可以使用“至少一次”处理

“正好一次”处理可确保处理记录的结果,包括每个流水线阶段的结果。具体而言,对于从来源到达流水线或从上一阶段到达此阶段的每条记录,Dataflow 可确保以下各项:

  • 系统会处理该记录,不会丢失。
  • 任何在流水线中保留的处理结果最多会反映一次。

换句话说,记录至少会被处理一次,并且结果会被提交正好一次。

“正好一次”处理可确保结果准确无误,并且输出中不会有重复记录。Dataflow 经过优化,可以最大限度地缩短延迟时间,同时保持“正好一次”语义。不过,“正好一次”处理仍然会产生执行去重操作的费用。对于可以容忍重复记录的应用场景,通常可以通过启用“至少一次”模式来降低费用并缩短延迟时间。如需详细了解如何在“正好一次”和“至少一次”流处理之间进行选择,请参阅设置流水线流处理模式

迟到数据

“正好一次”处理可确保流水线的准确性:如果流水线处理了记录,则 Dataflow 会确保该记录反映在输出中,并且该记录不会重复。

不过,在流式处理流水线中,“正好一次”处理无法保证结果完整,因为记录可能会延迟到达。举例来说,假设您的流水线在某个时间范围执行聚合,例如 Count。使用“正好一次”处理时,及时到达该时间范围的记录的结果是准确的,但延迟记录可能会被丢弃。

一般来说,无法保证流式处理流水线中的记录完整性,因为从理论上讲,记录可能会延迟到达。在极端情况下,您需要无限期地等待结果。实际上,Apache Beam 可让您配置丢弃延迟数据的阈值以及何时发出聚合结果。如需了解详情,请参阅 Apache Beam 文档中的水印和延迟数据部分。

副作用

副作用不保证具有“正好一次”语义。重要的是,这包括将输出写入外部存储空间,除非接收器也实现了“恰好一次”语义。

具体来说,Dataflow 不保证每条记录正好经历每个转换一次。由于重试或工作器失败,Dataflow 可能会通过转换多次发送记录,甚至在多个工作器上同时发送记录。

作为“正好一次”处理的一部分,Dataflow 会对输出内容进行重复信息删除。但是,如果转换中的代码有副作用,则这些副作用可能会多次发生。例如,如果转换会进行远程服务调用,则系统可能会针对同一记录多次进行该调用。在某些情况下,副作用甚至可能会导致数据丢失。例如,假设转换读取文件以生成输出,然后立即删除文件,而不会等待输出提交。如果在提交结果时发生错误,Dataflow 会重试转换,但现在转换无法读取已删除的文件。

日志记录

处理输出的日志表示处理已发生,但并未表明数据是否已提交。因此,即使已处理数据的结果仅提交到永久性存储空间一次,日志文件也可能会表明数据已多次处理。此外,日志并不总是反映已处理和已提交的数据。日志可能因限制而被丢弃或因其他日志记录服务问题而丢失。

“正好一次”流处理

本部分介绍 Dataflow 如何对流式作业实现“正好一次”处理,包括 Dataflow 如何管理非确定性处理、延迟数据和自定义代码等各种复杂情况。

Dataflow 流式重排

通过为每个工作器分配工作范围,流式 Dataflow 作业可以在许多不同的工作器上并行运行。虽然分配可能会随着时间的推移而变化,以响应工作器故障、自动扩缩或其他事件,但在每次 GroupByKey 转换后,具有相同键的所有记录都将在同一工作器上处理。复合转换(例如 CountFileIO 等)通常会使用 GroupByKey 转换。为确保给定键的记录最终位于同一个工作器上,Dataflow 工作器使用远程过程调用 (RPC) 在它们自身之间重排数据。

为了保证在重排期间记录不会丢失,Dataflow 会使用上游备份。若使用上游备份,发送记录的工作器会重试 RPC,直到获得对收到记录的肯定确认。 处理记录的副作用将提交到下游的永久性存储空间。如果发送记录的工作器不可用,Dataflow 会继续重试 RPC,以确保每条记录至少传送一次。

由于这些重试可能会创建重复项,因此每条消息都标有唯一 ID。每个接收器都会存储已看到和处理的所有 ID 的目录。收到记录后,Dataflow 会在目录中查找其 ID。如果找到该 ID,则说明记录已被接收并提交,并且将作为重复记录丢弃。为确保记录 ID 的稳定性,每个步骤的每项输出都会保存到存储空间。因此,如果由于重复的 RPC 调用而多次发送同一消息,则该消息仅会提交到存储空间一次。

确保较短的延迟时间

为了实现“正好一次”处理,必须降低 I/O,尤其是通过阻止每条记录的 I/O 来进行降低。为了实现此目标,Dataflow 会使用布隆过滤器和垃圾回收功能。

Bloom 过滤器

布隆过滤器是紧凑的数据结构,允许快速设置成员资格检查。在 Dataflow 中,每个工作器都会保留它看到的每个 ID 的布隆过滤器。当新的记录 ID 到达时,工作器会在过滤器中查找该 ID。如果过滤器返回 false,则此记录不是重复记录,并且工作器不会在稳定存储空间中查找该 ID。

Dataflow 会存储一组按时间划分的滚动布隆过滤器。记录到达后,Dataflow 会根据系统时间戳选择合适的过滤器进行检查。此步骤可防止布隆过滤器在系统进行垃圾回收时达到饱和状态,并且还会限制在启动时需要扫描的数据量。

垃圾回收

为了避免存储空间被记录 ID 填满,Dataflow 会使用垃圾回收来移除旧记录。Dataflow 使用系统时间戳计算垃圾回收水印。

此水印基于给定阶段等待的物理时间。因此,它还提供了有关流水线哪些部分运行缓慢的信息。此元数据是 Dataflow 监控界面中显示的系统延隔指标的基础。

如果记录到达的时间戳早于水印,并且此时间的 ID 已被系统执行垃圾回收,则忽略该记录。由于触发垃圾回收的低水印在确认记录递送之前不会前进,因此这些延迟到达的记录是重复的。

非确定性来源

Dataflow 使用 Apache Beam SDK 将数据读入流水线。如果处理失败,Dataflow 可能会重试从来源读取数据。在这种情况下,Dataflow 需要确保源生成的每条唯一记录都正好记录一次。对于确定性来源(例如 Pub/Sub Lite 或 Kafka),系统会根据记录的偏移量读取记录,从而减少对此步骤的需求。

由于 Dataflow 无法自动分配记录 ID,因此非确定性来源必须向 Dataflow 告知记录 ID 以避免重复。当来源为每条记录提供唯一 ID 时,连接器会在流水线中使用重排来移除重复记录。具有相同 ID 的记录会被滤除。 如需查看示例,了解 Dataflow 在使用 Pub/Sub 作为来源时如何实现“正好一次”处理,请参阅“使用 Pub/Sub 进行流式传输”页面中的“正好一次”处理部分。

在流水线中执行自定义 DoFn 时,Dataflow 不保证此代码仅针对每条记录运行一次。为了保证在工作器故障时至少处理一次,Dataflow 可能会通过转换多次来运行给定记录,或者可能会在多个工作器上同时运行同一记录。如果您在流水线中添加可以执行像联系外部服务之类的操作的代码,则这些操作可能会针对给定记录运行多次。

如需使非确定性处理有效地确定性,请使用检查点。使用检查点时,转换的每项输出都会与其唯一 ID 保存到稳定的存储空间,然后再传送到下一阶段。在 Dataflow 的重排传送中重试时,会中继已设置检查点的输出。 虽然您的代码可能会运行多次,但 Dataflow 可确保仅存储其中一个运行的输出。 Dataflow 使用一致存储区,以防止重复数据写入稳定存储空间。

“正好一次”输出传送

Apache Beam SDK 包含内置接收器,可确保它们不会产生重复项。请尽可能使用其中一个内置接收器。

如果您需要编写自己的接收器,最好的方法是使函数对象具有幂等性,以便它可以根据需要经常重试,而不会导致意外的副作用。 但是,实现接收器功能的转换的某些组件通常是不确定的,并且在重试时可能会发生变化。

例如,在窗口化聚合中,窗口中的记录集可能是不确定的。具体而言,窗口可能会尝试使用元素 e0、e1、e2 触发。工作器在提交窗口处理之前可能会崩溃,但不会在这些元素作为副作用发送之前崩溃。当工作器重启时,该窗口会再次触发,并且延迟元素 e3 到达。由于此元素在提交窗口之前到达,不会计为延迟数据,因此系统会使用元素 e0、e1、e2、e3 再次调用 DoFn。然后,这些元素会发送到副作用操作。在这种情况下,幂等性无济于事,因为每次发送的逻辑记录集都不同。

如需解决 Dataflow 中的不确定性问题,请使用内置的 Reshuffle 转换。当 Dataflow 对数据进行随机排序时,会持久地写入数据,以便在随机排序发生后重试操作时,任何非确定性生成的元素都保持稳定。使用 Reshuffle 转换可确保只有一个版本的 DoFn 输出可以越过随机播放分界线。以下模式可确保副作用操作始终收到要输出的确定性记录:

c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
 .apply(GroupByKey.<..>.create())
 .apply(new PrepareOutputData())
 .apply(Reshuffle.<..>of())
 .apply(WriteToSideEffect());

要确保 Dataflow 运行程序知道在执行 DoFn 之前元素必须稳定,请将 RequiresStableInput 注解添加到 DoFn

了解详情