使用 JSON 流写入工具附加待处理记录。
深入探索
如需查看包含此代码示例的详细文档,请参阅以下内容:
代码示例
Java
试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 BigQuery Java API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.StorageError;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;
public class WritePendingStream {
public static void runWritePendingStream()
throws DescriptorValidationException, InterruptedException, IOException {
// TODO(developer): Replace these variables before running the sample.
String projectId = "MY_PROJECT_ID";
String datasetName = "MY_DATASET_NAME";
String tableName = "MY_TABLE_NAME";
writePendingStream(projectId, datasetName, tableName);
}
public static void writePendingStream(String projectId, String datasetName, String tableName)
throws DescriptorValidationException, InterruptedException, IOException {
BigQueryWriteClient client = BigQueryWriteClient.create();
TableName parentTable = TableName.of(projectId, datasetName, tableName);
DataWriter writer = new DataWriter();
// One time initialization.
writer.initialize(parentTable, client);
try {
// Write two batches of fake data to the stream, each with 10 JSON records. Data may be
// batched up to the maximum request size:
// https://cloud.google.com/bigquery/quotas#write-api-limits
long offset = 0;
for (int i = 0; i < 2; i++) {
// Create a JSON object that is compatible with the table schema.
JSONArray jsonArr = new JSONArray();
for (int j = 0; j < 10; j++) {
JSONObject record = new JSONObject();
record.put("col1", String.format("batch-record %03d-%03d", i, j));
jsonArr.put(record);
}
writer.append(jsonArr, offset);
offset += jsonArr.length();
}
} catch (ExecutionException e) {
// If the wrapped exception is a StatusRuntimeException, check the state of the operation.
// If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
// https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
System.out.println("Failed to append records. \n" + e);
}
// Final cleanup for the stream.
writer.cleanup(client);
System.out.println("Appended records successfully.");
// Once all streams are done, if all writes were successful, commit all of them in one request.
// This example only has the one stream. If any streams failed, their workload may be
// retried on a new stream, and then only the successful stream should be included in the
// commit.
BatchCommitWriteStreamsRequest commitRequest =
BatchCommitWriteStreamsRequest.newBuilder()
.setParent(parentTable.toString())
.addWriteStreams(writer.getStreamName())
.build();
BatchCommitWriteStreamsResponse commitResponse = client.batchCommitWriteStreams(commitRequest);
// If the response does not have a commit time, it means the commit operation failed.
if (commitResponse.hasCommitTime() == false) {
for (StorageError err : commitResponse.getStreamErrorsList()) {
System.out.println(err.getErrorMessage());
}
throw new RuntimeException("Error committing the streams");
}
System.out.println("Appended and committed records successfully.");
}
// A simple wrapper object showing how the stateful stream writer should be used.
private static class DataWriter {
private JsonStreamWriter streamWriter;
// Track the number of in-flight requests to wait for all responses before shutting down.
private final Phaser inflightRequestCount = new Phaser(1);
private final Object lock = new Object();
@GuardedBy("lock")
private RuntimeException error = null;
void initialize(TableName parentTable, BigQueryWriteClient client)
throws IOException, DescriptorValidationException, InterruptedException {
// Initialize a write stream for the specified table.
// For more information on WriteStream.Type, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/WriteStream.Type.html
WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build();
// Configure in-stream automatic retry settings.
// Error codes that are immediately retried:
// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
// Error codes that are retried with exponential backoff:
// * RESOURCE_EXHAUSTED
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();
CreateWriteStreamRequest createWriteStreamRequest =
CreateWriteStreamRequest.newBuilder()
.setParent(parentTable.toString())
.setWriteStream(stream)
.build();
WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);
// Use the JSON stream writer to send records in JSON format.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
streamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
.setRetrySettings(retrySettings)
.build();
}
public void append(JSONArray data, long offset)
throws DescriptorValidationException, IOException, ExecutionException {
synchronized (this.lock) {
// If earlier appends have failed, we need to reset before continuing.
if (this.error != null) {
throw this.error;
}
}
// Append asynchronously for increased throughput.
ApiFuture<AppendRowsResponse> future = streamWriter.append(data, offset);
ApiFutures.addCallback(
future, new AppendCompleteCallback(this), MoreExecutors.directExecutor());
// Increase the count of in-flight requests.
inflightRequestCount.register();
}
public void cleanup(BigQueryWriteClient client) {
// Wait for all in-flight requests to complete.
inflightRequestCount.arriveAndAwaitAdvance();
// Close the connection to the server.
streamWriter.close();
// Verify that no error occurred in the stream.
synchronized (this.lock) {
if (this.error != null) {
throw this.error;
}
}
// Finalize the stream.
FinalizeWriteStreamResponse finalizeResponse =
client.finalizeWriteStream(streamWriter.getStreamName());
System.out.println("Rows written: " + finalizeResponse.getRowCount());
}
public String getStreamName() {
return streamWriter.getStreamName();
}
static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {
private final DataWriter parent;
public AppendCompleteCallback(DataWriter parent) {
this.parent = parent;
}
public void onSuccess(AppendRowsResponse response) {
System.out.format("Append %d success\n", response.getAppendResult().getOffset().getValue());
done();
}
public void onFailure(Throwable throwable) {
synchronized (this.parent.lock) {
if (this.parent.error == null) {
StorageException storageException = Exceptions.toStorageException(throwable);
this.parent.error =
(storageException != null) ? storageException : new RuntimeException(throwable);
}
}
System.out.format("Error: %s\n", throwable.toString());
done();
}
private void done() {
// Reduce the count of in-flight requests.
this.parent.inflightRequestCount.arriveAndDeregister();
}
}
}
}
Node.js
试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Node.js 设置说明进行操作。如需了解详情,请参阅 BigQuery Node.js API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, JSONWriter} = managedwriter;
const {BigQuery} = require('@google-cloud/bigquery');
async function appendRowsPendingStream() {
/**
* TODO(developer): Uncomment the following lines before running the sample.
*/
// projectId = 'my_project';
// datasetId = 'my_dataset';
// tableId = 'my_table';
const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
const streamType = managedwriter.PendingStream;
const writeClient = new WriterClient({projectId: projectId});
const bigquery = new BigQuery({projectId: projectId});
try {
const dataset = bigquery.dataset(datasetId);
const table = await dataset.table(tableId);
const [metadata] = await table.getMetadata();
const {schema} = metadata;
const storageSchema =
adapt.convertBigQuerySchemaToStorageTableSchema(schema);
const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
storageSchema,
'root'
);
const connection = await writeClient.createStreamConnection({
streamType,
destinationTable,
});
const streamId = connection.getStreamId();
console.log(`Stream created: ${streamId}`);
const writer = new JSONWriter({
connection,
protoDescriptor,
});
let rows = [];
const pendingWrites = [];
// Row 1
let row = {
row_num: 1,
bool_col: true,
bytes_col: Buffer.from('hello world'),
float64_col: parseFloat('+123.44999694824219'),
int64_col: 123,
string_col: 'omg',
};
rows.push(row);
// Row 2
row = {
row_num: 2,
bool_col: false,
};
rows.push(row);
// Row 3
row = {
row_num: 3,
bytes_col: Buffer.from('later, gator'),
};
rows.push(row);
// Row 4
row = {
row_num: 4,
float64_col: 987.6539916992188,
};
rows.push(row);
// Row 5
row = {
row_num: 5,
int64_col: 321,
};
rows.push(row);
// Row 6
row = {
row_num: 6,
string_col: 'octavia',
};
rows.push(row);
// Set an offset to allow resuming this stream if the connection breaks.
// Keep track of which requests the server has acknowledged and resume the
// stream at the first non-acknowledged message. If the server has already
// processed a message with that offset, it will return an ALREADY_EXISTS
// error, which can be safely ignored.
// The first request must always have an offset of 0.
let offsetValue = 0;
// Send batch.
let pw = writer.appendRows(rows, offsetValue);
pendingWrites.push(pw);
// Reset rows.
rows = [];
// Row 7
row = {
row_num: 7,
date_col: new Date('2019-02-07'),
};
rows.push(row);
// Row 8
row = {
row_num: 8,
datetime_col: new Date('2019-02-17T11:24:00.000Z'),
};
rows.push(row);
// Row 9
row = {
row_num: 9,
geography_col: 'POINT(5 5)',
};
rows.push(row);
// Row 10
row = {
row_num: 10,
numeric_col: 123456,
bignumeric_col: '99999999999999999999999999999.999999999',
};
rows.push(row);
// Row 11
row = {
row_num: 11,
time_col: '18:00:00',
};
rows.push(row);
// Row 12
row = {
row_num: 12,
timestamp_col: new Date('2022-01-09T03:49:46.564Z'),
};
rows.push(row);
// Offset must equal the number of rows that were previously sent.
offsetValue = 6;
// Send batch.
pw = writer.appendRows(rows, offsetValue);
pendingWrites.push(pw);
rows = [];
// Row 13
row = {
row_num: 13,
int64_list: [1999, 2001],
};
rows.push(row);
// Row 14
row = {
row_num: 14,
struct_col: {
sub_int_col: 99,
},
};
rows.push(row);
// Row 15
row = {
row_num: 15,
struct_list: [{sub_int_col: 100}, {sub_int_col: 101}],
};
rows.push(row);
offsetValue = 12;
// Send batch.
pw = writer.appendRows(rows, offsetValue);
pendingWrites.push(pw);
const results = await Promise.all(
pendingWrites.map(pw => pw.getResult())
);
console.log('Write results:', results);
const {rowCount} = await connection.finalize();
console.log(`Row count: ${rowCount}`);
const response = await writeClient.batchCommitWriteStream({
parent: destinationTable,
writeStreams: [streamId],
});
console.log(response);
} catch (err) {
console.log(err.message, err);
} finally {
writeClient.close();
}
}
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器。