部署流水线

本文档详细介绍了 Dataflow 如何部署和运行流水线,并介绍了优化和负载平衡等高级主题。如果需要有关如何创建和部署首个流水线的分步指南,请参阅 Dataflow 的 JavaPython模板快速入门。

构建和测试 Apache Beam 流水线后,您可以使用 Dataflow 代管式服务来部署和执行该流水线。运行 Dataflow 服务后,您的流水线代码将成为 Dataflow 作业。

Dataflow 服务可以完全托管用于运行 Dataflow 作业的 Google Cloud 服务(如 Compute EngineCloud Storage),并会自动运行和清理必要的资源。借助 Dataflow 服务,您可以通过 Dataflow 监控界面Dataflow 命令行界面等工具了解作业的运行情况。

您可以在流水线代码中设置执行参数,以此控制 Dataflow 服务在运行作业时的某些行为。例如,可使用执行参数指定流水线的各步骤在以下哪个位置运行:工作器虚拟机、Dataflow 服务后端还是本地。

除了管理 Google Cloud 资源以外,Cloud Dataflow 服务还会自动执行和优化分布式并行处理的诸多方面。其中包括:

  • 并行处理和分布。Dataflow 会自动对您的数据进行分区,并将您的工作器代码分布到多个 Compute Engine 实例进行并行处理。
  • 优化。Dataflow 会使用您的流水线代码创建一个执行图来表示您流水线的 PCollection 和转换,并优化此执行图以实现最高效的性能和资源利用率。Dataflow 还会自动优化成本可能非常高昂的操作,例如数据汇总。
  • 自动调节功能。Dataflow 服务包含多项可动态调整资源分配和数据分区的功能,例如自动扩缩和动态工作负载再平衡功能。这些功能有助于 Dataflow 服务尽可能快速、高效地执行您的作业。

流水线生命周期:从流水线代码到 Dataflow 作业

当您的 Dataflow 流水线运行时,Dataflow 会根据用于构造 Pipeline 对象的代码创建一个执行图,其中包括所有转换及其关联的处理函数(如 DoFn)。这一阶段称为图形构造时间,并在运行流水线的本地计算机上运行。

在图形构造期间,Apache Beam 会从流水线代码的主入口点在本地执行代码,在调用源、接收器或转换步骤时停止,并将这些调用转换为图形的节点。因此,流水线的入口点(Java 的 main() 方法或 Python 脚本的顶层)会在运行流水线的机器上本地执行,同时,在 DoFn 对象的方法中声明的相同代码会在 Dataflow 工作器中执行。

此外,在构造图形期间,Apache Beam 会验证流水线引用的任何资源(如 Cloud Storage 存储分区、BigQuery 表和 Pub/Sub 主题或订阅)是否确实存在,以及是否可访问。 验证是通过对相应服务的标准 API 调用完成的,因此用于运行流水线的用户帐号务必与必要服务正确连接并有权调用其 API。在将流水线提交到 Dataflow 服务之前,Apache Beam 还会检查其他错误,并确保流水线图不包含任何非法操作。

然后,系统会将执行图转换为 JSON 格式,并将 JSON 执行图传输到 Dataflow 服务端点。

注意:当您在本地执行流水线时,系统也会构造执行图,但不会将其转换为 JSON 格式或传输到服务,而是在 Dataflow 程序启动所在的同一台机器上本地运行该图形。如需了解详情,请参阅有关配置本地执行的文档。

Dataflow 服务随后会对 JSON 执行图进行验证。验证完成后,该图形即成为 Dataflow 服务中的一项作业。您可以使用 Dataflow 监控界面查看作业、执行图、状态和日志信息。

Java:SDK 2.x

Dataflow 服务向您运行 Dataflow 程序的机器发送响应。此响应会封装在 DataflowPipelineJob 对象中,该对象包含 Dataflow 作业的 jobId。通过 Dataflow 监控界面Dataflow 命令行界面,您可以使用 jobId 对您的作业进行监控、跟踪和问题排查。如需了解详情,请参阅 DataflowPipelineJob API 参考资料

Python

Dataflow 服务向您运行 Dataflow 程序的机器发送响应。此响应会封装在 DataflowPipelineResult 对象中,该对象包含 Dataflow 作业的 job_id。通过 Dataflow 监控界面Dataflow 命令行界面,您可以使用 job_id 对您的作业进行监控、跟踪和问题排查。

Java:SDK 1.x

执行图

Dataflow 会根据您在构造 Pipeline 对象时使用的转换和数据,构建一个步骤图来表示您的流水线。此图形称为流水线执行图

Apache Beam SDK 附带了一个 WordCount 示例,其中包含一系列用于对文本集合中的个别字词进行读取、提取、统计、格式设置和写入操作的转换,以及用于统计每个字词出现次数的转换。下图显示了如何将 WordCount 流水线中的转换扩展成执行图:

将 WordCount 示例程序中的转换扩展成一个包含 Dataflow 服务待执行步骤的执行图。
图 1:WordCount 示例的执行图

通常情况下,执行图中转换的顺序会与您在构造流水线时指定的顺序有所不同。这是因为 Dataflow 服务在托管云端资源上运行之前会对执行图执行各种优化和融合处理。Dataflow 服务在执行流水线时会遵循数据依赖关系;但是,那些彼此间不存在数据依赖关系的步骤可以按任意顺序执行。

Dataflow 监控界面中选择作业时,您可以看到 Dataflow 为流水线生成的未经优化的执行图。

并行处理和分布

Dataflow 服务会自动对流水线中的处理逻辑进行并行化处理,并将其分布到您指定用于执行作业的各个工作器。Dataflow 会使用编程模型中的抽象层来表示并行处理函数;例如,ParDo 转换可让 Dataflow 将处理代码(由 DoFn 表示)自动分布到多个工作器,以实现并行运行。

设计用户代码结构

您可以将 DoFn 代码视为若干独立的小型实体,也就是说,可能有许多实例在不同机器中运行,且各实例之间对彼此一无所知。因此,纯函数(即不依赖隐藏或外部状态、没有明显副作用且具有确定性的函数)代码会非常符合 DoFn 的并行和分布性质。

但是,纯函数模型并不严谨;只要您的代码不依赖 Dataflow 服务无法保证的内容,状态信息或外部初始化数据就可用于 DoFn 和其他函数对象。在设计 ParDo 转换结构并创建 DoFn 时,请谨记以下准则:

  • Dataflow 服务可以保证输入 PCollection 中的所有元素都只由 DoFn 实例处理一次。
  • Dataflow 服务无法保证 DoFn 的调用次数。
  • Dataflow 服务无法保证分布式元素的确切分组方式,也就是说,它无法保证系统会一起处理哪些元素(如果有)。
  • Dataflow 服务无法保证在流水线执行过程中会具体创建多少个 DoFn 实例。
  • Dataflow 服务具有容错功能;如果工作器发生问题,该服务可能会多次重试您的代码。Dataflow 服务可能会创建代码的备份副本,并且可能会出现人为因素的负面影响问题(例如,您的代码依赖或创建的临时文件使用了重复名称)。
  • Dataflow 服务会按 DoFn 实例序列化元素的处理。 您的代码不必是严格的线程安全代码;但是,多个 DoFn 实例之间共用的任何状态必须是线程安全的。

如需详细了解如何构建用户代码,请参阅编程模型文档中的用户提供函数要求

错误和异常处理

您的流水线在处理数据时可能会抛出异常。其中一些错误是瞬态的(例如,暂时性无法访问外部服务),但有些错误是永久性的(例如,由输入数据损坏或不可解析引发的错误,或者计算期间的 NULL 指针)。

Dataflow 会处理任意内容包中的元素,并会在该内容包中的任何元素引发错误时重试整个内容包。以批量模式运行时,含有失败项的内容包将重试 4 次。单个内容包失败 4 次即意味着流水线完全失败。以流处理模式运行时,含有失败项的内容包将无限期地重试下去,这可能会导致您的流水线永久停止。

融合优化

当 JSON 形式的流水线执行图完成验证后,Dataflow 服务可能会修改此执行图以进行优化。例如,该服务可能会将流水线执行图中的多个步骤或转换融合为单一步骤。通过将多个步骤融合在一起,Dataflow 服务就无需一一具体化流水线中的所有中间 PCollection,从而可以节省大量内存和处理方面的开销。

虽然您在构建流水线时指定的所有转换均在该服务中执行,但这些转换可能以不同的顺序执行,或者它们可能作为一个较大融合式转换的一部分执行,以便确保最有效地执行您的流水线。Dataflow 服务遵循执行图中各步骤之间的数据依赖关系,但其他步骤可以按任意顺序执行。

融合示例

下图显示了为实现高效执行,Dataflow 服务如何优化和融合 Java 版 Apache Beam SDK 所附带 WordCount 示例中的执行图:

WordCount 示例程序执行图已经过优化,且步骤已通过 Dataflow 服务进行了融合。
图 2:WordCount 示例的优化执行图

阻止融合

有些情况下,您可能希望阻止 Dataflow 服务在您的流水线中执行融合优化。在这些情况下,Dataflow 服务可能会错误地猜测融合流水线中各操作的最佳方式,而使得 Dataflow 服务无法充分利用所有可用的工作器,因此需要禁止融合。

谈到因融合而导致 Dataflow 无法充分利用工作器的情况,“高扇出”ParDo 就是一例。在此类操作中,您的输入集合可能只有相对较少的元素,但 ParDo 会输出数量高达输入数百倍或数千倍的元素,并且后面还会伴随另一个 ParDo。如果 Dataflow 服务将这些 ParDo 操作融合在一起,那么即使中间 PCollection 包含更多元素,融合后的步骤也最多只能按照输入集合中的项数执行并行处理。

要阻止这种融合,您可以向流水线添加一项操作来强制 Dataflow 服务具体化中间 PCollection。请考虑使用下列其中一项操作:

  • 您可以插入一个 GroupByKey,并在第一个 ParDo 之后取消分组。Dataflow 服务永远不会融合汇总过程中的 ParDo 操作。
  • 您可以将中间 PCollection 作为辅助输入传递到另一个 ParDo。Dataflow 服务始终会将辅助输入具体化。
  • 您可以插入 Reshuffle 步骤。Reshuffle 可防止融合,为数据添加检查点以及对记录执行去重。即使在 Apache Beam 文档中标记为已弃用,Dataflow 也支持重排。

组合优化

在大规模数据处理中,汇总操作是一个非常重要的概念。汇总是指将概念上相距甚远的数据汇集在一起,这极其有助于实现关联性。 在 Dataflow 编程模型中,汇总操作以 GroupByKeyCoGroupByKeyCombine 转换形式表示。

Dataflow 的汇总操作可将整个数据集中的数据组合到一起,包括可能分布在多个工作器中的数据。在此类汇总操作期间,通常最有效的做法是先尽可能多地组合本地数据,然后再组合各实例中的数据。当您应用 GroupByKey 或其他汇总转换时,Dataflow 服务会在进行主分组操作之前,自动在本地组合部分内容

在组合部分或多级层内容时,Dataflow 服务会根据您的流水线是使用批量数据还是流式数据而做出不同的决策。对于有界限数据,该服务更注重效率,并将尽可能多地组合本地内容。对于无界限数据,该服务更注重低延迟,可能不会组合部分内容(因为此操作可能会增加延迟时间)。

自动调节功能

Dataflow 服务包含多项自动调节功能,这些功能可以进一步动态地优化运行中的 Dataflow 作业。自动调节功能包括自动扩缩动态工作负载再平衡

自动扩缩

启用自动扩缩功能后,Dataflow 服务便会自动选择运行作业所需的适当数量的工作器实例。在运行期间,Dataflow 服务还可以根据您的作业特征进行动态再分配,增加或减少工作器数量。举例来说,流水线的特定部分可能要比其他部分执行更加繁重的计算任务,因此,在这些作业阶段中,Dataflow 服务可能会自动启动额外的工作器(并会在不再需要这些工作器时将其关闭)。

Java:SDK 2.x

默认情况下,所有批处理 Dataflow 作业和使用 Streaming Engine 的流处理作业都会启用自动扩缩功能。您可以停用自动扩缩功能,方法是在运行流水线时指定标记 --autoscalingAlgorithm=NONE,如果执行了此操作,那么请注意,Dataflow 服务会根据 --numWorkers 选项设置工作器数量(默认值为 3)。

启用自动扩缩功能后,Dataflow 服务便不会允许用户控制分配给您作业的确切工作器实例数量。您仍然可以设置工作器数量上限,具体做法是在运行流水线时指定 --maxNumWorkers 选项。

对于批处理作业而言,--maxNumWorkers 是可选标志。默认值为 1000。 对于使用 Streaming Engine 的流处理作业,--maxNumWorkers 是可选标志。默认值为 100。对于不使用 Streaming Engine 的流处理作业,--maxNumWorkers 是必需标志。

Python

默认情况下,使用 Python 版 Apache Beam SDK 0.5.1(或更高版本)创建的所有批处理 Dataflow 作业都会启用自动扩缩功能。您可以停用自动扩缩功能,方法是在运行流水线时明确指定标记 --autoscaling_algorithm=NONE,如果执行了此操作,那么请注意,Dataflow 服务会根据 --num_workers 选项设置工作器数量(默认值为 3)。

Java:SDK 1.x

Dataflow 根据流水线的并行性进行扩缩。流水线的并行性是指在任何给定时间以最有效率的方式处理数据所需的估算线程数。

批量自动扩缩功能

对于批处理流水线,Dataflow 会根据流水线每个阶段的预估总工作量自动选择工作器数量,具体取决于输入大小和当前吞吐量。Dataflow 会根据每 30 秒的执行进度重新评估工作量,并根据预估总工作量的增减情况动态增加或减少工作器数量。

如果出现以下任何情况,为了节省空闲资源,Dataflow 会维持或减少工作器数量:

  • 一般工作器 CPU 使用率低于 5%。
  • 并行性会由于不可并行的工作而受到限制,例如不可拆分的数据(如压缩文件)或由不拆分的 I/O 模块处理的数据。
  • 并行数量是固定的,例如在 Cloud Storage 目标位置写入现有文件。
  • 如果您的流水线使用已实现的自定义数据源,那么您可以通过实现以下几种方法来向 Dataflow 服务的自动扩缩算法提供更多信息(有可能会提高性能):

    Java:SDK 2.x

    • BoundedSource 子类中,实现 getEstimatedSizeBytes 方法。Dataflow 服务会使用 getEstimatedSizeBytes 计算要用于您的流水线的初始工作器数量。
    • BoundedReader 子类中,实现 getFractionConsumed 方法。Dataflow 服务会使用 getFractionConsumed 跟踪读取进度,并对读取期间使用的正确数量的工作器进行收敛。

    Python

    • BoundedSource 子类中,实现 estimate_size 方法。Dataflow 服务会使用 estimate_size 计算要用于您的流水线的初始工作器数量。
    • RangeTracker 子类中,实现 fraction_consumed 方法。Dataflow 服务会使用 fraction_consumed 跟踪读取进度,并对读取期间使用的正确数量的工作器进行收敛。

    Java:SDK 1.x

    流式自动扩缩功能

    流式自动扩缩功能可让 Dataflow 服务以自适应方式更改用于执行流处理流水线的工作器数量,以应对负载和资源利用率的变化。流式自动扩缩是一项免费功能,旨在降低用于执行流处理流水线的资源费用。

  • 纵向扩容:如果流处理流水线的平均工作器使用率超过 20%,则 Dataflow 会在几分钟内进行纵向扩容。根据当前每个工作器的吞吐量,Dataflow 的目标是在纵向扩容后大约 150 秒内清除积压。
  • 纵向缩容:如果流处理流水线积压时间少于 10 秒,并且工作器的平均 CPU 使用率低于 75%,则 Dataflow 会纵向缩容。纵向缩容后,工作器的平均 CPU 使用率为 75%。在不使用 Streaming Engine 的流处理作业中,由于磁盘分布和使用了较低的目标,可能无法实现 75% 的 CPU 使用率。
  • 不扩缩:如果没有积压,但 CPU 使用率为 75% 或更高,则流水线不会纵向缩容。如果存在积压,但 CPU 使用率低于 20%,则流水线不会纵向扩容。
  • 如果不启用自动扩缩功能,您可以通过指定 numWorkersnum_workers,选择固定数量的工作器来执行您的流水线。由于输入工作负载会随时间变化,因此您指定的数量对于当下的情况可能会变得太多或太少。如果配置的工作器过多,则会导致不必要的额外费用,而如果配置的工作器过少,则会导致处理数据延迟增加。启用自动扩缩功能后,系统只有在需要时才会使用资源。

    自动扩缩流式流水线的目标是最大限度地减少积压,同时尽可能提高工作器的利用率和吞吐量,从而快速应对负载高峰。启用自动扩缩功能后,您就无需选择配置高峰负载或全新结果。当 CPU 利用率和积压增加时,系统会相应地添加工作器,而当这些指标下降时,系统也会相应地移除工作器。这样一来,您只需为所用的资源付费,而且作业也会尽可能高效地得到处理。

    Java:SDK 2.x

    自定义无界限来源

    如果您的流水线使用自定义无界限来源,那么该来源必须在出现输入积压情况时向 Dataflow 服务发送通知。积压输入量用于估算尚未经来源处理的输入量(以字节为单位)。如要就输入积压的事宜向该服务发送通知,请在您的 UnboundedReader 类中实现以下任一方法。

    • getSplitBacklogBytes() - 当前来源分组的积压输入量。该服务会汇总所有分组的积压输入量。
    • getTotalBacklogBytes() - 所有分组中的全局积压输入量。在某些情况下,无法计算每个分组的积压输入量,而是只能计算所有分组的积压输入量。 只需提供第一个分组(分组 ID 为“0”)的总积压输入量。
    Apache Beam 代码库包含多个实现 UnboundedReader 类的自定义来源示例
    启用流式自动扩缩功能

    对于使用 Streaming Engine 的流处理作业,默认情况下会启用自动扩缩功能。

    如需为不使用 Streaming Engine 的作业启用自动扩缩功能,请在启动流水线时设置以下执行参数

    --autoscalingAlgorithm=THROUGHPUT_BASED
    --maxNumWorkers=N
    

    对于不使用 Streaming Engine 的流处理作业,工作器数量下限是 --maxNumWorkers 值的 1/15,向上取整。

    流处理流水线在部署时使用的是固定永久性磁盘池(相当于 --maxNumWorkers 个磁盘)。在指定 --maxNumWorkers 时请考虑这一点,并确保此磁盘数量足以满足您流水线的需求。

    用量和价格

    Compute Engine 用量是根据平均工作器数量来计算,而 Persistent Disk 用量则是以 --maxNumWorkers 的确切数量为准。系统会重新分配永久性磁盘,使每个工作器都会挂接相同数量的磁盘。

    在上面的示例中,由于设置了 --maxNumWorkers=15,您将需要支付 1 至 15 个 Compute Engine 实例的费用,以及正好 15 个永久性磁盘的费用。

    Python

    启用流式自动扩缩功能

    如需启用自动扩缩功能,请在启动流水线时设置以下执行参数

    --autoscaling_algorithm=THROUGHPUT_BASED
    --max_num_workers=N
    

    对于不使用 Streaming Engine 的流处理作业,工作器数量下限是 --maxNumWorkers 值的 1/15,向上取整。

    流处理流水线在部署时使用的是固定永久性磁盘池(相当于 --maxNumWorkers 个磁盘)。在指定 --maxNumWorkers 时请考虑这一点,并确保此磁盘数量足以满足您流水线的需求。

    用量和价格

    Compute Engine 用量是根据平均工作器数量来计算,而 Persistent Disk 用量则是以 --max_num_workers 的确切数量为准。系统会重新分配永久性磁盘,使每个工作器都会挂接相同数量的磁盘。

    在上面的示例中,由于设置了 --max_num_workers=15,您将需要支付 1 至 15 个 Compute Engine 实例的费用,以及正好 15 个永久性磁盘的费用。

    Java:SDK 1.x

    手动扩缩流处理流水线

    在以流处理模式正式提供自动扩缩功能之前,您可以使用 Dataflow 的更新功能,手动扩缩运行流处理流水线所需的工作器数量。

    Java:SDK 2.x

    要在执行过程中扩缩流处理流水线的规模,请务必在启动该流水线时设置以下执行参数

    • --maxNumWorkers 设置为要提供给流水线使用的工作器数量上限
    • --numWorkers 设置为流水线在开始运行时要使用的初始工作器数量。

    流水线运行后,您可以更新流水线并使用 --numWorkers 参数指定新的工作器数量。为新 --numWorkers 设置的值必须介于 N--maxNumWorkers 之间,其中 N 等于 --maxNumWorkers/15。

    更新流水线会将正在运行的作业替换为采用新工作器数量的新作业,但会保留与先前作业关联的所有状态信息。

    Python

    要在执行过程中扩缩流处理流水线的规模,请务必在启动该流水线时设置以下执行参数

    • --max_num_workers 设置为要提供给流水线使用的工作器数量上限
    • --num_workers 设置为流水线在开始运行时要使用的初始工作器数量。

    流水线运行后,您可以更新流水线并使用 --num_workers 参数指定新的工作器数量。为新 --num_workers 设置的值必须介于 N--max_num_workers 之间,其中 N 等于 --max_num_workers/15。

    更新流水线会将正在运行的作业替换为采用新工作器数量的新作业,但会保留与先前作业关联的所有状态信息。

    Java:SDK 1.x

    动态工作负载再平衡

    Dataflow 服务的动态工作负载再平衡功能可让该服务根据运行时状况动态地重新分配工作负载。这些条件可能包括:

    • 工作负载分配不平衡
    • 工作器完成时间超出预期
    • 工作器完成速度超出预期

    Dataflow 服务会自动检测这些状况,并可动态地将工作负载重新分配到未使用或未充分利用的工作器,从而缩短作业的总体处理时间。

    限制

    动态工作负载再平衡功能仅适用于 Dataflow 服务并行处理部分输入数据的情况,例如,从外部输入来源读取数据、使用已具体化的中间 PCollection,或使用 GroupByKey 之类汇总转换的结果。如果您的作业中有大量步骤发生了融合,则该作业所含的中间 PCollection 将会较少,而动态工作负载再平衡功能将会仅按照已具体化的来源 PCollection 中的元素数量进行应用。如果您希望确保动态工作负载再平衡功能可应用于流水线中的特定 PCollection,则可以通过几种不同方式阻止融合,以确保能进行动态并行处理。

    动态工作负载再平衡功能无法重新并行处理比单条记录更精细的数据。如果数据中所含的个别记录会导致处理时间大幅延迟,那么这些记录可能仍会导致作业延迟,因为 Dataflow 无法对个别“热门”记录进行细分并将其重新分配到多个工作器。

    Java:SDK 2.x

    如果您为流水线的最终输出设置了固定数量的分片(例如使用 TextIO.Write.withNumShards 写入数据),则并行处理过程将受限于您选择的分片数量。

    Python

    如果您为流水线的最终输出设置了固定数量的分片(例如使用 beam.io.WriteToText(..., num_shards=...) 写入数据),则 Dataflow 将根据您选择的分片数量来限制并行处理。

    Java:SDK 1.x

    固定分片数这一限制只是暂时性的处置做法,并且可会在未来的 Dataflow 服务版本中发生变化。

    使用自定义数据源

    Java:SDK 2.x

    如果您的流水线使用您提供的自定义数据源,您必须实现 splitAtFraction 方法,这样该数据源才能使用动态工作负载再平衡功能。

    Python

    如果您的流水线使用您提供的自定义数据源,则您的 RangeTracker 必须实现 try_claimtry_splitposition_at_fractionfraction_consumed 方法,这样该数据源才能使用动态工作负载再平衡功能。

    如需了解详情,请参阅 RangeTracker 上的 API 参考信息

    Java:SDK 1.x

    资源用量和管理

    Dataflow 服务会全面托管每个作业在 Google Cloud 中的资源。这包括启动和关停 Compute Engine 实例(有时称为工作器或虚拟机),以及访问项目用于暂存 I/O 和临时文件的 Cloud Storage 存储分区。但是,如果您的流水线需要与 Google Cloud 数据存储技术(如 BigQueryPub/Sub)进行交互,则您必须管理这些服务的资源和配额。

    Dataflow 会将用户提供的 Cloud Storage 位置专门用于暂存文件。该位置由您全权控制,并且您应确保该位置的生命周期能持续到从中读取数据的所有作业都结束为止。您可以将同一暂存位置重复用于作业的多次运行,因为 SDK 的内置缓存可以加快作业的开始时间。

    作业

    每个 Google Cloud 项目最多可以运行 25 个并发 Dataflow 作业;但是,您可以联系 Google Cloud 支持团队来提高此限制。如需了解详情,请参阅配额

    Dataflow 服务目前只能处理 20MB 或更小的 JSON 作业请求。作业请求的大小取决于流水线的 JSON 表示;流水线越大意味着请求也越大。

    要估算流水线的 JSON 请求大小,请使用以下选项运行您的流水线:

    Java:SDK 2.x

    --dataflowJobFile=< path to output file >
    

    Python

    --dataflow_job_file=< path to output file >
    

    Java:SDK 1.x

    此命令会将作业的 JSON 表示写入文件中。您最好根据序列化文件的大小来估算请求大小;由于请求中包含一些其他信息,实际大小将会略大一些。

    如需了解详情,请参阅以下问题排查页面:“413 请求实体过大”/“流水线的序列化 JSON 表示法的大小超出允许的限额”

    此外,您作业的图形大小不得超过 10 MB。如需了解详情,请参阅以下问题排查页面:“作业图太大。请使用较小的作业图重试,或将作业拆分成两个或更多较小的作业”

    工作器

    目前,Dataflow 服务允许每个作业最多使用 1000 个 Compute Engine 实例。 对于批量作业,默认机器类型为 n1-standard-1。对于流式作业,*支持 Streaming Engine* 的作业的默认机器类型为 n1-standard-2,*非 Streaming Engine* 作业的默认机器类型为 n1-standard-4。因此,使用默认机器类型时,Dataflow 服务可以为每个作业分配多达 4000 个核心。如果您的作业需要更多核心,则可以选择更大的机器类型。

    您可以使用任何可用的 Compute Engine 机器类型系列以及自定义机器类型。为获得最佳效果,请使用 n1 机器类型。Dataflow 服务等级协议不支持共享核心机器类型(例如 f1g1 系列工作器)。

    Dataflow 按工作器中的 vCPU 数量和内存 GB 数计费。计费与机器类型系列无关。您可以为流水线指定机器类型,方法是在创建流水线时设置适当的执行参数

    Java:SDK 2.x

    如需更改机器类型,请设置 --workerMachineType 选项。

    Python

    如需更改机器类型,请设置 --worker_machine_type 选项。

    Java:SDK 1.x

    资源配额

    Dataflow 服务会进行检查,以确保您的 Google Cloud 项目具有运行作业(包括启动作业和调节为最多数量的工作器实例)所需的 Compute Engine 资源配额。如果没有足够可用的资源配额,您的作业将无法启动。

    Dataflow 的自动扩缩功能受项目的可用 Compute Engine 配额限制。如果有足够的配额供您的作业启动,但项目的其余可用配额被另一个作业所占用,则第一个作业将可以运行但无法完全扩展。

    不过,对于超出项目资源配额的作业,Dataflow 服务负责解决为这些作业增加配额的问题。必要时,您可以使用 Google Cloud Console 来申请更多资源配额。

    永久性磁盘资源

    在运行流处理作业时,Dataflow 服务目前仅支持每个工作器实例 15 个永久性磁盘。每个永久性磁盘是单个 Compute Engine 虚拟机的本地磁盘。作业中的工作器数量不得超过永久性磁盘数量。最低资源配额是工作器数量与磁盘数量之比为 1:1。

    对于在工作器虚拟机上运行的作业,每个永久性磁盘的默认大小为 250 GB(对于批处理模式)400 GB(对于流处理模式)。使用 Streaming EngineDataflow Huffle 的作业在 Dataflow 服务后端运行并使用较小的磁盘。

    位置

    默认情况下,Dataflow 服务会将 Compute Engine 资源部署到 us-central1 区域的 us-central1-f 地区中。如果替换这项设置,您可以指定 --region 参数。如果您需要为资源使用特定地区,请在创建流水线时使用 --zone 参数。不过,我们建议您仅指定区域而不要指定地区。这样,Dataflow 服务便可在收到作业创建请求时,自动根据可用的地区容量在该区域内选择最佳地区。如需了解详情,请参阅区域性端点文档。

    Streaming Engine

    目前,Dataflow 流水线运行程序完全在工作器虚拟机上执行流处理流水线的各步骤,并会消耗工作器 CPU、内存和永久性磁盘存储空间。Dataflow 的 Streaming Engine 会将流水线执行从工作器虚拟机中移出并移入 Dataflow 服务后端。

    Streaming Engine 的优点

    Streaming Engine 模型具有以下优点:

    • 可减少在工作器虚拟机上使用的 CPU、内存和 Persistent Disk 存储空间资源。 Streaming Engine 最适合较小的工作器机器类型(n1-standard-2,而非 n1-standard-4),并且只需要小型工作器启动磁盘(而不需要 Persistent Disk),从而减少了资源和配额用量。
    • 可让自动扩缩功能更快地响应传入数据量的变化。Streaming Engine 可让您以更顺畅且更精细的方式扩缩工作器的数量。
    • 可提升支持能力;您无需重新部署流水线即可应用服务更新。

    节省的工作器资源主要是通过将工作负载分流到 Dataflow 服务实现的。 因此,使用 Streaming Engine 会产生一笔相关费用。但是,与不使用 Streaming Engine 的 Dataflow 流水线相比,使用此选项的 Dataflow 流水线的帐单总额应该大致相同

    使用 Streaming Engine

    目前已在以下区域为流处理流水线提供 Streaming Engine。此服务未来也会在其他区域提供。

    • us-west1(俄勒冈)
    • us-central1(爱荷华)
    • us-east1(南卡罗来纳)
    • us-east4(北弗吉尼亚)
    • northamerica-northeast1(蒙特利尔)
    • europe-west2(伦敦)
    • europe-west1(比利时)
    • europe-west4(荷兰)
    • europe-west3(法兰克福)
    • asia-southeast1(新加坡)
    • asia-east1(台湾)
    • asia-northeast1(东京)
    • australia-southeast1(悉尼)

    Java:SDK 2.x

    要对流处理流水线使用 Streaming Engine,请指定以下参数:

    • --enableStreamingEngine(如果您使用的是 Java 版 Apache Beam SDK 2.11.0 或更高版本)。
    • --experiments=enable_streaming_engine(如果您使用的是 Java 版 Apache Beam SDK 2.10.0 版)。

    如果您对流水线使用 Dataflow Streaming Engine,请勿指定 --zone 参数,而应指定 --region 参数并将其值设置为目前提供 Streaming Engine 的一个区域。Dataflow 会自动选择指定区域中的地区。如果您指定了 --zone 参数并将其设置为可用区域之外的一个地区,则 Dataflow 会报告错误。

    Streaming Engine 最适合较小的工作器机器类型,因此我们建议您设置 --workerMachineType=n1-standard-2。您还可以设置 --diskSizeGb=30,因为 Streaming Engine 只需要用于存储工作器启动映像和本地日志的空间。这些值为默认值。

    Python

    当满足以下条件时,默认情况下,系统会针对新的 Dataflow 流处理流水线启用 Streaming Engine:

    如果要在 Python 流处理流水线中停用 Streaming Engine,请指定以下参数:

    --experiments=disable_streaming_engine

    如果您使用 Python 2,仍必须通过指定以下参数来启用 Streaming Engine:

    --enable_streaming_engine

    如果您在流水线中使用 Dataflow Streaming Engine,请勿指定 --zone 参数。而应指定 --region 参数并将其值设置为目前提供 Streaming Engine 的一个地区。Dataflow 会自动选择指定区域中的地区。如果您指定了 --zone 参数并将其设置为可用区域之外的一个地区,则 Dataflow 会报告错误。

    Streaming Engine 最适合较小的工作器机器类型,因此我们建议您设置 --machine_type=n1-standard-2。您还可以设置 --disk_size_gb=30,因为 Streaming Engine 只需要用于存储工作器启动映像和本地日志的空间。这些值为默认值。

    Java:SDK 1.x

    Dataflow Shuffle

    Dataflow Shuffle 是 Dataflow 转换(例如 GroupByKeyCoGroupByKeyCombine)背后的基础操作。Dataflow Shuffle 操作能够以可扩缩、高效且可容错的方式,根据键对数据进行分区和分组。目前,Dataflow 使用完全在工作器虚拟机上运行的 Shuffle 实现,并会消耗工作器 CPU、内存和永久性磁盘存储空间。这项基于服务的 Dataflow Shuffle 功能(仅适用于批处理流水线)可将 Shuffle 操作移出工作器虚拟机并移入 Dataflow 服务后端。

    Dataflow Shuffle 的优点

    基于服务的 Dataflow Shuffle 具有以下优点:

    • 可让绝大多数流水线作业类型的批处理流水线执行得更快。
    • 可减少在工作器虚拟机上使用的 CPU、内存和 Persistent Disk 存储空间资源。
    • 可实现更加出色的自动扩缩能力;虚拟机不再保留任何 Shuffle 数据,因此可以较早地缩减规模。
    • 可提升容错能力;采用这项功能之后,即使保留 Dataflow Shuffle 数据的虚拟机运行状况不佳,也不会导致整个作业失败(未采用此功能时可能就会失败)。

    节省的工作器资源主要是通过将 Shuffle 工作负载分流到 Dataflow 服务实现的。因此,使用 Dataflow Shuffle 会产生一笔相关费用。但是,与不使用基于服务的 Dataflow 实现的 Dataflow 流水线相比,使用该选项的 Dataflow 流水线的帐单总额应当会更少或保持不变

    对于绝大多数流水线作业类型而言,Dataflow Shuffle 的执行速度会快于在工作器虚拟机上运行的 Shuffle 实现。但是,执行时间可能因具体运行情况而异。如果您运行的流水线有重要期限,我们建议在期限之前分配足够的缓冲时间。此外,考虑为 Shuffle 申请更大的配额

    磁盘注意事项

    使用基于服务的 Dataflow Shuffle 功能时,您无需将大型永久性磁盘挂接到工作器虚拟机。Dataflow 会自动挂接一个 25 GB 的小型启动磁盘。但由于这种磁盘较小,因此在使用 Dataflow Shuffle 时需要注意一些重要事项:

    • 工作器虚拟机将 25 GB 磁盘空间的一部分用于存储操作系统、二进制文件、日志和容器。使用 Dataflow Shuffle 时,那些耗用大量磁盘空间且空间需求超出剩余磁盘容量的作业可能会失败。
    • 受限于小型磁盘的性能,使用大量磁盘 I/O 的作业可能执行得很慢。 如需详细了解不同大小磁盘之间的性能差异,请参阅 Compute Engine Persistent Disk 性能页面。

    如果您的作业存在以上注意事项所述的任何情况,您可以使用流水线选项指定较大的磁盘大小。

    使用 Dataflow Shuffle

    目前,我们在以下区域提供基于服务的 Dataflow Shuffle:

    • us-west1(俄勒冈)
    • us-central1(爱荷华)
    • us-east1(南卡罗来纳)
    • us-east4(北弗吉尼亚)
    • northamerica-northeast1(蒙特利尔)
    • europe-west2(伦敦)
    • europe-west1(比利时)
    • europe-west4(荷兰)
    • europe-west3(法兰克福)
    • asia-southeast1(新加坡)
    • asia-east1(台湾)
    • asia-northeast1(东京)
    • australia-southeast1(悉尼)

    Dataflow Shuffle 未来也会陆续在其他区域提供。

    Java:SDK 2.x

    要在批处理流水线中使用基于服务的 Dataflow Shuffle,请指定以下参数:
    --experiments=shuffle_mode=service

    如果您对流水线使用 Dataflow Shuffle,请勿指定 --zone 参数,而应指定 --region 参数并将其值设置为目前提供 Shuffle 的一个区域。Dataflow 会自动选择指定区域中的地区。如果您指定了 --zone 参数并将其设置为可用区域之外的一个地区,则 Dataflow 会报告错误。

    Python

    要在批处理流水线中使用基于服务的 Dataflow Shuffle,请指定以下参数:
    --experiments=shuffle_mode=service

    如果您对流水线使用 Dataflow Shuffle,请勿指定 --zone 参数,而应指定 --region 参数并将其值设置为目前提供 Shuffle 的一个区域。Dataflow 会自动选择指定区域中的地区。如果您指定了 --zone 参数并将其设置为可用区域之外的一个地区,则 Dataflow 会报告错误。

    Java:SDK 1.x

    Dataflow Flexible Resource Scheduling

    Dataflow FlexRS 使用高级调度技术Dataflow Shuffle 服务并结合使用抢占式虚拟机 (VM) 实例和常规虚拟机,来降低批处理的费用。通过同时运行抢占式虚拟机和常规虚拟机,Dataflow 可改善以下情况的用户体验:Compute Engine 在系统事件期间中止抢占式虚拟机实例。当 Compute Engine 抢占了您正在使用的抢占式虚拟机时,FlexRS 帮助确保流水线继续运行,并且您不会丢失之前的工作成果。如需详细了解 FlexRS,请参阅在 Dataflow 中使用灵活的资源调度服务

    Dataflow Runner v2

    当前的生产 Dataflow 运行程序在运行 Apache Beam 流水线时利用特定于语言的工作器。 为了提高可扩缩性、通用性、可扩展性和效率,Dataflow 运行程序正在迁移到更基于服务的架构。 这些更改包括与 Shuffle 服务和 Streaming Engine 封装在一起的更高效且可移植的工作器架构。

    新的 Dataflow 运行程序 Dataflow Runner v2 适用于 Python 流处理流水线和批处理流水线。建议您首先尝试在当前工作负载中使用 Dataflow Runner v2,然后再在所有新流水线上默认启用它。您无需对流水线代码进行任何更改即可利用这个新架构。

    使用 Dataflow Runner v2 的优势

    从 Python 流处理流水线和批处理流水线开始,新功能将只在 Dataflow Runner v2 上可用。此外,Dataflow Runner v2 架构的效率提升可能会使 Dataflow 作业的性能提升。

    使用 Dataflow Runner v2 时,您可能会发现帐单有所减少。Dataflow Runner v2 的计费模式尚未最终确定,因此当新的运行程序在所有流水线中启用时,您的费用可能会增回到接近当前水平。

    使用 Dataflow Runner v2

    Dataflow Runner v2 适用于具有 Dataflow 区域端点的地区。

    Java:SDK 2.x

    Dataflow Runner v2 目前不适用于 Java。

    Python

    Dataflow Runner v2 需要将 Streaming Engine 用于流处理作业,将 Dataflow Shuffle 用于批处理作业。如需为相应的作业启用它们,请指定以下参数:
    --experiments=use_runner_v2

    调试 Dataflow Runner v2 作业

    如需使用 Dataflow Runner v2 调试作业,应遵循标准调试步骤;但是,在使用 Dataflow Runner v2 时,请注意以下事项:

    • Dataflow Runner v2 作业在工作器虚拟机上运行两种类型的进程:SDK 进程和运行程序自动化测试框架进程。 根据流水线和虚拟机类型,可能有一个或多个 SDK 进程,但每台虚拟机只有一个运行程序自动化测试框架进程。
    • SDK 进程运行用户代码和其他特定于语言的功能,而运行程序自动化测试框架进程则管理其他一切。
    • 在开始从 Dataflow 请求工作之前,运行程序自动化测试框架进程会等待所有 SDK 进程与其连接。
    • 如果工作器虚拟机在 SDK 进程启动期间下载并安装依赖项,则作业可能会延迟。 如果 SDK 进程存在问题(例如启动或安装库),则工作器会将其状态报告为运行状况不佳。
    • 工作器虚拟机日志(通过日志浏览器Dataflow 监控界面获取)包含来自运行程序自动化测试框架进程的日志以及来自 SDK 进程的日志。
    • 如需诊断用户代码中的问题,请检查来自 SDK 进程的工作器日志。如果在运行程序自动化测试框架日志中发现任何错误,请与支持团队联系以提交错误。