Verwenden Sie den JSON-Stream-Writer, um Einträge über einen Standardclient anzuhängen.
Weitere Informationen
Eine ausführliche Dokumentation, die dieses Codebeispiel enthält, finden Sie hier:
Codebeispiel
Java
Bevor Sie dieses Beispiel anwenden, folgen Sie den Schritten zur Einrichtung von Java in der BigQuery-Kurzanleitung zur Verwendung von Clientbibliotheken. Weitere Angaben finden Sie in der Referenzdokumentation zur BigQuery Java API.
Richten Sie zur Authentifizierung bei BigQuery die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für Clientbibliotheken einrichten.
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.Exceptions.MaximumRequestCallbackWaitTimeExceededException;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;
public class WriteToDefaultStream {
public static void runWriteToDefaultStream()
throws DescriptorValidationException, InterruptedException, IOException {
// TODO(developer): Replace these variables before running the sample.
String projectId = "MY_PROJECT_ID";
String datasetName = "MY_DATASET_NAME";
String tableName = "MY_TABLE_NAME";
writeToDefaultStream(projectId, datasetName, tableName);
}
private static ByteString buildByteString() {
byte[] bytes = new byte[] {1, 2, 3, 4, 5};
return ByteString.copyFrom(bytes);
}
// Create a JSON object that is compatible with the table schema.
private static JSONObject buildRecord(int i, int j) {
JSONObject record = new JSONObject();
StringBuilder sbSuffix = new StringBuilder();
for (int k = 0; k < j; k++) {
sbSuffix.append(k);
}
record.put("test_string", String.format("record %03d-%03d %s", i, j, sbSuffix.toString()));
ByteString byteString = buildByteString();
record.put("test_bytes", byteString);
record.put(
"test_geo",
"POLYGON((-124.49 47.35,-124.49 40.73,-116.49 40.73,-116.49 47.35,-124.49 47.35))");
return record;
}
public static void writeToDefaultStream(String projectId, String datasetName, String tableName)
throws DescriptorValidationException, InterruptedException, IOException {
TableName parentTable = TableName.of(projectId, datasetName, tableName);
DataWriter writer = new DataWriter();
// One time initialization for the worker.
writer.initialize(parentTable);
// Write two batches of fake data to the stream, each with 10 JSON records. Data may be
// batched up to the maximum request size:
// https://cloud.google.com/bigquery/quotas#write-api-limits
for (int i = 0; i < 2; i++) {
JSONArray jsonArr = new JSONArray();
for (int j = 0; j < 10; j++) {
JSONObject record = buildRecord(i, j);
jsonArr.put(record);
}
writer.append(new AppendContext(jsonArr));
}
// Final cleanup for the stream during worker teardown.
writer.cleanup();
verifyExpectedRowCount(parentTable, 12);
System.out.println("Appended records successfully.");
}
private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount)
throws InterruptedException {
String queryRowCount =
"SELECT COUNT(*) FROM `"
+ parentTable.getProject()
+ "."
+ parentTable.getDataset()
+ "."
+ parentTable.getTable()
+ "`";
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build();
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
TableResult results = bigquery.query(queryConfig);
int countRowsActual =
Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue());
if (countRowsActual != expectedRowCount) {
throw new RuntimeException(
"Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual);
}
}
private static class AppendContext {
JSONArray data;
AppendContext(JSONArray data) {
this.data = data;
}
}
private static class DataWriter {
private static final int MAX_RECREATE_COUNT = 3;
private BigQueryWriteClient client;
// Track the number of in-flight requests to wait for all responses before shutting down.
private final Phaser inflightRequestCount = new Phaser(1);
private final Object lock = new Object();
private JsonStreamWriter streamWriter;
@GuardedBy("lock")
private RuntimeException error = null;
private AtomicInteger recreateCount = new AtomicInteger(0);
private JsonStreamWriter createStreamWriter(String tableName)
throws DescriptorValidationException, IOException, InterruptedException {
// Configure in-stream automatic retry settings.
// Error codes that are immediately retried:
// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
// Error codes that are retried with exponential backoff:
// * RESOURCE_EXHAUSTED
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();
// Use the JSON stream writer to send records in JSON format. Specify the table name to write
// to the default stream.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
return JsonStreamWriter.newBuilder(tableName, client)
.setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100)))
.setChannelProvider(
BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
.setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1))
.setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1))
.setKeepAliveWithoutCalls(true)
.setChannelsPerCpu(2)
.build())
.setEnableConnectionPool(true)
// If value is missing in json and there is a default value configured on bigquery
// column, apply the default value to the missing value field.
.setDefaultMissingValueInterpretation(
AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE)
.setRetrySettings(retrySettings)
.build();
}
public void initialize(TableName parentTable)
throws DescriptorValidationException, IOException, InterruptedException {
// Initialize client without settings, internally within stream writer a new client will be
// created with full settings.
client = BigQueryWriteClient.create();
streamWriter = createStreamWriter(parentTable.toString());
}
public void append(AppendContext appendContext)
throws DescriptorValidationException, IOException, InterruptedException {
synchronized (this.lock) {
if (!streamWriter.isUserClosed()
&& streamWriter.isClosed()
&& recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) {
streamWriter = createStreamWriter(streamWriter.getStreamName());
this.error = null;
}
// If earlier appends have failed, we need to reset before continuing.
if (this.error != null) {
throw this.error;
}
}
// Append asynchronously for increased throughput.
ApiFuture<AppendRowsResponse> future = streamWriter.append(appendContext.data);
ApiFutures.addCallback(
future, new AppendCompleteCallback(this, appendContext), MoreExecutors.directExecutor());
// Increase the count of in-flight requests.
inflightRequestCount.register();
}
public void cleanup() {
// Wait for all in-flight requests to complete.
inflightRequestCount.arriveAndAwaitAdvance();
client.close();
// Close the connection to the server.
streamWriter.close();
// Verify that no error occurred in the stream.
synchronized (this.lock) {
if (this.error != null) {
throw this.error;
}
}
}
static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {
private final DataWriter parent;
private final AppendContext appendContext;
public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) {
this.parent = parent;
this.appendContext = appendContext;
}
public void onSuccess(AppendRowsResponse response) {
System.out.format("Append success\n");
this.parent.recreateCount.set(0);
done();
}
public void onFailure(Throwable throwable) {
if (throwable instanceof AppendSerializationError) {
AppendSerializationError ase = (AppendSerializationError) throwable;
Map<Integer, String> rowIndexToErrorMessage = ase.getRowIndexToErrorMessage();
if (rowIndexToErrorMessage.size() > 0) {
// Omit the faulty rows
JSONArray dataNew = new JSONArray();
for (int i = 0; i < appendContext.data.length(); i++) {
if (!rowIndexToErrorMessage.containsKey(i)) {
dataNew.put(appendContext.data.get(i));
} else {
// process faulty rows by placing them on a dead-letter-queue, for instance
}
}
// Retry the remaining valid rows, but using a separate thread to
// avoid potentially blocking while we are in a callback.
if (dataNew.length() > 0) {
try {
this.parent.append(new AppendContext(dataNew));
} catch (DescriptorValidationException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// Mark the existing attempt as done since we got a response for it
done();
return;
}
}
boolean resendRequest = false;
if (throwable instanceof MaximumRequestCallbackWaitTimeExceededException) {
resendRequest = true;
} else if (throwable instanceof StreamWriterClosedException) {
if (!parent.streamWriter.isUserClosed()) {
resendRequest = true;
}
}
if (resendRequest) {
// Retry this request.
try {
this.parent.append(new AppendContext(appendContext.data));
} catch (DescriptorValidationException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// Mark the existing attempt as done since we got a response for it
done();
return;
}
synchronized (this.parent.lock) {
if (this.parent.error == null) {
StorageException storageException = Exceptions.toStorageException(throwable);
this.parent.error =
(storageException != null) ? storageException : new RuntimeException(throwable);
}
}
done();
}
private void done() {
// Reduce the count of in-flight requests.
this.parent.inflightRequestCount.arriveAndDeregister();
}
}
}
}
Node.js
Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Node.js-Einrichtungsanleitung in der BigQuery-Kurzanleitung zur Verwendung von Clientbibliotheken. Weitere Angaben finden Sie in der Referenzdokumentation zur BigQuery Node.js API.
Richten Sie zur Authentifizierung bei BigQuery die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für Clientbibliotheken einrichten.
const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, JSONWriter} = managedwriter;
async function appendJSONRowsDefaultStream() {
/**
* TODO(developer): Uncomment the following lines before running the sample.
*/
// projectId = 'my_project';
// datasetId = 'my_dataset';
// tableId = 'my_table';
const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
const writeClient = new WriterClient({projectId});
try {
const writeStream = await writeClient.getWriteStream({
streamId: `${destinationTable}/streams/_default`,
view: 'FULL',
});
const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
writeStream.tableSchema,
'root'
);
const connection = await writeClient.createStreamConnection({
streamId: managedwriter.DefaultStream,
destinationTable,
});
const streamId = connection.getStreamId();
const writer = new JSONWriter({
streamId,
connection,
protoDescriptor,
});
let rows = [];
const pendingWrites = [];
// Row 1
let row = {
row_num: 1,
customer_name: 'Octavia',
};
rows.push(row);
// Row 2
row = {
row_num: 2,
customer_name: 'Turing',
};
rows.push(row);
// Send batch.
let pw = writer.appendRows(rows);
pendingWrites.push(pw);
rows = [];
// Row 3
row = {
row_num: 3,
customer_name: 'Bell',
};
rows.push(row);
// Send batch.
pw = writer.appendRows(rows);
pendingWrites.push(pw);
const results = await Promise.all(
pendingWrites.map(pw => pw.getResult())
);
console.log('Write results:', results);
} catch (err) {
console.log(err);
} finally {
writeClient.close();
}
}
Nächste Schritte
Informationen zum Suchen und Filtern von Codebeispielen für andere Google Cloud-Produkte finden Sie im Google Cloud-Beispielbrowser.