使用 Storage Write API 流式传输数据
本文档介绍如何使用 BigQuery Storage Write API 将数据流式传输至 BigQuery。
在流式传输场景中,数据会连续到达,并且应该能够以最低延时进行读取。使用 BigQuery Storage Write API 处理流式工作负载时,请考虑您需要确保的事项:
- 如果您的应用只需要“至少一次”语义,请使用默认流。
- 如果您需要“正好一次”语义,请在已提交类型中创建一个或多个流,并使用流偏移量来确保“正好一次”写入。
在已提交类型下,只要服务器确认了写入请求,写入到流的数据即可用于查询。默认流也会使用已提交类型,但不提供“正好一次”保证。
使用默认流实现“至少一次”语义
如果您的应用可以接受在目标表中出现重复记录的可能性,那么我们建议您对流式传输场景使用默认流。
以下代码展示了如何将数据写入默认流:
Java
如需了解如何安装和使用 BigQuery 客户端库,请参阅 BigQuery 客户端库。 如需了解详情,请参阅 BigQuery Java API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
Node.js
如需了解如何安装和使用 BigQuery 客户端库,请参阅 BigQuery 客户端库。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
Python
以下示例展示了如何使用默认数据流插入包含两个字段的记录:
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json
import sample_data_pb2
# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = [
"name",
"age"
]
# Function to create a batch of row data to be serialized.
def create_row_data(data):
row = sample_data_pb2.SampleData()
for field in TABLE_COLUMNS_TO_CHECK:
# Optional fields will be passed as null if not provided
if field in data:
setattr(row, field, data[field])
return row.SerializeToString()
class BigQueryStorageWriteAppend(object):
# The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
def append_rows_proto2(
project_id: str, dataset_id: str, table_id: str, data: dict
):
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
stream_name = f'{parent}/_default'
write_stream = types.WriteStream()
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The request must contain the stream name.
request_template.write_stream = stream_name
# Generating the protocol buffer representation of the message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
proto_rows = types.ProtoRows()
for row in data:
proto_rows.serialized_rows.append(create_row_data(row))
# Appends data to the given stream.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
append_rows_stream.send(request)
print(f"Rows to table: '{parent}' have been written.")
if __name__ == "__main__":
###### Uncomment the below block to provide additional logging capabilities ######
#logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(message)s",
# handlers=[
# logging.StreamHandler()
# ]
#)
###### Uncomment the above block to provide additional logging capabilities ######
with open('entries.json', 'r') as json_file:
data = json.load(json_file)
# Change this to your specific BigQuery project, dataset, table details
BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID","DATASET_ID", "TABLE_ID ",data=data)
该代码示例依赖于已编译的协议模块 sample_data_pb2.py
。如需创建已编译的模块,请执行 protoc --python_out=. sample_data.proto
命令,其中 protoc
是协议缓冲区编译器。sample_data.proto
文件定义了 Python 示例中使用的消息的格式。如需安装 protoc
编译器,请按照 Protocol Buffers - Google 的数据交换格式中的说明操作。
下面显示了 sample_data.proto
文件的内容:
message SampleData {
required string name = 1;
required int64 age = 2;
}
此脚本会使用 entities.json
文件,其中包含要插入 BigQuery 表的示例行数据:
{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}
使用多路复用
您只能在默认流的流写入器级层启用多路复用。如需在 Java 中启用多路复用,请在构建 StreamWriter
或 JsonStreamWriter
对象时调用 setEnableConnectionPool
方法:
// One possible way for constructing StreamWriter StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setEnableConnectionPool(true) .build(); // One possible way for constructing JsonStreamWriter JsonStreamWriter.newBuilder(tableName, bigqueryClient) .setEnableConnectionPool(true) .build();
如需在 Go 中启用多路复用,请参阅连接共享(多路复用)。
使用已提交类型实现“正好一次”语义
如果您需要“正好一次”写入语义,请在已提交类型下创建写入流。在已提交类型下,一旦客户端从后端收到确认,记录便可进行查询。
已提交类型通过使用记录偏移量在流中提供“正好一次”传送。通过使用记录偏移量,应用会在每次调用 AppendRows
时指定下一个附加偏移量。仅当偏移值与下一个附加偏移量匹配时,才会执行写入操作。如需了解详情,请参阅管理流偏移量以实现“正好一次”语义。
如果您未提供偏移量,则记录将附加到流的当前末尾处。在这种情况下,如果附加请求返回错误,重试附加请求可能会导致记录在流中多次出现。
如需使用已提交类型,请执行以下步骤:
Java
- 调用
CreateWriteStream
以在已提交类型下创建一个或多个流。 - 对于每个流,在循环中调用
AppendRows
以写入批量记录。 - 对每个流调用
FinalizeWriteStream
以释放该流。调用此方法后,您无法再向流中写入更多行。此步骤在已提交类型下是可选步骤,但它有助于防止超出活跃流的限制。如需了解详情,请参阅限制流的创建速率。
Node.js
- 调用
createWriteStreamFullResponse
以在已提交类型下创建一个或多个流。 - 对于每个流,在循环中调用
appendRows
以写入批量记录。 - 对每个流调用
finalize
以释放该流。调用此方法后,您无法再向流中写入更多行。此步骤在已提交类型下是可选步骤,但它有助于防止超出活跃流的限制。如需了解详情,请参阅限制流的创建速率。
您无法明确删除流。流遵循系统定义的存留时间 (TTL):
- 如果提交的流中没有流量,则流的 TTL 为三天。
- 默认情况下,缓冲流的 TTL 为 7 天(如果流中没有流量)。
以下代码展示了如何使用已提交类型。
Java
如需了解如何安装和使用 BigQuery 客户端库,请参阅 BigQuery 客户端库。 如需了解详情,请参阅 BigQuery Java API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
Node.js
如需了解如何安装和使用 BigQuery 客户端库,请参阅 BigQuery 客户端库。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。