升级流式处理流水线

本页面提供有关升级流式处理流水线的指导和建议。例如,您可能需要升级到较新版本的 Apache Beam SDK,或者可能需要更新流水线代码。我们提供了不同的选项来适应不同的场景。

批量流水线会在作业完成时停止,而流式处理流水线通常需要连续运行才能提供无间断处理。因此,在升级流式处理流水线时,您需要考虑以下注意事项:

  • 您可能需要最大限度减少或避免流水线中断。在某些情况下,在部署新版本的流水线时,您或许可以容忍出现临时性处理中断。在其他情况下,您的应用可能无法承受任何中断。
  • 流水线更新流程需要处理架构更改,以最大限度地减少消息处理和其他附加系统的中断。例如,如果事件处理流水线中消息的架构发生更改,则下游数据接收器中可能也需要进行架构更改。

您可以使用下列方法之一来更新流式处理流水线,具体取决于您的流水线和更新要求:

如需详细了解更新期间可能遇到的问题以及如何防止这些问题,请参阅验证替换作业作业兼容性检查

最佳做法

  • 以独立于流水线代码更改的方式升级 Apache Beam SDK 版本。
  • 每次更改后测试流水线,然后再进行其他更新。
  • 定期升级流水线使用的 Apache Beam SDK 版本。

执行运行中更新

您可以更新某些正在进行的流式处理流水线,而无需停止作业。该方案称为运行中作业更新。运行中作业更新仅在以下有限的情况下可用:

  • 该作业必须使用 Streaming Engine。
  • 作业必须处于运行状态。
  • 您只需更改作业使用的工作器数量。

如需了解详情,请参阅“横向自动扩缩”页面中的设置自动扩缩范围

如需了解如何执行运行中作业更新,请参阅更新现有流水线

启动替换作业

如果更新后的作业与现有作业兼容,您可以使用 update 选项更新流水线。替换现有作业后,新作业将运行更新后的流水线代码。Dataflow 服务将保留作业名称,不过会使用更新后的作业 ID 运行替换作业。此过程可能会在现有作业停止、兼容性检查运行以及新作业启动时产生停机时间。如需了解详情,请参阅替换作业的影响

Dataflow 执行兼容性检查,以确保可以安全地将更新后的流水线代码部署到正在运行的流水线。某些代码更改会导致兼容性检查失败,例如在现有步骤中添加或移除辅助输入时。如果兼容性检查失败,您将无法执行就地作业更新。

如需了解如何启动替换作业,请参阅启动替换作业

如果流水线更新与当前作业不兼容,则需要停止并替换流水线。如果您的流水线无法容忍停机,请运行并行流水线

停止和替换流水线

如果您可以暂停处理,则可以取消排空该流水线,然后将其替换为更新后的流水线。取消流水线会导致 Dataflow 立即暂停处理并尽快关闭资源,这可能会导致正在处理的数据(称为运行中数据)丢失。为了避免数据丢失,在大多数情况下,排空都是首选操作。

排空流水线会立即关闭所有正在处理的窗口,并取消所有触发器。虽然运行中数据不会丢失,但排空可能会导致窗口的数据不完整。如果发生这种情况,进程内窗口会发出部分或不完整结果。如需了解详情,请参阅排空作业的影响。现有作业完成后,您可以启动包含更新后的流水线代码的新流式处理作业,以便继续处理。

使用此方法,从现有流式处理作业停止到替换流水线可以继续处理数据之间存在一些停机时间。但是,取消或排空现有流水线,然后使用更新后的流水线启动新作业,比运行并行流水线简单一些。

如需查看更详细的说明,请参阅排空 Dataflow 作业。排空当前作业后,启动具有相同作业名称的新作业。

使用 Pub/Sub 快照和还原功能重新处理消息

在某些情况下,替换或取消排空的流水线后,您可能需要重新处理先前交付的 Pub/Sub 消息。例如,您可能需要使用更新后的业务逻辑来重新处理数据。Pub/Sub 还原功能允许您从 Pub/Sub 快照重放消息。您可以将 Pub/Sub 还原与 Dataflow 搭配使用,以从创建订阅快照的时间开始重新处理消息。

在开发和测试期间,您还可以使用 Pub/Sub 还原反复重放已知消息,以验证流水线的输出。使用 Pub/Sub 还原时,不要在流水线消耗订阅时还原订阅快照。否则此类还原操作会导致 Dataflow 的水印逻辑失效,并可能会影响 Pub/Sub 消息的一次性处理。

在终端窗口中将 Pub/Sub 还原与 Dataflow 流水线搭配使用的推荐 gcloud CLI 工作流如下所示:

  1. 要创建订阅的快照,请使用 gcloud pubsub snapshots create 命令:

    gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
    
  2. 如需排空或取消流水线,请使用 gcloud dataflow jobs drain 命令或 gcloud dataflow jobs cancel 命令:

    gcloud dataflow jobs drain JOB_ID
    

    gcloud dataflow jobs cancel JOB_ID
    
  3. 如需还原至快照,请使用 gcloud pubsub subscriptions seek 命令:

    gcloud pubsub subscriptions seek SNAPSHOT_NAME
    
  4. 部署消耗订阅的新流水线。

运行并行流水线

如果您需要避免流式处理流水线在更新时产生中断,请运行并行流水线。使用更新后的流水线代码创建新的流式传输作业,并与现有流水线并行运行新流水线。

创建新流水线时,使用与现有流水线相同的数据选取策略。让现有流水线继续运行,直到其水印超过更新后流水线处理的最早完整窗口的时间戳。然后,排空或取消现有流水线。更新后的流水线会继续在原位置运行,并实际自行接管处理。

下图演示了此流程。

流水线 B 与流水线 B 重叠 5 分钟窗口。

在该图中,流水线 B 是沿用流水线 A 的已更新作业。值 t流水线 B 处理的最早完整窗口的时间戳。值 w流水线 A 的水印。为简单起见,假定完美的水印没有延迟数据。处理时间和实际用时在水平轴上表示。两个流水线都使用五分钟的固定(翻滚)窗口。结果会在水印经过每个时间段的末尾时触发。

由于在两个流水线重叠的时间段内发生并发输出,因此请配置两个流水线以将结果写入不同的目的地。然后,下游系统可以对两个目标接收器(例如数据库视图)使用抽象来查询组合结果。这些系统还可以使用抽象对重叠期中的结果进行重复信息删除。

以下示例介绍如何使用流水线从 Pub/Sub 读取输入数据、执行一些处理并将结果写入 BigQuery 的方法。

  1. 在初始状态下,现有流式处理流水线(流水线 A)正在使用订阅(订阅 A)运行并从 Pub/Sub 主题(主题)中读取消息。系统会将结果写入 BigQuery 表(表 A)。结果可通过 BigQuery 视图消耗,该视图充当遮盖底层表更改的表层。此过程称为表层模式的设计方法应用。下图显示了初始状态。

    包含 1 个订阅并写入单个 BigQuery 表的 1 个流水线。

  2. 为更新后的流水线创建新的订阅(订阅 B)。部署更新后的流水线(流水线 B),该流水线使用订阅 B 从 Pub/Sub 主题(主题)读取内容,并写入单独的 BigQuery 表(表 B)。下图演示了此流程。

    2 个流水线,每个流水线各有 1 个订阅。每个流水线都会写入单独的 BigQuery 表。表层视图从这两个表读取。

    此时,流水线 A流水线 B 正并行运行并将结果写入单独表。您可以将时间 t 记录为流水线 B 处理的最早完整时间段的时间戳。

  3. 流水线 A 的水印超过时间 t 时,排空流水线 A。排空流水线时,所有打开的窗口都会关闭,并且运行中数据的处理将完成。如果完整时间段很重要(假设没有延迟数据),请在排空流水线 A 之前让两个流水线运行,直到完成重叠时间段为止。处理完所有运行中数据并将其写入表 A 后,停止流水线 A 的流式处理作业。下图展示了此阶段。

    流水线 A 排空,不再读取订阅 A,且在排空完成后,不再将数据发送到表 A。所有处理均由第二个流水线处理。

  4. 此时,只有流水线 B 正在运行。您可以从 BigQuery 视图(表层视图)进行查询,该视图充当表 A表 B 的表层。对于在两个表中具有相同时间戳的行,请将试图配置为返回表 B 中的行,或者回退到表 A(如果表 B 中不存在这些行)下图显示了从表 A表 B 中读取内容的视图(表层视图)。

    流水线 A 已不存在,仅流水线 B 运行。

    此时,您可以删除订阅 A

当在新流水线部署中检测到问题时,使用并行流水线可以简化回滚操作。在该示例中,您可能需要在监控流水线 B 是否正常运行时,使流水线 A 保持运行状态。如果流水线 B 出现任何问题,您可以回滚到流水线 A

处理架构变更

数据处理系统通常需要随着时间的推移适应架构变更,有时是由于业务要求的更改,或者出于技术原因。应用架构更新通常需要仔细规划和执行,以避免中断业务信息系统。

假设流水线从 Pub/Sub 主题读取包含 JSON 载荷的消息。该流水线将每条消息转换为 TableRow 实例,然后将行写入 BigQuery 表中。输出表的架构类似于流水线处理的消息。在下图中,架构是指架构 A

使用架构 A 读取订阅和写入 BigQuery 输出表的流水线。

随着时间的推移,消息架构可能会发生各种重要的变化。例如添加、移除或替换字段。架构 A 演变为一种新的架构。在下面的讨论中,我们把这个新架构称为架构 B。在这种情况下,需要更新流水线 A,并且输出表架构需要支持架构 B

对于输出表,您可以在不产生停机时间的情况下执行一些架构变更。例如,您可以添加新字段或放宽列模式(例如将 REQUIRED 更改为 NULLABLE),且不产生停机时间。这些变更通常不会影响现有查询。但是,修改或移除现有架构字段的架构变更会破坏查询或导致其他中断。以下方法可在不产生停机时间的情况下适应更改。

将流水线写入主表的数据和写入一个或多个暂存表的数据分开。主表会存储流水线写入的历史数据。暂存表会存储最新的流水线输出。您可以对主表和暂存表定义 BigQuery 表层视图,以允许消费者查询历史数据和最新数据。

下图修订了上一个流水线流程,以添加暂存表(暂存表 A)、主表和表层视图。

读取订阅和写入 BigQuery 暂存表的流水线。第二个(主)表具有先前架构版本的输出。表层视图从暂存表和主表中读取。

在修订后的流程中,流水线 A 处理使用架构 A 的消息,并将输出写入具有兼容架构的暂存表 A。主表包含先前流水线版本写入的历史数据,以及定期从暂存表合并的数据。消费者可以使用表层视图查询最新数据,包括历史数据和实时数据。

当消息架构从架构 A 变更为架构 B 时,您可以更新流水线代码以与使用架构 B 的消息兼容。需要使用新实现更新现有流水线。通过运行并行流水线,您可以确保流式处理数据处理不会中断。终止和替换流水线会导致处理中断,因为在一段时间内没有流水线运行。

更新后的流水线会写入使用架构 B 的其他暂存表(暂存表 B)。在更新流水线之前,您可以使用已编排的工作流创建新的暂存表。更新表层视图以包含新的暂存表中的结果,这有可能采用相关的工作流步骤。

下图显示了更新后的流程,其中显示了暂存表 B架构 B,以及如何更新表层视图,以包含来自主表和两个暂存表的内容。

流水线现在使用架构 B 并写入暂存表 B。表层视图从主表、暂存表 A 和暂存表 B 读取。

作为流水线更新中的单独流程,您可以定期或根据需要将暂存表合并到主表中。下图显示了如何将暂存表 A 合并到主表中。

暂存表 A 已合并到主表中。表层视图从暂存表 B 和主表中读取内容。

后续步骤