PCollection

Dataflow SDK は PCollection という特殊クラスを使用して、パイプライン内のデータを表します。PCollection は複数要素のデータセットを表します。

PCollection はパイプライン データとして考えることができます。Dataflow の変換では入力や出力として PCollection を使用します。つまり、パイプラインでデータを扱うには PCollection の形式にする必要があります。各 PCollection は、特定の Pipeline オブジェクトによって所有され、使用できるのはその Pipeline オブジェクトのみです。

重要: このドキュメントには、制限なし PCollectionウィンドウ処理の情報が含まれます。これらの概念は Dataflow Java SDK のみに該当します。Dataflow Python SDK ではまだ使用できません。

PCollection の特徴

PCollection は要素のための容器で、大容量に対応でき、変化しません。PCollection に含めることができる要素数の上限はありません。どの PCollection もメモリに収まります。また、永続データストアを利用して非常に大きなデータセットを表すこともできます。

Java

PCollection 内の複数の要素は、任意の型にできますが、すべてが同一の型でなければなりません。ただし、Dataflow は、分散処理に対応するために、個々の要素をバイト文字列としてエンコーディングする必要があります。Dataflow SDK で提供されるデータ エンコーディング メカニズムには、よく使用される型のための組み込みエンコーディングが含まれています。また、必要に応じてカスタム エンコーディングを指定できます。任意の型の対して有効なエンコーディングを作成することは困難ですが、単純な構造化型のカスタム エンコーディングを作成することは可能です。

大規模データ処理での重要なデータ型は Key-Value ペアです。Dataflow SDK はクラス KV<K, V> を使用して Key-Value ペアを表します。

Python

大規模データ処理での重要なデータ型は Key-Value ペアです。Dataflow Python SDK は 2 つのタプルを使用して Key-Value ペアを表します。

PCollection の制限事項

PCollection には、標準のコレクション クラスと異なる特徴がいくつかあります。

  • PCollection は不変です。いったん作成した後で、個々の要素の追加、削除または変更を行うことはできません。
  • PCollection の個々の要素にランダムにアクセスすることはできません。
  • PCollection は、作成されたパイプラインに属します。Pipeline オブジェクト間で PCollection を共有することはできません。

PCollection は、既存のストレージ内のデータに物理的に対応することも、まだ計算されていないデータを表したりすることもできます。また、PCollection 内のデータは変更できません。PCollection を計算で使用して、新しいパイプライン データ(新しい PCollection)を生成することはできますが、いったん作成された既存の PCollection の要素は変更できません。

PCollection そのものはデータを格納しませんが、PCollection の要素が多すぎて、Dataflow が実行しているローカルメモリに収まらない場合があるので注意してください。PCollection を作成または変換するとき、標準のコンテナクラスとは異なり、データがメモリにコピーまたは移動されることはありません。その代わりに、PCollection は、クラウドにある潜在的に非常に大きなデータセットを表します。

制限付き PCollection と制限なし PCollection

PCollection のサイズを制限付き制限なしのどちらかにできます。PCollection の作成時に、制限付き(または制限なし)が決まります。ルート変換には、制限付き PCollections が作成されるものと制限なしが作成されるものがあります。これは入力データのソースによって異なります。

制限付き PCollection

変化しない既知のサイズの「固定されたデータセット」を表す場合には、PCollection が制限付きになります。固定データセットの例として「10 月のサーバーログ」や「先週処理されたすべての注文」があります。TextIO ルート変換や BigQueryIO ルート変換では、制限付き PCollection が作成されます。

制限付き PCollection を作成するデータソースを次に示します。

Java

  • TextIO
  • BigQueryIO
  • DatastoreIO
  • Custom Source API を使用して自ら作成した制限付きカスタム データソース

Python

  • TextIO
  • BigQueryIO
  • Custom Source API を使用して自ら作成した制限付きカスタム データソース

制限付き PCollection を受け入れるデータシンクを次に示します。

Java

  • TextIO
  • BigQueryIO
  • DatastoreIO
  • Custom Sink API を使用して自ら作成した制限付きカスタム データシンク

Python

  • TextIO
  • BigQueryIO
  • Custom Sink API を使用して自ら作成した制限付きカスタム データシンク

制限なし PCollection

「継続的に更新されるデータセット」、つまりストリーミング データを表す場合には、PCollection が制限なしになります。更新され続けるデータセットの例として「生成中のサーバーログ」や「処理中のすべての新規注文」があります。PubsubIO ルート変換では制限なし PCollection が作成されます。

一部のソース、特に制限なし PCollection を作成するソース(PubsubIO など)は、コレクションの各要素に自動的にタイムスタンプを付加します。

制限なし PCollection を作成するデータソースを次に示します。

  • PubsubIO
  • Custom Source API を使用して自ら作成した制限なしカスタム データソース

制限なし PCollection を受け入れるデータシンクを次に示します。

  • PubsubIO
  • BigQueryIO

処理の特徴

PCollection が制限付きか制限なしかによって、Dataflow がデータを処理する方法に影響します。制限付き PCollection は、バッチジョブを使用して処理できます。データセット全体を一度で読み取り、有限ジョブで処理を実行します。制限なし PCollection は、ストリーミング ジョブを使用して処理する必要があります。コレクション全体を一度に処理することはできません。

制限なし PCollection をグループ化する場合、Dataflow では、継続的に更新されるデータセットを限定サイズの論理的なウィンドウに分割するためのウィンドウ処理と呼ばれるコンセプトが必要になります。Dataflow は各ウィンドウをバンドルとして処理し、データセットが生成される間、処理が続きます。詳しくは、タイムスタンプとウィンドウ処理について次のセクションをご覧ください。

PCollection 要素のタイムスタンプ

PCollection 内の各要素にはタイムスタンプが関連付けられます。タイムスタンプは、時間に関連する要素が含まれる PCollection で役立ちます。たとえば、処理対象の注文の PCollection は、要素のタイムスタンプとして発注時刻を使用できます。

各要素のタイムスタンプは、PCollection を作成するソースによって最初に割り当てられます。多くの場合、制限なし PCollection を作成するソースは、新しい要素それぞれに、その要素が制限なし PCollection に追加された時刻に対応するタイムスタンプを割り当てます。

Java

また、固定データセットを生成するデータソース(BigQueryIOTextIO など)も、各要素にタイムスタンプを割り当てます。ただし通常、これらのデータソースは同じタイムスタンプ(Long.MIN_VALUE)を各要素に割り当てます。

PCollection の要素には手動でタイムスタンプを割り当てることができます。これが一般的に行われるのは、要素に固有のタイムスタンプがあるが、そのタイムスタンプを計算する必要がある場合です。たとえば、要素の構造に基づいてタイムスタンプを解析します。タイムスタンプを手動で割り当てるには、ParDo 変換を使用します。ParDo 変換内で、DoFn はタイムスタンプ付き出力要素を生成できます。詳しくは、タイムスタンプを割り当てるをご覧ください。

Python

PCollection の要素には手動でタイムスタンプを割り当てることができます。これが一般的に行われるのは、要素に固有のタイムスタンプがあるが、そのタイムスタンプを計算する必要がある場合です。たとえば、要素の構造に基づいてタイムスタンプを解析します。タイムスタンプを手動で割り当てるには、ParDo 変換を使用します。ParDo 変換内で、DoFn はタイムスタンプ付き出力要素を生成できます。

ウィンドウ処理をする

PCollection の各要素に関連付けられるタイムスタンプは、ウィンドウ処理というコンセプトで使用されます。ウィンドウ処理により、PCollection 内の要素がタイムスタンプに応じて分けられます。ウィンドウ処理をすべての PCollection に対して使用できますが、制限なし PCollection に対する一部の計算処理では必須です。継続するデータ ストリームを、有限サイズのチャンクに分けて処理する必要があるためです。

Dataflow のウィンドウ処理概念をパイプラインで使用する方法について詳しくは、ウィンドウ処理のセクションをご覧ください。

PCollection を作成する

Cloud Dataflow パイプラインでデータセットを処理するには、データが格納される場所にかかわらず、データを表す PCollection を作成する必要があります。Dataflow SDK では、最初の PCollection を作成するために主に次の 2 つの方法があります。

  • ファイルなど外部データソースからデータを読み取ります。
  • インメモリ コレクション クラスに格納されているデータの PCollection を作成します。

外部データを読み取る

外部データソースからデータを読み取る方法について詳しくは、パイプライン I/O をご覧ください。

ローカルメモリのデータから PCollection を作成する

ローカルメモリのデータから PCollection を作成して、パイプラインの変換でそのデータを使用できます。通常、ローカルメモリのデータを使用するのは、比較的小さなデータセットでパイプラインをテストし、テスト中に外部 I/O に対するパイプラインの依存を減らす場合です。

Java

インメモリの Java Collection から PCollection を作成するには、Create 変換の apply を行います。Create は、Dataflow SDK for Java によって提供されるルートの PTransform です。Create は、Java Collection と、Collection の要素のエンコーディング方法を指定する Coder オブジェクトを受け入れます。

次のコードサンプルは、Java の List から、テキストの個々の行を表す StringPCollection を作成します。

  // Create a Java Collection, in this case a List of Strings.
  static final List<String> LINES = Arrays.asList(
      "To be, or not to be: that is the question: ",
      "Whether 'tis nobler in the mind to suffer ",
      "The slings and arrows of outrageous fortune, ",
      "Or to take arms against a sea of troubles, ");

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of())   // create the PCollection

上記のコードは Create.of を使用して、指定の要素を含む PCollection を生成します。なお、パイプラインでウィンドウ処理が使用される場合は、代わりに Create.timestamped を使用する必要があります。Create.timestamped は、指定した要素と指定したタイムスタンプを含む PCollection を生成します。

Python

PCollection を作成するには Create 変換を適用します。 Create は Dataflow Python SDK によって提供される標準の変換です。

with beam.Pipeline(options=pipeline_options) as p:

  lines = (p
           | beam.Create([
               'To be, or not to be: that is the question: ',
               'Whether \'tis nobler in the mind to suffer ',
               'The slings and arrows of outrageous fortune, ',
               'Or to take arms against a sea of troubles, ']))

カスタムデータ型の PCollection を使用する

指定するカスタムデータ型の要素を含む PCollection を作成できます。これが役立つのは、お客様の名前、住所、電話番号を保持する Java クラスなど、特定のフィールドを含む独自のクラスまたは構造のコレクションを作成する必要がある場合です。

カスタム型の PCollection を作成するときは、そのカスタム型の Coder を指定する必要があります。データセットが並列化されて複数のパイプライン ワーカー インスタンスに区分化される場合、Coder は、PCollection の要素をシリアル化または逆シリアル化する方法を Dataflow サービスに指示します。詳細については、データのエンコードをご覧ください。

Dataflow は、Coder を明示的に設定していない PCollection があれば Coder を推定しようとします。カスタム型のデフォルト Coder は、Java シリアル化を使用する SerializableCoder です。ただし、Dataflow では可能な場合には Coder として AvroCoder の使用が推奨されます。

Pipeline オブジェクトの CoderRegistry を使用すると、独自のデータ型のデフォルト コーダーとして AvroCoder を登録できます。クラスに次のようにアノテーションを指定してください。

Java

  @DefaultCoder(AvroCoder.class)
  public class MyClass {
    ...
 }

カスタムクラスと AvroCoder の互換性を保証するために、追加のアノテーションが必要になることがあります。たとえば、データ型の null フィールドに org.apache.avro.reflect.Nullable アノテーションを指定する必要が生じるかもしれません。詳細については、AvroCoder に関する API for Java リファレンス ドキュメントと、org.apache.avro.reflect のパッケージ ドキュメントをご覧ください。

Dataflow の TrafficRoutes サンプル パイプラインは、StationSpeed というカスタムクラスの要素型を持つ PCollection を作成します。次に示すように、StationSpeed によって AvroCoder がデフォルト コーダーとして登録されます。

Java

  /**
   * This class holds information about a station reading's average speed.
   */
  @DefaultCoder(AvroCoder.class)
  static class StationSpeed {
    @Nullable String stationId;
    @Nullable Double avgSpeed;

    public StationSpeed() {}

    public StationSpeed(String stationId, Double avgSpeed) {
      this.stationId = stationId;
      this.avgSpeed = avgSpeed;
    }

    public String getStationId() {
      return this.stationId;
    }
    public Double getAvgSpeed() {
      return this.avgSpeed;
    }
  }
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。