Dataflow で変更をストリーミングする

Bigtable Beam コネクタを使用すると、Dataflow でコード内のパーティション変更の追跡や処理を行うことなく、Bigtable データ変更レコードを読み取ることができます。これは、このロジックをコネクタが処理するためです。

このドキュメントでは、Bigtable Beam コネクタを構成して使用し、Dataflow パイプラインを使用して変更ストリームを読み取る方法について説明します。このドキュメントを読む前に、変更ストリームの概要Dataflow について理解しておく必要があります。

独自のパイプラインの構築に代わる方法

独自の Dataflow パイプラインを構築しない場合は、次のいずれかのオプションを使用できます。

Google 提供の Dataflow テンプレートを使用できます。

コードの出発点として、Bigtable のチュートリアルまたはクイックスタートのコードサンプルを利用することもできます。

生成するコードが google cloud libraries-bom バージョン 26.14.0 以降を使用していることを確認します。

コネクタの詳細

Bigtable Beam コネクタ メソッド(BigtableIO.readChangeStream)を使用すると、処理可能なデータ変更レコード(ChangeStreamMutation)のストリームを読み取ることができます。Bigtable Beam コネクタは、Apache Beam GitHub リポジトリのコンポーネントです。コネクタコードの説明については、BigtableIO.java のコメントをご覧ください。

Beam バージョン 2.48.0 以降のコネクタを使用する必要があります。Apache Beam ランタイム サポートをチェックして、サポートされているバージョンの Java を使用していることを確認します。コネクタを使用するパイプラインを Dataflow にデプロイできます。これは、リソースのプロビジョニングと管理を行い、ストリーム データ処理のスケーラビリティと信頼性をサポートします。

Apache Beam プログラミング モデルの詳細については、Beam のドキュメントをご覧ください。

イベント時間のないデータのグループ化

Bigtable Beam コネクタを使用してストリーミングされるデータ変更レコードは、イベント時間に依存する Dataflow 関数と互換性がありません。

レプリケーションとウォーターマークで説明したように、パーティションのレプリケーションがインスタンスの残りの部分に追いついていない場合、低ウォーターマークは前に進まなくなります。低ウォーターマークが進まなくなると、変更ストリームが停止する可能性があります。

ストリームの停滞を防ぐため、Bigtable Beam コネクタは出力タイムスタンプをゼロにして、すべてのデータを出力します。Dataflow では、ゼロのタイムスタンプを持つデータ変更レコードはすべて遅延データとみなされます。そのため、イベント時間に依存する Dataflow 機能には、Bigtable 変更ストリームとの互換性がありません。たとえば、ウィンドウ関数イベント時間トリガーイベント時間タイマーは使用できません。

代わりに、チュートリアルの例に示されているように、非イベント時間トリガーで GlobalWindows を使用して、この遅延データをペインにグループ化できます。トリガーとペインの詳細については、Beam プログラミング ガイドのトリガーをご覧ください。

自動スケーリング

コネクタは Dataflow 自動スケーリングをサポートしています。Runner v2 を使用する場合、これはデフォルトで有効になっています(必須)。Dataflow の自動スケーリング アルゴリズムは、推定された変更ストリームのバックログを考慮します。これは、Dataflow モニタリング ページの Backlog セクションでモニタリングできます。ワーカー数の上限を設定するジョブをデプロイする場合は、--maxNumWorkers フラグを使用します。

自動スケーリングを使用する代わりにパイプラインを手動でスケーリングするには、ストリーミング パイプラインを手動でスケーリングするをご覧ください。

制限事項

Dataflow で Bigtable Beam コネクタを使用する前に、次の制限事項に注意してください。

Dataflow Runner V2

コネクタを実行するには、Dataflow Runner v2 を使用する必要があります。これを有効にするには、コマンドライン引数で --experiments=use_runner_v2 を指定します。Runner v1 で実行すると、次の例外によりパイプラインが失敗します。

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

スナップショット

コネクタは Dataflow のスナップショットをサポートしていません。

重複

Bigtable Beam コネクタは、各行キーと各クラスターの変更をコミット タイムスタンプ順にストリーミングしますが、ストリーム内の以前の時点から再開されることがあるため、重複が発生する可能性があります。

始める前に

コネクタを使用する前に、次の前提条件を満たす必要があります。

認証の設定

ローカル開発環境でこのページの Java サンプルを使用するには、gcloud CLI をインストールして初期化し、ユーザー認証情報を使用してアプリケーションのデフォルト認証情報を設定します。

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

詳細については Set up authentication for a local development environment をご覧ください。

本番環境での認証の設定については、 Set up Application Default Credentials for code running on Google Cloudをご覧ください。

変更ストリームを有効にする

変更ストリームを読み取るには、まずテーブルで変更ストリームを有効にする必要があります。新しいテーブルを作成して、変更ストリームを有効にすることもできます。

変更ストリームのメタデータ テーブル

Dataflow を使用して変更をストリーミングすると、Bigtable Beam コネクタによって、デフォルトで __change_stream_md_table という名前のメタデータ テーブルが作成されます。変更ストリーム メタデータ テーブルは、コネクタの動作状態を管理し、データ変更レコードに関するメタデータを格納します。

デフォルトでは、コネクタはストリーミングされるテーブルと同じインスタンスにテーブルを作成します。テーブルが正しく機能するように、メタデータ テーブルのアプリ プロファイルで単一クラスタ ルーティングを使用し、単一行のトランザクションを有効にする必要があります。

Cloud Bigtable コネクタを使用して Bigtable から変更をストリーミングする方法の詳細については、BigtableIO のドキュメントをご覧ください。

必要なロール

Dataflow を使用して Bigtable 変更ストリームの読み取りに必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。

Bigtable から変更を読み取るには、次のロールが必要です。

  • 変更のストリーミング元となるテーブルを含む Bigtable インスタンスの Bigtable 管理者(roles/bigtable.admin

Dataflow ジョブを実行するには、次のロールが必要です。

ロールの付与の詳細については、アクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

Bigtable Beam コネクタを依存関係として追加する

Maven pom.xml ファイルに次の依存関係のようなコードを追加します。バージョンは 2.48.0 以降にする必要があります。

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

変更ストリームを読み取る

データ変更レコードを読み取る Dataflow パイプラインを構築するには、コネクタを構成してから、変換とシンクを追加します。次に、コネクタを使用して、Beam パイプラインの ChangeStreamMutation オブジェクトを読み込みます。

このセクションの Java で記述されたコードサンプルは、パイプラインを作成して使用し Key-Value ペアを文字列に変換する方法を示しています。各ペアは、行キーと ChangeStreamMutation オブジェクトで構成されます。このパイプラインは、各オブジェクトのエントリをカンマ区切りの文字列に変換します。

パイプラインをビルドする

次の Java のコードサンプルは、パイプラインの構築方法を示しています。

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

データ変更レコードを処理する

このサンプルは、行のデータ変更レコード内のエントリをすべてループ処理し、エントリのタイプに基づいて文字列への変換メソッドを呼び出す方法を示しています。

データ変更レコードに含めることができるエントリタイプのリストについては、データ変更レコードの内容をご覧ください。

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

このサンプルでは、writeエントリが変換されています。

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

この例では、セルの削除のエントリが変換されます。

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

このサンプルでは、列ファミリーの削除エントリが変換されています。


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

モニタリング

Google Cloud コンソールの次のリソースでは、Dataflow パイプラインを実行して Bigtable の変更ストリームを読み取る際に Google Cloud リソースを監視できます。

特に、次の指標を確認してください。

  • Bigtable の [モニタリング] ページで、次の指標を確認します。
    • 指標 cpu_load_by_app_profile_by_method_by_table変更ストリーム別の CPU 使用率データ。変更ストリームがクラスタの CPU 使用率に与える影響を示します。
    • 変更ストリームのストレージ使用量(バイト)change_stream_log_used_bytes)。
  • Dataflow のモニタリング ページで、データの鮮度を確認します。これにより、現在の時刻とウォーターマークの違いが示されます。通常は 2 分程度ですが、急激な増大により 1~2 分長くなる場合もあります。データの鮮度の指標がこのしきい値を常に上回っていると、パイプラインがリソース不足になり、Dataflow ワーカーを追加する必要があります。 データの鮮度は、データ変更レコードの処理が遅いかどうかを示すものではありません。
  • Dataflow processing_delay_from_commit_timestamp_MEAN 指標は、ジョブの存続期間中のデータ変更レコードの平均処理時間を示します。

Bigtable の server/latencies 指標は、Bigtable 変更ストリームを読み取っている時のデータ変更レコードの処理遅延ではなく、ストリーミング リクエストの継続時間を反映するため、Dataflow パイプラインを監視する場合には役に立ちません。変更ストリームのレイテンシが高いからといって、リクエストの処理が遅いわけではありません。接続が長時間開いていたことを意味します。

次のステップ