Cloud Storage から Dataflow に読み込む

Cloud Storage から Dataflow にデータを読み取るには、Apache Beam TextIO または AvroIOI/O コネクタを使用します。

Google Cloud ライブラリの依存関係を含める

Cloud Storage で TextIO または AvroIO コネクタを使用するには、次の依存関係を含めます。このライブラリは、"gs://" ファイル名のスキーマ ハンドラを提供します。

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

詳細については、Apache Beam SDK をインストールするをご覧ください。

並列処理

TextIO コネクタと AvroIO コネクタは、次の 2 つのレベルの並列処理をサポートしています。

  • 個々のファイルは個別にキーが設定されるため、複数のワーカーで読み取ることができます。
  • ファイルが圧縮されていない場合、コネクタは各ファイルのサブ範囲を個別に読み取る可能性があるため、非常に高度な並列処理が発生します。この分割は、ファイル内の各行が意味のあるレコードである場合にのみ可能です。たとえば、デフォルトでは JSON ファイルに使用できません。

パフォーマンス

次の表に、Cloud Storage からの読み取りパフォーマンスの指標を示します。ワークロードは、Apache Beam SDK 2.49.0 for Java を使用して、1 つの e2-standard2 ワーカーで実行されています。Runner v2 は使用されていません。

1 億件のレコード | 1 KB | 1 列 スループット(バイト) スループット(要素)
読み取り 320 MBps 320,000 要素/秒

これらの指標は、単純なバッチ パイプラインに基づいています。これは I/O コネクタ間でのパフォーマンスの比較を目的としており、必ずしも実際のパイプラインを表すものではありません。Dataflow パイプラインのパフォーマンスは複雑で、VM タイプ、処理されるデータ、外部のソースとシンクのパフォーマンス、ユーザーコードに左右されます。指標は Java SDK の実行に基づくものであり、他の言語 SDK のパフォーマンス特性を表すものではありません。詳細については、Beam I/O のパフォーマンスをご覧ください。

ベスト プラクティス

  • Cloud Storage では watchForNewFiles を使用しないでください。コネクタは参照されたファイルのリストをメモリに保持する必要があるため、大規模な本番環境パイプラインではスケーリングが難しくなります。リストはメモリからフラッシュできません。このため、時間の経過とともにワーカーの作業メモリが少なくなっていきます。代わりに Cloud Storage の Pub/Sub 通知の使用を検討してください。詳細については、ファイル処理パターンをご覧ください。

  • ファイル名とファイルの内容の両方が有用なデータである場合は、FileIO クラスを使用してファイル名を読み取ります。たとえば、ファイル内のデータを処理するときに役立つメタデータがファイル名に含まれていることがあります。詳しくは、ファイル名へのアクセスをご覧ください。FileIO のドキュメントでも、このパターンの例を紹介しています。

次のステップ