パイプラインを作成するときは、多くの場合、Google Cloud Storage 内のファイルや BigQuery テーブルなど外部ソースからデータを読み取る必要があります。同様に、パイプラインの結果データを、Cloud Storage 内の出力ファイルや BigQuery などの外部データシンクに出力する場合もあります。Dataflow SDK によって提供される変換は、外部ソースのデータの読み取りや、外部シンクへのデータの書き込みを行うことができます。
Dataflow SDK では、一般的な多くのデータ ストレージ タイプに対応する Read
変換と Write
変換が提供されます。また、Read
および Write
の API は拡張可能です。つまり、独自の Read
オペレーションや Write
オペレーションを行う機能拡張を構築し、組み込みの変換でサポートされないデータ ストレージ形式の読み取りや書き込みをパイプラインで行うことができます。
入力データを読み取る
Read
変換は、外部ソースからデータを読み取り、パイプラインが使用できるようにそのデータの PCollection
表現を返します。パイプライン作成中のどの時点でも、Read
変換を使用して新しい PCollection
を作成できますが、通常はプログラムの最初で行います。
Java
注: Read
変換には入力 PCollection
がないため、変換は Pipeline
に直接適用されます。通常どおり、apply
を呼び出すと、要素がデータを表す適切な型の PCollection
が返されます。詳しくは、パイプラインを作成するをご覧ください。
複数のロケーションから読み取る
Text など多くの Read
変換は、指定する glob 演算子と一致する複数の入力ファイルの読み取りをサポートしています。次の Read
変換では、glob 演算子(*
)を使用して、Google Cloud Storage 内の指定した場所にあり、条件に一致するすべての入力ファイルを読み取ります。
Java
p.apply(TextIO.Read.named("ReadFromText") .from("gs://my_bucket/path/to/input-*.csv");
上記の Read
は、Cloud Storage の所定の位置にあり、名前が input- で始まり .csv で終わるすべてのファイルを読み取ります。
さまざまなソースのデータを 1 つの PCollection
に読み込むには、各ソースを個別に読み取ってから、Flatten 変換を行い、1 つの PCollection
を作成します。
出力データを書き込む
Write
変換は、PCollection
のデータを外部データソースに書き込みます。ほとんどの場合、Write
変換をプログラムの最後で使用し、パイプラインの最終結果を出力します。ただし、パイプラインの任意の時点で Write
を使用して PCollection
のデータを出力することもできます。
Write
変換を使用するには、書き込み先の PCollection
に対して apply
メソッドを呼び出し、適切な Write
変換を引数として渡します。
Java
PCollection
に Write
変換の apply
を行うと、型 PDone
のオブジェクトが返されます。PDone
オブジェクトは簡単な結果オブジェクトで、無視してもかまいません。
複数の出力ファイルに書き込む
Text など、ファイルベースの入力データと出力データの場合、Write
変換はデフォルトで複数の出力ファイルに書き込みます。Cloud Dataflow サービスでは、分割出力ファイルが常に自動的に生成されます。出力ファイル名を Write
変換に渡すと、そのファイル名が、Write
変換で生成されるすべての出力ファイルの接頭辞として使用されます。
各出力ファイルに接尾辞を付けるには Write
変換に接尾辞を指定します。
次の Write
変換では、複数の出力ファイルを Cloud Storage 内の場所に書き込みます。各ファイルの名前は、接頭辞 "numbers"、数値タグ、接尾辞 ".csv" で構成されます。
Java
records.apply(TextIO.Write.named("WriteToText") .to("gs://my_bucket/path/to/numbers") .withSuffix(".csv"));
上記の Write
は、名前が numbers で始まり .csv で終わる複数の出力ファイルを Cloud Storage 内の指定のロケーションに書き込みます。
Dataflow SDK に含まれる I/O API
Dataflow SDK にはソースとシンクの API が含まれています。
その他の I/O API
前述した I/O API の他に Dataflow SDK では拡張可能 API が提供され、これを使用すると、独自のカスタム データソースおよびデータシンクを作成することができます。
Java
Dataflow のソース API とシンク API を使用して、独自のカスタム入力ソースおよびカスタム出力シンクを作成できます。