使用 Storage Write API 串流資料
本文說明如何使用 BigQuery Storage Write API,將資料串流至 BigQuery。
在串流情境中,資料會持續抵達,且應能以最低延遲時間供讀取。使用 BigQuery Storage Write API 處理串流工作負載時,請考量您需要哪些保證:
- 如果應用程式只需要「至少一次」語意,請使用預設串流。
- 如需「僅限一次」語意,請在已提交類型中建立一或多個串流,並使用串流偏移量來確保寫入作業只會執行一次。
在已提交類型中,伺服器確認寫入要求後,即可查詢寫入串流的資料。預設串流也會使用已提交類型,但無法提供「只傳送一次」的保證。
使用預設串流,確保至少傳送一次語意
如果應用程式可以接受目的地表格中出現重複記錄的可能性,建議您使用預設串流進行串流作業。
下列程式碼顯示如何將資料寫入預設串流:
Java
如要瞭解如何安裝及使用 BigQuery 的用戶端程式庫,請參閱 BigQuery 用戶端程式庫。詳情請參閱 BigQuery Java API 參考說明文件。
如要向 BigQuery 進行驗證,請設定應用程式預設憑證。 詳情請參閱「設定用戶端程式庫的驗證機制」。
Node.js
如要瞭解如何安裝及使用 BigQuery 的用戶端程式庫,請參閱 BigQuery 用戶端程式庫。
如要向 BigQuery 進行驗證,請設定應用程式預設憑證。 詳情請參閱「設定用戶端程式庫的驗證機制」。
Python
這個範例說明如何使用預設串流插入含有兩個欄位的記錄:
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json
import sample_data_pb2
# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = [
"name",
"age"
]
# Function to create a batch of row data to be serialized.
def create_row_data(data):
row = sample_data_pb2.SampleData()
for field in TABLE_COLUMNS_TO_CHECK:
# Optional fields will be passed as null if not provided
if field in data:
setattr(row, field, data[field])
return row.SerializeToString()
class BigQueryStorageWriteAppend(object):
# The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
def append_rows_proto2(
project_id: str, dataset_id: str, table_id: str, data: dict
):
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
stream_name = f'{parent}/_default'
write_stream = types.WriteStream()
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The request must contain the stream name.
request_template.write_stream = stream_name
# Generating the protocol buffer representation of the message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
proto_rows = types.ProtoRows()
for row in data:
proto_rows.serialized_rows.append(create_row_data(row))
# Appends data to the given stream.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
append_rows_stream.send(request)
print(f"Rows to table: '{parent}' have been written.")
if __name__ == "__main__":
###### Uncomment the below block to provide additional logging capabilities ######
#logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(message)s",
# handlers=[
# logging.StreamHandler()
# ]
#)
###### Uncomment the above block to provide additional logging capabilities ######
with open('entries.json', 'r') as json_file:
data = json.load(json_file)
# Change this to your specific BigQuery project, dataset, table details
BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID","DATASET_ID", "TABLE_ID ",data=data)
這個程式碼範例依附於已編譯的通訊協定模組 sample_data_pb2.py
。如要建立已編譯的模組,請執行 protoc --python_out=. sample_data.proto
指令,其中 protoc
是通訊協定緩衝區編譯器。sample_data.proto
檔案定義 Python 範例中使用的訊息格式。如要安裝 protoc
編譯器,請按照「通訊協定緩衝區 - Google 的資料交換格式」中的操作說明進行。
以下是 sample_data.proto
檔案的內容:
message SampleData {
required string name = 1;
required int64 age = 2;
}
這個指令碼會使用 entries.json
檔案,其中包含要插入 BigQuery 資料表的範例列資料:
{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}
使用多工處理
您只能為預設串流在串流寫入器層級啟用多工。如要在 Java 中啟用多工,請在建構 StreamWriter
或 JsonStreamWriter
物件時呼叫 setEnableConnectionPool
方法。
啟用連線集區後,Java 用戶端程式庫會在背景管理連線,並在現有連線過於忙碌時擴充連線。如要讓自動調高功能更有效,建議調低 maxInflightRequests
限制。
// One possible way for constructing StreamWriter StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build(); // One possible way for constructing JsonStreamWriter JsonStreamWriter.newBuilder(tableName, bigqueryClient) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build();
如要在 Go 中啟用多路複用,請參閱「連線共用 (多路複用)」。
使用已提交類型,確保「僅限一次」語意
如需「僅限一次」寫入語意,請建立「已提交」類型的寫入串流。在已提交類型中,用戶端收到後端確認後,即可查詢記錄。
「已提交」類型會使用記錄偏移量,在串流中提供僅傳送一次的訊息。應用程式會使用記錄偏移值,在每次呼叫 AppendRows
時指定下一個附加偏移值。只有在偏移值與下一個附加偏移值相符時,系統才會執行寫入作業。詳情請參閱「管理串流偏移量,實現精確一次語意」。
如未提供位移,系統會將記錄附加至串流目前的結尾。在這種情況下,如果附加要求傳回錯誤,重試要求可能會導致記錄在串流中出現多次。
如要使用已提交類型,請按照下列步驟操作:
Java
- 呼叫
CreateWriteStream
,在已提交的型別中建立一或多個串流。 - 針對每個串流,在迴圈中呼叫
AppendRows
,即可寫入批次記錄。 - 針對每個串流呼叫
FinalizeWriteStream
,釋放串流。呼叫這個方法後,您就無法再將任何資料列寫入串流。如果是已承諾方案,這個步驟為選用步驟,但有助於避免超出有效串流的限制。詳情請參閱「限制串流建立速率」。
Node.js
- 呼叫
createWriteStreamFullResponse
,在已提交的型別中建立一或多個串流。 - 針對每個串流,在迴圈中呼叫
appendRows
,即可寫入批次記錄。 - 針對每個串流呼叫
finalize
,釋放串流。呼叫這個方法後,您就無法再將任何資料列寫入串流。如果是已承諾方案,這個步驟為選用步驟,但有助於避免超出有效串流的限制。詳情請參閱「限制串流建立速率」。
您無法明確刪除串流。串流會遵循系統定義的存留時間 (TTL):
- 如果串流沒有任何流量,已提交的串流 TTL 為三天。
- 如果緩衝串流沒有任何流量,預設存留時間為七天。
以下程式碼說明如何使用已提交的型別:
Java
如要瞭解如何安裝及使用 BigQuery 的用戶端程式庫,請參閱 BigQuery 用戶端程式庫。詳情請參閱 BigQuery Java API 參考說明文件。
如要向 BigQuery 進行驗證,請設定應用程式預設憑證。 詳情請參閱「設定用戶端程式庫的驗證機制」。
Node.js
如要瞭解如何安裝及使用 BigQuery 的用戶端程式庫,請參閱 BigQuery 用戶端程式庫。
如要向 BigQuery 進行驗證,請設定應用程式預設憑證。 詳情請參閱「設定用戶端程式庫的驗證機制」。
使用 Apache Arrow 格式擷取資料
下列程式碼顯示如何使用 Apache Arrow 格式擷取資料。如需更詳細的端對端範例,請參閱 GitHub 上的 PyArrow 範例。
Python
這個範例說明如何使用預設串流,擷取序列化的 PyArrow 資料表。
from google.cloud.bigquery_storage_v1 import types as gapic_types
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream
from google.cloud import bigquery_storage_v1
def append_rows_with_pyarrow(
pyarrow_table: pyarrow.Table,
project_id: str,
dataset_id: str,
table_id: str,
):
bqstorage_write_client = bigquery_storage_v1.BigQueryWriteClient()
# Create request_template.
request_template = gapic_types.AppendRowsRequest()
request_template.write_stream = (
f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/_default"
)
arrow_data = gapic_types.AppendRowsRequest.ArrowData()
arrow_data.writer_schema.serialized_schema = (
pyarrow_table.schema.serialize().to_pybytes()
)
request_template.arrow_rows = arrow_data
# Create AppendRowsStream.
append_rows_stream = AppendRowsStream(
bqstorage_write_client,
request_template,
)
# Create request with table data.
request = gapic_types.AppendRowsRequest()
request.arrow_rows.rows.serialized_record_batch = (
pyarrow_table.to_batches()[0].serialize().to_pybytes()
)
# Send request.
future = append_rows_stream.send(request)
# Wait for result.
future.result()