このページでは、変更ストリームの次の属性について詳しく説明します。
- 分割ベースのパーティショニング モデル
- 変更ストリーム レコードの形式と内容
- レコードの照会に使用する低レベル構文
- クエリ ワークフローの例
このページの情報は、Spanner API を使用して変更ストリームを直接クエリする場合に最も関連性があります。代わりに Dataflow を使用して変更ストリーム データを読み取るアプリケーションでは、ここで説明するデータモデルを直接操作する必要はありません。
ストリームの変更に関する広範な入門ガイドについては、変更ストリームの概要をご覧ください。
ストリーム パーティションの変更
変更ストリームで監視されるテーブルが変更されると、Cloud Spanner はデータ変更と同じトランザクションで、対応する変更ストリーム レコードをデータベースに書き込みます。こうすれば、トランザクションが成功すると、Spanner は変更を正常にキャプチャして永続化します。内部的には、Spanner は変更ストリーム レコードとデータ変更が同じ場所に配置され、書き込みオーバーヘッドを最小限に抑えます。
特定のスプリットに DML の一部として、Spanner は同じ書き込み内の対応する変更ストリーム データ分割に書き込みを追加します。このコロケーションにより、変更ストリームによってサービス リソース間の余分な調整が行われず、トランザクションの commit オーバーヘッドが最小限に抑えられます。
Spanner は、データベースの負荷とサイズに基づいてデータを動的に分割してマージし、サービス リソース間でスプリットを分散することでスケーリングします。
変更ストリームの書き込みと読み取りのスケーリングを可能にするために、Spanner は内部の変更ストリーム ストレージをデータベース データとともに分割し、マージするため、自動的にホットスポットを防止します。データベースの書き込み規模に合わせてほぼリアルタイムで変更ストリーム レコードを読み取るため、Spanner API は変更ストリーム パーティションを使用して変更ストリームを同時にクエリできるように設計されています。変更ストリーム レコードを含むストリーム データ分割を変更するには、ストリーム パーティション マップを変更します。変更ストリームのパーティションは時間の経過とともに動的に変化し、Spanner がデータベース データを動的に分割およびマージする方法に相関しています。
変更ストリーム パーティションには、特定の期間の不変のキー範囲のレコードが含まれています。変更ストリーム パーティションは、1 つ以上の変更ストリーム パーティションに分割することや、他の変更ストリーム パーティションとマージすることができます。これらの分割イベントまたはマージ イベントが発生すると、次のパーティションで変更できないキー範囲の変更をキャプチャするために、子パーティションが作成されます。データ変更レコードに加えて、変更ストリーム クエリは、子パーティション レコードを返して、クエリを実行する必要がある新しい変更ストリーム パーティションを読者に通知します。また、書き込みがない場合に前進を表すハートビート レコードを返します。最近発生したものです。
特定の変更ストリーム パーティションをクエリすると、変更レコードは commit タイムスタンプ順に返されます。 各変更レコードは 1 回だけ返されます。変更ストリーム パーティション間で、変更レコードの順序が保証されるわけではありません。特定の主キーの変更レコードは、特定の時間範囲の 1 つのパーティションでのみ返されます。
親子パーティションのリネージのため、commit タイムスタンプ順で特定のキーの変更を処理するには、子パーティションから返されたレコードは、すべての親パーティションからのレコードが処理された後にのみ処理する必要があります。
ストリーム クエリの構文を変更する
GoogleSQL
変更ストリームにクエリを実行するには、ExecuteStreamingSql
API を使用します。 特別なストリームの関数(TVF)が、変更ストリームとともに自動的に作成されます。変更ストリームのレコードにアクセスできます。TVF の命名規則は READ_change_stream_name
です。
データベースに変更ストリーム SingersNameStream
が存在すると仮定すると、GoogleSQL のクエリ構文は次のようになります。
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds
)
この関数は次の引数を受け入れます。
引数名 | Type | 要否 | 説明 |
---|---|---|---|
start_timestamp |
TIMESTAMP |
必須 | commit_timestamp が start_timestamp 以上のレコードが返されるように指定します。値は変更ストリームの保持期間内にする必要があり、現在の時刻以下で、変更ストリームの作成のタイムスタンプ以上である必要があります。 |
end_timestamp |
TIMESTAMP |
省略可(デフォルト: NULL ) |
commit_timestamp が end_timestamp 以下のレコードを返すことを指定します。値は変更ストリームの保持期間内にあり、start_timestamp 以上でなければなりません。クエリは、end_timestamp までのすべての ChangeRecords を返すか、ユーザーが接続を終了した後に終了します。NULL または指定されていない場合、すべての ChangeRecords が返されるか、ユーザーが接続を終了するまで、クエリが実行されます。 |
partition_token |
STRING |
省略可(デフォルト: NULL ) |
子パーティション レコードの内容に基づいて、クエリを実行する変更パーティションを指定します。NULL または指定されていない場合、リーダーは初めて変更ストリームをクエリし、クエリ対象の特定のパーティション トークンは取得しません。 |
heartbeat_milliseconds |
INT64 |
必須 | パーティションに commit されたトランザクションがない場合にハートビートの ChangeRecord が返される頻度を指定します。値は 1,000 (1 秒)~30,0000 (5 分)の範囲にする必要があります。 |
次の例に示すように、TVF クエリのテキストを作成し、それにパラメータをバインドするために、便利なメソッドを作成することをおすすめします。
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
変更ストリームにクエリを実行するには、ExecuteStreamingSql
API を使用します。変更ストリームとともに特別な関数が自動的に作成されます。変更ストリームのレコードにアクセスできます。関数の命名規則は spanner.read_json_change_stream_name
です。
データベースに変更ストリーム SingersNameStream
が存在すると仮定すると、PostgreSQL のクエリ構文は次のようになります。
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
この関数は次の引数を受け入れます。
引数名 | Type | 要否 | 説明 |
---|---|---|---|
start_timestamp |
timestamp with time zone |
必須 | commit_timestamp が start_timestamp 以上の変更レコードを返すように指定します。値は変更ストリームの保持期間内にする必要があり、現在の時刻以下で、変更ストリームの作成のタイムスタンプ以上である必要があります。 |
end_timestamp |
timestamp with timezone |
デフォルト: NULL |
commit_timestamp が end_timestamp 以下の変更レコードを返すように指定します。値は変更ストリームの保持期間内に、start_timestamp 以上でなければなりません。クエリは、end_timestamp までのすべての変更レコードを返すか、ユーザーが接続を終了した後に終了します。NULL の場合、すべての変更レコードが返されるか、ユーザーが接続を終了するまで、クエリが実行されます。 |
partition_token |
text |
省略可(デフォルト: NULL ) |
子パーティション レコードの内容に基づいて、クエリを実行する変更パーティションを指定します。NULL または指定されていない場合、リーダーは初めて変更ストリームをクエリし、クエリ対象の特定のパーティション トークンは取得しません。 |
heartbeat_milliseconds |
bigint |
必須 | このパーティションで commit されたトランザクションがない場合に、ハートビート ChangeRecord が返される頻度を決定します。値は 1,000 (1 秒)~300,000 (5 分)の範囲にする必要があります。 |
null |
null |
必須 | 将来の使用のために予約 |
次の例に示すように、関数のテキストを作成し、それにパラメータをバインドするための便利なメソッドを作成することをおすすめします。
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
ストリーム レコード形式を変更する
GoogleSQL
変更ストリーム TVF は、型 ARRAY<STRUCT<...>>
の単一の ChangeRecord 列を返します。各行には、常に 1 つの要素が含まれています。
配列要素には次の型があります。
STRUCT <
data_change_record ARRAY<STRUCT<...>>,
heartbeat_record ARRAY<STRUCT<...>>,
child_partitions_record ARRAY<STRUCT<...>>
>
この構造体には 3 つのフィールド data_change_record
、heartbeat_record
、child_partitions_record
があり、それぞれが ARRAY<STRUCT<...>>
型です。変更ストリーム TVF から返される行では、次の 3 つのフィールドのいずれか 1 つのみに値が含まれます。残りの 2 つは空または NULL
です。これらの配列フィールドに格納される要素は 1 つだけです。
以下では、この 3 つのレコードタイプについて説明します。
PostgreSQL
変更ストリーム関数は、次の構造の JSON
型の単一の ChangeRecord 列を返します。
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
このオブジェクトには、data_change_record
、heartbeat_record
、child_partitions_record
の 3 つのキーがあります。対応する値の型は JSON
です。変更ストリーム関数から返される行には、この 3 つのキーのうち 1 つだけが存在します。
以下では、この 3 つのレコードタイプについて説明します。
データ変更レコード
データ変更レコードには、同じトランザクションの 1 つの変更ストリーム パーティションに同じ commit タイムスタンプで commit された同じ変更タイプ(挿入、更新、削除)を持つテーブルに対する変更のセットが含まれます。同じトランザクションに対して、複数の変更ストリーム パーティション間で複数のデータ変更レコードを返すことができます。
すべてのデータ変更レコードには、commit_timestamp
、server_transaction_id
、record_sequence
のフィールドがあり、ストリーム レコードの変更ストリームの順序が決定されます。この 3 つのフィールドで、変更の順序を決定し、外部整合性を確保できます。
重複しないデータに触れる場合、複数のトランザクションが同じ commit タイムスタンプを持つ可能性があります。server_transaction_id
フィールドを使用すると、同じトランザクション内で発行された一連の変更(変更ストリーム パーティション間での可能性がある)を区別できます。record_sequence
フィールドと number_of_records_in_transaction
フィールドと組み合わせることで、特定のトランザクションのすべてのレコードをバッファして順序付けできます。
データ変更レコードのフィールドは次のとおりです。
GoogleSQL
フィールド | タイプ | 説明 |
---|---|---|
commit_timestamp |
TIMESTAMP |
変更が commit されたタイムスタンプ。 |
record_sequence |
STRING |
トランザクション内のレコードのシーケンス番号。シーケンス番号は、一意で、単調に増加する(必ずしも連続ではない)ことが保証されています。同じ server_transaction_id のレコードを record_sequence で並べ替え、トランザクション内の変更の順序を再構築します。 |
server_transaction_id |
STRING |
変更が commit されたトランザクションを表すグローバルに一意の文字列。この値は、変更ストリーム レコードの処理中にのみ使用し、Spanner の API のトランザクション ID とは関連付けないでください。 |
is_last_record_in_transaction_in_partition |
BOOL |
これが現在のパーティション内のトランザクションの最後のレコードかどうかを示します。 |
table_name |
STRING |
変更の影響を受けるテーブルの名前。 |
value_capture_type |
STRING |
この変更がキャプチャされたときに変更ストリーム構成で指定された値キャプチャ タイプを記述します。 値キャプチャ タイプは、 |
column_types |
ARRAY<STRUCT< |
列の名前、列のタイプ、主キーかどうか、スキーマで定義されている列の位置(「ordinal_position」など)。スキーマ内のテーブルの最初の列は、1 の順序位置になります。配列列の場合、列のタイプをネストできます。形式は、Spanner API リファレンスで説明されている型構造と一致します。 |
mods |
ARRAY<STRUCT< |
主キーの値、古い値、変更または追跡列の新しい値など、行われた変更について説明します。
古い値と新しい値の可用性と内容は、構成された value_capture_type によって異なります。new_values フィールドと old_values フィールドには非キー列のみが含まれます。 |
mod_type |
STRING |
変更のタイプを説明します。INSERT 、UPDATE 、DELETE のいずれかです。 |
number_of_records_in_transaction |
INT64 |
すべての変更ストリーム パーティション全体でこのトランザクションに含まれるデータ変更レコードの数。 |
number_of_partitions_in_transaction |
INT64 |
このトランザクションのデータ変更レコードを返すパーティションの数。 |
transaction_tag |
STRING |
このトランザクションに関連付けられた トランザクション タグ。 |
is_system_transaction |
BOOL |
トランザクションがシステム トランザクションかどうかを示します。 |
PostgreSQL
フィールド | タイプ | 説明 |
---|---|---|
commit_timestamp |
STRING |
変更が commit されたタイムスタンプ。 |
record_sequence |
STRING |
トランザクション内のレコードのシーケンス番号。シーケンス番号は、一意で、単調に増加する(必ずしも連続するではない)ことが保証されています。同じ「server_transaction_id」のレコードを「record_sequence」で並べ替えて、トランザクション内の変更の順序を再構築します。 |
server_transaction_id |
STRING |
変更が commit されたトランザクションを表すグローバルに一意の文字列。この値は、変更ストリーム レコードを処理するコンテキストでのみ使用し、Spanner の API のトランザクション ID とは関連付けないでください。 |
is_last_record_in_transaction_in_partition |
BOOLEAN |
これが現在のパーティション内のトランザクションの最後のレコードかどうかを示します。 |
table_name |
STRING |
変更の影響を受けるテーブルの名前。 |
value_capture_type |
STRING |
この変更がキャプチャされたときに変更ストリーム構成で指定された値キャプチャ タイプを記述します。 値キャプチャ タイプは、 |
column_types |
[ { "name": <STRING>, "type": { "code": <STRING> }, "is_primary_key": <BOOLEAN>, "ordinal_position": <NUMBER> }, ... ] |
列の名前、列のタイプ、主キーかどうか、スキーマで定義されている列の位置(「ordinal_position」など)。スキーマ内のテーブルの最初の列は、1 の順序位置になります。配列列の場合、列のタイプをネストできます。形式は、Spanner API リファレンスで説明されている型構造と一致します。 |
mods |
[ { "keys": {<STRING> : <STRING>}, "new_values": { <STRING> : <VALUE-TYPE>, [...] }, "old_values": { <STRING> : <VALUE-TYPE>, [...] }, }, [...] ] |
主キーの値、古い値、変更または追跡列の新しい値など、行われた変更について説明します。古い値と新しい値の可用性と内容は、構成された value_capture_type によって異なります。new_values フィールドと old_values フィールドには、非キー列のみが含まれます。 |
mod_type |
STRING |
変更のタイプを説明します。INSERT 、UPDATE 、DELETE のいずれかです。 |
number_of_records_in_transaction |
INT64 |
すべての変更ストリーム パーティション全体でこのトランザクションに含まれるデータ変更レコードの数。 |
number_of_partitions_in_transaction |
NUMBER |
このトランザクションのデータ変更レコードを返すパーティションの数。 |
transaction_tag |
STRING |
このトランザクションに関連付けられた トランザクション タグ。 |
is_system_transaction |
BOOLEAN |
トランザクションがシステム トランザクションかどうかを示します。 |
次の 2 つのサンプルデータ変更レコードがあります。2 つのアカウント間の転送がある単一のトランザクションを表します。2 つのアカウントは別々の変更ストリーム パーティションにあることに注意してください。
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
次のデータ変更レコードは、値キャプチャ タイプが "NEW_VALUES"
のレコードの例です。新しい値のみが入力されることに注意してください。"LastUpdate"
列のみが変更されたため、その列のみが返されました。
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
次のデータ変更レコードは、値キャプチャ タイプが "NEW_ROW"
のレコードの例です。"LastUpdate"
列のみが変更されましたが、すべての追跡列が返されます。
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
ハートビート レコード
ハートビート レコードが返される場合、commit_timestamp
がハートビート レコードの timestamp
以下であるすべての変更が返され、このパーティション内の将来のデータレコードが必要であることを示します。 commit タイムスタンプがハートビート レコードよりも大きくなります。パーティションに書き込まれたデータの変更がない場合、ハートビート レコードが返されます。パーティションに書き込まれたデータの変更がある場合、heartbeat_record.timestamp
の代わりに data_change_record.commit_timestamp
を使用して、リーダーがパーティションの読み取りを進めていることを通知できます。
パーティションで返されるハートビート レコードを使用して、すべてのパーティションでリーダーを同期できます。すべてのリーダーがタイムスタンプ A
以上のハートビートを受信するか、タイムスタンプ A
以上のデータまたは子パーティション レコードを受信すると、受信したことを認識します。そのタイムスタンプ A
以前に commit されたすべてのレコードについて、バッファ内のレコードの処理を開始できます。たとえば、クロスパーティション レコードをタイムスタンプで並べ替え、server_transaction_id
でグループ化できます。
ハートビート レコードには、次の 1 つのフィールドのみが含まれます。
GoogleSQL
フィールド | タイプ | 説明 |
---|---|---|
timestamp |
TIMESTAMP |
ハートビート レコードのタイムスタンプ。 |
PostgreSQL
フィールド | タイプ | 説明 |
---|---|---|
timestamp |
STRING |
ハートビート レコードのタイムスタンプ。 |
ハートビート レコードの例。タイムスタンプがこのレコードのタイムスタンプと同じかそれよりも小さいレコードがすべて返されたことを示しています。
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
子パーティション レコード
子パーティション レコードは、子パーティションに関する情報(パーティション トークン、親パーティションのトークン、start_timestamp
子パーティションに変更レコードが含まれる最初のタイムスタンプを表します。commit タイムスタンプが child_partitions_record.start_timestamp
の直前のレコードが現在のパーティションに返されます。このパーティションのすべての子パーティション レコードを返すと、このクエリは成功ステータスで返され、このパーティションのすべてのレコードが返されたことを示します。
子パーティション レコードのフィールドは次のとおりです。
GoogleSQL
フィールド | タイプ | 説明 |
---|---|---|
start_timestamp |
TIMESTAMP |
この子パーティション レコードの子パーティションから返されるデータ変更レコードには、start_timestamp 以上の commit タイムスタンプがあります。子パーティションをクエリする場合は、子パーティションのトークンと start_timestamp 以上の child_partitions_token.start_timestamp を指定する必要があります。パーティションによって返されるすべての子パーティション レコードは同じ start_timestamp を持ち、タイムスタンプは常にクエリで指定された start_timestamp と end_timestamp の間にあります。 |
record_sequence |
STRING |
特定のパーティションに同じ start_timestamp を持つ複数の子パーティション レコードが返された場合に、子パーティション レコードの順序を定義するために使用できる単調に増加するシーケンス番号。パーティション トークン start_timestamp と record_sequence は、子パーティション レコードを一意に識別します。 |
child_partitions |
ARRAY<STRUCT< |
子パーティションとそれに関連する情報のセットを返します。 これには、クエリ内の子パーティションを識別するために使用されるパーティション トークン文字列と、その親パーティションのトークンが含まれます。 |
PostgreSQL
フィールド | タイプ | 説明 |
---|---|---|
start_timestamp |
STRING |
この子パーティション レコードの子パーティションから返されるデータ変更レコードには、start_timestamp 以上の commit タイムスタンプがあります。子パーティションをクエリする場合は、子パーティションのトークンと start_timestamp 以上の child_partitions_token.start_timestamp を指定する必要があります。パーティションによって返されるすべての子パーティション レコードは同じ start_timestamp を持ち、タイムスタンプは常にクエリで指定された start_timestamp と end_timestamp の間にあります。 |
record_sequence |
STRING |
特定のパーティションに同じ start_timestamp を持つ複数の子パーティション レコードが返された場合に、子パーティション レコードの順序を定義するために使用できる単調に増加するシーケンス番号。パーティション トークン start_timestamp と record_sequence は、子パーティション レコードを一意に識別します。 |
child_partitions |
[ { "token": <STRING>, "parent_partition_tokens": [<STRING>], }, [...] ] |
子パーティションと関連する情報の配列を返します。 これには、クエリ内の子パーティションを識別するために使用されるパーティション トークン文字列と、その親パーティションのトークンが含まれます。 |
子パーティション レコードの例を次に示します。
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
変更ストリーム クエリのワークフロー
単回使用の読み取り専用トランザクションと強力な ExecuteStreamingSql
API と タイムスタンプ バウンドを使用して、変更ストリーム クエリを実行します。変更ストリーム TVF では、目的の期間の start_timestamp
と end_timestamp
を指定できます。保持期間内のすべての変更レコードには、強力な読み取り専用タイムスタンプ バウンドを使用してアクセスできます。
その他すべての TransactionOptions
は、変更ストリームのクエリに対して無効です。さらに、TransactionOptions.read_only.return_read_timestamp
が true に設定されている場合、トランザクションを読み取る Transaction
メッセージで、有効なタイムスタンプの読み取りの代わりに、特別な値 kint64max - 1
が返されます。この特殊な値は破棄し、後続のクエリには使用しないでください。
それぞれの変更ストリーム クエリから任意の数の行を返すことができ、各行にはデータ変更レコード、ハートビート レコード、または子パーティション レコードのいずれかが含まれます。リクエストの期限を設定する必要はありません。
例:
ストリーミング クエリのワークフローでは、まず partition_token
を NULL
に指定して、最初の変更ストリーム クエリを発行します。クエリでは、変更ストリームの関数、対象となる開始タイムスタンプと終了タイムスタンプ、ハートビート間隔を指定する必要があります。end_timestamp
が NULL
の場合、クエリはパーティションが終了するまでデータの変更を返し続けます。
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01 09:00:00-00",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01 09:00:00-00',
NULL,
NULL,
10000,
NULL
) ;
子パーティション レコードが返されるまで、このクエリのデータレコードを処理する。以下の例では、2 つの子パーティション レコードと 3 つのパーティション トークンが返され、クエリは終了します。特定のクエリからの子パーティション レコードは常に start_timestamp
を共有します。
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01 09:00:01-00",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01 09:00:01-00",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
2022-05-01 09:00:01-00
の後に将来の変更を処理するには、3 つの新しいクエリを作成して同時に実行します。3 つのクエリは、親がカバーしている同じキー範囲の今後のデータ変更を返します。常に同じ子パーティション レコード内の start_timestamp
を start_timestamp
に設定し、同じ end_timestamp
とハートビート間隔を使用して、すべてのクエリでレコードを一貫して処理します。
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01 09:00:01-00",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01 09:00:01-00",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01 09:00:01-00",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01 09:00:01-00',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01 09:00:01-00',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01 09:00:01-00',
NULL,
'child_token_3',
10000,
NULL
);
しばらくすると、別の子パーティション レコードが返されると child_token_2
のクエリが終了します。このレコードは、child_token_2
と child_token_3
の両方で、新しいパーティションが2022-05-01 09:30:15-00
カバーされます。まったく同じレコードが child_token_3
に対するクエリによって返されます。どちらも新しい child_token_4
の親パーティションであるためです。特定のキーのデータレコードの厳密な順序付けを保証するため、次のクエリでは、child_token_4
すべての親が終了した後でのみ開始する必要があります。child_token_2
さらに、次のものがあるとします。child_token_3
。子パーティション トークンごとにクエリを 1 つだけ作成し、クエリ ワークフロー設計では、待機して child_token_4
でクエリをスケジュールする親を 1 つ指定します。
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01 09:30:15-00",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01 09:30:15-00",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01 09:30:15-00',
NULL,
'child_token_4',
10000,
NULL
);
変更ストリーム レコードの処理と解析の例は、GitHub の Apache Beam SpannerIO Dataflow コネクタで確認できます。