イベントとストリーム

概要

データストリームには、次のデータ階層があります。

  • ストリーム。データのソースと宛先で構成されます。
  • オブジェクト。特定のデータベースのテーブルなど、ストリームの一部です。
  • イベント。データベースの挿入など、特定のオブジェクトによって生成される 1 つの変更です。

ストリーム、オブジェクト、イベントには、データとメタデータが関連付けられています。このデータとメタデータは、さまざまな目的で使用できます。

イベントについて

各イベントは、次の 3 種類のデータで構成されます。

  • イベントデータ: ストリームソースから生じたオブジェクトからデータ自体への変更を表します。すべてのイベントには、変更された行全体が含まれます。
  • 汎用メタデータ: このメタデータは、データストリームによって生成されたすべてのイベントに含まれ、宛先で重複データを削除するなどのアクションに使用されます。
  • ソース固有のメタデータ: このメタデータは、特定のストリーム元によって生成されたすべてのイベントに含まれます。このメタデータは、ソースによって異なります。

イベントデータ

イベントデータは、ストリーム元から生じた特定のオブジェクトからの変更すべてのペイロードです。

イベントは Avro 形式または JSON 形式です。

Avro 形式を使用する場合、イベントには、各列ごとに列インデックスと値が含まれます。列インデックスを使用して、Avro ヘッダーのスキーマから列名、統合タイプを取得できます。

JSON 形式を使用する場合、イベントには、各列ごとに列名と値が含まれます。

イベント メタデータは、イベントの発生元に関する情報の収集だけでなく、宛先での重複データの削除や、下流の利用者によるイベントの並べ替えにも使用できます。

以下の表では、汎用およびソース固有のイベント メタデータのフィールドとデータ型を示します。

汎用メタデータ

このメタデータは、すべての種類のストリームに共通するものです。

フィールド Avro 型 JSON 型 説明
stream_name 文字列 文字列 作成時に定義された一意のストリーミング名。
read_method 文字列 文字列

ソースからのデータの読み取りが、変更データ キャプチャ(CDC)メソッドを使用したものか、履歴バックフィルの一部としてのものか、または CDC レプリケーション中にトランザクションがロールバックされたときに作成された補完タスクの一部としてのものかを示します。

有効な値は次のとおりです。

  • oracle-cdc-logminer
  • oracle-backfill
  • oracle-supplementation
  • mysql-cdc-binlog
  • mysql-backfill-incremental
  • mysql-backfill-fulldump
  • postgres-cdc-wal
  • postgresql-backfill
object 文字列 文字列 さまざまな種類のイベントをグループ化するために使用される名前。通常は、ソース内のテーブルかオブジェクトの名前です。
schema_key 文字列 文字列 イベントの統合スキーマの一意の識別子。
uuid 文字列 文字列 データストリームが生成するイベントの一意の識別子。
read_timestamp timestamp-millis 文字列 データストリームがレコードを読み取ったタイムスタンプ(UTC)。ミリ秒単位のエポック タイムスタンプ。
source_timestamp timestamp-millis 文字列 ソースでレコードが変更されたときのタイムスタンプ(UTC)。ミリ秒単位のエポック タイムスタンプ。
sort_keys {"type": "array", "items": ["string", "long"]} 配列 イベントを発生順に並べ替えるために使用できる値の配列。

ソース固有のメタデータ

このメタデータは、ソース データベースの CDC イベントとバックフィル イベントに関連付けられます。このメタデータを表示するには、下のプルダウン メニューからソースを選択します。

ソース フィールド Avro 型 JSON 型 説明
MySQL log_file 文字列 文字列 CDC レプリケーションで、データストリームがイベントを取得するログファイル。
MySQL log_position long long MySQL バイナリログ内のログの位置(オフセット)。
MySQL primary_keys 文字列配列 文字列配列 テーブルの主キーを構成する 1 つ以上の列名のリスト。テーブルに主キーがない場合、このフィールドは空です。
MySQL is_deleted ブール値 ブール値
  • true の値は、ソースで行が削除されたことを示します。
  • false の値は、行が削除されなかったことを示します。
MySQL database 文字列 文字列 イベントに関連付けられているデータベース。
MySQL table 文字列 文字列 イベントに関連付けられているテーブル。
MySQL change_type 文字列 文字列

イベントが表す変更の種類(INSERTUPDATE-INSERTUPDATE-DELETEDELETE)。

Oracle log_file 文字列 文字列 CDC レプリケーションで、データストリームがイベントを取得するログファイル。
Oracle scn long long Oracle トランザクション ログ内のログの位置(オフセット)。
Oracle row_id 文字列 文字列 Oracle の row_id
Oracle is_deleted ブール値 ブール値
  • true の値は、ソースで行が削除されたことを示します。
  • false の値は、行が削除されなかったことを示します。
Oracle database 文字列 文字列 イベントに関連付けられているデータベース。
Oracle schema 文字列 文字列 イベントからテーブルに関連付けられているスキーマ。
Oracle table 文字列 文字列 イベントに関連付けられているテーブル。
Oracle change_type 文字列 文字列

イベントが表す変更の種類(INSERTUPDATE-INSERTUPDATE-DELETEDELETE)。

Oracle tx_id 文字列 文字列 イベントが属するトランザクション ID。
Oracle rs_id 文字列 文字列 レコードセット ID。rs_idssn の結合は、V$LOGMNR_CONTENTS 内の行を一意に識別します。rs_id は、行を生成する REDO レコードを一意に識別します。
Oracle ssn long long SQL シーケンス番号。この番号は rs_id で使用され、V$LOGMNR_CONTENTS 内の行を一意に識別します。
PostgreSQL schema 文字列 文字列 イベントからテーブルに関連付けられているスキーマ。
PostgreSQL table 文字列 文字列 イベントに関連付けられているテーブル。
PostgreSQL is_deleted ブール値 ブール値
  • true の値は、ソースで行が削除されたことを示します。
  • false の値は、行が削除されなかったことを示します。
PostgreSQL change_type 文字列 文字列 イベントが表す変更の種類(INSERTUPDATEDELETE)。
PostgreSQL tx_id 文字列 文字列 イベントが属するトランザクション ID。
PostgreSQL lsn 文字列 文字列 現在のエントリのログ シーケンス番号。
PostgreSQL primary_keys 文字列配列 文字列配列 テーブルの主キーを構成する 1 つ以上の列名のリスト。テーブルに主キーがない場合、このフィールドは空です。
SQL Server table 文字列 文字列 イベントに関連付けられているテーブル。
SQL Server database long long イベントに関連付けられているデータベース。
SQL Server schema 文字列配列 文字列配列 イベントからテーブルに関連付けられているスキーマ。
SQL Server is_deleted ブール値 ブール値
  • true の値は、ソースで行が削除されたことを示します。
  • false の値は、行が削除されなかったことを示します。
SQL Server lsn 文字列 文字列 イベントのログ シーケンス番号。
SQL Server tx_id 文字列 文字列 イベントが属するトランザクション ID。
SQL Server physical_location 整数配列 整数配列 ログレコードの物理的な場所。ファイル ID、ページ ID、レコードのスロット ID の 3 つの整数で表されます。
SQL Server replication_index 文字列配列 文字列配列 テーブル内の行を一意に識別できるインデックスの列名のリスト。
SQL Server change_type 文字列 文字列

イベントが表す変更の種類(「INSERT」、「UPDATE」、「DELETE」)。

イベントフローの例

このフローでは、ソース データベースの SAMPLE テーブルの単一行に対して、3 つの連続したオペレーション(INSERTUPDATEDELETE)によって生成されるイベントを示します。

TIME THIS_IS_MY_PK(int) FIELD1(nchar、Null も可) FIELD2(nchar、Null も可)>
0 1231535353 foo TLV
1 1231535353 NULL TLV

INSERT(T0)

メッセージ ペイロードは、新しい行全体で構成されます。

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "d7989206-380f-0e81-8056-240501101100",
  "read_timestamp": "2019-11-07T07:37:16.808Z",
  "source_timestamp": "2019-11-07T02:15:39",  
  "source_metadata": {
    "log_file": ""
    "scn": 15869116216871,
    "row_id": "AAAPwRAALAAMzMBABD",
    "is_deleted": false,
    "database": "DB1",
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "INSERT",
    "tx_id": 
    "rs_id": "0x0073c9.000a4e4c.01d0",
    "ssn": 67,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": "foo",
    "FIELD2": "TLV",
  }
}

UPDATE(T1)

メッセージ ペイロードは、新しい行全体で構成されます。以前の値は含まれません。

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "e6067366-1efc-0a10-a084-0d8701101101",
  "read_timestamp": "2019-11-07T07:37:18.808Z",
  "source_timestamp": "2019-11-07T02:17:39",  
  "source_metadata": {
    "log_file": 
    "scn": 15869150473224,
    "row_id": "AAAGYPAATAAPIC5AAB",
    "is_deleted": false,
    "database":
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "UPDATE",
    "tx_id":
    "rs_id": "0x006cf4.00056b26.0010",
    "ssn": 0,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": null,
    "FIELD2": "TLV",
  }
}

DELETE(T2)

メッセージ ペイロードは、新しい行全体で構成されます。

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "c504f4bc-0ffc-4a1a-84df-6aba382fa651",
  "read_timestamp": "2019-11-07T07:37:20.808Z",
  "source_timestamp": "2019-11-07T02:19:39",
  "source_metadata": {
    "log_file": 
    "scn": 158691504732555,
    "row_id": "AAAGYPAATAAPIC5AAC",
    "is_deleted": true,
    "database":
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "DELETE",
    "tx_id":
    "rs_id": "0x006cf4.00056b26.0011",
    "ssn": 0,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": null,
    "FIELD2": "TLV",
  }
}

順序と整合性

このセクションでは、データストリームによる順序と整合性の扱いについて説明します。

注文

Datastream は順序を保証するものではありませんが、各イベントにはデータの行全体と、データがソースに書き込まれたタイムスタンプが含まれています。BigQuery では、順序が異なるイベントは自動的に正しい順序で統合されます。BigQuery は、イベント メタデータと内部変更シーケンス番号(CSN)を使用して、イベントをテーブルに正しい順序で適用します。Cloud Storage では、同じ時刻のイベントが複数のファイルにまたがる場合があります。

設計上、ストリームの開始時に作成されたデータの最初のバックフィルのためにイベントがバックフィルされた場合に、順不同で生成されるイベントが発生します。

順序はソースごとに推測できます。

ソース 説明
MySQL

最初のバックフィルの一部であるイベントの read_method フィールドは mysql-backfill で始まります。イベントは任意の順序で使用できるため、バックフィル内でイベントを受信する順序には影響しません。

イベントが進行中のレプリケーションの一部の場合、read_method フィールドは mysql-cdc-binlog に設定されます。

順序は、log_file フィールドと、ログファイルからオフセットされた log_position フィールドの組み合わせによって推測できます。この組み合わせにより、データベース内のオペレーションの順序を識別する一意の数値が徐々に増加します。

Oracle

最初のバックフィルの一部であるイベントの read_method フィールドは oracle-backfill で始まります。イベントは任意の順序で使用できるため、バックフィル内でイベントを受信する順序には影響しません。

イベントが進行中のレプリケーションの一部の場合、read_method フィールドは oracle-cdc-logminer に設定されます。

順序は rs_id(レコードセット ID)フィールドと ssn(SQL シーケンス番号)フィールドの組み合わせによって推測できます。この組み合わせにより、データベース内のオペレーションの順序を識別する一意の数値が徐々に増加します。

PostgreSQL

最初のバックフィルの一部であるイベントの read_method フィールドは postgresql-backfill で始まります。イベントは任意の順序で使用できるため、バックフィル内でイベントを受信する順序には影響しません。

イベントが進行中のレプリケーションの一部の場合、read_method フィールドは postgres-cdc-wal に設定されます。

順序は source_timestamp フィールドと lsn(ログシーケンス番号)フィールドの組み合わせによって推測できます。この組み合わせにより、データベース内のオペレーションの順序を識別する一意の数値が徐々に増加します。

SQL Server

最初のバックフィルの一部であるイベントの read_method フィールドは sqlserver-backfill で始まります。イベントは任意の順序で使用できるため、バックフィル内でイベントを受信する順序には影響しません。

イベントが進行中のレプリケーションの一部の場合、read_method フィールドは sqlserver-cdc に設定されます。

順序は source_timestamp フィールドと lsn(ログシーケンス番号)フィールドの組み合わせによって推測できます。この組み合わせにより、データベース内のオペレーションの順序を識別する一意の数値が徐々に増加します。

整合性

データストリームでは、ソース データベースのデータが宛先に少なくとも 1 回配信されることが保証されます。イベントの取りこぼしはありせんが、ストリーム内でイベントが重複する可能性があります。重複するイベントの長さは数分程度にする必要があります。また、重複はイベント メタデータに含まれるイベントの UUID(Universally Unique Identifier)を使用して検出できます。

データベース ログファイルに commit されていないトランザクションが含まれている場合、トランザクションがロールバックされると、データベースはこれをログファイルに「逆」のデータ操作言語(DML)オペレーションとして反映します。たとえば、ロールバックした INSERT オペレーションには、対応する DELETE オペレーションが含まれます。Datastream は、これらのオペレーションをログファイルから読み取ります。

ストリームについて

すべてのストリームには、ストリームとデータを取得するソースの両方を記述するメタデータがあります。このメタデータには、ストリーム名、ソースと宛先の接続プロファイルなどの情報が含まれます。

Stream オブジェクトの完全な定義を確認するには、API リファレンス ドキュメントをご覧ください。

ストリームの状態とステータス

ストリームがとり得る状態は、次のとおりです。

  • Not started
  • Starting
  • Running
  • Draining
  • Paused
  • Failed
  • Failed permanently

ログを使用して、テーブルのバックフィル、処理された行数などの追加のステータス情報を確認できます。FetchStreamErrors API を使用してエラーを取得することもできます。

Discover API で使用可能なオブジェクト メタデータ

discover API では、接続プロファイルによって表されるデータソースや宛先で定義されたオブジェクトの構造を表すオブジェクトが返されます。各オブジェクトには、オブジェクト自体のメタデータに加え、取得するデータのすべてのフィールドに関するメタデータがあります。このメタデータは、discover API で取得できます。

次のステップ