创建经典模板

在本文档中,您将了解如何通过 Dataflow 流水线代码创建自定义经典模板。

以下是此过程的简要介绍。后续部分详细介绍了此过程。

  1. 在流水线代码中,为要在运行时设置或使用的所有流水线选项使用 ValueProvider 接口。使用接受运行时参数的 DoFn 对象。
  2. 使用附加元数据扩展模板,以便在运行经典模板时验证自定义参数。此类元数据的示例包括自定义经典模板的名称和可选参数。
  3. 检查流水线 I/O 连接器是否支持 ValueProvider 对象,并根据需要进行更改。
  4. 创建并暂存自定义经典模板。
  5. 运行自定义经典模板。

如需了解不同类型的 Dataflow 模板及其优势以及何时选择经典模板,请参阅 Dataflow 模板

运行经典模板所需的权限

运行 Dataflow 经典模板所需的权限取决于您运行模板的位置,以及流水线的来源和接收器是否位于其他项目中。

如需详细了解如何在本地或使用 Google Cloud 运行 Dataflow 流水线,请参阅 Dataflow 安全性和权限

如需查看 Dataflow 角色和权限的列表,请参阅 Dataflow 访问权限控制

关于运行时参数和 ValueProvider 接口

ValueProvider 接口允许流水线接受运行时参数。Apache Beam 提供三种类型的 ValueProvider 对象。

名称 说明
RuntimeValueProvider

RuntimeValueProvider 是默认的 ValueProvider 类型。 RuntimeValueProvider 允许流水线接受仅在流水线执行期间可用的值。该值在流水线构建期间不可用,因此您无法用它来更改流水线的工作流程图。

您可以使用 isAccessible() 来检查 ValueProvider 的值是否可用。如果在流水线执行之前调用 get(),Apache Beam 将返回错误:
Value only available at runtime, but accessed from a non-runtime context.

如果您无法提前知道值,请使用 RuntimeValueProvider

StaticValueProvider

StaticValueProvider 允许您为流水线提供一个静态值。该值在流水线构建期间可用,因此您可以用它来更改流水线的工作流程图。

如果您提前知道值,请使用 StaticValueProvider。有关示例,请参见 StaticValueProvider 部分

NestedValueProvider

NestedValueProvider 允许您从另一个 ValueProvider 对象计算值。NestedValueProvider 封装 ValueProvider,并且封装的 ValueProvider 的类型决定了在流水线构造期间能否访问值。

如果要在运行时使用该值计算另一个值,请使用 NestedValueProvider。有关示例,请参见 NestedValueProvider 部分

Dataflow Runner 不支持 Pub/Sub 主题和订阅参数的 ValueProvider 选项。如果您需要运行时参数中的 Pub/Sub 选项,请切换到使用 Flex 模板

在流水线代码中使用运行时参数

本部分介绍如何使用 ValueProviderStaticValueProviderNestedValueProvider

在流水线选项中使用 ValueProvider

为要在运行时设置或使用的所有流水线选项使用 ValueProvider

例如,以下 WordCount 代码段不支持运行时参数。该代码会添加一个输入文件选项、创建一个流水线并从输入文件中读取行。

Java:SDK 2.x

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Java:SDK 1.x

如需添加运行时参数支持,请修改输入文件选项以使用 ValueProvider

Java:SDK 2.x

对于输入文件选项类型,请使用 ValueProvider<String>(而不是 String)。

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

add_argument 替换为 add_value_provider_argument

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Java:SDK 1.x

在函数中使用 ValueProvider

如需在您自己的函数中使用运行时参数值,请更新函数以使用 ValueProvider 参数。

以下示例包含一个整数 ValueProvider 选项和一个做整数加法的简单函数。该函数依赖于 ValueProvider 整数。在执行期间,流水线会将 MySumFn 应用于 PCollection(包含 [1, 2, 3])中的每个整数。如果运行时值为 10,则生成的 PCollection 将包含 [11, 12, 13]

Java:SDK 2.x

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

Java:SDK 1.x

使用 StaticValueProvider

如需为流水线提供静态值,请使用 StaticValueProvider

此示例使用 MySumFn,它是一个采用 ValueProvider<Integer>DoFn。如果您提前知道参数的值,则可以使用 StaticValueProvider 将静态值指定为 ValueProvider

Java:SDK 2.x

此代码获取流水线运行时的值:

  .apply(ParDo.of(new MySumFn(options.getInt())))

相反,您可以将 StaticValueProvider 用于静态值:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

此代码获取流水线运行时的值:

  beam.ParDo(MySumFn(user_options.templated_int))

相反,您可以将 StaticValueProvider 用于静态值:

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

Java:SDK 1.x

在实现支持常规参数和运行时参数的 I/O 模块时,也可以使用 StaticValueProviderStaticValueProvider 减少了实现两个类似方法的代码重复。

Java:SDK 2.x

此示例的源代码来自 Apache Beam 的 TextIO.java(位于 GitHub 上)

  // Create a StaticValueProvider<String> from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

在此示例中,存在一个同时接受 stringValueProvider 参数的构造参数。如果该参数是 string,它将被转换为 StaticValueProvider

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, str):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

Java:SDK 1.x

使用 NestedStaticValueProvider

如需从另一个 ValueProvider 对象计算值,请使用 NestedValueProvider

NestedValueProviderValueProviderSerializableFunction 转换函数作为输入。在对 NestedValueProvider 调用 .get() 时,转换函数会根据 ValueProvider 值创建一个新值。此转换允许您使用 ValueProvider 值来创建所需的最终值。

在以下示例中,用户提供文件名 file.txt。转换将路径 gs://directory_name/ 添加到文件名之前。调用 .get() 将返回 gs://directory_name/file.txt

Java:SDK 2.x

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Java:SDK 1.x

在流水线代码中使用元数据

您可以使用附加元数据扩展模板,以便在运行模板时验证自定义参数。如果要为模板创建元数据,请按照以下步骤操作:

  1. 使用元数据参数中的参数和示例元数据文件中的格式创建名为 TEMPLATE_NAME_metadata 的 JSON 格式的文件。将 TEMPLATE_NAME 替换为模板的名称。

    确保元数据文件没有文件扩展名。例如,如果模板名称为 myTemplate,则其元数据文件必须为 myTemplate_metadata

  2. 将元数据文件存储在 Cloud Storage 中模板所在的文件夹内。

元数据参数

参数键 必需 值的说明
name 模板的名称。
description 对模板进行说明的一小段文本。
parameters 模板使用的一组附加参数。默认情况下,使用空数组。
name 模板中使用的参数的名称。
label 人类可读的字符串,用于在 Cloud Console 中标记参数。
helpText 对参数进行说明的一小段文本。
isOptional 如果参数是必需的,则为 false;如果参数是可选的,则为 true。除非设置了值,否则 isOptional 默认为 false。 如果您没有为元数据添加此参数键,则元数据会成为必需参数。
regexes 一组字符串形式的 POSIX-egrep 正则表达式(用于验证参数的值)。例如,["^[a-zA-Z][a-zA-Z0-9]+"] 是一个正则表达式,用于验证以字母开头,并且包含一个或多个字符的值。默认使用空数组。

示例元数据文件

Java

Dataflow 服务使用以下元数据验证 WordCount 模板的自定义参数:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Python

Dataflow 服务使用以下元数据验证 WordCount 模板的自定义参数:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "input",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

您可以从 Dataflow 模板目录下载 Google 提供的模板的元数据文件。

支持的流水线 I/O 连接器和 ValueProvider

Java:SDK 2.x

某些 I/O 连接器包含接受 ValueProvider 对象的方法。如需确定是否支持特定连接器和方法,请参阅 I/O 连接器的 API 参考文档。受支持的方法具有一个带有 ValueProvider 的过载。如果方法没有过载,则此方法不支持运行时参数。以下 I/O 连接器至少具有部分 ValueProvider 支持:

  • 基于文件的 IO:TextIOAvroIOFileIOTFRecordIOXmlIO
  • BigQueryIO*
  • BigtableIO(需要 SDK 2.3.0 或更高版本)
  • PubSubIO
  • SpannerIO

Python

某些 I/O 连接器包含接受 ValueProvider 对象的方法。如需确定是否支持 I/O 连接器及其方法,请参阅连接器的 API 参考文档。以下 I/O 连接器接受运行时参数:

  • 基于文件的 IO:textioavroiotfrecordio

Java:SDK 1.x

创建和暂存经典模板

写入流水线后,您必须创建并暂存模板文件。在暂存模板后,Dataflow 作业不会立即运行。如需运行基于模板的自定义 Dataflow 作业,您可以使用 Cloud ConsoleDataflow REST APICloud SDK

请参阅以下示例,了解如何暂存模板文件:

Java:SDK 2.x

此 Maven 命令会在使用 --templateLocation 指定的 Cloud Storage 位置创建和暂存模板。

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=PROJECT_ID \
                  --stagingLocation=gs://BUCKET_NAME/staging \
                  --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME
                  --region=REGION"
    

验证 templateLocation 路径是否正确。 请替换以下内容:

  • com.example.myclass:您的 Java 类
  • PROJECT_ID:您的项目 ID
  • BUCKET_NAME - Cloud Storage 存储分区的名称。
  • TEMPLATE_NAME:您的模板的名称
  • REGION:用于部署 Dataflow 作业的地区端点

Python

此 Python 命令会在使用 --template_location 指定的 Cloud Storage 位置创建和暂存模板。

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --temp_location gs://BUCKET_NAME/temp \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \
    --region REGION

验证 template_location 路径是否正确。 请替换以下内容:

  • examples.mymodule:您的 Python 模块
  • PROJECT_ID:您的项目 ID
  • BUCKET_NAME - Cloud Storage 存储分区的名称。
  • TEMPLATE_NAME:您的模板的名称
  • REGION:用于部署 Dataflow 作业的地区端点

Java:SDK 1.x

创建和暂存模板后,下一步是执行模板