活動與串流

Datastream 中的資料階層如下:

  • 串流:由資料來源和目的地組成。
  • 物件:串流的一部分,例如特定資料庫中的表格。
  • 「事件」:特定物件產生的單一變更,例如資料庫插入作業。

串流、物件和事件都有相關聯的資料和中繼資料。這項資料和中繼資料可用於不同用途。

關於活動

每個事件都包含三種資料:

  • 事件資料:代表來自串流來源的物件資料變更。每個事件都包含變更的整列資料。
  • 一般中繼資料:這類中繼資料會顯示在資料串流產生的每個事件中,用於執行動作,例如移除目的地中的重複資料。
  • 特定來源的中繼資料:這類中繼資料會顯示在特定串流來源產生的每個事件中。這類中繼資料會因來源而異。

事件資料

事件資料是來自串流來源的特定物件,每次異動時的酬載。

事件採用 Avro 或 JSON 格式。

使用 Avro 格式時,每個資料欄的事件都會包含資料欄索引和值。使用資料欄索引,即可從 Avro 標頭的結構定義中擷取資料欄名稱和統一類型。

使用 JSON 格式時,每個資料欄的事件都會包含資料欄名稱和值。

事件中繼資料可用於收集事件來源的相關資訊,以及移除目的地中的重複資料,並依下游消費者排序事件。

下表列出並說明一般和來源專屬事件中繼資料的欄位和資料類型。

一般中繼資料

所有類型的串流都會使用這項一致的後設資料。

欄位 Avro 類型 JSON 類型 說明
stream_name 字串 字串 建立時定義的專屬串流名稱。
read_method 字串 字串

指出資料是否是使用變更資料擷取 (CDC) 方法從來源讀取,做為歷史資料補充作業的一部分,或是做為在 CDC 複製期間交易回溯時建立的補充作業一部分。

可能的值包括:

  • oracle-cdc-logminer
  • oracle-backfill
  • oracle-supplementation
  • mysql-cdc-binlog
  • mysql-backfill-incremental
  • mysql-backfill-fulldump
  • postgres-cdc-wal
  • postgresql-backfill
  • salesforce-cdc
  • salesforce-backfill
object 字串 字串 用於將不同類型的事件分組的名稱,通常是來源中的資料表或物件名稱。
schema_key 字串 字串 事件統一結構定義的專屬 ID。
uuid 字串 字串 Datastream 產生的事件專屬 ID。
read_timestamp timestamp-millis 字串 Datastream 讀取記錄的時間戳記 (世界標準時間),以毫秒為單位的 Epoch 時間戳記。
source_timestamp timestamp-millis 字串 記錄在來源端變更時的時間戳記 (世界標準時間),以毫秒為單位的 Epoch 紀元時間戳記。
sort_keys {"type": "array", "items": ["string", "long"]} 陣列 可用於依事件發生順序排序事件的值陣列。

來源專屬中繼資料

這項中繼資料與來源資料庫的 CDC 和回填事件相關聯。如要查看這項中繼資料,請從下方的下拉式選單中選取來源。

來源 欄位 Avro 類型 JSON 類型 說明
MySQL log_file 字串 字串 Datastream 在 CDC 複製作業中擷取事件的記錄檔。
MySQL log_position long long MySQL 二進位記錄檔中的記錄位置 (位移)。
MySQL primary_keys 字串陣列 字串陣列 組成資料表主鍵的一或多個資料欄名稱清單。如果資料表沒有主鍵,這個欄位就會留空。
MySQL is_deleted 布林值 布林值
  • true 值表示來源中的資料列已刪除。
  • false 值表示資料列未刪除。
MySQL database 字串 字串 與事件相關聯的資料庫。
MySQL table 字串 字串 與活動相關的表格。
MySQL change_type 字串 字串

事件代表的變更類型 (INSERTUPDATE-INSERTUPDATE-DELETEDELETE)。

Oracle log_file 字串 字串 Datastream 在 CDC 複製作業中擷取事件的記錄檔。
Oracle scn long long Oracle 交易記錄中的記錄位置 (位移)。
Oracle row_id 字串 字串 Oracle 的 row_id
Oracle is_deleted 布林值 布林值
  • true 值表示來源中的資料列已刪除。
  • false 值表示資料列未刪除。
Oracle database 字串 字串 與事件相關聯的資料庫。
Oracle schema 字串 字串 與事件中的表格相關聯的結構定義。
Oracle table 字串 字串 與活動相關的表格。
Oracle change_type 字串 字串

事件代表的變更類型 (INSERTUPDATE-INSERTUPDATE-DELETEDELETE)。

Oracle tx_id 字串 字串 事件所屬的交易 ID。
Oracle rs_id 字串 字串 記錄集 ID。rs_idssn 的配對可唯一識別 V$LOGMNR_CONTENTS 中的資料列。rs_id 可唯一識別產生資料列的重做記錄。
Oracle ssn long long SQL 序號。這個數字會與 rs_id 一併使用,用於在 V$LOGMNR_CONTENTS 中識別資料列。
PostgreSQL schema 字串 字串 與事件中的表格相關聯的結構定義。
PostgreSQL table 字串 字串 與活動相關的表格。
PostgreSQL is_deleted 布林值 布林值
  • true 值表示來源中的資料列已刪除。
  • false 值表示資料列未刪除。
PostgreSQL change_type 字串 字串 事件代表的變更類型 (INSERTUPDATEDELETE)。
PostgreSQL tx_id 字串 字串 事件所屬的交易 ID。
PostgreSQL lsn 字串 字串 目前項目的記錄序號。
PostgreSQL primary_keys 字串陣列 字串陣列 組成資料表主鍵的一或多個資料欄名稱清單。如果資料表沒有主鍵,這個欄位就會留空。
SQL Server table 字串 字串 與活動相關的表格。
SQL Server database long long 與事件相關聯的資料庫。
SQL Server schema 字串陣列 字串陣列 與事件中的表格相關聯的結構定義。
SQL Server is_deleted 布林值 布林值
  • true 值表示來源中的資料列已刪除。
  • false 值表示資料列未刪除。
SQL Server lsn 字串 字串 事件的記錄序號。
SQL Server tx_id 字串 字串 事件所屬的交易 ID。
SQL Server physical_location 整數陣列 整數陣列 記錄檔記錄的實體位置,以三個整數表示:記錄的檔案 ID、頁面 ID 和時段 ID。
SQL Server replication_index 字串陣列 字串陣列 索引的資料欄名稱清單,可用於在資料表中找出不重複的資料列。
SQL Server change_type 字串 字串

事件代表的變更類型 (INSERTUPDATEDELETE)。

Salesforce object_name 字串 字串

與事件相關聯的 Salesforce 物件名稱。

Salesforce domain 字串 字串

與事件相關聯的網域名稱。

Salesforce is_deleted 布林值 布林值
  • true 值表示來源中的資料列已刪除。
  • false 值表示資料列未刪除。
Salesforce change_type 字串 字串

事件代表的變更類型 (INSERTUPDATEDELETE)。

Salesforce primary_keys 字串陣列 字串陣列 構成資料表主鍵的資料欄名稱清單。如果資料表沒有主鍵,這個欄位就會留空。
MongoDB database 字串 字串 與事件相關聯的資料庫。
MongoDB collection 字串 字串 與活動相關的收藏內容。集合類似於關聯式資料庫中的資料表。
MongoDB change_type 字串 字串 事件代表的變更類型 (CREATEUPDATEDELETE)。
MongoDB is_deleted 布林值 布林值
  • true 值表示來源中的資料列已刪除。
  • false 值表示資料列未刪除。
MongoDB primary_keys 字串陣列 字串陣列 _id 欄位,做為集合中每個文件的主要鍵。

事件流程範例

這個流程說明連續三項作業 (INSERTUPDATEDELETE) 在來源資料庫的 SAMPLE 資料表單一資料列中產生的事件。

時間 THIS_IS_MY_PK (int) FIELD1 (nchar 可為空值) FIELD2 (nchar non-null)>
0 1231535353 foo TLV
1 1231535353 空值 TLV

INSERT (T0)

訊息酬載包含新資料列的所有內容。

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "d7989206-380f-0e81-8056-240501101100",
  "read_timestamp": "2019-11-07T07:37:16.808Z",
  "source_timestamp": "2019-11-07T02:15:39",  
  "source_metadata": {
    "log_file": ""
    "scn": 15869116216871,
    "row_id": "AAAPwRAALAAMzMBABD",
    "is_deleted": false,
    "database": "DB1",
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "INSERT",
    "tx_id": 
    "rs_id": "0x0073c9.000a4e4c.01d0",
    "ssn": 67,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": "foo",
    "FIELD2": "TLV",
  }
}

更新 (T1)

訊息酬載包含新資料列的所有內容。不含先前的值。

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "e6067366-1efc-0a10-a084-0d8701101101",
  "read_timestamp": "2019-11-07T07:37:18.808Z",
  "source_timestamp": "2019-11-07T02:17:39",  
  "source_metadata": {
    "log_file": 
    "scn": 15869150473224,
    "row_id": "AAAGYPAATAAPIC5AAB",
    "is_deleted": false,
    "database":
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "UPDATE",
    "tx_id":
    "rs_id": "0x006cf4.00056b26.0010",
    "ssn": 0,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": null,
    "FIELD2": "TLV",
  }
}

DELETE (T2)

訊息酬載包含新資料列的所有內容。

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "c504f4bc-0ffc-4a1a-84df-6aba382fa651",
  "read_timestamp": "2019-11-07T07:37:20.808Z",
  "source_timestamp": "2019-11-07T02:19:39",
  "source_metadata": {
    "log_file": 
    "scn": 158691504732555,
    "row_id": "AAAGYPAATAAPIC5AAC",
    "is_deleted": true,
    "database":
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "DELETE",
    "tx_id":
    "rs_id": "0x006cf4.00056b26.0011",
    "ssn": 0,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": null,
    "FIELD2": "TLV",
  }
}

排序和一致性

本節說明 Datastream 如何處理排序和一致性。

排序

Datastream 不保證順序,但每個事件都包含完整資料列,以及資料寫入來源的時間戳記。在 BigQuery 中,系統會自動以正確順序合併順序錯誤的事件。BigQuery 會使用事件中繼資料和內部變更序號 (CSN),以正確順序將事件套用至資料表。在 Cloud Storage 中,同一時間的事件可能會跨越多個檔案。

如果事件是為在啟動串流時建立的初始資料回填作業而產生,則事件順序錯亂是正常現象。

系統會根據來源推斷順序。

來源 說明
MySQL

初始回填作業的事件 read_method 欄位會以 mysql-backfill 開頭。回填中的事件接收順序不重要,因為事件可以依任意順序使用。

如果事件屬於持續進行的複製作業,read_method 欄位會設為 mysql-cdc-binlog

您可以從 log_file 欄位和記錄檔中偏移的 log_position 欄位組合,推斷出順序。這個組合提供一個獨一無二的遞增數字,用於識別資料庫中的作業順序。

Oracle

初始回填作業的事件 read_method 欄位會以 oracle-backfill 開頭。回填中的事件接收順序不重要,因為事件可以依任意順序使用。

如果事件屬於持續進行的複製作業,read_method 欄位會設為 oracle-cdc-logminer

您可以根據 rs_id (記錄集 ID) 欄位和 ssn (SQL 序號) 欄位的組合推斷順序。這個組合提供一個獨一無二的遞增數字,用於識別資料庫中的作業順序。

PostgreSQL

初始回填作業的事件 read_method 欄位會以 postgresql-backfill 開頭。回填中的事件接收順序不具任何意義,因為事件可依任意順序使用。

如果事件屬於持續進行的複製作業,read_method 欄位會設為 postgres-cdc-wal

您可以根據 source_timestamp 欄位和 lsn (記錄序號) 欄位的組合推斷順序。這個組合提供一個獨一無二的遞增數字,用於識別資料庫中的作業順序。

SQL Server

初始回填作業的事件 read_method 欄位會以 sqlserver-backfill 開頭。回填中的事件接收順序不具任何意義,因為事件可依任意順序使用。

如果事件屬於持續進行的複製作業,read_method 欄位會設為 sqlserver-cdc

您可以根據 source_timestamp 欄位和 lsn (記錄序號) 欄位的組合推斷順序。這個組合提供一個獨一無二的遞增數字,用於識別資料庫中的作業順序。

Salesforce (預覽版)

您可以使用記錄的 source_timestamp 做為排序鍵,決定記錄的順序。Salesforce 中的時間戳記解析度為一秒,但同一筆記錄不能在同一秒發生兩個變更事件。

MongoDB (預先發布版)

您可以透過作業記錄中的 ts 欄位,或記錄變更串流中的 clusterTime 欄位,判斷順序。每筆記錄的欄位都不重複。

一致性

Datastream 會確保來源資料庫的資料至少傳送至目的地一次。不會遺漏任何事件,但串流中可能會出現重複事件。重複事件的偵測時間範圍應以分鐘為單位,並可使用事件中繼資料中的事件通用唯一識別碼 (UUID) 偵測重複事件。

如果資料庫記錄檔含有未修訂的交易,且有任何交易遭到回溯,資料庫就會在記錄檔中以「反向」資料操縱語言 (DML) 作業反映這項情況。舉例來說,如果 INSERT 作業已回溯,就會有對應的 DELETE 作業。Datastream 會從記錄檔讀取這些作業。

關於串流

每個串流都有中繼資料,用於說明串流和擷取資料的來源。中繼資料則含有串流名稱、來源和目的地連線設定檔等資訊。

如要查看 Stream 物件的完整定義,請參閱 API 參考資料說明文件。

串流狀態

串流可以是下列其中一種狀態:

  • Not started
  • Starting
  • Running
  • Draining
  • Paused
  • Failed
  • Failed permanently

您可以使用記錄檔找出其他狀態資訊,例如資料表回填或處理的列數。您也可以使用 FetchStreamErrors API 擷取錯誤。

可透過探索 API 取得的物件中繼資料

探索 API 會傳回物件,代表連線設定檔所代表資料來源或目的地中定義的物件結構。每個物件都有物件本身的中繼資料,以及所擷取每個資料欄位的中繼資料。您可以使用 Discover API 取得這項中繼資料。

後續步驟