本文說明如何將 Dataflow 的資料寫入 BigQuery。
總覽
在大多數情況下,建議使用受管理 I/O 寫入 BigQuery。代管 I/O 提供自動升級和一致的設定 API 等功能。寫入 BigQuery 時,Managed I/O 會自動為批次或串流工作選擇最佳寫入方法。
如需更進階的效能調整,請考慮使用 BigQueryIO
連接器。詳情請參閱本文中的「使用 BigQueryIO
連接器」。
成效
下表列出各種工作負載的效能指標。這些工作負載是在一個 e2-standard2
工作站上執行,使用的 Apache Beam SDK 2.49.0 適用於 Java。他們沒有使用 Runner v2。
1 億筆記錄 | 1 kB | 1 個資料欄 | 處理量 (位元組) | 處理量 (元素) |
---|---|---|
儲存空間寫入 | 55 MBps | 每秒 54,000 個元素 |
Avro Load | 78 MB/秒 | 每秒 77,000 個元素 |
Json Load | 54 MBps | 每秒 53,000 個元素 |
這些指標是以簡單的批次管道為依據。這些基準旨在比較 I/O 連接器之間的效能,不一定代表實際的管道。Dataflow 管道效能相當複雜,取決於 VM 類型、處理的資料、外部來源和接收器的效能,以及使用者程式碼。這些指標是根據執行 Java SDK 取得,無法代表其他語言 SDK 的效能特徵。詳情請參閱「Beam IO 效能」。
使用 BigQueryIO
連接器
BigQuery I/O 連接器支援下列 BigQuery 寫入方法:
STORAGE_WRITE_API
。在這個模式下,連接器會使用 BigQuery Storage Write API,直接將資料寫入 BigQuery 儲存空間。Storage Write API 將串流擷取和批次載入功能合併為單一的高效能 API。這個模式可確保語意為「剛好一次」。STORAGE_API_AT_LEAST_ONCE
。這個模式也會使用 Storage Write API,但至少會提供一次語意。這個模式可降低大多數管道的延遲時間。但可能會重複寫入。FILE_LOADS
. 在這個模式下,連接器會將輸入資料寫入 Cloud Storage 中的暫存檔案。然後執行 BigQuery load job,將資料載入 BigQuery。這個模式是受限PCollections
的預設模式,最常見於批次管道。STREAMING_INSERTS
。在此模式下,連接器會使用舊版串流 API。這個模式是無界限PCollections
的預設模式,但不建議用於新專案。
選擇寫入方法時,請考量下列幾點:
- 如果是串流工作,請考慮使用
STORAGE_WRITE_API
或STORAGE_API_AT_LEAST_ONCE
,因為這些模式會直接寫入 BigQuery 儲存空間,不會使用中繼暫存檔案。 - 如果使用至少一次串流模式執行管道,請將寫入模式設為
STORAGE_API_AT_LEAST_ONCE
。這項設定效率更高,且符合「至少一次」串流模式的語意。 - 檔案載入和 Storage Write API 有不同的配額和限制。
- 載入工作會使用共用的 BigQuery 運算單元集區或預留運算單元。如要使用預留運算單元,請在具有
PIPELINE
類型保留項目指派作業的專案中執行載入工作。如果您使用共用 BigQuery 運算單元集區,載入工作就不會產生費用。不過,BigQuery 不保證共用集區的可用容量。詳情請參閱「預留項目簡介」。
平行處理工作數量
在串流管道中,連接器會將
FILE_LOADS
和STORAGE_WRITE_API
的資料分片至多個檔案或串流。一般來說,我們建議呼叫withAutoSharding
來啟用自動分片。在批次管道中,連接器會將資料寫入已分割的檔案,然後平行載入 BigQuery。
FILE_LOADS
在批次管道中,每個工作人員會建立一或多個串流,以便寫入 BigQuery,具體數量取決於分片總數。
STORAGE_WRITE_API
對於
STORAGE_API_AT_LEAST_ONCE
,只有一個預設寫入串流。多個工作站會附加至這個串流。
最佳做法
Storage Write API 有配額限制。連接器會處理大部分管道的這些限制。不過,在某些情況下,可用的 Storage Write API 串流可能會耗盡。舉例來說,如果管線使用自動分片和自動調度資源,且目的地數量龐大,就可能發生這個問題,尤其是在長時間執行的工作負載變動劇烈時。如果發生這個問題,請考慮使用
STORAGE_WRITE_API_AT_LEAST_ONCE
,即可避免問題。使用 Google Cloud 指標監控 Storage Write API 配額用量。
使用檔案載入時,Avro 通常比 JSON 表現更出色。如要使用 Avro,請呼叫
withAvroFormatFunction
。根據預設,載入工作會在與 Dataflow 工作相同的專案中執行。如要指定其他專案,請呼叫
withLoadJobProjectId
。使用 Java SDK 時,請考慮建立代表 BigQuery 資料表結構定義的類別。然後在管道中呼叫
useBeamSchema
,自動在 Apache BeamRow
和 BigQueryTableRow
類型之間轉換。如需結構定義類別的範例,請參閱ExampleModel.java
。如果載入的資料表含有數千個欄位的複雜結構定義,請考慮呼叫
withMaxBytesPerPartition
,為每個載入工作設定較小的最大大小。根據預設,
BigQueryIO
會使用適用於大多數管道的 Storage Write API 設定。不過,如果發現效能問題,可以設定管道選項來調整這些設定。詳情請參閱 Apache Beam 說明文件中的「調整 Storage Write API」。
串流管道
以下建議適用於串流管道。
如果是串流管道,建議使用 Storage Write API (
STORAGE_WRITE_API
或STORAGE_API_AT_LEAST_ONCE
)。串流管道可以使用檔案載入,但這種做法有缺點:
建議盡可能使用
STORAGE_WRITE_API_AT_LEAST_ONCE
。這可能會導致重複的記錄寫入 BigQuery,但與STORAGE_WRITE_API
相比,費用較低且更具擴充性。一般來說,請避免使用
STREAMING_INSERTS
。串流插入的費用比 Storage Write API 高,效能也不如後者。資料分片可提升串流管道的效能。對於大多數管道,自動分片是個不錯的起點。不過,您可以按照下列方式調整分片:
- 如要設定寫入串流的數量,請呼叫
STORAGE_WRITE_API
withNumStorageWriteApiStreams
。 - 如要使用
FILE_LOADS
,請呼叫withNumFileShards
設定檔案分片數量。
- 如要設定寫入串流的數量,請呼叫
如果您使用串流插入,建議將
retryTransientErrors
設為重試政策。
批次管道
下列建議適用於批次管道。
對於大多數大型批次管道,我們建議先嘗試
FILE_LOADS
。批次管道可以使用STORAGE_WRITE_API
,但如果規模龐大 (超過 1,000 個 vCPU) 或同時執行多個管道,就可能超出配額限制。Apache Beam 不會限制批次STORAGE_WRITE_API
工作的寫入串流數量上限,因此工作最終會達到 BigQuery Storage API 限制。使用
FILE_LOADS
時,您可能會用盡共用 BigQuery 運算單元集區或保留運算單元集區。如果發生這類失敗情況,請嘗試下列方法:- 減少工作的工作站數量上限或工作站大小。
- 購買更多預留運算單元。
- 建議使用
STORAGE_WRITE_API
。
如果管道規模不大 (少於 1,000 個 vCPU),建議使用
STORAGE_WRITE_API
。如果是這類較小的工作,建議您使用STORAGE_WRITE_API
(如需無法傳送的訊息佇列),或在FILE_LOADS
共用運算單元集區不足時使用。如果可以容忍重複資料,請考慮使用
STORAGE_WRITE_API_AT_LEAST_ONCE
。這個模式可能會導致重複的記錄寫入 BigQuery,但費用可能比STORAGE_WRITE_API
選項低。不同的寫入模式可能會根據管道的特性而有不同的執行方式。進行實驗,找出最適合工作負載的寫入模式。
處理列層級錯誤
本節說明如何處理可能在列層級發生的錯誤,例如輸入資料格式錯誤或結構定義不符。
如果是 Storage Write API,無法寫入的資料列會放入另一個 PCollection
。如要取得這個集合,請在 WriteResult
物件上呼叫 getFailedStorageApiInserts
。如需這個方法的範例,請參閱「將資料串流至 BigQuery」。
建議您將錯誤傳送至死信佇列或資料表,以供日後處理。如要進一步瞭解這個模式,請參閱BigQueryIO
dead letter 模式。
如果是 FILE_LOADS
,載入資料時發生錯誤,載入工作就會失敗,管道也會擲回執行階段例外狀況。您可以在 Dataflow 記錄中查看錯誤,或查看 BigQuery 工作記錄。I/O 連接器不會傳回個別失敗資料列的相關資訊。
如要進一步瞭解如何排解錯誤,請參閱「BigQuery 連接器錯誤」。
範例
下列範例說明如何使用 Dataflow 寫入 BigQuery。這些範例使用 BigQueryIO
連接器。
寫入現有資料表
以下範例會建立批次管道,將 PCollection<MyData>
寫入 BigQuery,其中 MyData
是自訂資料型別。
BigQueryIO.write()
方法會傳回 BigQueryIO.Write<T>
型別,用於設定寫入作業。詳情請參閱 Apache Beam 說明文件中的「寫入資料表」。這個程式碼範例會寫入現有資料表 (CREATE_NEVER
),並將新資料列附加至資料表 (WRITE_APPEND
)。
Java
如要向 Dataflow 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。
寫入新資料表或現有資料表
如果目的地資料表不存在,下列範例會將建立處置設為 CREATE_IF_NEEDED
,藉此建立新資料表。使用這個選項時,您必須提供表格結構定義。如果連接器建立新資料表,就會使用這個結構定義。
Java
如要向 Dataflow 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。
將資料串流至 BigQuery
以下範例說明如何透過將寫入模式設為 STORAGE_WRITE_API
,使用「只傳送一次」語意串流資料:
並非所有串流管道都需要「僅限一次」語意。舉例來說,您或許可以手動移除目的地表格中的重複項目。如果您的情境可接受重複記錄,請考慮將寫入方法設為 STORAGE_API_AT_LEAST_ONCE
,使用至少一次語意。一般來說,這種方法效率較高,且大多數管道的延遲時間較短。
Java
如要向 Dataflow 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。
後續步驟
- 進一步瞭解受管理 I/O。