Dataflow で変更をストリーミングする
Bigtable Beam コネクタを使用すると、Dataflow でコード内のパーティション変更の追跡や処理を行うことなく、Bigtable データ変更レコードを読み取ることができます。これは、このロジックをコネクタが処理するためです。
このドキュメントでは、Bigtable Beam コネクタを構成して使用し、Dataflow パイプラインを使用して変更ストリームを読み取る方法について説明します。このドキュメントを読む前に、変更ストリームの概要と Dataflow について理解しておく必要があります。
独自のパイプラインの構築に代わる方法
独自の Dataflow パイプラインを構築しない場合は、次のいずれかのオプションを使用できます。
Google 提供の Dataflow テンプレートを使用できます。
コードの出発点として、Bigtable のチュートリアルまたはクイックスタートのコードサンプルを利用することもできます。
生成するコードが google cloud libraries-bom
バージョン 26.14.0 以降を使用していることを確認します。
コネクタの詳細
Bigtable Beam コネクタ メソッド(BigtableIO.readChangeStream
)を使用すると、処理可能なデータ変更レコード(ChangeStreamMutation
)のストリームを読み取ることができます。Bigtable Beam コネクタは、Apache Beam GitHub リポジトリのコンポーネントです。コネクタコードの説明については、BigtableIO.java
のコメントをご覧ください。
Beam バージョン 2.48.0 以降のコネクタを使用する必要があります。Apache Beam ランタイム サポートをチェックして、サポートされているバージョンの Java を使用していることを確認します。コネクタを使用するパイプラインを Dataflow にデプロイできます。これは、リソースのプロビジョニングと管理を行い、ストリーム データ処理のスケーラビリティと信頼性をサポートします。
Apache Beam プログラミング モデルの詳細については、Beam のドキュメントをご覧ください。
イベント時間のないデータのグループ化
Bigtable Beam コネクタを使用してストリーミングされるデータ変更レコードは、イベント時間に依存する Dataflow 関数と互換性がありません。
レプリケーションとウォーターマークで説明したように、パーティションのレプリケーションがインスタンスの残りの部分に追いついていない場合、低ウォーターマークは前に進まなくなります。低ウォーターマークが進まなくなると、変更ストリームが停止する可能性があります。
ストリームの停滞を防ぐため、Bigtable Beam コネクタは出力タイムスタンプをゼロにして、すべてのデータを出力します。Dataflow では、ゼロのタイムスタンプを持つデータ変更レコードはすべて遅延データとみなされます。そのため、イベント時間に依存する Dataflow 機能には、Bigtable 変更ストリームとの互換性がありません。たとえば、ウィンドウ関数、イベント時間トリガー、イベント時間タイマーは使用できません。
代わりに、チュートリアルの例に示されているように、非イベント時間トリガーで GlobalWindows を使用して、この遅延データをペインにグループ化できます。トリガーとペインの詳細については、Beam プログラミング ガイドのトリガーをご覧ください。
自動スケーリング
コネクタは Dataflow 自動スケーリングをサポートしています。Runner v2 を使用する場合、これはデフォルトで有効になっています(必須)。Dataflow の自動スケーリング アルゴリズムは、推定された変更ストリームのバックログを考慮します。これは、Dataflow モニタリング ページの Backlog
セクションでモニタリングできます。ワーカー数の上限を設定するジョブをデプロイする場合は、--maxNumWorkers
フラグを使用します。
自動スケーリングを使用する代わりにパイプラインを手動でスケーリングするには、ストリーミング パイプラインを手動でスケーリングするをご覧ください。
制限事項
Dataflow で Bigtable Beam コネクタを使用する前に、次の制限事項に注意してください。
Dataflow Runner V2
コネクタを実行するには、Dataflow Runner v2 を使用する必要があります。これを有効にするには、コマンドライン引数で --experiments=use_runner_v2
を指定します。Runner v1 で実行すると、次の例外によりパイプラインが失敗します。
java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow
スナップショット
コネクタは Dataflow のスナップショットをサポートしていません。
重複
Bigtable Beam コネクタは、各行キーと各クラスターの変更をコミット タイムスタンプ順にストリーミングしますが、ストリーム内の以前の時点から再開されることがあるため、重複が発生する可能性があります。
始める前に
コネクタを使用する前に、次の前提条件を満たしてください。
認証の設定
ローカル開発環境でこのページの Java サンプルを使用するには、gcloud CLI をインストールして初期化し、ユーザー認証情報を使用してアプリケーションのデフォルト認証情報を設定します。
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
詳細については Set up authentication for a local development environment をご覧ください。
本番環境での認証の設定については、 Set up Application Default Credentials for code running on Google Cloudをご覧ください。
変更ストリームを有効にする
変更ストリームを読み取るには、まずテーブルで変更ストリームを有効にする必要があります。新しいテーブルを作成して、変更ストリームを有効にすることもできます。
変更ストリームのメタデータ テーブル
Dataflow を使用して変更をストリーミングすると、Bigtable Beam コネクタによって、デフォルトで __change_stream_md_table
という名前のメタデータ テーブルが作成されます。変更ストリームのメタデータ テーブルは、コネクタの運用状態を管理し、データ変更レコードに関するメタデータを保存します。
デフォルトでは、コネクタはストリーミングされるテーブルと同じインスタンスにテーブルを作成します。テーブルが正しく機能するようにするには、メタデータ テーブルのアプリ プロファイルで単一クラスタ ルーティングを使用し、単一行トランザクションを有効にする必要があります。
Cloud Bigtable コネクタを使用して Bigtable から変更をストリーミングする方法の詳細については、BigtableIO のドキュメントをご覧ください。
必要なロール
Dataflow を使用して Bigtable 変更ストリームの読み取りに必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。
Bigtable から変更を読み取るには、次のロールが必要です。
- 変更のストリーミング元となるテーブルを含む Bigtable インスタンスの Bigtable 管理者(roles/bigtable.admin)
Dataflow ジョブを実行するには、次のロールが必要です。
- Cloud リソースを含むプロジェクトの Dataflow デベロッパー(
roles/dataflow.developer
) - Cloud リソースを含むプロジェクトに対する Dataflow ワーカー(roles/dataflow.worker)
- 使用する Cloud Storage バケットに対するストレージ オブジェクト管理者(roles/storage.objectAdmin)
ロールの付与の詳細については、アクセス権の管理をご覧ください。
必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。
Bigtable Beam コネクタを依存関係として追加する
Maven pom.xml ファイルに次の依存関係のようなコードを追加します。バージョンは 2.48.0 以降にする必要があります。
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
変更ストリームを読み取る
データ変更レコードを読み取る Dataflow パイプラインを構築するには、コネクタを構成してから、変換とシンクを追加します。次に、コネクタを使用して、Beam パイプラインの ChangeStreamMutation
オブジェクトを読み込みます。
このセクションの Java で記述されたコードサンプルは、パイプラインを作成して使用し Key-Value ペアを文字列に変換する方法を示しています。各ペアは、行キーと ChangeStreamMutation
オブジェクトで構成されます。このパイプラインは、各オブジェクトのエントリをカンマ区切りの文字列に変換します。
パイプラインをビルドする
次の Java のコードサンプルは、パイプラインの構築方法を示しています。
データ変更レコードを処理する
このサンプルは、行のデータ変更レコード内のエントリをすべてループ処理し、エントリのタイプに基づいて文字列への変換メソッドを呼び出す方法を示しています。
データ変更レコードに含めることができるエントリタイプのリストについては、データ変更レコードの内容をご覧ください。
このサンプルでは、書き込みエントリが変換されています。
この例では、セルの削除のエントリが変換されます。
このサンプルでは、列ファミリーの削除エントリが変換されています。
モニタリング
Google Cloud コンソールの次のリソースでは、Dataflow パイプラインを実行して Bigtable の変更ストリームを読み取る際に Google Cloud リソースを監視できます。
特に、次の指標を確認してください。
- Bigtable の [モニタリング] ページで、次の指標を確認します。
- 指標
cpu_load_by_app_profile_by_method_by_table
の変更ストリーム別の CPU 使用率データ。変更ストリームがクラスタの CPU 使用率に与える影響を示します。 - 変更ストリームのストレージ使用量(バイト)(
change_stream_log_used_bytes
)。
- 指標
- Dataflow のモニタリング ページで、データの鮮度を確認します。これにより、現在の時刻とウォーターマークの違いが示されます。通常は 2 分程度ですが、急激な増大により 1~2 分長くなる場合もあります。データの鮮度の指標がこのしきい値を常に上回っていると、パイプラインがリソース不足になり、Dataflow ワーカーを追加する必要があります。 データの更新頻度から、データ変更レコードの処理速度が遅いかどうかはわかりません。
- Dataflow
processing_delay_from_commit_timestamp_MEAN
指標は、ジョブの存続期間中のデータ変更レコードの平均処理時間を示します。
Bigtable の server/latencies
指標は、Bigtable 変更ストリームを読み取っている時のデータ変更レコードの処理遅延ではなく、ストリーミング リクエストの継続時間を反映するため、Dataflow パイプラインを監視する場合には役に立ちません。変更ストリームでレイテンシが高い場合、リクエストの処理が遅いわけではありません。接続が長時間オープン状態だったことを意味します。