本文說明如何將資料從 BigQuery 讀取至 Dataflow。
總覽
在大多數的使用情況下,建議您使用受管理 I/O 從 BigQuery 讀取資料。代管 I/O 提供自動升級和一致的設定 API 等功能。從 BigQuery 讀取資料時,Managed I/O 會執行直接資料表讀取,提供最佳讀取效能。
如需更進階的效能調整,請考慮使用 BigQueryIO
連接器。BigQueryIO
連接器支援直接讀取表格,以及從 BigQuery 匯出工作讀取資料。此外,您也可以更精細地控制表格記錄的反序列化作業。詳情請參閱本文中的「使用 BigQueryIO
連接器」。
資料欄投影和篩選
如要減少管道從 BigQuery 讀取的資料量,可以使用下列技巧:
- 資料欄投影:指定要從資料表讀取的資料欄子集。 如果資料表有大量資料欄,但您只需要讀取其中一部分,請使用資料欄投影。
- 資料列篩選會指定要套用至資料表的述詞。BigQuery 讀取作業只會傳回符合篩選條件的資料列,因此可減少管道擷取的資料總量。
以下範例會從資料表讀取 "user_name"
和 "age"
資料欄,並篩除不符合述詞 "age > 18"
的資料列。本範例使用受管理 I/O。
Java
如要向 Dataflow 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。
從查詢結果讀取資料
以下範例使用 Managed I/O 讀取 SQL 查詢的結果。這項作業會針對 BigQuery 公開資料集執行查詢。您也可以使用 SQL 查詢,從 BigQuery 檢視區塊或具體化檢視區塊讀取資料。
Java
如要向 Dataflow 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。
使用 BigQueryIO
連接器
BigQueryIO
連接器支援下列序列化方法:
- 以 Avro 格式記錄讀取資料。使用這個方法時,您會提供函式,將 Avro 記錄剖析為自訂資料型別。
- 以
TableRow
物件形式讀取資料。這個方法很方便,因為不需要自訂資料型別。不過,與讀取 Avro 格式的記錄相比,這類記錄的效能通常較低。
這個連接器支援兩種讀取資料的選項:
- 匯出工作。根據預設,
BigQueryIO
連接器會執行 BigQuery 匯出工作,將資料表資料寫入 Cloud Storage。然後從 Cloud Storage 讀取資料。 - 直接讀取資料表。這個選項比匯出工作更快,因為它使用 BigQuery Storage Read API,並略過匯出步驟。如要使用直接資料表讀取功能,請在建構管道時呼叫
withMethod(Method.DIRECT_READ)
。
選擇要使用哪個選項時,請考量下列幾點:
一般來說,我們建議使用直接讀取資料表的方式。相較於匯出作業,Storage Read API 更適合用於資料管道,因為不需要匯出資料的中間步驟。
如果您使用直接讀取,系統會針對 Storage Read API 的用量收費。請參閱 BigQuery 定價頁面的「資料提取定價」一節。
匯出工作不會產生額外費用。不過,匯出工作有限制。如要移動大量資料,且以即時性為優先考量,並可調整成本,建議直接讀取。
Storage Read API 有配額限制。使用Google Cloud 指標監控配額用量。
如果您使用匯出工作,請設定
--tempLocation
pipeline 選項,指定匯出檔案的 Cloud Storage 值區。使用 Storage Read API 時,您可能會在記錄中看到租約到期和工作階段逾時錯誤,例如:
DEADLINE_EXCEEDED
Server Unresponsive
StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session
如果作業時間超過逾時時間,就可能發生這類錯誤,通常是在執行時間超過 6 小時的管道中。如要緩解這個問題,請改用檔案匯出功能。
平行處理作業的程度取決於讀取方法:
直接讀取:I/O 連接器會根據匯出要求的大小,產生動態數量的串流。並行從 BigQuery 直接讀取這些串流。
匯出工作:BigQuery 會決定要寫入 Cloud Storage 的檔案數量。檔案數量取決於查詢和資料量。I/O 連接器會平行讀取匯出的檔案。
下表顯示各種 BigQuery I/O 讀取選項的效能指標。這些工作負載是在一個 e2-standard2
工作站上執行,使用 Java 適用的 Apache Beam SDK 2.49.0。他們沒有使用 Runner v2。
1 億筆記錄 | 1 kB | 1 個資料欄 | 處理量 (位元組) | 處理量 (元素) |
---|---|---|
儲存空間讀取 | 120 MBps | 每秒 88,000 個元素 |
匯出 Avro | 105 MBps | 每秒 78,000 個元素 |
Json Export | 110 MBps | 每秒 81,000 個元素 |
這些指標是以簡單的批次管道為依據。這些基準旨在比較 I/O 連接器之間的效能,不一定代表實際的管道。Dataflow 管道效能相當複雜,取決於 VM 類型、處理的資料、外部來源和接收器的效能,以及使用者程式碼。這些指標是根據執行 Java SDK 取得,無法代表其他語言 SDK 的效能特徵。詳情請參閱「Beam IO 效能」。
範例
下列程式碼範例會搭配使用 BigQueryIO
連接器和直接資料表讀取作業。如要改用匯出工作,請省略對 withMethod
的呼叫。
讀取 Avro 格式的記錄
這個範例說明如何使用 BigQueryIO
連接器讀取 Avro 格式的記錄。
如要將 BigQuery 資料讀取至 Avro 格式的記錄,請使用 read(SerializableFunction)
方法。這個方法會採用應用程式定義的函式,該函式會剖析 SchemaAndRecord
物件並傳回自訂資料型別。連接器的輸出內容是自訂資料類型的 PCollection
。
下列程式碼會從 BigQuery 表格讀取 PCollection<MyData>
,其中 MyData
是應用程式定義的類別。
Java
如要向 Dataflow 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。
read
方法會採用 SerializableFunction<SchemaAndRecord, T>
介面,該介面會定義從 Avro 記錄轉換為自訂資料類別的函式。在上一個程式碼範例中,MyData.apply
方法會實作這項轉換函式。範例函式會剖析 Avro 記錄中的 name
和 age
欄位,並傳回 MyData
例項。
如要指定要讀取的 BigQuery 資料表,請呼叫 from
方法,如上一個範例所示。詳情請參閱 BigQuery I/O 連接器說明文件中的「資料表名稱」。
讀取 TableRow
個物件
這個範例說明如何使用 BigQueryIO
連接器讀取 TableRow
物件。
readTableRows
方法會將 BigQuery 資料讀取至 TableRow
物件的 PCollection
中。每個 TableRow
都是鍵/值組合對應,其中包含單一資料列的資料。呼叫 from
方法,指定要讀取的 BigQuery 資料表。
下列程式碼會從 BigQuery 資料表讀取 PCollection<TableRows>
。
Java
如要向 Dataflow 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。
這個範例也說明如何從 TableRow
字典存取值。
整數值會編碼為字串,以符合 BigQuery 匯出的 JSON 格式。