指定流水线执行参数

在 Apache Beam 程序完成流水线的构建后,您需要执行该流水线。流水线的执行过程独立于 Apache Beam 程序的执行;Apache Beam 程序负责构建流水线,而您编写的代码会生成一系列将由流水线运行程序执行的步骤。流水线运行程序可以是 Google Cloud 上的 Dataflow 托管式服务、第三方运行程序服务,或直接在本地环境中执行相关步骤的本地流水线运行程序。

您可以使用 Apache Beam SDK 类 PipelineOptions 来指定流水线运行程序及其他执行选项,也可以使用 PipelineOptions 来配置流水线的执行方式、执行位置以及所用资源。

大多数情况下,您需要使用 Dataflow 运行程序服务在 Google Cloud 托管资源上运行流水线。通过 Dataflow 服务运行流水线会创建 Dataflow 作业,该作业使用 Google Cloud 项目中的 Compute Engine 和 Cloud Storage 资源。

您也可以在本地运行流水线。在本地运行流水线时,流水线转换将在执行 Dataflow 程序的机器上执行。本地执行对测试和调试非常有帮助,尤其是当您的流水线能够使用较小的内存中数据集时。

设置 PipelineOptions

当您在 Dataflow 程序中创建 Pipeline 对象时,会传递 PipelineOptions。Dataflow 服务会在运行流水线时将 PipelineOptions 的副本发送到每个工作器实例。

Java:SDK 2.x

注意:您可以使用 ProcessContext.getPipelineOptions 方法访问任何 ParDoDoFn 实例内的 PipelineOptions

Python

Python 版 Apache Beam SDK 尚不支持此功能。

Java:SDK 1.x

通过命令行参数设置 PipelineOptions

您可以通过创建 PipelineOptions 对象并直接设置相应字段来配置流水线,不过,您也可以借助 Apache Beam SDK 中的命令行解析器,使用命令行参数来设置 PipelineOptions 中的字段。

Java:SDK 2.x

如需从命令行中读取选项,请使用 PipelineOptionsFactory.fromArgs 方法构建 PipelineOptions 对象,示例代码如下:

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

注意:如果附加 .withValidation 方法,Dataflow 会检查必需的命令行参数并验证参数值。

使用 PipelineOptionsFactory.fromArgs 可解释遵循以下格式的命令行参数:

--<option>=<value>

通过这种方式构建 PipelineOptions,您可以将 org.apache.beam.sdk.options.PipelineOptions 的任意子接口的任意选项指定为命令行参数。

Python

如需从命令行中读取选项,请创建 PipelineOptions 对象,示例代码如下:

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(flags=argv)

PipelineOptions(flags=argv) 参数可解释遵循以下格式的命令行参数:

--<option>=<value>

以这种方式构建 PipelineOptions,让您可以通过创建 PipelineOptions 子类指定任意选项。

Java:SDK 1.x

创建自定义选项

除了标准 PipelineOptions 之外,您还可以添加自己的自定义选项。Dataflow 的命令行解析器也可以使用以同一格式指定的命令行参数来设置自定义选项。

Java:SDK 2.x

要添加您自己的选项,请使用 getter 和 setter 方法为每个选项定义一个接口,示例如下:

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Python

如需添加您自己的选项,请使用 add_argument() 方法(其行为与 Python 的标准 argparse 模块完全相同),如以下示例所示:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Java:SDK 1.x

您还可以指定说明(在用户将 --help 作为命令行参数传递时显示)和默认值。

Java:SDK 2.x

您可以使用注释来设置说明和默认值,如下所示:

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

我们建议您使用 PipelineOptionsFactory 注册接口,然后在创建 PipelineOptions 对象时传递该接口。使用 PipelineOptionsFactory 注册接口时,--help 可找出您的自定义选项接口并将其添加到 --help 命令的输出中。PipelineOptionsFactory 还将验证您的自定义选项是否与其他所有已注册选项兼容。

以下示例代码展示了如何使用 PipelineOptionsFactory 注册自定义选项接口:

  PipelineOptionsFactory.register(MyOptions.class);
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                            .withValidation()
                                            .as(MyOptions.class);

现在,您的流水线可以接受 --myCustomOption=value 作为命令行参数。

Python

您可以按如下方式设置说明和默认值:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        help='Input for the pipeline',
        default='gs://my-bucket/input')
    parser.add_argument(
        '--output',
        help='Output for the pipeline',
        default='gs://my-bucket/output')

Java:SDK 1.x

配置 PipelineOptions 以在 Cloud Dataflow 服务上执行流水线

如需使用 Dataflow 托管式服务执行流水线,您必须在 PipelineOptions 中设置以下字段:

Java:SDK 2.x

  • project - 您的 Google Cloud 项目的 ID。
  • runner - 用于解析程序并构建流水线的流水线运行程序。在云端执行时,此字段必须是 DataflowRunner
  • gcpTempLocation - 供 Dataflow 暂存所有临时文件的 Cloud Storage 路径。您必须提前创建此存储分区,之后再运行流水线。如果您未指定 gcpTempLocation,则可以指定流水线选项 tempLocation,之后 gcpTempLocation 便会被设置为 tempLocation 的值。如果它们均未指定,系统将创建默认 gcpTempLocation
  • stagingLocation - 供 Dataflow 暂存二进制文件的 Cloud Storage 存储分区。如果您未设置此选项,则为 tempLocation 指定的位置也将用作暂存位置。
  • 如果该选项及 tempLocation 均未指定,系统将创建默认 gcpTempLocation。如果指定了 tempLocation,但未指定 gcpTempLocation,则 tempLocation 必须为 Cloud Storage 路径,gcpTempLocation 将默认设置成这个路径。如果未指定 tempLocation,但指定了 gcpTempLocation,系统将不填充 tempLocation

注意:如果您使用的是 Java 版 Apache Beam SDK 2.15.0 或更高版本,则还必须指定 region

Python

  • project - 您的 Google Cloud 项目的 ID。
  • runner - 用于解析程序并构建流水线的流水线运行程序。在云端执行时,此字段必须是 DataflowRunner
  • staging_location - 供 Dataflow 暂存工作器执行作业所需的代码包的 Cloud Storage 路径。
  • temp_location - 供 Dataflow 暂存流水线执行期间创建的临时作业文件的 Cloud Storage 路径。

注意:如果您使用的是 Python 版 Apache Beam SDK 2.15.0 或更高版本,则还必须指定 region

Java:SDK 1.x

您可以编程方式设置这些选项,也可以通过命令行进行指定。以下示例代码展示了如何通过以编程方式设置运行程序以及通过其他必需选项来构建流水线,从而使用 Dataflow 托管式服务执行流水线。

Java:SDK 2.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
options = PipelineOptions(
    flags=argv,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')

# Create the Pipeline with the specified options.
# with beam.Pipeline(options=options) as pipeline:
#   pass  # build your pipeline here.

Java:SDK 1.x

构建流水线后,请指定所有的流水线读取、转换和写入对象,之后再运行流水线。

以下示例代码展示了如何使用命令行设置在 Dataflow 服务上执行流水线所必需的选项:

Java:SDK 2.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam

parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')
args, beam_args = parser.parse_known_args(argv)

# Create the Pipeline with remaining arguments.
with beam.Pipeline(argv=beam_args) as pipeline:
  lines = pipeline | 'Read files' >> beam.io.ReadFromText(args.input)
  lines | 'Write files' >> beam.io.WriteToText(args.output)

Java:SDK 1.x

构建流水线后,请指定所有的流水线读取、转换和写入对象,之后再运行流水线。

Java:SDK 2.x

通过命令行传递必需的选项时,请使用 --project--runner--gcpTempLocation 选项,还可以选择性地使用 --stagingLocation 选项。

Python

通过命令行传递必需的选项时,请使用 --project--runner--staging_location 选项。

Java:SDK 1.x

异步执行

Java:SDK 2.x

使用 DataflowRunner 会导致流水线在 Google 的云端异步执行。在流水线执行期间,您可以通过 Dataflow 监控界面Dataflow 命令行界面监控作业的进度、查看执行详情,并接收流水线执行结果的最新动态。

Python

使用 DataflowRunner 会导致流水线在 Google 的云端异步执行。在流水线执行期间,您可以通过 Dataflow 监控界面Dataflow 命令行界面监控作业的进度、查看执行详情,并接收流水线执行结果的最新动态。

Java:SDK 1.x

同步执行

Java:SDK 2.x

DataflowRunner 指定为流水线运行程序并明确调用 pipeline.run().waitUntilFinish()

如果您使用 DataflowRunner,并在从 pipeline.run() 返回的 PipelineResult 对象上调用 waitUntilFinish(),该流水线将在云端执行,但本地代码会等待云端作业执行完毕,并返回最终的 DataflowPipelineJob 对象。当作业运行时,Dataflow 服务会在等待时打印作业状态更新和控制台消息。

如果您之前是 Java SDK 1.x 用户,并在命令行中使用 --runner BlockingDataflowPipelineRunner 以交互方式引导主程序阻止流水线运行程序,直到该流水线终止,那么,在 Java 2.x 中,您的主程序需要明确调用 waitUntilFinish()

Python

如需在流水线执行完毕前一直阻止,请使用从运行程序的 run() 方法返回的 PipelineResult 对象的 wait_until_finish() 方法。

Java:SDK 1.x

注意:在命令行中按 Ctrl+C 不会取消作业。Dataflow 服务仍然在 Google Cloud 上执行作业;如需取消作业,您需要使用 Dataflow 监控界面Dataflow 命令行界面

流处理执行

Java:SDK 2.x

如果您的流水线从无界限数据源(例如 Pub/Sub)中读取数据,则该流水线将自动在流处理模式下执行。

如果您的流水线使用无界限数据源和接收器,则必须为无界限 PCollection 选择一个数据选取策略,然后才能使用 GroupByKey 等任意聚合操作。

Python

如果您的流水线使用无界限数据源或接收器(例如 Pub/Sub),则必须将 streaming 选项设置为 true。

流处理作业默认使用 Compute Engine 机器类型 n1-standard-2 或更高的机器类型。您不能替换此设置,因为 n1-standard-2 是运行流处理作业所需的最低机器类型。

如果您的流水线使用无界限数据源和接收器,则必须为无界限 PCollection 选择一个数据选取策略,然后才能使用 GroupByKey 等任意聚合操作。

Java:SDK 1.x

设置其他 Cloud Dataflow 流水线选项

如需在云端运行您的流水线,您可以编程方式设置 PipelineOptions 对象中的下列字段:

Java:SDK 2.x

字段 类型 说明 默认值
runner Class (NameOfRunner) 拟使用的 PipelineRunner。借助此字段,您可以在运行时确定 PipelineRunner DirectRunner(本地模式)
streaming boolean 指定是否启用流处理模式;如果启用,则为 true。 如果您的流水线从无界限源中读取数据,则默认值为 true;否则为 false
project String 您的 Google Cloud 项目的 ID。如果您需要使用 Dataflow 托管式服务运行流水线,则此字段必填。 如果未设置,则默认为 Cloud SDK 中当前配置的项目。
jobName String 正在执行的 Dataflow 作业的名称,与在 Dataflow 作业列表和作业详情中显示的名称一致。还用于更新现有流水线 Dataflow 会自动生成一个唯一名称。
gcpTempLocation String 用于暂存临时文件的 Cloud Storage 路径。必须是有效的 Cloud Storage 网址,且以 gs:// 开头。
stagingLocation String 用于暂存本地文件的 Cloud Storage 路径。必须是有效的 Cloud Storage 网址,且以 gs:// 开头。 如果未设置,则默认为您为 tempLocation 指定的值。
autoscalingAlgorithm String Dataflow 作业的自动扩缩模式。可能的值为 THROUGHPUT_BASED(启用自动扩缩)或 NONE(停用)。如需详细了解 Dataflow 托管式服务中自动扩缩的工作原理,请参阅自动扩缩功能 对于所有批处理 Dataflow 作业以及使用 Streaming Engine 的流处理作业,默认为 THROUGHPUT_BASED。对于不使用 Streaming Engine 的流处理作业,默认为 NONE
numWorkers int 执行流水线时使用的 Google Compute Engine 初始实例数。此选项决定了 Dataflow 服务在作业开始时启动的工作器数。 如果未指定,Dataflow 服务将确定合适的工作器数。
maxNumWorkers int 在您的流水线执行期间为其提供的 Compute Engine 实例的最大数量。请注意,此值可超过 numWorkers 指定的工作器初始数量,其目的是允许您的作业以自动方式或其他方式纵向扩容。 如果未指定,Dataflow 服务将确定合适的工作器数。
numberOfWorkerHarnessThreads int 每个工作器使用的线程数。 如果未指定,Dataflow 服务将确定每个工作器合适的线程数。
region String 指定用于部署 Dataflow 作业的区域端点 如果未设置,则默认为 us-central1
workerRegion String

指定用于启动工作器实例以运行您的流水线的 Compute Engine 区域。此选项用于在与用于部署、管理和监控作业的 region 不同的位置运行工作器。workerRegion 的地区是自动分配的。

注意:此选项不能与 workerZonezone 一起使用。

如果未设置,则默认为为 region 设置的值。
workerZone String

指定用于启动工作器实例以运行您的流水线的 Compute Engine 地区。此选项用于在与用于部署、管理和监控作业的 region 不同的位置运行工作器。

注意:此选项不能与 workerRegionzone 一起使用。

如果指定了 regionworkerRegionworkerZone 将默认为相应区域的地区。您可以通过指定其他地区来替换此行为。
zone String (已弃用)对于 Apache Beam SDK 2.17.0 或更早版本,这指定了用于启动工作器实例以运行您的流水线的 Compute Engine 地区。 如果指定了 regionzone 将默认为相应区域的地区。您可以通过指定其他地区来替换此行为。
dataflowKmsKey String 指定用于对静态数据进行加密的 CMEK(客户管理的加密密钥)。您可以通过 Cloud KMS 控制加密密钥。您还必须指定 gcpTempLocation 才能使用此功能。 如果未指定,则 Dataflow 会使用默认的 Google Cloud 加密(而不是 CMEK)。
flexRSGoal String 为自动扩缩的批处理作业指定 Flexible Resource Scheduling (FlexRS)。影响 numWorkersautoscalingAlgorithmzoneregionworkerMachineType 参数。如需了解详情,请参阅 FlexRS 流水线选项部分 如果未指定,则默认为 SPEED_OPTIMIZED,效果与省略此标志相同。如需启用 FlexRS,您必须指定值 COST_OPTIMIZED,允许 Dataflow 服务选择任何可用的折扣资源。
filesToStage List<String> 可供每个工作器使用的本地文件、文件目录或归档(例如 JAR 或 ZIP 文件)的非空列表。如果设置此选项,系统将仅上传您指定的那些文件(Java 类路径将被忽略)。您必须按照正确的类路径顺序指定所有资源。资源不仅指代码,还包括供所有工作器使用的配置文件及其他资源。您的代码可以使用 Java 的标准资源查找方法访问列出的资源。注意:指定目录路径并非最佳做法,原因是 Dataflow 会在上传文件之前对其进行压缩,这会增加启动时间成本。此外,请勿使用此选项将数据传输到要由流水线处理的工作器,因为与将原生 Cloud Storage/BigQuery API 和相应的 Dataflow 数据源结合使用相比,这样做要慢得多。 如果省略 filesToStage,Dataflow 将根据 Java 类路径推断要暂存的文件。左栏中提到的注意事项在此处也适用(要列出的文件类型及通过您的代码访问它们的方式)。
network String Compute Engine 网络,用于启动 Compute Engine 实例以运行流水线。了解如何指定您的网络 如果未设置,Google Cloud 会假设您打算使用名为 default 的网络。
subnetwork String Compute Engine 子网,用于启动 Compute Engine 实例以运行流水线。了解如何指定您的子网 Dataflow 服务确定默认值。
usePublicIps boolean 指定 Dataflow 工作器是否使用公共 IP 地址。如果该值设置为 false,则 Dataflow 工作器将使用专用 IP 地址进行所有通信。在这种情况下,如果指定了 subnetwork 选项,则系统会忽略 network 选项。确保指定的 networksubnetwork 已启用专用 Google 访问通道 如果未设置,则默认值为 true,且 Dataflow 工作器使用公共 IP 地址。
enableStreamingEngine boolean 指定是否启用 Dataflow Streaming Engine;如果启用,则为 true。通过启用 Streaming Engine,您可以在 Dataflow 服务后端运行流处理流水线的相关步骤,从而节省 CPU、内存和 Persistent Disk 存储资源。 默认值为 false。这意味着您的流处理流水线的步骤完全在工作器虚拟机上执行。
createFromSnapshot String 指定在创建流处理作业时使用的快照 ID。快照保存流处理流水线的状态,并允许您从该状态启动新版本的作业。如需详细了解快照,请参阅使用快照 如果未设置,则不使用任何快照来创建作业。
hotKeyLoggingEnabled boolean 指定在流水线中检测到热键时,将该键输出到用户的 Cloud Logging 项目中。 如果未设置,将仅记录是否存在热键。
diskSizeGb int

在每个远程 Compute Engine 工作器实例上使用的磁盘的大小(以 GB 为单位)。如果进行设置,请指定至少 30 GB 的磁盘大小,用于存储工作器启动映像和本地日志。

对于使用 Dataflow Shuffle 的批处理作业,此选项设置工作器虚拟机启动磁盘的大小。对于不使用 Dataflow Shuffle 的批处理作业,此选项设置用于存储重排数据的磁盘大小;启动磁盘大小不受影响。

对于使用 Streaming Engine 的流处理作业,此选项设置启动磁盘的大小。对于不使用 Streaming Engine 的流处理作业,该选项设置由 Dataflow 服务创建的每个额外的永久性磁盘的大小;启动磁盘不受影响。如果流处理作业不使用 Streaming Engine,您可以使用实验标志 streaming_boot_disk_size_gb 设置启动磁盘大小。例如,指定 --experiments=streaming_boot_disk_size_gb=80 以创建 80 GB 的启动磁盘。

设置为 0 可使用您的 Cloud Platform 项目中定义的默认大小。

如果批量作业使用 Dataflow Shuffle,则默认值为 25 GB;否则,默认值为 250 GB。

如果流处理作业使用了 Streaming Engine,则默认值为 30 GB;否则,默认值为 400 GB。

警告:减小磁盘大小将减少可用的 shuffle I/O。不使用 Dataflow Shuffle 或 Streaming Engine 的 shuffle 受到限制的作业可能会导致运行时间和作业成本增加。

serviceAccount String 使用 my-service-account-name@<project-id>.iam.gserviceaccount.com 格式指定用户管理的控制器服务帐号。如需了解详情,请参阅 Cloud Dataflow 安全性和权限页面的控制器服务帐号部分。 如果未设置,则工作器会将您项目的 Compute Engine 服务帐号用作控制器服务帐号。
workerDiskType String 拟使用的永久性磁盘的类型,由磁盘类型资源的完整网址指定。例如,使用 compute.googleapis.com/projects//zones//diskTypes/pd-ssd 可指定 SSD 永久性磁盘。如需了解详情,请参阅适用于 diskTypes 的 Compute Engine API 参考页面。 Dataflow 服务确定默认值。
workerMachineType String

Dataflow 在启动工作器虚拟机时使用的 Compute Engine 机器类型。您可以使用任何可用的 Compute Engine 机器类型系列以及自定义机器类型。

为获得最佳效果,请使用 n1 机器类型。Dataflow 服务等级协议不支持共享核心机器类型(例如 f1g1 系列工作器)。

请注意,Dataflow 根据工作器中 vCPU 的数量和内存 GB 数进行计费。计费与机器类型系列无关。

如果您未设置此选项,Dataflow 服务将根据您的作业选择机器类型。

如需查看流水线配置选项的完整列表,请参阅适用于 PipelineOptions 接口(及其子接口)的 Java 版 API 参考文档。

Python

字段 类型 说明 默认值
runner str 拟使用的 PipelineRunner。此字段可以是 DirectRunnerDataflowRunner DirectRunner(本地模式)
streaming bool 指定是否启用流处理模式;如果启用,则为 true。 false
project str 您的 Google Cloud 项目的 ID。如果您需要使用 Dataflow 托管式服务运行流水线,则此字段必填。 如果未设置,将抛出一个错误。
job_name String 正在执行的 Dataflow 作业的名称,与在 Dataflow 作业列表和作业详情中显示的名称一致。 Dataflow 会自动生成一个唯一名称。
temp_location str 用于暂存临时文件的 Cloud Storage 路径。必须是有效的 Cloud Storage 网址,且以 gs:// 开头。 如果未设置,则默认为 staging_location 的值。您必须至少指定 temp_locationstaging_location 才能在 Google 云端运行流水线。
staging_location str 用于暂存本地文件的 Cloud Storage 路径。必须是有效的 Cloud Storage 网址,且以 gs:// 开头。 如果未设置,则默认为 temp_location 中的暂存目录。您必须至少指定 temp_locationstaging_location 才能在 Google 云端运行流水线。
autoscaling_algorithm str Dataflow 作业的自动扩缩模式。可能的值为 THROUGHPUT_BASED(启用自动扩缩)或 NONE(停用)。如需详细了解 Dataflow 托管式服务中自动扩缩的工作原理,请参阅自动扩缩功能 对于所有批处理 Dataflow 作业以及使用 Streaming Engine 的流处理作业,默认为 THROUGHPUT_BASED。对于不使用 Streaming Engine 的流处理作业,默认为 NONE
num_workers int 执行流水线时使用的 Compute Engine 实例数。 如果未指定,Dataflow 服务将确定合适的工作器数。
max_num_workers int 在您的流水线执行期间为其提供的 Compute Engine 实例的最大数量。请注意,此值可超过 num_workers 指定的工作器初始数量,其目的是允许您的作业以自动方式或其他方式纵向扩容。 如果未指定,Dataflow 服务将确定合适的工作器数。
number_of_worker_harness_threads int 每个工作器使用的线程数。 如果未指定,Dataflow 服务将确定每个工作器合适的线程数。 要使用此参数,您还需要使用 --experiments=use_runner_v2 标志。
region str 指定用于部署 Dataflow 作业的区域端点 如果未设置,则默认为 us-central1
worker_region String

指定用于启动工作器实例以运行您的流水线的 Compute Engine 区域。此选项用于在与用于部署、管理和监控作业的 region 不同的位置运行工作器。worker_region 的地区是自动分配的。

注意:此选项不能与 worker_zonezone 一起使用。

如果未设置,则默认为为 region 设置的值。
worker_zone String

指定用于启动工作器实例以运行您的流水线的 Compute Engine 地区。此选项用于在与用于部署、管理和监控作业的 region 不同的位置运行工作器。

注意:此选项不能与 worker_regionzone 一起使用。

如果指定了 regionworker_regionworker_zone 将默认为相应区域的地区。您可以通过指定其他地区来替换此行为。
zone str (已弃用)对于 Apache Beam SDK 2.17.0 或更早版本,这指定了用于启动工作器实例以运行您的流水线的 Compute Engine 地区。 如果指定了 regionzone 将默认为相应区域的地区。您可以通过指定其他地区来替换此行为。
dataflow_kms_key str 指定用于对静态数据进行加密的 CMEK(客户管理的加密密钥)。您可以通过 Cloud KMS 控制加密密钥。您还必须指定 temp_location 才能使用此功能。 如果未指定,则 Dataflow 会使用默认的 Google Cloud 加密(而不是 CMEK)。
flexrs_goal str 为自动扩缩的批处理作业指定 Flexible Resource Scheduling (FlexRS)。影响 num_workersautoscaling_algorithmzoneregionmachine_type 参数。如需了解详情,请参阅 FlexRS 流水线选项部分 如果未指定,则默认为 SPEED_OPTIMIZED,效果与省略此标志相同。如需启用 FlexRS,您必须指定值 COST_OPTIMIZED,允许 Dataflow 服务选择任何可用的折扣资源。
network str Compute Engine 网络,用于启动 Compute Engine 实例以运行流水线。了解如何指定您的网络 如果未设置,Google Cloud 会假设您打算使用名为 default 的网络。
subnetwork str Compute Engine 子网,用于启动 Compute Engine 实例以运行流水线。了解如何指定您的子网 Dataflow 服务确定默认值。
use_public_ips bool 指定 Dataflow 工作器必须使用公共 IP 地址。如果该值设置为 false,则 Dataflow 工作器将使用专用 IP 地址进行所有通信。在这种情况下,如果指定了 subnetwork 选项,则系统会忽略 network 选项。确保指定的 networksubnetwork 已启用专用 Google 访问通道。 此选项要求使用 Python 版 Beam SDK。已弃用的 Python 版 Dataflow SDK 不支持此选项。 如果未设置,则 Dataflow 工作器将使用公共 IP 地址。
enable_streaming_engine bool 指定是否启用 Dataflow Streaming Engine;如果启用,则为 true。通过启用 Streaming Engine,您可以在 Dataflow 服务后端运行流处理流水线的相关步骤,从而节省 CPU、内存和 Persistent Disk 存储资源。 默认值为 false。这意味着您的流处理流水线的步骤完全在工作器虚拟机上执行。
disk_size_gb int

在每个远程 Compute Engine 工作器实例上使用的磁盘的大小(以 GB 为单位)。如果进行设置,请指定至少 30 GB 的磁盘大小,用于存储工作器启动映像和本地日志。

对于使用 Dataflow Shuffle 的批处理作业,此选项设置工作器虚拟机启动磁盘的大小。对于不使用 Dataflow Shuffle 的批处理作业,此选项设置用于存储重排数据的磁盘大小;启动磁盘大小不受影响。

对于使用 Streaming Engine 的流处理作业,此选项设置启动磁盘的大小。对于不使用 Streaming Engine 的流处理作业,该选项设置由 Dataflow 服务创建的每个额外的永久性磁盘的大小;启动磁盘不受影响。如果流处理作业不使用 Streaming Engine,您可以使用实验标志 streaming_boot_disk_size_gb 设置启动磁盘大小。例如,指定 --experiments=streaming_boot_disk_size_gb=80 以创建 80 GB 的启动磁盘。

设置为 0 可使用您的 Cloud Platform 项目中定义的默认大小。

如果批量作业使用 Dataflow Shuffle,则默认值为 25 GB;否则,默认值为 250 GB。

如果流处理作业使用了 Streaming Engine,则默认值为 30 GB;否则,默认值为 400 GB。

警告:减小磁盘大小将减少可用的 shuffle I/O。不使用 Dataflow Shuffle 或 Streaming Engine 的 shuffle 受到限制的作业可能会导致运行时间和作业成本增加。

service_account_email str 使用 my-service-account-name@<project-id>.iam.gserviceaccount.com 格式指定用户管理的控制器服务帐号。如需了解详情,请参阅 Cloud Dataflow 安全性和权限页面的控制器服务帐号部分。 如果未设置,则工作器会将您项目的 Compute Engine 服务帐号用作控制器服务帐号。
worker_disk_type str 拟使用的永久性磁盘的类型,由磁盘类型资源的完整网址指定。例如,使用 compute.googleapis.com/projects//zones//diskTypes/pd-ssd 可指定 SSD 永久性磁盘。如需了解详情,请参阅适用于 diskTypes 的 Compute Engine API 参考页面。 Dataflow 服务确定默认值。
machine_type str

Dataflow 在启动工作器虚拟机时使用的 Compute Engine 机器类型。您可以使用任何可用的 Compute Engine 机器类型系列以及自定义机器类型。

为获得最佳效果,请使用 n1 机器类型。Dataflow 服务等级协议不支持共享核心机器类型(例如 f1g1 系列工作器)。

请注意,Dataflow 根据工作器中 vCPU 的数量和内存 GB 数进行计费。计费与机器类型系列无关。

如果您未设置此选项,Dataflow 服务将根据您的作业选择机器类型。

Java:SDK 1.x

配置 PipelineOptions 以在本地执行流水线

除了在托管的云端资源上运行流水线,您也可以选择在本地执行流水线。对于针对小型数据集测试、调试或运行流水线,本地执行具有一定的优势。例如,本地执行不依赖远程 Dataflow 服务及关联的 Google Cloud 项目。

在本地执行流水线时,我们强烈建议您针对小到本地内存足以满足其需求的数据集运行流水线。您可以使用 Create 转换创建较小的内存中数据集,也可以使用 Read 转换来处理小型本地文件或远程文件。本地执行可轻松快速地执行测试和调试,同时对外部的依赖性更低,但会受到本地环境中可用内存的限制。

以下示例代码展示了如何构建在本地环境中执行的流水线。

Java:SDK 2.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

注意:对于本地模式,您无需设置运行程序,因为 DirectRunner 已是默认的运行程序。不过,您需要明确地将 DirectRunner 添加为依赖项,或者将其添加到类路径中。

Python

# Create and set your Pipeline Options.
options = PipelineOptions(flags=argv)
my_options = options.view_as(MyOptions)

with Pipeline(options=options) as pipeline:
  pass  # build your pipeline here.

注意:对于本地模式,您无需设置运行程序,因为 DirectRunner 已是默认的运行程序。

Java:SDK 1.x

流水线构建完毕后即可运行。

设置其他本地流水线选项

在本地执行流水线时,PipelineOptions 中属性的默认值一般已经够用。

Java:SDK 2.x

您可以在 Java API 参考中查找 Java PipelineOptions 的默认值;如需了解完整的详细信息,请参阅 PipelineOptions 类列表。

如果您的流水线使用 Google Cloud 服务(例如 BigQuery 或 Cloud Storage for IO),则您可能需要设置某些 Google Cloud 项目和凭据选项。在此类情况下,您应该使用 GcpOptions.setProject 设置 Google Cloud 项目 ID。您可能还需要明确设置凭据。如需了解完整的详细信息,请参阅 GcpOptions 类。

Python

您可以在 Python API 参考中查找 Python PipelineOptions 的默认值;如需了解完整的详细信息,请参阅 PipelineOptions 模块列表。

如果您的流水线使用 Google Cloud 服务(例如 BigQuery 或 Cloud Storage for IO),则您可能需要设置某些 Google Cloud 项目和凭据选项。在此类情况下,您应该使用 options.view_as(GoogleCloudOptions).project 设置 Google Cloud 项目 ID。您可能还需要明确设置凭据。如需了解完整的详细信息,请参阅 GoogleCloudOptions 类。

Java:SDK 1.x