Batch load and stream data using the BigQuery Storage Write API

The BigQuery Storage Write API is a unified data-ingestion API for BigQuery. It combines the functionality of streaming ingestion and batch loading into a single high-performance API. You can use the Storage Write API to stream records into BigQuery that become available for query as they are written, or to batch process an arbitrarily large number of records and commit them in a single atomic operation.

Advantages of using the Storage Write API

Exactly-once delivery semantics. The Storage Write API supports exactly-once semantics through the use of stream offsets. If the client provides stream offsets when appending records, the Storage Write API never writes two messages that have the same offset within a stream, unlike the insertAll method.

Stream-level transactions. You can write data to a stream and commit the data as a single transaction. If the commit operation fails, you can safely retry the operation. For example, in a streaming pipeline, if a worker fails, another worker can pick up the task.

Transactions across streams. Multiple workers can create their own streams to process data independently. When all the workers have finished, you can commit all of the streams as a transaction.

Efficient protocol. The Storage Write API is more efficient than the older insertAll method because it uses gRPC streaming rather than REST over HTTP. The Storage Write API also supports binary formats in the form of protocol buffers, which are a more efficient wire format than JSON. Write requests are asynchronous with guaranteed ordering.

Schema update detection. If the underlying table schema changes while the client is streaming, then the Storage Write API notifies the client. The client can decide whether to reconnect using the updated schema, or continue to write to the existing connection.

Lower cost. The Storage Write API has a significantly lower cost than the older tabledata.insertAll streaming API. In addition, you can ingest up to 2 TB per month for free.

You can use the Storage Write API by calling the gRPC API directly or by using one of the client libraries, which are available for Java, Python, and Go. In general, we recommend using a client library, because it provides a simpler programming interface and manages the underlying bidirectional streaming RPC for you.

Required permissions

To use the Storage Write API, you must have bigquery.tables.updateData permissions.

The following predefined Identity and Access Management (IAM) roles include bigquery.tables.updateData permissions:

  • bigquery.dataEditor
  • bigquery.dataOwner
  • bigquery.admin

For more information about IAM roles and permissions in BigQuery, see Predefined roles and permissions.

Overview of the Storage Write API

The core abstraction in the Storage Write API is a write stream. An application creates a stream and then writes rows of data to the stream. You can also create multiple streams for parallel write operations.

When you create a write stream, you specify a mode for the stream. The mode controls when the data written to the stream becomes visible in BigQuery for reading.

  • Committed mode. Records are available for reading immediately as you write them to the stream. Use this mode for streaming workloads that need minimal read latency.

  • Pending mode. Records are buffered in a pending state until you commit the stream. Once a stream is committed, all of the data written to the stream is available for reading. The commit is an atomic operation. Use this mode for batch workloads, as an alternative to BigQuery load jobs.

  • Buffered mode. Records are buffered until you flush the stream. Flushing advances a cursor to a specified offset and makes all the records up to that offset visible. You can keep writing new data to the stream after you flush it. Use buffered mode if you need fine-grained control over when records are available for reading.

The Storage Write API also provides a default stream that appends records in committed mode without exactly-once guarantees. The default stream can potentially support a higher throughput than creating a dedicated stream. Also, the default stream is not subject to the quota limit on creating write streams. Unlike the other modes, you don't need to explicitly create the default stream. Any number of clients can write simultaneously to the default stream. Consider using the default stream if you are migrating from the legacy tabledata.insertAll method and don't need the additional guarantees offered by creating explicit streams and writing to them.

Committed mode

In committed mode, records are available for query as soon as the client receives acknowledgement from the back end. Committed mode provides exactly-once delivery within a stream through the use of record offsets. The application can specify the offset number for each record. The write operation is only performed if the offset value matches the next append offset. If no offset is provided, records are appended to the current end of the stream. In that case, if an append returns an error, retrying it could result in the record appearing more than once in the stream.

Pending mode

Pending mode supports stream-level transactions. A stream can only be committed once, so multiple workers can work on the same unit of data without writing it twice. The commit is an atomic operation. If it fails, you can retry it. Pending mode is useful for bulk loading data in large batches.

One architectural pattern for using pending mode is to have a coordinator and multiple workers. The coordinator calls createWriteStream and passes the stream to a worker. Each worker calls append multiple times as needed, and then calls finalizeWriteStream. When all workers are finished, the coordinator calls batchCommitWriteStreams. Because committing a stream is an atomic operation, if a worker fails, then another worker can pick up the same processing task and retry it.
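
The following sketch illustrates this pattern in a single process, using the same Java classes as the samples later on this page. The class and method names are placeholders, the workers run sequentially for simplicity, and the target table is assumed to have a STRING column named col1; in a real pipeline, the coordinator would hand each stream name to a separate worker process.

import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import java.util.ArrayList;
import java.util.List;
import org.json.JSONArray;
import org.json.JSONObject;

public class PendingModeCoordinator {

  public static void coordinatePendingWrites(
      String projectId, String datasetName, String tableName, int workerCount) throws Exception {
    TableName parentTable = TableName.of(projectId, datasetName, tableName);
    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      List<String> streamNames = new ArrayList<>();

      // Coordinator: create one pending stream per worker and hand out the stream name.
      for (int workerId = 0; workerId < workerCount; workerId++) {
        WriteStream stream =
            client.createWriteStream(
                CreateWriteStreamRequest.newBuilder()
                    .setParent(parentTable.toString())
                    .setWriteStream(
                        WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build())
                    .build());
        streamNames.add(stream.getName());
        runWorker(client, stream, workerId);
      }

      // Coordinator: after every worker has finalized its stream, commit them all atomically.
      BatchCommitWriteStreamsResponse commitResponse =
          client.batchCommitWriteStreams(
              BatchCommitWriteStreamsRequest.newBuilder()
                  .setParent(parentTable.toString())
                  .addAllWriteStreams(streamNames)
                  .build());
      if (!commitResponse.hasCommitTime()) {
        throw new RuntimeException("Commit failed: " + commitResponse.getStreamErrorsList());
      }
    }
  }

  // Worker: append rows to its own stream, then finalize the stream. If a worker fails before
  // finalizing, another worker can redo the same unit of work on a new stream.
  private static void runWorker(BigQueryWriteClient client, WriteStream stream, int workerId)
      throws Exception {
    try (JsonStreamWriter writer =
        JsonStreamWriter.newBuilder(stream.getName(), stream.getTableSchema()).build()) {
      JSONArray rows = new JSONArray();
      // Assumes the target table has a STRING column named col1.
      rows.put(new JSONObject().put("col1", "row from worker " + workerId));
      writer.append(rows).get(); // Wait for the append to be acknowledged.
    }
    client.finalizeWriteStream(stream.getName());
  }
}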

Buffered mode

Buffered mode is an advanced mode that allows applications to control exactly when rows are committed and made available for querying. In buffered mode, you can stream data and flush the rows periodically. Rows that are not flushed can be abandoned without affecting the table.

Generally, most applications should prefer committed mode. Applications that are based on batch processing can use pending mode. Advanced applications that need finer-grained control over writes and subsequent flushes might benefit from buffered mode.

Write data using the Java client

The Java client library provides two writer objects:

  • StreamWriter: Accepts data in protocol buffer format.

  • JsonStreamWriter: Accepts data in JSON format and converts it to protocol buffers. For many applications, JSON data is easier to work with than protocol buffers. The JsonStreamWriter automatically converts the data on the client side to the Storage Write API's underlying binary format before sending it over the wire.

The programming model is similar for both writers. The main difference is how you format the payload.
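
The sections that follow show the JsonStreamWriter in each mode. For comparison, the following sketch outlines the same committed-mode flow with the lower-level StreamWriter, assuming a hypothetical generated protocol buffer class named RowRecord whose fields match the table schema. Treat it as a sketch of the pattern rather than a drop-in sample, and consult the BigQuery Java API reference for the exact StreamWriter builder options.

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;

public class WriteWithStreamWriter {

  public static void writeWithStreamWriter(String projectId, String datasetName, String tableName)
      throws Exception {
    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      // Create a committed-mode stream, as in the JsonStreamWriter samples below.
      TableName parentTable = TableName.of(projectId, datasetName, tableName);
      WriteStream writeStream =
          client.createWriteStream(
              CreateWriteStreamRequest.newBuilder()
                  .setParent(parentTable.toString())
                  .setWriteStream(
                      WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
                  .build());

      // RowRecord is a hypothetical message class generated from a .proto file whose fields
      // match the table schema. ProtoSchemaConverter turns its descriptor into the
      // self-contained ProtoSchema that the Storage Write API expects.
      StreamWriter writer =
          StreamWriter.newBuilder(writeStream.getName())
              .setWriterSchema(ProtoSchemaConverter.convert(RowRecord.getDescriptor()))
              .build();
      try {
        ProtoRows rows =
            ProtoRows.newBuilder()
                .addSerializedRows(
                    RowRecord.newBuilder().setCol1("proto record 001").build().toByteString())
                .build();
        // Append the serialized rows at offset 0 and wait for acknowledgement.
        ApiFuture<AppendRowsResponse> future = writer.append(rows, /*offset=*/ 0);
        AppendRowsResponse response = future.get();
      } finally {
        writer.close();
      }
    }
  }
}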

Write data using the JsonStreamWriter

This section shows examples using the JsonStreamWriter in each of the supported modes.

Committed

In committed mode, records are available for query as soon as the client receives acknowledgement from the back end.

To use the JsonStreamWriter object in committed mode, perform the following steps:

  1. Create an instance of the BigQueryWriteClient object.

  2. Call BigQueryWriteClient.createWriteStream to create a WriteStream object. Specify the mode as WriteStream.Type.COMMITTED.

  3. Create a JsonStreamWriter instance. Provide a schema that is compatible with the table schema. You can construct a schema manually as a TableSchema object or get the schema from the BigQuery table.

  4. In a loop, call JsonStreamWriter.append to write records to the stream. Optionally, specify the offset of the record in the stream. If you provide the offset, then the Storage Write API never writes two messages that have the same offset. The append method is asynchronous. Ordering is guaranteed if you provide offsets.

Before trying this sample, follow the Java setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Java API reference documentation.

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.json.JSONArray;
import org.json.JSONObject;

public class WriteCommittedStream {

  public static void runWriteCommittedStream()
      throws DescriptorValidationException, InterruptedException, IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";

    writeCommittedStream(projectId, datasetName, tableName);
  }

  public static void writeCommittedStream(String projectId, String datasetName, String tableName)
      throws DescriptorValidationException, InterruptedException, IOException {

    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      // Initialize a write stream for the specified table.
      // For more information on WriteStream.Type, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/WriteStream.Type.html
      WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build();
      TableName parentTable = TableName.of(projectId, datasetName, tableName);
      CreateWriteStreamRequest createWriteStreamRequest =
          CreateWriteStreamRequest.newBuilder()
              .setParent(parentTable.toString())
              .setWriteStream(stream)
              .build();
      WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

      // Use the JSON stream writer to send records in JSON format.
      // For more information about JsonStreamWriter, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
      try (JsonStreamWriter writer =
          JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
              .build()) {
        // Write two batches to the stream, each with 10 JSON records.
        for (int i = 0; i < 2; i++) {
          // Create a JSON object that is compatible with the table schema.
          JSONArray jsonArr = new JSONArray();
          for (int j = 0; j < 10; j++) {
            JSONObject record = new JSONObject();
            record.put("col1", String.format("record %03d-%03d", i, j));
            jsonArr.put(record);
          }

          // To detect duplicate records, pass the offset of the first record in the batch.
          // To disable deduplication, omit the offset or write to the default stream.
          ApiFuture<AppendRowsResponse> future = writer.append(jsonArr, /*offset=*/ i * 10);
          AppendRowsResponse response = future.get();
        }
      }
      System.out.println("Appended records successfully.");
    } catch (ExecutionException e) {
      // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
      // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
      // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
      System.out.println("Failed to append records. \n" + e.toString());
    }
  }
}

Pending

In pending mode, records are buffered in a pending state until you commit the entire stream.

To use the JsonStreamWriter object in pending mode, perform the following steps:

  1. Create an instance of the BigQueryWriteClient object.

  2. Call BigQueryWriteClient.createWriteStream to create one or more WriteStream objects. Specify the mode as WriteStream.Type.PENDING.

  3. Create a JsonStreamWriter instance. Provide a schema that is compatible with the table schema. You can construct a schema manually as a TableSchema object or get the schema from the BigQuery table.

  4. In a loop, call JsonStreamWriter.append to write records to the stream. Optionally, specify the offset of the record in the stream. If you provide the offset, then the Storage Write API never writes two messages that have the same offset. The append method is asynchronous. Ordering is guaranteed if you provide offsets.

  5. When you finish writing to a stream, call BigQueryWriteClient.finalizeWriteStream to finalize the stream. After you call this method, you can't write any more records to that stream.

  6. When all streams are finalized, call BigQueryWriteClient.batchCommitWriteStreams with the list of streams. This method commits the data for those streams.

  7. Call BatchCommitWriteStreamsResponse.hasCommitTime on the returned BatchCommitWriteStreamsResponse object. If this method returns false, then the commit failed. Call the getStreamErrorsList method to get the list of errors for each stream that failed.

Before trying this sample, follow the Java setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Java API reference documentation.

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.StorageError;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.json.JSONArray;
import org.json.JSONObject;

public class WritePendingStream {

  public static void runWritePendingStream()
      throws DescriptorValidationException, InterruptedException, IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";

    writePendingStream(projectId, datasetName, tableName);
  }

  public static void writePendingStream(String projectId, String datasetName, String tableName)
      throws DescriptorValidationException, InterruptedException, IOException {
    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      // Initialize a write stream for the specified table.
      // For more information on WriteStream.Type, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/WriteStream.Type.html
      WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build();
      TableName parentTable = TableName.of(projectId, datasetName, tableName);
      CreateWriteStreamRequest createWriteStreamRequest =
          CreateWriteStreamRequest.newBuilder()
              .setParent(parentTable.toString())
              .setWriteStream(stream)
              .build();
      WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

      // Use the JSON stream writer to send records in JSON format.
      // For more information about JsonStreamWriter, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
      try (JsonStreamWriter writer =
          JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
              .build()) {
        // Write two batches to the stream, each with 10 JSON records.
        for (int i = 0; i < 2; i++) {
          // Create a JSON object that is compatible with the table schema.
          JSONArray jsonArr = new JSONArray();
          for (int j = 0; j < 10; j++) {
            JSONObject record = new JSONObject();
            record.put("col1", String.format("batch-record %03d-%03d", i, j));
            jsonArr.put(record);
          }
          ApiFuture<AppendRowsResponse> future = writer.append(jsonArr);
          AppendRowsResponse response = future.get();
        }
        FinalizeWriteStreamResponse finalizeResponse =
            client.finalizeWriteStream(writeStream.getName());
        System.out.println("Rows written: " + finalizeResponse.getRowCount());
      }

      // Commit the streams.
      BatchCommitWriteStreamsRequest commitRequest =
          BatchCommitWriteStreamsRequest.newBuilder()
              .setParent(parentTable.toString())
              .addWriteStreams(writeStream.getName())
              .build();
      BatchCommitWriteStreamsResponse commitResponse =
          client.batchCommitWriteStreams(commitRequest);
      // If the response does not have a commit time, it means the commit operation failed.
      if (!commitResponse.hasCommitTime()) {
        for (StorageError err : commitResponse.getStreamErrorsList()) {
          System.out.println(err.getErrorMessage());
        }
        throw new RuntimeException("Error committing the streams");
      }
      System.out.println("Appended and committed records successfully.");
    } catch (ExecutionException e) {
      // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
      // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
      // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
      System.out.println(e);
    }
  }
}

Buffered

In buffered mode, records are buffered until you flush them from the stream.

To use the JsonStreamWriter object in buffered mode, perform the following steps:

  1. Create an instance of the BigQueryWriteClient object.

  2. Call BigQueryWriteClient.createWriteStream to create one or more WriteStream objects. Specify the mode as WriteStream.Type.BUFFERED.

  3. Create a JsonStreamWriter instance. Provide a schema that is compatible with the table schema. You can construct a schema manually as a TableSchema object or get the schema from the BigQuery table.

  4. In a loop, call JsonStreamWriter.append to write records to the stream. Optionally, specify the offset of the record in the stream. If you provide the offset, then the Storage Write API never writes two messages that have the same offset. The append method is asynchronous. Ordering is guaranteed if you provide offsets.

  5. To make data available for reading, call BigQueryWriteClient.flushRows and specify an ending offset. The method flushes all rows up to and including the specified offset.

Before trying this sample, follow the Java setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Java API reference documentation.

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Int64Value;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.json.JSONArray;
import org.json.JSONObject;

public class WriteBufferedStream {

  public static void runWriteBufferedStream()
      throws DescriptorValidationException, InterruptedException, IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";

    writeBufferedStream(projectId, datasetName, tableName);
  }

  public static void writeBufferedStream(String projectId, String datasetName, String tableName)
      throws DescriptorValidationException, InterruptedException, IOException {
    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      // Initialize a write stream for the specified table.
      // For more information on WriteStream.Type, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/WriteStream.Type.html
      WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.BUFFERED).build();
      TableName parentTable = TableName.of(projectId, datasetName, tableName);
      CreateWriteStreamRequest createWriteStreamRequest =
          CreateWriteStreamRequest.newBuilder()
              .setParent(parentTable.toString())
              .setWriteStream(stream)
              .build();
      WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

      // Use the JSON stream writer to send records in JSON format.
      // For more information about JsonStreamWriter, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
      try (JsonStreamWriter writer =
          JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
              .build()) {
        // Write two batches to the stream, each with 10 JSON records.
        for (int i = 0; i < 2; i++) {
          JSONArray jsonArr = new JSONArray();
          for (int j = 0; j < 10; j++) {
            // Create a JSON object that is compatible with the table schema.
            JSONObject record = new JSONObject();
            record.put("col1", String.format("buffered-record %03d", i));
            jsonArr.put(record);
          }
          ApiFuture<AppendRowsResponse> future = writer.append(jsonArr);
          AppendRowsResponse response = future.get();
        }

        // Flush the buffer.
        FlushRowsRequest flushRowsRequest =
            FlushRowsRequest.newBuilder()
                .setWriteStream(writeStream.getName())
                .setOffset(Int64Value.of(10 * 2 - 1)) // Advance the cursor to the latest record.
                .build();
        FlushRowsResponse flushRowsResponse = client.flushRows(flushRowsRequest);
        // You can continue to write to the stream after flushing the buffer.
      }
      System.out.println("Appended and committed records successfully.");
    } catch (ExecutionException e) {
      // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
      // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
      // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
      System.out.println(e);
    }
  }
}

Default stream

The default stream appends records in committed mode without exactly-once guarantees.

The following code shows how to use the default stream. The code is similar to writing in committed mode, but you don't explicitly create a write stream.

  1. Create a JsonStreamWriter instance. Provide a schema that is compatible with the table schema. You can construct a schema manually as a TableSchema object or get the schema from the BigQuery table.

  2. In a loop, call JsonStreamWriter.append to write records to the default stream. You don't provide a stream offset, and the response doesn't include any offset information.

Before trying this sample, follow the Java setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Java API reference documentation.

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.json.JSONArray;
import org.json.JSONObject;

public class WriteToDefaultStream {

  public static void runWriteToDefaultStream()
      throws DescriptorValidationException, InterruptedException, IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    writeToDefaultStream(projectId, datasetName, tableName);
  }

  public static void writeToDefaultStream(String projectId, String datasetName, String tableName)
      throws DescriptorValidationException, InterruptedException, IOException {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    Table table = bigquery.getTable(datasetName, tableName);
    TableName parentTable = TableName.of(projectId, datasetName, tableName);
    Schema schema = table.getDefinition().getSchema();
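    // BqToBqStorageSchemaConverter is a helper class (not shown here) that converts the
    // BigQuery client library Schema into the Storage Write API TableSchema format.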
    TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema);

    // Use the JSON stream writer to send records in JSON format.
    // For more information about JsonStreamWriter, see:
    // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
    try (JsonStreamWriter writer =
        JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema).build()) {
      // Write two batches to the stream, each with 10 JSON records.
      for (int i = 0; i < 2; i++) {
        // Create a JSON object that is compatible with the table schema.
        JSONArray jsonArr = new JSONArray();
        for (int j = 0; j < 10; j++) {
          JSONObject record = new JSONObject();
          record.put("test_string", String.format("record %03d-%03d", i, j));
          jsonArr.put(record);
        }
        ApiFuture<AppendRowsResponse> future = writer.append(jsonArr);
        AppendRowsResponse response = future.get();
      }
      System.out.println("Appended records successfully.");
    } catch (ExecutionException e) {
      // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
      // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
      // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
      System.out.println("Failed to append records. \n" + e.toString());
    }
  }
}

Error handling

If the append method throws a StatusRuntimeException exception, then you should check the status code in the exception. The status code might be any of the following:

  • ALREADY_EXISTS: The row was already written. This error can happen when you provide stream offsets. It indicates that a duplicate record was detected. You can safely ignore this error.
  • INTERNAL, CANCELLED, or ABORTED: The operation could not be completed. You can safely retry the operation.
  • INVALID_ARGUMENT: Invalid argument. This error is an application error.
  • NOT_FOUND: The stream or table was not found.
  • OUT_OF_RANGE: The offset is beyond the current write offset. This error can happen if you provide stream offsets and a previous write operation failed. In that case, you can retry from the last successful write. This error can also happen if the application sets the wrong offset value.
  • PERMISSION_DENIED: The application does not have permission to write to this table.

If you receive an error that's not listed above, then try to open a new connection by closing the writer object and creating a new instance.
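
A minimal sketch of how this check might look around an append call, assuming the writer and payload are set up as in the samples above; the class and method names are placeholders, and the retry policy should be tuned for your workload.

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.ExecutionException;
import org.json.JSONArray;

public class AppendErrorHandling {

  // Appends one batch and classifies the failure. Returns true if the rows are known to be
  // in the stream (success or duplicate), false if the caller should retry the same offset.
  public static boolean appendOnce(JsonStreamWriter writer, JSONArray rows, long offset)
      throws Exception {
    try {
      ApiFuture<AppendRowsResponse> future = writer.append(rows, offset);
      future.get();
      return true;
    } catch (ExecutionException e) {
      if (e.getCause() instanceof StatusRuntimeException) {
        Status.Code code = ((StatusRuntimeException) e.getCause()).getStatus().getCode();
        switch (code) {
          case ALREADY_EXISTS:
            // Rows at this offset were already written; safe to ignore.
            return true;
          case INTERNAL:
          case CANCELLED:
          case ABORTED:
            // Transient failure; the caller can retry the same offset.
            return false;
          default:
            // INVALID_ARGUMENT, NOT_FOUND, OUT_OF_RANGE, PERMISSION_DENIED, and so on
            // indicate an application or configuration problem.
            throw e;
        }
      }
      throw e;
    }
  }
}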

Working with schemas

When you create the writer object, you specify a schema. This schema must be compatible with the BigQuery table schema:

  • It must include all of the required fields in the table schema.
  • It cannot include any extra fields that aren't in the table schema.
  • Data types must be compatible. For more information, see Data type conversions on this page.

When you use the JsonStreamWriter object, you provide the schema as a TableSchema object.
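
For example, a TableSchema for a hypothetical table with a single nullable STRING column named col1 (the column used in the Java samples above) could be constructed manually along these lines:

import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableSchema;

public class BuildTableSchema {

  // Builds a TableSchema describing a single nullable STRING column named col1.
  public static TableSchema singleStringColumnSchema() {
    TableFieldSchema col1 =
        TableFieldSchema.newBuilder()
            .setName("col1")
            .setType(TableFieldSchema.Type.STRING)
            .setMode(TableFieldSchema.Mode.NULLABLE)
            .build();
    return TableSchema.newBuilder().addFields(col1).build();
  }
}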

When you use the StreamWriter object, you provide the schema as a ProtoSchema object. The ProtoSchema object is a thin wrapper for a google.protobuf.DescriptorProto object that describes the schema. You can use the ProtoSchemaConverter helper class to convert a DescriptorProto object into a self-contained ProtoSchema object. Also, you can use the SchemaCompatibility class to check whether the ProtoSchema object is compatible with the BigQuery table schema.

It's possible for a table schema to change after you start a write session. (For information about table schema changes, see Modifying table schemas.) If BigQuery detects a change in the schema, then the AppendRows response contains the updated schema. To check whether the schema was updated, call the AppendRowsResponse.hasUpdatedSchema method. For now, if you want to write messages with the updated fields, you need to create a new JsonStreamWriter or StreamWriter. Be sure to close the previous writer properly so that all in-flight requests can be sent.
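
The following sketch shows one way to surface that check when using a JsonStreamWriter set up as in the earlier samples; the helper name is illustrative, and the recovery strategy (when to close and rebuild the writer) depends on your application.

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import org.json.JSONArray;

public class DetectSchemaUpdate {

  // Appends a batch and returns the updated table schema if BigQuery reported one,
  // or null if the schema is unchanged.
  public static TableSchema appendAndCheckSchema(JsonStreamWriter writer, JSONArray rows)
      throws Exception {
    ApiFuture<AppendRowsResponse> future = writer.append(rows);
    AppendRowsResponse response = future.get();
    if (response.hasUpdatedSchema()) {
      // The table schema changed during the write session. To write the new fields, close
      // this writer (after its in-flight appends complete) and build a new JsonStreamWriter
      // using response.getUpdatedSchema().
      return response.getUpdatedSchema();
    }
    return null;
  }
}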

Write data using the Python client

The Python client is a lower-level client that wraps the gRPC API. To use this client, you must send the data as protocol buffers.

The following code example writes records in pending mode. This example shows a simple record with two fields. For a longer example that shows how to send different data types, including STRUCT types, see the append_rows_proto2 sample on GitHub.

When you define your protocol buffer messages, references to external messages are not allowed. Any message definitions must be nested within the root message representing row data. For an example, see sample_data.proto.

Python

Before trying this sample, follow the Python setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Python API reference documentation.

"""
This code sample demonstrates how to write records in pending mode
using the low-level generated client for Python.
"""

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2

# If you update the customer_record.proto protocol buffer definition, run:
#
#   protoc --python_out=. customer_record.proto
#
# from the samples/snippets directory to generate the customer_record_pb2.py module.
from . import customer_record_pb2


def create_row_data(row_num: int, name: str):
    row = customer_record_pb2.CustomerRecord()
    row.row_num = row_num
    row.customer_name = name
    return row.SerializeToString()


def append_rows_pending(project_id: str, dataset_id: str, table_id: str):
    """Create a write stream, write some sample data, and commit the stream."""
    write_client = bigquery_storage_v1.BigQueryWriteClient()
    parent = write_client.table_path(project_id, dataset_id, table_id)
    write_stream = types.WriteStream()

    # When creating the stream, choose the type. Use the PENDING type to wait
    # until the stream is committed before it is visible. See:
    # https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.WriteStream.Type
    write_stream.type_ = types.WriteStream.Type.PENDING
    write_stream = write_client.create_write_stream(
        parent=parent, write_stream=write_stream
    )
    stream_name = write_stream.name

    # Create a template with fields needed for the first request.
    request_template = types.AppendRowsRequest()

    # The initial request must contain the stream name.
    request_template.write_stream = stream_name

    # So that BigQuery knows how to parse the serialized_rows, generate a
    # protocol buffer representation of your message descriptor.
    proto_schema = types.ProtoSchema()
    proto_descriptor = descriptor_pb2.DescriptorProto()
    customer_record_pb2.CustomerRecord.DESCRIPTOR.CopyToProto(proto_descriptor)
    proto_schema.proto_descriptor = proto_descriptor
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.writer_schema = proto_schema
    request_template.proto_rows = proto_data

    # Some stream types support an unbounded number of requests. Construct an
    # AppendRowsStream to send an arbitrary number of requests to a stream.
    append_rows_stream = writer.AppendRowsStream(write_client, request_template)

    # Create a batch of row data by appending proto2 serialized bytes to the
    # serialized_rows repeated field.
    proto_rows = types.ProtoRows()
    proto_rows.serialized_rows.append(create_row_data(1, "Alice"))
    proto_rows.serialized_rows.append(create_row_data(2, "Bob"))

    # Set an offset to allow resuming this stream if the connection breaks.
    # Keep track of which requests the server has acknowledged and resume the
    # stream at the first non-acknowledged message. If the server has already
    # processed a message with that offset, it will return an ALREADY_EXISTS
    # error, which can be safely ignored.
    #
    # The first request must always have an offset of 0.
    request = types.AppendRowsRequest()
    request.offset = 0
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    request.proto_rows = proto_data

    response_future_1 = append_rows_stream.send(request)

    # Send another batch.
    proto_rows = types.ProtoRows()
    proto_rows.serialized_rows.append(create_row_data(3, "Charles"))

    # Since this is the second request, you only need to include the row data.
    # The name of the stream and protocol buffers DESCRIPTOR is only needed in
    # the first request.
    request = types.AppendRowsRequest()
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    request.proto_rows = proto_data

    # Offset must equal the number of rows that were previously sent.
    request.offset = 2

    response_future_2 = append_rows_stream.send(request)

    print(response_future_1.result())
    print(response_future_2.result())

    # Shutdown background threads and close the streaming connection.
    append_rows_stream.close()

    # A PENDING type stream must be "finalized" before being committed. No new
    # records can be written to the stream after this method has been called.
    write_client.finalize_write_stream(name=write_stream.name)

    # Commit the stream you created earlier.
    batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest()
    batch_commit_write_streams_request.parent = parent
    batch_commit_write_streams_request.write_streams = [write_stream.name]
    write_client.batch_commit_write_streams(batch_commit_write_streams_request)

    print(f"Writes to stream: '{write_stream.name}' have been committed.")

Use the gRPC API directly

Although we recommend using the client library, you can also use the Storage Write API by calling the gRPC API directly, as follows:

  1. Call CreateWriteStream to create a write stream. Specify the name of the parent table and the mode (either PENDING or COMMITTED).

  2. In a loop, call AppendRows to append one or more records. Format each record as a protocol buffer serialized to a byte sequence. Optionally, include the write offset. The first call to AppendRows must contain the schema of the stream, specified as a DescriptorProto object.

  3. In pending mode, call FinalizeWriteStream for each stream. After you call this method, you cannot write any more rows to the stream. This method is not required in committed mode.

  4. In pending mode, after you finalize the streams, call BatchCommitWriteStreams to commit the streams. After you call this method, the data becomes available for reading. This method is not required in committed mode.

Data type conversions

The following list shows the supported protocol buffer types for each BigQuery data type:

  • BOOL: bool, int32, int64, uint32, uint64
  • BYTES: bytes
  • DATE: int32 (preferred), int64. The value is the number of days since the Unix epoch (1970-01-01). The valid range is -719162 (0001-01-01) to 2932896 (9999-12-31).
  • DATETIME: string. The value must be a DATETIME literal.
  • FLOAT: double, float
  • GEOGRAPHY: string. The value is a geometry in either WKT or GeoJSON format.
  • INTEGER: int32, int64, uint32, enum
  • NUMERIC, BIGNUMERIC: int32, int64, uint32, uint64, double, float, string. The encoding format is documented in the ZetaSQL GitHub repo.
  • STRING: string, enum
  • TIME: string. The value must be a TIME literal.
  • TIMESTAMP: int64 (preferred), int32, uint32. The value is given in microseconds since the Unix epoch (1970-01-01).
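
For example, the DATE and TIMESTAMP encodings can be computed with java.time before you set them on the corresponding int32 and int64 fields of your protocol buffer message. This is a small illustrative sketch; the method names are placeholders.

import java.time.Instant;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

public class DateTimestampEncoding {

  // DATE columns take the number of days since the Unix epoch (1970-01-01).
  public static int toDateValue(LocalDate date) {
    return (int) date.toEpochDay();
  }

  // TIMESTAMP columns take microseconds since the Unix epoch.
  public static long toTimestampValue(Instant instant) {
    return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
  }
}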

Storage Write API quotas

For information about Storage Write API quotas and limits, see BigQuery Storage Write API requests.

Storage Write API pricing

For pricing, see Data ingestion pricing.