パイプラインを構築する

Dataflow プログラムは、開始から終了までのデータ処理パイプラインを表します。このセクションでは、Dataflow SDK のクラスを使用してパイプラインを作成するメカニズムについて説明します。Dataflow SDK のクラスを使用してパイプラインを作成するには、プログラムで次の一般手順を実行する必要があります。

  • Pipeline オブジェクトを作成します。
  • Read または Create 変換を使用して、パイプライン データに対して 1 つ以上の PCollection を作成します。
  • 変換を各 PCollection に適用します。変換は、PCollection 内の要素の変更、フィルタ、グループ化、分析、その他の処理を実行できます。各変換では、処理が完了するまで追加の変換を適用できる新しい出力 PCollection が作成されます。
  • 変換された最終的な PCollection書き込みまたは出力を行います。
  • パイプラインを実行します。

各一般手順を示す詳細な例については、下の単純なパイプラインの例をご覧ください。

パイプライン オブジェクトを作成する

多くの場合、Dataflow は Pipeline オブジェクトを作成することで開始します。

Dataflow SDK では、各パイプラインはタイプ Pipeline の明示的なオブジェクトで表されます。各 Pipeline オブジェクトは、パイプラインがオペレーションを実行するデータとデータに適用される変換の両方をカプセル化する独立したエンティティです。

Java

パイプラインを作成するには、Pipeline オブジェクトを宣言し、それにいくつかの構成オプションを渡します。設定オプションは、静的メソッド PipelineOptionsFactory.create() を使用して作成できるタイプ PipelineOptions のオブジェクトを作成することで渡します。

// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();

// Then create the pipeline.
Pipeline p = Pipeline.create(options);

パイプライン オプションを設定する

パイプライン オプションを使用して、パイプラインのさまざまな側面を設定します。これらには次のものが含まれます。

  • パイプラインの実行場所
  • パイプライン ジョブがファイルをステージングする場所
  • パイプラインが関連付けられている Cloud Platform プロジェクト
  • パイプラインがワーカーとして使用する Compute Engine インスタンス数

パイプライン オプションのプロパティには、プロジェクト ID や Cloud Storage ステージング場所など、Cloud Dataflow サービスが必要とする Cloud Platform プロジェクトに関する情報が含まれます。パイプライン オプションでは、Dataflow サービスがパイプライン ジョブに割り当てる必要のあるワーカー数や、パイプライン ジョブのステータス メッセージの送信先なども制御できます。

パイプラインの実行場所(Cloud Dataflow Service 上またはローカル)を決定するパイプライン オプションの主要プロパティは、パイプライン ランナーです。パイプライン ランナー プロパティは、パイプラインを非同期、ブロッキングのどちらで実行するかも指定します。

Java

PipelineOptions オブジェクトのプロパティはセッター メソッド(PipelineOptions.set[OptionName])を使用してパイプライン プログラム内で直接設定できますが、ベスト プラクティスは、コマンドライン オプションを使用して値を渡すことです。Dataflow SDK for Java は、パイプラインに渡されるコマンドライン オプションを解析および検証する PipelineOptionsFactory クラスを提供します。コマンドライン オプションを使用して PipelineRunnerPipelineOptions 内のその他のフィールドを実行時に決定することにより、ローカルとクラウドの両方で同じコードを使用してパイプラインを作成および実行できます。

クラウドモードまたはローカルモードの実行でパイプライン オプションをプログラムで設定する方法について詳しくは、実行パラメータを指定するをご覧ください。WordCount サンプル パイプラインでも、コマンドライン オプションを使用して実行時にパイプライン オプションを設定する方法を示しています。

パイプラインにデータを読み取る

パイプラインの初期の PCollection を作成するには、パイプライン オブジェクトにルート変換を適用します。ルート変換では、指定した外部データソースまたは一部のローカルデータから PCollection が作成されます。

Java

Dataflow Java SDK のルート変換には、ReadCreate の 2 種類があります。Read 変換は、BigQuery や Google Cloud Storage 内のテキスト ファイルなどの外部ソースからデータを読み取ります。Create 変換では、PCollection がメモリ内 java.util.Collection から作成されます。

次のコード例は、TextIO.Read ルート変換を apply して Google Cloud Storage のテキスト ファイルからデータを読み取る方法を示しています。変換は、Pipeline オブジェクトの p に適用され、PCollection<String> の形式で設定されたパイプライン データを返します。

PCollection<String> lines = p.apply(
  TextIO.Read.named("ReadMyFile").from("gs://some/inputData.txt"));

パイプライン データを処理するための変換を適用する

パイプラインで変換を使用するには、変換する PCollection に変換を適用します。

Java

変換を適用するには、処理する各 PCollection で、目的の変換オブジェクトを引数として渡して apply メソッドを呼び出します。

Dataflow SDK には、パイプラインの PCollection に適用できるいくつかの異なる変換が含まれています。これには、ParDoCombine などの汎用コア変換が含まれます。SDK には作成済みの複合変換も含まれています。これは、コレクション内の要素のカウントや組み合わせなど、有用な処理パターンで 1 つ以上のコア変換を組み合わせます。より複雑な独自の複合変換を定義して、パイプラインの厳密なユースケースに適合させることもできます。

Java

Dataflow Java SDK で、各変換は、基本クラス PTransform のサブクラスです。applyPCollection で呼び出す場合は、引数として使用する PTransform を渡します。

次のコードは、文字列の PCollection に変換を apply する方法を示しています。変換は、各文字列の内容を反転し、反転した文字列を含む新しい PCollection を出力するユーザー定義のカスタム変換です。

入力は words という PCollection<String> です。コードは ReverseWords という PTransform オブジェクトのインスタンスを apply に渡し、戻り値を reversedWords という PCollection<String> として保存します。

PCollection<String> words = ...;

PCollection<String> reversedWords = words.apply(new ReverseWords());

最終パイプライン データを書き込むまたは出力する

パイプラインがそのすべての変換を適用したら、通常は結果を出力する必要があります。パイプラインの最終 PCollection を出力するには、Write 変換をその PCollection に適用します。Write 変換は、Google Cloud Storage 内のファイルや BigQuery テーブルなどの外部データシンクに PCollection の要素を出力できます。Write を使用してパイプラインでいつでも PCollection を出力できますが、通常はパイプラインの最後にデータを書き出します。

Java

次のコード例は、TextIO.Write 変換を apply して StringPCollection を Google Cloud Storage 内のテキスト ファイルに書き込む方法を示しています。

PCollection<String> filteredWords = ...;
filteredWords.apply(TextIO.Write.named("WriteMyFile").to("gs://some/outputData.txt"));

パイプラインを実行する

パイプラインを作成したら、run メソッドを使用してパイプラインを実行します。パイプラインは非同期で実行されます。作成するプログラムはパイプラインの仕様をパイプライン ランナーに送信し、そこで実際の一連のパイプライン オペレーションが作成および実行されます。パイプラインをテストやデバッグの目的でローカルに実行するか、Cloud Dataflow マネージド サービスで実行するかを指定できます。パイプライン ランナー、パイプライン オプションの設定、ローカル実行とクラウド実行について詳しくは、実行パラメータを指定するをご覧ください。

Dataflow SDK で、Pipeline オブジェクトの作成時にパイプライン オプションで PipelineRunner を指定します。パイプラインの作成が完了したら、次のようにパイプライン オブジェクトに対して run を呼び出します。

Java

p.run();
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。