Dataflow SDK は PCollection
という特殊クラスを使用して、パイプライン内のデータを表します。PCollection
は複数要素のデータセットを表します。
PCollection
はパイプライン データとして考えることができます。Dataflow の変換では PCollection
を入力と出力として使用します。パイプライン内のデータを処理する場合は、PCollection
の形式にする必要があります。各 PCollection
は、特定の Pipeline
オブジェクトによって所有され、使用できるのはその Pipeline
オブジェクトのみです。
重要: このドキュメントには、制限なし PCollection
とウィンドウ処理の情報が含まれます。これらのコンセプトは Dataflow Java SDK のみに該当します。Dataflow Python SDK ではまだ使用できません。
PCollection の特徴
PCollection
は要素のための容器で、大容量に対応でき、変化しません。PCollection
に含めることができる要素の数に上限はありません。任意の PCollection
がメモリに収まることも、永続的なデータストアにバックアップされた非常に大きなデータセットになる可能性もあります。
Java
PCollection
内の複数の要素は、任意の型にできますが、すべてが同一の型でなければなりません。ただし、Dataflow は、分散処理に対応するために、個々の要素をバイト文字列としてエンコードする必要があります。Dataflow SDK で提供されるデータ エンコード メカニズムには、よく使用される型のための組み込みエンコードが含まれています。また、必要に応じてカスタム エンコードを指定できます。任意の型の対して有効なエンコードを作成することは困難ですが、単純な構造化型のカスタム エンコードを作成することは可能です。
大規模データ処理での重要なデータ型は Key-Value ペアです。Dataflow SDK はクラス KV<K, V>
を使用して Key-Value ペアを表します。
Python
大規模データ処理での重要なデータ型は Key-Value ペアです。Dataflow Python SDK は 2 つのタプルを使用して Key-Value ペアを表します。
PCollection の制限事項
PCollection
には、標準のコレクション クラスと異なる特徴がいくつかあります。
PCollection
は不変です。いったん作成した後で、個々の要素の追加、削除または変更を行うことはできません。PCollection
は個々の要素へのランダム アクセスはサポートしていません。PCollection
は、作成されたパイプラインに属します。PCollection
とPipeline
オブジェクト間で共有することはできません。
PCollection
は、既存のストレージ内のデータに物理的に対応することも、まだ計算されていないデータを表したりすることもできます。したがって、PCollection
のデータは不変です。新しいパイプライン データを生成する計算で新しい PCollection
として PCollection
を使用できます。いったん作成された既存の PCollection
の要素を変更することはできません。
PCollection
そのものはデータを格納しませんが、PCollection
の要素が多すぎて、Dataflow が実行しているローカルメモリに収まらない場合があるので注意してください。PCollection
を作成または変換するときには、通常のコンテナクラスと同様に、データはメモリ内でコピーまたは移動されません。代わりに、PCollection
はクラウド内の非常に大きなデータセットを表します。
制限付き PCollection と制限なし PCollection
PCollection
のサイズを制限付きか制限なしのどちらかにできます。PCollection
の作成時に、制限付きか制限なしかが決まります。いくつかのルート変換は PCollections
の境界を作成し、他のルート変換は制限なしの境界を作成します。入力データのソースに依存します。
制限付き PCollection
変化しない既知のサイズの「固定されたデータセット」を表す場合には、PCollection
が制限付きになります。固定データセットの例は、「10 月のサーバーログ」や「先週処理されたすべての注文」です。TextIO
と BigQueryIO
ルート変換では制限付き PCollection
が作成されます。
制限付き PCollection
を作成するデータソースを次に示します。
Java
TextIO
BigQueryIO
DatastoreIO
- Custom Source API を使用して自ら作成した制限付きカスタム データソース
Python
TextIO
BigQueryIO
- Custom Source API を使用して自ら作成した制限付きカスタム データソース
制限付き PCollection
を受け入れるデータシンクを次に示します。
Java
TextIO
BigQueryIO
DatastoreIO
- Custom Sink API を使用して自ら作成した制限付きカスタム データシンク
Python
TextIO
BigQueryIO
- Custom Sink API を使用して自ら作成した制限付きカスタム データシンク
制限なし PCollection
「継続的に更新されるデータセット」、つまりストリーミング データを表す場合には、PCollection
が制限なしになります。更新され続けるデータセットの例は、「生成中のサーバーログ」や「処理中のすべての新規注文」です。PubsubIO
ルート変換では制限なし PCollection
が作成されます。
一部のソース、特に制限なし PCollection
を作成するソース(PubsubIO
など)は、コレクションの各要素に自動的にタイムスタンプを付加します。
制限なし PCollection
を作成するデータソースを次に示します。
制限なし PCollection
を受け入れるデータシンクを次に示します。
PubsubIO
BigQueryIO
処理の特徴
PCollection
が制限付きか制限なしかによって、Dataflow がデータを処理する方法に影響します。制限付き PCollection
は、バッチジョブを使用して処理できます。データセット全体を一度で読み取り、有限ジョブで処理を実行します。制限なし PCollection
は、ストリーミング ジョブを使用して処理する必要があります。コレクション全体を一度に処理することはできません。
制限なし PCollection
をグループ化する場合、Dataflow では、継続的に更新されるデータセットを限定サイズの論理的なウィンドウに分割するためのウィンドウ処理と呼ばれるコンセプトが必要になります。Dataflow は各ウィンドウをバンドルとして処理し、データセットが生成される間、処理が続きます。詳しくは、タイムスタンプとウィンドウ処理について次のセクションをご覧ください。
PCollection 要素のタイムスタンプ
PCollection
内の各要素にはタイムスタンプが関連付けられます。タイムスタンプは、時間に関連する要素が含まれる PCollection
で役立ちます。たとえば、処理対象の注文の PCollection
は、要素のタイムスタンプとして発注時刻を使用できます。
各要素のタイムスタンプは、PCollection
を作成するソースによって最初に割り当てられます。制限なし PCollection
を作成するソースは、制限なし PCollection
に追加された時刻に応じて、新しい要素にタイムスタンプを割り当てます。
Java
固定データセットを生成するデータソース(BigQueryIO
や TextIO
など)も、各要素にタイムスタンプを割り当てます。ただし、通常、これらのデータソースは同じタイムスタンプ(Long.MIN_VALUE
)を各要素に割り当てます。
PCollection
の要素には手動でタイムスタンプを割り当てることができます。これが一般的に行われるのは、要素に固有のタイムスタンプがあるが、そのタイムスタンプを計算する必要がある場合です。たとえば、要素の構造に基づいてタイムスタンプを解析します。タイムスタンプを手動で割り当てるには、ParDo 変換を使用します。ParDo
変換内で、DoFn
はタイムスタンプ付き出力要素を生成できます。詳しくは、タイムスタンプを割り当てるをご覧ください。
Python
PCollection
の要素には手動でタイムスタンプを割り当てることができます。これが一般的に行われるのは、要素に固有のタイムスタンプがあるが、そのタイムスタンプを計算する必要がある場合です。たとえば、要素の構造に基づいてタイムスタンプを解析します。タイムスタンプを手動で割り当てるには、ParDo 変換を使用します。ParDo
変換内で、DoFn
はタイムスタンプ付き出力要素を生成できます。
ウィンドウ処理
PCollection
の各要素に関連付けられるタイムスタンプは、ウィンドウ処理というコンセプトで使用されます。ウィンドウ処理により、PCollection
内の要素がタイムスタンプに応じて分けられます。ウィンドウ処理をすべての PCollection
に対して使用できますが、制限なし PCollection
に対する一部の計算処理では必須です。継続するデータ ストリームを、有限サイズのチャンクに分けて処理する必要があるためです。
Dataflow のウィンドウ処理概念をパイプラインで使用する方法について詳しくは、ウィンドウ処理のセクションをご覧ください。
PCollection を作成する
Cloud Dataflow パイプラインでデータセットを操作するには、保存場所にかかわらず、データを表す PCollection
を作成する必要があります。Dataflow SDK では、最初の PCollection
を作成する方法が 2 つあります。
- ファイルなど外部データソースからデータを読み取ります。
- メモリ内のコレクション クラスに格納されているデータの
PCollection
を作成します。
外部データを読み取る
外部データソースからデータを読み取る方法について詳しくは、パイプライン I/O をご覧ください。
ローカルメモリのデータから PCollection を作成する
ローカルメモリのデータ外に PCollection
を作成し、パイプラインの変換でそのデータを使用できるようにします。通常、ローカルメモリのデータを使用するのは、比較的小さなデータセットでパイプラインをテストし、テスト中に外部 I/O に対するパイプラインの依存を減らす場合です。
Java
メモリ内 Java Collection
から PCollection
を作成するには、Create
変換を apply
します。Create
は、Dataflow SDK for Java から提供されるルート PTransform
です。Create
は、Collection
の要素のエンコード方法を指定する Java Collection
オブジェクトと Coder
オブジェクトを受け入れます。
次のサンプルコードは、String
の PCollection
を作成し、Java List
からのテキストの個々の行を表します。
// Create a Java Collection, in this case a List of Strings. static final List<String> LINES = Arrays.asList( "To be, or not to be: that is the question: ", "Whether 'tis nobler in the mind to suffer ", "The slings and arrows of outrageous fortune, ", "Or to take arms against a sea of troubles, "); PipelineOptions options = PipelineOptionsFactory.create(); Pipeline p = Pipeline.create(options); p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of()) // create the PCollection
上記のコードは Create.of
を使用して、指定の要素を含む PCollection
を生成します。パイプラインでウィンドウ処理が使用される場合は、代わりに Create.timestamped を使用する必要があります。Create.timestamped
は、指定した要素と指定したタイムスタンプを含む PCollection
を生成します。
Python
PCollection
を作成するには、Create
変換を適用します。Create
は Dataflow Python SDK によって提供される標準の変換です。
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions # argv = None # if None, uses sys.argv pipeline_options = PipelineOptions(argv) with beam.Pipeline(options=pipeline_options) as pipeline: lines = ( pipeline | beam.Create([ 'To be, or not to be: that is the question: ', "Whether 'tis nobler in the mind to suffer ", 'The slings and arrows of outrageous fortune, ', 'Or to take arms against a sea of troubles, ', ]))
カスタムデータ型の PCollection を使用する
指定するカスタムデータ型の要素を含む PCollection
を作成できます。これが役立つのは、お客様の名前、住所、電話番号を保持する Java クラスなど、特定のフィールドを含む独自のクラスまたは構造のコレクションを作成する必要がある場合です。
カスタム型の PCollection
を作成する場合は、そのカスタム型の Coder
を指定する必要があります。データセットが並列化されて複数のパイプライン ワーカー インスタンスに区分化される場合、Coder
は、PCollection
の要素をシリアル化または逆シリアル化する方法を Dataflow サービスに指示します。詳細については、データのエンコードをご覧ください。
Dataflow は、Coder
を明示的に設定していない PCollection
があれば Coder
を推定しようとします。カスタム型のデフォルト Coder
は、Java シリアル化を使用する SerializableCoder
です。Dataflow では、可能であれば Coder
として AvroCoder
を使用することをおすすめします。
Pipeline
オブジェクトの CoderRegistry を使用すると、独自のデータ型のデフォルト コーダーとして AvroCoder
を登録できます。クラスに次のようにアノテーションを指定してください。
Java
@DefaultCoder(AvroCoder.class) public class MyClass { ... }
カスタムクラスと AvroCoder
の互換性を保証するために、追加のアノテーションが必要になることがあります。たとえば、データ型の null フィールドに org.apache.avro.reflect.Nullable
でアノテーションを指定する必要があります。AvroCoder
と org.apache.avro.reflect
のパッケージ ドキュメントの詳細については、API for Java リファレンス ドキュメントをご覧ください。
Dataflow の TrafficRoutes サンプル パイプラインは、StationSpeed
というカスタムクラスの要素型を持つ PCollection
を作成します。次に示すように、StationSpeed
によって AvroCoder
がデフォルト コーダーとして登録されます。
Java
/** * This class holds information about a station reading's average speed. */ @DefaultCoder(AvroCoder.class) static class StationSpeed { @Nullable String stationId; @Nullable Double avgSpeed; public StationSpeed() {} public StationSpeed(String stationId, Double avgSpeed) { this.stationId = stationId; this.avgSpeed = avgSpeed; } public String getStationId() { return this.stationId; } public Double getAvgSpeed() { return this.avgSpeed; } }