ストリーム パーティション、レコード、クエリを変更する

このページでは、変更ストリームの次の属性について詳しく説明します。

  • スプリットベースのパーティショニング モデル
  • 変更ストリーム レコードの形式と内容
  • これらのレコードのクエリに使用される低レベルの構文
  • クエリ ワークフローの例

このページの情報は、Spanner API を使用して変更ストリームを直接クエリする場合に最も関連性があります。代わりに Dataflow を使用して変更ストリーム データを読み取るアプリケーションは、ここで説明するデータモデルを直接操作する必要はありません。

変更ストリームの概要については、変更ストリームの概要をご覧ください。

変更ストリーム パーティション

変更ストリームが監視しているテーブルが変更されると、Spanner はデータ変更と同じトランザクションで同期的に、対応する変更ストリーム レコードをデータベースに書き込みます。これにより確実に、トランザクションが成功した場合に、Spanner は変更を正常にキャプチャし、保持することになります。内部的には、Spanner は変更ストリーム レコードとデータ変更を同じ場所に配置し、同じサーバーで処理されるようにすることで、書き込みのオーバーヘッドを最小限に抑えます。

特定のスプリットに対する DML の一環として、Spanner は同じトランザクションで対応する変更ストリーム データ スプリットに書き込みを追加します。このコロケーションにより、変更ストリームは提供リソース間で余分な調整を追加せず、トランザクションの commit オーバーヘッドが最小限に抑えられます。

画像

Spanner は、データベースの負荷とサイズに基づいてデータを動的に分割およびマージし、サービス リソース間でスプリットを分散させてスケーリングします。

変更ストリームの書き込みと読み取りのスケーリングを可能にするために、Spanner は内部の変更ストリーム ストレージをデータベース データとともに分割し、マージするため、自動的にホットスポットを防止します。データベースの書き込みのスケーリングに合わせて、変更ストリーム レコードをニア リアルタイムで読み取りできるようにするため、Spanner API は変更ストリーム パーティションを使用して同時にクエリが実行されるように設計されています。変更ストリーム パーティションは、変更ストリーム レコードを含む変更ストリーム データ スプリットにマッピングされます。変更ストリーム パーティションは時間の経過とともに動的に変更され、Spanner がデータベース データを動的に分割およびマージする方法と関連付けられます。

変更ストリームのパーティションには、特定の期間における不変のキー範囲のレコードが含まれています。変更ストリーム パーティションは、1 つ以上の変更ストリーム パーティションに分割することも、他の変更ストリーム パーティションと統合することもできます。これらの分割イベントまたは統合イベントが発生すると、子パーティションが作成され、次の期間のそれぞれの不変のキー範囲の変更がキャプチャされます。データ変更レコードに加えて、変更ストリーム クエリは、クエリが必要な新しい変更ストリーム パーティションのリーダーに通知する子パーティション レコードと、最近発生した書き込みがない場合の転送の進行状況を示すハートビート レコードを返します。

特定の変更ストリーム パーティションに対してクエリを実行すると、変更レコードは commit タイムスタンプ順に返されます。各変更レコードは 1 回だけ返されます。変更ストリーム パーティション間で、変更レコードの順序が保証されることはありません。特定の主キーの変更レコードは、特定の期間の 1 つのパーティションにのみ返されます。

親子パーティションのリネージのため、commit タイムスタンプ順で特定のキーの変更を処理するには、子パーティションから返されたレコードは、すべての親パーティションからのレコードが処理された後にのみ処理する必要があります。

変更ストリームの読み取り機能とクエリ構文

GoogleSQL

変更ストリームにクエリを実行するには、ExecuteStreamingSql API を使用します。Spanner は、変更ストリームとともに特別な読み取り関数を自動的に作成します。読み取り関数は、変更ストリームのレコードへのアクセス権を付与します。読み取り関数の命名規則は READ_change_stream_name です。

データベースに変更ストリーム SingersNameStream が存在すると仮定すると、GoogleSQL のクエリ構文は次のとおりです。

SELECT ChangeRecord
FROM READ_SingersNameStream (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    read_options
)

読み取り関数は、次の引数を受け入れます。

引数名 必須 説明
start_timestamp TIMESTAMP 必須 commit_timestampstart_timestamp 以上であるレコードを返すように指定します。値は変更ストリームの保持期間内にする必要があり、現在の時刻以下で、変更ストリームの作成のタイムスタンプ以上である必要があります。
end_timestamp TIMESTAMP 省略可(デフォルト: NULL commit_timestampend_timestamp 以下であるレコードを返すように指定します。値は変更ストリームの保持期間内にする必要があり、start_timestamp 以上である必要があります。クエリは、end_timestamp までのすべての変更レコードを返した後、またはユーザーが接続を終了した後に終了します。NULL の場合、または指定されていない場合、すべての変更レコードが返されるか、ユーザーが接続を終了するまでクエリが実行されます。
partition_token STRING 省略可(デフォルト: NULL 子パーティション レコードの内容に基づいて、クエリする変更ストリーム パーティションを指定します。NULL または指定されていない場合、リーダーが初めて変更ストリームをクエリし、クエリに使用する特定のパーティション トークンを取得していないことを意味します。
heartbeat_milliseconds INT64 必須 このパーティションに commit されたトランザクションがない場合に、ハートビートの ChangeRecord が返される頻度を決定します。 値は 1,000(1 秒)から 300,000(5 分)の範囲で指定してください。
read_options ARRAY 省略可(デフォルト: NULL 将来の使用のために予約された追加の読み取りオプション。現在、指定できる値は NULL のみです。

次の例に示すように、読み取り関数クエリのテキストを構築し、パラメータをバインドするための便利なメソッドを作成することをおすすめします。

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

PostgreSQL

変更ストリームにクエリを実行するには、ExecuteStreamingSql API を使用します。Spanner は、変更ストリームとともに特別な読み取り関数を自動的に作成します。読み取り関数は、変更ストリームのレコードへのアクセス権を付与します。読み取り関数の命名規則は spanner.read_json_change_stream_name です。

データベースに変更ストリーム SingersNameStream が存在すると仮定すると、PostgreSQL のクエリ構文は次のとおりです。

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    null
)

読み取り関数は、次の引数を受け入れます。

引数名 必須 説明
start_timestamp timestamp with time zone 必須 commit_timestampstart_timestamp 以上である変更レコードを返すように指定します。値は変更ストリームの保持期間内にする必要があり、現在の時刻以下で、変更ストリームの作成のタイムスタンプ以上である必要があります。
end_timestamp timestamp with timezone 省略可(デフォルト: NULL commit_timestampend_timestamp 以下である変更レコードを返すように指定します。値は変更ストリームの保持期間内にする必要があり、start_timestamp 以上である必要があります。クエリは、end_timestamp までのすべての変更レコードを返すか、ユーザーが接続を終了した後に終了します。 NULL の場合、すべての変更レコードが返されるか、ユーザーが接続を終了するまでクエリが実行されます。
partition_token text 省略可(デフォルト: NULL 子パーティション レコードの内容に基づいて、クエリする変更ストリーム パーティションを指定します。NULL または指定されていない場合、リーダーが初めて変更ストリームをクエリし、クエリに使用する特定のパーティション トークンを取得していないことを意味します。
heartbeat_milliseconds bigint 必須 このパーティションに commit されたトランザクションがない場合に、ハートビートの ChangeRecord が返される頻度を決定します。 値は 1,000(1 秒)から 300,000(5 分)の範囲で指定してください。
null null 必須 将来の使用のために予約

次の例に示すように、読み取り関数のテキストを構築し、そのテキストにパラメータをバインドするための便利なメソッドを作成することをおすすめします。

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
            + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {

  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("p1")
                    .to(startTimestamp)
                    .bind("p2")
                    .to(endTimestamp)
                    .bind("p3")
                    .to(partitionToken)
                    .bind("p4")
                    .to(heartbeatMillis)
                    .build();
}

変更ストリームのレコード形式

GoogleSQL

変更ストリームの読み取り関数は、ARRAY<STRUCT<...>> 型の単一の ChangeRecord 列を返します。各行のこの配列には、常に 1 つの要素が含まれます。

配列要素の型は次のとおりです。

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

この構造体には、data_change_recordheartbeat_recordchild_partitions_record の 3 つのフィールドがあり、それぞれ ARRAY<STRUCT<...>> 型です。変更ストリーム読み取り関数が返す行では、これらの 3 つのフィールドの 1 つにのみ値が含まれ、他の 2 つは空または NULL です。これらの配列フィールドには、最大で 1 つの要素が含まれます。

以降のセクションでは、これらの 3 つのレコードタイプについて説明します。

PostgreSQL

変更ストリームの読み取り関数は、次の構造を持つ JSON 型の単一の ChangeRecord 列を返します。

{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}

このオブジェクトには、data_change_recordheartbeat_recordchild_partitions_record の 3 つのキーがあります。対応する値の型は JSON です。変更ストリーム読み取り関数が返す行には、これらの 3 つのキーのいずれか 1 つだけが存在します。

以降のセクションでは、これらの 3 つのレコードタイプについて説明します。

データ変更レコード

データ変更レコードには、同じトランザクションの 1 つの変更ストリーム パーティション内の同じ commit タイムスタンプで commit された同じ変更タイプ(挿入、更新、削除)を持つテーブルに対する一連の変更が含まれます。複数のデータ変更レコードが、複数の変更ストリーム パーティション間の同じトランザクションで返されることがあります。

すべてのデータ変更レコードには commit_timestampserver_transaction_idrecord_sequence の各フィールドがあり、これらを組み合わせてストリーム レコードの変更ストリーム内の順序が決まります。これらの 3 つのフィールドは、変更の順序を導き出し、外部整合性を提供するのに十分です。

重複しないデータにアクセスする複数のトランザクションが、同じ commit タイムスタンプを持つ場合があります。server_transaction_id フィールドを使用すると、同じトランザクション内で発行された変更のセット(潜在的に変更ストリーム パーティション間など)を区別できます。record_sequence フィールドおよび number_of_records_in_transaction フィールドと組み合わせると、特定のトランザクションのすべてのレコードをバッファリングして順序付けすることもできます。

データ変更レコードのフィールドには、次のものが含まれます。

GoogleSQL

フィールド 説明
commit_timestamp TIMESTAMP 変更が commit されたタイムスタンプ。
record_sequence STRING トランザクション内のレコードのシーケンス番号。シーケンス番号は、一意で、単調に増加する(必ずしも連続ではない)ことが保証されています。同じ server_transaction_id のレコードを record_sequence で並べ替えて、トランザクション内の変更の順序を再構築します。この順序は、パフォーマンスを向上させるために Spanner によって最適化される場合があります。また、ユーザーが指定した元の順序と一致しないこともあります。
server_transaction_id STRING 変更が commit された Spanner トランザクションを表す、グローバルに一意の文字列。この値は、変更ストリームのレコードを処理するコンテキストでのみ使用する必要があります。Spanner の API のトランザクション ID とは関連付けられません。
is_last_record_in_transaction_in_partition BOOL これが現在のパーティション内のトランザクションの最後のレコードかどうかを示します。
table_name STRING 変更の影響を受けるテーブルの名前。
value_capture_type STRING

この変更がキャプチャされたときに変更ストリーム構成で指定された値キャプチャ タイプを記述します。

値キャプチャ タイプは、"OLD_AND_NEW_VALUES""NEW_ROW""NEW_VALUES""NEW_ROW_AND_OLD_VALUES" のいずれかです。 デフォルトでは "OLD_AND_NEW_VALUES" になっています。詳細については、値キャプチャ タイプをご覧ください。

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
列の名前、列のタイプ、主キーかどうか、スキーマで定義されている列の位置(「ordinal_position」など)。スキーマ内のテーブルの最初の列は、1 の順序位置になります。配列列の場合、列のタイプをネストできます。形式は、Spanner API リファレンスで説明されている型構造と一致します。
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
主キーの値、古い値、変更または追跡列の新しい値など、行われた変更について説明します。 古い値と新しい値の可用性と内容は、構成された value_capture_type によって異なります。new_values フィールドと old_values フィールドには、非キー列のみが含まれます。
mod_type STRING 変更のタイプを記述します。INSERTUPDATEDELETE のいずれかです。
number_of_records_in_transaction INT64 すべての変更ストリーム パーティションのこのトランザクションに含まれるデータ変更レコードの数。
number_of_partitions_in_transaction INT64 このトランザクションのデータ変更レコードを返すパーティションの数。
transaction_tag STRING このトランザクションに関連付けられたトランザクション タグ
is_system_transaction BOOL トランザクションがシステム トランザクションかどうかを示します。

PostgreSQL

フィールド 説明
commit_timestamp STRING 変更が commit された時点のタイムスタンプ。
record_sequence STRING トランザクション内のレコードのシーケンス番号。シーケンス番号は、一意で、単調に増加する(必ずしも連続するではない)ことが保証されています。同じ「server_transaction_id」のレコードを「record_sequence」で並べ替えて、トランザクション内の変更の順序を再構築します。
server_transaction_id STRING 変更が commit された Spanner トランザクションを表す、グローバルに一意の文字列。この値は、変更ストリームのレコードを処理するコンテキストでのみ使用し、Spanner の API のトランザクション ID とは関連付けられません。
is_last_record_in_transaction_in_partition BOOLEAN これが現在のパーティション内のトランザクションの最後のレコードかどうかを示します。
table_name STRING 変更の影響を受けるテーブルの名前。
value_capture_type STRING

この変更がキャプチャされたときに変更ストリーム構成で指定された値キャプチャ タイプを記述します。

値キャプチャ タイプは、"OLD_AND_NEW_VALUES""NEW_ROW""NEW_VALUES""NEW_ROW_AND_OLD_VALUES" のいずれかです。 デフォルトでは "OLD_AND_NEW_VALUES" になっています。詳細については、値キャプチャ タイプをご覧ください。

column_types
[
  {
      "name": <STRING>,
      "type": {
        "code": <STRING>
      },
      "is_primary_key": <BOOLEAN>,
      "ordinal_position": <NUMBER>
    },
    ...
]
列の名前、列のタイプ、主キーかどうか、スキーマで定義されている列の位置(「ordinal_position」など)。スキーマ内のテーブルの最初の列は、1 の順序位置になります。配列列の場合、列のタイプをネストできます。形式は、Spanner API リファレンスで説明されている型構造と一致します。
mods
[
  {
    "keys": {<STRING> : <STRING>},
    "new_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
    "old_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
  },
  [...]
]
主キーの値、古い値、変更または追跡列の新しい値など、行われた変更について説明します。古い値と新しい値の可用性と内容は、構成された value_capture_type によって異なります。new_values フィールドと old_values フィールドには、非キー列のみが含まれます。
mod_type STRING 変更のタイプを記述します。INSERTUPDATEDELETE のいずれかです。
number_of_records_in_transaction INT64 すべての変更ストリーム パーティションのこのトランザクションに含まれるデータ変更レコードの数。
number_of_partitions_in_transaction NUMBER このトランザクションのデータ変更レコードを返すパーティションの数。
transaction_tag STRING このトランザクションに関連付けられたトランザクション タグ
is_system_transaction BOOLEAN トランザクションがシステム トランザクションかどうかを示します。

以下に、データ変更レコードの例を示します。2 つのアカウント間での転送がある単一のトランザクションを記述します。2 つのアカウントは別々の変更ストリーム パーティションにあります。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

次のデータ変更レコードは、値のキャプチャ タイプが "NEW_VALUES" のレコードの例です。新しい値のみが入力されることに注意してください。 変更されたのは "LastUpdate" 列のみであるため、その列のみが返されました。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z"
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

次のデータ変更レコードは、値のキャプチャ タイプが "NEW_ROW" のレコードの例です。"LastUpdate" 列のみが変更されていますが、トラッキング対象のすべての列が返されます。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

次のデータ変更レコードは、値のキャプチャ タイプが "NEW_ROW_AND_OLD_VALUES" のレコードの例です。"LastUpdate" 列のみが変更されていますが、トラッキング対象のすべての列が返されます。この値キャプチャ タイプは、LastUpdate の新しい値と古い値をキャプチャします。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z"
      }
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

ハートビート レコード

ハートビート レコードが返されると、ハートビート レコードの timestamp 以下の commit_timestamp を持つすべての変更が返され、このパーティション内の将来のデータレコードには、ハートビート レコードによって返されるものよりも高い commit タイムスタンプが必要であることを示しています。パーティションにデータの変更が書き込まれていない場合、ハートビート レコードが返されます。パーティションにデータ変更が書き込まれた場合、heartbeat_record.timestamp の代わりに data_change_record.commit_timestamp を使用して、リーダーがパーティションの読み取りを進めていることを示すことができます。

パーティションで返されたハートビート レコードを使用して、すべてのパーティションでリーダーを同期できます。すべてのリーダーがあるタイムスタンプ A 以上のハートビートを受信するか、タイムスタンプ A 以上のデータまたは子パーティションのレコードを受信すると、リーダーは、そのタイムスタンプ A 以前に commit されたすべてのレコードを受信したことを認識し、バッファ内のレコードの処理を開始できます(たとえば、タイムスタンプでクロス パーティションのレコードを並べ替え、server_transaction_id でグループ化)。

ハートビート レコードには、次の 1 つのフィールドのみが含まれます。

GoogleSQL

フィールド 説明
timestamp TIMESTAMP ハートビート レコードのタイムスタンプ。

PostgreSQL

フィールド 説明
timestamp STRING ハートビート レコードのタイムスタンプ。

このレコードのタイムスタンプと等しいか、それ以前のタイムスタンプを持つすべてのレコードが返されたことを示すハートビート レコードの例:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

子パーティション レコード

子パーティション レコードは、子パーティションに関する情報(パーティション トークン、親パーティションのトークン、子パーティションに含まれる変更レコードの最も古いタイムスタンプを表す start_timestamp)を返します。commit タイムスタンプが child_partitions_record.start_timestamp の直前のレコードであるレコードが現在のパーティションに返されます。このパーティションの子パーティション レコードがすべて返されると、このクエリは成功ステータスを返します。これは、このパーティションのすべてのレコードが返されていることを示します。

子パーティション レコードのフィールドには、以下が含まれます。

GoogleSQL

フィールド 説明
start_timestamp TIMESTAMP この子パーティション レコードの子パーティションから返されるデータ変更レコードは、start_timestamp 以上の commit タイムスタンプを持ちます。子パーティションをクエリする場合は、子パーティション トークンと、child_partitions_token.start_timestamp 以上の start_timestamp をクエリで指定する必要があります。パーティションによって返されるすべての子パーティション レコードは同じ start_timestamp を持ち、タイムスタンプは常にクエリで指定された start_timestampend_timestamp の間にあります。
record_sequence STRING 単調に増加するシーケンス番号。特定のパーティションで同じ start_timestamp で返された子パーティション レコードが複数ある場合に、子パーティション レコードの順序を定義するために使用できます。パーティション トークン start_timestamprecord_sequence は、子パーティション レコードを一意に識別します。
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
子パーティションとそれに関連する情報を返します。これには、クエリで子パーティションを識別するために使用されるパーティション トークン文字列と、その親パーティションのトークンが含まれます。

PostgreSQL

フィールド 説明
start_timestamp STRING この子パーティション レコードの子パーティションから返されるデータ変更レコードは、start_timestamp 以上の commit タイムスタンプを持ちます。子パーティションをクエリする場合は、子パーティション トークンと、child_partitions_token.start_timestamp 以上の start_timestamp をクエリで指定する必要があります。パーティションによって返されるすべての子パーティション レコードは同じ start_timestamp を持ち、タイムスタンプは常にクエリで指定された start_timestampend_timestamp の間にあります。
record_sequence STRING 単調に増加するシーケンス番号。特定のパーティションで同じ start_timestamp で返された子パーティション レコードが複数ある場合に、子パーティション レコードの順序を定義するために使用できます。パーティション トークン start_timestamprecord_sequence は、子パーティション レコードを一意に識別します。
child_partitions
[
  {
    "token": <STRING>,
    "parent_partition_tokens": [<STRING>],
  }, [...]
]
子パーティションとそれに関連する情報の配列を返します。これには、クエリで子パーティションを識別するために使用されるパーティション トークン文字列と、その親パーティションのトークンが含まれます。

子パーティション レコードの例を次に示します。

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ],
}

変更ストリームのクエリ ワークフロー

単回使用の読み取り専用トランザクションと強力な ExecuteStreamingSql API と タイムスタンプ バウンドを使用して、変更ストリーム クエリを実行します。変更ストリーム読み取り関数を使用すると、対象の時間範囲の start_timestampend_timestamp を指定できます。保持期間内のすべての変更レコードには、強力な読み取り専用タイムスタンプ バウンドを使用してアクセスできます。

他のすべての TransactionOptions は、変更ストリーム クエリでは無効です。さらに、TransactionOptions.read_only.return_read_timestamp が true に設定されている場合、トランザクションを読み取る Transaction メッセージで、有効なタイムスタンプの読み取りの代わりに、特別な値 kint64max - 1 が返されます。この特殊な値を破棄し、後続のクエリには使用しないでください。

各変更ストリーム クエリは、データ変更レコード、ハートビート レコード、子パーティション レコードのいずれかを含む任意の数の行を返すことができます。リクエストの期限を設定する必要はありません。

例:

ストリーミング クエリ ワークフローは、partition_tokenNULL に指定して最初の変更ストリーム クエリを発行することから始まります。クエリでは、変更ストリームの読み取り関数、対象の開始タイムスタンプと終了タイムスタンプ、ハートビート間隔を指定する必要があります。end_timestampNULL の場合、クエリはパーティションが終了するまでデータ変更を返し続けます。

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
) ;

子パーティション レコードが返されるまで、このクエリのデータレコードを処理します。次の例では、2 つの子パーティション レコードと 3 つのパーティション トークンが返され、クエリが終了します。特定のクエリからの子パーティション レコードは常に同じ start_timestamp を共有します。

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
        // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ],
}

2022-05-01T09:00:01Z 以降の変更を処理するには、3 つの新しいクエリを作成し、並行して実行します。3 つのクエリを組み合わせると、親がカバーする同じキー範囲の将来のデータ変更が返されます。start_timestamp は常に同じ子パーティション レコードの start_timestamp に設定し、同じ end_timestamp とハートビート間隔を使用して、すべてのクエリでレコードを一貫して処理します。

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_1',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_2',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_3',
  10000,
  NULL
);

しばらくすると、別の子パーティション レコードが返されると child_token_2 のクエリが終了します。このレコードは、child_token_2child_token_3 の両方で、新しいパーティションが2022-05-01T09:30:15Zカバーされます。まったく同じレコードが、child_token_3 に対するクエリで返されます。これは、両方とも新しい child_token_4 の親パーティションであるためです。 特定のキーのデータレコードの厳密な順序付けされた処理を保証するためには、child_token_4 に対するクエリはすべての親が完了してから開始する必要があります。この場合は、child_token_2child_token_3 です。子パーティション トークンごとに 1 つのクエリのみを作成します。クエリ ワークフローの設計では、child_token_4 でクエリを待機してスケジュールを設定する親を 1 つ指名する必要があります。

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z',
  NULL,
  'child_token_4',
  10000,
  NULL
);

Apache Beam SpannerIO Dataflow コネクタで変更ストリーム レコードを処理して解析する例については、GitHub をご覧ください。