Java で変更ストリームを読み取る

Java 用 Cloud Bigtable クライアント ライブラリには、データ変更レコードを処理する低レベルのメソッドが用意されています。ただし、ほとんどの場合、Dataflow がパーティション分割を処理し、結合するため、このページで説明されている方法を使用する代わりに、Dataflow で変更をストリーミングすることをおすすめします。

始める前に

Java で変更ストリームを読む前に、変更ストリームの概要を十分に理解してください。その後、次の前提条件を完了します。

認証の設定

ローカル開発環境でこのページの Java サンプルを使用するには、gcloud CLI をインストールして初期化し、ユーザー認証情報を使用してアプリケーションのデフォルト認証情報を設定します。

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

詳細については Set up authentication for a local development environment をご覧ください。

本番環境での認証の設定については、 Set up Application Default Credentials for code running on Google Cloudをご覧ください。

変更ストリームを有効にする

変更ストリームを読み取るには、まずテーブルで変更ストリームを有効にする必要があります。変更ストリームを有効にして新しいテーブルを作成することもできます。

必要なロール

Bigtable 変更ストリームの読み取りに必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。

  • 変更のストリーミング元となるテーブルを含む Bigtable インスタンスの Bigtable 管理者(roles/bigtable.admin

Java クライアント ライブラリを依存関係として追加する

Maven pom.xml ファイルに次のようなコードを追加します。VERSION は、使用しているクライアント ライブラリのバージョンに置き換えます。バージョンは 2.21.0 以降にする必要があります。

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

テーブルのパーティションを特定する

ReadChangeStream リクエストを開始するには、テーブルのパーティションを把握する必要があります。これは、GenerateInitialChangeStreamPartitions メソッドを使用して把握できます。次の例では、このメソッドを使用して、テーブル内の各パーティションを表す ByteStringRanges のストリームを取得する方法を示しています。各 ByteStringRange には、パーティションの開始キーと終了キーが含まれています。

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

各パーティションの変更を処理する

次に、ReadChangeStream メソッドを使用して、各パーティションの変更を処理します。次の例は、現在の時刻からパーティションのストリームを開始する方法を示しています。

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery は、以下の引数を受け入れます。

  • ストリーム パーティション(必須)- 変更のストリーミング元となるパーティション
  • 次のいずれかです。
    • 開始時刻 - 変更の処理を開始する時点の commit タイムスタンプ
    • 継続トークン - ストリーミングを再開する位置を示すトークン
  • 終了時刻(省略可)- 上限に達したときに変更の処理を停止する時点の commit タイムスタンプ。値を指定しない場合、ストリームの読み取りが継続します。
  • ハートビートの期間(省略可)- 新しい変更がない場合のハートビート メッセージの頻度(デフォルトは 5 秒)

変更ストリームのレコード形式

返される変更ストリーム レコードは、次のいずれかのレスポンス タイプになります。

  • ChangeStreamMutation - データ変更レコードを表すメッセージ。

  • CloseStream - クライアントがストリームからの読み取りを停止する必要があることを示すメッセージ。

    • ステータス - ストリームの終了理由を示します。次のいずれか:
      • OK - 指定されたパーティションの終了時刻に達しました。
      • OUT_OF_RANGE - 指定されたパーティションはすでに存在しません。つまり、このパーティションでスプリットまたはマージが行われました。新しいパーティションに対して、新しい ReadChangeStream リクエストを送信する必要があります。
    • NewPartitions - 更新されたパーティションの情報を提供します(OUT_OF_RANGE レスポンスが返された場合)。
    • ChangeStreamContinuationTokens - 同じ位置から新しい ReadChangeStream リクエストを再開するために使用されるトークンのリスト。NewPartition につき 1 つ。
  • Heartbeat - ストリームの状態を確認できる情報を含む定期的なメッセージ。

    • EstimatedLowWatermark - 指定されたパーティションの低ウォーターマークの見積もり。
    • ContinuationToken - 現在の位置から指定パーティションのストリーミングを再開するためのトークン。

データ変更レコードの内容

変更ストリーム レコードの詳細については、データ変更レコードの内容をご覧ください。

パーティションでの変更を処理する

テーブルのパーティションが変更されると、ReadChangeStream リクエストは、新しいパーティションからのストリーミングを再開するために必要な情報を含む CloseStream メッセージを返します。

スプリットの場合は、複数の新しいパーティションと、各パーティションの対応する ContinuationToken が含まれます。同じ位置から新しいパーティションのストリーミングを再開するには、対応するトークンを持つ新しいパーティションごとに新しい ReadChangeStream リクエストを行います。

たとえば、パーティション [A,C) をストリーミングしているときに、このパーティションが [A,B)[B,C) の 2 つのパーティションに分割された場合、イベント シーケンスは次のようになります。

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

同じ位置から各パーティションのストリーミングを再開するには、次の ReadChangeStreamQuery リクエストを送信します。

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

マージの場合、同じパーティションから再開するには、マージされたパーティションの各トークンを含む新しい ReadChangeStream リクエストを送信する必要があります。

たとえば、2 つのパーティション([A,B)[B,C))をストリーミングしているときに、これらのパーティションが [A,C) マージされた場合、イベント シーケンスは次のようになります。

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

同じ位置からパーティション [A, C) のストリーミングを再開するには、次のような ReadChangeStreamQuery を送信します。

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

次のステップ