Java で変更ストリームを読み取る

Java 用 Cloud Bigtable クライアント ライブラリには、データ変更レコードを処理する低レベルのメソッドが用意されています。このドキュメントを読む前に、変更ストリームの概要をご覧ください。

準備

Java で変更ストリームを読み取る前に、次の前提条件を満たす必要があります。

認証の設定

このページの Java サンプルをローカル開発環境から使用するには、gcloud CLI をインストールして初期化し、自身のユーザー認証情報を使用してアプリケーションのデフォルト認証情報を設定してください。

  1. Google Cloud CLI をインストールします。
  2. gcloud CLI を初期化するには:

    gcloud init
  3. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login

詳細については、 ローカル開発環境の認証の設定 をご覧ください。

本番環境の認証の設定については、 Google Cloud で実行するコードのためのアプリケーションのデフォルト認証情報を設定するをご覧ください。

変更ストリームを有効にする

変更ストリームを読み取るには、まずテーブルで変更ストリームを有効にする必要があります。新しいテーブルを作成して変更ストリームを有効にすることもできます。

必要なロール

Bigtable 変更ストリームの読み取りに必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。

  • 変更のストリーミング元となるテーブルを含む Bigtable インスタンスの Bigtable 管理者(roles/bigtable.admin

Java クライアント ライブラリを依存関係として追加する

Maven pom.xml ファイルに次のようなコードを追加します。VERSION は、使用しているクライアント ライブラリのバージョンに置き換えます。バージョンは 2.21.0 以降にする必要があります。

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

テーブルのパーティションを特定する

ReadChangeStream リクエストを送信するには、テーブルのパーティションを特定する必要があります。これは、GenerateInitialChangeStreamPartitions メソッドで特定できます。次の例では、このメソッドを使用して、テーブル内の各パーティションを表す ByteStringRanges のストリームを取得する方法を示しています。各 ByteStringRange には、パーティションの開始キーと終了キーが含まれています。

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

各パーティションの変更を処理する

次に、ReadChangeStream メソッドを使用して、各パーティションの変更を処理します。次の例は、現在の時刻からパーティションのストリームを開始する方法を示しています。

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery は、以下の引数を受け入れます。

  • ストリーム パーティション(必須)- 変更のストリーミング元となるパーティション
  • 次のいずれかになります。
    • 開始時間 - 変更の処理を開始する時点の commit タイムスタンプ
    • 継続トークン - ストリーミングの再開位置を示すトークン
  • 終了時間(省略可)- 変更の処理を停止する時点の commit タイムスタンプ。値を指定しない場合、ストリームの読み取りが継続します。
  • ハートビート期間(省略可)- 新しい変更がない場合のハートビート メッセージの頻度(デフォルトは 5 秒)

変更ストリームのレコード形式

返される変更ストリーム レコードは、次のいずれかのレスポンス タイプになります。

  • ChangeStreamMutation - データ変更レコードを表すメッセージ。

  • CloseStream - クライアントがストリームからの読み取りを停止する必要があることを示すメッセージ。

    • ステータス - ストリームの終了理由を示します。次のいずれか:
      • OK - 指定されたパーティションの終了時刻に達しました。
      • OUT_OF_RANGE - 指定されたパーティションが存在しません。このパーティションで分割またはマージが行われています。新しいパーティションに対して、新しい ReadChangeStream リクエストを送信する必要があります。
    • NewPartitions - 更新されたパーティションの情報を提供します(OUT_OF_RANGE レスポンスが返された場合)。
    • ChangeStreamContinuationTokens - 同じ位置から新しい ReadChangeStream リクエストを再開するために使用されるトークンのリスト。NewPartition につき 1 つ。
  • Heartbeat - ストリームの状態を確認できる情報を含む定期的なメッセージ。

    • EstimatedLowWatermark - 指定されたパーティションの低ウォーターマークの見積もり。
    • ContinuationToken - 現在の位置から指定パーティションのストリーミングを再開するためのトークン。

データ変更レコードの内容

各データ変更レコードには以下のものが含まれます。

  • エントリ - 行に加えられた変更。以下に示す対象の 1 つ以上が含まれます。
    • 書き込み
      • 列ファミリー
      • 列修飾子
      • タイムスタンプ
    • セルの削除
      • 列ファミリー
      • 列修飾子
      • タイムスタンプの範囲
    • 列ファミリーの削除
      • 列ファミリー
      • 行からの削除 - 行にデータのある列ファミリーごとに、行からの削除が列ファミリーからの削除リストに変換されます。
  • 行キー - 変更された行の識別子
  • 変更タイプ - ユーザー開始またはガベージ コレクション
  • 変更を受け取ったクラスタの ID
  • commit タイムスタンプ - 変更がテーブルに commit された時点のサーバー側の時間
  • タイブレーカー - ストリームを読み取るアプリケーションが、Bigtable の組み込みの競合解決ポリシーを使用できるようにする値。
  • トークン - ストリームが中断された場合に、使用側のアプリケーションがストリームを再開するために使用します。
  • 低いウォーターマークの見積もり - レコードのパーティションがクラスタ全体のレプリケーションに追いついた後の推定時間。詳細については、パーティションウォーターマークをご覧ください。

データ変更レコードのフィールドの詳細については、ReadChangeStreamAPI リファレンスをご覧ください。

パーティションでの変更を処理する

テーブルのパーティションが変更されると、ReadChangeStream リクエストは、新しいパーティションからのストリーミングを再開するために必要な情報を含む CloseStream メッセージを返します。

分割の場合は、複数の新しいパーティションと、各パーティションの対応する ContinuationToken が含まれます。同じ位置から新しいパーティションのストリーミングを再開するには、対応するトークンを持つ新しいパーティションごとに新しい ReadChangeStream リクエストを送信します。

たとえば、パーティション [A,C) をストリーミングしているときに、このパーティションが [A,B)[B,C) の 2 つのパーティションに分割された場合、イベント シーケンスは次のようになります。

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

同じ位置から各パーティションのストリーミングを再開するには、次の ReadChangeStreamQuery リクエストを送信します。

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

マージの場合、同じパーティションから再開するには、マージされたパーティションの各トークンを含む新しい ReadChangeStream リクエストを送信する必要があります。

たとえば、2 つのパーティション([A,B)[B,C))をストリーミングしているときに、これらのパーティションが [A,C) マージされた場合、イベント シーケンスは次のようになります。

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

同じ位置からパーティション [A, C) のストリーミングを再開するには、次のような ReadChangeStreamQuery を送信します。

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

次のステップ