本页介绍了在 Dataflow 中从 Pub/Sub 读取数据的最佳实践。
Apache Beam 提供了 Pub/Sub I/O 连接器的参考实现,供非 Dataflow 运行程序使用。不过,Dataflow 运行程序会使用自己的连接器自定义实现。此实现方案利用 Google Cloud 内部 API 和服务,为正好一次消息处理提供低延迟水印、高水印精度和高效去重。该连接器适用于 Java、Python 和 Go。
一次性处理
Pub/Sub 可将事件发布方与事件使用方分离开来。应用会将消息发布到主题,而 Pub/Sub 会异步将消息传送给订阅方。
Pub/Sub 会为成功发布到主题的每条消息分配一个唯一的消息 ID。默认情况下,Pub/Sub 执行“至少一次”消息传送。为了实现至少一次语义,如果 Pub/Sub 在确认时限内未收到订阅方的确认,则会重试传送。重试可能会导致消息被传送多次。例如,如果订阅方在截止日期之后确认,或者确认因暂时性网络问题而丢失,则可能会发生重新传送。
如果您使用正好一次流式处理模式运行 Dataflow 流水线,Dataflow 会去重消息以实现“正好一次”语义。如果您的流水线可以容忍一些重复记录,请考虑改用“至少一次”流处理模式。此模式可以显著降低流水线的延迟时间和总费用。但代价是,某些消息可能会被处理两次。如需了解详情,请参阅选择要使用的流式传输模式。
按消息属性去重
默认情况下,Dataflow 会根据消息 ID 删除重复消息。不过,应用可能会将同一记录作为两个不同的 Pub/Sub 消息发送两次。例如,原始来源数据可能包含重复的记录,或者应用可能会错误地将同一消息发布两次。如果确认消息因网络问题或其他中断而丢失,则可能会因重试而发生后一种情况。在这些情况下,重复消息具有不同的消息 ID。
根据您的具体情况,您的数据可能包含一个可用于去重操作的唯一字段。例如,记录可能包含唯一的交易 ID。您可以将 Pub/Sub I/O 连接器配置为根据消息属性的值(而非使用 Pub/Sub 消息 ID)来删除重复消息。只要发布方在重试期间始终设置此属性,Dataflow 便可以检测到重复项。消息必须在 10 分钟内发布到 Pub/Sub,以便进行重复信息删除。
如需详细了解如何使用 ID 属性,请参阅以下 SDK 参考主题:
withIdAttribute
(Java)ReadFromPubSub
(Python)ReadOptions
(Go)
订阅
配置流水线时,您可以指定要读取的 Pub/Sub 主题或 Pub/Sub 订阅。如果您指定了订阅,请勿将同一 Pub/Sub 订阅用于多个流水线。如果两个流水线从单个订阅中读取数据,则每个流水线都会以非确定性方式接收部分数据,这可能会导致重复消息、水印延迟和自动扩缩效率低下。请为每个流水线创建单独的订阅。
如果您指定了主题,则该连接器会创建新的临时订阅。此订阅在每个流水线中都是唯一的。
时间戳和水印
所有 Pub/Sub 消息都有一个时间戳,表示 Pub/Sub 收到消息的时间。您的数据可能还有事件时间戳,即记录由来源生成的时间。
您可以将连接器配置为从 Pub/Sub 消息的属性中读取事件时间戳。在这种情况下,连接器会使用事件时间戳进行水印处理。否则,默认情况下,它会使用 Pub/Sub 消息时间戳。
如需详细了解如何使用事件时间戳,请参阅以下 SDK 参考主题:
withTimestampAttribute
(Java)ReadFromPubSub
(Python)ReadOptions
(Go)
Pub/Sub 连接器可以访问 Pub/Sub 的专有 API,该 API 可提供订阅中最早的未确认消息的存留时间。此 API 的延迟时间比 Cloud Monitoring 中的延迟时间更短。这使得 Dataflow 可以将流水线水印提前并以低延迟时间发出窗口计算结果。
如果您将连接器配置为使用事件时间戳,Dataflow 会创建第二个 Pub/Sub 订阅。它使用此订阅检查仍在积压中的消息的事件时间。此方法可让 Dataflow 准确估算事件时间积压。如需了解详情,请参阅介绍 Dataflow 如何计算 Pub/Sub 水印的 StackOverflow 页面。
Pub/Sub 还原
借助 Pub/Sub 还原功能,用户可以重放以前确认的消息。您可以将 Pub/Sub 还原与 Dataflow 搭配使用,以重新处理流水线中的消息。
不过,不建议在正在运行的流水线中使用 Pub/Sub 还原功能。在正在运行的流水线中向后还原可能会导致重复消息或消息丢失。它还会使 Dataflow 的水印逻辑失效,并且与包含已处理数据的流水线状态相冲突。
如需使用 Pub/Sub 还原功能重新处理消息,建议采用以下工作流:
- 创建订阅的快照:
- 为 Pub/Sub 主题创建新订阅。新订阅会继承快照。
- 排空或取消当前 Dataflow 作业。
- 使用新订阅重新提交流水线。
如需了解详情,请参阅使用 Pub/Sub 快照和还原功能重新处理消息。
不支持的 Pub/Sub 功能
Dataflow 运行程序的 Pub/Sub I/O 连接器实现不支持以下 Pub/Sub 功能。
指数退避算法
创建 Pub/Sub 订阅时,您可以将其配置为使用指数退避算法重试政策。不过,指数退避算法不适用于 Dataflow。请改为使用立即重试重试政策创建订阅。
指数退避算法会在收到否定确认或确认截止时间到期时触发。不过,Dataflow 不会在流水线代码失败时发送否定确认。相反,Dataflow 会无限地重试消息处理,同时不断延长消息的确认时限。
死信主题
请勿将 Pub/Sub 死信主题与 Dataflow 搭配使用,原因如下:
Dataflow 会出于各种内部原因(例如,如果某个 worker 正在关闭)而发送否定确认。因此,即使流水线代码中没有故障,消息也有可能传送至死信主题。
Dataflow 可能会在流水线完全处理数据之前确认消息。具体来说,Dataflow 会在第一个融合阶段成功处理消息后对其进行确认,并且该处理过程的副作用已写入永久性存储空间。如果流水线有多个融合阶段,并且在第一个阶段之后的任何时间点发生故障,则消息已确认,不会发送到死信主题。
请改为在流水线中显式实现死信模式。某些 I/O 接收器内置了对死信队列的支持。以下示例实现了死信模式;
Pub/Sub“正好一次”传送
由于 Dataflow 具有自己的一次性处理机制,因此不建议将 Pub/Sub 一次性传送与 Dataflow 搭配使用。启用 Pub/Sub“正好一次”传送会降低流水线性能,因为它限制了可用于并行处理的消息数量。
Pub/Sub 消息排序
消息排序是 Pub/Sub 的一项功能,可让订阅方按消息发布的顺序接收消息。
不建议将消息排序与 Dataflow 搭配使用,原因如下:
- Pub/Sub I/O 连接器可能无法保留消息顺序。
- Apache Beam 未定义有关处理元素的顺序的严格准则。因此,顺序可能不会在下游转换中保留。
- 将 Pub/Sub 消息排序与 Dataflow 搭配使用可能会增加延迟时间并降低性能。
后续步骤
- 使用 Pub/Sub 和 Dataflow 进行流处理:Qwik Start(自学实验)
- 从 Pub/Sub 流式传输到 BigQuery
- 使用 Dataflow 从 Pub/Sub 流式传输消息
- 流处理流水线
- Dataflow 中的“正好一次”处理
- Lambda 之后:Cloud Dataflow 中的“正好一次”处理第 1 部分和第 3 部分:来源和接收器(博客)