在本文档中,您将了解如何通过 Dataflow 流水线代码创建自定义经典模板。 经典模板打包现有 Dataflow 流水线,以创建可重复使用的模板,您可以通过更改特定的流水线参数来为每个作业自定义模板。您可以使用命令从现有流水线生成模板,而不是编写模板。
以下是此过程的简要介绍。后续部分详细介绍了此过程。
- 在流水线代码中,为要在运行时设置或使用的所有流水线选项使用
ValueProvider
接口。使用接受运行时参数的DoFn
对象。 - 使用附加元数据扩展模板,以便在运行经典模板时验证自定义参数。此类元数据的示例包括自定义经典模板的名称和可选参数。
- 检查流水线 I/O 连接器是否支持
ValueProvider
对象,并根据需要进行更改。 - 创建并暂存自定义经典模板。
- 运行自定义经典模板。
如需了解不同类型的 Dataflow 模板及其优势以及何时选择经典模板,请参阅 Dataflow 模板。
运行经典模板所需的权限
运行 Dataflow 经典模板所需的权限取决于您运行模板的位置,以及流水线的来源和接收器是否位于其他项目中。
如需详细了解如何在本地或使用 Google Cloud 运行 Dataflow 流水线,请参阅 Dataflow 安全性和权限。
如需查看 Dataflow 角色和权限的列表,请参阅 Dataflow 访问权限控制。
限制
- 经典模板不支持以下流水线选项。如果您需要控制工作器自动化测试框架线程数,请使用 Flex 模板。
Java
numberOfWorkerHarnessThreads
Python
number_of_worker_harness_threads
- Dataflow 运行程序不支持 Pub/Sub 主题和订阅参数的
ValueProvider
选项。如果您需要在运行时参数中使用 Pub/Sub 选项,请使用 Flex 模板。
关于运行时参数和 ValueProvider
接口
ValueProvider
接口允许流水线接受运行时参数。Apache Beam 提供三种类型的 ValueProvider
对象。
名称 | 说明 |
---|---|
RuntimeValueProvider |
您可以使用 如果您无法提前知道值,请使用 |
StaticValueProvider |
如果您提前知道值,请使用 |
NestedValueProvider |
如果要在运行时使用该值计算另一个值,请使用 |
在流水线代码中使用运行时参数
本部分介绍如何使用 ValueProvider
、StaticValueProvider
和 NestedValueProvider
。
在流水线选项中使用 ValueProvider
为要在运行时设置或使用的所有流水线选项使用 ValueProvider
。
例如,以下 WordCount
代码段不支持运行时参数。该代码会添加一个输入文件选项、创建一个流水线并从输入文件中读取行。
Java
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
如需添加运行时参数支持,请修改输入文件选项以使用 ValueProvider
。
Java
对于输入文件选项类型,请使用 ValueProvider<String>
(而不是 String
)。
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") ValueProvider<String> getInputFile(); void setInputFile(ValueProvider<String> value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
将 add_argument
替换为 add_value_provider_argument
。
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): # Use add_value_provider_argument for arguments to be templatable # Use add_argument as usual for non-templatable arguments parser.add_value_provider_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
在函数中使用 ValueProvider
如需在您自己的函数中使用运行时参数值,请更新函数以使用 ValueProvider
参数。
以下示例包含一个整数 ValueProvider
选项和一个做整数加法的简单函数。该函数依赖于 ValueProvider
整数。在执行期间,流水线会将 MySumFn
应用于 PCollection
(包含 [1, 2, 3]
)中的每个整数。如果运行时值为 10,则生成的 PCollection
将包含 [11, 12, 13]
。
Java
public interface SumIntOptions extends PipelineOptions { // New runtime parameter, specified by the --int // option at runtime. ValueProvider<Integer> getInt(); void setInt(ValueProvider<Integer> value); } class MySumFn extends DoFn<Integer, Integer> { ValueProvider<Integer> mySumInteger; MySumFn(ValueProvider<Integer> sumInt) { // Store the value provider this.mySumInteger = sumInt; } @ProcessElement public void processElement(ProcessContext c) { // Get the value of the value provider and add it to // the element's value. c.output(c.element() + mySumInteger.get()); } } public static void main(String[] args) { SumIntOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(SumIntOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Get the value provider and pass it to MySumFn .apply(ParDo.of(new MySumFn(options.getInt()))) .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString())) .apply("OutputNums", TextIO.write().to("numvalues")); p.run(); }
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import StaticValueProvider from apache_beam.io import WriteToText class UserOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument('--templated_int', type=int) class MySumFn(beam.DoFn): def __init__(self, templated_int): self.templated_int = templated_int def process(self, an_int): yield self.templated_int.get() + an_int pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) user_options = pipeline_options.view_as(UserOptions) sum = (p | 'ReadCollection' >> beam.io.ReadFromText( 'gs://some/integer_collection') | 'StringToInt' >> beam.Map(lambda w: int(w)) | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int)) | 'WriteResultingCollection' >> WriteToText('some/output_path'))
使用 StaticValueProvider
如需为流水线提供静态值,请使用 StaticValueProvider
。
此示例使用 MySumFn
,它是一个采用 ValueProvider<Integer>
的 DoFn
。如果您提前知道参数的值,则可以使用 StaticValueProvider
将静态值指定为 ValueProvider
。
Java
此代码获取流水线运行时的值:
.apply(ParDo.of(new MySumFn(options.getInt())))
相反,您可以将 StaticValueProvider
用于静态值:
.apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))
Python
此代码获取流水线运行时的值:
beam.ParDo(MySumFn(user_options.templated_int))
相反,您可以将 StaticValueProvider
用于静态值:
beam.ParDo(MySumFn(StaticValueProvider(int,10)))
在实现支持常规参数和运行时参数的 I/O 模块时,也可以使用 StaticValueProvider
。
StaticValueProvider
减少了实现两个类似方法的代码重复。
Java
此示例的源代码来自 Apache Beam 的 TextIO.java(位于 GitHub 上)。
// Create a StaticValueProvider<String> from a regular String parameter // value, and then call .from() with this new StaticValueProvider. public Read from(String filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return from(StaticValueProvider.of(filepattern)); } // This method takes a ValueProvider parameter. public Read from(ValueProvider<String> filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return toBuilder().setFilepattern(filepattern).build(); }
Python
在此示例中,存在一个同时接受 string
或 ValueProvider
参数的构造参数。如果该参数是 string
,它将被转换为 StaticValueProvider
。
class Read(): def __init__(self, filepattern): if isinstance(filepattern, str): # Create a StaticValueProvider from a regular string parameter filepattern = StaticValueProvider(str, filepattern) self.filepattern = filepattern
使用 NestedStaticValueProvider
如需从另一个 ValueProvider
对象计算值,请使用 NestedValueProvider
。
NestedValueProvider
将 ValueProvider
和 SerializableFunction
转换函数作为输入。在对 NestedValueProvider
调用 .get()
时,转换函数会根据 ValueProvider
值创建一个新值。此转换可让您使用 ValueProvider
值来创建所需的最终值。
在以下示例中,用户提供文件名 file.txt
。转换将路径 gs://directory_name/
添加到文件名之前。调用 .get()
将返回 gs://directory_name/file.txt
。
Java
public interface WriteIntsOptions extends PipelineOptions { // New runtime parameter, specified by the --fileName // option at runtime. ValueProvider<String> getFileName(); void setFileName(ValueProvider<String> value); } public static void main(String[] args) { WriteIntsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WriteIntsOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Write to the computed complete file path. .apply("OutputNums", TextIO.write().to(NestedValueProvider.of( options.getFileName(), new SerializableFunction<String, String>() { @Override public String apply(String file) { return "gs://directoryname/" + file; } }))); p.run(); }
在流水线代码中使用元数据
您可以使用附加元数据扩展模板,以便在运行模板时验证自定义参数。如果要为模板创建元数据,请按照以下步骤操作:
- 使用元数据参数中的参数和示例元数据文件中的格式创建名为
TEMPLATE_NAME_metadata
的 JSON 格式的文件。将TEMPLATE_NAME
替换为模板的名称。确保元数据文件没有文件扩展名。例如,如果模板名称为
myTemplate
,则其元数据文件必须为myTemplate_metadata
。 - 将元数据文件存储在 Cloud Storage 中模板所在的文件夹内。
元数据参数
参数键 | 必需 | 值的说明 | |
---|---|---|---|
name |
是 | 模板的名称。 | |
description |
否 | 对模板进行说明的一小段文本。 | |
streaming |
否 | 如果为 true ,则此模板支持流处理。默认值为 false 。 |
|
supportsAtLeastOnce |
否 | 如果为 true ,则此模板支持“至少一次”处理。默认值为 false 。如果模板设计为支持“至少一次”流处理模式,请将此参数设置为 true 。
|
|
supportsExactlyOnce |
否 | 如果为 true ,则此模板支持“正好一次”处理。默认值为 true 。 |
|
defaultStreamingMode |
否 | 默认流处理模式,适用于同时支持“至少一次”模式和“正好一次”模式的模板。请使用以下某个值:"AT_LEAST_ONCE" 、"EXACTLY_ONCE" 。如果未指定,则默认流处理模式为“正好一次”。
|
|
parameters |
否 | 模板使用的一组附加参数。默认情况下,使用空数组。 | |
name |
是 | 模板中使用的参数的名称。 | |
label |
是 | 人类可读的字符串,用于在 Google Cloud 控制台中标记参数。 | |
helpText |
是 | 对参数进行说明的一小段文本。 | |
isOptional |
否 | 如果参数是必需的,则为 false ;如果参数是可选的,则为 true 。除非设置了值,否则 isOptional 默认为 false 。
如果您没有为元数据添加此参数键,则元数据会成为必需参数。 |
|
regexes |
否 | 一组字符串形式的 POSIX-egrep 正则表达式(用于验证参数的值)。例如,["^[a-zA-Z][a-zA-Z0-9]+"] 是一个正则表达式,用于验证以字母开头,并且包含一个或多个字符的值。默认使用空数组。 |
示例元数据文件
Java
Dataflow 服务使用以下元数据验证 WordCount 模板的自定义参数:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "inputFile", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Python
Dataflow 服务使用以下元数据验证 WordCount 模板的自定义参数:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "input", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
您可以从 Dataflow 模板目录下载 Google 提供的模板的元数据文件。
支持的流水线 I/O 连接器和 ValueProvider
Java
某些 I/O 连接器包含接受 ValueProvider
对象的方法。如需确定是否支持特定连接器和方法,请参阅 I/O 连接器的 API 参考文档。受支持的方法具有一个带有 ValueProvider
的过载。如果方法没有过载,则此方法不支持运行时参数。以下 I/O 连接器至少具有部分 ValueProvider
支持:
- 基于文件的 IO:
TextIO
、AvroIO
、FileIO
、TFRecordIO
、XmlIO
BigQueryIO
*BigtableIO
(需要 SDK 2.3.0 或更高版本)PubSubIO
SpannerIO
Python
某些 I/O 连接器包含接受 ValueProvider
对象的方法。如需确定是否支持 I/O 连接器及其方法,请参阅连接器的 API 参考文档。以下 I/O 连接器接受运行时参数:
- 基于文件的 IO:
textio
、avroio
、tfrecordio
创建和暂存经典模板
写入流水线后,您必须创建并暂存模板文件。创建和暂存模板后,暂存位置包含运行模板所需的其他文件。如果删除暂存位置,模板将无法运行。Dataflow 作业不会在您暂存模板后立即运行。如需运行基于模板的自定义 Dataflow 作业,您可以使用 Google Cloud 控制台、Dataflow REST API 或 gcloud CLI。
以下示例展示了如何暂存模板文件:
Java
此 Maven 命令会在使用 --templateLocation
指定的 Cloud Storage 位置创建和暂存模板。
mvn compile exec:java \ -Dexec.mainClass=com.example.myclass \ -Dexec.args="--runner=DataflowRunner \ --project=PROJECT_ID \ --stagingLocation=gs://BUCKET_NAME/staging \ --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region=REGION" \ -P dataflow-runner
验证 templateLocation
路径是否正确。请替换以下内容:
com.example.myclass
:您的 Java 类PROJECT_ID
:您的项目 IDBUCKET_NAME
:Cloud Storage 存储桶的名称。TEMPLATE_NAME
:您的模板的名称REGION
:要在其中部署 Dataflow 作业的区域
Python
此 Python 命令会在使用 --template_location
指定的 Cloud Storage 位置创建和暂存模板。
python -m examples.mymodule \ --runner DataflowRunner \ --project PROJECT_ID \ --staging_location gs://BUCKET_NAME/staging \ --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region REGION
验证 template_location
路径是否正确。请替换以下内容:
examples.mymodule
:您的 Python 模块PROJECT_ID
:您的项目 IDBUCKET_NAME
:Cloud Storage 存储桶的名称。TEMPLATE_NAME
:您的模板的名称REGION
:要在其中部署 Dataflow 作业的区域
创建和暂存模板后,下一步是运行模板。