使用 Storage Write API 流式传输数据
本文档介绍如何使用 BigQuery Storage Write API 将数据流式传输至 BigQuery。
在流式传输场景中,数据会连续到达,并且应该能够以最低延时进行读取。使用 BigQuery Storage Write API 处理流式工作负载时,请考虑您需要确保的事项:
- 如果您的应用只需要“至少一次”语义,请使用默认流。
- 如果您需要“正好一次”语义,请在已提交类型中创建一个或多个流,并使用流偏移量来确保“正好一次”写入。
在已提交类型下,只要服务器确认了写入请求,写入到流的数据即可用于查询。默认流也会使用已提交类型,但不提供“正好一次”保证。
使用默认流实现“至少一次”语义
如果您的应用可以接受在目标表中出现重复记录的可能性,那么我们建议您对流式传输场景使用默认流。
以下代码展示了如何将数据写入默认流:
Java
如需了解如何安装和使用 BigQuery 客户端库,请参阅 BigQuery 客户端库。 如需了解详情,请参阅 BigQuery Java API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为客户端库设置身份验证。
Node.js
如需了解如何安装和使用 BigQuery 客户端库,请参阅 BigQuery 客户端库。
如需向 BigQuery 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为客户端库设置身份验证。
Python
以下示例展示了如何使用默认数据流插入包含两个字段的记录:
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json
import sample_data_pb2
# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = [
"name",
"age"
]
# Function to create a batch of row data to be serialized.
def create_row_data(data):
row = sample_data_pb2.SampleData()
for field in TABLE_COLUMNS_TO_CHECK:
# Optional fields will be passed as null if not provided
if field in data:
setattr(row, field, data[field])
return row.SerializeToString()
class BigQueryStorageWriteAppend(object):
# The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
def append_rows_proto2(
project_id: str, dataset_id: str, table_id: str, data: dict
):
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
stream_name = f'{parent}/_default'
write_stream = types.WriteStream()
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The request must contain the stream name.
request_template.write_stream = stream_name
# Generating the protocol buffer representation of the message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
proto_rows = types.ProtoRows()
for row in data:
proto_rows.serialized_rows.append(create_row_data(row))
# Appends data to the given stream.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
append_rows_stream.send(request)
print(f"Rows to table: '{parent}' have been written.")
if __name__ == "__main__":
###### Uncomment the below block to provide additional logging capabilities ######
#logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(message)s",
# handlers=[
# logging.StreamHandler()
# ]
#)
###### Uncomment the above block to provide additional logging capabilities ######
with open('entries.json', 'r') as json_file:
data = json.load(json_file)
# Change this to your specific BigQuery project, dataset, table details
BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID","DATASET_ID", "TABLE_ID ",data=data)
该代码示例依赖于已编译的协议模块 sample_data_pb2.py
。如需创建已编译的模块,请执行 protoc --python_out=. sample_data.proto
命令,其中 protoc
是协议缓冲区编译器。sample_data.proto
文件定义了 Python 示例中使用的消息的格式。 如需安装 protoc
编译器,请按照协议缓冲区 - Google 的数据交换格式中的说明操作。
下面显示了 sample_data.proto
文件的内容:
message SampleData {
required string name = 1;
required int64 age = 2;
}
此脚本会使用 entries.json
文件,其中包含要插入到 BigQuery 表中的示例行数据:
{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}
使用多路复用
您只能在默认流的流写入器级层启用多路复用。如需在 Java 中启用多路复用,请在构建 StreamWriter
或 JsonStreamWriter
对象时调用 setEnableConnectionPool
方法。
启用连接池后,Java 客户端库会在后台管理您的连接,如果现有连接被认为过于繁忙,则会扩容连接。为了让自动扩容更有效,您应考虑降低 maxInflightRequests
限制。
// One possible way for constructing StreamWriter StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build(); // One possible way for constructing JsonStreamWriter JsonStreamWriter.newBuilder(tableName, bigqueryClient) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build();
如需在 Go 中启用多路复用,请参阅连接共享(多路复用)。
使用已提交类型实现“正好一次”语义
如果您需要“正好一次”写入语义,请在已提交类型下创建写入流。在已提交类型下,一旦客户端从后端收到确认,记录便可进行查询。
已提交类型通过使用记录偏移量在流中提供“正好一次”传送。通过使用记录偏移量,应用会在每次调用 AppendRows
时指定下一个附加偏移量。仅当偏移值与下一个附加偏移量匹配时,才会执行写入操作。如需了解详情,请参阅管理流偏移量以实现“正好一次”语义。
如果您未提供偏移量,则记录将附加到流的当前末尾处。在这种情况下,如果附加请求返回错误,重试附加请求可能会导致记录在流中多次出现。
如需使用已提交类型,请执行以下步骤:
Java
- 调用
CreateWriteStream
以在已提交类型下创建一个或多个流。 - 对于每个流,在循环中调用
AppendRows
以写入批量记录。 - 对每个流调用
FinalizeWriteStream
以释放该流。调用此方法后,您无法再向流中写入更多行。此步骤在已提交类型下是可选步骤,但它有助于防止超出活跃流的限制。如需了解详情,请参阅限制流的创建速率。
Node.js
- 调用
createWriteStreamFullResponse
以在已提交类型下创建一个或多个流。 - 对于每个流,在循环中调用
appendRows
以写入批量记录。 - 对每个流调用
finalize
以释放该流。调用此方法后,您无法再向流中写入更多行。此步骤在已提交类型下是可选步骤,但它有助于防止超出活跃流的限制。如需了解详情,请参阅限制流的创建速率。
您无法明确删除流。流遵循系统定义的存留时间 (TTL):
- 如果提交的流中没有流量,则流的 TTL 为三天。
- 默认情况下,缓冲流的 TTL 为 7 天(如果流中没有流量)。
以下代码展示了如何使用已提交类型。
Java
如需了解如何安装和使用 BigQuery 客户端库,请参阅 BigQuery 客户端库。 如需了解详情,请参阅 BigQuery Java API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为客户端库设置身份验证。
Node.js
如需了解如何安装和使用 BigQuery 客户端库,请参阅 BigQuery 客户端库。
如需向 BigQuery 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为客户端库设置身份验证。
使用 Apache Arrow 格式注入数据
以下代码展示了如何使用 Apache Arrow 格式注入数据。如需查看更详细的端到端示例,请参阅 GitHub 上的 PyArrow 示例。
Python
此示例展示了如何使用默认数据流提取已序列化的 PyArrow 表。
from google.cloud.bigquery_storage_v1 import types as gapic_types
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream
from google.cloud import bigquery_storage_v1
def append_rows_with_pyarrow(
pyarrow_table: pyarrow.Table,
project_id: str,
dataset_id: str,
table_id: str,
):
bqstorage_write_client = bigquery_storage_v1.BigQueryWriteClient()
# Create request_template.
request_template = gapic_types.AppendRowsRequest()
request_template.write_stream = (
f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/_default"
)
arrow_data = gapic_types.AppendRowsRequest.ArrowData()
arrow_data.writer_schema.serialized_schema = (
pyarrow_table.schema.serialize().to_pybytes()
)
request_template.arrow_rows = arrow_data
# Create AppendRowsStream.
append_rows_stream = AppendRowsStream(
bqstorage_write_client,
request_template,
)
# Create request with table data.
request = gapic_types.AppendRowsRequest()
request.arrow_rows.rows.serialized_record_batch = (
pyarrow_table.to_batches()[0].serialize().to_pybytes()
)
# Send request.
future = append_rows_stream.send(request)
# Wait for result.
future.result()