Dataflow で変更をストリーミングする

Bigtable Beam コネクタを使用すると、Dataflow でコード内のパーティション変更の追跡や処理を行うことなく、Bigtable データ変更レコードを読み取ることができます。これは、このロジックをコネクタが処理するためです。

このドキュメントでは、Bigtable Beam コネクタを構成して使用し、Dataflow パイプラインを使用して変更ストリームを読み取る方法について説明します。このドキュメントを読む前に、変更ストリームの概要Dataflow について理解しておく必要があります。

独自のパイプラインの構築に代わる方法

独自の Dataflow パイプラインを構築しない場合は、次のいずれかのオプションを使用できます。

Google 提供の Dataflow テンプレートを使用できます。

コードの出発点として、Bigtable のチュートリアルまたはクイックスタートのコードサンプルを利用することもできます。

生成するコードが google cloud libraries-bom バージョン 26.14.0 以降を使用していることを確認します。

コネクタの詳細

Bigtable Beam コネクタ メソッド(BigtableIO.readChangeStream)を使用すると、処理可能なデータ変更レコード(ChangeStreamMutation)のストリームを読み取ることができます。Bigtable Beam コネクタは、Apache Beam GitHub リポジトリのコンポーネントです。コネクタコードの説明については、BigtableIO.java のコメントをご覧ください。

Beam バージョン 2.48.0 以降でコネクタを使用する必要があります。Apache Beam ランタイム サポートをチェックして、サポートされているバージョンの Java を使用していることを確認します。コネクタを使用するパイプラインを Dataflow にデプロイできます。これは、リソースのプロビジョニングと管理を行い、ストリーム データ処理のスケーラビリティと信頼性をサポートします。

Apache Beam プログラミング モデルの詳細については、Beam のドキュメントをご覧ください。

イベント時間のないデータのグループ化

Bigtable Beam コネクタでストリーミングされるデータ変更レコードは、イベント時間に依存する Dataflow 関数と互換性がありません。

レプリケーションとウォーターマークで説明したように、パーティションのレプリケーションがインスタンスの残りの部分に追いついていない場合、低ウォーターマークは前に進まなくなります。低ウォーターマークが進まなくなると、変更ストリームが停止する可能性があります。

ストリームの停滞を防ぐため、Bigtable Beam コネクタは出力タイムスタンプをゼロにして、すべてのデータを出力します。Dataflow では、ゼロのタイムスタンプを持つデータ変更レコードはすべて遅延データとみなされます。そのため、イベント時間に依存する Dataflow 機能には、Bigtable 変更ストリームと互換性がありません。たとえば、ウィンドウ関数イベント時間トリガーイベント時間タイマーは使用できません。

代わりに、チュートリアルの例に示されているように、GlobalWindows イベント以外の時間トリガーを使用して、この遅延データをペインにグループ化できます。トリガーとペインの詳細については、Beam プログラミング ガイドのトリガーをご覧ください。

自動スケーリング

コネクタは Dataflow 自動スケーリングをサポートしています。Runner v2 を使用する場合、これはデフォルトで有効になっています(必須)。Dataflow の自動スケーリング アルゴリズムは、推定された変更ストリームのバックログを考慮します。これは、Dataflow モニタリング ページの Backlog セクションでモニタリングできます。ワーカー数の上限を設定するジョブをデプロイする場合は、--maxNumWorkers フラグを使用します。

自動スケーリングを使用する代わりにパイプラインを手動でスケーリングするには、ストリーミング パイプラインを手動でスケーリングするをご覧ください。

制限事項

Dataflow で Bigtable Beam コネクタを使用する前に、次の制限事項に注意してください。

Dataflow Runner V2

コネクタを実行するには、Dataflow Runner v2 を使用する必要があります。これを有効にするには、コマンドライン引数で --experiments=use_runner_v2 を指定します。Runner v1 で実行すると、次の例外によりパイプラインが失敗します。

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

スナップショット

コネクタは Dataflow のスナップショットをサポートしていません。

重複

Bigtable Beam コネクタは、各行キーと各クラスタの変更を commit タイムスタンプ順にストリーミングしますが、ストリーム内のより早い時点から再起動される可能性があるため、重複が発生する可能性があります。

始める前に

コネクタを使用する前に、次の前提条件を満たす必要があります。

認証の設定

このページの Java サンプルをローカル開発環境から使用するには、gcloud CLI をインストールして初期化し、自身のユーザー認証情報を使用してアプリケーションのデフォルト認証情報を設定してください。

  1. Google Cloud CLI をインストールします。
  2. gcloud CLI を初期化するには:

    gcloud init
  3. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login

詳細については、 ローカル開発環境の認証の設定 をご覧ください。

本番環境の認証の設定については、 Google Cloud で実行するコードのためのアプリケーションのデフォルト認証情報を設定するをご覧ください。

変更ストリームを有効にする

変更ストリームを読み取るには、まずテーブルで変更ストリームを有効にする必要があります。新しいテーブルを作成して、変更ストリームを有効にすることもできます。

必要なロール

Dataflow を使用して Bigtable 変更ストリームの読み取りに必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。

Bigtable から変更を読み取るには、次のロールが必要です。

  • 変更のストリーミング元となるテーブルを含む Bigtable インスタンスの Bigtable 管理者(roles/bigtable.admin

Dataflow ジョブを実行するには、次のロールが必要です。

ロールの付与の詳細については、アクセスの管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

Bigtable Beam コネクタを依存関係として追加する

Maven pom.xml ファイルに次の依存関係のようなコードを追加します。バージョンは 2.48.0 以降にする必要があります。

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

変更ストリームを読み取る

データ変更レコードを読み取る Dataflow パイプラインを構築するには、コネクタを構成してから、変換とシンクを追加します。次に、コネクタを使用して、Beam パイプラインの ChangeStreamMutation オブジェクトを読み込みます。

このセクションの Java で記述されたコードサンプルは、パイプラインを作成して使用し Key-Value ペアを文字列に変換する方法を示しています。各ペアは、行キーと ChangeStreamMutation オブジェクトで構成されます。このパイプラインは、各オブジェクトのエントリをカンマ区切りの文字列に変換します。

パイプラインをビルドする

次の Java のコードサンプルは、パイプラインの構築方法を示しています。

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

データ変更レコードを処理する

このサンプルは、行のデータ変更レコード内のエントリをすべてループ処理し、エントリのタイプに基づいて文字列の変換メソッドを呼び出す方法を示しています。

データ変更レコードに含めることができるエントリタイプのリストについては、データ変更レコードの内容をご覧ください。

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

このサンプルでは、書き込みエントリが変換されています。

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

この例では、セルの削除のエントリが変換されます。

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

このサンプルでは、列ファミリーの削除エントリが変換されています。


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

モニタリング

Google Cloud コンソールの次のリソースでは、Dataflow パイプラインを実行して Bigtable の変更ストリームを読み取る際に Google Cloud リソースをモニタリングできます。

特に、次の指標を確認してください。

  • Bigtable の [モニタリング] ページで、次の指標を確認します。
    • 指標 cpu_load_by_app_profile_by_method_by_table変更ストリーム別の CPU 使用率データ。変更ストリームがクラスタの CPU 使用率に与える影響を示します。
    • 変更ストリームのストレージ使用量(バイト)change_stream_log_used_bytes
  • Dataflow のモニタリング ページで、データの鮮度を確認します。これにより、現在の時刻とウォーターマークの違いが示されます。通常は 2 分程度ですが、急激な増大により 1~2 分長くなる場合もあります。データの鮮度の指標がこのしきい値を常に上回っていると、パイプラインがリソース不足になり、Dataflow ワーカーを追加する必要があります。 データの鮮度は、データ変更レコードの処理速度が遅いかどうかを示すものではありません。
  • Dataflow processing_delay_from_commit_timestamp_MEAN 指標は、ジョブの存続期間中のデータ変更レコードの平均処理時間を示します。

Bigtable の server/latencies 指標は、データ変更レコードの処理のレイテンシではなく、ストリーミング リクエストの期間を反映しているため、Bigtable 変更ストリームを読み取る Dataflow パイプラインをモニタリングする場合は役立ちません。して、ソース別にトラフィック データを分類します。変更ストリームのレイテンシが高くても、リクエストの処理が遅いわけではありません。これは、接続が長時間開かれていたことを意味します。

次のステップ