イベントとストリーム

概要

データストリームには、次のデータ階層があります。

  • ストリーム。データのソースと宛先で構成されます。
  • オブジェクト。特定のデータベースのテーブルなど、ストリームの一部です。
  • イベント。データベースの挿入など、特定のオブジェクトによって生成される 1 つの変更です。

ストリーム、オブジェクト、イベントには、データとメタデータが関連付けられています。このデータおよびメタデータは、さまざまな目的で使用できます。

イベントについて

各イベントには、次の 3 種類のデータが含まれます。

  • イベントデータ: ストリームソースから取得されるオブジェクトからのデータ自体の変更を表します。すべてのイベントには、変更された行全体が含まれます。
  • 汎用メタデータ: このメタデータは、データストリームによって生成されたすべてのイベントに含まれ、宛先で重複データを削除するなどのアクションに使用されます。
  • ソース固有のメタデータ: このメタデータは、特定のストリーム元によって生成されたすべてのイベントに含まれます。このメタデータは、ソースによって異なります。

イベントデータ

イベントデータは、ストリーム元から生じた特定のオブジェクトからの変更すべてのペイロードです。

イベントは、Avro または JSON 形式のいずれかになります。Avro 形式の場合、各列について、イベントには列インデックスと値が含まれます。列インデックスを使用すると、Avro ヘッダーのスキーマから列名と統合型を取得できます。

JSON 形式を使用する場合、イベントには、各列ごとに列名と値が含まれます。

イベント メタデータは、イベントの発生元に関する情報の収集だけでなく、宛先での重複データの削除や、下流の利用者によるイベントの並べ替えにも使用できます。

以下の表では、汎用およびソース固有のイベント メタデータのフィールドとデータ型を示します。

汎用メタデータ

このメタデータは、すべての種類のストリームに共通するものです。

項目 Avro 型 JSON 型 説明
stream_name 文字列 文字列 作成時に定義された一意のストリーム名。
read_method 文字列 文字列

ソースからのデータの読み取りが、変更データ キャプチャ(CDC)メソッドを使用したものか、履歴バックフィルの一部としてのものか、または CDC レプリケーション中にトランザクションがロールバックされたときに作成された補完タスクの一部としてのものかを示します。

有効な値は次のとおりです。

  • oracle-cdc-logminer
  • oracle-backfill
  • oracle-supplementation
  • mysql-cdc-binlog
  • mysql-backfill-incremental
  • mysql-backfill-fulldump
  • postgres-cdc-wal
  • postgresql-backfill
object 文字列 文字列 さまざまな種類のイベントをグループ化するために使用される名前。通常は、ソース内のテーブルかオブジェクトの名前です。
schema_key 文字列 文字列 イベントの統合スキーマの一意の識別子。
uuid 文字列 文字列 データストリームが生成するイベントの一意の識別子。
read_timestamp timestamp-millis 文字列 データストリームがレコードを読み取ったタイムスタンプ(UTC)。ミリ秒単位のエポック タイムスタンプ。
source_timestamp timestamp-millis 文字列 ソースでレコードが変更されたときのタイムスタンプ(UTC)。ミリ秒単位のエポック タイムスタンプ。
sort_keys {"type": "array", "items": ["string", "long"]} 配列 イベントを発生順に並べ替えるために使用できる値の配列。

ソース固有のメタデータ

このメタデータは、CDC と、ソース データベースからのバックフィル イベントに関連付けられます。このメタデータを表示するには、下のプルダウン メニューからソースを選択します。

ソース 項目 Avro 型 JSON 型 説明
MySQL log_file 文字列 文字列 CDC レプリケーションで、データストリームがイベントを取得するログファイル。
MySQL log_position long long MySQL バイナリログ内のログの位置(オフセット)。
MySQL primary_keys 文字列配列 文字列配列 テーブルの主キーを構成する(1 つ以上の)列名のリスト。テーブルに主キーがない場合、このフィールドは空です。
MySQL is_deleted ブール値 ブール値
  • true の値は、ソースで行が削除されたことを示します。
  • false の値は、行が削除されなかったことを示します。
MySQL database 文字列 文字列 イベントに関連付けられているデータベース。
MySQL table 文字列 文字列 イベントに関連付けられているテーブル。
MySQL change_type 文字列 文字列

イベントが表す変更の種類(INSERTUPDATE-INSERTUPDATE-DELETEDELETE)。

Oracle log_file 文字列 文字列 CDC レプリケーションで、データストリームがイベントを取得するログファイル。
Oracle scn long long Oracle トランザクション ログ内のログの位置(オフセット)。
Oracle row_id 文字列 文字列 Oracle の row_id
Oracle is_deleted ブール値 ブール値
  • true の値は、ソースで行が削除されたことを示します。
  • false の値は、行が削除されなかったことを示します。
Oracle database 文字列 文字列 イベントに関連付けられているデータベース。
Oracle schema 文字列 文字列 イベントからテーブルに関連付けられているスキーマ。
Oracle table 文字列 文字列 イベントに関連付けられているテーブル。
Oracle change_type 文字列 文字列

イベントが表す変更の種類(INSERTUPDATE-INSERTUPDATE-DELETEDELETE)。

Oracle tx_id 文字列 文字列 イベントが属するトランザクション ID。
Oracle rs_id 文字列 文字列 レコードセット ID。rs_idssn の結合により、V$LOGMNR_CONTENTS 内の行が一意に識別されます。rs_id は、行を生成した REDO レコードを一意に識別します。
Oracle ssn long long SQL シーケンス番号。この番号は rs_id とともに使用され、V$LOGMNR_CONTENTS 内の行を一意に識別します。
PostgreSQL schema 文字列 文字列 イベントからテーブルに関連付けられているスキーマ。
PostgreSQL table 文字列 文字列 イベントに関連付けられているテーブル。
PostgreSQL is_deleted ブール値 ブール値
  • true の値は、ソースで行が削除されたことを示します。
  • false の値は、行が削除されなかったことを示します。
PostgreSQL change_type 文字列 文字列 イベントが表す変更の種類(INSERTUPDATEDELETE)。
PostgreSQL tx_id 文字列 文字列 イベントが属するトランザクション ID。
PostgreSQL lsn 文字列 文字列 現在のエントリのログシーケンス番号。
PostgreSQL primary_keys 文字列配列 文字列配列 テーブルの主キーを構成する(1 つ以上の)列名のリスト。テーブルに主キーがない場合、このフィールドは空です。
SQL Server table 文字列 文字列 イベントに関連付けられているテーブル。
SQL Server database long long イベントに関連付けられているデータベース。
SQL Server schema 文字列配列 文字列配列 イベントからテーブルに関連付けられているスキーマ。
SQL Server is_deleted ブール値 ブール値
  • true の値は、ソースで行が削除されたことを示します。
  • false の値は、行が削除されなかったことを示します。
SQL Server lsn 文字列 文字列 イベントのログ シーケンス番号。
SQL Server tx_id 文字列 文字列 イベントが属するトランザクション ID。
SQL Server physical_location 整数配列 整数配列 ログレコードの物理的な場所。3 つの整数値(ファイル ID、ページ ID、レコードのスロット ID)で記述されます。
SQL Server replication_index 文字列配列 文字列配列 テーブル内の行を一意に識別できるインデックスの列名のリスト。
SQL Server change_type 文字列 文字列

イベントが表す変更の種類(INSERT、UPDATE、DELETE)。

イベントフローの例

このフローでは、ソース データベースの SAMPLE テーブルの単一行に対して、3 つの連続したオペレーション(INSERTUPDATEDELETE)によって生成されるイベントを示します。

時間 THIS_IS_MY_PK(int) FIELD1(nchar、Null も可) FIELD2(nchar、Null も可)>
0 1231535353 foo TLV
1 1231535353 NULL TLV

INSERT(T0)

メッセージ ペイロードは新しい行全体で構成されます。

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "d7989206-380f-0e81-8056-240501101100",
  "read_timestamp": "2019-11-07T07:37:16.808Z",
  "source_timestamp": "2019-11-07T02:15:39",
  "source_metadata": {
    "log_file": ""
    "scn": 15869116216871,
    "row_id": "AAAPwRAALAAMzMBABD",
    "is_deleted": false,
    "database": "DB1",
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "INSERT",
    "tx_id":
    "rs_id": "0x0073c9.000a4e4c.01d0",
    "ssn": 67,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": "foo",
    "FIELD2": "TLV",
  }
}

UPDATE(T1)

メッセージ ペイロードは新しい行全体で構成されます。以前の値は含まれません。

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "e6067366-1efc-0a10-a084-0d8701101101",
  "read_timestamp": "2019-11-07T07:37:18.808Z",
  "source_timestamp": "2019-11-07T02:17:39",
  "source_metadata": {
    "log_file":
    "scn": 15869150473224,
    "row_id": "AAAGYPAATAAPIC5AAB",
    "is_deleted": false,
    "database":
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "UPDATE",
    "tx_id":
    "rs_id": "0x006cf4.00056b26.0010",
    "ssn": 0,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": null,
    "FIELD2": "TLV",
  }
}

DELETE(T2)

メッセージ ペイロードは新しい行全体で構成されます。

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "c504f4bc-0ffc-4a1a-84df-6aba382fa651",
  "read_timestamp": "2019-11-07T07:37:20.808Z",
  "source_timestamp": "2019-11-07T02:19:39",
  "source_metadata": {
    "log_file":
    "scn": 158691504732555,
    "row_id": "AAAGYPAATAAPIC5AAC",
    "is_deleted": true,
    "database":
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "DELETE",
    "tx_id":
    "rs_id": "0x006cf4.00056b26.0011",
    "ssn": 0,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": null,
    "FIELD2": "TLV",
  }
}

順序と整合性

このセクションでは、データストリームによる順序と整合性の扱いについて説明します。

注文

Datastream は順序を保証するものではありませんが、各イベントにはデータの完全な行と、データがソースに書き込まれたタイムスタンプが含まれます。BigQuery では、順不同のイベントが正しい順序で自動的にマージされます。BigQuery は、イベント メタデータと内部変更シーケンス番号(CSN)を使用して、イベントを正しい順序でテーブルに適用します。Cloud Storage では、同じ時刻からのイベントが複数のファイルにまたがることがあります。

設計上、ストリームの開始時に作成されたデータの最初のバックフィルのためにイベントがバックフィルされた場合に、順不同で生成されるイベントが発生します。

順序はソースごとに推測できます。

ソース 説明
MySQL

最初のバックフィルの一部であるイベントには、mysql-backfill で始まる read_method フィールドがあります。イベントは任意の順序で使用できるため、バックフィル内でイベントを受信する順序には影響しません。

イベントが進行中のレプリケーションの一部の場合、read_method フィールドは mysql-cdc-binlog に設定されます。

順序は、log_file フィールドと、ログファイルからオフセットされた log_position フィールドの組み合わせによって推測できます。この組み合わせにより、データベース内のオペレーションの順序を識別する一意の数値が徐々に増加します。

Oracle

最初のバックフィルの一部であるイベントには、oracle-backfill で始まる read_method フィールドがあります。イベントは任意の順序で使用できるため、バックフィル内でイベントを受信する順序には影響しません。

イベントが進行中のレプリケーションの一部の場合、read_method フィールドは oracle-cdc-logminer に設定されます。

順序は rs_id(レコードセット ID)フィールドと ssn(SQL シーケンス番号)フィールドの組み合わせによって推測できます。この組み合わせにより、データベース内のオペレーションの順序を識別する一意の数値が徐々に増加します。

PostgreSQL

最初のバックフィルの一部であるイベントには、postgresql-backfill で始まる read_method フィールドがあります。イベントは任意の順序で使用できるため、バックフィル内でイベントを受信する順序には影響しません。

イベントが進行中のレプリケーションの一部の場合、read_method フィールドは postgres-cdc-wal に設定されます。

順序は source_timestamp フィールドと lsn(ログシーケンス番号)フィールドの組み合わせによって推測できます。この組み合わせにより、データベース内のオペレーションの順序を識別する一意の数値が徐々に増加します。

SQL Server

最初のバックフィルの一部であるイベントには、sqlserver-backfill で始まる read_method フィールドがあります。イベントは任意の順序で使用できるため、バックフィル内でイベントを受信する順序には影響しません。

イベントが進行中のレプリケーションの一部の場合、read_method フィールドは sqlserver-cdc に設定されます。

順序は source_timestamp フィールドと lsn(ログシーケンス番号)フィールドの組み合わせによって推測できます。この組み合わせにより、データベース内のオペレーションの順序を識別する一意の数値が徐々に増加します。

整合性

データストリームでは、ソース データベースのデータが宛先に少なくとも 1 回配信されることが保証されます。イベントの取りこぼしはありせんが、ストリーム内でイベントが重複する可能性があります。重複するイベントの長さは数分程度にする必要があります。また、重複はイベント メタデータに含まれるイベントの UUID(Universally Unique Identifier)を使用して検出できます。

データベース ログファイルに commit されていないトランザクションが含まれている場合、トランザクションがロールバックされると、データベースはこれをログファイルに「reverse」データ操作言語(DML)オペレーションとして反映します。たとえば、ロールバックした INSERT オペレーションには、対応する DELETE オペレーションが含まれます。Datastream は、これらのオペレーションをログファイルから読み取ります。

ストリームについて

すべてのストリームには、ストリームと、データを pull するソースの両方を記述するメタデータがあります。このメタデータには、ストリーム名、ソースと宛先の接続プロファイルなどの情報が含まれています。

Stream オブジェクトの完全な定義を確認するには、API リファレンス ドキュメントをご覧ください。

ストリームの状態とステータス

ストリームがとり得る状態は、次のとおりです。

  • Not started
  • Starting
  • Running
  • Draining
  • Paused
  • Failed
  • Failed permanently

ログを使用して、テーブルのバックフィル、処理された行数などの追加のステータス情報を確認できます。FetchStreamErrors API を使用してエラーを取得することもできます。

Discover API で使用可能なオブジェクト メタデータ

discover API では、接続プロファイルによって表されるデータソースや宛先で定義されたオブジェクトの構造を表すオブジェクトが返されます。各オブジェクトには、オブジェクト自体のメタデータに加え、取得するデータのすべてのフィールドに関するメタデータがあります。このメタデータは、discover API で取得できます。