このドキュメントでは、Dataflow パイプライン コードからカスタム クラシック テンプレートを作成する方法について説明します。クラシック テンプレートは、既存の Dataflow パイプラインをパッケージ化しています。特定のパイプライン パラメータを変更することで、ジョブごとにカスタマイズできる再利用可能なテンプレートを作成できます。テンプレートを作成する代わりに、コマンドを使用して既存のパイプラインからテンプレートを生成します。
まず、このプロセスを簡単に紹介します。詳細については、以降のセクションで説明します。
- パイプラインのコードでは、実行時に設定または使用するすべてのパイプライン オプションに
ValueProvider
インターフェースを使用します。ランタイムのパラメータを受け入れるDoFn
オブジェクトを使用します。 - 追加のメタデータを使用して、クラシック テンプレートの実行時にカスタム パラメータが検証されるようにテンプレートを拡張します。このようなメタデータの例としては、カスタム クラシック テンプレートの名前やオプションのパラメータなどがあります。
- パイプライン I/O コネクタが
ValueProvider
オブジェクトをサポートしているかどうかを確認し、必要に応じて変更します。 - カスタム クラシック テンプレートを作成してステージングします。
- カスタム クラシック テンプレートを実行します。
さまざまな種類の Dataflow テンプレート、それぞれのメリット、従来のテンプレートを選択するタイミングについては、Dataflow テンプレートをご覧ください。
クラシック テンプレートの実行に必要な権限
Dataflow のクラシック テンプレートを実行するために必要な権限は、テンプレートを実行する場所と、パイプラインのソースとシンクが別のプロジェクト内にあるかどうかによって異なります。
ローカルまたは Google Cloud で Dataflow パイプラインを実行する詳しい方法については、Dataflow のセキュリティと権限をご覧ください。
Dataflow のロールと権限のリストについては、Dataflow アクセス制御をご覧ください。
制限事項
- 次のパイプライン オプションは、クラシック テンプレートではサポートされていません。ワーカー ハーネス スレッドの数を制御する必要がある場合は、Flex テンプレートを使用します。
Java
numberOfWorkerHarnessThreads
Python
number_of_worker_harness_threads
- Dataflow ランナーは、Pub/Sub トピックとサブスクリプション パラメータの
ValueProvider
オプションをサポートしていません。ランタイムのパラメータで Pub/Sub オプションが必要な場合は、Flex テンプレートを使用します。
ランタイムのパラメータと ValueProvider
インターフェースについて
ValueProvider
インターフェースを使用すると、パイプラインでランタイムのパラメータを受け入れることができます。Apache Beam には、3 タイプの ValueProvider
オブジェクトが用意されています。
名前 | 説明 |
---|---|
RuntimeValueProvider |
|
StaticValueProvider |
|
NestedValueProvider |
|
パイプライン コードでランタイム パラメータを使用する
このセクションでは、ValueProvider
、StaticValueProvider
、NestedValueProvider
の使用方法について説明します。
パイプライン オプションで ValueProvider
を使用する
実行時に設定または使用するすべてのパイプライン オプションで ValueProvider
を使用します。
たとえば、次の WordCount
コード スニペットではランタイムのパラメータはサポートされていません。このコードは、入力ファイルのオプションを追加し、パイプラインを作成して、入力ファイルから行を読み取ります。
Java
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
ランタイムのパラメータのサポートを追加するには、ValueProvider
を使用するように入力ファイルのオプションを変更します。
Java
入力ファイルのオプションのタイプには、String
ではなく ValueProvider<String>
を使用します。
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") ValueProvider<String> getInputFile(); void setInputFile(ValueProvider<String> value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
add_argument
を add_value_provider_argument
に置き換えます。
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): # Use add_value_provider_argument for arguments to be templatable # Use add_argument as usual for non-templatable arguments parser.add_value_provider_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
関数で ValueProvider
を使用する
ランタイムのパラメータ値を独自の関数で使用するには、ValueProvider
パラメータを使用するように関数を更新します。
次の例には、整数 ValueProvider
のオプションと、整数を追加する単純な関数が含まれています。この関数は整数 ValueProvider
に依存します。このパイプラインが実行されると、[1, 2, 3]
を含む PCollection
のすべての整数に MySumFn
が適用されます。ランタイムの値が 10 の場合、PCollection
には [11, 12, 13]
が含まれます。
Java
public interface SumIntOptions extends PipelineOptions { // New runtime parameter, specified by the --int // option at runtime. ValueProvider<Integer> getInt(); void setInt(ValueProvider<Integer> value); } class MySumFn extends DoFn<Integer, Integer> { ValueProvider<Integer> mySumInteger; MySumFn(ValueProvider<Integer> sumInt) { // Store the value provider this.mySumInteger = sumInt; } @ProcessElement public void processElement(ProcessContext c) { // Get the value of the value provider and add it to // the element's value. c.output(c.element() + mySumInteger.get()); } } public static void main(String[] args) { SumIntOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(SumIntOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Get the value provider and pass it to MySumFn .apply(ParDo.of(new MySumFn(options.getInt()))) .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString())) .apply("OutputNums", TextIO.write().to("numvalues")); p.run(); }
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import StaticValueProvider from apache_beam.io import WriteToText class UserOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument('--templated_int', type=int) class MySumFn(beam.DoFn): def __init__(self, templated_int): self.templated_int = templated_int def process(self, an_int): yield self.templated_int.get() + an_int pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) user_options = pipeline_options.view_as(UserOptions) sum = (p | 'ReadCollection' >> beam.io.ReadFromText( 'gs://some/integer_collection') | 'StringToInt' >> beam.Map(lambda w: int(w)) | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int)) | 'WriteResultingCollection' >> WriteToText('some/output_path'))
StaticValueProvider
を使用する
パイプラインに静的な値を指定するには、StaticValueProvider
を使用します。
この例では、ValueProvider<Integer>
を取る DoFn
の MySumFn
を使用します。事前にパラメータの値がわかっている場合は、StaticValueProvider
を使用して静的な値を ValueProvider
として指定できます。
Java
このコードは、パイプラインの実行時に値を取得します。
.apply(ParDo.of(new MySumFn(options.getInt())))
代わりに、StaticValueProvider
を使用して静的な値を指定できます。
.apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))
Python
このコードは、パイプラインの実行時に値を取得します。
beam.ParDo(MySumFn(user_options.templated_int))
代わりに、StaticValueProvider
を使用して静的な値を指定できます。
beam.ParDo(MySumFn(StaticValueProvider(int,10)))
通常のパラメータとランタイムのパラメータの両方をサポートする I/O モジュールを実装する場合は、StaticValueProvider
を使用することもできます。StaticValueProvider
によって、2 つの類似したメソッドを実装することによるコードの重複が減ります。
Java
この例のソースコードは、GitHub の Apache Beam の TextIO.java のものです。
// Create a StaticValueProvider<String> from a regular String parameter // value, and then call .from() with this new StaticValueProvider. public Read from(String filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return from(StaticValueProvider.of(filepattern)); } // This method takes a ValueProvider parameter. public Read from(ValueProvider<String> filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return toBuilder().setFilepattern(filepattern).build(); }
Python
この例には、引数として string
または ValueProvider
を受け入れる単一のコンストラクタがあります。引数が string
の場合は、StaticValueProvider
に変換されます。
class Read(): def __init__(self, filepattern): if isinstance(filepattern, str): # Create a StaticValueProvider from a regular string parameter filepattern = StaticValueProvider(str, filepattern) self.filepattern = filepattern
NestedStaticValueProvider
を使用する
別の ValueProvider
オブジェクトから値を計算するには、NestedValueProvider
を使用します。
NestedValueProvider
は、ValueProvider
と SerializableFunction
トランスレータを入力として受け入れます。NestedValueProvider
で .get()
を呼び出すと、トランスレータは ValueProvider
値に基づいて新しい値を作成します。この変換では、ValueProvider
値を使用して必要な最終的な値を作成できます。
次の例では、ユーザーがファイル名 file.txt
を指定しています。変換により、ファイル名にパス gs://directory_name/
が追加されます。.get()
を呼び出すと、gs://directory_name/file.txt
が返されます。
Java
public interface WriteIntsOptions extends PipelineOptions { // New runtime parameter, specified by the --fileName // option at runtime. ValueProvider<String> getFileName(); void setFileName(ValueProvider<String> value); } public static void main(String[] args) { WriteIntsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WriteIntsOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Write to the computed complete file path. .apply("OutputNums", TextIO.write().to(NestedValueProvider.of( options.getFileName(), new SerializableFunction<String, String>() { @Override public String apply(String file) { return "gs://directoryname/" + file; } }))); p.run(); }
パイプライン コードでメタデータを使用する
追加のメタデータを使用して、テンプレートの実行時にカスタム パラメータが確認されるようにテンプレートを拡張できます。テンプレートのメタデータを作成する場合、手順は次のとおりです。
- メタデータ パラメータのパラメータとメタデータ ファイルの例の形式を使用して、
TEMPLATE_NAME_metadata
という名前の JSON 形式のファイルを作成します。TEMPLATE_NAME
をテンプレートの名前に置き換えます。メタデータ ファイルにファイル名の拡張子が付いていないことを確認します。たとえば、テンプレート名が
myTemplate
の場合、メタデータ ファイルはmyTemplate_metadata
である必要があります。 - メタデータ ファイルを Cloud Storage でテンプレートと同じフォルダに保存します。
メタデータのパラメータ
パラメータキー | 必須 | 値の説明 | |
---|---|---|---|
name |
○ | テンプレートの名前。 | |
description |
× | テンプレートを説明する短い文章。 | |
streaming |
× | true の場合、このテンプレートはストリーミングをサポートします。デフォルト値は false です。 |
|
supportsAtLeastOnce |
× | true の場合、このテンプレートは 1 回以上の処理をサポートします。デフォルト値は false です。テンプレートが 1 回以上のストリーミング モードで動作するように設計されている場合は、このパラメータを true に設定します。 |
|
supportsExactlyOnce |
× | true の場合、このテンプレートは 1 回限りの処理をサポートします。デフォルト値は true です。 |
|
defaultStreamingMode |
× | 1 回以上モードと 1 回限りモードの両方をサポートするテンプレートのデフォルトのストリーミング モード。"AT_LEAST_ONCE" 、"EXACTLY_ONCE" のいずれかの値を使用できます。指定しない場合、デフォルトのストリーミング モードは 1 回限りモードです。 |
|
parameters |
× | テンプレートで使用する追加のパラメータの配列。デフォルトで空の配列が使用されます。 | |
name |
○ | テンプレートで使用されるパラメータの名前。 | |
label |
○ | パラメータにラベルを付けるために Google Cloud コンソールで使用される、人が読める文字列。 | |
helpText |
○ | パラメータを説明する短い文章。 | |
isOptional |
× | パラメータが必須の場合は false 、パラメータが省略可能な場合は true 。値が設定されていない場合、isOptional はデフォルトで false に設定されます。このパラメータキーをメタデータに含めない場合、メタデータが必須パラメータになります。 |
|
regexes |
× | パラメータの値を検証するために使用される文字列形式の POSIX-egrep 正規表現の配列。たとえば、["^[a-zA-Z][a-zA-Z0-9]+"] は、値がアルファベットで始まり、その後に文字が 1 つ以上続くことを検証する単独の正規表現です。デフォルトで空の配列が使用されます。 |
メタデータ ファイルの例
Java
Dataflow サービスでは、次のメタデータを使用して、WordCount テンプレートのカスタム パラメータを検証します。
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "inputFile", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Python
Dataflow サービスでは、次のメタデータを使用して、WordCount テンプレートのカスタム パラメータを検証します。
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "input", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Google 提供のテンプレートのメタデータ ファイルは、Dataflow のテンプレート ディレクトリからダウンロードできます。
サポートされているパイプライン I/O コネクタと ValueProvider
Java
一部の I/O コネクタには、ValueProvider
オブジェクトを受け入れるメソッドが含まれています。特定のコネクタまたはメソッドがサポートされるかどうかは、I/O コネクタの API リファレンス ドキュメントで確認してください。サポートされているメソッドには、ValueProvider
のオーバーロードがあります。メソッドにオーバーロードがない場合、そのメソッドではランタイムのパラメータがサポートされません。次の I/O コネクタでは、ValueProvider
が少なくとも部分的にサポートされます。
- ファイルベースの I/O:
TextIO
、AvroIO
、FileIO
、TFRecordIO
、XmlIO
BigQueryIO
*BigtableIO
(SDK 2.3.0 以降が必要)PubSubIO
SpannerIO
Python
一部の I/O コネクタには、ValueProvider
オブジェクトを受け入れるメソッドが含まれています。I/O コネクタとメソッドがサポートされているかどうかは、コネクタの API リファレンス ドキュメントで確認してください。次の I/O コネクタは、ランタイムのパラメータを受け入れます。
- ファイルベースの I/O:
textio
、avroio
、tfrecordio
クラシック テンプレートを作成してステージングする
パイプラインを作成したら、テンプレート ファイルを作成してステージングする必要があります。テンプレートの作成とステージングが完了したら、ステージング場所にテンプレートの実行に必要なその他のファイルが格納されます。ステージング場所を削除すると、テンプレートの実行が失敗します。Dataflow ジョブはテンプレートをステージングした直後には実行されません。カスタム テンプレート ベースの Dataflow ジョブを実行するには、Google Cloud コンソール、Dataflow REST API、gcloud CLI を使用します。
次の例は、テンプレート ファイルをステージングする方法を示しています。
Java
この Maven コマンドは、--templateLocation
で指定された Cloud Storage の場所にテンプレートを作成してステージングします。
mvn compile exec:java \ -Dexec.mainClass=com.example.myclass \ -Dexec.args="--runner=DataflowRunner \ --project=PROJECT_ID \ --stagingLocation=gs://BUCKET_NAME/staging \ --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region=REGION" \ -P dataflow-runner
templateLocation
パスが正しいことを確認します。次のように置き換えます。
com.example.myclass
: Java クラスPROJECT_ID
: プロジェクト IDBUCKET_NAME
: Cloud Storage バケットの名前TEMPLATE_NAME
: テンプレートの名前REGION
: Dataflow ジョブをデプロイするリージョン
Python
この Python コマンドは、--template_location
で指定された Cloud Storage の場所にテンプレートを作成してステージングします。
python -m examples.mymodule \ --runner DataflowRunner \ --project PROJECT_ID \ --staging_location gs://BUCKET_NAME/staging \ --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region REGION
template_location
パスが正しいことを確認します。次のように置き換えます。
examples.mymodule
: Python モジュールPROJECT_ID
: プロジェクト IDBUCKET_NAME
: Cloud Storage バケットの名前TEMPLATE_NAME
: テンプレートの名前REGION
: Dataflow ジョブをデプロイするリージョン
テンプレートの作成とステージングが完了したら、次はテンプレートを実行します。