BigQuery Storage API Client Libraries

This page shows how to get started with the Cloud Client Libraries for the BigQuery Storage API. Read more about the client libraries for Cloud APIs, including the older Google APIs Client Libraries, in Client Libraries Explained.

Installing the client library

Go

For more information, see Setting Up a Go Development Environment.

go get -u cloud.google.com/go/bigquery

Java

For more information, see Setting Up a Java Development Environment.

If you are using Maven, add the following to your pom.xml file. For more information about BOMs, see The Google Cloud Platform Libraries BOM.

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>libraries-bom</artifactId>
      <version>19.0.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigquerystorage</artifactId>
  </dependency>
</dependencies>

If you are using Gradle, add the following to your dependencies:

implementation platform('com.google.cloud:libraries-bom:19.0.0')

implementation 'com.google.cloud:google-cloud-bigquerystorage'

If you are using sbt, add the following to your dependencies:

libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "1.15.0"

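If you are using IntelliJ or Eclipse, you can add client libraries to your project using an IDE plugin.

These plugins provide additional functionality, such as key management for service accounts. Refer to each plugin's documentation for details.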

Python

For more information, see Setting Up a Python Development Environment.

pip install --upgrade google-cloud-bigquery-storage
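
To confirm that the package is visible to the interpreter you plan to run, a quick check along these lines may help. This is a minimal sketch that uses only the standard library (Python 3.8 or later); the helper name `installed_version` is hypothetical and is not part of the client library.

```python
from importlib import metadata
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = installed_version("google-cloud-bigquery-storage")
    if version is None:
        print("google-cloud-bigquery-storage is not installed in this environment")
    else:
        print("google-cloud-bigquery-storage version:", version)
```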

Setting up authentication

To run the client library, you must first set up authentication by creating a service account and setting an environment variable. Complete the following steps to set up authentication. For other ways to authenticate, see the GCP authentication documentation.

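Cloud Console

Create a service account:

  1. In the Cloud Console, go to the Create service account page.

    Go to Create service account
  2. Select a project.
  3. In the Service account name field, enter a name. The Cloud Console fills in the Service account ID field based on this name.

    In the Service account description field, enter a description. For example, Service account for quickstart.

  4. Click Create.
  5. Click the Select a role field.

    Under Quick access, click Basic, then click Owner.

  6. Click Continue.
  7. Click Done to finish creating the service account.

    Do not close your browser window. You will use it in the next step.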

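Create a service account key:

  1. In the Cloud Console, click the email address for the service account that you created.
  2. Click Keys.
  3. Click Add key, then click Create new key.
  4. Click Create. A JSON key file is downloaded to your computer.
  5. Click Close.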

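Command line

You can run the following commands using the Cloud SDK on your local machine, or in Cloud Shell.

  1. Create the service account. Replace NAME with a name for the service account.

    gcloud iam service-accounts create NAME
  2. Grant permissions to the service account. Replace PROJECT_ID with your project ID.

    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:NAME@PROJECT_ID.iam.gserviceaccount.com" --role="roles/owner"
  3. Generate the key file. Replace FILE_NAME with a name for the key file.

    gcloud iam service-accounts keys create FILE_NAME.json --iam-account=NAME@PROJECT_ID.iam.gserviceaccount.com

Provide authentication credentials to your application code by setting the environment variable GOOGLE_APPLICATION_CREDENTIALS. Replace [PATH] with the file path of the JSON file that contains your service account key. This variable applies only to your current shell session, so if you open a new session, set the variable again.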

Linux or macOS

export GOOGLE_APPLICATION_CREDENTIALS="[PATH]"

For example:

export GOOGLE_APPLICATION_CREDENTIALS="/home/user/Downloads/my-key.json"

Windows

PowerShell:

$env:GOOGLE_APPLICATION_CREDENTIALS="[PATH]"

For example:

$env:GOOGLE_APPLICATION_CREDENTIALS="C:\Users\username\Downloads\my-key.json"

Command prompt:

set GOOGLE_APPLICATION_CREDENTIALS=[PATH]
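
Before running the samples, it can be useful to verify that the environment variable points at a readable service account key. The following is a minimal sketch using only the Python standard library; `describe_credentials` is a hypothetical helper, not a client library function, and it only inspects the file locally without contacting any Google service. It assumes the downloaded key is a JSON object whose `type` field is `service_account` and which carries a `client_email` field.

```python
import json
import os
from typing import Mapping, Optional


def describe_credentials(environ: Optional[Mapping[str, str]] = None) -> str:
    """Return a short, human-readable status for GOOGLE_APPLICATION_CREDENTIALS."""
    env = os.environ if environ is None else environ
    path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        return "GOOGLE_APPLICATION_CREDENTIALS is not set"
    if not os.path.isfile(path):
        return "no file found at {}".format(path)
    try:
        with open(path, encoding="utf-8") as key_file:
            key = json.load(key_file)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError is a subclass of ValueError.
        return "could not read {} as JSON: {}".format(path, exc)
    if not isinstance(key, dict) or key.get("type") != "service_account":
        return "{} does not look like a service account key".format(path)
    return "service account key for {}".format(key.get("client_email", "<unknown>"))


if __name__ == "__main__":
    print(describe_credentials())
```

Passing a mapping instead of relying on `os.environ` keeps the helper easy to exercise without changing the shell session.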

Using the client library

The following example shows basic interactions with the BigQuery Storage API.

Go

To use this sample, prepare your machine for Go development, and complete the BigQuery Storage API quickstart. For more information, see the BigQuery Storage API Go API reference documentation.


// The bigquery_storage_quickstart application demonstrates usage of the
// BigQuery Storage read API.  It demonstrates API features such as column
// projection (limiting the output to a subset of a table's columns),
// column filtering (using simple predicates to filter records on the server
// side), establishing the snapshot time (reading data from the table at a
// specific point in time), and decoding Avro row blocks using the third party
// "github.com/linkedin/goavro" library.
package main

import (
	"context"
	"flag"
	"fmt"
	"io"
	"log"
	"sort"
	"sync"
	"time"

	bqStorage "cloud.google.com/go/bigquery/storage/apiv1"
	"github.com/golang/protobuf/ptypes"
	gax "github.com/googleapis/gax-go/v2"
	goavro "github.com/linkedin/goavro/v2"
	bqStoragepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
	"google.golang.org/grpc"
)

// rpcOpts is used to configure the underlying gRPC client to accept large
// messages.  The BigQuery Storage API may send message blocks up to 10MB
// in size.
var rpcOpts = gax.WithGRPCOptions(
	grpc.MaxCallRecvMsgSize(1024 * 1024 * 11),
)

// Command-line flags.
var (
	projectID = flag.String("project_id", "",
		"Cloud Project ID, used for session creation.")
	snapshotMillis = flag.Int64("snapshot_millis", 0,
		"Snapshot time to use for reads, represented in epoch milliseconds format.  Default behavior reads current data.")
)

func main() {
	flag.Parse()
	ctx := context.Background()
	bqReadClient, err := bqStorage.NewBigQueryReadClient(ctx)
	if err != nil {
		log.Fatalf("NewBigQueryReadClient: %v", err)
	}
	defer bqReadClient.Close()

	// Verify we've been provided a parent project which will contain the read session.  The
	// session may exist in a different project than the table being read.
	if *projectID == "" {
		log.Fatalf("No parent project ID specified, please supply using the --project_id flag.")
	}

	// This example uses baby name data from the public datasets.
	srcProjectID := "bigquery-public-data"
	srcDatasetID := "usa_names"
	srcTableID := "usa_1910_current"
	readTable := fmt.Sprintf("projects/%s/datasets/%s/tables/%s",
		srcProjectID,
		srcDatasetID,
		srcTableID,
	)

	// We limit the output columns to a subset of those allowed in the table,
	// and set a simple filter to only report names from the state of
	// Washington (WA).
	tableReadOptions := &bqStoragepb.ReadSession_TableReadOptions{
		SelectedFields: []string{"name", "number", "state"},
		RowRestriction: `state = "WA"`,
	}

	createReadSessionRequest := &bqStoragepb.CreateReadSessionRequest{
		Parent: fmt.Sprintf("projects/%s", *projectID),
		ReadSession: &bqStoragepb.ReadSession{
			Table: readTable,
			// This API can also deliver data serialized in Apache Arrow format.
			// This example leverages Apache Avro.
			DataFormat:  bqStoragepb.DataFormat_AVRO,
			ReadOptions: tableReadOptions,
		},
		MaxStreamCount: 1,
	}

	// Set a snapshot time if it's been specified.
	if *snapshotMillis > 0 {
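		// time.Unix takes nanoseconds as its second argument, so convert the
		// millisecond value to nanoseconds before building the timestamp.
		ts, err := ptypes.TimestampProto(time.Unix(0, *snapshotMillis*int64(time.Millisecond)))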
		if err != nil {
			log.Fatalf("Invalid snapshot millis (%d): %v", *snapshotMillis, err)
		}
		createReadSessionRequest.ReadSession.TableModifiers = &bqStoragepb.ReadSession_TableModifiers{
			SnapshotTime: ts,
		}
	}

	// Create the session from the request.
	session, err := bqReadClient.CreateReadSession(ctx, createReadSessionRequest, rpcOpts)
	if err != nil {
		log.Fatalf("CreateReadSession: %v", err)
	}

	if len(session.GetStreams()) == 0 {
		log.Fatalf("no streams in session.  if this was a small query result, consider writing the output to a named table.")
	}

	// We'll use only a single stream for reading data from the table.  Because
	// of dynamic sharding, this will yield all the rows in the table. However,
	// if you wanted to fan out multiple readers you could do so by increasing
	// MaxStreamCount and having a reader process each individual stream.
	readStream := session.GetStreams()[0].Name

	ch := make(chan *bqStoragepb.AvroRows)

	// Use a waitgroup to coordinate the reading and decoding goroutines.
	var wg sync.WaitGroup

	// Start the reading in one goroutine.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := processStream(ctx, bqReadClient, readStream, ch); err != nil {
			log.Fatalf("processStream failure: %v", err)
		}
		close(ch)
	}()

	// Start Avro processing and decoding in another goroutine.
	wg.Add(1)
	go func() {
		defer wg.Done()
		err := processAvro(ctx, session.GetAvroSchema().GetSchema(), ch)
		if err != nil {
			log.Fatalf("Error processing avro: %v", err)
		}
	}()

	// Wait until both the reading and decoding goroutines complete.
	wg.Wait()

}

// printDatum prints the decoded row datum.
func printDatum(d interface{}) {
	m, ok := d.(map[string]interface{})
	if !ok {
		log.Printf("failed type assertion: %v", d)
		return
	}
	// Go's map implementation returns keys in a random ordering, so we sort
	// the keys before accessing.
	keys := make([]string, len(m))
	i := 0
	for k := range m {
		keys[i] = k
		i++
	}
	sort.Strings(keys)
	for _, key := range keys {
		fmt.Printf("%s: %-20v ", key, valueFromTypeMap(m[key]))
	}
	fmt.Println()
}

// valueFromTypeMap returns the first value in the type map.  This function
// is only suitable for simple schemas, as complex typing such as arrays and
// records necessitate a more robust implementation.  See the goavro library
// and the Avro specification for more information.
func valueFromTypeMap(field interface{}) interface{} {
	m, ok := field.(map[string]interface{})
	if !ok {
		return nil
	}
	for _, v := range m {
		// Return the first value encountered.
		return v
	}
	return nil
}

// processStream reads rows from a single storage Stream, and sends the Avro
// data blocks to a channel. This function will retry on transient stream
// failures and bookmark progress to avoid re-reading data that's already been
// successfully transmitted.
func processStream(ctx context.Context, client *bqStorage.BigQueryReadClient, st string, ch chan<- *bqStoragepb.AvroRows) error {
	var offset int64

	// Streams may be long-running.  Rather than using a global retry for the
	// stream, implement a retry that resets once progress is made.
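	retryLimit := 3
	// Declare the retry counter outside the loop so that it persists across
	// reconnection attempts; it is reset only when a row block is received.
	retries := 0

	for {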
		// Send the initiating request to start streaming row blocks.
		rowStream, err := client.ReadRows(ctx, &bqStoragepb.ReadRowsRequest{
			ReadStream: st,
			Offset:     offset,
		}, rpcOpts)
		if err != nil {
			return fmt.Errorf("Couldn't invoke ReadRows: %v", err)
		}

		// Process the streamed responses.
		for {
			r, err := rowStream.Recv()
			if err == io.EOF {
				return nil
			}
			if err != nil {
				retries++
				if retries >= retryLimit {
					return fmt.Errorf("processStream retries exhausted: %v", err)
				}
				// break the inner loop, and try to recover by starting a new streaming
				// ReadRows call at the last known good offset.
				break
			}

			rc := r.GetRowCount()
			if rc > 0 {
				// Bookmark our progress in case of retries and send the rowblock on the channel.
				offset = offset + rc
				// We're making progress, reset retries.
				retries = 0
				ch <- r.GetAvroRows()
			}
		}
	}
}

// processAvro receives row blocks from a channel, and uses the provided Avro
// schema to decode the blocks into individual row messages for printing.  Will
// continue to run until the channel is closed or the provided context is
// cancelled.
func processAvro(ctx context.Context, schema string, ch <-chan *bqStoragepb.AvroRows) error {
	// Establish a decoder that can process blocks of messages using the
	// reference schema. All blocks share the same schema, so the decoder
	// can be long-lived.
	codec, err := goavro.NewCodec(schema)
	if err != nil {
		return fmt.Errorf("couldn't create codec: %v", err)
	}

	for {
		select {
		case <-ctx.Done():
			// Context was cancelled.  Stop.
			return nil
		case rows, ok := <-ch:
			if !ok {
				// Channel closed, no further avro messages.  Stop.
				return nil
			}
			undecoded := rows.GetSerializedBinaryRows()
			for len(undecoded) > 0 {
				datum, remainingBytes, err := codec.NativeFromBinary(undecoded)

				if err != nil {
					if err == io.EOF {
						break
					}
					return fmt.Errorf("decoding error with %d bytes remaining: %v", len(undecoded), err)
				}
				printDatum(datum)
				undecoded = remainingBytes
			}
		}
	}
}
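
The retry strategy described for processStream (bookmark the row offset, reconnect at the last good offset, and reset the retry counter whenever progress is made) can be sketched independently of the client library. The following Python sketch is an illustration under stated assumptions, not the library's behavior: `open_stream` is a hypothetical callable that starts streaming `(row_count, payload)` blocks from a given offset, and `ConnectionError` stands in for whatever transient error the real transport raises. Unlike the Go sample, it treats a failure to open the stream the same way as a failure while receiving.

```python
from typing import Callable, Iterable, Iterator, Tuple

# A block is modeled as (row_count, payload).
Block = Tuple[int, bytes]


def read_with_resume(
    open_stream: Callable[[int], Iterable[Block]],
    retry_limit: int = 3,
) -> Iterator[Block]:
    """Yield blocks from a stream, reconnecting at the last good offset on error."""
    offset = 0
    retries = 0  # Consecutive failures since the last block was received.
    while True:
        try:
            for row_count, payload in open_stream(offset):
                if row_count > 0:
                    offset += row_count  # Bookmark progress.
                    retries = 0  # Progress was made, so reset the counter.
                    yield row_count, payload
            return  # The stream ended normally.
        except ConnectionError as exc:
            retries += 1
            if retries >= retry_limit:
                raise RuntimeError("retries exhausted") from exc


if __name__ == "__main__":
    blocks = [(2, b"ab"), (3, b"cde"), (1, b"f")]
    failures = {"remaining": 2}

    def flaky_stream(offset: int) -> Iterator[Block]:
        """Simulated stream that drops the connection after one block, twice."""
        position = 0
        sent = 0
        for row_count, payload in blocks:
            if position >= offset:
                if sent == 1 and failures["remaining"] > 0:
                    failures["remaining"] -= 1
                    raise ConnectionError("simulated transient failure")
                yield row_count, payload
                sent += 1
            position += row_count

    print(list(read_with_resume(flaky_stream)))
```

Because the counter tracks consecutive failures rather than total failures, a long-running stream that occasionally drops can still finish, while a stream that fails repeatedly without delivering rows gives up after `retry_limit` attempts.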

Java

Before trying this sample, follow the Java setup instructions in Setting Up a Java Development Environment. For more information, see the BigQuery Storage API Java API reference documentation.


import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions;
import com.google.common.base.Preconditions;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

public class StorageArrowSample {

  /*
   * SimpleRowReader handles deserialization of the Apache Arrow-encoded row batches transmitted
   * from the storage API using a generic datum decoder.
   */
  private static class SimpleRowReader implements AutoCloseable {

    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

    // Decoder object will be reused to avoid re-allocation and too much garbage collection.
    private final VectorSchemaRoot root;
    private final VectorLoader loader;

    public SimpleRowReader(ArrowSchema arrowSchema) throws IOException {
      Schema schema =
          MessageSerializer.deserializeSchema(
              new ReadChannel(
                  new ByteArrayReadableSeekableByteChannel(
                      arrowSchema.getSerializedSchema().toByteArray())));
      Preconditions.checkNotNull(schema);
      List<FieldVector> vectors = new ArrayList<>();
      for (Field field : schema.getFields()) {
        vectors.add(field.createVector(allocator));
      }
      root = new VectorSchemaRoot(vectors);
      loader = new VectorLoader(root);
    }

    /**
     * Sample method for processing Arrow data which only validates decoding.
     *
     * @param batch object returned from the ReadRowsResponse.
     */
    public void processRows(ArrowRecordBatch batch) throws IOException {
      org.apache.arrow.vector.ipc.message.ArrowRecordBatch deserializedBatch =
          MessageSerializer.deserializeRecordBatch(
              new ReadChannel(
                  new ByteArrayReadableSeekableByteChannel(
                      batch.getSerializedRecordBatch().toByteArray())),
              allocator);

      loader.load(deserializedBatch);
      // Release buffers from batch (they are still held in the vectors in root).
      deserializedBatch.close();
      System.out.println(root.contentToTSVString());
      // Release buffers from vectors in root.
      root.clear();
    }

    @Override
    public void close() {
      root.close();
      allocator.close();
    }
  }

  public static void main(String... args) throws Exception {
    // Sets your Google Cloud Platform project ID.
    // String projectId = "YOUR_PROJECT_ID";
    String projectId = args[0];
    Long snapshotMillis = null;
    if (args.length > 1) {
      // Epoch milliseconds exceed the range of a 32-bit int, so parse as a long.
      snapshotMillis = Long.parseLong(args[1]);
    }

    try (BigQueryReadClient client = BigQueryReadClient.create()) {
      String parent = String.format("projects/%s", projectId);

      // This example uses baby name data from the public datasets.
      String srcTable =
          String.format(
              "projects/%s/datasets/%s/tables/%s",
              "bigquery-public-data", "usa_names", "usa_1910_current");

      // We specify the columns to be projected by adding them to the selected fields,
      // and set a simple filter to restrict which rows are transmitted.
      TableReadOptions options =
          TableReadOptions.newBuilder()
              .addSelectedFields("name")
              .addSelectedFields("number")
              .addSelectedFields("state")
              .setRowRestriction("state = \"WA\"")
              .build();

      // Start specifying the read session we want created.
      ReadSession.Builder sessionBuilder =
          ReadSession.newBuilder()
              .setTable(srcTable)
              // This API can also deliver data serialized in Apache Avro format.
              // This example leverages Apache Arrow.
              .setDataFormat(DataFormat.ARROW)
              .setReadOptions(options);

      // Optionally specify the snapshot time.  When unspecified, snapshot time is "now".
      if (snapshotMillis != null) {
        Timestamp t =
            Timestamp.newBuilder()
                .setSeconds(snapshotMillis / 1000)
                .setNanos((int) ((snapshotMillis % 1000) * 1000000))
                .build();
        TableModifiers modifiers = TableModifiers.newBuilder().setSnapshotTime(t).build();
        sessionBuilder.setTableModifiers(modifiers);
      }

      // Begin building the session creation request.
      CreateReadSessionRequest.Builder builder =
          CreateReadSessionRequest.newBuilder()
              .setParent(parent)
              .setReadSession(sessionBuilder)
              .setMaxStreamCount(1);

      ReadSession session = client.createReadSession(builder.build());
      // Setup a simple reader and start a read session.
      try (SimpleRowReader reader = new SimpleRowReader(session.getArrowSchema())) {

        // Assert that there are streams available in the session.  An empty table may not have
        // data available.  If no streams are available for an anonymous (cached) table, consider
        // writing results of a query to a named table rather than consuming cached results
        // directly.
        Preconditions.checkState(session.getStreamsCount() > 0);

        // Use the first stream to perform reading.
        String streamName = session.getStreams(0).getName();

        ReadRowsRequest readRowsRequest =
            ReadRowsRequest.newBuilder().setReadStream(streamName).build();

        // Process each block of rows as they arrive and decode using our simple row reader.
        ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
        for (ReadRowsResponse response : stream) {
          Preconditions.checkState(response.hasArrowRecordBatch());
          reader.processRows(response.getArrowRecordBatch());
        }
      }
    }
  }
}
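
The snapshot-time handling above splits an epoch-milliseconds value into the seconds and nanos fields of a Timestamp. The arithmetic is easy to get wrong by a factor of 1,000, so here is a small worked sketch in Python; `split_epoch_millis` is a hypothetical helper written for illustration and is not part of any client library.

```python
from typing import Tuple


def split_epoch_millis(epoch_millis: int) -> Tuple[int, int]:
    """Split epoch milliseconds into (seconds, nanos) as used by a Timestamp."""
    # divmod floors toward negative infinity, so the remainder (and therefore
    # nanos) is never negative, even for instants before 1970.
    seconds, remainder_millis = divmod(epoch_millis, 1000)
    return seconds, remainder_millis * 1_000_000


if __name__ == "__main__":
    # 1,600,000,000,123 ms is larger than a 32-bit integer can hold, so
    # languages with fixed-width integers need a 64-bit type for this value.
    print(split_epoch_millis(1_600_000_000_123))  # (1600000000, 123000000)
```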

Python

Before trying this sample, follow the Python setup instructions in Setting Up a Python Development Environment. For more information, see the BigQuery Storage API Python API reference documentation.

from google.cloud.bigquery_storage import BigQueryReadClient
from google.cloud.bigquery_storage import types

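# TODO(developer): Set the project_id variable.
# project_id = 'your-project-id'
#
# The read session is created in this project. This project can be
# different from that which contains the table.
#
# TODO(developer): Set snapshot_millis (epoch milliseconds) to read the table
# as of a point in time. A value of 0 reads current data.
# snapshot_millis = 0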

client = BigQueryReadClient()

# This example reads baby name data from the public datasets.
table = "projects/{}/datasets/{}/tables/{}".format(
    "bigquery-public-data", "usa_names", "usa_1910_current"
)

requested_session = types.ReadSession()
requested_session.table = table
# This API can also deliver data serialized in Apache Arrow format.
# This example leverages Apache Avro.
requested_session.data_format = types.DataFormat.AVRO

# We limit the output columns to a subset of those allowed in the table,
# and set a simple filter to only report names from the state of
# Washington (WA).
requested_session.read_options.selected_fields = ["name", "number", "state"]
requested_session.read_options.row_restriction = 'state = "WA"'

# Set a snapshot time if it's been specified.
if snapshot_millis > 0:
    snapshot_time = types.Timestamp()
    snapshot_time.FromMilliseconds(snapshot_millis)
    requested_session.table_modifiers.snapshot_time = snapshot_time

parent = "projects/{}".format(project_id)
session = client.create_read_session(
    parent=parent,
    read_session=requested_session,
    # We'll use only a single stream for reading data from the table. However,
    # if you wanted to fan out multiple readers you could do so by having a
    # reader process each individual stream.
    max_stream_count=1,
)
reader = client.read_rows(session.streams[0].name)

# The read stream contains blocks of Avro-encoded bytes. The rows() method
# uses the fastavro library to parse these blocks as an iterable of Python
# dictionaries. Install fastavro with the following command:
#
# pip install google-cloud-bigquery-storage[fastavro]
rows = reader.rows(session)

# Do any local processing by iterating over the rows. The
# google-cloud-bigquery-storage client reconnects to the API after any
# transient network errors or timeouts.
names = set()
states = set()

for row in rows:
    names.add(row["name"])
    states.add(row["state"])

print("Got {} unique names in states: {}".format(len(names), states))
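
The comment on max_stream_count mentions fanning out multiple readers, one per stream. One possible shape for that, sketched with the standard library only, is shown below. `read_streams_in_parallel` and its `read_stream` parameter are hypothetical names introduced for illustration; the demo uses in-memory fake data rather than calling the API.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, Sequence


def read_streams_in_parallel(
    stream_names: Sequence[str],
    read_stream: Callable[[str], Iterable[dict]],
    max_workers: int = 4,
) -> List[dict]:
    """Read every stream in its own worker thread and combine the rows."""

    def drain(name: str) -> List[dict]:
        # Consume the whole stream inside the worker thread.
        return list(read_stream(name))

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in the same order as stream_names.
        per_stream = list(pool.map(drain, stream_names))
    return [row for rows in per_stream for row in rows]


if __name__ == "__main__":
    fake_data = {
        "streams/0": [{"name": "Ada", "state": "WA"}],
        "streams/1": [{"name": "Lin", "state": "WA"}],
    }
    rows = read_streams_in_parallel(list(fake_data), fake_data.__getitem__)
    print(len(rows), "rows read")
```

With the sample above, and assuming a session created with max_stream_count greater than 1, `stream_names` could be `[s.name for s in session.streams]` and `read_stream` could be something like `lambda name: client.read_rows(name).rows(session)`. Whether threads give a real speedup depends on how much time is spent decoding rows versus waiting on the network.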

What's next?

If you use pandas or the pandas-gbq integration with BigQuery, see the tutorial Downloading BigQuery data to pandas using the BigQuery Storage API to learn how to take advantage of the Storage API.