Cloud Storage から Dataflow に読み込む

Cloud Storage から Dataflow にデータを読み取るには、Apache Beam TextIO または AvroIOI/O コネクタを使用します。

Google Cloud ライブラリの依存関係を含める

Cloud Storage で TextIO または AvroIO コネクタを使用するには、次の依存関係を含めます。このライブラリは、"gs://" ファイル名のスキーマ ハンドラを提供します。

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

詳細については、Apache Beam SDK をインストールするをご覧ください。

Dataflow の Apache Beam I/O コネクタで gRPC を有効にする

Dataflow の Apache Beam I/O コネクタを介して、gRPC を使用して Cloud Storage に接続できます。gRPC は、Google が開発したオープンソースの高性能リモート プロシージャ コール(RPC)フレームワークで、Cloud Storage の操作に使用できます。

Dataflow ジョブの Cloud Storage への読み取りリクエストを高速化するには、Dataflow で Apache Beam I/O コネクタを有効にして gRPC を使用します。

コマンドライン

  1. Apache Beam SDK バージョン 2.55.0 以降を使用していることを確認します。
  2. Dataflow ジョブを実行するには、--additional-experiments=use_grpc_for_gcs パイプライン オプションを使用します。さまざまなパイプライン オプションについては、オプション フラグをご覧ください。

Apache Beam SDK

  1. Apache Beam SDK バージョン 2.55.0 以降を使用していることを確認します。
  2. Dataflow ジョブを実行するには、--experiments=use_grpc_for_gcs パイプライン オプションを使用します。さまざまなパイプライン オプションについては、基本オプションをご覧ください。

Dataflow で Apache Beam I/O コネクタを設定して、Cloud Monitoring で gRPC 関連の指標を生成できます。gRPC 関連の指標は、次の目的に役立ちます。

  • Cloud Storage に対する gRPC リクエストのパフォーマンスをモニタリングして最適化する。
  • 問題のトラブルシューティングとデバッグを行う。
  • アプリの使用状況と動作に関する分析情報を取得する。

Dataflow で Apache Beam I/O コネクタを設定して gRPC 関連の指標を生成する方法については、クライアントサイドの指標を使用するをご覧ください。ユースケースで指標を収集する必要がない場合は、指標の収集をオプトアウトできます。手順については、クライアントサイドの指標をオプトアウトするをご覧ください。

並列処理

TextIO コネクタと AvroIO コネクタは、次の 2 つのレベルの並列処理をサポートしています。

  • 個々のファイルは個別にキーが設定されるため、複数のワーカーで読み取ることができます。
  • ファイルが圧縮されていない場合、コネクタは各ファイルのサブ範囲を個別に読み取る可能性があるため、非常に高度な並列処理が発生します。この分割は、ファイル内の各行が意味のあるレコードである場合にのみ可能です。たとえば、デフォルトでは JSON ファイルに使用できません。

パフォーマンス

次の表に、Cloud Storage からの読み取りパフォーマンスの指標を示します。ワークロードは、Apache Beam SDK 2.49.0 for Java を使用して、1 つの e2-standard2 ワーカーで実行されています。Runner v2 は使用されていません。

1 億件のレコード | 1 KB | 1 列 スループット(バイト) スループット(要素)
読み取り 320 MBps 320,000 要素/秒

これらの指標は、単純なバッチ パイプラインに基づいています。これは I/O コネクタ間でのパフォーマンスの比較を目的としており、必ずしも実際のパイプラインを表すものではありません。Dataflow パイプラインのパフォーマンスは複雑で、VM タイプ、処理されるデータ、外部のソースとシンクのパフォーマンス、ユーザーコードに左右されます。指標は Java SDK の実行に基づくものであり、他の言語 SDK のパフォーマンス特性を表すものではありません。詳細については、Beam I/O のパフォーマンスをご覧ください。

ベスト プラクティス

  • Cloud Storage では watchForNewFiles を使用しないでください。コネクタは参照されたファイルのリストをメモリに保持する必要があるため、大規模な本番環境パイプラインではスケーリングが難しくなります。リストはメモリからフラッシュできません。このため、時間の経過とともにワーカーの作業メモリが少なくなっていきます。代わりに Cloud Storage の Pub/Sub 通知の使用を検討してください。詳細については、ファイル処理パターンをご覧ください。

  • ファイル名とファイルの内容の両方が有用なデータである場合は、FileIO クラスを使用してファイル名を読み取ります。たとえば、ファイル内のデータを処理するときに役立つメタデータがファイル名に含まれていることがあります。詳しくは、ファイル名へのアクセスをご覧ください。FileIO のドキュメントでも、このパターンの例を紹介しています。

次の例は、Cloud Storage から読み取る方法を示しています。

Java

Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReadFromStorage {
  public static Pipeline createPipeline(Options options) {
    var pipeline = Pipeline.create(options);
    pipeline
        // Read from a text file.
        .apply(TextIO.read().from(
            "gs://" + options.getBucket() + "/*.txt"))
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (x -> {
                      System.out.println(x);
                      return x;
                    })));
    return pipeline;
  }
}

次のステップ