BigQuery Storage API - Libraries and Samples

Overview

During the initial Beta period for the Storage API, we're focusing on support for the Java, Python, and Go libraries. We'll be expanding coverage to include additional libraries over time as the API matures toward general availability.

Library Information

Java

The google-cloud-bigquerystorage Java library is available on GitHub, and includes installation instructions as well as source code.

Java documentation for the library is published here: bigquerystorage v1beta1

Python

See the bigquery_storage Python library for more information about installation, source code, and additional details.

Python reference documentation for the library can be found here: bigquery_storage
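
As a minimal sketch (assuming Application Default Credentials are configured), installation and client construction look like the following; the package name and the fastavro extra match those used in the quickstart sample later on this page.

# Install the client library; the optional fastavro extra is used by the
# reader's rows() helper to decode Avro row blocks:
#   pip install google-cloud-bigquery-storage[fastavro]
from google.cloud import bigquery_storage_v1beta1

# Construct a client. Credentials are resolved via Application Default
# Credentials unless supplied explicitly.
client = bigquery_storage_v1beta1.BigQueryStorageClient()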

Go

See the cloud.google.com/go/bigquery/storage/apiv1beta1 Go library for more information. Additionally, the google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta1 library provides supplementary access to the protocol buffer types used by the API.

Apache Beam / Cloud Dataflow

Apache Beam and Cloud Dataflow users can now leverage the Storage API via the existing BigQueryIO connector. See the Beam documentation for BigQueryIO for more information about the built-in I/O transform.
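
As an illustrative sketch only: the snippet below assumes a Beam Python SDK release in which the ReadFromBigQuery transform accepts the DIRECT_READ method (the Java BigQueryIO connector exposes the equivalent option as TypedRead.Method.DIRECT_READ). Consult the BigQueryIO documentation for the options available in your SDK version.

# Sketch: read a BigQuery table through the Storage API in a Beam pipeline.
# Assumes a Beam Python SDK release whose ReadFromBigQuery transform supports
# method=DIRECT_READ.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run():
    with beam.Pipeline(options=PipelineOptions()) as p:
        _ = (
            p
            | "ReadTable" >> beam.io.ReadFromBigQuery(
                table="bigquery-public-data:usa_names.usa_1910_current",
                method=beam.io.ReadFromBigQuery.Method.DIRECT_READ)
            # Each element is a Python dictionary keyed by column name.
            | "ExtractNames" >> beam.Map(lambda row: row["name"]))


if __name__ == "__main__":
    run()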

Sample Code

Below is a quickstart sample that reads from a managed BigQuery table. Key aspects of the sample include projecting a subset of columns and applying a simple predicate filter.

Go

Before trying this sample, follow the Go setup instructions in the BigQuery Quickstart Using Client Libraries. For more information, see the BigQuery Go API reference documentation.

// The bigquery_storage_quickstart application demonstrates usage of the
// BigQuery Storage read API.  It demonstrates API features such as column
// projection (limiting the output to a subset of a table's columns),
// column filtering (using simple predicates to filter records on the server
// side), establishing the snapshot time (reading data from the table at a
// specific point in time), and decoding Avro row blocks using the third party
// "github.com/linkedin/goavro" library.
package main

import (
	"context"
	"flag"
	"fmt"
	"io"
	"log"
	"sort"
	"sync"
	"time"

	bqStorage "cloud.google.com/go/bigquery/storage/apiv1beta1"
	"github.com/golang/protobuf/ptypes"
	gax "github.com/googleapis/gax-go"
	"github.com/linkedin/goavro"
	bqStoragepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta1"
	"google.golang.org/grpc"
)

// rpcOpts is used to configure the underlying gRPC client to accept large
// messages.  The BigQuery Storage API may send message blocks up to 10MB
// in size.
var rpcOpts = gax.WithGRPCOptions(
	grpc.MaxCallRecvMsgSize(1024 * 1024 * 11),
)

// Command-line flags.
var (
	projectID = flag.String("project_id", "",
		"Cloud Project ID, used for session creation.")
	snapshotMillis = flag.Int64("snapshot_millis", 0,
		"Snapshot time to use for reads, represented in epoch milliseconds format.  Default behavior reads current data.")
)

func main() {
	flag.Parse()
	ctx := context.Background()
	bqStorageClient, err := bqStorage.NewBigQueryStorageClient(ctx)
	if err != nil {
		log.Fatalf("NewBigQueryStorageClient: %v", err)
	}
	defer bqStorageClient.Close()

	// Verify we've been provided a parent project which will contain the read session.  The
	// session may exist in a different project than the table being read.
	if *projectID == "" {
		log.Fatalf("No parent project ID specified, please supply using the --project_id flag.")
	}

	// This example uses baby name data from the public datasets.
	readTable := &bqStoragepb.TableReference{
		ProjectId: "bigquery-public-data",
		DatasetId: "usa_names",
		TableId:   "usa_1910_current",
	}

	// We limit the output columns to a subset of those allowed in the table,
	// and set a simple filter to only report names from the state of
	// Washington (WA).
	tableReadOptions := &bqStoragepb.TableReadOptions{
		SelectedFields: []string{"name", "number", "state"},
		RowRestriction: `state = "WA"`,
	}

	readSessionRequest := &bqStoragepb.CreateReadSessionRequest{
		Parent:         fmt.Sprintf("projects/%s", *projectID),
		TableReference: readTable,
		ReadOptions:    tableReadOptions,
	}

	// Set a snapshot time if it's been specified.
	if *snapshotMillis > 0 {
		ts, err := ptypes.TimestampProto(time.Unix(0, *snapshotMillis*int64(time.Millisecond)))
		if err != nil {
			log.Fatalf("Invalid snapshot millis (%d): %v", *snapshotMillis, err)
		}
		readSessionRequest.TableModifiers = &bqStoragepb.TableModifiers{
			SnapshotTime: ts,
		}
	}

	// Create the session from the request.
	session, err := bqStorageClient.CreateReadSession(ctx, readSessionRequest, rpcOpts)
	if err != nil {
		log.Fatalf("CreateReadSession: %v", err)
	}

	if len(session.GetStreams()) == 0 {
		log.Fatalf("no streams in session.  if this was a small query result, consider writing to output to a named table.")
	}

	// We'll use only a single stream for reading data from the table.  Because
	// of dynamic sharding, this will yield all the rows in the table. However,
	// if you wanted to fan out multiple readers you could do so by having a
	// reader process each individual stream.
	readStream := session.GetStreams()[0]

	ch := make(chan *bqStoragepb.AvroRows)

	// Use a waitgroup to coordinate the reading and decoding goroutines.
	var wg sync.WaitGroup

	// Start the reading in one goroutine.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := processStream(ctx, bqStorageClient, readStream, ch); err != nil {
			log.Fatalf("processStream failure: %v", err)
		}
		close(ch)
	}()

	// Start Avro processing and decoding in another goroutine.
	wg.Add(1)
	go func() {
		defer wg.Done()
		err := processAvro(ctx, session.GetAvroSchema().GetSchema(), ch)
		if err != nil {
			log.Fatalf("Error processing avro: %v", err)
		}
	}()

	// Wait until both the reading and decoding goroutines complete.
	wg.Wait()

}

// printDatum prints the decoded row datum.
func printDatum(d interface{}) {
	m, ok := d.(map[string]interface{})
	if !ok {
		log.Printf("failed type assertion: %v", d)
		return
	}
	// Go's map implementation returns keys in a random ordering, so we sort
	// the keys before accessing.
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, key := range keys {
		fmt.Printf("%s: %-20v ", key, valueFromTypeMap(m[key]))
	}
	fmt.Println()
}

// valueFromTypeMap returns the first value/key in the type map.  This function
// is only suitable for simple schemas, as complex typing such as arrays and
// records necessitate a more robust implementation.  See the goavro library
// and the Avro specification for more information.
func valueFromTypeMap(field interface{}) interface{} {
	m, ok := field.(map[string]interface{})
	if !ok {
		return nil
	}
	for _, v := range m {
		// Return the first key encountered.
		return v
	}
	return nil
}

// processStream reads rows from a single storage Stream, and sends the Avro
// data blocks to a channel. This function will retry on transient stream
// failures and bookmark progress to avoid re-reading data that's already been
// successfully transmitted.
func processStream(ctx context.Context, client *bqStorage.BigQueryStorageClient, st *bqStoragepb.Stream, ch chan<- *bqStoragepb.AvroRows) error {
	var offset int64
	streamRetry := 3

	for {
		// Send the initiating request to start streaming row blocks.
		rowStream, err := client.ReadRows(ctx, &bqStoragepb.ReadRowsRequest{
			ReadPosition: &bqStoragepb.StreamPosition{
				Stream: st,
				Offset: offset,
			}}, rpcOpts)
		if err != nil {
			return fmt.Errorf("Couldn't invoke ReadRows: %v", err)
		}

		// Process the streamed responses.
		for {
			r, err := rowStream.Recv()
			if err == io.EOF {
				return nil
			}
			if err != nil {
				streamRetry--
				if streamRetry <= 0 {
					return fmt.Errorf("processStream retries exhausted: %v", err)
				}
				// Break out of the receive loop so the outer loop re-issues
				// ReadRows from the bookmarked offset.
				break
			}

			rc := r.GetAvroRows().GetRowCount()
			if rc > 0 {
				// Bookmark our progress in case of retries and send the rowblock on the channel.
				offset = offset + rc
				ch <- r.GetAvroRows()
			}
		}
	}
}

// processAvro receives row blocks from a channel, and uses the provided Avro
// schema to decode the blocks into individual row messages for printing.  Will
// continue to run until the channel is closed or the provided context is
// cancelled.
func processAvro(ctx context.Context, schema string, ch <-chan *bqStoragepb.AvroRows) error {
	// Establish a decoder that can process blocks of messages using the
	// reference schema. All blocks share the same schema, so the decoder
	// can be long-lived.
	codec, err := goavro.NewCodec(schema)
	if err != nil {
		return fmt.Errorf("couldn't create codec: %v", err)
	}

	for {
		select {
		case <-ctx.Done():
			// Context was cancelled.  Stop.
			return nil
		case rows, ok := <-ch:
			if !ok {
				// Channel closed, no further avro messages.  Stop.
				return nil
			}
			undecoded := rows.GetSerializedBinaryRows()
			for len(undecoded) > 0 {
				datum, remainingBytes, err := codec.NativeFromBinary(undecoded)

				if err != nil {
					if err == io.EOF {
						break
					}
					return fmt.Errorf("decoding error with %d bytes remaining: %v", len(undecoded), err)
				}
				printDatum(datum)
				undecoded = remainingBytes
			}
		}
	}
}

Java

Before trying this sample, follow the Java setup instructions in the BigQuery Quickstart Using Client Libraries. For more information, see the BigQuery Java API reference documentation.

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1beta1.AvroProto.AvroRows;
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions;
import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1beta1.Storage.DataFormat;
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
import com.google.cloud.bigquery.storage.v1beta1.Storage.StreamPosition;
import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto.TableModifiers;
import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto.TableReference;
import com.google.common.base.Preconditions;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;


public class StorageSample {

  /*
   * SimpleRowReader handles deserialization of the Avro-encoded row blocks transmitted
   * from the storage API using a generic datum decoder.
   */
  private static class SimpleRowReader {

    private final DatumReader<GenericRecord> datumReader;

    // Decoder object will be reused to avoid re-allocation and too much garbage collection.
    private BinaryDecoder decoder = null;

    // GenericRecord object will be reused.
    private GenericRecord row = null;

    public SimpleRowReader(Schema schema) {
      Preconditions.checkNotNull(schema);
      datumReader = new GenericDatumReader<>(schema);
    }

    /**
     * Sample method for processing AVRO rows which only validates decoding.
     *
     * @param avroRows object returned from the ReadRowsResponse.
     */
    public void processRows(AvroRows avroRows) throws IOException {
      decoder = DecoderFactory.get()
          .binaryDecoder(avroRows.getSerializedBinaryRows().toByteArray(), decoder);

      while (!decoder.isEnd()) {
        // Reusing object row
        row = datumReader.read(row, decoder);
        System.out.println(row.toString());
      }
    }
  }

  public static void main(String... args) throws Exception {
    // Sets your Google Cloud Platform project ID.
    // String projectId = "YOUR_PROJECT_ID";
    String projectId = args[0];
    Long snapshotMillis = null;
    if (args.length > 1) {
      snapshotMillis = Long.parseLong(args[1]);
    }

    try (BigQueryStorageClient client = BigQueryStorageClient.create()) {
      String parent = String.format("projects/%s", projectId);

      // This example uses baby name data from the public datasets.
      TableReference tableReference = TableReference.newBuilder()
          .setProjectId("bigquery-public-data")
          .setDatasetId("usa_names")
          .setTableId("usa_1910_current")
          .build();

      // We specify the columns to be projected by adding them to the selected fields,
      // and set a simple filter to restrict which rows are transmitted.
      TableReadOptions options = TableReadOptions.newBuilder()
          .addSelectedFields("name")
          .addSelectedFields("number")
          .addSelectedFields("state")
          .setRowRestriction("state = \"WA\"")
          .build();

      // Begin building the session request.
      CreateReadSessionRequest.Builder builder = CreateReadSessionRequest.newBuilder()
          .setParent(parent)
          .setTableReference(tableReference)
          .setReadOptions(options)
          .setRequestedStreams(1)
          .setFormat(DataFormat.AVRO);

      // Optionally specify the snapshot time.  When unspecified, snapshot time is "now".
      if (snapshotMillis != null) {
        Timestamp t = Timestamp.newBuilder()
            .setSeconds(snapshotMillis / 1000)
            .setNanos((int) ((snapshotMillis % 1000) * 1000000))
            .build();
        TableModifiers modifiers = TableModifiers.newBuilder()
            .setSnapshotTime(t)
            .build();
        builder.setTableModifiers(modifiers);
      }

      // Request the session creation.
      ReadSession session = client.createReadSession(builder.build());

      SimpleRowReader reader = new SimpleRowReader(
          new Schema.Parser().parse(session.getAvroSchema().getSchema()));

      // Assert that there are streams available in the session.  An empty table may not have
      // data available.  If no sessions are available for an anonymous (cached) table, consider
      // writing results of a query to a named table rather than consuming cached results directly.
      Preconditions.checkState(session.getStreamsCount() > 0);

      // Use the first stream to perform reading.
      StreamPosition readPosition = StreamPosition.newBuilder()
          .setStream(session.getStreams(0))
          .build();

      ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder()
          .setReadPosition(readPosition)
          .build();

      // Process each block of rows as they arrive and decode using our simple row reader.
      ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
      for (ReadRowsResponse response : stream) {
        Preconditions.checkState(response.hasAvroRows());
        reader.processRows(response.getAvroRows());
      }
    }
  }
}

Python

Before trying this sample, follow the Python setup instructions in the BigQuery Quickstart Using Client Libraries. For more information, see the BigQuery Python API reference documentation.

from google.cloud import bigquery_storage_v1beta1

# TODO(developer): Set the project_id variable.
# project_id = 'your-project-id'
#
# The read session is created in this project. This project can be
# different from that which contains the table.

client = bigquery_storage_v1beta1.BigQueryStorageClient()

# This example reads baby name data from the public datasets.
table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "usa_names"
table_ref.table_id = "usa_1910_current"

# We limit the output columns to a subset of those allowed in the table,
# and set a simple filter to only report names from the state of
# Washington (WA).
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("name")
read_options.selected_fields.append("number")
read_options.selected_fields.append("state")
read_options.row_restriction = 'state = "WA"'

# Set a snapshot time if it's been specified.
# TODO(developer): Set snapshot_millis to a point in time, in epoch
# milliseconds, to read a table snapshot; leave it at 0 to read current data.
snapshot_millis = 0
modifiers = None
if snapshot_millis > 0:
    modifiers = bigquery_storage_v1beta1.types.TableModifiers()
    modifiers.snapshot_time.FromMilliseconds(snapshot_millis)

parent = "projects/{}".format(project_id)
session = client.create_read_session(
    table_ref, parent, table_modifiers=modifiers, read_options=read_options
)  # API request.

# We'll use only a single stream for reading data from the table. Because
# of dynamic sharding, this will yield all the rows in the table. However,
# if you wanted to fan out multiple readers you could do so by having a
# reader process each individual stream.
reader = client.read_rows(
    bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0])
)

# The read stream contains blocks of Avro-encoded bytes. The rows() method
# uses the fastavro library to parse these blocks as an iterable of Python
# dictionaries. Install fastavro with the following command:
#
# pip install google-cloud-bigquery-storage[fastavro]
rows = reader.rows(session)

# Do any local processing by iterating over the rows. The
# google-cloud-bigquery-storage client reconnects to the API after any
# transient network errors or timeouts.
names = set()
states = set()

for row in rows:
    names.add(row["name"])
    states.add(row["state"])

print("Got {} unique names in states: {}".format(len(names), states))

Additional Resources

For users of pandas and the pandas-gbq integration with BigQuery, see the tutorial Downloading BigQuery data to pandas using the BigQuery Storage API for more information about leveraging the Storage API.
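
As a brief sketch of that integration (assuming a google-cloud-bigquery release that accepts the bqstorage_client argument to to_dataframe()), query results can be downloaded to a DataFrame through the Storage API like this:

# Sketch: download query results to a pandas DataFrame via the Storage API.
# Assumes: pip install google-cloud-bigquery google-cloud-bigquery-storage[pandas,fastavro]
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

bq_client = bigquery.Client()
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

sql = """
    SELECT name, state, number
    FROM `bigquery-public-data.usa_names.usa_1910_current`
    WHERE state = 'WA'
"""

# When a BigQueryStorageClient is supplied, to_dataframe() fetches the result
# rows through the Storage API rather than paging through the REST API.
df = bq_client.query(sql).to_dataframe(bqstorage_client=bqstorage_client)
print(df.head())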

Support Information

The BigQuery Storage API is currently a beta offering. The API is ready for use, but certain areas are not yet fully mature or lack intended functionality.

If you encounter issues, please see the BigQuery support page for assistance.
