Using the BigQuery Storage Write API

The BigQuery Storage Write API is a gRPC-based streaming API for ingesting data into BigQuery at high throughput. The Storage Write API combines the functionality of streaming ingestion and batch loading into a single API. You can use the Storage Write API to stream records into BigQuery that become available for query as they are written, or to batch process an arbitrarily large number of records and commit them in a single atomic operation.

Advantages of using the Storage Write API

Exactly-once delivery semantics. The Storage Write API supports exactly-once semantics through the use of stream offsets. Unlike the insertAll method, if the client provides stream offsets when appending records, the Storage Write API never writes two messages that have the same offset within a stream.

Stream-level transactions. You can write data to a stream and commit the data as a single transaction. If the commit operation fails, you can safely retry the operation. For example, in a streaming pipeline, if a worker fails, another worker can pick up the task.

Transactions across streams. Multiple workers can create their own streams to process data independently. When all the workers have finished, you can commit all of the streams as a transaction.

Efficient protocol. The Storage Write API is more efficient than the older insertAll method because it uses gRPC streaming rather than REST over HTTP. The Storage Write API also supports binary formats in the form of protocol buffers, which are a more efficient wire format than JSON. Write requests are asynchronous with guaranteed ordering.

Schema update detection. If the underlying table schema changes while the client is streaming, then the Storage Write API notifies the client. The client can decide whether to reconnect using the updated schema, or continue to write to the existing connection.

Required permissions

To use the Storage Write API, you must have the bigquery.tables.updateData permission.

The following predefined Identity and Access Management (IAM) roles include the bigquery.tables.updateData permission:

  • bigquery.dataEditor
  • bigquery.dataOwner
  • bigquery.admin

For more information about IAM roles and permissions in BigQuery, see Predefined roles and permissions.

Overview of using the Storage Write API

The Storage Write API supports two modes:

  • Committed mode. In this mode, the client writes records that are available immediately for querying. Use this mode for streaming workloads.

  • Pending mode. In this mode, the client writes a batch of records, which are buffered in a pending state. After the client finishes writing, it commits each pending stream. Once a stream is committed, the data that was written to the stream is available for reading. The commit is an atomic operation. Use this mode for batch workloads, as an alternative to BigQuery load jobs.

You can use the Storage Write API by calling the gRPC API directly or by using the client library, which is available for Java. In general, we recommend using the client library because it provides a simpler programming interface and manages the underlying bidirectional streaming RPC for you.

The client library provides two writer objects:

  • StreamWriterV2. The StreamWriterV2 object accepts data in protocol buffer format.

  • JsonStreamWriter. The JsonStreamWriter object accepts data in JSON format and converts it to protocol buffers. If you already have JSON format data in your application, you can use the JsonStreamWriter to ingest your data. The JsonStreamWriter automatically converts the data on the client side to the Storage Write API's underlying binary format before sending it over the wire.

The programming model is similar for both writers. The main difference is how you format the payload. The next sections describe how to use the client library for committed mode and pending mode.

Using the Storage Write API with the Java client

The next sections describe how to use the Java client to write data to the Storage Write API.

Write to a stream in committed mode

In committed mode, records are available for query as soon as the client receives acknowledgement from the back end. Committed mode provides exactly-once delivery within a stream through the use of record offsets. The application can specify the offset number for each record. The write operation is only performed if the offset value matches the next append offset. If no offset is provided, records are appended to the current end of the stream. In that case, if an append returns an error, retrying it could result in the record appearing more than once in the stream.

To write records in committed mode using the JsonStreamWriter object, perform the following steps:

  1. Create a new instance of the BigQueryWriteClient object.

  2. Call BigQueryWriteClient.createWriteStream to create a WriteStream object. For the mode, specify WriteStream.Type.COMMITTED.

  3. Create a JsonStreamWriter instance. Provide a schema that is compatible with the table schema. You can construct a schema manually as a TableSchema object (see the sketch after this list) or fetch the schema from the BigQuery table.

  4. Call JsonStreamWriter.append and pass in a JSONArray of JSONObject records. Optionally, specify the offset of the record in the stream. If you provide the offset, then the Storage Write API never writes two messages that have the same offset. The append method is asynchronous. Ordering is guaranteed if you provide offsets.
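
For example, the following is a minimal sketch of constructing a schema manually as a TableSchema object. It assumes a table with a single nullable STRING column named col1, which is the same column that the sample below writes to:

import com.google.cloud.bigquery.storage.v1beta2.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1beta2.TableSchema;

public class BuildTableSchema {

  // Builds a Storage Write API TableSchema that describes a single nullable
  // STRING column named "col1". Add one TableFieldSchema per column to match
  // your own table.
  public static TableSchema singleStringColumnSchema() {
    return TableSchema.newBuilder()
        .addFields(
            TableFieldSchema.newBuilder()
                .setName("col1")
                .setType(TableFieldSchema.Type.STRING)
                .setMode(TableFieldSchema.Mode.NULLABLE)
                .build())
        .build();
  }
}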

Java

Before trying this sample, follow the Java setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Java API reference documentation.

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1beta2.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1beta2.TableName;
import com.google.cloud.bigquery.storage.v1beta2.WriteStream;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.json.JSONArray;
import org.json.JSONObject;

public class WriteCommittedStream {

  public static void runWriteCommittedStream()
      throws DescriptorValidationException, InterruptedException, IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";

    writeCommittedStream(projectId, datasetName, tableName);
  }

  public static void writeCommittedStream(String projectId, String datasetName, String tableName)
      throws DescriptorValidationException, InterruptedException, IOException {

    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      // Initialize a write stream for the specified table.
      // For more information on WriteStream.Type, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/WriteStream.Type.html
      WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build();
      TableName parentTable = TableName.of(projectId, datasetName, tableName);
      CreateWriteStreamRequest createWriteStreamRequest =
          CreateWriteStreamRequest.newBuilder()
              .setParent(parentTable.toString())
              .setWriteStream(stream)
              .build();
      WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

      // Use the JSON stream writer to send records in JSON format.
      // For more information about JsonStreamWriter, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
      try (JsonStreamWriter writer =
          JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
              .build()) {
        // Append 10 JSON objects to the stream.
        for (int i = 0; i < 10; i++) {
          // Create a JSON object that is compatible with the table schema.
          JSONObject record = new JSONObject();
          record.put("col1", String.format("record %03d", i));
          JSONArray jsonArr = new JSONArray();
          jsonArr.put(record);

          // To detect duplicate records, pass the index as the record offset.
          // To disable deduplication, omit the offset or write to the default stream instead.
          ApiFuture<AppendRowsResponse> future = writer.append(jsonArr, /*offset=*/ i);
          AppendRowsResponse response = future.get();
        }
      }
      System.out.println("Appended records successfully.");
    } catch (ExecutionException e) {
      // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
      // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
      // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
      System.out.println("Failed to append records. \n" + e.toString());
    }
  }
}

The StreamWriterV2 object is similar to the JsonStreamWriter object, but the StreamWriterV2.append method takes an AppendRowsRequest object, which holds a list of protocol buffer messages, rather than JSON objects.
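
The following is a minimal sketch of assembling such a request. RowRecord is a hypothetical message class compiled from your own .proto file, and the ProtoSchema that describes it is discussed in Working with schemas later on this page:

import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1beta2.ProtoRows;
import com.google.cloud.bigquery.storage.v1beta2.ProtoSchema;

public class BuildAppendRowsRequest {

  // Wraps one serialized protocol buffer row in an AppendRowsRequest for the
  // given write stream. RowRecord is a hypothetical generated message whose
  // fields match the destination table schema.
  public static AppendRowsRequest buildRequest(
      String streamName, ProtoSchema protoSchema, RowRecord row) {
    // Each row is sent over the wire as a serialized protocol buffer message.
    ProtoRows rows = ProtoRows.newBuilder().addSerializedRows(row.toByteString()).build();
    return AppendRowsRequest.newBuilder()
        .setWriteStream(streamName)
        .setProtoRows(
            AppendRowsRequest.ProtoData.newBuilder()
                .setWriterSchema(protoSchema)
                .setRows(rows)
                .build())
        .build();
  }
}

The returned request is then passed to StreamWriterV2.append, which returns an ApiFuture<AppendRowsResponse> just as JsonStreamWriter.append does.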

Write to the default stream

The Storage Write API provides a default stream that appends records in committed mode without exactly-once guarantees. The default stream can potentially support higher throughput than a dedicated stream. Also, the default stream is not subject to the quota limit on creating write streams. Any number of clients can write simultaneously to the default stream.

The following code shows how to use the default stream with the JsonStreamWriter object:

Java

Before trying this sample, follow the Java setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Java API reference documentation.

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1beta2.TableName;
import com.google.cloud.bigquery.storage.v1beta2.TableSchema;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.json.JSONArray;
import org.json.JSONObject;

public class WriteToDefaultStream {

  public static void runWriteToDefaultStream()
      throws DescriptorValidationException, InterruptedException, IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    writeToDefaultStream(projectId, datasetName, tableName);
  }

  public static void writeToDefaultStream(String projectId, String datasetName, String tableName)
      throws DescriptorValidationException, InterruptedException, IOException {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    Table table = bigquery.getTable(datasetName, tableName);
    TableName parentTable = TableName.of(projectId, datasetName, tableName);
    Schema schema = table.getDefinition().getSchema();
    // Convert the client library Schema to the Storage Write API TableSchema format
    // by using the BqToBqStorageSchemaConverter helper.
    TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema);

    // Use the JSON stream writer to send records in JSON format.
    // For more information about JsonStreamWriter, see:
    // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
    try (JsonStreamWriter writer =
        JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema).build()) {
      // Append 10 JSON objects to the stream.
      for (int i = 0; i < 10; i++) {
        // Create a JSON object that is compatible with the table schema.
        JSONObject record = new JSONObject();
        record.put("test_string", String.format("record %03d", i));
        JSONArray jsonArr = new JSONArray();
        jsonArr.put(record);

        ApiFuture<AppendRowsResponse> future = writer.append(jsonArr);
        AppendRowsResponse response = future.get();
      }
      System.out.println("Appended records successfully.");
    } catch (ExecutionException e) {
      // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
      // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
      // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
      System.out.println("Failed to append records. \n" + e.toString());
    }
  }
}

This code is similar to the previous example, but instead of calling BigQueryWriteClient.createWriteStream to create a write stream, JsonStreamWriter.Builder creates a default write stream for you given a table name.

Write to a stream in pending mode

Pending mode is useful for bulk loading data in large batches. It's also useful for loading smaller batches, which is a common pattern in some data processing architectures.

Pending mode supports stream-level transactions. A stream can only be committed once, so multiple workers can process the same unit of data without the same data being written twice. The commit is an atomic operation. If it fails, you can safely retry it.

To use the JsonStreamWriter object in pending mode, perform the following steps:

  1. Create an instance of the BigQueryWriteClient object.

  2. Call BigQueryWriteClient.createWriteStream to create one or more WriteStream objects. For the mode, specify WriteStream.Type.PENDING.

  3. Call JsonStreamWriter.append and pass in a JSONArray of JSONObject records. Optionally, if you want duplicate message detection, specify the offset of the record in the stream.

  4. When you finish writing to a stream, call BigQueryWriteClient.finalizeWriteStream to finalize the stream. After you call this method, you can't write any more records to that stream.

  5. When all streams are finalized, call BigQueryWriteClient.batchCommitWriteStreams with the list of streams. This method commits the data for those streams.

  6. Call hasCommitTime on the returned BatchCommitWriteStreamsResponse object. If this method returns false, then the commit failed. Call the getStreamErrorsList method to get the list of errors for each stream that failed.

Java

Before trying this sample, follow the Java setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Java API reference documentation.

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1beta2.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1beta2.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1beta2.StorageError;
import com.google.cloud.bigquery.storage.v1beta2.TableName;
import com.google.cloud.bigquery.storage.v1beta2.WriteStream;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.json.JSONArray;
import org.json.JSONObject;

public class WritePendingStream {

  public static void runWritePendingStream()
      throws DescriptorValidationException, InterruptedException, IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";

    writePendingStream(projectId, datasetName, tableName);
  }

  public static void writePendingStream(String projectId, String datasetName, String tableName)
      throws DescriptorValidationException, InterruptedException, IOException {
    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      // Initialize a write stream for the specified table.
      // For more information on WriteStream.Type, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/WriteStream.Type.html
      WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build();
      TableName parentTable = TableName.of(projectId, datasetName, tableName);
      CreateWriteStreamRequest createWriteStreamRequest =
          CreateWriteStreamRequest.newBuilder()
              .setParent(parentTable.toString())
              .setWriteStream(stream)
              .build();
      WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

      // Use the JSON stream writer to send records in JSON format.
      // For more information about JsonStreamWriter, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
      try (JsonStreamWriter writer =
          JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
              .build()) {
        // Append 10 JSON objects to the stream.
        for (int i = 0; i < 10; i++) {
          // Create a JSON object that is compatible with the table schema.
          JSONObject record = new JSONObject();
          record.put("col1", String.format("batch-record %03d", i));
          JSONArray jsonArr = new JSONArray();
          jsonArr.put(record);

          ApiFuture<AppendRowsResponse> future = writer.append(jsonArr);
          AppendRowsResponse response = future.get();
        }
        FinalizeWriteStreamResponse finalizeResponse =
            client.finalizeWriteStream(writeStream.getName());
        System.out.println("Rows written: " + finalizeResponse.getRowCount());
      }

      // Commit the streams.
      BatchCommitWriteStreamsRequest commitRequest =
          BatchCommitWriteStreamsRequest.newBuilder()
              .setParent(parentTable.toString())
              .addWriteStreams(writeStream.getName())
              .build();
      BatchCommitWriteStreamsResponse commitResponse =
          client.batchCommitWriteStreams(commitRequest);
      // If the response does not have a commit time, it means the commit operation failed.
      if (!commitResponse.hasCommitTime()) {
        for (StorageError err : commitResponse.getStreamErrorsList()) {
          System.out.println(err.getErrorMessage());
        }
        throw new RuntimeException("Error committing the streams");
      }
      System.out.println("Appended and committed records successfully.");
    } catch (ExecutionException e) {
      // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
      // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
      // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
      System.out.println(e);
    }
  }
}

One architectural pattern for using pending mode is to have a coordinator and multiple workers. The coordinator calls createWriteStream and passes the stream name to a worker. Each worker calls append as many times as needed, and then calls finalizeWriteStream. When all workers are finished, the coordinator calls batchCommitWriteStreams. Because committing a stream is an atomic operation, if a worker fails, then another worker can pick up the same processing task and retry it.
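
The following is a minimal sketch of the coordinator side of this pattern. The Worker interface is hypothetical and stands in for code that appends records to its assigned stream and calls finalizeWriteStream before returning; in a real pipeline, each worker would typically run as a separate process or machine rather than in the coordinator's loop:

import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1beta2.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1beta2.TableName;
import com.google.cloud.bigquery.storage.v1beta2.WriteStream;
import java.util.ArrayList;
import java.util.List;

public class PendingStreamCoordinator {

  // Hypothetical worker abstraction: given a stream name, the worker appends
  // its records and calls finalizeWriteStream on that stream.
  public interface Worker {
    void process(String streamName) throws Exception;
  }

  public static void run(
      String projectId, String datasetName, String tableName, List<Worker> workers)
      throws Exception {
    TableName parentTable = TableName.of(projectId, datasetName, tableName);
    List<String> streamNames = new ArrayList<>();

    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      // The coordinator creates one pending stream per worker and hands the
      // stream name to that worker.
      for (Worker worker : workers) {
        WriteStream writeStream =
            client.createWriteStream(
                CreateWriteStreamRequest.newBuilder()
                    .setParent(parentTable.toString())
                    .setWriteStream(
                        WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build())
                    .build());
        streamNames.add(writeStream.getName());
        worker.process(writeStream.getName());
      }

      // When every worker has finalized its stream, commit all of the streams
      // in a single atomic operation.
      BatchCommitWriteStreamsResponse commitResponse =
          client.batchCommitWriteStreams(
              BatchCommitWriteStreamsRequest.newBuilder()
                  .setParent(parentTable.toString())
                  .addAllWriteStreams(streamNames)
                  .build());
      if (!commitResponse.hasCommitTime()) {
        throw new RuntimeException("Commit failed: " + commitResponse.getStreamErrorsList());
      }
    }
  }
}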

Error handling

If the append method fails with a StatusRuntimeException, check the status code in the exception. The status code might be any of the following:

  • ALREADY_EXISTS: The row was already written. This error can happen when you provide stream offsets. It indicates that a duplicate record was detected. You can safely ignore this error.
  • INTERNAL, CANCELLED, or ABORTED: The operation could not be completed. You can safely retry the operation.
  • INVALID_ARGUMENT: Invalid argument. This error is an application error.
  • NOT_FOUND: The stream or table was not found.
  • OUT_OF_RANGE: The offset is beyond the current write offset. This error can happen if you provide stream offsets and a previous write operation failed. In that case, you can retry from the last successful write. This error can also happen if the application sets the wrong offset value.
  • PERMISSION_DENIED: The application does not have permission to write to this table.

If you receive an error that's not listed above, then try to open a new connection by closing the writer object and creating a new instance.
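
Because append is asynchronous, the gRPC status usually surfaces when you call get on the returned ApiFuture, wrapped in an ExecutionException. The following is a minimal sketch of inspecting the status code; the decision about what to retry or ignore is only illustrative:

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.ExecutionException;

public class AppendErrorClassifier {

  // Returns true if the failed append is safe to retry, based on the gRPC
  // status code wrapped inside the ExecutionException.
  public static boolean isRetryable(ExecutionException e) {
    Throwable cause = e.getCause();
    if (!(cause instanceof StatusRuntimeException)) {
      return false;
    }
    Status.Code code = ((StatusRuntimeException) cause).getStatus().getCode();
    switch (code) {
      case ALREADY_EXISTS:
        // Duplicate record detected; the row was already written, so there is
        // nothing to retry.
        return false;
      case INTERNAL:
      case CANCELLED:
      case ABORTED:
        // Transient failure; the append can be retried safely.
        return true;
      default:
        // INVALID_ARGUMENT, NOT_FOUND, OUT_OF_RANGE, PERMISSION_DENIED, and
        // other codes require intervention by the application.
        return false;
    }
  }
}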

Working with schemas

When you create the writer object, you specify a schema. This schema must be compatible with the BigQuery table schema:

  • It must include all of the required fields in the table schema.
  • It cannot include any extra fields that aren't in the table schema.
  • Data types must be compatible. For more information, see Data type conversions on this page.

When you use the JsonStreamWriter object, you provide the schema as a TableSchema object.

When you use the StreamWriterV2 object, you provide the schema as a ProtoSchema object. The ProtoSchema object is a thin wrapper for a google.protobuf.DescriptorProto object that describes the schema. You can use the ProtoSchemaConverter helper class to convert a DescriptorProto object into a self-contained ProtoSchema object. Also, you can use the SchemaCompatibility class to check whether the ProtoSchema object is compatible with the BigQuery table schema.
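
For example, the following is a minimal sketch of producing a ProtoSchema from a generated message class. RowRecord is a hypothetical message compiled from your own .proto file, and the sketch assumes that ProtoSchemaConverter.convert accepts the message's descriptor:

import com.google.cloud.bigquery.storage.v1beta2.ProtoSchema;
import com.google.cloud.bigquery.storage.v1beta2.ProtoSchemaConverter;

public class BuildProtoSchema {

  // Converts the descriptor of the hypothetical RowRecord message into a
  // self-contained ProtoSchema that the StreamWriterV2 object can use.
  public static ProtoSchema rowRecordSchema() {
    return ProtoSchemaConverter.convert(RowRecord.getDescriptor());
  }
}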

It's possible for a table schema to change after you start a write session. (For information about table schema changes, see Modifying table schemas.) If BigQuery detects a change in the schema, then the AppendRows response contains the updated schema. To check whether the schema was updated, call the AppendRowsResponse.hasUpdatedSchema method.

With the JsonStreamWriter object, you can start sending records that use the updated schema after hasUpdatedSchema returns true. You don't need to close the connection or reconfigure the writer object. For the StreamWriterV2 object, close the current instance and create a new instance with the updated schema.
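
As a minimal sketch, the check on an append response looks like the following; how you react to the updated schema is up to the application:

import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.TableSchema;

public class SchemaUpdateCheck {

  // Returns the updated table schema if BigQuery reported one in the append
  // response, or null if the schema is unchanged.
  public static TableSchema updatedSchemaOrNull(AppendRowsResponse response) {
    if (response.hasUpdatedSchema()) {
      // With JsonStreamWriter, you can start sending records that use the new
      // fields; with StreamWriterV2, close the writer and create a new one.
      return response.getUpdatedSchema();
    }
    return null;
  }
}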

Using the gRPC API directly

Although we recommend using the client library, you can also use the Storage Write API by calling the gRPC API directly, as follows:

  1. Call CreateWriteStream to create a write stream. Specify the name of the parent table and the mode (either PENDING or COMMITTED).

  2. In a loop, call AppendRows to append one or more records. Format each record as a protocol buffer serialized to a byte sequence. Optionally, include the write offset. The first call to AppendRows must contain the schema of the stream, specified as a DescriptorProto object.

  3. In pending mode, call FinalizeWriteStream for each stream. After you call this method, you cannot write any more rows to the stream. This method is not required in committed mode.

  4. In pending mode, after you finalize the streams, call BatchCommitWriteStreams to commit the streams. After you call this method, the data becomes available for reading. This method is not required in committed mode.

Data type conversions

The following list shows the supported protocol buffer types for each BigQuery data type:

  • BOOL: bool, int32, int64, uint32, uint64
  • BYTES: bytes
  • DATE: int32 (preferred), int64. The value is the number of days since the Unix epoch (1970-01-01). The valid range is -719162 (0001-01-01) to 2932896 (9999-12-31).
  • DATETIME: string. The value must be a DATETIME literal.
  • FLOAT: double, float
  • GEOGRAPHY: string. The value is a geometry in either WKT or GeoJSON format.
  • INTEGER: int32, int64, uint32, enum
  • NUMERIC, BIGNUMERIC: int32, int64, uint32, uint64, double, float, string. The encoding format is documented in the ZetaSQL GitHub repo.
  • STRING: string, enum
  • TIME: string. The value must be a TIME literal.
  • TIMESTAMP: int64 (preferred), int32, uint32. The value is given in microseconds since the Unix epoch (1970-01-01).
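
For example, DATE and TIMESTAMP values are plain integer counts that you can compute with java.time; the dates used here are arbitrary:

import java.time.Instant;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

public class EpochEncodings {

  public static void main(String[] args) {
    // DATE: signed count of days since the Unix epoch (1970-01-01).
    int dateValue = (int) LocalDate.of(2021, 6, 15).toEpochDay();

    // TIMESTAMP: microseconds since the Unix epoch.
    long timestampValue =
        ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse("2021-06-15T12:00:00Z"));

    System.out.println(dateValue);      // 18793
    System.out.println(timestampValue); // 1623758400000000
  }
}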

Storage Write API quotas

For information about Storage Write API quotas and limits, see BigQuery Storage Write API requests.

Storage Write API pricing

For pricing, see Data ingestion pricing.

Supported regions

The preview phase of the Storage Write API supports the following locations.

Regional locations

  • Americas: South Carolina (us-east1)
  • Europe: Belgium (europe-west1)
  • Asia Pacific: Sydney (australia-southeast1), Tokyo (asia-northeast1)

Multi-regional locations

  • EU: Data centers within member states of the European Union
  • US: Data centers in the United States