Datastream 中的資料階層如下:
- 串流:由資料來源和目的地組成。
 - 物件:串流的一部分,例如特定資料庫中的表格。
 - 「事件」:特定物件產生的單一變更,例如資料庫插入作業。
 
串流、物件和事件都有相關聯的資料和中繼資料。這項資料和中繼資料可用於不同用途。
事件簡介
每個事件都包含三種資料:
- 事件資料:代表來自串流來源的物件資料變更。每個事件都包含變更的整列資料。
 - 一般中繼資料:這類中繼資料會顯示在資料串流產生的每個事件中,用於執行動作,例如移除目的地中的重複資料。
 - 特定來源的中繼資料:這類中繼資料會顯示在特定串流來源產生的每個事件中。這類中繼資料會因來源而異。
 
事件資料
事件資料是來自串流來源的特定物件,每次異動時的酬載。
事件格式為 Avro 或 JSON。
使用 Avro 格式時,每個資料欄的事件都會包含資料欄索引和值。使用資料欄索引,即可從 Avro 標頭的結構定義中擷取資料欄名稱和統一類型。
使用 JSON 格式時,每個資料欄的事件都會包含資料欄名稱和值。
事件中繼資料可用於收集事件來源的相關資訊,以及移除目的地中的重複資料,並依下游消費者排序事件。
下表列出並說明一般和來源專屬事件中繼資料的欄位和資料類型。
一般中繼資料
所有類型的串流都使用一致的這類中繼資料。
| 欄位 | Avro 類型 | JSON 類型 | 說明 | 
|---|---|---|---|
stream_name | 
    字串 | 字串 | 建立時定義的專屬串流名稱。 | 
read_method | 
    字串 | 字串 | 指出資料是否是使用變更資料擷取 (CDC) 方法從來源讀取,做為歷史資料補充作業的一部分,或是做為在 CDC 複製期間交易回溯時建立的補充作業一部分。 可能的值包括: 
  | 
  
object | 
    字串 | 字串 | 用於將不同類型的事件分組的名稱,通常是來源中的資料表或物件名稱。 | 
schema_key | 
  字串 | 字串 | 事件統一架構的專屬 ID。 | 
uuid | 
    字串 | 字串 | Datastream 產生的事件專屬 ID。 | 
read_timestamp | 
    timestamp-millis | 字串 | Datastream 讀取記錄的時間戳記 (世界標準時間),以毫秒為單位的 Epoch 時間戳記。 | 
source_timestamp | 
    timestamp-millis | 字串 | 記錄在來源上變更時的時間戳記 (世界標準時間),以毫秒為單位的 Epoch 紀元時間戳記。 | 
sort_keys | 
    {"type": "array", "items": ["string", "long"]} | 
    陣列 | 可用於依事件發生順序排序事件的值陣列。 | 
來源專屬中繼資料
這項中繼資料與來源資料庫的 CDC 和回填事件相關聯。如要查看這項中繼資料,請從下方的下拉式選單中選取來源。
| 來源 | 欄位 | Avro 類型 | JSON 類型 | 說明 | 
|---|---|---|---|---|
| MySQL | log_file | 
    字串 | 字串 | Datastream 在 CDC 複製作業中擷取事件的記錄檔。 | 
| MySQL | log_position | 
    long | long | MySQL 二進位記錄檔中的記錄位置 (位移)。 | 
| MySQL | primary_keys | 
    字串陣列 | 字串陣列 | 組成資料表主鍵的一或多個資料欄名稱清單。如果資料表沒有主鍵,這個欄位就會空白。 | 
| MySQL | is_deleted | 
    布林值 | 布林值 | 
      
  | 
    
| MySQL | database | 
    字串 | 字串 | 與事件相關聯的資料庫。 | 
| MySQL | table | 
    字串 | 字串 | 與活動相關聯的表格。 | 
| MySQL | change_type | 
    字串 | 字串 | 事件代表的變更類型 (  | 
  
| Oracle | log_file | 
    字串 | 字串 | Datastream 在 CDC 複製作業中擷取事件的記錄檔。 | 
| Oracle | scn | 
    long | long | Oracle 交易記錄中的記錄位置 (位移)。 | 
| Oracle | row_id | 
    字串 | 字串 | Oracle 的 row_id。 | 
| Oracle | is_deleted | 
    布林值 | 布林值 | 
      
  | 
  
| Oracle | database | 
    字串 | 字串 | 與事件相關聯的資料庫。 | 
| Oracle | schema | 
    字串 | 字串 | 與事件中的表格相關聯的結構定義。 | 
| Oracle | table | 
    字串 | 字串 | 與活動相關聯的表格。 | 
| Oracle | change_type | 
    字串 | 字串 | 事件代表的變更類型 (  | 
  
| Oracle | tx_id | 
    字串 | 字串 | 事件所屬的交易 ID。 | 
| Oracle | rs_id | 
    字串 | 字串 | 記錄集 ID。rs_id 和 ssn 的配對可唯一識別 V$LOGMNR_CONTENTS 中的資料列。rs_id 可唯一識別產生資料列的重做記錄。 | 
  
| Oracle | ssn | 
    long | long | SQL 序號。這個數字會與 rs_id 一併使用,用於識別 V$LOGMNR_CONTENTS 中的資料列。 | 
  
| PostgreSQL | schema | 
    字串 | 字串 | 與事件中的表格相關聯的結構定義。 | 
| PostgreSQL | table | 
    字串 | 字串 | 與活動相關聯的表格。 | 
| PostgreSQL | is_deleted | 
    布林值 | 布林值 | 
      
  | 
    
| PostgreSQL | change_type | 
    字串 | 字串 | 事件代表的變更類型 (INSERT、UPDATE、DELETE)。
     | 
  
| PostgreSQL | tx_id | 
    字串 | 字串 | 事件所屬的交易 ID。 | 
| PostgreSQL | lsn | 
    字串 | 字串 | 目前項目的記錄序號。 | 
| PostgreSQL | primary_keys | 
    字串陣列 | 字串陣列 | 組成資料表主鍵的一或多個資料欄名稱清單。如果資料表沒有主鍵,這個欄位就會空白。 | 
| SQL Server | table | 
    字串 | 字串 | 與活動相關聯的表格。 | 
| SQL Server | database | 
    long | long | 與事件相關聯的資料庫。 | 
| SQL Server | schema | 
    字串陣列 | 字串陣列 | 與事件中的表格相關聯的結構定義。 | 
| SQL Server | is_deleted | 
    布林值 | 布林值 | 
    
  | 
    
| SQL Server | lsn | 
    字串 | 字串 | 事件的記錄序號。 | 
| SQL Server | tx_id | 
    字串 | 字串 | 事件所屬的交易 ID。 | 
| SQL Server | physical_location | 
    整數陣列 | 整數陣列 | 記錄檔記錄的實體位置,以三個整數表示:記錄的檔案 ID、頁面 ID 和時段 ID。 | 
| SQL Server | replication_index | 
    字串陣列 | 字串陣列 | 索引的資料欄名稱清單,可用於在資料表中找出不重複的資料列。 | 
| SQL Server | change_type | 
    字串 | 字串 | 事件代表的變更類型 (  | 
    
| Salesforce | object_name | 
    字串 | 字串 | 與事件相關聯的 Salesforce 物件名稱。  | 
    
| Salesforce | domain | 
    字串 | 字串 | 與事件相關聯的網域名稱。  | 
    
| Salesforce | is_deleted | 
    布林值 | 布林值 | 
    
  | 
    
| Salesforce | change_type | 
    字串 | 字串 | 事件代表的變更類型 (  | 
    
| Salesforce | primary_keys | 
    字串陣列 | 字串陣列 | 組成資料表主鍵的資料欄名稱清單。如果資料表沒有主鍵,這個欄位就會空白。 | 
| MongoDB | database | 
    字串 | 字串 | 與事件相關聯的資料庫。 | 
| MongoDB | collection | 
    字串 | 字串 | 與活動相關的收藏內容。集合類似於關聯式資料庫中的資料表。 | 
| MongoDB | change_type | 
    字串 | 字串 | 事件代表的變更類型 (CREATE、UPDATE 和 DELETE)。 | 
    
| MongoDB | is_deleted | 
    布林值 | 布林值 | 
    
  | 
    
| MongoDB | primary_keys | 
    字串陣列 | 字串陣列 | _id 欄位,做為集合中每個文件的主要鍵。 | 
    
事件流程範例
這個流程說明連續三項作業 (INSERT、UPDATE 和 DELETE) 在來源資料庫的 SAMPLE 表格中,對單一資料列產生的事件。
| 時間 | THIS_IS_MY_PK (int) | FIELD1 (nchar 可為空值) | FIELD2 (nchar non-null)> | 
|---|---|---|---|
| 0 | 1231535353 | foo | TLV | 
| 1 | 1231535353 | NULL | TLV | 
INSERT (T0)
訊息酬載包含新資料列的所有內容。
{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "d7989206-380f-0e81-8056-240501101100",
  "read_timestamp": "2019-11-07T07:37:16.808Z",
  "source_timestamp": "2019-11-07T02:15:39",  
  "source_metadata": {
    "log_file": ""
    "scn": 15869116216871,
    "row_id": "AAAPwRAALAAMzMBABD",
    "is_deleted": false,
    "database": "DB1",
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "INSERT",
    "tx_id": 
    "rs_id": "0x0073c9.000a4e4c.01d0",
    "ssn": 67,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": "foo",
    "FIELD2": "TLV",
  }
}
更新 (T1)
訊息酬載包含新資料列的所有內容。不含先前的值。
{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "e6067366-1efc-0a10-a084-0d8701101101",
  "read_timestamp": "2019-11-07T07:37:18.808Z",
  "source_timestamp": "2019-11-07T02:17:39",  
  "source_metadata": {
    "log_file": 
    "scn": 15869150473224,
    "row_id": "AAAGYPAATAAPIC5AAB",
    "is_deleted": false,
    "database":
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "UPDATE",
    "tx_id":
    "rs_id": "0x006cf4.00056b26.0010",
    "ssn": 0,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": null,
    "FIELD2": "TLV",
  }
}
DELETE (T2)
訊息酬載包含新資料列的所有內容。
{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "c504f4bc-0ffc-4a1a-84df-6aba382fa651",
  "read_timestamp": "2019-11-07T07:37:20.808Z",
  "source_timestamp": "2019-11-07T02:19:39",
  "source_metadata": {
    "log_file": 
    "scn": 158691504732555,
    "row_id": "AAAGYPAATAAPIC5AAC",
    "is_deleted": true,
    "database":
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "DELETE",
    "tx_id":
    "rs_id": "0x006cf4.00056b26.0011",
    "ssn": 0,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": null,
    "FIELD2": "TLV",
  }
}
順序和一致性
本節說明 Datastream 如何處理排序和一致性。
排序
Datastream 不保證順序,但每個事件都包含完整資料列,以及資料寫入來源的時間戳記。在 BigQuery 中,系統會自動以正確順序合併順序錯誤的事件。BigQuery 會使用事件中繼資料和內部變更序號 (CSN),以正確順序將事件套用至資料表。在 Cloud Storage 中,同一時間的事件可能會跨越多個檔案。
如果事件是為在啟動串流時建立的初始資料回填作業而產生,則事件順序錯亂是正常現象。
系統會根據來源推斷順序。
| 來源 | 說明 | 
|---|---|
| MySQL | 初始回填作業的事件  如果事件屬於持續進行的複製作業, 您可以從   | 
    
| Oracle | 初始回填作業的事件  如果事件屬於持續進行的複製作業, 您可以透過   | 
    
| PostgreSQL | 初始回填作業的事件  如果事件屬於持續進行的複製作業, 您可以透過   | 
    
| SQL Server | 
     初始回填作業的事件  如果事件屬於持續進行的複製作業, 您可以透過   | 
    
| Salesforce | 
     您可以使用記錄的   | 
    
| MongoDB (預先發布版) | 
     您可以透過作業記錄中的   | 
    
一致性
Datastream 會確保來源資料庫中的資料至少傳送至目的地一次。不會遺漏任何事件,但串流中可能會出現重複事件。重複事件的偵測時間範圍應以分鐘為單位,並可使用事件中繼資料中的事件通用唯一識別碼 (UUID) 偵測重複事件。
如果資料庫記錄檔含有未修訂的交易,且有任何交易遭到回溯,資料庫就會在記錄檔中以「反向」資料操縱語言 (DML) 作業反映這項異動。舉例來說,如果 INSERT 作業已回溯,就會有對應的 DELETE 作業。Datastream 會從記錄檔讀取這些作業。
關於串流
每個串流都有中繼資料,用於說明串流和擷取資料的來源。中繼資料則含有串流名稱、來源和目的地連線設定檔等資訊。
如要查看 Stream 物件的完整定義,請參閱 API 參考資料說明文件。
串流狀態
串流可以是下列其中一種狀態:
Not startedStartingRunningDrainingPausedFailedFailed permanently
您可以使用記錄檔找出其他狀態資訊,例如資料表回填或處理的列數。您也可以使用 FetchStreamErrors API 擷取錯誤。
可透過探索 API 取得的物件中繼資料
探索 API 會傳回物件,代表連線設定檔所代表資料來源或目的地中定義的物件結構。每個物件都有物件本身的中繼資料,以及所擷取每個資料欄位的中繼資料。您可以使用 Discover API 取得這項中繼資料。