设置 Dataflow 流水线选项

本页面介绍了如何为 Dataflow 作业设置流水线选项。这些流水线选项用来配置流水线的运行方式、运行位置以及所用资源。

流水线的执行过程独立于 Apache Beam 程序的执行。您编写的 Apache Beam 程序会为延迟执行构造流水线。这意味着该程序会生成任何受支持的 Apache Beam 运行程序可以执行的一系列步骤。 兼容的运行程序包括 Google Cloud 上的 Dataflow 运行程序,以及直接在本地环境中执行流水线的直接运行程序。

您可以在运行时将参数传递给 Dataflow 作业。如需详细了解如何在运行时设置流水线选项,请参阅配置流水线选项

将流水线选项与 Apache Beam SDK 搭配使用

您可以使用以下 SDK 为 Dataflow 作业设置流水线选项:

  • Python 版 Apache Beam SDK
  • Java 版 Apache Beam SDK
  • Go 版 Apache Beam SDK

如需使用 SDK,请使用 Apache Beam SDK 类 PipelineOptions 设置流水线运行程序和其他执行参数。

您可以通过两种方式指定流水线选项:

  • 通过提供流水线选项列表,以程序化方式设置流水线选项。
  • 运行流水线代码时,直接在命令行中设置流水线选项。

以程序化方式设置流水线选项

您可以通过创建和修改 PipelineOptions 对象来以编程方式设置流水线选项。

Java

使用 PipelineOptionsFactory.fromArgs 方法构建 PipelineOptions 对象。

如需查看示例,请参阅本页面上的在 Dataflow 上启动示例部分。

Python

创建一个 PipelineOptions 对象。

如需查看示例,请参阅本页面上的在 Dataflow 上启动示例部分。

Go

Go 版 Apache Beam SDK 不支持使用 PipelineOptions 以编程方式设置流水线选项。使用 Go 命令行参数。

如需查看示例,请参阅本页面上的在 Dataflow 上启动示例部分。

在命令行中设置流水线选项

您可以使用命令行参数设置流水线选项。

Java

以下示例语法来自 Java 快速入门中的 WordCount 流水线。

mvn -Pdataflow-runner compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--project=PROJECT_ID \
  --gcpTempLocation=gs://BUCKET_NAME/temp/ \
  --output=gs://BUCKET_NAME/output \
  --runner=DataflowRunner \
  --region=REGION"

请替换以下内容:

  • PROJECT_ID:您的 Google Cloud 项目 ID
  • BUCKET_NAME:Cloud Storage 存储桶的名称
  • REGIONDataflow 区域,例如 us-central1

Python

以下示例语法来自 Python 快速入门中的 WordCount 流水线。

python -m apache_beam.examples.wordcount \
  --region DATAFLOW_REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://STORAGE_BUCKET/results/outputs \
  --runner DataflowRunner \
  --project PROJECT_ID \
  --temp_location gs://STORAGE_BUCKET/tmp/

替换以下内容:

  • DATAFLOW_REGION:要在其中部署 Dataflow 作业的区域,例如 europe-west1

    --region 标志会替换元数据服务器、本地客户端或环境变量中设置的默认区域。

  • STORAGE_BUCKET:Cloud Storage 存储桶名称

  • PROJECT_ID:Google Cloud 项目 ID

Go

以下示例语法来自 Go 快速入门中的 WordCount 流水线。

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
   --output gs://BUCKET_NAME/results/outputs \
   --runner dataflow \
   --project PROJECT_ID \
   --region DATAFLOW_REGION \
   --staging_location gs://BUCKET_NAME/binaries/

请替换以下内容:

  • BUCKET_NAME:Cloud Storage 存储桶名称

  • PROJECT_ID:Google Cloud 项目 ID

  • DATAFLOW_REGION:要在其中部署 Dataflow 作业的区域。例如 europe-west1--region 标志会替换元数据服务器、本地客户端或环境变量中设置的默认区域。

设置实验性流水线选项

在 Java、Python 和 Go SDK 中,experiments 流水线选项启用实验性或正式发布前 Dataflow 功能。

以程序化方式设置

如需以程序化方式设置 experiments 选项,请使用以下语法。

Java

PipelineOptions 对象中,使用以下语法添加 experiments 选项。此示例使用实验标志将启动磁盘大小设置为 80 GB。

options.setExperiments("streaming_boot_disk_size_gb=80")

如需查看展示如何创建 PipelineOptions 对象的示例,请参阅本页面上的在 Dataflow 上启动示例部分。

Python

PipelineOptions 对象中,使用以下语法添加 experiments 选项。此示例使用实验标志将启动磁盘大小设置为 80 GB。

beam_options = PipelineOptions(
  beam_args,
  experiments=['streaming_boot_disk_size_gb=80'])

如需查看展示如何创建 PipelineOptions 对象的示例,请参阅本页面上的在 Dataflow 上启动示例部分。

Go

Go 版 Apache Beam SDK 不支持使用 PipelineOptions 以编程方式设置流水线选项。使用 Go 命令行参数。

在命令行中设置

如需在命令行中设置 experiments 选项,请使用以下语法。

Java

此示例使用实验标志将启动磁盘大小设置为 80 GB。

--experiments=streaming_boot_disk_size_gb=80

Python

此示例使用实验标志将启动磁盘大小设置为 80 GB。

--experiments=streaming_boot_disk_size_gb=80

Go

此示例使用实验标志将启动磁盘大小设置为 80 GB。

--experiments=streaming_boot_disk_size_gb=80

在模板中设置

如需在运行 Dataflow 模板时启用实验性功能,请使用 --additional-experiments 标志。

经典模板

gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]

Flex 模板

gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]

访问流水线选项对象

在 Apache Beam 程序中创建 Pipeline 对象时,会传递 PipelineOptions。Dataflow 服务会在运行流水线时将 PipelineOptions 的副本发送到每个工作器。

Java

使用 ProcessContext.getPipelineOptions 方法访问任何 ParDo 转换的 DoFn 实例内的 PipelineOptions

Python

Python 版 Apache Beam SDK 不支持此功能。

Go

使用 beam.PipelineOptions 访问流水线选项。

在 Dataflow 上启动

使用 Dataflow 运行程序服务在托管式 Google Cloud 资源上运行作业。使用 Dataflow 运行流水线会创建一个 Dataflow 作业,该作业使用 Google Cloud 项目中的 Compute Engine 和 Cloud Storage 资源。 如需了解 Dataflow 权限,请参阅 Dataflow 安全和权限

Dataflow 作业在流水线执行期间使用 Cloud Storage 来存储临时文件。为避免支付不必要的存储费用,请在 Dataflow 作业用于临时存储的存储桶上关闭软删除功能。如需了解详情,请参阅从存储桶中移除软删除政策

设置必需选项

如需使用 Dataflow 运行流水线,请设置以下流水线选项:

Java

  • project:您的 Google Cloud 项目的 ID。
  • runner:执行您的流水线的流水线运行程序。对于 Google Cloud 执行时,此字段必须是 DataflowRunner
  • gcpTempLocation:供 Dataflow 暂存大部分临时文件的 Cloud Storage 路径。如果要指定存储桶,您必须提前创建存储桶。如果您未设置 gcpTempLocation,可以设置流水线选项 tempLocation,然后将 gcpTempLocation 设置为 tempLocation 的值。如果它们均未指定,系统会创建默认 gcpTempLocation
  • stagingLocation:供 Dataflow 暂存二进制文件的 Cloud Storage 路径。如果您使用的是 Apache Beam SDK 2.28 或更高版本,请勿设置此选项。对于 Apache Beam SDK 2.28 或更低版本,如果您未设置此选项,则您为 tempLocation 指定的位置用作暂存位置。

    如果该选项及 tempLocation 均未指定,则系统会创建默认 gcpTempLocation。如果指定了 tempLocation,但未指定 gcpTempLocation,则 tempLocation 必须是 Cloud Storage 路径,gcpTempLocation 默认设为该路径。如果未指定 tempLocation,但指定了 gcpTempLocation,则系统不会填充 tempLocation

Python

  • project:您的 Google Cloud 项目 ID。
  • region:您的 Dataflow 作业的区域。
  • runner:执行您的流水线的流水线运行程序。对于 Google Cloud 执行时,此字段必须是 DataflowRunner
  • temp_location:供 Dataflow 暂存流水线执行期间创建的临时作业文件的 Cloud Storage 路径。

Go

  • project:您的 Google Cloud 项目 ID。
  • region:您的 Dataflow 作业的区域。
  • runner:执行您的流水线的流水线运行程序。对于 Google Cloud 执行时,此字段必须是 dataflow
  • staging_location:供 Dataflow 暂存流水线执行期间创建的临时作业文件的 Cloud Storage 路径。

以程序化方式设置流水线选项

以下示例代码展示了如何以编程方式设置运行程序和其他必要选项来构建流水线,以使用 Dataflow 执行流水线。

Java

// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']

# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
  pass  # build your pipeline here.

Go

Go 版 Apache Beam SDK 使用 Go 命令行参数。使用 flag.Set() 设置标志值。

// Use the Go flag package to parse custom options.
flag.Parse()

// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")

beam.Init()

// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()

构建流水线后,请指定所有的流水线读取、转换和写入对象,之后再运行流水线。

使用命令行中的流水线选项

以下示例展示了如何使用命令行中指定的流水线选项。此示例未以编程的方式设置流水线选项:

Java

// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

使用 Python argparse 模块解析命令行选项。

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# For more details on how to use argparse, take a look at:
#   https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | 'Read files' >> beam.io.ReadFromText(args.input_file)
      | 'Write files' >> beam.io.WriteToText(args.output_path))

Go

使用 Go flag 软件包解析命令行选项。您必须先解析选项,然后才能调用 beam.Init()。在此示例中,output 是一个命令行选项。

// Define configuration options
var (
  output = flag.String("output", "", "Output file (required).")
)

// Parse options before beam.Init()
flag.Parse()

beam.Init()

// Input validation must be done after beam.Init()
if *output == "" {
  log.Fatal("No output provided!")
}

p := beam.NewPipeline()

构建流水线后,请指定所有的流水线读取、转换和写入对象,之后再运行流水线。

控制执行模式

当 Apache Beam 程序在 Dataflow 等服务上运行流水线时,该程序可以异步运行流水线,也可以阻塞流水线,直到流水线完成。您可以使用以下指南来更改此行为。

Java

当 Apache Beam Java 程序在 Dataflow 等服务上运行流水线时,通常会异步执行。要运行流水线并等待作业完成,请将 DataflowRunner 设置为流水线运行程序并明确调用 pipeline.run().waitUntilFinish()

如果您使用 DataflowRunner,并在从 pipeline.run() 返回的 PipelineResult 对象上调用 waitUntilFinish(),则该流水线将在 Google Cloud 上执行,但本地代码会等待云端工作完成并返回最终的 DataflowPipelineJob 对象。当作业运行时,Dataflow 服务会在等待时打印作业状态更新和控制台消息。

Python

当 Apache Beam Python 程序在 Dataflow 等服务上运行流水线时,通常会异步执行。如需在流水线执行完毕前一直阻止,请使用从运行程序的 run() 方法返回的 PipelineResult 对象的 wait_until_finish() 方法。

Go

当 Apache Beam Go 程序在 Dataflow 上运行流水线时,它默认是同步的,直到流水线完成为止。如果您不想进行阻止,可以通过以下两个选项进行:

  1. 使用 Go 例程启动作业。

    go func() {
      pr, err := beamx.Run(ctx, p)
      if err != nil {
        // Handle the error.
      }
      // Send beam.PipelineResult into a channel.
      results <- pr
    }()
    // Do other operations while the pipeline runs.
    
  2. 使用 jobopts 软件包中的 --async 命令行标志。

如需查看执行详情、监控进度和验证作业的完成状态,请使用 Dataflow 监控界面Dataflow 命令行界面

使用流处理来源

Java

如果您的流水线从无界限数据源(例如 Pub/Sub)中读取数据,则该流水线将自动在流处理模式下执行。

Python

如果您的流水线使用无界限数据源(例如 Pub/Sub),则必须将 streaming 选项设置为 true。

Go

如果您的流水线从无界限数据源(例如 Pub/Sub)中读取数据,则该流水线将自动在流处理模式下执行。

流处理作业默认使用 Compute Engine 机器类型 n1-standard-2 或更高的机器类型。

在本地启动

除了在托管的云端资源上运行流水线,您也可以选择在本地执行流水线。对于针对小型数据集测试、调试或运行流水线,本地执行具有一定的优势。例如,本地执行不依赖远程 Dataflow 服务及关联的 Google Cloud 项目。

在本地执行流水线时,您必须针对小到本地内存足以满足其需求的数据集运行流水线。您可以使用 Create 转换创建较小的内存中数据集,也可以使用 Read 转换来处理小型本地文件或远程文件。本地执行通常可更快速轻松地执行测试和调试,同时对外部的依赖性更低,但会受到本地环境中可用内存的限制。

以下示例代码展示了如何构建在本地环境中执行的流水线。

Java

// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()

# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)

with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.io.ReadFromText(args.input)
      | beam.io.WriteToText(args.output))

Go

// Parse options before beam.Init()
flag.Parse()

beam.Init()

p := beam.NewPipeline()

流水线构建完毕后即可运行。

创建自定义流水线选项

除了标准 PipelineOptions 之外,您还可以添加自己的自定义选项。Apache Beam 命令行还可以使用以同一格式指定的命令行参数来解析自定义选项。

Java

如需添加您自己的选项,请使用 getter 和 setter 方法为每个选项定义一个接口,示例如下:

public interface MyOptions extends PipelineOptions {
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Python

如需添加您自己的选项,请使用 add_argument() 方法(其行为与 Python 的标准 argparse 模块完全相同),如以下示例所示:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Go

如需添加您自己的选项,请使用 Go 标志软件包,如以下示例所示:

var (
  input  = flag.String("input", "", "")
  output = flag.String("output", "", "")
)

您还可以指定说明(在用户将 --help 作为命令行参数传递时显示)和默认值。

Java

您可以使用注释来设置说明和默认值,如下所示:

public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

我们建议您使用 PipelineOptionsFactory 注册接口,然后在创建 PipelineOptions 对象时传递该接口。使用 PipelineOptionsFactory 注册接口时,--help 可找出您的自定义选项接口并将其添加到 --help 命令的输出中。PipelineOptionsFactory 验证您的自定义选项是否与其他所有已注册选项兼容。

以下示例代码展示了如何使用 PipelineOptionsFactory 注册自定义选项接口:

PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                          .withValidation()
                                          .as(MyOptions.class);

现在,您的流水线可以接受 --myCustomOption=value 作为命令行参数。

Python

您可以按如下方式设置说明和默认值:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output', required=True, help='The path prefix for output files.')

Go

您可以按如下方式设置说明和默认值:

var (
  input  = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
  output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)