流水线生命周期

本页面概要介绍了从流水线代码到 Dataflow 作业的流水线生命周期。

本页介绍了以下概念:

  • 执行图是什么,以及 Apache Beam 流水线如何成为 Dataflow 作业
  • Dataflow 如何处理错误
  • Dataflow 会自动对流水线中的处理逻辑进行并行化处理,并将其分布到执行作业的各个工作器
  • Dataflow 可能会进行的作业优化

执行图

当您的 Dataflow 流水线运行时,Dataflow 会根据用于构造 Pipeline 对象的代码创建一个执行图,其中包括所有转换及其关联的处理函数(如 DoFn 对象)。此图形称为流水线“执行图”,此阶段称为“图表构建时间”

在图形构造期间,Apache Beam 会从流水线代码的主入口点在本地执行代码,在调用源、接收器或转换步骤时停止,并将这些调用转换为图形的节点。因此,流水线的入口点(Java 和 Go main 方法或 Python 脚本的顶层)会在运行流水线的机器上本地执行。在 DoFn 对象的方法中声明的相同代码会在 Dataflow 工作器中执行。

例如,Apache Beam SDK 附带的 WordCount 示例包含一系列转换,这些转换用于对文本集合中的各个词以及每个词的出现次数进行读取、提取、统计、格式设置和写入操作。下图显示了如何将 WordCount 流水线中的转换扩展成执行图:

将 WordCount 示例程序中的转换扩展到步骤执行图以供 Dataflow 服务执行。

图 1:WordCount 示例执行图

通常情况下,执行图中转换的顺序会与您在构造流水线时指定的顺序有所不同。存在此差异是因为 Dataflow 服务在托管云端资源上运行之前会对执行图执行各种优化和融合处理。Dataflow 服务在执行流水线时会遵循数据依赖关系。但是,那些彼此间不存在数据依赖关系的步骤可以按任意顺序运行。

如需查看 Dataflow 为您的流水线生成的未经优化的执行图,请在 Dataflow 监控界面中选择您的作业。如需详细了解如何查看作业,请参阅使用 Dataflow 监控界面

在构造图形期间,Apache Beam 会验证流水线引用的任何资源(如 Cloud Storage 存储桶、BigQuery 表和 Pub/Sub 主题或订阅)是否确实存在,以及是否可访问。验证是通过对相应服务的标准 API 调用完成的,因此用于运行流水线的用户账号务必与必要服务正确连接并有权调用服务的 API。在将流水线提交到 Dataflow 服务之前,Apache Beam 还会检查其他错误,并确保流水线图不包含任何非法操作。

然后,系统会将执行图转换为 JSON 格式,并将 JSON 执行图传输到 Dataflow 服务端点。

Dataflow 服务随后会对 JSON 执行图进行验证。验证完成后,该图形即成为 Dataflow 服务中的一项作业。您可以使用 Dataflow 监控界面查看作业、执行图、状态和日志信息。

Java

Dataflow 服务向您运行 Dataflow 程序的机器发送响应。此响应会封装在 DataflowPipelineJob 对象中,该对象包含 Dataflow 作业的 jobId。请通过 Dataflow 监控界面Dataflow 命令行界面使用 jobId 对您的作业进行监控、跟踪和问题排查。如需了解详情,请参阅 DataflowPipelineJob API 参考文档

Python

Dataflow 服务向您运行 Dataflow 程序的机器发送响应。此响应会封装在 DataflowPipelineResult 对象中,该对象包含 Dataflow 作业的 job_id。请通过 Dataflow 监控界面Dataflow 命令行界面使用 job_id 对您的作业进行监控、跟踪和问题排查。

Go

Dataflow 服务向您运行 Dataflow 程序的机器发送响应。此响应会封装在 dataflowPipelineResult 对象中,该对象包含 Dataflow 作业的 jobID。请通过 Dataflow 监控界面Dataflow 命令行界面使用 jobID 对您的作业进行监控、跟踪和问题排查。

当您在本地执行流水线时,系统也会构造执行图,但不会将其转换为 JSON 格式或传输到服务,而是在 Dataflow 程序启动所在的同一台机器上本地运行该图形。如需了解详情,请参阅配置 PipelineOptions 以在本地执行流水线

错误和异常处理

您的流水线在处理数据时可能会抛出异常。其中一些错误是瞬态的(例如,暂时无法访问外部服务)。有些错误是永久性的(例如,因输入数据损坏或无法解析引发的错误,或者计算期间的 NULL 指针)。

Dataflow 会处理任意软件包中的元素,并会在针对该软件包中的任何元素抛出错误时重试整个软件包。以批量模式运行时,含有失败项的软件包将重试四次。单个软件包失败四次后,流水线会完全失败。以流式模式运行时,含有失败项的软件包会无限地重试,这可能会导致您的流水线永久性停滞。

执行批处理时,在流水线作业完全失败(即任何给定软件包四次重试都失败)之前,您可能会看到大量单独的失败情况。例如,如果您的流水线尝试处理 100 个软件包,那么,在某单个软件包 4 次重试都失败并退出之前,Dataflow 可能会产生数百个个别错误。

启动工作器错误(例如,在工作器上安装软件包失败)是暂时性的。这种情况会导致无限重试,并可能导致流水线永久性停滞。

并行处理和分布

Dataflow 服务会自动对流水线中的处理逻辑进行并行化处理,并将其分布到您指定用于执行作业的各个工作器。Dataflow 会使用编程模型中的抽象层来表示并行处理函数。例如,流水线中的 ParDo 转换可让 Dataflow 将处理代码(由 DoFn 对象表示)自动分布到多个工作器,以实现并行运行。

作业并行有两种类型:

  • 当流水线数据同时在多个工作器上拆分和处理时,会进行横向并行处理。Dataflow 运行时环境由一组分布式工作器提供支持。当池包含的工作器较多时,流水线的潜在并行性更高,但该配置的费用也更高。从理论上讲,横向并行处理没有上限。不过,Dataflow 会将工作器池限制为 4000 个工作器,以优化整个舰队的资源使用情况。

  • 当流水线数据由同一工作器上的多个 CPU 核心拆分和处理时,会进行纵向并行处理。每个工作器都由 Compute Engine 虚拟机提供支持。虚拟机可以运行多个进程,以充分利用其所有 CPU 核心。具有更多核心的虚拟机具有更高的潜在纵向并行处理能力,但此配置会产生更高的费用。核心数越多,内存用量通常也会越高,因此核心数通常会随内存大小而缩放。鉴于计算机架构的物理限制,纵向并行处理上限远低于横向并行处理上限。

托管式并行性

默认情况下,Dataflow 会自动管理作业并行度。Dataflow 会监控作业的运行时统计信息(例如 CPU 和内存用量),以确定如何扩缩作业。根据您的作业设置,Dataflow 可以横向扩缩作业(称为横向自动扩缩),也可以纵向扩缩作业(称为纵向扩缩)。自动扩缩并行度可优化作业费用和作业性能。

为了提高作业性能,Dataflow 还会在内部优化流水线。常见的优化包括融合优化组合优化。通过合并流水线步骤,Dataflow 消除了与在分布式系统中协调步骤和单独运行每个步骤相关的不必要的成本。

影响并行性的因素

以下因素会影响 Dataflow 作业中并行处理的效果。

输入来源

如果输入源不允许并行处理,则输入源注入步骤可能会成为 Dataflow 作业中的瓶颈。例如,当您从单个压缩文本文件中注入数据时,Dataflow 无法并行处理输入数据。由于大多数压缩格式在注入期间不能任意拆分为分片,因此 Dataflow 需要从文件开头按顺序读取数据。流水线的非并行部分会降低流水线的总体吞吐量。解决此问题的方法是使用更具可扩展性的输入源。

在某些情况下,步骤合并也会降低并行性。当输入源不允许并行时,如果 Dataflow 将数据注入步骤与后续步骤融合,并将此融合步骤分配给单个线程,则整个流水线的运行速度可能会更慢。

为避免出现这种情况,请在输入源注入步骤后插入 Reshuffle 步骤。如需了解详情,请参阅本文档的阻止融合部分。

默认的扇形分布和数据格式

单个转换步骤的默认分支可能会成为瓶颈并限制并行度。例如,“高扇出”ParDo 转换可能会导致融合限制 Dataflow 充分利用工作器的能力。在此类操作中,您的输入集合可能只有相对较少的元素,但 ParDo 会输出数量高达输入数百倍或数千倍的元素,并且后面还会伴随另一个 ParDo。如果 Dataflow 服务将这些 ParDo 操作融合在一起,那么即使中间 PCollection 包含更多元素,融合后的步骤也最多只能按照输入集合中的项数执行并行处理。

如需了解潜在解决方案,请参阅本文档中的防止融合部分。

数据形状

数据的形状(无论是输入数据还是中间数据)可能会限制并行度。例如,当自然键(例如城市)的 GroupByKey 步后跟 mapCombine 步时,Dataflow 会合并这两个步骤。如果键空间较小(例如五个城市),并且一个键非常热门(例如一个大城市),则 GroupByKey 步骤输出中的大部分项都会分发到一个进程。此进程会成为瓶颈,并减慢作业速度。

在此示例中,您可以将 GroupByKey 步骤的结果重新分配到更大的人工键空间,而不是使用自然键。在 GroupByKey 步骤和 mapCombine 步骤之间插入 Reshuffle 步骤。在 Reshuffle 步骤中,创建人工键空间(例如,使用 hash 函数),以克服数据形状导致的并行性受限问题。

如需了解详情,请参阅本文档的阻止融合部分。

输出接收器

接收器是写入外部数据存储系统(如文件或数据库)的转换。在实践中,存储终端会被建模和实现为标准 DoFn 对象,并用于将 PCollection 实体化到外部系统。在本例中,PCollection 包含最终的流水线结果。调用接收器 API 的线程可以并行运行,以将数据写入外部系统。默认情况下,线程之间不会进行协调。如果没有中间层来缓冲写入请求和控制流,外部系统可能会过载并降低写入吞吐量。如果通过添加更多并行性来纵向扩容资源,可能会进一步降低流水线的速度。

解决此问题的方法是减少写入步骤中的并行度。您可以在写入步骤之前添加 GroupByKey 步骤。GroupByKey 步骤将输出数据分组到较小的批次中,以减少外部系统的总 RPC 调用和连接。例如,使用 GroupByKey 可在 100 万个数据点中创建 50 个哈希空间。

这种方法的缺点是,它会引入并行性的硬编码限制。另一种方法是在写入数据时在接收器中实现指数退避。此选项可提供最低限度的客户端节流。

监控并行度

如需监控并行度,您可以使用 Google Cloud 控制台查看检测到的任何 straggler。如需了解详情,请参阅排查批处理作业中的 Straggler 问题排查流处理作业中的 Straggler 问题

融合优化

当 JSON 形式的流水线执行图完成验证后,Dataflow 服务可能会修改此执行图以进行优化。优化可能会将流水线执行图中的多个步骤或转换融合为单一步骤。通过将多个步骤融合在一起,Dataflow 服务就无需一一具体化流水线中的所有中间 PCollection,从而可以节省大量内存和处理方面的开销。

虽然您在构建流水线时指定的所有转换都是在该服务中执行的,但为了确保最有效地执行您的流水线,这些转换可能会以不同的顺序执行,或者作为较大融合转换的一部分执行。Dataflow 服务遵循执行图中各步骤之间的数据依赖关系,但其他步骤可能会按任意顺序执行。

融合示例

下图显示了为实现高效执行,Dataflow 服务如何优化和融合 Java 版 Apache Beam SDK 所附带 WordCount 示例中的执行图:

WordCount 示例程序执行图已经过优化,且步骤已通过 Dataflow 服务进行了融合。

图 2:WordCount 示例优化执行图

阻止融合

在某些情况下,Dataflow 可能会错误地猜测融合流水线中各操作的最佳方式,而使得 Dataflow 无法充分利用所有可用的工作器。在这种情况下,您可以阻止 Dataflow 执行融合优化。

要阻止步骤融合,您可以向流水线添加一项操作来强制 Dataflow 服务具体化中间 PCollection。请考虑使用下列其中一项操作:

  • 插入一个 GroupByKey,并在第一个 ParDo 之后取消分组。Dataflow 服务永远不会融合汇总过程中的 ParDo 操作。
  • 将中间 PCollection 作为辅助输入传递到另一个 ParDo。Dataflow 服务始终会将辅助输入具体化。
  • 插入 Reshuffle 步骤。Reshuffle 可防止融合,为数据添加检查点以及对记录执行去重。即使在 Apache Beam 文档中标记为已弃用,Dataflow 也支持重排。

监控融合

您可以使用 Google Cloud 控制台、gcloud CLI 或 API 访问优化图表和融合阶段。

控制台

如需在控制台中查看图表的融合阶段和步骤,请在 Dataflow 作业的执行详情标签页中打开阶段工作流图表视图。

如需查看为阶段融合的组件步骤,请点击图表中的融合阶段。在阶段信息窗格中,组件步骤行会显示融合阶段。有时,单个复合转换的各个部分会被融合为多个阶段。

gcloud

如需使用 gcloud CLI 访问优化图表和融合阶段,请运行以下 gcloud 命令:

  gcloud dataflow jobs describe --full JOB_ID --format json

JOB_ID 替换为 Dataflow 作业的 ID。

如需提取相关位,请将 gcloud 命令的输出传送到 jq

gcloud dataflow jobs describe --full JOB_ID --format json | jq '.pipelineDescription.executionPipelineStage\[\] | {"stage_id": .id, "stage_name": .name, "fused_steps": .componentTransform }'

如需在输出响应文件中查看融合阶段的说明,请参阅 ComponentTransform 数组中的 ExecutionStageSummary对象。

API

如需使用 API 访问优化图表和融合阶段,请调用 project.locations.jobs.get

如需在输出响应文件中查看融合阶段的说明,请参阅 ComponentTransform 数组中的 ExecutionStageSummary对象。

组合优化

在大规模数据处理中,汇总操作是一个非常重要的概念。 汇总是指将概念上相距甚远的数据汇集在一起,这极其适用于实现关联性。在 Dataflow 编程模型中,汇总操作以 GroupByKeyCoGroupByKeyCombine 转换形式表示。

Dataflow 的汇总操作可将整个数据集中的数据组合到一起,包括可能分布在多个工作器中的数据。在此类汇总操作期间,通常最有效的做法是先尽可能多地组合本地数据,然后再组合各实例中的数据。当您应用 GroupByKey 或其他汇总转换时,Dataflow 服务会在进行主分组操作之前,自动在本地组合部分内容。

在组合部分或多级层内容时,Dataflow 服务会根据您的流水线是使用批量数据还是流式数据而做出不同的决策。对于有界限数据,该服务更注重效率,并将尽可能多地组合本地内容。对于无界限数据,该服务更注重低延迟,可能不会组合部分内容(因为此操作可能会增加延迟时间)。