使用 Datastream 和 Dataflow 將資料串流至 BigQuery

本頁面提供使用 Datastream 和 Dataflow 將資料串流至 BigQuery 的最佳做法。

根據使用者定義的索引鍵分割副本資料集

BigQuery 中的暫存資料集會自動進行分割。不過,根據預設,副本資料集不會分區,因為副本資料表的分區鍵必須根據特定業務邏輯定義,而不是由 Datastream 和 Dataflow 強制執行。

針對備用資源資料集中需要分區的每個資料表:

  1. 停止並排空 Dataflow 工作

  2. 在 BigQuery 中使用 SQL 編輯器,針對副本資料集中的每個資料表執行下列 SQL 指令碼。以這個範例來說,datastream_cdc 資料集中的 actor 資料表含有 last_update 欄,我們要將該欄設為分區鍵。執行指令碼即可重新建立資料表,並使用正確的分割區鍵。

    create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' partition by date(last_update)
    as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor'
  3. 使用 Datastream 到 BigQuery 範本重新建立 Dataflow 工作。

執行使用者定義函式來操控事件資料

您可以使用 Datastream 到 BigQuery 範本執行 JavaScript 使用者定義函式。如要執行這項操作,請先將含有函式的檔案放在 Cloud Storage 內的特定位置。然後執行下列操作:

  • 在範本中使用 javascriptTextTransformGcsPath 參數,指定 Cloud Storage 中包含使用者定義函式的檔案位置。
  • 使用 javascriptTextTransformFunctionName 參數指定要呼叫的 JavaScript 函式名稱,做為使用者定義函式。

舉例來說,您可以執行使用者定義函式,在 BigQuery 中保留副本資料集資料表中的已刪除記錄。這個程序稱為軟刪除。

如要達成這個目標,請建立函式,將 _metadata_deleted 資料欄的值複製到名為 is_deleted 的新資料欄,然後將 _metadata_deleted 資料欄值重設為 false。這會導致 Dataflow 工作忽略刪除事件,並在更新 BigQuery 中的副本資料集時保留已刪除的記錄。

以下是這個使用者定義函式的程式碼範例:

/**
* This function changes the behavior of the Datastream to
* BigQuery template to allow soft deletes.
* @param {string} messageString from DatastreamIO data
* @return {string} same as an input message with an added property
*/
function transform(messageString) {
   // messageString is a JSON object as a string
   var messageJson = JSON.parse(messageString);
    // Moving the deleted flag to a new column will cause the pipeline to soft delete data.
   messageJson['is_deleted'] = messageJson['_metadata_deleted'];
   messageJson['_metadata_deleted'] = false;
    return JSON.stringify(messageJson);
 }

設定合併頻率

使用 Datastream 到 BigQuery 範本mergeFrequencyMinutes 參數設定合併頻率。這是指 BigQuery 中副本資料集內特定資料表合併作業之間的分鐘數。在系統回填歷來資料期間,建議您將合併頻率調低 (12 或 24 小時),以控管費用。

舉例來說,如果將這個參數的值設為 10 分鐘,Dataflow 就會每 10 分鐘執行一次使用範本的工作。不過,工作首次執行時會延遲 5 分鐘。以這個例子來說,如果工作在上午 9:14 執行,則第一次合併作業會在上午 9:29 進行 (合併作業需要 10 分鐘,延遲時間為 5 分鐘)。第二次合併會在上午 9:39 進行,之後每隔 10 分鐘合併一次 (上午 9:49、上午 9:59、上午 10:09 等等)。

如果將合併頻率設為 60 分鐘,工作會在整點執行,但首次執行工作時會延遲 5 分鐘。如果工作排定在上午 10 點執行,但由於延遲 5 分鐘,因此實際執行時間為上午 10 點 5 分。後續合併作業會以 60 分鐘為間隔進行 (上午 11:05、下午 12:05、下午 1:05 等)。

無論是為了控管成本或其他原因,您可能都無法以符合業務需求的頻率執行合併作業。您可能沒有最新資料。如要存取最新資料,請在 BigQuery 中,於暫存和副本資料集的資料表上建立檢視區塊,該檢視區塊會模擬合併作業。這個檢視區會建立為一個邏輯資料表 (適用於暫存和副本資料集)。如果合併頻率較低,且您需要更快存取資料,請使用檢視畫面。