Java で変更ストリームを読み取る
Java 用 Cloud Bigtable クライアント ライブラリには、データ変更レコードを処理する低レベルのメソッドが用意されています。ただし、ほとんどの場合、Dataflow がパーティション分割を処理し、結合するため、このページで説明されている方法を使用する代わりに、Dataflow で変更をストリーミングすることをおすすめします。
始める前に
Java で変更ストリームを読む前に、変更ストリームの概要を十分に理解してください。その後、次の前提条件を完了します。
認証の設定
ローカル開発環境でこのページの Java サンプルを使用するには、gcloud CLI をインストールして初期化し、ユーザー認証情報を使用してアプリケーションのデフォルト認証情報を設定します。
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
詳細については Set up authentication for a local development environment をご覧ください。
本番環境での認証の設定については、 Set up Application Default Credentials for code running on Google Cloudをご覧ください。
変更ストリームを有効にする
変更ストリームを読み取るには、まずテーブルで変更ストリームを有効にする必要があります。変更ストリームを有効にして新しいテーブルを作成することもできます。
必要なロール
Bigtable 変更ストリームの読み取りに必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。
- 変更のストリーミング元となるテーブルを含む Bigtable インスタンスの Bigtable 管理者(
roles/bigtable.admin
)
Java クライアント ライブラリを依存関係として追加する
Maven pom.xml
ファイルに次のようなコードを追加します。VERSION
は、使用しているクライアント ライブラリのバージョンに置き換えます。バージョンは 2.21.0 以降にする必要があります。
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
テーブルのパーティションを特定する
ReadChangeStream
リクエストを開始するには、テーブルのパーティションを把握する必要があります。これは、GenerateInitialChangeStreamPartitions
メソッドを使用して把握できます。次の例では、このメソッドを使用して、テーブル内の各パーティションを表す ByteStringRanges
のストリームを取得する方法を示しています。各 ByteStringRange
には、パーティションの開始キーと終了キーが含まれています。
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
各パーティションの変更を処理する
次に、ReadChangeStream
メソッドを使用して、各パーティションの変更を処理します。次の例は、現在の時刻からパーティションのストリームを開始する方法を示しています。
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery
は、以下の引数を受け入れます。
- ストリーム パーティション(必須)- 変更のストリーミング元となるパーティション
- 次のいずれかです。
- 開始時刻 - 変更の処理を開始する時点の commit タイムスタンプ
- 継続トークン - ストリーミングを再開する位置を示すトークン
- 終了時刻(省略可)- 上限に達したときに変更の処理を停止する時点の commit タイムスタンプ。値を指定しない場合、ストリームの読み取りが継続します。
- ハートビートの期間(省略可)- 新しい変更がない場合のハートビート メッセージの頻度(デフォルトは 5 秒)
変更ストリームのレコード形式
返される変更ストリーム レコードは、次のいずれかのレスポンス タイプになります。
ChangeStreamMutation
- データ変更レコードを表すメッセージ。CloseStream
- クライアントがストリームからの読み取りを停止する必要があることを示すメッセージ。- ステータス - ストリームの終了理由を示します。次のいずれか:
OK
- 指定されたパーティションの終了時刻に達しました。OUT_OF_RANGE
- 指定されたパーティションはすでに存在しません。つまり、このパーティションでスプリットまたはマージが行われました。新しいパーティションに対して、新しいReadChangeStream
リクエストを送信する必要があります。
NewPartitions
- 更新されたパーティションの情報を提供します(OUT_OF_RANGE
レスポンスが返された場合)。ChangeStreamContinuationTokens
- 同じ位置から新しいReadChangeStream
リクエストを再開するために使用されるトークンのリスト。NewPartition
につき 1 つ。
- ステータス - ストリームの終了理由を示します。次のいずれか:
Heartbeat
- ストリームの状態を確認できる情報を含む定期的なメッセージ。EstimatedLowWatermark
- 指定されたパーティションの低ウォーターマークの見積もり。ContinuationToken
- 現在の位置から指定パーティションのストリーミングを再開するためのトークン。
データ変更レコードの内容
変更ストリーム レコードの詳細については、データ変更レコードの内容をご覧ください。
パーティションでの変更を処理する
テーブルのパーティションが変更されると、ReadChangeStream
リクエストは、新しいパーティションからのストリーミングを再開するために必要な情報を含む CloseStream
メッセージを返します。
スプリットの場合は、複数の新しいパーティションと、各パーティションの対応する ContinuationToken
が含まれます。同じ位置から新しいパーティションのストリーミングを再開するには、対応するトークンを持つ新しいパーティションごとに新しい ReadChangeStream
リクエストを行います。
たとえば、パーティション [A,C)
をストリーミングしているときに、このパーティションが [A,B)
と [B,C)
の 2 つのパーティションに分割された場合、イベント シーケンスは次のようになります。
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
同じ位置から各パーティションのストリーミングを再開するには、次の ReadChangeStreamQuery
リクエストを送信します。
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
マージの場合、同じパーティションから再開するには、マージされたパーティションの各トークンを含む新しい ReadChangeStream
リクエストを送信する必要があります。
たとえば、2 つのパーティション([A,B)
と [B,C)
)をストリーミングしているときに、これらのパーティションが [A,C)
マージされた場合、イベント シーケンスは次のようになります。
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
同じ位置からパーティション [A, C)
のストリーミングを再開するには、次のような ReadChangeStreamQuery
を送信します。
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));