パイプライン I/O

パイプラインを作成するときは、多くの場合、Google Cloud Storage 内のファイルや BigQuery テーブルなど外部ソースからデータを読み取る必要があります。同様に、パイプラインの結果データを、Cloud Storage 内の出力ファイルや BigQuery などの外部データシンクに出力する場合もあります。Dataflow SDK によって提供される変換は、外部ソースのデータの読み取りや、外部シンクへのデータの書き込みを行うことができます。

Dataflow SDK では、一般的な多くのデータ ストレージ タイプに対応する Read 変換と Write 変換が提供されます。また、Read および Write の API は拡張可能です。つまり、独自の Read オペレーションや Write オペレーションを行う機能拡張を構築し、組み込みの変換でサポートされないデータ ストレージ形式の読み取りや書き込みをパイプラインで行うことができます。

入力データを読み取る

Read 変換は、外部ソースからデータを読み取り、パイプラインが使用できるようにそのデータの PCollection 表現を返します。パイプライン作成中のどの時点でも、Read 変換を使用して新しい PCollection を作成できますが、通常はプログラムの最初で行います。

Java

注: Read 変換には入力 PCollection がないため、変換は Pipeline に直接適用されます。通常どおり、apply を呼び出すと、要素がデータを表す適切な型の PCollection が返されます。詳しくは、パイプラインを作成するをご覧ください。

複数のロケーションから読み取る

Text など多くの Read 変換は、指定する glob 演算子と一致する複数の入力ファイルの読み取りをサポートしています。次の Read 変換では、glob 演算子(*)を使用して、Google Cloud Storage 内の指定した場所にあり、条件に一致するすべての入力ファイルを読み取ります。

Java

  p.apply(TextIO.Read.named("ReadFromText")
		     .from("gs://my_bucket/path/to/input-*.csv");

上記の Read は、Cloud Storage の所定の位置にあり、名前が input- で始まり .csv で終わるすべてのファイルを読み取ります。

さまざまなソースのデータを 1 つの PCollection に読み込むには、各ソースを個別に読み取ってから、Flatten 変換を行い、1 つの PCollection を作成します。

出力データを書き込む

Write 変換は、PCollection のデータを外部データソースに書き込みます。ほとんどの場合、Write 変換をプログラムの最後で使用し、パイプラインの最終結果を出力します。ただし、パイプラインの任意の時点で Write を使用して PCollection のデータを出力することもできます。

Write 変換を使用するには、書き込み先の PCollection に対して apply メソッドを呼び出し、適切な Write 変換を引数として渡します。

Java

PCollectionWrite 変換の apply を行うと、型 PDone のオブジェクトが返されます。PDone オブジェクトは簡単な結果オブジェクトで、無視してもかまいません。

複数の出力ファイルに書き込む

Text など、ファイルベースの入力データと出力データの場合、Write 変換はデフォルトで複数の出力ファイルに書き込みます。Cloud Dataflow サービスでは、分割出力ファイルが常に自動的に生成されます。出力ファイル名を Write 変換に渡すと、そのファイル名が、Write 変換で生成されるすべての出力ファイルの接頭辞として使用されます。

各出力ファイルに接尾辞を付けるには Write 変換に接尾辞を指定します。

次の Write 変換では、複数の出力ファイルを Cloud Storage 内の場所に書き込みます。各ファイルの名前は、接頭辞 "numbers"、数値タグ、接尾辞 ".csv" で構成されます。

Java

  records.apply(TextIO.Write.named("WriteToText")
			    .to("gs://my_bucket/path/to/numbers")
			    .withSuffix(".csv"));

上記の Write は、名前が numbers で始まり .csv で終わる複数の出力ファイルを Cloud Storage 内の指定のロケーションに書き込みます。

Dataflow SDK に含まれる I/O API

Dataflow SDK にはソースとシンクの API が含まれています。

Java

Dataflow SDK for Java では、次のような多くの一般的なデータ形式に対応する Read 変換と Write 変換が提供されます。

その他の I/O API

前述した I/O API の他に Dataflow SDK では拡張可能 API が提供され、これを使用すると、独自のカスタム データソースおよびデータシンクを作成することができます。

Java

Dataflow のソース API とシンク API を使用して、独自のカスタム入力ソースおよびカスタム出力シンクを作成できます。