Batch load data using the Storage Write API
This document describes how to use the BigQuery Storage Write API to batch load data into BigQuery.
In batch-load scenarios, an application writes data and commits it as a single atomic transaction. When you use the Storage Write API to batch load data, create one or more streams in pending type. The pending type supports stream-level transactions: records are buffered in a pending state until you commit the stream.
For batch workloads, also consider using the Storage Write API through Dataproc with the Apache Spark SQL connector for BigQuery, rather than writing custom Storage Write API code.
The Storage Write API is well suited to a data pipeline architecture. A main process creates a number of streams. For each stream, it assigns a worker thread or a separate process to write a portion of the batch data. Each worker creates a connection to its stream, writes the data, and finalizes its stream when it is done writing. After all of the workers signal successful completion to the main process, the main process commits the data. If a worker fails, its assigned portion of the data does not show up in the final results, and the whole worker can be safely retried. In a more sophisticated pipeline, workers checkpoint their progress by reporting the last offset written to the main process. This approach makes the pipeline resilient to failures.
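The following is a minimal sketch of that fan-out pattern in Python. The write_shard helper is hypothetical and caller-supplied; it stands in for the per-stream create, append, and finalize steps shown in the full samples later in this document.

from concurrent.futures import ThreadPoolExecutor

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types


def load_in_parallel(parent, shards, write_shard, num_workers=4):
    """Fan out one pending stream per shard, then commit them atomically.

    parent: the table path (projects/.../datasets/.../tables/...).
    shards: a list of record batches, one per worker.
    write_shard(client, parent, shard): hypothetical helper that creates a
        pending stream, appends the shard, finalizes the stream, and returns
        the stream name (see the full samples below for those steps).
    """
    client = bigquery_storage_v1.BigQueryWriteClient()
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        futures = [pool.submit(write_shard, client, parent, shard) for shard in shards]
        # A failed worker raises here. Its stream is never committed, so the
        # shard can be retried on a fresh stream without creating duplicates.
        stream_names = [future.result() for future in futures]
    # Commit all finalized streams as a single atomic operation.
    request = types.BatchCommitWriteStreamsRequest(
        parent=parent, write_streams=stream_names
    )
    return client.batch_commit_write_streams(request)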
Batch load data using the pending type
To use the pending type, the application does the following (a condensed sketch of the sequence follows this list):

1. Call CreateWriteStream to create one or more streams in pending type.
2. For each stream, call AppendRows in a loop to write batches of records.
3. For each stream, call FinalizeWriteStream. After you call this method, you cannot write any more rows to the stream. If you call AppendRows after calling FinalizeWriteStream, it returns a StorageError with StorageErrorCode.STREAM_FINALIZED in the google.rpc.Status error. For more information about the google.rpc.Status error model, see Errors.
4. Call BatchCommitWriteStreams to commit the streams. After you call this method, the data becomes available for reading. If an error occurs while committing any of the streams, it is returned in the stream_errors field of the BatchCommitWriteStreamsResponse.
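Here is the same sequence at a glance, as a condensed Python sketch. The client, types, and RPC names match the full Python sample later in this document; schema construction and error handling are omitted for brevity.

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types, writer


def pending_load(parent, proto_schema, batches):
    """parent: table path; proto_schema: a types.ProtoSchema describing the rows;
    batches: iterable of types.ProtoRows holding pre-serialized records."""
    client = bigquery_storage_v1.BigQueryWriteClient()
    # 1. CreateWriteStream with type PENDING.
    stream = client.create_write_stream(
        parent=parent,
        write_stream=types.WriteStream(type_=types.WriteStream.Type.PENDING),
    )
    # The first request on the connection carries the stream name and schema.
    template = types.AppendRowsRequest(
        write_stream=stream.name,
        proto_rows=types.AppendRowsRequest.ProtoData(writer_schema=proto_schema),
    )
    append_stream = writer.AppendRowsStream(client, template)
    # 2. AppendRows in a loop; the offset tracks how many rows were sent so far.
    offset, futures = 0, []
    for rows in batches:
        request = types.AppendRowsRequest(
            offset=offset,
            proto_rows=types.AppendRowsRequest.ProtoData(rows=rows),
        )
        futures.append(append_stream.send(request))
        offset += len(rows.serialized_rows)
    for future in futures:
        future.result()  # Surface any append errors.
    append_stream.close()
    # 3. FinalizeWriteStream: no more rows can be written after this call.
    client.finalize_write_stream(name=stream.name)
    # 4. BatchCommitWriteStreams makes the data available for reading.
    return client.batch_commit_write_streams(
        types.BatchCommitWriteStreamsRequest(
            parent=parent, write_streams=[stream.name]
        )
    )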
A commit is an atomic operation, and you can commit multiple streams at once. A stream can only be committed once, so if the commit operation fails, it is safe to retry it. Until you commit a stream, the data is pending and not available for reading.
After the stream is finalized and before it is committed, the data can remain in the buffer for up to 4 hours. Pending streams must be committed within 24 hours. There is a quota limit on the total size of the pending stream buffer.
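Because a stream commits at most once, a failed BatchCommitWriteStreams call can simply be repeated. The following is a minimal, hypothetical retry sketch in Python; the exception classes and backoff policy are assumptions, not part of the samples below.

import time

from google.api_core import exceptions
from google.cloud.bigquery_storage_v1 import types


def commit_with_retry(write_client, parent, stream_names, max_attempts=5):
    """Retry the batch commit on transient errors. A stream commits at most
    once, so repeating the request after a failure is safe."""
    for attempt in range(max_attempts):
        try:
            request = types.BatchCommitWriteStreamsRequest(
                parent=parent, write_streams=stream_names
            )
            response = write_client.batch_commit_write_streams(request)
            if response.stream_errors:
                # Per-stream problems are reported here instead of being raised,
                # and retrying does not help, so surface them to the caller.
                raise RuntimeError(f"stream errors: {list(response.stream_errors)}")
            return response
        except (exceptions.ServiceUnavailable, exceptions.DeadlineExceeded):
            time.sleep(2**attempt)  # Simple exponential backoff.
    raise RuntimeError("commit did not succeed within the retry budget")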
The following code shows how to write data using the pending type.
C#
For information about installing and using the BigQuery client libraries, see BigQuery client libraries. For more information, see the BigQuery C# API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
using Google.Api.Gax.Grpc;
using Google.Cloud.BigQuery.Storage.V1;
using Google.Protobuf;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using static Google.Cloud.BigQuery.Storage.V1.AppendRowsRequest.Types;
public class AppendRowsPendingSample
{
/// <summary>
/// This code sample demonstrates how to write records in pending mode.
/// Create a write stream, write some sample data, and commit the stream to append the rows.
/// The CustomerRecord proto used in the sample can be found in the Resources folder, and the generated C# is placed in the Data folder in
/// https://github.com/GoogleCloudPlatform/dotnet-docs-samples/tree/main/bigquery-storage/api/BigQueryStorage.Samples
/// </summary>
public async Task AppendRowsPendingAsync(string projectId, string datasetId, string tableId)
{
BigQueryWriteClient bigQueryWriteClient = await BigQueryWriteClient.CreateAsync();
// Initialize a write stream for the specified table.
// When creating the stream, choose the type. Use the Pending type to wait
// until the stream is committed before it is visible. See:
// https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.WriteStream.Type
WriteStream stream = new WriteStream { Type = WriteStream.Types.Type.Pending };
TableName tableName = TableName.FromProjectDatasetTable(projectId, datasetId, tableId);
stream = await bigQueryWriteClient.CreateWriteStreamAsync(tableName, stream);
// Initialize streaming call, retrieving the stream object
BigQueryWriteClient.AppendRowsStream rowAppender = bigQueryWriteClient.AppendRows();
// Sending requests and retrieving responses can be arbitrarily interleaved.
// Exact sequence will depend on client/server behavior.
// Create task to do something with responses from server.
Task appendResultsHandlerTask = Task.Run(async () =>
{
AsyncResponseStream<AppendRowsResponse> appendRowResults = rowAppender.GetResponseStream();
while (await appendRowResults.MoveNextAsync())
{
AppendRowsResponse responseItem = appendRowResults.Current;
// Do something with responses.
Console.WriteLine($"Appending rows resulted in: {responseItem.AppendResult}");
}
// The response stream has completed.
});
// List of records to be appended in the table.
List<CustomerRecord> records = new List<CustomerRecord>
{
new CustomerRecord { CustomerNumber = 1, CustomerName = "Alice" },
new CustomerRecord { CustomerNumber = 2, CustomerName = "Bob" }
};
// Create a batch of row data by appending serialized bytes to the
// SerializedRows repeated field.
ProtoData protoData = new ProtoData
{
WriterSchema = new ProtoSchema { ProtoDescriptor = CustomerRecord.Descriptor.ToProto() },
Rows = new ProtoRows { SerializedRows = { records.Select(r => r.ToByteString()) } }
};
// Initialize the append row request.
AppendRowsRequest appendRowRequest = new AppendRowsRequest
{
WriteStreamAsWriteStreamName = stream.WriteStreamName,
ProtoRows = protoData
};
// Stream a request to the server.
await rowAppender.WriteAsync(appendRowRequest);
// Append a second batch of data.
protoData = new ProtoData
{
Rows = new ProtoRows { SerializedRows = { new CustomerRecord { CustomerNumber = 3, CustomerName = "Charles" }.ToByteString() } }
};
// Since this is the second request, you only need to include the row data.
// The name of the stream and protocol buffers descriptor is only needed in
// the first request.
appendRowRequest = new AppendRowsRequest
{
// If Offset is not present, the write is performed at the current end of stream.
ProtoRows = protoData
};
await rowAppender.WriteAsync(appendRowRequest);
// Complete writing requests to the stream.
await rowAppender.WriteCompleteAsync();
// Await the handler. This will complete once all server responses have been processed.
await appendResultsHandlerTask;
// A Pending type stream must be "finalized" before being committed. No new
// records can be written to the stream after this method has been called.
await bigQueryWriteClient.FinalizeWriteStreamAsync(stream.Name);
BatchCommitWriteStreamsRequest batchCommitWriteStreamsRequest = new BatchCommitWriteStreamsRequest
{
Parent = tableName.ToString(),
WriteStreams = { stream.Name }
};
BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse =
await bigQueryWriteClient.BatchCommitWriteStreamsAsync(batchCommitWriteStreamsRequest);
if (batchCommitWriteStreamsResponse.StreamErrors?.Count > 0)
{
// Handle errors here.
Console.WriteLine("Error committing write streams. Individual errors:");
foreach (StorageError error in batchCommitWriteStreamsResponse.StreamErrors)
{
Console.WriteLine(error.ErrorMessage);
}
}
else
{
Console.WriteLine($"Writes to stream {stream.Name} have been committed.");
}
}
}
Go
For information about installing and using the BigQuery client libraries, see BigQuery client libraries. For more information, see the BigQuery Go API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
import (
"context"
"fmt"
"io"
"math/rand"
"time"
"cloud.google.com/go/bigquery/storage/managedwriter"
"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
"github.com/GoogleCloudPlatform/golang-samples/bigquery/snippets/managedwriter/exampleproto"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
"google.golang.org/protobuf/proto"
)
// generateExampleMessages generates a slice of serialized protobuf messages using a statically defined
// and compiled protocol buffer file, and returns the binary serialized representation.
func generateExampleMessages(numMessages int) ([][]byte, error) {
msgs := make([][]byte, numMessages)
for i := 0; i < numMessages; i++ {
random := rand.New(rand.NewSource(time.Now().UnixNano()))
// Our example data embeds an array of structs, so we'll construct that first.
sList := make([]*exampleproto.SampleStruct, 5)
for i := 0; i < int(random.Int63n(5)+1); i++ {
sList[i] = &exampleproto.SampleStruct{
SubIntCol: proto.Int64(random.Int63()),
}
}
m := &exampleproto.SampleData{
BoolCol: proto.Bool(true),
BytesCol: []byte("some bytes"),
Float64Col: proto.Float64(3.14),
Int64Col: proto.Int64(123),
StringCol: proto.String("example string value"),
// These types require special encoding/formatting to transmit.
// DATE values are number of days since the Unix epoch.
DateCol: proto.Int32(int32(time.Now().UnixNano() / 86400000000000)),
// DATETIME uses the literal format.
DatetimeCol: proto.String("2022-01-01 12:13:14.000000"),
// GEOGRAPHY uses Well-Known-Text (WKT) format.
GeographyCol: proto.String("POINT(-122.350220 47.649154)"),
// NUMERIC and BIGNUMERIC can be passed as string, or more efficiently
// using a packed byte representation.
NumericCol: proto.String("99999999999999999999999999999.999999999"),
BignumericCol: proto.String("578960446186580977117854925043439539266.34992332820282019728792003956564819967"),
// TIME also uses literal format.
TimeCol: proto.String("12:13:14.000000"),
// TIMESTAMP uses microseconds since Unix epoch.
TimestampCol: proto.Int64(time.Now().UnixNano() / 1000),
// Int64List is an array of INT64 types.
Int64List: []int64{2, 4, 6, 8},
// This is a required field, and thus must be present.
RowNum: proto.Int64(23),
// StructCol is a single nested message.
StructCol: &exampleproto.SampleStruct{
SubIntCol: proto.Int64(random.Int63()),
},
// StructList is a repeated array of a nested message.
StructList: sList,
}
b, err := proto.Marshal(m)
if err != nil {
return nil, fmt.Errorf("error generating message %d: %w", i, err)
}
msgs[i] = b
}
return msgs, nil
}
// appendToPendingStream demonstrates using the managedwriter package to write some example data
// to a pending stream, and then committing it to a table.
func appendToPendingStream(w io.Writer, projectID, datasetID, tableID string) error {
// projectID := "myproject"
// datasetID := "mydataset"
// tableID := "mytable"
ctx := context.Background()
// Instantiate a managedwriter client to handle interactions with the service.
client, err := managedwriter.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("managedwriter.NewClient: %w", err)
}
// Close the client when we exit the function.
defer client.Close()
// Create a new pending stream. We'll use the stream name to construct a writer.
pendingStream, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID),
WriteStream: &storagepb.WriteStream{
Type: storagepb.WriteStream_PENDING,
},
})
if err != nil {
return fmt.Errorf("CreateWriteStream: %w", err)
}
// We need to communicate the descriptor of the protocol buffer message we're using, which
// is analogous to the "schema" for the message. Both SampleData and SampleStruct are
// two distinct messages in the compiled proto file, so we'll use adapt.NormalizeDescriptor
// to unify them into a single self-contained descriptor representation.
m := &exampleproto.SampleData{}
descriptorProto, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
if err != nil {
return fmt.Errorf("NormalizeDescriptor: %w", err)
}
// Instantiate a ManagedStream, which manages low level details like connection state and provides
// additional features like a future-like callback for appends, etc. NewManagedStream can also create
// the stream on your behalf, but in this example we're being explicit about stream creation.
managedStream, err := client.NewManagedStream(ctx, managedwriter.WithStreamName(pendingStream.GetName()),
managedwriter.WithSchemaDescriptor(descriptorProto))
if err != nil {
return fmt.Errorf("NewManagedStream: %w", err)
}
defer managedStream.Close()
// First, we'll append a single row.
rows, err := generateExampleMessages(1)
if err != nil {
return fmt.Errorf("generateExampleMessages: %w", err)
}
// We'll keep track of the current offset in the stream with curOffset.
var curOffset int64
// We can append data asynchronously, so we'll check our appends at the end.
var results []*managedwriter.AppendResult
result, err := managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(0))
if err != nil {
return fmt.Errorf("AppendRows first call error: %w", err)
}
results = append(results, result)
// Advance our current offset.
curOffset = curOffset + 1
// This time, we'll append three more rows in a single request.
rows, err = generateExampleMessages(3)
if err != nil {
return fmt.Errorf("generateExampleMessages: %w", err)
}
result, err = managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(curOffset))
if err != nil {
return fmt.Errorf("AppendRows second call error: %w", err)
}
results = append(results, result)
// Advance our offset again.
curOffset = curOffset + 3
// Finally, we'll append two more rows.
rows, err = generateExampleMessages(2)
if err != nil {
return fmt.Errorf("generateExampleMessages: %w", err)
}
result, err = managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(curOffset))
if err != nil {
return fmt.Errorf("AppendRows third call error: %w", err)
}
results = append(results, result)
// Now, we'll check that our batch of three appends all completed successfully.
// Monitoring the results could also be done out of band via a goroutine.
for k, v := range results {
// GetResult blocks until we receive a response from the API.
recvOffset, err := v.GetResult(ctx)
if err != nil {
return fmt.Errorf("append %d returned error: %w", k, err)
}
fmt.Fprintf(w, "Successfully appended data at offset %d.\n", recvOffset)
}
// We're now done appending to this stream. We now mark pending stream finalized, which blocks
// further appends.
rowCount, err := managedStream.Finalize(ctx)
if err != nil {
return fmt.Errorf("error during Finalize: %w", err)
}
fmt.Fprintf(w, "Stream %s finalized with %d rows.\n", managedStream.StreamName(), rowCount)
// To commit the data to the table, we need to run a batch commit. You can commit several streams
// atomically as a group, but in this instance we'll only commit the single stream.
req := &storagepb.BatchCommitWriteStreamsRequest{
Parent: managedwriter.TableParentFromStreamName(managedStream.StreamName()),
WriteStreams: []string{managedStream.StreamName()},
}
resp, err := client.BatchCommitWriteStreams(ctx, req)
if err != nil {
return fmt.Errorf("client.BatchCommit: %w", err)
}
if len(resp.GetStreamErrors()) > 0 {
return fmt.Errorf("stream errors present: %v", resp.GetStreamErrors())
}
fmt.Fprintf(w, "Table data committed at %s\n", resp.GetCommitTime().AsTime().Format(time.RFC3339Nano))
return nil
}
Java
For information about installing and using the BigQuery client libraries, see BigQuery client libraries. For more information, see the BigQuery Java API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.StorageError;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;
public class WritePendingStream {
public static void runWritePendingStream()
throws DescriptorValidationException, InterruptedException, IOException {
// TODO(developer): Replace these variables before running the sample.
String projectId = "MY_PROJECT_ID";
String datasetName = "MY_DATASET_NAME";
String tableName = "MY_TABLE_NAME";
writePendingStream(projectId, datasetName, tableName);
}
public static void writePendingStream(String projectId, String datasetName, String tableName)
throws DescriptorValidationException, InterruptedException, IOException {
BigQueryWriteClient client = BigQueryWriteClient.create();
TableName parentTable = TableName.of(projectId, datasetName, tableName);
DataWriter writer = new DataWriter();
// One time initialization.
writer.initialize(parentTable, client);
try {
// Write two batches of fake data to the stream, each with 10 JSON records. Data may be
// batched up to the maximum request size:
// https://cloud.google.com/bigquery/quotas#write-api-limits
long offset = 0;
for (int i = 0; i < 2; i++) {
// Create a JSON object that is compatible with the table schema.
JSONArray jsonArr = new JSONArray();
for (int j = 0; j < 10; j++) {
JSONObject record = new JSONObject();
record.put("col1", String.format("batch-record %03d-%03d", i, j));
jsonArr.put(record);
}
writer.append(jsonArr, offset);
offset += jsonArr.length();
}
} catch (ExecutionException e) {
// If the wrapped exception is a StatusRuntimeException, check the state of the operation.
// If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
// https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
System.out.println("Failed to append records. \n" + e);
}
// Final cleanup for the stream.
writer.cleanup(client);
System.out.println("Appended records successfully.");
// Once all streams are done, if all writes were successful, commit all of them in one request.
// This example only has the one stream. If any streams failed, their workload may be
// retried on a new stream, and then only the successful stream should be included in the
// commit.
BatchCommitWriteStreamsRequest commitRequest =
BatchCommitWriteStreamsRequest.newBuilder()
.setParent(parentTable.toString())
.addWriteStreams(writer.getStreamName())
.build();
BatchCommitWriteStreamsResponse commitResponse = client.batchCommitWriteStreams(commitRequest);
// If the response does not have a commit time, it means the commit operation failed.
if (commitResponse.hasCommitTime() == false) {
for (StorageError err : commitResponse.getStreamErrorsList()) {
System.out.println(err.getErrorMessage());
}
throw new RuntimeException("Error committing the streams");
}
System.out.println("Appended and committed records successfully.");
}
// A simple wrapper object showing how the stateful stream writer should be used.
private static class DataWriter {
private JsonStreamWriter streamWriter;
// Track the number of in-flight requests to wait for all responses before shutting down.
private final Phaser inflightRequestCount = new Phaser(1);
private final Object lock = new Object();
@GuardedBy("lock")
private RuntimeException error = null;
void initialize(TableName parentTable, BigQueryWriteClient client)
throws IOException, DescriptorValidationException, InterruptedException {
// Initialize a write stream for the specified table.
// For more information on WriteStream.Type, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/WriteStream.Type.html
WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build();
// Configure in-stream automatic retry settings.
// Error codes that are immediately retried:
// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
// Error codes that are retried with exponential backoff:
// * RESOURCE_EXHAUSTED
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();
CreateWriteStreamRequest createWriteStreamRequest =
CreateWriteStreamRequest.newBuilder()
.setParent(parentTable.toString())
.setWriteStream(stream)
.build();
WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);
// Use the JSON stream writer to send records in JSON format.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
streamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
.setRetrySettings(retrySettings)
.build();
}
public void append(JSONArray data, long offset)
throws DescriptorValidationException, IOException, ExecutionException {
synchronized (this.lock) {
// If earlier appends have failed, we need to reset before continuing.
if (this.error != null) {
throw this.error;
}
}
// Append asynchronously for increased throughput.
ApiFuture<AppendRowsResponse> future = streamWriter.append(data, offset);
ApiFutures.addCallback(
future, new AppendCompleteCallback(this), MoreExecutors.directExecutor());
// Increase the count of in-flight requests.
inflightRequestCount.register();
}
public void cleanup(BigQueryWriteClient client) {
// Wait for all in-flight requests to complete.
inflightRequestCount.arriveAndAwaitAdvance();
// Close the connection to the server.
streamWriter.close();
// Verify that no error occurred in the stream.
synchronized (this.lock) {
if (this.error != null) {
throw this.error;
}
}
// Finalize the stream.
FinalizeWriteStreamResponse finalizeResponse =
client.finalizeWriteStream(streamWriter.getStreamName());
System.out.println("Rows written: " + finalizeResponse.getRowCount());
}
public String getStreamName() {
return streamWriter.getStreamName();
}
static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {
private final DataWriter parent;
public AppendCompleteCallback(DataWriter parent) {
this.parent = parent;
}
public void onSuccess(AppendRowsResponse response) {
System.out.format("Append %d success\n", response.getAppendResult().getOffset().getValue());
done();
}
public void onFailure(Throwable throwable) {
synchronized (this.parent.lock) {
if (this.parent.error == null) {
StorageException storageException = Exceptions.toStorageException(throwable);
this.parent.error =
(storageException != null) ? storageException : new RuntimeException(throwable);
}
}
System.out.format("Error: %s\n", throwable.toString());
done();
}
private void done() {
// Reduce the count of in-flight requests.
this.parent.inflightRequestCount.arriveAndDeregister();
}
}
}
}
Node.js
For information about installing and using the BigQuery client libraries, see BigQuery client libraries. For more information, see the BigQuery Node.js API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, Writer} = managedwriter;
const customer_record_pb = require('./customer_record_pb.js');
const {CustomerRecord} = customer_record_pb;
const protobufjs = require('protobufjs');
require('protobufjs/ext/descriptor');
async function appendRowsPending() {
/**
* If you make updates to the customer_record.proto protocol buffers definition,
* run:
* pbjs customer_record.proto -t static-module -w commonjs -o customer_record.js
* pbjs customer_record.proto -t json --keep-case -o customer_record.json
* from the /samples directory to generate the customer_record module.
*/
// So that BigQuery knows how to parse the serialized_rows, create a
// protocol buffer representation of your message descriptor.
const root = protobufjs.loadSync('./customer_record.json');
const descriptor = root.lookupType('CustomerRecord').toDescriptor('proto2');
const protoDescriptor = adapt.normalizeDescriptor(descriptor).toJSON();
/**
* TODO(developer): Uncomment the following lines before running the sample.
*/
// projectId = 'my_project';
// datasetId = 'my_dataset';
// tableId = 'my_table';
const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
const streamType = managedwriter.PendingStream;
const writeClient = new WriterClient({projectId});
try {
const streamId = await writeClient.createWriteStream({
streamType,
destinationTable,
});
console.log(`Stream created: ${streamId}`);
// Append data to the given stream.
const connection = await writeClient.createStreamConnection({
streamId,
});
const writer = new Writer({
connection,
protoDescriptor,
});
let serializedRows = [];
const pendingWrites = [];
// Row 1
let row = {
rowNum: 1,
customerName: 'Octavia',
};
serializedRows.push(CustomerRecord.encode(row).finish());
// Row 2
row = {
rowNum: 2,
customerName: 'Turing',
};
serializedRows.push(CustomerRecord.encode(row).finish());
// Set an offset to allow resuming this stream if the connection breaks.
// Keep track of which requests the server has acknowledged and resume the
// stream at the first non-acknowledged message. If the server has already
// processed a message with that offset, it will return an ALREADY_EXISTS
// error, which can be safely ignored.
// The first request must always have an offset of 0.
let offsetValue = 0;
// Send batch.
let pw = writer.appendRows({serializedRows}, offsetValue);
pendingWrites.push(pw);
serializedRows = [];
// Row 3
row = {
rowNum: 3,
customerName: 'Bell',
};
serializedRows.push(CustomerRecord.encode(row).finish());
// Offset must equal the number of rows that were previously sent.
offsetValue = 2;
// Send batch.
pw = writer.appendRows({serializedRows}, offsetValue);
pendingWrites.push(pw);
const results = await Promise.all(
pendingWrites.map(pw => pw.getResult())
);
console.log('Write results:', results);
const {rowCount} = await connection.finalize();
console.log(`Row count: ${rowCount}`);
const response = await writeClient.batchCommitWriteStream({
parent: destinationTable,
writeStreams: [streamId],
});
console.log(response);
} catch (err) {
console.log(err);
} finally {
writeClient.close();
}
}
Python
This sample shows a simple record with two fields. For a longer example that shows how to send different data types, including STRUCT types, see the append_rows_proto2 sample on GitHub.
For information about installing and using the BigQuery client libraries, see BigQuery client libraries. For more information, see the BigQuery Python API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
"""
This code sample demonstrates how to write records in pending mode
using the low-level generated client for Python.
"""
from google.protobuf import descriptor_pb2
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types, writer
# If you update the customer_record.proto protocol buffer definition, run:
#
# protoc --python_out=. customer_record.proto
#
# from the samples/snippets directory to generate the customer_record_pb2.py module.
from . import customer_record_pb2
def create_row_data(row_num: int, name: str):
row = customer_record_pb2.CustomerRecord()
row.row_num = row_num
row.customer_name = name
return row.SerializeToString()
def append_rows_pending(project_id: str, dataset_id: str, table_id: str):
"""Create a write stream, write some sample data, and commit the stream."""
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
write_stream = types.WriteStream()
# When creating the stream, choose the type. Use the PENDING type to wait
# until the stream is committed before it is visible. See:
# https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.WriteStream.Type
write_stream.type_ = types.WriteStream.Type.PENDING
write_stream = write_client.create_write_stream(
parent=parent, write_stream=write_stream
)
stream_name = write_stream.name
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The initial request must contain the stream name.
request_template.write_stream = stream_name
# So that BigQuery knows how to parse the serialized_rows, generate a
# protocol buffer representation of your message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
customer_record_pb2.CustomerRecord.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Some stream types support an unbounded number of requests. Construct an
# AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Create a batch of row data by appending proto2 serialized bytes to the
# serialized_rows repeated field.
proto_rows = types.ProtoRows()
proto_rows.serialized_rows.append(create_row_data(1, "Alice"))
proto_rows.serialized_rows.append(create_row_data(2, "Bob"))
# Set an offset to allow resuming this stream if the connection breaks.
# Keep track of which requests the server has acknowledged and resume the
# stream at the first non-acknowledged message. If the server has already
# processed a message with that offset, it will return an ALREADY_EXISTS
# error, which can be safely ignored.
#
# The first request must always have an offset of 0.
request = types.AppendRowsRequest()
request.offset = 0
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
response_future_1 = append_rows_stream.send(request)
# Send another batch.
proto_rows = types.ProtoRows()
proto_rows.serialized_rows.append(create_row_data(3, "Charles"))
# Since this is the second request, you only need to include the row data.
# The name of the stream and protocol buffers DESCRIPTOR is only needed in
# the first request.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
# Offset must equal the number of rows that were previously sent.
request.offset = 2
response_future_2 = append_rows_stream.send(request)
print(response_future_1.result())
print(response_future_2.result())
# Shutdown background threads and close the streaming connection.
append_rows_stream.close()
# A PENDING type stream must be "finalized" before being committed. No new
# records can be written to the stream after this method has been called.
write_client.finalize_write_stream(name=write_stream.name)
# Commit the stream you created earlier.
batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest()
batch_commit_write_streams_request.parent = parent
batch_commit_write_streams_request.write_streams = [write_stream.name]
write_client.batch_commit_write_streams(batch_commit_write_streams_request)
print(f"Writes to stream: '{write_stream.name}' have been committed.")
This code sample depends on the compiled protocol module customer_record_pb2.py. To create the compiled module, run protoc --python_out=. customer_record.proto, where protoc is the protocol buffer compiler. The customer_record.proto file defines the format of the messages used in the Python sample.
// The BigQuery Storage API expects protocol buffer data to be encoded in the
// proto2 wire format. This allows it to disambiguate missing optional fields
// from default values without the need for wrapper types.
syntax = "proto2";
// Define a message type representing the rows in your table. The message
// cannot contain fields which are not present in the table.
message CustomerRecord {
optional string customer_name = 1;
// Use the required keyword for client-side validation of required fields.
required int64 row_num = 2;
}