Stream data using the Storage Write API
This document describes how to use the BigQuery Storage Write API to stream data into BigQuery.
In streaming scenarios, data arrives continuously and should be available for reads with minimal latency. When using the BigQuery Storage Write API for streaming workloads, consider what guarantees you need:
- If your application only needs at-least-once semantics, then use the default stream.
- If you need exactly-once semantics, then create one or more streams in committed type and use stream offsets to guarantee exactly-once writes.
In committed type, data written to the stream is available for query as soon as the server acknowledges the write request. The default stream also uses committed type, but does not provide exactly-once guarantees.
Use the default stream for at-least-once semantics
If your application can accept the possibility of duplicate records appearing in the destination table, then we recommend using the default stream for streaming scenarios.
The following code shows how to write data to the default stream:
Java
To learn how to install and use the client library for BigQuery, see BigQuery client libraries. For more information, see the BigQuery Java API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
Node.js
To learn how to install and use the client library for BigQuery, see BigQuery client libraries.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
Python
This example shows how to insert a record with two fields using the default stream:
```python
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json

import sample_data_pb2

# The list of columns from the table's schema to look for in the given data
# to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = [
    "name",
    "age",
]


# Function to create a batch of row data to be serialized.
def create_row_data(data):
    row = sample_data_pb2.SampleData()
    for field in TABLE_COLUMNS_TO_CHECK:
        # Optional fields will be passed as null if not provided.
        if field in data:
            setattr(row, field, data[field])
    return row.SerializeToString()


class BigQueryStorageWriteAppend(object):
    # The stream name is:
    # projects/{project}/datasets/{dataset}/tables/{table}/_default
    def append_rows_proto2(
        project_id: str, dataset_id: str, table_id: str, data: dict
    ):
        write_client = bigquery_storage_v1.BigQueryWriteClient()
        parent = write_client.table_path(project_id, dataset_id, table_id)
        stream_name = f"{parent}/_default"
        write_stream = types.WriteStream()

        # Create a template with fields needed for the first request.
        request_template = types.AppendRowsRequest()

        # The request must contain the stream name.
        request_template.write_stream = stream_name

        # Generate the protocol buffer representation of the message descriptor.
        proto_schema = types.ProtoSchema()
        proto_descriptor = descriptor_pb2.DescriptorProto()
        sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
        proto_schema.proto_descriptor = proto_descriptor
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.writer_schema = proto_schema
        request_template.proto_rows = proto_data

        # Construct an AppendRowsStream to send an arbitrary number of
        # requests to a stream.
        append_rows_stream = writer.AppendRowsStream(write_client, request_template)

        # Append proto2 serialized bytes to the serialized_rows repeated field
        # using create_row_data.
        proto_rows = types.ProtoRows()
        for row in data:
            proto_rows.serialized_rows.append(create_row_data(row))

        # Append data to the given stream.
        request = types.AppendRowsRequest()
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.rows = proto_rows
        request.proto_rows = proto_data

        append_rows_stream.send(request)

        print(f"Rows to table: '{parent}' have been written.")


if __name__ == "__main__":
    # Uncomment the following block for additional logging.
    # logging.basicConfig(
    #     level=logging.DEBUG,
    #     format="%(asctime)s [%(levelname)s] %(message)s",
    #     handlers=[logging.StreamHandler()],
    # )

    with open("entries.json", "r") as json_file:
        data = json.load(json_file)

    # Change this to your specific BigQuery project, dataset, and table details.
    BigQueryStorageWriteAppend.append_rows_proto2(
        "PROJECT_ID", "DATASET_ID", "TABLE_ID", data=data
    )
```
This code example depends on the compiled protocol module `sample_data_pb2.py`. To create the compiled module, run the `protoc --python_out=. sample_data.proto` command, where `protoc` is the protocol buffer compiler. The `sample_data.proto` file defines the format of the messages used in the Python example. To install the `protoc` compiler, follow the instructions in Protocol Buffers - Google's data interchange format.
Here are the contents of the `sample_data.proto` file:
```proto
message SampleData {
  required string name = 1;
  required int64 age = 2;
}
```
This script consumes the `entries.json` file, which contains sample row data to be inserted into the BigQuery table. Because the script reads the file with a single `json.load` call, the entries are stored as a JSON array:

```json
[
  {"name": "Jim", "age": 35},
  {"name": "Jane", "age": 27}
]
```
Use multiplexing
You enable multiplexing at the stream writer level, for the default stream only. To enable multiplexing in Java, call the `setEnableConnectionPool` method when you construct a `StreamWriter` or `JsonStreamWriter` object:

```java
// One possible way of constructing a StreamWriter
StreamWriter.newBuilder(streamName)
    .setWriterSchema(protoSchema)
    .setEnableConnectionPool(true)
    .build();

// One possible way of constructing a JsonStreamWriter
JsonStreamWriter.newBuilder(tableName, bigqueryClient)
    .setEnableConnectionPool(true)
    .build();
```
To enable multiplexing in Go, see Connection Sharing (Multiplexing).
Use committed type for exactly-once semantics
If you need exactly-once write semantics, create a write stream in committed type. In committed type, records are available for query as soon as the client receives acknowledgement from the back end.
Committed type provides exactly-once delivery within a stream through the use of record offsets. By using record offsets, the application specifies the next append offset in each call to `AppendRows`. The write operation is only performed if the offset value matches the next append offset. For more information, see Manage stream offsets to achieve exactly-once semantics.
If you don't provide an offset, then records are appended to the current end of the stream. In that case, if an append request returns an error, retrying it could result in the record appearing more than once in the stream.
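To make the retry argument concrete, here is a toy model, in plain Python with no client library, of how offset-checked appends behave. The class and its behavior are illustrative only and are not part of the API:

```python
# Toy model of offset-checked appends: the server commits an append only when
# the request offset equals the current end of the stream, so retrying a
# request whose acknowledgement was lost can never duplicate rows.
class ToyCommittedStream:
    def __init__(self):
        self.rows = []

    def append(self, rows, offset):
        if offset > len(self.rows):
            # Mirrors an OUT_OF_RANGE error: there is a gap before the offset.
            raise ValueError("OUT_OF_RANGE: offset is beyond the end of the stream")
        if offset < len(self.rows):
            # Rows at this offset were already committed (for example, the
            # acknowledgement was lost and the client retried): report the
            # current end of the stream and write nothing.
            return len(self.rows)
        self.rows.extend(rows)
        return len(self.rows)


stream = ToyCommittedStream()
stream.append(["row-1", "row-2"], offset=0)  # commits two rows
stream.append(["row-1", "row-2"], offset=0)  # retry of the same batch: a no-op
print(stream.rows)  # ['row-1', 'row-2'] -- no duplicates
```

Without the offset check (the default stream's behavior), the retried request in the last step would append the two rows a second time, which is why the default stream offers only at-least-once semantics.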
To use committed type, perform the following steps:
Java
- Call `CreateWriteStream` to create one or more streams in committed type.
- For each stream, call `AppendRows` in a loop to write batches of records.
- Call `FinalizeWriteStream` for each stream to release the stream. After you call this method, you cannot write any more rows to the stream. This step is optional in committed type, but helps to prevent exceeding the limit on active streams. For more information, see Limit the rate of stream creation.
Node.js
- Call `createWriteStreamFullResponse` to create one or more streams in committed type.
- For each stream, call `appendRows` in a loop to write batches of records.
- Call `finalize` for each stream to release the stream. After you call this method, you cannot write any more rows to the stream. This step is optional in committed type, but helps to prevent exceeding the limit on active streams. For more information, see Limit the rate of stream creation.
You cannot delete a stream explicitly. Streams follow the system-defined time to live (TTL):
- A committed stream has a TTL of three days if there is no traffic on the stream.
- A buffered stream by default has a TTL of seven days if there is no traffic on the stream.
The following code shows how to use committed type:
Java
To learn how to install and use the client library for BigQuery, see BigQuery client libraries. For more information, see the BigQuery Java API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
Node.js
To learn how to install and use the client library for BigQuery, see
BigQuery client libraries.
To authenticate to BigQuery, set up Application Default Credentials.
For more information, see
Set up authentication for client libraries.