创建经典 Dataflow 模板

在本文档中,您将了解如何通过 Dataflow 流水线代码创建自定义经典模板。 经典模板打包现有 Dataflow 流水线,以创建可重复使用的模板,您可以通过更改特定的流水线参数来为每个作业自定义模板。您可以使用命令从现有流水线生成模板,而不是编写模板。

以下是此过程的简要介绍。后续部分详细介绍了此过程。

  1. 在流水线代码中,为要在运行时设置或使用的所有流水线选项使用 ValueProvider 接口。使用接受运行时参数的 DoFn 对象。
  2. 使用附加元数据扩展模板,以便在运行经典模板时验证自定义参数。此类元数据的示例包括自定义经典模板的名称和可选参数。
  3. 检查流水线 I/O 连接器是否支持 ValueProvider 对象,并根据需要进行更改。
  4. 创建并暂存自定义经典模板。
  5. 运行自定义经典模板。

如需了解不同类型的 Dataflow 模板及其优势以及何时选择经典模板,请参阅 Dataflow 模板

运行经典模板所需的权限

运行 Dataflow 经典模板所需的权限取决于您运行模板的位置,以及流水线的来源和接收器是否位于其他项目中。

如需详细了解如何在本地或使用 Google Cloud 运行 Dataflow 流水线,请参阅 Dataflow 安全性和权限

如需查看 Dataflow 角色和权限的列表,请参阅 Dataflow 访问权限控制

限制

  • 经典模板不支持以下流水线选项。如果您需要控制工作器自动化测试框架线程数,请使用 Flex 模板

    Java

    numberOfWorkerHarnessThreads
      

    Python

    number_of_worker_harness_threads
      
  • Dataflow 运行程序不支持 Pub/Sub 主题和订阅参数的 ValueProvider 选项。如果您需要在运行时参数中使用 Pub/Sub 选项,请使用 Flex 模板。

关于运行时参数和 ValueProvider 接口

ValueProvider 接口允许流水线接受运行时参数。Apache Beam 提供三种类型的 ValueProvider 对象。

名称 说明
RuntimeValueProvider

RuntimeValueProvider 是默认的 ValueProvider 类型。 RuntimeValueProvider 允许流水线接受仅在流水线执行期间可用的值。该值在流水线构建期间不可用,因此您无法用它来更改流水线的工作流程图。

您可以使用 isAccessible() 来检查 ValueProvider 的值是否可用。如果在流水线执行之前调用 get(),Apache Beam 将返回错误:
Value only available at runtime, but accessed from a non-runtime context.

如果您无法提前知道值,请使用 RuntimeValueProvider。如需在运行时更改参数值,请不要为模板中的参数设置值。通过模板创建作业时设置参数的值。

StaticValueProvider

StaticValueProvider 可让您为流水线提供一个静态值。该值在流水线构建期间可用,因此您可以用它来更改流水线的工作流程图。

如果您提前知道值,请使用 StaticValueProvider。有关示例,请参见 StaticValueProvider 部分

NestedValueProvider

NestedValueProvider 可让您从另一个 ValueProvider 对象计算值。NestedValueProvider 封装 ValueProvider,并且封装的 ValueProvider 的类型决定了在流水线构造期间能否访问值。

如果要在运行时使用该值计算另一个值,请使用 NestedValueProvider。有关示例,请参见 NestedValueProvider 部分

在流水线代码中使用运行时参数

本部分介绍如何使用 ValueProviderStaticValueProviderNestedValueProvider

在流水线选项中使用 ValueProvider

为要在运行时设置或使用的所有流水线选项使用 ValueProvider

例如,以下 WordCount 代码段不支持运行时参数。该代码会添加一个输入文件选项、创建一个流水线并从输入文件中读取行。

Java

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

如需添加运行时参数支持,请修改输入文件选项以使用 ValueProvider

Java

对于输入文件选项类型,请使用 ValueProvider<String>(而不是 String)。

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

add_argument 替换为 add_value_provider_argument

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

在函数中使用 ValueProvider

如需在您自己的函数中使用运行时参数值,请更新函数以使用 ValueProvider 参数。

以下示例包含一个整数 ValueProvider 选项和一个做整数加法的简单函数。该函数依赖于 ValueProvider 整数。在执行期间,流水线会将 MySumFn 应用于 PCollection(包含 [1, 2, 3])中的每个整数。如果运行时值为 10,则生成的 PCollection 将包含 [11, 12, 13]

Java

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

使用 StaticValueProvider

如需为流水线提供静态值,请使用 StaticValueProvider

此示例使用 MySumFn,它是一个采用 ValueProvider<Integer>DoFn。如果您提前知道参数的值,则可以使用 StaticValueProvider 将静态值指定为 ValueProvider

Java

此代码获取流水线运行时的值:

  .apply(ParDo.of(new MySumFn(options.getInt())))

相反,您可以将 StaticValueProvider 用于静态值:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

此代码获取流水线运行时的值:

  beam.ParDo(MySumFn(user_options.templated_int))

相反,您可以将 StaticValueProvider 用于静态值:

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

在实现支持常规参数和运行时参数的 I/O 模块时,也可以使用 StaticValueProviderStaticValueProvider 减少了实现两个类似方法的代码重复。

Java

此示例的源代码来自 Apache Beam 的 TextIO.java(位于 GitHub 上)

  // Create a StaticValueProvider<String> from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

在此示例中,存在一个同时接受 stringValueProvider 参数的构造参数。如果该参数是 string,它将被转换为 StaticValueProvider

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, str):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

使用 NestedStaticValueProvider

如需从另一个 ValueProvider 对象计算值,请使用 NestedValueProvider

NestedValueProviderValueProviderSerializableFunction 转换函数作为输入。在对 NestedValueProvider 调用 .get() 时,转换函数会根据 ValueProvider 值创建一个新值。此转换可让您使用 ValueProvider 值来创建所需的最终值。

在以下示例中,用户提供文件名 file.txt。转换将路径 gs://directory_name/ 添加到文件名之前。调用 .get() 将返回 gs://directory_name/file.txt

Java

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

在流水线代码中使用元数据

您可以使用附加元数据扩展模板,以便在运行模板时验证自定义参数。如果要为模板创建元数据,请按照以下步骤操作:

  1. 使用元数据参数中的参数和示例元数据文件中的格式创建名为 TEMPLATE_NAME_metadata 的 JSON 格式的文件。将 TEMPLATE_NAME 替换为模板的名称。

    确保元数据文件没有文件扩展名。例如,如果模板名称为 myTemplate,则其元数据文件必须为 myTemplate_metadata

  2. 将元数据文件存储在 Cloud Storage 中模板所在的文件夹内。

元数据参数

参数键 是否必需 值的说明
name 模板的名称。
description 对模板进行说明的一小段文本。
streaming 如果为 true,则此模板支持流处理。默认值为 false
supportsAtLeastOnce 如果为 true,则此模板支持“至少一次”处理。默认值为 false。如果模板设计为支持“至少一次”流处理模式,请将此参数设置为 true
supportsExactlyOnce 如果为 true,则此模板支持“正好一次”处理。默认值为 true
defaultStreamingMode 默认流处理模式,适用于同时支持“至少一次”模式和“正好一次”模式的模板。请使用以下某个值:"AT_LEAST_ONCE""EXACTLY_ONCE"。如果未指定,则默认流处理模式为“正好一次”。
parameters 模板使用的一组附加参数。默认情况下,使用空数组。
name 模板中使用的参数的名称。
label 人类可读的字符串,用于在 Google Cloud 控制台中标记参数。
helpText 对参数进行说明的一小段文本。
isOptional 如果参数是必需的,则为 false;如果参数是可选的,则为 true。除非设置了值,否则 isOptional 默认为 false。 如果您没有为元数据添加此参数键,则元数据会成为必需参数。
regexes 一组字符串形式的 POSIX-egrep 正则表达式(用于验证参数的值)。例如,["^[a-zA-Z][a-zA-Z0-9]+"] 是一个正则表达式,用于验证以字母开头,并且包含一个或多个字符的值。默认使用空数组。

示例元数据文件

Java

Dataflow 服务使用以下元数据验证 WordCount 模板的自定义参数:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Python

Dataflow 服务使用以下元数据验证 WordCount 模板的自定义参数:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "input",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

您可以从 Dataflow 模板目录下载 Google 提供的模板的元数据文件。

支持的流水线 I/O 连接器和 ValueProvider

Java

某些 I/O 连接器包含接受 ValueProvider 对象的方法。如需确定是否支持特定连接器和方法,请参阅 I/O 连接器的 API 参考文档。受支持的方法具有一个带有 ValueProvider 的过载。如果方法没有过载,则此方法不支持运行时参数。以下 I/O 连接器至少具有部分 ValueProvider 支持:

  • 基于文件的 IO:TextIOAvroIOFileIOTFRecordIOXmlIO
  • BigQueryIO*
  • BigtableIO(需要 SDK 2.3.0 或更高版本)
  • PubSubIO
  • SpannerIO

Python

某些 I/O 连接器包含接受 ValueProvider 对象的方法。如需确定是否支持 I/O 连接器及其方法,请参阅连接器的 API 参考文档。以下 I/O 连接器接受运行时参数:

  • 基于文件的 IO:textioavroiotfrecordio

创建和暂存经典模板

写入流水线后,您必须创建并暂存模板文件。创建和暂存模板后,暂存位置包含运行模板所需的其他文件。如果删除暂存位置,模板将无法运行。Dataflow 作业不会在您暂存模板后立即运行。如需运行基于模板的自定义 Dataflow 作业,您可以使用 Google Cloud 控制台Dataflow REST APIgcloud CLI

以下示例展示了如何暂存模板文件:

Java

此 Maven 命令会在使用 --templateLocation 指定的 Cloud Storage 位置创建和暂存模板。

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=PROJECT_ID \
                  --stagingLocation=gs://BUCKET_NAME/staging \
                  --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \
                  --region=REGION" \
     -P dataflow-runner
    

验证 templateLocation 路径是否正确。请替换以下内容:

  • com.example.myclass:您的 Java 类
  • PROJECT_ID:您的项目 ID
  • BUCKET_NAME:Cloud Storage 存储桶的名称。
  • TEMPLATE_NAME:您的模板的名称
  • REGION:要在其中部署 Dataflow 作业的区域

Python

此 Python 命令会在使用 --template_location 指定的 Cloud Storage 位置创建和暂存模板。

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \
    --region REGION

验证 template_location 路径是否正确。请替换以下内容:

  • examples.mymodule:您的 Python 模块
  • PROJECT_ID:您的项目 ID
  • BUCKET_NAME:Cloud Storage 存储桶的名称。
  • TEMPLATE_NAME:您的模板的名称
  • REGION:要在其中部署 Dataflow 作业的区域

创建和暂存模板后,下一步是运行模板