Trasmetti il flusso di dati utilizzando l'API Storage Scrivi
Questo documento descrive come utilizzare
API BigQuery StorageWrite per inviare flussi di dati
in BigQuery.
Negli scenari in modalità flusso, i dati arrivano continuamente e dovrebbero essere disponibili
legge con latenza minima. Quando utilizzi l'API BigQuery StorageWrite per i flussi di dati
carichi di lavoro, valuta le garanzie di cui hai bisogno:
- Se la tua applicazione ha bisogno solo di semantica almeno una volta, utilizza il metodo predefinito
streaming.
- Se hai bisogno di semantica "exactly-once", crea uno o più flussi in
confermato e usare gli offset di flusso per garantire le scritture "exactly-once".
Nel tipo sottoposto a commit, i dati scritti nel flusso sono disponibili per la query non appena
il server riconosce la richiesta di scrittura. Lo stream predefinito utilizza anche
ma non fornisce garanzie "exactly-once".
Usa il flusso predefinito per la semantica "at-least-once"
Se la tua applicazione accetta la possibilità di visualizzare record duplicati
visualizzato nella tabella di destinazione, ti consigliamo di utilizzare
stream predefinito per lo streaming
diversi scenari.
Il codice seguente mostra come scrivere dati nel flusso predefinito:
Java
Per scoprire come installare e utilizzare la libreria client per BigQuery, consulta
Librerie client di BigQuery.
Per ulteriori informazioni, consulta
API Java BigQuery
documentazione di riferimento.
Per eseguire l'autenticazione su BigQuery, configura Credenziali predefinite dell'applicazione.
Per ulteriori informazioni, vedi
Configura l'autenticazione per le librerie client.
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.Exceptions.MaximumRequestCallbackWaitTimeExceededException;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;
public class WriteToDefaultStream {
public static void runWriteToDefaultStream()
throws DescriptorValidationException, InterruptedException, IOException {
// TODO(developer): Replace these variables before running the sample.
String projectId = "MY_PROJECT_ID";
String datasetName = "MY_DATASET_NAME";
String tableName = "MY_TABLE_NAME";
writeToDefaultStream(projectId, datasetName, tableName);
}
private static ByteString buildByteString() {
byte[] bytes = new byte[] {1, 2, 3, 4, 5};
return ByteString.copyFrom(bytes);
}
// Create a JSON object that is compatible with the table schema.
private static JSONObject buildRecord(int i, int j) {
JSONObject record = new JSONObject();
StringBuilder sbSuffix = new StringBuilder();
for (int k = 0; k < j; k++) {
sbSuffix.append(k);
}
record.put("test_string", String.format("record %03d-%03d %s", i, j, sbSuffix.toString()));
ByteString byteString = buildByteString();
record.put("test_bytes", byteString);
record.put(
"test_geo",
"POLYGON((-124.49 47.35,-124.49 40.73,-116.49 40.73,-116.49 47.35,-124.49 47.35))");
return record;
}
public static void writeToDefaultStream(String projectId, String datasetName, String tableName)
throws DescriptorValidationException, InterruptedException, IOException {
TableName parentTable = TableName.of(projectId, datasetName, tableName);
DataWriter writer = new DataWriter();
// One time initialization for the worker.
writer.initialize(parentTable);
// Write two batches of fake data to the stream, each with 10 JSON records. Data may be
// batched up to the maximum request size:
// https://cloud.google.com/bigquery/quotas#write-api-limits
for (int i = 0; i < 2; i++) {
JSONArray jsonArr = new JSONArray();
for (int j = 0; j < 10; j++) {
JSONObject record = buildRecord(i, j);
jsonArr.put(record);
}
writer.append(new AppendContext(jsonArr));
}
// Final cleanup for the stream during worker teardown.
writer.cleanup();
verifyExpectedRowCount(parentTable, 12);
System.out.println("Appended records successfully.");
}
private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount)
throws InterruptedException {
String queryRowCount =
"SELECT COUNT(*) FROM `"
+ parentTable.getProject()
+ "."
+ parentTable.getDataset()
+ "."
+ parentTable.getTable()
+ "`";
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build();
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
TableResult results = bigquery.query(queryConfig);
int countRowsActual =
Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue());
if (countRowsActual != expectedRowCount) {
throw new RuntimeException(
"Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual);
}
}
private static class AppendContext {
JSONArray data;
AppendContext(JSONArray data) {
this.data = data;
}
}
private static class DataWriter {
private static final int MAX_RECREATE_COUNT = 3;
private BigQueryWriteClient client;
// Track the number of in-flight requests to wait for all responses before shutting down.
private final Phaser inflightRequestCount = new Phaser(1);
private final Object lock = new Object();
private JsonStreamWriter streamWriter;
@GuardedBy("lock")
private RuntimeException error = null;
private AtomicInteger recreateCount = new AtomicInteger(0);
private JsonStreamWriter createStreamWriter(String tableName)
throws DescriptorValidationException, IOException, InterruptedException {
// Configure in-stream automatic retry settings.
// Error codes that are immediately retried:
// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
// Error codes that are retried with exponential backoff:
// * RESOURCE_EXHAUSTED
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();
// Use the JSON stream writer to send records in JSON format. Specify the table name to write
// to the default stream.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
return JsonStreamWriter.newBuilder(tableName, client)
.setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100)))
.setChannelProvider(
BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
.setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1))
.setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1))
.setKeepAliveWithoutCalls(true)
.setChannelsPerCpu(2)
.build())
.setEnableConnectionPool(true)
// If value is missing in json and there is a default value configured on bigquery
// column, apply the default value to the missing value field.
.setDefaultMissingValueInterpretation(
AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE)
.setRetrySettings(retrySettings)
.build();
}
public void initialize(TableName parentTable)
throws DescriptorValidationException, IOException, InterruptedException {
// Initialize client without settings, internally within stream writer a new client will be
// created with full settings.
client = BigQueryWriteClient.create();
streamWriter = createStreamWriter(parentTable.toString());
}
public void append(AppendContext appendContext)
throws DescriptorValidationException, IOException, InterruptedException {
synchronized (this.lock) {
if (!streamWriter.isUserClosed()
&& streamWriter.isClosed()
&& recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) {
streamWriter = createStreamWriter(streamWriter.getStreamName());
this.error = null;
}
// If earlier appends have failed, we need to reset before continuing.
if (this.error != null) {
throw this.error;
}
}
// Append asynchronously for increased throughput.
ApiFuture<AppendRowsResponse> future = streamWriter.append(appendContext.data);
ApiFutures.addCallback(
future, new AppendCompleteCallback(this, appendContext), MoreExecutors.directExecutor());
// Increase the count of in-flight requests.
inflightRequestCount.register();
}
public void cleanup() {
// Wait for all in-flight requests to complete.
inflightRequestCount.arriveAndAwaitAdvance();
client.close();
// Close the connection to the server.
streamWriter.close();
// Verify that no error occurred in the stream.
synchronized (this.lock) {
if (this.error != null) {
throw this.error;
}
}
}
static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {
private final DataWriter parent;
private final AppendContext appendContext;
public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) {
this.parent = parent;
this.appendContext = appendContext;
}
public void onSuccess(AppendRowsResponse response) {
System.out.format("Append success\n");
this.parent.recreateCount.set(0);
done();
}
public void onFailure(Throwable throwable) {
if (throwable instanceof AppendSerializationError) {
AppendSerializationError ase = (AppendSerializationError) throwable;
Map<Integer, String> rowIndexToErrorMessage = ase.getRowIndexToErrorMessage();
if (rowIndexToErrorMessage.size() > 0) {
// Omit the faulty rows
JSONArray dataNew = new JSONArray();
for (int i = 0; i < appendContext.data.length(); i++) {
if (!rowIndexToErrorMessage.containsKey(i)) {
dataNew.put(appendContext.data.get(i));
} else {
// process faulty rows by placing them on a dead-letter-queue, for instance
}
}
// Retry the remaining valid rows, but using a separate thread to
// avoid potentially blocking while we are in a callback.
if (dataNew.length() > 0) {
try {
this.parent.append(new AppendContext(dataNew));
} catch (DescriptorValidationException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// Mark the existing attempt as done since we got a response for it
done();
return;
}
}
boolean resendRequest = false;
if (throwable instanceof MaximumRequestCallbackWaitTimeExceededException) {
resendRequest = true;
} else if (throwable instanceof StreamWriterClosedException) {
if (!parent.streamWriter.isUserClosed()) {
resendRequest = true;
}
}
if (resendRequest) {
// Retry this request.
try {
this.parent.append(new AppendContext(appendContext.data));
} catch (DescriptorValidationException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// Mark the existing attempt as done since we got a response for it
done();
return;
}
synchronized (this.parent.lock) {
if (this.parent.error == null) {
StorageException storageException = Exceptions.toStorageException(throwable);
this.parent.error =
(storageException != null) ? storageException : new RuntimeException(throwable);
}
}
done();
}
private void done() {
// Reduce the count of in-flight requests.
this.parent.inflightRequestCount.arriveAndDeregister();
}
}
}
}
Utilizzare il multiplexing
Attiva
multiplexing
a livello di autore dello stream solo per lo stream predefinito. Per attivare il multiplexing in
Java, chiama il metodo setEnableConnectionPool
quando crei un
Oggetto StreamWriter
o JsonStreamWriter
:
// One possible way for constructing StreamWriter
StreamWriter.newBuilder(streamName)
.setWriterSchema(protoSchema)
.setEnableConnectionPool(true)
.build();
// One possible way for constructing JsonStreamWriter
JsonStreamWriter.newBuilder(tableName, bigqueryClient)
.setEnableConnectionPool(true)
.build();
Per attivare il multiplexing in Go, consulta
Condivisione della connessione (multiplexing).
Utilizza il tipo di commit per la semantica "exactly-once"
Se hai bisogno di semantica della scrittura "exactly-once", crea un flusso di scrittura nel commit
di testo. Con il tipo di commit, i record sono disponibili per la query non appena il client
riceve conferma dal backend.
Il tipo impegnato fornisce la pubblicazione "exactly-once" all'interno di uno stream mediante l'uso di
degli offset record. Utilizzando gli offset di record, l'applicazione specifica
aggiungere l'offset in ogni chiamata a AppendRows
. L'operazione di scrittura
eseguita solo se il valore di offset corrisponde all'offset dell'aggiunta successiva. Per ulteriori informazioni
le informazioni, vedi
Gestisci gli offset dei flussi per ottenere una semantica "exactly-once".
Se non specifichi un offset, i record verranno aggiunti alla fine corrente di
durante lo streaming. In questo caso, se una richiesta di aggiunta restituisce un errore, riprova
il record potrebbe apparire più volte nello stream.
Per utilizzare il tipo di commit, segui questi passaggi:
Java
- Richiama
CreateWriteStream
per creare uno o più stream di tipo sottoposto a commit.
- Per ogni flusso, chiama
AppendRows
in un loop per scrivere batch di record.
- Chiama
FinalizeWriteStream
per ogni stream per rilasciare lo stream. Dopo
non puoi scrivere altre righe nel flusso. Questo passaggio è
facoltativo nel tipo di commit, ma aiuta a evitare di superare il limite
di stream attivi. Per ulteriori informazioni, vedi
Limita la frequenza di creazione dello stream.
Node.js
- Richiama
createWriteStreamFullResponse
per creare uno o più stream di tipo sottoposto a commit.
- Per ogni flusso, chiama
appendRows
in un loop per scrivere batch di record.
- Chiama
finalize
per ogni stream per rilasciare lo stream. Dopo
non puoi scrivere altre righe nel flusso. Questo passaggio è
facoltativo nel tipo di commit, ma aiuta a evitare di superare il limite
di stream attivi. Per ulteriori informazioni, vedi
Limita la frequenza di creazione dello stream.
Non puoi eliminare uno stream in modo esplicito. Gli stream seguono la durata (TTL) definita dal sistema:
- Uno stream impegnato ha un TTL di tre giorni se non c'è traffico nello stream.
- Per impostazione predefinita, uno stream con buffer ha un TTL di sette giorni se non c'è traffico nello stream.
Il codice seguente mostra come utilizzare il tipo di commit:
Java
Per scoprire come installare e utilizzare la libreria client per BigQuery, consulta
Librerie client di BigQuery.
Per ulteriori informazioni, consulta
API Java BigQuery
documentazione di riferimento.
Per eseguire l'autenticazione su BigQuery, configura Credenziali predefinite dell'applicazione.
Per ulteriori informazioni, vedi
Configura l'autenticazione per le librerie client.
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;
public class WriteCommittedStream {
public static void runWriteCommittedStream()
throws DescriptorValidationException, InterruptedException, IOException {
// TODO(developer): Replace these variables before running the sample.
String projectId = "MY_PROJECT_ID";
String datasetName = "MY_DATASET_NAME";
String tableName = "MY_TABLE_NAME";
writeCommittedStream(projectId, datasetName, tableName);
}
public static void writeCommittedStream(String projectId, String datasetName, String tableName)
throws DescriptorValidationException, InterruptedException, IOException {
BigQueryWriteClient client = BigQueryWriteClient.create();
TableName parentTable = TableName.of(projectId, datasetName, tableName);
DataWriter writer = new DataWriter();
// One time initialization.
writer.initialize(parentTable, client);
try {
// Write two batches of fake data to the stream, each with 10 JSON records. Data may be
// batched up to the maximum request size:
// https://cloud.google.com/bigquery/quotas#write-api-limits
long offset = 0;
for (int i = 0; i < 2; i++) {
// Create a JSON object that is compatible with the table schema.
JSONArray jsonArr = new JSONArray();
for (int j = 0; j < 10; j++) {
JSONObject record = new JSONObject();
record.put("col1", String.format("batch-record %03d-%03d", i, j));
jsonArr.put(record);
}
writer.append(jsonArr, offset);
offset += jsonArr.length();
}
} catch (ExecutionException e) {
// If the wrapped exception is a StatusRuntimeException, check the state of the operation.
// If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
// https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
System.out.println("Failed to append records. \n" + e);
}
// Final cleanup for the stream.
writer.cleanup(client);
System.out.println("Appended records successfully.");
}
// A simple wrapper object showing how the stateful stream writer should be used.
private static class DataWriter {
private JsonStreamWriter streamWriter;
// Track the number of in-flight requests to wait for all responses before shutting down.
private final Phaser inflightRequestCount = new Phaser(1);
private final Object lock = new Object();
@GuardedBy("lock")
private RuntimeException error = null;
void initialize(TableName parentTable, BigQueryWriteClient client)
throws IOException, DescriptorValidationException, InterruptedException {
// Initialize a write stream for the specified table.
// For more information on WriteStream.Type, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/WriteStream.Type.html
WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build();
CreateWriteStreamRequest createWriteStreamRequest =
CreateWriteStreamRequest.newBuilder()
.setParent(parentTable.toString())
.setWriteStream(stream)
.build();
WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);
// Configure in-stream automatic retry settings.
// Error codes that are immediately retried:
// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
// Error codes that are retried with exponential backoff:
// * RESOURCE_EXHAUSTED
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();
// Use the JSON stream writer to send records in JSON format.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
streamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client)
.setRetrySettings(retrySettings)
.build();
}
public void append(JSONArray data, long offset)
throws DescriptorValidationException, IOException, ExecutionException {
synchronized (this.lock) {
// If earlier appends have failed, we need to reset before continuing.
if (this.error != null) {
throw this.error;
}
}
// Append asynchronously for increased throughput.
ApiFuture<AppendRowsResponse> future = streamWriter.append(data, offset);
ApiFutures.addCallback(
future, new DataWriter.AppendCompleteCallback(this), MoreExecutors.directExecutor());
// Increase the count of in-flight requests.
inflightRequestCount.register();
}
public void cleanup(BigQueryWriteClient client) {
// Wait for all in-flight requests to complete.
inflightRequestCount.arriveAndAwaitAdvance();
// Close the connection to the server.
streamWriter.close();
// Verify that no error occurred in the stream.
synchronized (this.lock) {
if (this.error != null) {
throw this.error;
}
}
// Finalize the stream.
FinalizeWriteStreamResponse finalizeResponse =
client.finalizeWriteStream(streamWriter.getStreamName());
System.out.println("Rows written: " + finalizeResponse.getRowCount());
}
public String getStreamName() {
return streamWriter.getStreamName();
}
static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {
private final DataWriter parent;
public AppendCompleteCallback(DataWriter parent) {
this.parent = parent;
}
public void onSuccess(AppendRowsResponse response) {
System.out.format("Append %d success\n", response.getAppendResult().getOffset().getValue());
done();
}
public void onFailure(Throwable throwable) {
synchronized (this.parent.lock) {
if (this.parent.error == null) {
StorageException storageException = Exceptions.toStorageException(throwable);
this.parent.error =
(storageException != null) ? storageException : new RuntimeException(throwable);
}
}
System.out.format("Error: %s\n", throwable.toString());
done();
}
private void done() {
// Reduce the count of in-flight requests.
this.parent.inflightRequestCount.arriveAndDeregister();
}
}
}
}