This sample demonstrates how to write data to a BigQuery table using protocol buffers.
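Both samples below assume a sample_data.proto file that defines a SampleData message whose fields mirror the destination table's columns. The sketch that follows is reconstructed from the fields the samples actually use; the exact field numbers and comments in your own .proto file may differ, but proto2 syntax is required so that missing optional values can be distinguished from default values.

// The Storage Write API expects proto2 wire-format data, which can
// distinguish missing optional fields from default values.
syntax = "proto2";

// One message field per destination table column. Field numbers here are
// illustrative; any valid numbering works.
message SampleData {
  // A nested message encodes a STRUCT column value.
  message SampleStruct {
    optional int64 sub_int_col = 1;
  }

  // These types map directly to BigQuery types.
  optional bool bool_col = 1;
  optional bytes bytes_col = 2;
  optional double float64_col = 3;
  optional int64 int64_col = 4;
  optional string string_col = 5;

  // These BigQuery types require encoding; see
  // https://cloud.google.com/bigquery/docs/write-api#data_type_conversions
  optional int32 date_col = 6;         // DATE: days since the Unix epoch
  optional string datetime_col = 7;    // DATETIME: formatted string
  optional string geography_col = 8;   // GEOGRAPHY: WKT string
  optional string numeric_col = 9;     // NUMERIC: decimal string
  optional string bignumeric_col = 10; // BIGNUMERIC: decimal string
  optional string time_col = 11;       // TIME: formatted string
  optional int64 timestamp_col = 12;   // TIMESTAMP: microseconds since epoch

  // A repeated field encodes a BigQuery ARRAY column.
  repeated int64 int64_list = 13;

  // Nested messages encode STRUCT and ARRAY<STRUCT> columns.
  optional SampleStruct struct_col = 14;
  repeated SampleStruct struct_list = 15;

  required int64 row_num = 16;
}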
Code samples
Node.js
Before trying this sample, follow the Node.js setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Node.js API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, Writer} = managedwriter;
const sample_data_pb = require('./sample_data_pb.js');
const {SampleData} = sample_data_pb;
const protobufjs = require('protobufjs');
require('protobufjs/ext/descriptor');
async function appendRowsProto2() {
  /**
   * If you make updates to the sample_data.proto protocol buffers definition,
   * run:
   *   pbjs sample_data.proto -t static-module -w commonjs -o sample_data.js
   *   pbjs sample_data.proto -t json --keep-case -o sample_data.json
   * from the /samples directory to generate the sample_data module.
   */

  // So that BigQuery knows how to parse the serialized_rows, create a
  // protocol buffer representation of your message descriptor.
  const root = protobufjs.loadSync('./sample_data.json');
  const descriptor = root.lookupType('SampleData').toDescriptor('proto2');
  const protoDescriptor = adapt.normalizeDescriptor(descriptor).toJSON();

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // projectId = 'my_project';
  // datasetId = 'my_dataset';
  // tableId = 'my_table';

  const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
  const streamType = managedwriter.PendingStream;
  const writeClient = new WriterClient({projectId});

  try {
    // Append data to the given stream.
    const connection = await writeClient.createStreamConnection({
      streamType,
      destinationTable,
    });
    const streamId = connection.getStreamId();
    console.log(`Stream created: ${streamId}`);

    const writer = new Writer({
      connection,
      protoDescriptor,
    });

    let serializedRows = [];
    const pendingWrites = [];

    // Row 1
    let row = {
      rowNum: 1,
      boolCol: true,
      bytesCol: Buffer.from('hello world'),
      float64Col: parseFloat('+123.45'),
      int64Col: 123,
      stringCol: 'omg',
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 2
    row = {
      rowNum: 2,
      boolCol: false,
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 3
    row = {
      rowNum: 3,
      bytesCol: Buffer.from('later, gator'),
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 4
    row = {
      rowNum: 4,
      float64Col: 987.654,
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 5
    row = {
      rowNum: 5,
      int64Col: 321,
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 6
    row = {
      rowNum: 6,
      stringCol: 'octavia',
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Set an offset to allow resuming this stream if the connection breaks.
    // Keep track of which requests the server has acknowledged and resume the
    // stream at the first non-acknowledged message. If the server has already
    // processed a message with that offset, it will return an ALREADY_EXISTS
    // error, which can be safely ignored.
    //
    // The first request must always have an offset of 0.
    let offsetValue = 0;

    // Send batch.
    let pw = writer.appendRows({serializedRows}, offsetValue);
    pendingWrites.push(pw);

    // Reset rows.
    serializedRows = [];

    // Row 7
    const days = new Date('2019-02-07').getTime() / (1000 * 60 * 60 * 24);
    row = {
      rowNum: 7,
      dateCol: days, // The value is the number of days since the Unix epoch (1970-01-01)
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 8
    row = {
      rowNum: 8,
      datetimeCol: '2019-02-17 11:24:00.000',
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 9
    row = {
      rowNum: 9,
      geographyCol: 'POINT(5 5)',
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 10
    row = {
      rowNum: 10,
      numericCol: '123456',
      bignumericCol: '99999999999999999999999999999.999999999',
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 11
    row = {
      rowNum: 11,
      timeCol: '18:00:00',
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 12
    const timestamp = new Date('2022-01-09T03:49:46.564Z').getTime();
    row = {
      rowNum: 12,
      timestampCol: timestamp * 1000, // The value is given in microseconds since the Unix epoch (1970-01-01)
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Offset must equal the number of rows that were previously sent.
    offsetValue = 6;

    // Send batch.
    pw = writer.appendRows({serializedRows}, offsetValue);
    pendingWrites.push(pw);

    serializedRows = [];

    // Row 13
    row = {
      rowNum: 13,
      int64List: [1999, 2001],
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 14
    row = {
      rowNum: 14,
      structCol: {
        subIntCol: 99,
      },
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 15
    row = {
      rowNum: 15,
      structList: [{subIntCol: 100}, {subIntCol: 101}],
    };
    serializedRows.push(SampleData.encode(row).finish());

    offsetValue = 12;

    // Send batch.
    pw = writer.appendRows({serializedRows}, offsetValue);
    pendingWrites.push(pw);

    const results = await Promise.all(
      pendingWrites.map(pw => pw.getResult())
    );
    console.log('Write results:', results);

    const {rowCount} = await connection.finalize();
    console.log(`Row count: ${rowCount}`);

    const response = await writeClient.batchCommitWriteStream({
      parent: destinationTable,
      writeStreams: [streamId],
    });
    console.log(response);
  } catch (err) {
    console.log(err);
  } finally {
    writeClient.close();
  }
}
Python
Before trying this sample, follow the Python setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Python API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
"""
This code sample demonstrates using the low-level generated client for Python.
"""
import datetime
import decimal
from google.protobuf import descriptor_pb2
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types, writer
# If you make updates to the sample_data.proto protocol buffers definition,
# run:
#
# protoc --python_out=. sample_data.proto
#
# from the samples/snippets directory to generate the sample_data_pb2 module.
from . import sample_data_pb2
def append_rows_proto2(project_id: str, dataset_id: str, table_id: str):
"""Create a write stream, write some sample data, and commit the stream."""
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
write_stream = types.WriteStream()
# When creating the stream, choose the type. Use the PENDING type to wait
# until the stream is committed before it is visible. See:
# https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.WriteStream.Type
write_stream.type_ = types.WriteStream.Type.PENDING
write_stream = write_client.create_write_stream(
parent=parent, write_stream=write_stream
)
stream_name = write_stream.name
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The initial request must contain the stream name.
request_template.write_stream = stream_name
# So that BigQuery knows how to parse the serialized_rows, generate a
# protocol buffer representation of your message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Some stream types support an unbounded number of requests. Construct an
# AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Create a batch of row data by appending proto2 serialized bytes to the
# serialized_rows repeated field.
proto_rows = types.ProtoRows()
row = sample_data_pb2.SampleData()
row.row_num = 1
row.bool_col = True
row.bytes_col = b"Hello, World!"
row.float64_col = float("+inf")
row.int64_col = 123
row.string_col = "Howdy!"
proto_rows.serialized_rows.append(row.SerializeToString())
row = sample_data_pb2.SampleData()
row.row_num = 2
row.bool_col = False
proto_rows.serialized_rows.append(row.SerializeToString())
row = sample_data_pb2.SampleData()
row.row_num = 3
row.bytes_col = b"See you later!"
proto_rows.serialized_rows.append(row.SerializeToString())
row = sample_data_pb2.SampleData()
row.row_num = 4
row.float64_col = 1000000.125
proto_rows.serialized_rows.append(row.SerializeToString())
row = sample_data_pb2.SampleData()
row.row_num = 5
row.int64_col = 67000
proto_rows.serialized_rows.append(row.SerializeToString())
row = sample_data_pb2.SampleData()
row.row_num = 6
row.string_col = "Auf Wiedersehen!"
proto_rows.serialized_rows.append(row.SerializeToString())
# Set an offset to allow resuming this stream if the connection breaks.
# Keep track of which requests the server has acknowledged and resume the
# stream at the first non-acknowledged message. If the server has already
# processed a message with that offset, it will return an ALREADY_EXISTS
# error, which can be safely ignored.
#
# The first request must always have an offset of 0.
request = types.AppendRowsRequest()
request.offset = 0
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
response_future_1 = append_rows_stream.send(request)
# Create a batch of rows containing scalar values that don't directly
# correspond to a protocol buffers scalar type. See the documentation for
# the expected data formats:
# https://cloud.google.com/bigquery/docs/write-api#data_type_conversions
proto_rows = types.ProtoRows()
row = sample_data_pb2.SampleData()
row.row_num = 7
date_value = datetime.date(2021, 8, 12)
epoch_value = datetime.date(1970, 1, 1)
delta = date_value - epoch_value
row.date_col = delta.days
proto_rows.serialized_rows.append(row.SerializeToString())
row = sample_data_pb2.SampleData()
row.row_num = 8
datetime_value = datetime.datetime(2021, 8, 12, 9, 46, 23, 987456)
row.datetime_col = datetime_value.strftime("%Y-%m-%d %H:%M:%S.%f")
proto_rows.serialized_rows.append(row.SerializeToString())
row = sample_data_pb2.SampleData()
row.row_num = 9
row.geography_col = "POINT(-122.347222 47.651111)"
proto_rows.serialized_rows.append(row.SerializeToString())
row = sample_data_pb2.SampleData()
row.row_num = 10
numeric_value = decimal.Decimal("1.23456789101112e+6")
row.numeric_col = str(numeric_value)
bignumeric_value = decimal.Decimal("-1.234567891011121314151617181920e+16")
row.bignumeric_col = str(bignumeric_value)
proto_rows.serialized_rows.append(row.SerializeToString())
row = sample_data_pb2.SampleData()
row.row_num = 11
time_value = datetime.time(11, 7, 48, 123456)
row.time_col = time_value.strftime("%H:%M:%S.%f")
proto_rows.serialized_rows.append(row.SerializeToString())
row = sample_data_pb2.SampleData()
row.row_num = 12
timestamp_value = datetime.datetime(
2021, 8, 12, 16, 11, 22, 987654, tzinfo=datetime.timezone.utc
)
epoch_value = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
delta = timestamp_value - epoch_value
row.timestamp_col = int(delta.total_seconds()) * 1000000 + int(delta.microseconds)
proto_rows.serialized_rows.append(row.SerializeToString())
# Since this is the second request, you only need to include the row data.
# The name of the stream and protocol buffers DESCRIPTOR is only needed in
# the first request.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
# Offset must equal the number of rows that were previously sent.
request.offset = 6
response_future_2 = append_rows_stream.send(request)
# Create a batch of rows with STRUCT and ARRAY BigQuery data types. In
# protocol buffers, these correspond to nested messages and repeated
# fields, respectively.
proto_rows = types.ProtoRows()
row = sample_data_pb2.SampleData()
row.row_num = 13
row.int64_list.append(1)
row.int64_list.append(2)
row.int64_list.append(3)
proto_rows.serialized_rows.append(row.SerializeToString())
row = sample_data_pb2.SampleData()
row.row_num = 14
row.struct_col.sub_int_col = 7
proto_rows.serialized_rows.append(row.SerializeToString())
row = sample_data_pb2.SampleData()
row.row_num = 15
sub_message = sample_data_pb2.SampleData.SampleStruct()
sub_message.sub_int_col = -1
row.struct_list.append(sub_message)
sub_message = sample_data_pb2.SampleData.SampleStruct()
sub_message.sub_int_col = -2
row.struct_list.append(sub_message)
sub_message = sample_data_pb2.SampleData.SampleStruct()
sub_message.sub_int_col = -3
row.struct_list.append(sub_message)
proto_rows.serialized_rows.append(row.SerializeToString())
request = types.AppendRowsRequest()
request.offset = 12
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
# For each request sent, a message is expected in the responses iterable.
# This sample sends 3 requests, therefore expect exactly 3 responses.
response_future_3 = append_rows_stream.send(request)
# All three requests are in-flight, wait for them to finish being processed
# before finalizing the stream.
print(response_future_1.result())
print(response_future_2.result())
print(response_future_3.result())
# Shutdown background threads and close the streaming connection.
append_rows_stream.close()
# A PENDING type stream must be "finalized" before being committed. No new
# records can be written to the stream after this method has been called.
write_client.finalize_write_stream(name=write_stream.name)
# Commit the stream you created earlier.
batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest()
batch_commit_write_streams_request.parent = parent
batch_commit_write_streams_request.write_streams = [write_stream.name]
write_client.batch_commit_write_streams(batch_commit_write_streams_request)
print(f"Writes to stream: '{write_stream.name}' have been committed.")
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.