更新现有流水线

Apache Beam SDK 可以使用新的流水线代码在 Dataflow 托管式服务更新正在运行的流式作业

您可能因下列多种原因而需要更新现有的 Dataflow 作业:

  • 需要增强或改进流水线代码。
  • 需要修复流水线代码中的错误。
  • 需要通过更新流水线来处理数据格式的更改,或者处理数据源中的版本更改或其他更改。
  • 您希望为所有 Dataflow 工作器修补与 Container-Optimized OS 相关的安全漏洞。
  • 您希望对流处理 Apache Beam 流水线进行扩缩,以便使用不同数量的工作器。如需查看说明和限制,请参阅在流处理模式下手动扩缩

在更新作业时,Dataflow 服务会在当前运行的作业与潜在的替换作业之间执行兼容性检查。兼容性检查可确保中间状态信息和已缓冲数据等内容从原作业传输到替换作业中。

更新过程及其影响

如果要更新 Dataflow 服务上的作业,您需要将现有作业替换为运行更新后的流水线代码的新作业。Dataflow 服务将保留作业名称,不过会使用更新后的作业 ID 运行替换作业。

替换作业会保留来自原作业的所有中间状态数据,以及来自原作业的已缓冲数据记录或当前“运行中”的元数据。例如,流水线中的某些记录在等待数据选取功能解析的过程中可能已进行了缓冲。

运行中数据

“运行中”数据仍由新流水线中的转换进行处理。 不过,您添加到替换流水线代码中的其他转换可能会生效,也可能不会生效,具体取决于记录的缓冲位置。例如,假设您现有的流水线包含下列转换:

Java

  p.apply("Read", ReadStrings())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Format' >> FormatStrings()

您可以将您的作业替换为新的流水线代码,如下所示:

Java

  p.apply("Read", ReadStrings())
   .apply("Remove", RemoveStringsStartingWithA())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Remove' >> RemoveStringsStartingWithA()
    | 'Format' >> FormatStrings()

即使您添加了一个转换来过滤掉以字母“A”开头的字符串,下一个转换 (FormatStrings) 可能仍会看到从原作业传输过来的以“A”开头的已缓冲字符串或运行中字符串。

更改数据选取策略

您可以为替换流水线中的 PCollection 更改数据选取触发器策略,但务必谨慎。更改数据选取或触发器策略不会影响已缓冲数据或其他运行中数据。

建议您仅尝试对流水线的数据选取策略进行少量更改,例如更改固定时间窗口或滑动时间窗口的持续时间。如果对数据选取或触发器进行重大更改(例如更改数据选取算法),则可能会对流水线输出造成不可预测的结果。

启动替换作业

要更新作业,您需要启动新的作业以替换正在进行的作业。 为执行更新过程,您在启动替换作业时,除了设置作业的常规选项外,还需要设置下列流水线选项:

Java

  • 传递 --update 选项。
  • 将 PipelineOptions 中的 --jobName 选项设置为与待更新作业相同的名称。
  • --region 选项设置为要更新的作业的地区。
  • 如果流水线中有任何转换名称已更改,则必须提供转换映射,并使用 --transformNameMapping 选项传递它。

Python

  • 传递 --update 选项。
  • 将 PipelineOptions 中的 --job_name 选项设置为与待更新作业相同的名称。
  • --region 选项设置为要更新的作业的地区。
  • 如果流水线中有任何转换名称已更改,则必须提供转换映射,并使用 --transform_name_mapping 选项传递它。

指定替换作业名称

Java

启动替换作业时,您为 --jobName 选项传递的值必须与待替换作业的名称完全匹配。

Python

启动替换作业时,您为 --job_name 选项传递的值必须与待替换作业的名称完全匹配。

要查找正确的作业名称值,请在 Dataflow 监控界面中选择您的原作业,然后在作业信息侧边栏中找到作业名称字段:

图 1:一个正在运行的 Dataflow 作业的“作业信息”侧边栏,其中有“作业名称”字段。

或者,您可以使用 Dataflow 命令行界面来查询现有作业的列表。在 Shell 或终端窗口输入 gcloud dataflow jobs list 命令以获取 Google Cloud 项目中的 Dataflow 作业列表,并找到要替换的作业的 NAME 字段:

JOB_ID                                    NAME                        TYPE       CREATION_TIME        STATE    REGION
2020-12-28_12_01_09-yourdataflowjobid     ps-topic                    Streaming  2020-12-28 20:01:10  Running  us-central1

创建转换映射

Java

如果您的替换流水线更改了原流水线中的任何转换名称,则 Dataflow 服务需要使用转换映射。转换映射会将原流水线代码中已命名的转换映射到替换流水线代码中的名称。您可以使用 --transformNameMapping 命令行选项按以下通用格式传递映射:

--transformNameMapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

对于在原流水线和替换流水线之间有改动的转换名称,您只需在 --transformNameMapping 中提供映射条目即可。

Python

如果您的替换流水线更改了原流水线中的任何转换名称,则 Dataflow 服务需要使用转换映射。转换映射会将原流水线代码中已命名的转换映射到替换流水线代码中的名称。您可以使用 --transform_name_mapping 命令行选项按以下通用格式传递映射:

--transform_name_mapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

对于在原流水线和替换流水线之间有改动的转换名称,您只需在 --transform_name_mapping 中提供映射条目即可。

确定转换名称

映射中每个实例的转换名称就是您在流水线中应用转换时提供的名称。例如:

Java

.apply("FormatResults", ParDo
  .of(new DoFn<KV<String, Long>>, String>() {
    ...
   }
}))

Python

| 'FormatResults' >> beam.ParDo(MyDoFn())

您还可以在 Dataflow 监控界面中检查作业的执行图,以获取原作业的转换名称。

图 2:Dataflow 监控界面中显示的 WordCount 流水线的执行图。

复合转换的命名

流水线中的转换具有层次结构,相应地,转换名称也是分层的。如果您的流水线中包含复合转换,则嵌套转换将根据其外层的转换进行命名。例如,假设流水线包含名为 CountWidgets 的复合转换,此复合转换中包含一个名为 Parse 的内层转换,则该内层转换的全名将是 CountWidgets/Parse,并且您必须在转换映射中指定该全名。

如果新流水线将复合转换映射为其他名称,则所有嵌套转换也将自动重命名。您需要为转换映射中的内层转换指定更改后的名称。

重构转换层次结构

如果替换流水线使用与原流水线不同的转换层次结构(例如,由于您重构了复合转换;或者流水线依赖于来自某个库的复合转换,而该库已更改),则您需要明确声明映射。

例如,假设原流水线应用了一个复合转换 CountWidgets,其中包含一个名为 Parse 的内层转换。现在,假设替换流水线重构了 CountWidgets,并在另一个名为 Scan 的转换中嵌套了 Parse。要成功执行更新,您必须将原流水线的完整转换名称 (CountWidgets/Parse) 明确映射到新流水线的相应转换名称 (CountWidgets/Scan/Parse):

Java

--transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

如果您在替换流水线中彻底删除了某个转换,则必须提供一个 null 映射。假设替换流水线彻底移除了 CountWidgets/Parse 转换,则应使用以下命令:

--transformNameMapping={"CountWidgets/Parse":""}

Python

--transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

如果您在替换流水线中彻底删除了某个转换,则必须提供一个 null 映射。假设替换流水线彻底移除了 CountWidgets/Parse 转换,则应使用以下命令:

--transform_name_mapping={"CountWidgets/Parse":""}

作业兼容性检查

在启动替换作业时,Dataflow 服务会在替换作业和原作业之间执行兼容性检查。如果兼容性检查通过,则原作业将停止。然后,替换作业将在 Dataflow 服务上启动,同时沿用同一作业名称。如果兼容性检查失败,原作业将继续在 Dataflow 服务上运行,而替换作业将返回错误。

Java

Python

兼容性检查可确保根据您提供的转换映射的指定,Dataflow 服务能够将中间状态数据从原作业中的步骤传输到替换作业。兼容性检查还可确保流水线的 PCollection 使用相同的编码器。更改 Coder 可能会导致兼容性检查失败,这是因为任何运行中数据或已缓冲记录可能无法在替换流水线中正确序列化。

防止兼容性中断问题

原流水线与替换流水线之间的某些差异可能导致兼容性检查失败。这些差异包括:

  • 更改流水线图但未提供映射。更新作业时,Dataflow 服务尝试将原作业中的转换与替换作业中的转换进行匹配,以便传输每个步骤的中间状态数据。如果您已重命名或移除任何步骤,则需要提供转换映射,以便 Dataflow 相应地匹配状态数据。
  • 更改某个步骤的侧边输入。如果向替换流水线中的转换添加侧边输入,或从转换中移除侧边输入,则将导致兼容性检查失败。
  • 更改某个步骤的编码器。在更新作业时,Dataflow 服务会保留当前已缓冲的所有数据记录(例如,当数据选取功能正在解析时),然后在替换作业中处理这些数据记录。如果替换作业使用不同或不兼容的数据编码方式,则 Dataflow 服务将无法对这些记录进行序列化或反序列化。
  • 您从流水线中移除了“有状态”操作。如果您从流水线中移除了某些有状态操作,则替换作业的 Dataflow 兼容性检查可能会失败。Dataflow 服务可将多个步骤组合在一起,以提升效率。如果您从某个组合步骤中移除了与状态相关的操作,则检查将失败。有状态操作包括:

    • 产生或使用辅助输入的转换。
    • I/O 读取。
    • 使用有键状态 (keyed state) 的转换。
    • 具有窗口合并功能的转换。
  • 您正尝试在不同的地区中运行替换作业。您必须在运行原作业的地区中运行替换作业。

更新架构

Apache Beam 允许 Pcollection 具有包含命名字段的架构,在这种情况下,不需要显式编码器。如果给定架构的字段名称和类型(包括嵌套字段)未发生更改,则该架构不会导致更新检查失败(但如果不兼容)。

演变架构

通常,由于业务需求变化,必须改进 PCollection 的架构。Dataflow 服务允许您在更新流水线时对架构进行以下更改:*向架构添加一个或多个新字段,包括嵌套字段。* 将必填字段(不可为 null)设为可选(可以为 null)。

在更新过程中,目前不支持移除字段、更改字段名称或更改字段类型。