ストリーム パーティション、レコード、クエリを変更する

このページでは、変更ストリームの次の属性について詳しく説明します。

  • スプリットベースのパーティショニング モデル
  • 変更ストリーム レコードの形式と内容
  • これらのレコードのクエリに使用する低レベルの構文
  • クエリ ワークフローの例

このページの情報は、Spanner API を使用して変更ストリームを直接クエリする場合に最適です。代わりに Dataflow を使用して変更ストリーム データを読み取るアプリケーションでは、ここで説明するデータモデルを直接操作する必要はありません。

変更ストリームに関するより広範な入門ガイドについては、変更ストリームの概要をご覧ください。

変更ストリーム パーティション

変更ストリームが監視しているテーブルが変更されると、Spanner はデータ変更と同じトランザクションで同期的に、対応する変更ストリーム レコードをデータベースに書き込みます。これにより確実に、トランザクションが成功した場合に、Spanner は変更を正常にキャプチャし、保持することになります。内部的には、Spanner は変更ストリーム レコードとデータ変更を同じ場所に配置し、書き込みオーバーヘッドが最小限になるように同じサーバーで処理されます。

特定のスプリットに対する DML の一環として、Spanner は同じトランザクション内の対応する変更ストリームのデータ スプリットへの書き込みを追加します。このコロケーションにより、変更ストリームは提供リソース間で余分な調整を追加せず、トランザクションの commit オーバーヘッドが最小限に抑えられます。

イメージ

Spanner は、データベースの負荷とサイズに基づいてデータを動的に分割およびマージし、サービス リソース間でスプリットを分散させてスケーリングします。

変更ストリームの書き込みと読み取りのスケーリングを可能にするために、Spanner は内部の変更ストリーム ストレージをデータベース データとともに分割し、マージするため、自動的にホットスポットを防止します。データベースの書き込みのスケーリングに合わせて、変更ストリーム レコードをニア リアルタイムで読み取りできるようにするため、Spanner API は変更ストリーム パーティションを使用して同時にクエリが実行されるように設計されています。変更ストリーム パーティションは、変更ストリーム レコードを含む変更ストリーム データ スプリットにマッピングされます。変更ストリーム パーティションは時間の経過とともに動的に変更され、Spanner がデータベース データを動的に分割およびマージする方法と関連付けられます。

変更ストリームのパーティションには、特定の期間における不変のキー範囲のレコードが含まれています。変更ストリーム パーティションは、1 つ以上の変更ストリーム パーティションに分割することも、他の変更ストリーム パーティションとマージすることもできます。これらの分割イベントまたはマージ イベントが発生すると、子パーティションが作成され、次の期間に対するそれぞれの不変キー範囲の変更をキャプチャします。データ変更レコードに加えて、変更ストリーム クエリは、クエリが必要な新しい変更ストリーム パーティションのリーダーに通知する子パーティション レコードと、最近発生した書き込みがない場合の転送の進行状況を示すハートビート レコードを返します。

特定の変更ストリーム パーティションに対してクエリを実行すると、変更レコードは commit タイムスタンプ順に返されます。各変更レコードは 1 回だけ返されます。変更ストリーム パーティション間では、変更レコードの順序は保証されません。特定の主キーの変更レコードは、特定の期間の 1 つのパーティションでのみ返されます。

親子パーティションのリネージのため、commit タイムスタンプ順で特定のキーの変更を処理するには、子パーティションから返されたレコードは、すべての親パーティションからのレコードが処理された後にのみ処理する必要があります。

ストリーム読み取り関数とクエリ構文の変更

GoogleSQL

変更ストリームにクエリを実行するには、ExecuteStreamingSql API を使用します。Spanner は、変更ストリームとともに特別な読み取り関数を自動的に作成します。読み取り関数で変更ストリームのレコードにアクセスできます。読み取り関数の命名規則は READ_change_stream_name です。

データベースに変更ストリーム SingersNameStream が存在すると仮定すると、GoogleSQL のクエリ構文は次のようになります。

SELECT ChangeRecord
FROM READ_SingersNameStream (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    read_options
)

読み取り関数は次の引数を受け入れます。

引数名 Type 要否 説明
start_timestamp TIMESTAMP 必須 commit_timestampstart_timestamp 以上のレコードを返すように指定します。値は変更ストリームの保持期間内にする必要があり、現在の時刻以下で、変更ストリームの作成のタイムスタンプ以上である必要があります。
end_timestamp TIMESTAMP 省略可(デフォルト: NULL commit_timestampend_timestamp 以下のレコードを返すように指定します。値は変更ストリームの保持期間内で、start_timestamp 以上である必要があります。クエリは、end_timestamp までのすべての ChangeRecord を返した後、またはユーザーが接続を終了した後に終了します。NULL を指定するか、指定していない場合は、すべての ChangeRecord が返されるか、ユーザーが接続を終了するまでクエリが実行されます。
partition_token STRING 省略可(デフォルト: NULL 子パーティション レコードのコンテンツに基づいて、クエリを実行する変更ストリーム パーティションを指定します。NULL または指定されていない場合、リーダーが初めて変更ストリームをクエリし、クエリに使用する特定のパーティション トークンを取得していないことを意味します。
heartbeat_milliseconds INT64 必須 このパーティションに commit されたトランザクションがない場合に、ハートビートの ChangeRecord が返される頻度を決定します。 値は 1,000(1 秒)~30,0000(5 分)の範囲にする必要があります。
read_options ARRAY 省略可(デフォルト: NULL 今後の使用のために予約されている追加の読み取りオプション。現在、指定できる値は NULL のみです。

次の例に示すように、読み取り関数クエリのテキストとそのクエリにバインドするパラメータの便利なメソッドを作成することをおすすめします。

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

PostgreSQL

変更ストリームにクエリを実行するには、ExecuteStreamingSql API を使用します。Spanner は、変更ストリームとともに特別な読み取り関数を自動的に作成します。読み取り関数で変更ストリームのレコードにアクセスできます。読み取り関数の命名規則は spanner.read_json_change_stream_name です。

データベースに変更ストリーム SingersNameStream が存在すると仮定すると、PostgreSQL のクエリ構文は次のようになります。

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    null
)

読み取り関数は次の引数を受け入れます。

引数名 Type 要否 説明
start_timestamp timestamp with time zone 必須 commit_timestampstart_timestamp 以上の変更レコードを返すように指定します。値は変更ストリームの保持期間内にする必要があり、現在の時刻以下で、変更ストリームの作成のタイムスタンプ以上である必要があります。
end_timestamp timestamp with timezone 省略可(デフォルト: NULL commit_timestampend_timestamp 以下の変更レコードを返すように指定します。値は変更ストリームの保持期間内で、start_timestamp 以上でなければなりません。クエリは、end_timestamp までのすべての変更レコードを返すか、ユーザーが接続を終了した後に終了します。 NULL の場合、すべての変更レコードが返されるかユーザーが接続を終了するまで、クエリが実行されます。
partition_token text 省略可(デフォルト: NULL 子パーティション レコードのコンテンツに基づいて、クエリを実行する変更ストリーム パーティションを指定します。NULL または指定されていない場合、リーダーが初めて変更ストリームをクエリし、クエリに使用する特定のパーティション トークンを取得していないことを意味します。
heartbeat_milliseconds bigint 必須 このパーティションに commit されたトランザクションがない場合に、ハートビートの ChangeRecord が返される頻度を決定します。 値は 1,000(1 秒)から 300,000(5 分)の範囲で指定してください。
null null 必須 将来の使用のために予約

次の例に示すように、読み取り関数のテキストの作成と、その関数へのパラメータのバインドに便利なメソッドを作成することをおすすめします。

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
            + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {

  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("p1")
                    .to(startTimestamp)
                    .bind("p2")
                    .to(endTimestamp)
                    .bind("p3")
                    .to(partitionToken)
                    .bind("p4")
                    .to(heartbeatMillis)
                    .build();
}

変更ストリームのレコード形式

GoogleSQL

変更ストリームの読み取り関数は、ARRAY<STRUCT<...>> 型の ChangeRecord 列を 1 つ返します。各行には、常に 1 つの要素が含まれています。

配列要素には次の型があります。

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

この構造体には 3 つのフィールド data_change_recordheartbeat_recordchild_partitions_record があり、それぞれが ARRAY<STRUCT<...>> 型です。変更ストリームの読み取り関数が返す行には、次の 3 つのフィールドのどちらか 1 つのみに値が含まれます。残りの 2 つは空または NULL です。これらの配列フィールドに格納される要素は 1 つだけです。

次のセクションでは、この 3 つのレコードタイプのそれぞれについて説明します。

PostgreSQL

変更ストリームの読み取り関数は、次の構造を持つ JSON 型の ChangeRecord 列を 1 つ返します。

{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}

このオブジェクトには、data_change_recordheartbeat_recordchild_partitions_record の 3 つのキーがあります。対応する値の型は JSON です。変更ストリームの読み取り関数が返す行には、この 3 つのキーのうち 1 つだけが存在します。

以下では、この 3 つのレコードタイプについて説明します。

データ変更レコード

データ変更レコードには、同じトランザクションの 1 つの変更ストリーム パーティション内の同じ commit タイムスタンプで commit された同じ変更タイプ(挿入、更新、削除)を持つテーブルに対する一連の変更が含まれます。複数のデータ変更レコードが、複数の変更ストリーム パーティション間の同じトランザクションで返されることがあります。

すべてのデータ変更レコードには、commit_timestampserver_transaction_idrecord_sequence の各フィールドがあり、これらのフィールドの組み合わせにより、ストリーム レコードの変更ストリームの順序が決まります。これら 3 つのフィールドは、変更の順序を導き出し、外部整合性を提供するには十分です。

重複しないデータを扱う場合、複数のトランザクションが同じ commit タイムスタンプを持つ可能性があります。server_transaction_id フィールドを使用すると、同じトランザクション内で発行された変更のセット(潜在的に変更ストリーム パーティション間など)を区別できます。record_sequence フィールドおよび number_of_records_in_transaction フィールドと組み合わせると、特定のトランザクションのすべてのレコードをバッファリングして順序付けすることもできます。

データ変更レコードのフィールドは次のとおりです。

GoogleSQL

フィールド タイプ 説明
commit_timestamp TIMESTAMP 変更が commit されたタイムスタンプ。
record_sequence STRING トランザクション内のレコードのシーケンス番号。シーケンス番号は、一意で、単調に増加する(必ずしも連続ではない)ことが保証されています。同じ server_transaction_id のレコードを record_sequence で並べ替えて、トランザクション内の変更の順序を再構築します。
server_transaction_id STRING 変更が commit された Spanner トランザクションを表す、グローバルに一意の文字列。この値は、変更ストリームのレコードを処理するコンテキストでのみ使用する必要があります。Spanner の API のトランザクション ID と相関はありません。
is_last_record_in_transaction_in_partition BOOL これが現在のパーティションにおけるトランザクションの最後のレコードであるかどうかを示します。
table_name STRING 変更の影響を受けるテーブルの名前。
value_capture_type STRING

この変更がキャプチャされたときに、変更ストリーム構成で指定された値キャプチャ タイプを記述します。

値キャプチャ タイプは、"OLD_AND_NEW_VALUES""NEW_ROW""NEW_VALUES""NEW_ROW_AND_OLD_VALUES" のいずれかです。 デフォルトでは "OLD_AND_NEW_VALUES" になっています。詳細については、値キャプチャ タイプをご覧ください。

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
列の名前、列のタイプ、主キーかどうか、スキーマで定義されている列の位置(「ordinal_position」など)。スキーマ内のテーブルの最初の列は、1 の順序位置になります。配列列の場合、列のタイプをネストできます。形式は、Spanner API リファレンスで説明されている型構造と一致します。
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
主キーの値、古い値、変更または追跡列の新しい値など、行われた変更について説明します。 古い値と新しい値の可用性と内容は、構成された value_capture_type によって異なります。new_values フィールドと old_values フィールドには、非キー列のみが含まれます。
mod_type STRING 変更のタイプを表します。INSERTUPDATEDELETE のいずれかです。
number_of_records_in_transaction INT64 すべての変更ストリーム パーティションにわたるこのトランザクションに含まれるデータ変更レコードの数。
number_of_partitions_in_transaction INT64 このトランザクションのデータ変更レコードを返すパーティションの数。
transaction_tag STRING このトランザクションに関連付けられた トランザクション タグ
is_system_transaction BOOL トランザクションがシステム トランザクションかどうかを示します。

PostgreSQL

フィールド タイプ 説明
commit_timestamp STRING 変更が commit されたタイムスタンプ。
record_sequence STRING トランザクション内のレコードのシーケンス番号。シーケンス番号は、一意で、単調に増加する(必ずしも連続するではない)ことが保証されています。同じ「server_transaction_id」のレコードを「record_sequence」で並べ替えて、トランザクション内の変更の順序を再構築します。
server_transaction_id STRING 変更が commit された Spanner トランザクションを表す、グローバルに一意の文字列。この値は、変更ストリームのレコードを処理するコンテキストでのみ使用する必要があります。Spanner の API でトランザクション ID と相関することはできません。
is_last_record_in_transaction_in_partition BOOLEAN これが現在のパーティションにおけるトランザクションの最後のレコードであるかどうかを示します。
table_name STRING 変更の影響を受けるテーブルの名前。
value_capture_type STRING

この変更がキャプチャされたときに、変更ストリーム構成で指定された値キャプチャ タイプを記述します。

値キャプチャ タイプは、"OLD_AND_NEW_VALUES""NEW_ROW""NEW_VALUES""NEW_ROW_AND_OLD_VALUES" のいずれかです。 デフォルトでは "OLD_AND_NEW_VALUES" になっています。詳細については、値キャプチャ タイプをご覧ください。

column_types

[
  {
      "name": <STRING>,
      "type": {
        "code": <STRING>
      },
      "is_primary_key": <BOOLEAN>,
      "ordinal_position": <NUMBER>
    },
    ...
]
列の名前、列のタイプ、主キーかどうか、スキーマで定義されている列の位置(「ordinal_position」など)。スキーマ内のテーブルの最初の列は、1 の順序位置になります。配列列の場合、列のタイプをネストできます。形式は、Spanner API リファレンスで説明されている型構造と一致します。
mods

[
  {
    "keys": {<STRING> : <STRING>},
    "new_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
    "old_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
  },
  [...]
]
主キーの値、古い値、変更または追跡列の新しい値など、行われた変更について説明します。古い値と新しい値の可用性と内容は、構成された value_capture_type によって異なります。new_values フィールドと old_values フィールドには、非キー列のみが含まれます。
mod_type STRING 変更のタイプを表します。INSERTUPDATEDELETE のいずれかです。
number_of_records_in_transaction INT64 すべての変更ストリーム パーティションにわたるこのトランザクションに含まれるデータ変更レコードの数。
number_of_partitions_in_transaction NUMBER このトランザクションのデータ変更レコードを返すパーティションの数。
transaction_tag STRING このトランザクションに関連付けられた トランザクション タグ
is_system_transaction BOOLEAN トランザクションがシステム トランザクションかどうかを示します。

データ変更レコードのペアの例を以下に示します。2 つのアカウント間で転送がある 1 つのトランザクションを表します。2 つのアカウントは、別々の変更ストリーム パーティションにあることに注意してください。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

次のデータ変更レコードは、値キャプチャ タイプが "NEW_VALUES" のレコードの例です。新しい値のみが入力されることに注意してください。 "LastUpdate" 列のみが変更されたため、その列のみが返されました。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z"
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

次のデータ変更レコードは、値キャプチャ タイプが "NEW_ROW" のレコードの例です。"LastUpdate" 列のみが変更されましたが、追跡されたすべての列が返されます。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

次のデータ変更レコードは、値キャプチャ タイプが "NEW_ROW_AND_OLD_VALUES" のレコードの例です。"LastUpdate" 列のみが変更されましたが、追跡されたすべての列が返されます。この値キャプチャ タイプは、LastUpdate の新しい値と古い値をキャプチャします。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z"
      }
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

ハートビート レコード

ハートビート レコードが返されると、ハートビート レコードの timestamp 以下の commit_timestamp を持つすべての変更が返され、このパーティション内の将来のデータレコードには、ハートビート レコードによって返されるものよりも高い commit タイムスタンプが必要であることを示しています。パーティションにデータの変更が書き込まれていない場合、ハートビート レコードが返されます。パーティションにデータ変更が書き込まれた場合、heartbeat_record.timestamp の代わりに data_change_record.commit_timestamp を使用して、リーダーがパーティションの読み取りを進めていることを示すことができます。

パーティションで返されたハートビート レコードを使用して、すべてのパーティションでリーダーを同期できます。すべてのリーダーがあるタイムスタンプ A 以上のハートビートを受信するか、タイムスタンプ A 以上のデータまたは子パーティションのレコードを受信すると、リーダーは、そのタイムスタンプ A 以前に commit されたすべてのレコードを受信したことを認識し、バッファ内のレコードの処理を開始できます(たとえば、タイムスタンプでクロス パーティションのレコードを並べ替え、server_transaction_id でグループ化)。

ハートビート レコードに含まれるフィールドは、次のとおりです。

GoogleSQL

フィールド タイプ 説明
timestamp TIMESTAMP ハートビート レコードのタイムスタンプ。

PostgreSQL

フィールド タイプ 説明
timestamp STRING ハートビート レコードのタイムスタンプ。

ハートビート レコードの例。これは、このレコードのタイムスタンプより短いタイムスタンプを持つすべてのレコードが返されたことを表します。

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

子パーティション レコード

子パーティション レコードは、子パーティションに関する情報(パーティション トークン、親パーティションのトークン、子パーティションに含まれる変更レコードの最も古いタイムスタンプを表す start_timestamp)を返します。commit タイムスタンプが child_partitions_record.start_timestamp の直前のレコードであるレコードが現在のパーティションに返されます。このパーティションの子パーティション レコードがすべて返されると、このクエリは成功ステータスを返します。これは、このパーティションのすべてのレコードが返されていることを示します。

子パーティション レコードのフィールドは次のとおりです。

GoogleSQL

フィールド タイプ 説明
start_timestamp TIMESTAMP この子パーティション レコードの子パーティションから返されるデータ変更レコードには、start_timestamp 以上の commit タイムスタンプがあります。子パーティションをクエリする場合は、子パーティション トークンと、child_partitions_token.start_timestamp 以上の start_timestamp をクエリで指定する必要があります。パーティションによって返されるすべての子パーティション レコードには同じ start_timestamp があり、タイムスタンプは常にクエリで指定された start_timestampend_timestamp の間になります。
record_sequence STRING 単調に増加するシーケンス番号。特定のパーティションに同じ start_timestamp で返された複数の子パーティション レコードがある場合に、子パーティション レコードの順序を定義するために使用できます。パーティション トークン start_timestamprecord_sequence は、子パーティション レコードを一意に識別します。
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
一連の子パーティションとそれに関連する情報を返します。 これには、クエリで子パーティションを識別するために使用されるパーティション トークン文字列と、その親パーティションのトークンが含まれます。

PostgreSQL

フィールド タイプ 説明
start_timestamp STRING この子パーティション レコードの子パーティションから返されるデータ変更レコードは、start_timestamp 以上の commit タイムスタンプを持ちます。子パーティションをクエリする場合は、子パーティション トークンと、child_partitions_token.start_timestamp 以上の start_timestamp をクエリで指定する必要があります。パーティションによって返されるすべての子パーティション レコードは同じ start_timestamp を持ち、タイムスタンプは常にクエリで指定された start_timestampend_timestamp の間にあります。
record_sequence STRING 単調に増加するシーケンス番号。特定のパーティションに同じ start_timestamp で返された複数の子パーティション レコードがある場合に、子パーティション レコードの順序を定義するために使用できます。パーティション トークン start_timestamprecord_sequence は、子パーティション レコードを一意に識別します。
child_partitions

[
  {
    "token": <STRING>,
    "parent_partition_tokens": [<STRING>],
  }, [...]
]
子パーティションとそれに関連する情報の配列を返します。 これには、クエリで子パーティションを識別するために使用されるパーティション トークン文字列と、その親パーティションのトークンが含まれます。

子パーティション レコードの例を次に示します。

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ],
}

変更ストリーム クエリのワークフロー

単回使用の読み取り専用トランザクションと強力な ExecuteStreamingSql API と タイムスタンプ バウンドを使用して、変更ストリーム クエリを実行します。変更ストリームの読み取り関数で、目的の期間に start_timestampend_timestamp を指定できます。保持期間内のすべての変更レコードには、強力な読み取り専用のタイムスタンプ バウンドを使用してアクセスできます。

他のすべての TransactionOptions は、変更ストリーム クエリに対して無効です。さらに、TransactionOptions.read_only.return_read_timestamp が true に設定されている場合、トランザクションを読み取る Transaction メッセージで、有効なタイムスタンプの読み取りの代わりに、特別な値 kint64max - 1 が返されます。この特殊な値を破棄し、後続のクエリには使用しないでください。

各変更ストリーム クエリは、データ変更レコード、ハートビート レコード、子パーティション レコードのいずれかが含まれる任意の数の行を返すことができます。リクエストの期限を設定する必要はありません。

例:

ストリーミング クエリのワークフローでは、まず partition_tokenNULL に指定して、最初の変更ストリーム クエリを発行します。クエリでは、変更ストリームの読み取り関数、対象となる開始タイムスタンプと終了タイムスタンプ、ハートビート間隔を指定する必要があります。end_timestampNULL の場合、クエリはパーティションが終了するまでデータの変更を返し続けます。

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
) ;

子パーティション レコードが返されるまで、このクエリのデータレコードを処理します。以下の例では、2 つの子パーティション レコードと 3 つのパーティション トークンが返され、クエリが終了します。特定のクエリからの子パーティション レコードは常に同じ start_timestamp を共有します。

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
        // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ],
}

2022-05-01T09:00:01Z の後に今後の変更を処理するには、3 つの新しいクエリを作成し、それらを並行して実行します。この 3 つのクエリは、親がカバーする同じキー範囲に対する将来のデータ変更を返します。常に同じ子パーティション レコードで start_timestampstart_timestamp に設定し、同じ end_timestamp とハートビート間隔を使用して、すべてのクエリでレコードを一貫して処理します。

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_1',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_2',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_3',
  10000,
  NULL
);

しばらくすると、別の子パーティション レコードが返されると child_token_2 のクエリが終了します。このレコードは、child_token_2child_token_3 の両方で、新しいパーティションが2022-05-01T09:30:15Zカバーされます。まったく同じレコードが、child_token_3 に対するクエリで返されます。これは、両方とも新しい child_token_4 の親パーティションであるためです。 特定のキーのデータレコードの厳密な順序付けされた処理を保証するためには、child_token_4 に対するクエリはすべての親が完了してから開始する必要があります。この場合は、child_token_2child_token_3 です。子パーティション トークンごとに 1 つのクエリのみを作成します。クエリ ワークフローの設計では、1 つの親を指定して child_token_4 でクエリをスケジュールする必要があります。

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z',
  NULL,
  'child_token_4',
  10000,
  NULL
);

GitHub の Apache Beam SpannerIO Dataflow コネクタで変更ストリーム レコードの処理と解析を行う例を確認する。