ストリーム パーティション、レコード、クエリを変更する

コレクションでコンテンツを整理 必要に応じて、コンテンツの保存と分類を行います。

このページでは、変更ストリームの次の属性について詳しく説明します。

  • 分割ベースのパーティショニング モデル
  • 変更ストリーム レコードの形式と内容
  • レコードの照会に使用する低レベル構文
  • クエリ ワークフローの例

このページの情報は、Spanner API を使用して変更ストリームを直接クエリする場合に最も関連性があります。代わりに Dataflow を使用して変更ストリーム データを読み取るアプリケーションでは、ここで説明するデータモデルを直接操作する必要はありません。

ストリームの変更に関する広範な入門ガイドについては、変更ストリームの概要をご覧ください。

ストリーム パーティションの変更

変更ストリームで監視されるテーブルが変更されると、Cloud Spanner はデータ変更と同じトランザクションで、対応する変更ストリーム レコードをデータベースに書き込みます。こうすれば、トランザクションが成功すると、Spanner は変更を正常にキャプチャして永続化します。内部的には、Spanner は変更ストリーム レコードとデータ変更が同じ場所に配置され、書き込みオーバーヘッドを最小限に抑えます。

特定のスプリットに DML の一部として、Spanner は同じ書き込み内の対応する変更ストリーム データ分割に書き込みを追加します。このコロケーションにより、変更ストリームによってサービス リソース間の余分な調整が行われず、トランザクションの commit オーバーヘッドが最小限に抑えられます。

イメージ

Spanner は、データベースの負荷とサイズに基づいてデータの動的分割とマージを行い、サービス リソース間で分割を分散させることによってスケーリングを行います。

変更ストリームの書き込みと読み取りを有効にしてスケーリングできるように、Spanner は内部の変更ストリーム ストレージをデータベース データと一緒に分割してマージし、ホットスポットを自動的に回避します。データベースの書き込み規模に合わせてほぼリアルタイムで変更ストリーム レコードを読み取るため、Spanner API は変更ストリーム パーティションを使用して変更ストリームを同時にクエリできるように設計されています。変更ストリーム レコードを含むストリーム データ分割を変更するには、ストリーム パーティション マップを変更します。変更ストリームのパーティションは時間の経過とともに動的に変化し、Spanner がデータベース データを動的に分割およびマージする方法に相関しています。

変更ストリーム パーティションには、特定の期間の不変のキー範囲のレコードが含まれています。変更ストリーム パーティションは、1 つ以上の変更ストリーム パーティションに分割することや、他の変更ストリーム パーティションとマージすることができます。これらの分割イベントまたはマージ イベントが発生すると、次のパーティションで変更できないキー範囲の変更をキャプチャするために、子パーティションが作成されます。データ変更レコードに加えて、変更ストリーム クエリは、子パーティション レコードを返して、クエリを実行する必要がある新しい変更ストリーム パーティションを読者に通知します。また、書き込みがない場合に前進を表すハートビート レコードを返します。最近発生したものです。

特定の変更ストリーム パーティションをクエリすると、変更レコードは commit タイムスタンプ順に返されます。 各変更レコードは 1 回だけ返されます。変更ストリーム パーティション間で、変更レコードの順序が保証されるわけではありません。特定の主キーの変更レコードは、特定の時間範囲の 1 つのパーティションでのみ返されます。

親子パーティションのリネージのため、commit タイムスタンプ順で特定のキーの変更を処理するには、子パーティションから返されたレコードは、すべての親パーティションからのレコードが処理された後にのみ処理する必要があります。

ストリーム クエリ構文の変更

変更ストリームは、ExecuteStreamingSql API を使用してクエリされます。特別なテーブル値関数(TVF)は、変更ストリームとともに自動的に作成されます。これによって、変更ストリームのレコードにアクセスできます。TVF の命名規則は READ_change_stream_name です。

変更ストリーム SingersNameStream がデータベースに存在している場合、クエリの構文は次のようになります。

SELECT ChangeRecord
    FROM READ_SingersNameStream (
        start_timestamp,
        end_timestamp,
        partition_token,
        heartbeat_milliseconds
    )

この関数は次の引数を受け入れます。

引数名 Type 要否 説明
start_timestamp TIMESTAMP 必須 commit_timestampstart_timestamp 以上のレコードが返されるように指定します。値は、変更ストリームの保持期間内であり、現在の時刻以下で、変更ストリームの作成のタイムスタンプ以上である必要があります。
end_timestamp TIMESTAMP 省略可(デフォルト: NULL commit_timestampend_timestamp 以下のレコードが返されるように指定します。値は、変更ストリームの保持期間内であり、start_timestamp 以上である必要があります。クエリは、end_timestamp までのすべての ChangeRecord を返すか、子パーティション レコードのセットを返すことで終了します。NULL または指定されていない場合、現在のパーティションが終了し、child_partition_record フィールドが設定されたすべての ChangeRecord が返されるまで、クエリが実行されます。end_timestampNULL を指定すると、常に最新の変更を読み取ります。
partition_token STRING 省略可(デフォルト: NULL 子パーティション レコードの内容に基づいて、クエリを実行する変更パーティションを指定します。NULL または指定されていない場合、リーダーは初めて変更ストリームをクエリし、クエリ対象の特定のパーティション トークンは取得しません。
heartbeat_milliseconds INT64 必須 このパーティションで commit されたトランザクションがない場合に、ハートビート ChangeRecord が返される頻度を決定します。値は、1000(1 秒)~300000(5 分)にする必要があります。

次の例に示すように、TVF クエリのテキストを作成し、それにパラメータをバインドするために、便利なメソッドを作成することをおすすめします。

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
    return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

ストリーム レコードの形式の変更

変更ストリーム TVF は、ARRAY<STRUCT<...>> 型の単一の ChangeRecord 列を返します。各行には、この配列に 1 つの要素が常に含まれています。

配列要素のタイプは次のとおりです。

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

この構造体には data_change_recordheartbeat_recordchild_partitions_record 型の 3 つのフィールドがあり、それぞれ ARRAY<STRUCT<...>> 型です。変更ストリーム TVF から返された行で、この 3 つのフィールドのうち 1 つのみに値が含まれます。他の 2 つは空または NULL です。これらの配列フィールドには、最大で 1 つの要素が含まれます。

次のセクションでは、この 3 つのレコードタイプについてそれぞれ説明します。

データ変更レコード

データ変更レコードには、同じトランザクションの 1 つの変更ストリーム パーティションに同じ commit タイムスタンプで commit された同じ変更タイプ(挿入、更新、削除)を持つテーブルに対する変更のセットが含まれます。同じトランザクションに対して、複数の変更ストリーム パーティション間で複数のデータ変更レコードを返すことができます。

すべてのデータ変更レコードには、commit_timestampserver_transaction_idrecord_sequence のフィールドがあり、ストリーム レコードの変更ストリームの順序が決定されます。この 3 つのフィールドで、変更の順序を決定し、外部整合性を確保できます。

重複しないデータに触れる場合、複数のトランザクションが同じ commit タイムスタンプを持つ可能性があります。server_transaction_id フィールドを使用すると、同じトランザクション内で発行された一連の変更(変更ストリーム パーティション間での可能性がある)を区別できます。record_sequence フィールドと number_of_records_in_transaction フィールドと組み合わせることで、特定のトランザクションのすべてのレコードをバッファして順序付けできます。

データ変更レコードのフィールドは次のとおりです。

フィールド タイプ 説明
commit_timestamp TIMESTAMP 変更が commit されたタイムスタンプ。
record_sequence STRING トランザクション内のレコードのシーケンス番号。シーケンス番号は、トランザクション内で一意で、単調に増加する(ただし、必ずしも連続していない)ことが保証されています。同じ「server_transaction_id」のレコードを「record_sequence」で並べ替えて、トランザクション内の変更の順序を再構築します。
server_transaction_id STRING 変更が commit されたトランザクションを表すグローバルに一意の文字列。この値は、変更ストリーム レコードの処理のコンテキストでのみ使用する必要があります。Spanner の API のトランザクション ID とは関係ありません(TransactionSelector.id など)。これらは、同じコンテキスト内の他の値(つまり、変更ストリーム「data_change_records」、または Spanner API)と比較した場合に、トランザクションを一意に識別します。
is_last_record_in_transaction_in_partition BOOL これが現在のパーティション内のトランザクションの最後のレコードかどうかを示します。
table_name STRING 変更の影響を受けるテーブルの名前。
value_capture_type STRING

この変更がキャプチャされた際に、変更ストリーム構成で指定された値キャプチャ タイプを記述します。

現在、常に "OLD_AND_NEW_VALUES" です。

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
列の名前、列の型、主キーかどうか、スキーマで定義されている列の位置(`ordinal_position`)。スキーマの最初のテーブル列は、「1」の順序位置になります。列の型は、配列の列に対してネストできます。形式は、Spanner API リファレンスで説明されている型構造と一致します。
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
主キーの値、変更された列の古い値と新しい値(変更ストリームが「value_capture_type」で構成されている場合)など、行われた変更を記述します。 new_values フィールドと old_values フィールドには、非キー列のみが含まれます。JSON オブジェクトのメンバーは辞書順に並べ替えられます。
mod_type STRING 変更の種類を表します。INSERTUPDATEDELETE のいずれかです。
number_of_records_in_transaction INT64 すべての変更ストリーム パーティション全体でこのトランザクションに含まれるデータ変更レコードの数。
number_of_partitions_in_transaction INT64 このトランザクションのデータ変更レコードを返すパーティションの数。
transaction_tag STRING このトランザクションに関連付けられた トランザクション タグ
is_system_transaction BOOL トランザクションがシステム トランザクションかどうかを示します。

次の 2 つのサンプルデータ変更レコードがあります。2 つのアカウント間の転送がある単一のトランザクションを表します。2 つのアカウントは別々の変更ストリーム パーティションにあります。

data_change_record: {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
         "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
data_change_record: {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

ハートビート レコード

ハートビート レコードが返される場合、commit_timestamp がハートビート レコードの timestamp 以下であるすべての変更が返され、このパーティション内の将来のデータレコードが必要であることを示します。 commit タイムスタンプがハートビート レコードよりも大きくなります。パーティションに書き込まれたデータの変更がない場合、ハートビート レコードが返されます。パーティションに書き込まれたデータの変更がある場合、heartbeat_record.timestamp の代わりに data_change_record.commit_timestamp を使用して、リーダーがパーティションの読み取りを進めていることを通知できます。

パーティションで返されるハートビート レコードを使用して、すべてのパーティションでリーダーを同期できます。すべてのリーダーがタイムスタンプ A 以上のハートビートを受信するか、タイムスタンプ A 以上のデータまたは子パーティション レコードを受信すると、受信したことを認識します。そのタイムスタンプ A 以前に commit されたすべてのレコードについて、バッファ内のレコードの処理を開始できます。たとえば、クロスパーティション レコードをタイムスタンプで並べ替え、server_transaction_id でグループ化できます。

ハートビート レコードには、次のフィールドが 1 つだけ含まれます。

フィールド タイプ 説明
timestamp TIMESTAMP ハートビート レコードのタイムスタンプ。

ハートビート レコードの例。タイムスタンプがこのレコードのタイムスタンプと同じかそれよりも小さいレコードがすべて返されたことを示しています。

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

子パーティション レコード

子パーティション レコードは、子パーティションに関する情報(パーティション トークン、親パーティションのトークン、start_timestamp子パーティションに変更レコードが含まれる最初のタイムスタンプを表します。commit タイムスタンプが child_partitions_record.start_timestamp の直前のレコードが現在のパーティションに返されます。このパーティションのすべての子パーティション レコードを返すと、このクエリは成功ステータスで返され、このパーティションのすべてのレコードが返されたことを示します。

子パーティション レコードのフィールドは次のとおりです。

フィールド タイプ 説明
start_timestamp TIMESTAMP この子パーティション レコードの子パーティションから返されたデータ変更レコードの commit タイムスタンプは、start_timestamp 以上になります。子パーティションをクエリする場合、クエリに子パーティション トークンと child_partitions_token.start_timestamp 以上の start_timestamp を指定する必要があります。1 つのパーティションによって返されるすべての子パーティション レコードの start_timestamp は同じになり、タイムスタンプは常にクエリで指定された start_timestampend_timestamp の間になります。
record_sequence STRING 特定のパーティションに同じ start_timestamp で複数の子パーティション レコードが返される場合に、子パーティション レコードの順序を定義するために使用できる単調に増加するシーケンス番号。パーティション トークン start_timestamprecord_sequence は、子パーティション レコードを一意に識別します。
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
子パーティションとそれに関連する情報のセットを返します。 これには、クエリ内の子パーティションの識別に使用されるパーティション トークン文字列と、その親パーティションのトークンが含まれます。

子パーティションのレコードの例:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"],
    }
  ],
}

変更ストリームのクエリ ワークフロー

単回使用の読み取り専用トランザクションと強力な ExecuteStreamingSql API と タイムスタンプ バウンドを使用して、変更ストリーム クエリを実行します。変更ストリーム TVF では、目的の期間の start_timestampend_timestamp を指定できます。保持期間内のすべての変更レコードには、強力な読み取り専用タイムスタンプ バウンドを使用してアクセスできます。

その他すべての TransactionOptions は、変更ストリームのクエリに対して無効です。さらに、TransactionOptions.read_only.return_read_timestamp が true に設定されている場合、トランザクションを読み取る Transaction メッセージで、有効なタイムスタンプの読み取りの代わりに、特別な値 kint64max - 1 が返されます。この特殊な値は破棄し、後続のクエリには使用しないでください。

それぞれの変更ストリーム クエリから任意の数の行を返すことができ、各行にはデータ変更レコード、ハートビート レコード、または子パーティション レコードのいずれかが含まれます。リクエストの期限を設定する必要はありません。

例:

ストリーミング クエリのワークフローでは、partition_tokenNULL に指定して最初の変更ストリーム クエリを発行します。クエリでは、変更ストリームの TVF 関数、開始と終了のタイムスタンプ、ハートビート間隔を指定する必要があります。end_timestampNULL の場合、子パーティションが作成されるまで、クエリによってデータの変更が返されます。

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:00-00",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

子パーティションのレコードが返されるまで、このクエリのデータレコードを処理します。以下の例では、2 つの子パーティション レコードと 3 つのパーティション トークンが返され、クエリは終了します。特定のクエリの子パーティション レコードは常に同じ start_timestamp を共有します。

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01 09:00:01-00",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
      // from the initial change stream queries.
      "parent_partition_tokens": [NULL],
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL],
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01 09:00:01-00",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL],
    }
  ],
}

2022-05-01 09:00:01-00 より後の変更を処理するために、3 つの新しいクエリを作成して並列に実行します。3 つのクエリは、親がカバーしている同じキー範囲の今後のデータ変更を返します。常に同じ子パーティション レコードの start_timestampstart_timestamp に設定し、同じ end_timestamp とハートビート間隔を使用して、すべてのクエリでレコードが一貫して処理されるようにします。

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:01-00",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000);
SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:01-00",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000);
SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:01-00",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000);

しばらくすると、別の子パーティション レコードが返されると child_token_2 のクエリが終了します。このレコードは、child_token_2child_token_3 の両方で、新しいパーティションが2022-05-01 09:30:15-00カバーされます。まったく同じレコードが child_token_3 に対するクエリによって返されます。どちらも新しい child_token_4 の親パーティションであるためです。特定のキーのデータレコードの厳密な順序付けを保証するため、次のクエリでは、child_token_4すべての親が終了した後でのみ開始する必要があります。child_token_2さらに、次のものがあるとします。child_token_3 。子パーティション トークンごとに 1 つのクエリのみを作成するため、クエリ ワークフロー設計では、child_token_4 でクエリを待機してスケジュールする親を 1 つ割り当てる必要があります。

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01 09:30:15-00",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}
SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:30:15-00",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

変更ストリーム レコードの処理と解析の例について、GitHub の Apache Beam SpannerIO Dataflow コネクタで確認する。