创建经典模板

Dataflow 模板使用运行时参数接受仅在流水线执行期间可用的值。如需对模板流水线的执行进行自定义,可以将这些参数传递给在流水线中运行的函数(例如 DoFn)。

如需从 Apache Beam 流水线创建模板,必须修改流水线代码以支持运行时参数:

然后,创建并暂存模板

运行时参数和 ValueProvider 接口

ValueProvider 接口允许流水线接受运行时参数。Apache Beam 提供三种类型的 ValueProvider 对象。

名称 说明
RuntimeValueProvider

RuntimeValueProvider 是默认的 ValueProvider 类型。 RuntimeValueProvider 允许流水线接受仅在流水线执行期间可用的值。该值在流水线构建期间不可用,因此您无法用它来更改流水线的工作流程图。

您可以使用 isAccessible() 来检查 ValueProvider 的值是否可用。如果在流水线执行之前调用 get(),Apache Beam 将返回错误:
Value only available at runtime, but accessed from a non-runtime context.

如果您无法提前知道值,请使用 RuntimeValueProvider

StaticValueProvider

StaticValueProvider 允许您为流水线提供一个静态值。该值在流水线构建期间可用,因此您可以用它来更改流水线的工作流程图。

如果您提前知道值,请使用 StaticValueProvider。有关示例,请参见 StaticValueProvider 部分

NestedValueProvider

NestedValueProvider 允许您从另一个 ValueProvider 对象计算值。NestedValueProvider 封装 ValueProvider,并且封装的 ValueProvider 的类型决定了在流水线构造期间能否访问值。

如果要在运行时使用该值计算另一个值,请使用 NestedValueProvider。有关示例,请参见 NestedValueProvider 部分

注意:Python 版 Apache Beam SDK 不支持 NestedValueProvider

修改代码以使用运行时参数

本部分介绍如何使用 ValueProviderStaticValueProviderNestedValueProvider

在流水线选项中使用 ValueProvider

为要在运行时设置或使用的所有流水线选项使用 ValueProvider

例如,以下 WordCount 代码段不支持运行时参数。该代码会添加一个输入文件选项、创建一个流水线并从输入文件中读取行。

Java:SDK 2.x

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Java:SDK 1.x

如需添加运行时参数支持,请修改输入文件选项以使用 ValueProvider

Java:SDK 2.x

对于输入文件选项类型,请使用 ValueProvider<String>(而不是 String)。

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

add_argument 替换为 add_value_provider_argument

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Java:SDK 1.x

在函数中使用 ValueProvider

如需在您自己的函数中使用运行时参数值,请更新函数以使用 ValueProvider 参数。

以下示例包含一个整数 ValueProvider 选项和一个做整数加法的简单函数。该函数依赖于 ValueProvider 整数。在执行期间,流水线会将 MySumFn 应用于 PCollection(包含 [1, 2, 3])中的每个整数。如果运行时值为 10,则生成的 PCollection 将包含 [11, 12, 13]

Java:SDK 2.x

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

Java:SDK 1.x

使用 StaticValueProvider

如需为流水线提供静态值,请使用 StaticValueProvider

此示例使用 MySumFn,它是一个采用 ValueProvider<Integer>DoFn。如果您提前知道参数的值,则可以使用 StaticValueProvider 将静态值指定为 ValueProvider

Java:SDK 2.x

此代码获取流水线运行时的值:

  .apply(ParDo.of(new MySumFn(options.getInt())))

相反,您可以将 StaticValueProvider 用于静态值:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

此代码获取流水线运行时的值:

  beam.ParDo(MySumFn(user_options.templated_int))

相反,您可以将 StaticValueProvider 用于静态值:

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

Java:SDK 1.x

在实现支持常规参数和运行时参数的 I/O 模块时,也可以使用 StaticValueProviderStaticValueProvider 减少了实现两个类似方法的代码重复。

Java:SDK 2.x

此示例的源代码来自 Apache Beam 的 TextIO.java(位于 GitHub 上)

  // Create a StaticValueProvider from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

在此示例中,存在一个同时接受 stringValueProvider 参数的构造参数。如果该参数是 string,它将被转换为 StaticValueProvider

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, basestring):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

Java:SDK 1.x

使用 NestedValueProvider

注意:Python 版 Apache Beam SDK 不支持 NestedValueProvider

如需从另一个 ValueProvider 对象计算值,请使用 NestedValueProvider

NestedValueProviderValueProviderSerializableFunction 转换函数作为输入。在对 NestedValueProvider 调用 .get() 时,转换函数会根据 ValueProvider 值创建一个新值。此转换允许您使用 ValueProvider 值来创建所需的最终值:

  • 示例 1:用户提供文件名 file.txt。转换将文件路径 gs://directory_name/ 添加到文件名之前。调用 .get() 将返回 gs://directory_name/file.txt
  • 示例 2:用户为 BigQuery 查询提供了子字符串,例如特定日期。转换使用该子字符串来创建完整查询。调用 .get() 将返回完整查询。

注意NestedValueProvider 只接受一个值输入。您不能使用 NestedValueProvider 来组合两个不同的值。

以下代码使用 NestedValueProvider 来实现第一个示例:用户提供文件名,转换将文件路径附加到文件名之前。

Java:SDK 2.x

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Python

Python 版 Apache Beam SDK 不支持 NestedValueProvider

Java:SDK 1.x

元数据

您可以使用附加元数据扩展模板,以便在执行模板时验证自定义参数。如果要为模板创建元数据,您需要执行以下操作:

  1. 使用下表中的参数创建名为 <template-name>_metadata 的 JSON 格式文件。

    注意:请勿将您创建的文件命名为 <template-name>_metadata.json。虽然该文件包含 JSON,但它不能以 .json 文件扩展名作为结尾。

  2. 将 JSON 文件存储在 Cloud Storage 中模板所在的文件夹内。

    注意:模板应存储在 <template-name> 中,元数据应存储在 <template-name>_metadata 中。

元数据参数

参数键 必填 值的说明
name 模板的名称。
description 对模板进行说明的一小段文本。
parameters 否。默认设为空数组。 模板将使用的一组附加参数。
name 模板中使用的参数的名称。
label 一个人类可读的标签(将在界面中用于标记参数)。
helpText 对参数进行说明的一小段文本。
isOptional 否。默认设为 false。 如果参数是必需的,则为 false,如果参数是可选的,则为 true
regexes 否。默认设为空数组。 一组字符串形式的 POSIX-egrep 正则表达式(用于验证参数的值)。例如:["^[a-zA-Z][a-zA-Z0-9]+"] 是一个正则表达式,用于验证以字母开头,并且包含一个或多个字符的值。

示例元数据文件

Dataflow 服务使用以下元数据验证 WordCount 模板的自定义参数:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from. ex: gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

您可以从 Dataflow 模板目录中下载此元数据文件。

流水线 I/O 和运行时参数

Java:SDK 2.x

某些 I/O 连接器包含接受 ValueProvider 对象的方法。如需确定是否支持特定连接器和方法,请参阅 I/O 连接器的 API 参考文档。受支持的方法具有一个带有 ValueProvider 的过载。如果方法没有过载,则此方法不支持运行时参数。以下 I/O 连接器至少具有部分 ValueProvider 支持:

  • 基于文件的 IO:TextIOAvroIOFileIOTFRecordIOXmlIO
  • BigQueryIO*
  • BigtableIO(需要 SDK 2.3.0 或更高版本)
  • PubSubIO
  • SpannerIO

* 注意:如果要运行从 BigQuery 读取数据的批处理流水线,您必须在所有 BigQuery 读取上使用 .withTemplateCompatibility()

Python

某些 I/O 连接器包含接受 ValueProvider 对象的方法。如需确定是否支持 I/O 连接器及其方法,请参阅连接器的 API 参考文档。以下 I/O 连接器接受运行时参数:

  • 基于文件的 IO:textioavroiotfrecordio

Java:SDK 1.x

创建和暂存模板

写入流水线后,您必须创建并暂存模板文件。使用适合您的 SDK 版本的命令。

注意:创建和暂存模板后,暂存位置包含执行模板所需的其他文件。如果删除暂存位置,模板执行将失败。

Java:SDK 2.x

此 Maven 命令会在使用 --templateLocation 指定的 Cloud Storage 位置创建和暂存模板。

  • YOUR_PROJECT_ID 替换为项目 ID。

  • YOUR_BUCKET_NAME 替换为 Cloud Storage 存储分区的名称。

  • YOUR_TEMPLATE_NAME 替换为模板的名称。

  • com.example.myclass 替换为您的 Java 类。

  • 验证 templateLocation 路径是否正确。

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=YOUR_PROJECT_ID \
                  --stagingLocation=gs://YOUR_BUCKET_NAME/staging \
                  --templateLocation=gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME"
    

Python

此 Python 命令会在使用 --template_location 指定的 Cloud Storage 位置创建和暂存模板。请对此命令进行以下更改:

  • YOUR_PROJECT_ID 替换为项目 ID。

  • YOUR_BUCKET_NAME 替换为 Cloud Storage 存储分区的名称。

  • YOUR_TEMPLATE_NAME 替换为模板的名称。

  • examples.mymodule 替换为您的 Python 模块。

  • 验证 template_location 路径是否正确。

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project YOUR_PROJECT_ID \
    --staging_location gs://YOUR_BUCKET_NAME/staging \
    --temp_location gs://YOUR_BUCKET_NAME/temp \
    --template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME

Java:SDK 1.x

创建和暂存模板后,下一步是执行模板