Java で変更ストリームを読み取る
Java 用 Cloud Bigtable クライアント ライブラリには、データ変更レコードを処理する低レベルのメソッドが用意されています。このドキュメントを読む前に、変更ストリームの概要をご覧ください。
準備
Java で変更ストリームを読み取る前に、次の前提条件を満たす必要があります。
認証の設定
このページの Java サンプルをローカル開発環境から使用するには、gcloud CLI をインストールして初期化し、自身のユーザー認証情報を使用してアプリケーションのデフォルト認証情報を設定してください。
- Google Cloud CLI をインストールします。
-
gcloud CLI を初期化するには:
gcloud init
-
Google アカウントのローカル認証情報を作成します。
gcloud auth application-default login
詳細については、 ローカル開発環境の認証の設定 をご覧ください。
本番環境の認証の設定については、 Google Cloud で実行するコードのためのアプリケーションのデフォルト認証情報を設定するをご覧ください。
変更ストリームを有効にする
変更ストリームを読み取るには、まずテーブルで変更ストリームを有効にする必要があります。新しいテーブルを作成して変更ストリームを有効にすることもできます。
必要なロール
Bigtable 変更ストリームの読み取りに必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。
- 変更のストリーミング元となるテーブルを含む Bigtable インスタンスの Bigtable 管理者(
roles/bigtable.admin
)
Java クライアント ライブラリを依存関係として追加する
Maven pom.xml
ファイルに次のようなコードを追加します。VERSION
は、使用しているクライアント ライブラリのバージョンに置き換えます。バージョンは 2.21.0 以降にする必要があります。
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
テーブルのパーティションを特定する
ReadChangeStream
リクエストを送信するには、テーブルのパーティションを特定する必要があります。これは、GenerateInitialChangeStreamPartitions
メソッドで特定できます。次の例では、このメソッドを使用して、テーブル内の各パーティションを表す ByteStringRanges
のストリームを取得する方法を示しています。各 ByteStringRange
には、パーティションの開始キーと終了キーが含まれています。
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
各パーティションの変更を処理する
次に、ReadChangeStream
メソッドを使用して、各パーティションの変更を処理します。次の例は、現在の時刻からパーティションのストリームを開始する方法を示しています。
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery
は、以下の引数を受け入れます。
- ストリーム パーティション(必須)- 変更のストリーミング元となるパーティション
- 次のいずれかになります。
- 開始時間 - 変更の処理を開始する時点の commit タイムスタンプ
- 継続トークン - ストリーミングの再開位置を示すトークン
- 終了時間(省略可)- 変更の処理を停止する時点の commit タイムスタンプ。値を指定しない場合、ストリームの読み取りが継続します。
- ハートビート期間(省略可)- 新しい変更がない場合のハートビート メッセージの頻度(デフォルトは 5 秒)
変更ストリームのレコード形式
返される変更ストリーム レコードは、次のいずれかのレスポンス タイプになります。
ChangeStreamMutation
- データ変更レコードを表すメッセージ。CloseStream
- クライアントがストリームからの読み取りを停止する必要があることを示すメッセージ。- ステータス - ストリームの終了理由を示します。次のいずれか:
OK
- 指定されたパーティションの終了時刻に達しました。OUT_OF_RANGE
- 指定されたパーティションが存在しません。このパーティションで分割またはマージが行われています。新しいパーティションに対して、新しいReadChangeStream
リクエストを送信する必要があります。
NewPartitions
- 更新されたパーティションの情報を提供します(OUT_OF_RANGE
レスポンスが返された場合)。ChangeStreamContinuationTokens
- 同じ位置から新しいReadChangeStream
リクエストを再開するために使用されるトークンのリスト。NewPartition
につき 1 つ。
- ステータス - ストリームの終了理由を示します。次のいずれか:
Heartbeat
- ストリームの状態を確認できる情報を含む定期的なメッセージ。EstimatedLowWatermark
- 指定されたパーティションの低ウォーターマークの見積もり。ContinuationToken
- 現在の位置から指定パーティションのストリーミングを再開するためのトークン。
データ変更レコードの内容
各データ変更レコードには以下のものが含まれます。
- エントリ - 行に加えられた変更。以下に示す対象の 1 つ以上が含まれます。
- 書き込み
- 列ファミリー
- 列修飾子
- タイムスタンプ
- 値
- セルの削除
- 列ファミリー
- 列修飾子
- タイムスタンプの範囲
- 列ファミリーの削除
- 列ファミリー
- 行からの削除 - 行にデータのある列ファミリーごとに、行からの削除が列ファミリーからの削除リストに変換されます。
- 書き込み
- 行キー - 変更された行の識別子
- 変更タイプ - ユーザー開始またはガベージ コレクション
- 変更を受け取ったクラスタの ID
- commit タイムスタンプ - 変更がテーブルに commit された時点のサーバー側の時間
- タイブレーカー - ストリームを読み取るアプリケーションが、Bigtable の組み込みの競合解決ポリシーを使用できるようにする値。
- トークン - ストリームが中断された場合に、使用側のアプリケーションがストリームを再開するために使用します。
- 低いウォーターマークの見積もり - レコードのパーティションがクラスタ全体のレプリケーションに追いついた後の推定時間。詳細については、パーティションとウォーターマークをご覧ください。
データ変更レコードのフィールドの詳細については、ReadChangeStream
の API リファレンスをご覧ください。
パーティションでの変更を処理する
テーブルのパーティションが変更されると、ReadChangeStream
リクエストは、新しいパーティションからのストリーミングを再開するために必要な情報を含む CloseStream
メッセージを返します。
分割の場合は、複数の新しいパーティションと、各パーティションの対応する ContinuationToken
が含まれます。同じ位置から新しいパーティションのストリーミングを再開するには、対応するトークンを持つ新しいパーティションごとに新しい ReadChangeStream
リクエストを送信します。
たとえば、パーティション [A,C)
をストリーミングしているときに、このパーティションが [A,B)
と [B,C)
の 2 つのパーティションに分割された場合、イベント シーケンスは次のようになります。
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
同じ位置から各パーティションのストリーミングを再開するには、次の ReadChangeStreamQuery
リクエストを送信します。
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
マージの場合、同じパーティションから再開するには、マージされたパーティションの各トークンを含む新しい ReadChangeStream
リクエストを送信する必要があります。
たとえば、2 つのパーティション([A,B)
と [B,C)
)をストリーミングしているときに、これらのパーティションが [A,C)
マージされた場合、イベント シーケンスは次のようになります。
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
同じ位置からパーティション [A, C)
のストリーミングを再開するには、次のような ReadChangeStreamQuery
を送信します。
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));