创建经典模板

Dataflow 模板使用运行时参数接受仅在流水线执行期间可用的值。如需对模板流水线的执行进行自定义,可以将这些参数传递给在流水线中运行的函数(例如 DoFn)。

要从 Apache Beam 流水线创建模板,必须修改流水线代码以支持运行时参数:

然后,创建并暂存模板

运行时参数和 ValueProvider 接口

ValueProvider 接口允许流水线接受运行时参数。Apache Beam 提供三种类型的 ValueProvider 对象。

名称 说明
RuntimeValueProvider

RuntimeValueProvider 是默认的 ValueProvider 类型。 RuntimeValueProvider 允许流水线接受仅在流水线执行期间可用的值。该值在流水线构建期间不可用,因此您无法用它来更改流水线的工作流程图。

您可以使用 isAccessible() 来检查 ValueProvider 的值是否可用。如果在流水线执行之前调用 get(),Apache Beam 将返回错误:
Value only available at runtime, but accessed from a non-runtime context.

如果您无法提前知道值,请使用 RuntimeValueProvider

StaticValueProvider

StaticValueProvider 允许您为流水线提供一个静态值。该值在流水线构建期间可用,因此您可以用它来更改流水线的工作流程图。

如果您提前知道值,请使用 StaticValueProvider。有关示例,请参见 StaticValueProvider 部分

NestedValueProvider

NestedValueProvider 允许您从另一个 ValueProvider 对象计算值。NestedValueProvider 封装 ValueProvider,并且封装的 ValueProvider 的类型决定了在流水线构造期间能否访问值。

如果要在运行时使用该值计算另一个值,请使用 NestedValueProvider。有关示例,请参见 NestedValueProvider 部分

Dataflow Runner 不支持 Pub/Sub 主题和订阅参数的 ValueProvider 选项。如果您需要运行时参数中的 Pub/Sub 选项,请切换到使用 Flex 模板

修改代码以使用运行时参数

本部分介绍如何使用 ValueProviderStaticValueProviderNestedValueProvider

在流水线选项中使用 ValueProvider

为要在运行时设置或使用的所有流水线选项使用 ValueProvider

例如,以下 WordCount 代码段不支持运行时参数。该代码会添加一个输入文件选项、创建一个流水线并从输入文件中读取行。

Java:SDK 2.x

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Java:SDK 1.x

如需添加运行时参数支持,请修改输入文件选项以使用 ValueProvider

Java:SDK 2.x

对于输入文件选项类型,请使用 ValueProvider<String>(而不是 String)。

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

add_argument 替换为 add_value_provider_argument

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Java:SDK 1.x

在函数中使用 ValueProvider

如需在您自己的函数中使用运行时参数值,请更新函数以使用 ValueProvider 参数。

以下示例包含一个整数 ValueProvider 选项和一个做整数加法的简单函数。该函数依赖于 ValueProvider 整数。在执行期间,流水线会将 MySumFn 应用于 PCollection(包含 [1, 2, 3])中的每个整数。如果运行时值为 10,则生成的 PCollection 将包含 [11, 12, 13]

Java:SDK 2.x

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

Java:SDK 1.x

使用 StaticValueProvider

如需为流水线提供静态值,请使用 StaticValueProvider

此示例使用 MySumFn,它是一个采用 ValueProvider<Integer>DoFn。如果您提前知道参数的值,则可以使用 StaticValueProvider 将静态值指定为 ValueProvider

Java:SDK 2.x

此代码获取流水线运行时的值:

  .apply(ParDo.of(new MySumFn(options.getInt())))

相反,您可以将 StaticValueProvider 用于静态值:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

此代码获取流水线运行时的值:

  beam.ParDo(MySumFn(user_options.templated_int))

相反,您可以将 StaticValueProvider 用于静态值:

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

Java:SDK 1.x

在实现支持常规参数和运行时参数的 I/O 模块时,也可以使用 StaticValueProviderStaticValueProvider 减少了实现两个类似方法的代码重复。

Java:SDK 2.x

此示例的源代码来自 Apache Beam 的 TextIO.java(位于 GitHub 上)

  // Create a StaticValueProvider from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

在此示例中,存在一个同时接受 stringValueProvider 参数的构造参数。如果该参数是 string,它将被转换为 StaticValueProvider

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, basestring):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

Java:SDK 1.x

使用 NestedValueProvider

如需从另一个 ValueProvider 对象计算值,请使用 NestedValueProvider

NestedValueProviderValueProviderSerializableFunction 转换函数作为输入。在对 NestedValueProvider 调用 .get() 时,转换函数会根据 ValueProvider 值创建一个新值。此转换允许您使用 ValueProvider 值来创建所需的最终值。

在以下示例中,用户提供文件名 file.txt。转换将路径 gs://directory_name/ 添加到文件名之前。调用 .get() 将返回 gs://directory_name/file.txt

Java:SDK 2.x

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Java:SDK 1.x

元数据

您可以使用附加元数据扩展模板,以便在执行模板时验证自定义参数。如果要为模板创建元数据,您需要执行以下操作:

  1. 使用下表中的参数创建名为 <template-name>_metadata 的 JSON 格式文件。
  2. 将 JSON 格式的文件存储在 Cloud Storage 中模板所在的文件夹中。

元数据参数

参数键 必填 值的说明
name 模板的名称。
description 对模板进行说明的一小段文本。
parameters 否。默认设为空数组。 模板将使用的一组附加参数。
name 模板中使用的参数的名称。
label 一个人类可读的标签(将在界面中用于标记参数)。
helpText 对参数进行说明的一小段文本。
isOptional 否。默认设为 false。 如果参数是必需的,则为 false,如果参数是可选的,则为 true。 如果您没有为元数据添加此参数键,则元数据会成为必需参数。
regexes 否。默认设为空数组。 一组字符串形式的 POSIX-egrep 正则表达式(用于验证参数的值)。例如:["^[a-zA-Z][a-zA-Z0-9]+"] 是一个正则表达式,用于验证以字母开头,并且包含一个或多个字符的值。

示例元数据文件

Java

Dataflow 服务使用以下元数据验证 WordCount 模板的自定义参数:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from. ex: gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Python

Dataflow 服务使用以下元数据验证 wordcount 模板的自定义参数:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "input",
      "helpText": "Path of the file pattern glob to read from. ex: gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

您可以从 Dataflow 模板目录下载 Google 提供的模板的元数据文件。

管道 I/O 和运行时参数

Java:SDK 2.x

某些 I/O 连接器包含接受 ValueProvider 对象的方法。如需确定是否支持特定连接器和方法,请参阅 I/O 连接器的 API 参考文档。受支持的方法具有一个带有 ValueProvider 的过载。如果方法没有过载,则此方法不支持运行时参数。以下 I/O 连接器至少具有部分 ValueProvider 支持:

  • 基于文件的 IO:TextIOAvroIOFileIOTFRecordIOXmlIO
  • BigQueryIO*
  • BigtableIO(需要 SDK 2.3.0 或更高版本)
  • PubSubIO
  • SpannerIO

Python

某些 I/O 连接器包含接受 ValueProvider 对象的方法。如需确定是否支持 I/O 连接器及其方法,请参阅连接器的 API 参考文档。以下 I/O 连接器接受运行时参数:

  • 基于文件的 IO:textioavroiotfrecordio

Java:SDK 1.x

创建和暂存模板

写入流水线后,您必须创建并暂存模板文件。

请参阅以下示例,了解如何暂存模板文件:

Java:SDK 2.x

此 Maven 命令会在使用 --templateLocation 指定的 Cloud Storage 位置创建和暂存模板。

注意:如果您使用的是 Java 版 Apache Beam SDK 2.15.0 或更高版本,则还必须指定 --region

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=YOUR_PROJECT_ID \
                  --stagingLocation=gs://YOUR_BUCKET_NAME/staging \
                  --templateLocation=gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME
                  --region=REGION"
    

验证 templateLocation 路径是否正确。 请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • BUCKET_NAME - Cloud Storage 存储分区的名称。
  • TEMPLATE_NAME:您的模板的名称
  • com.example.myclass:您的 Java 类

Python

此 Python 命令会在使用 --template_location 指定的 Cloud Storage 位置创建和暂存模板。

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --temp_location gs://BUCKET_NAME/temp \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME

验证 template_location 路径是否正确。 请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • BUCKET_NAME - Cloud Storage 存储分区的名称。
  • TEMPLATE_NAME:您的模板的名称
  • examples.mymodule:您的 Python 模块

Java:SDK 1.x

创建和暂存模板后,下一步是执行模板