Java で変更ストリームを読み取る

Java 用 Cloud Bigtable クライアント ライブラリには、データ変更レコードを処理する低レベルのメソッドが用意されています。ただし、ほとんどの場合、Dataflow がパーティション分割を処理し、結合するため、このページで説明されている方法を使用する代わりに、Dataflow で変更をストリーミングすることをおすすめします。

始める前に

Java で変更ストリームを読み取る前に、変更ストリームの概要を理解しておいてください。そのうえで、次の前提条件を満たす必要があります。

認証の設定

このページの Java サンプルをローカル開発環境から使用するには、gcloud CLI をインストールして初期化し、自身のユーザー認証情報を使用してアプリケーションのデフォルト認証情報を設定してください。

  1. Google Cloud CLI をインストールします。
  2. gcloud CLI を初期化するには:

    gcloud init
  3. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login

詳細については、 ローカル開発環境の認証の設定 をご覧ください。

本番環境の認証の設定については、 Google Cloud で実行するコードのためのアプリケーションのデフォルト認証情報を設定するをご覧ください。

変更ストリームを有効にする

変更ストリームを読み取るには、まずテーブルで変更ストリームを有効にする必要があります。新しいテーブルを作成して変更ストリームを有効にすることもできます。

必要なロール

Bigtable 変更ストリームの読み取りに必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。

  • 変更のストリーミング元となるテーブルを含む Bigtable インスタンスの Bigtable 管理者(roles/bigtable.admin

Java クライアント ライブラリを依存関係として追加する

Maven pom.xml ファイルに次のようなコードを追加します。VERSION は、使用しているクライアント ライブラリのバージョンに置き換えます。バージョンは 2.21.0 以降にする必要があります。

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

テーブルのパーティションを特定する

ReadChangeStream リクエストを送信するには、テーブルのパーティションを特定する必要があります。これは、GenerateInitialChangeStreamPartitions メソッドで特定できます。次の例では、このメソッドを使用して、テーブル内の各パーティションを表す ByteStringRanges のストリームを取得する方法を示しています。各 ByteStringRange には、パーティションの開始キーと終了キーが含まれています。

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

各パーティションの変更を処理する

次に、ReadChangeStream メソッドを使用して、各パーティションの変更を処理します。次の例は、現在の時刻からパーティションのストリームを開始する方法を示しています。

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery は、以下の引数を受け入れます。

  • ストリーム パーティション(必須)- 変更のストリーミング元となるパーティション
  • 次のいずれかになります。
    • 開始時間 - 変更の処理を開始する時点の commit タイムスタンプ
    • 継続トークン - ストリーミングの再開位置を示すトークン
  • 終了時間(省略可)- 変更の処理を停止する時点の commit タイムスタンプ。値を指定しない場合、ストリームの読み取りが継続します。
  • ハートビート期間(省略可)- 新しい変更がない場合のハートビート メッセージの頻度(デフォルトは 5 秒)

変更ストリームのレコード形式

返される変更ストリーム レコードは、次のいずれかのレスポンス タイプになります。

  • ChangeStreamMutation - データ変更レコードを表すメッセージ。

  • CloseStream - クライアントがストリームからの読み取りを停止する必要があることを示すメッセージ。

    • ステータス - ストリームの終了理由を示します。次のいずれか:
      • OK - 指定されたパーティションの終了時刻に達しました。
      • OUT_OF_RANGE - 指定されたパーティションが存在しません。このパーティションで分割またはマージが行われています。新しいパーティションに対して、新しい ReadChangeStream リクエストを送信する必要があります。
    • NewPartitions - 更新されたパーティションの情報を提供します(OUT_OF_RANGE レスポンスが返された場合)。
    • ChangeStreamContinuationTokens - 同じ位置から新しい ReadChangeStream リクエストを再開するために使用されるトークンのリスト。NewPartition につき 1 つ。
  • Heartbeat - ストリームの状態を確認できる情報を含む定期的なメッセージ。

    • EstimatedLowWatermark - 指定されたパーティションの低ウォーターマークの見積もり。
    • ContinuationToken - 現在の位置から指定パーティションのストリーミングを再開するためのトークン。

データ変更レコードの内容

変更ストリーム レコードの詳細については、データ変更レコードの内容をご覧ください。

パーティションでの変更を処理する

テーブルのパーティションが変更されると、ReadChangeStream リクエストは、新しいパーティションからのストリーミングを再開するために必要な情報を含む CloseStream メッセージを返します。

分割の場合は、複数の新しいパーティションと、各パーティションの対応する ContinuationToken が含まれます。同じ位置から新しいパーティションのストリーミングを再開するには、対応するトークンを持つ新しいパーティションごとに新しい ReadChangeStream リクエストを送信します。

たとえば、パーティション [A,C) をストリーミングしているときに、このパーティションが [A,B)[B,C) の 2 つのパーティションに分割された場合、イベント シーケンスは次のようになります。

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

同じ位置から各パーティションのストリーミングを再開するには、次の ReadChangeStreamQuery リクエストを送信します。

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

マージの場合、同じパーティションから再開するには、マージされたパーティションの各トークンを含む新しい ReadChangeStream リクエストを送信する必要があります。

たとえば、2 つのパーティション([A,B)[B,C))をストリーミングしているときに、これらのパーティションが [A,C) マージされた場合、イベント シーケンスは次のようになります。

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

同じ位置からパーティション [A, C) のストリーミングを再開するには、次のような ReadChangeStreamQuery を送信します。

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

次のステップ