Fazer o download dos dados da tabela usando o formato de dados Avro e desserializar dados em objetos de linha.
Páginas de documentação que incluem esta amostra de código
Para visualizar o exemplo de código usado em contexto, consulte a seguinte documentação:
Exemplo de código
C++
Antes de testar essa amostra, siga as instruções de configuração para C++ no Guia de início rápido do BigQuery: como usar bibliotecas de cliente.
#include "google/cloud/bigquery/bigquery_read_client.gcpcxx.pb.h"
#include <iostream>
#include <stdexcept>
namespace {
void ProcessRowsInAvroFormat(
::google::cloud::bigquery::storage::v1::AvroSchema const&,
::google::cloud::bigquery::storage::v1::AvroRows const&) {
// Code to deserialize avro rows should be added here.
}
} // namespace
int main(int argc, char* argv[]) try {
if (argc != 3) {
std::cerr << "Usage: " << argv[0] << " <project-id> <table-name>\n";
return 1;
}
// project_name should be in the format "projects/<your-gcp-project>"
std::string const project_name = "projects/" + std::string(argv[1]);
// table_name should be in the format:
// "projects/<project-table-resides-in>/datasets/<dataset-table_resides-in>/tables/<table
// name>" The project values in project_name and table_name do not have to be
// identical.
std::string const table_name = argv[2];
// Create a namespace alias to make the code easier to read.
namespace bigquery = google::cloud::bigquery;
constexpr int kMaxReadStreams = 1;
// Create the ReadSession.
auto client =
bigquery::BigQueryReadClient(bigquery::MakeBigQueryReadConnection());
::google::cloud::bigquery::storage::v1::ReadSession read_session;
read_session.set_data_format(
google::cloud::bigquery::storage::v1::DataFormat::AVRO);
read_session.set_table(table_name);
auto session =
client.CreateReadSession(project_name, read_session, kMaxReadStreams);
if (!session) throw std::runtime_error(session.status().message());
// Read rows from the ReadSession.
constexpr int kRowOffset = 0;
auto read_rows = client.ReadRows(session->streams(0).name(), kRowOffset);
std::int64_t num_rows = 0;
for (auto const& row : read_rows) {
if (row.ok()) {
num_rows += row->row_count();
ProcessRowsInAvroFormat(session->avro_schema(), row->avro_rows());
}
}
std::cout << num_rows << " rows read from table: " << table_name << "\n";
return 0;
} catch (std::exception const& ex) {
std::cerr << "Standard exception raised: " << ex.what() << "\n";
return 1;
}
Go
Antes de testar essa amostra, siga as instruções de configuração para Go no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery em Go.
// The bigquery_storage_quickstart application demonstrates usage of the
// BigQuery Storage read API. It demonstrates API features such as column
// projection (limiting the output to a subset of a table's columns),
// column filtering (using simple predicates to filter records on the server
// side), establishing the snapshot time (reading data from the table at a
// specific point in time), and decoding Avro row blocks using the third party
// "github.com/linkedin/goavro" library.
package main
import (
"context"
"flag"
"fmt"
"io"
"log"
"sort"
"sync"
"time"
bqStorage "cloud.google.com/go/bigquery/storage/apiv1"
"github.com/golang/protobuf/ptypes"
gax "github.com/googleapis/gax-go/v2"
goavro "github.com/linkedin/goavro/v2"
bqStoragepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
"google.golang.org/grpc"
)
// rpcOpts is used to configure the underlying gRPC client to accept large
// messages. The BigQuery Storage API may send message blocks up to 10MB
// in size.
var rpcOpts = gax.WithGRPCOptions(
grpc.MaxCallRecvMsgSize(1024 * 1024 * 11),
)
// Command-line flags.
var (
projectID = flag.String("project_id", "",
"Cloud Project ID, used for session creation.")
snapshotMillis = flag.Int64("snapshot_millis", 0,
"Snapshot time to use for reads, represented in epoch milliseconds format. Default behavior reads current data.")
)
func main() {
flag.Parse()
ctx := context.Background()
bqReadClient, err := bqStorage.NewBigQueryReadClient(ctx)
if err != nil {
log.Fatalf("NewBigQueryStorageClient: %v", err)
}
defer bqReadClient.Close()
// Verify we've been provided a parent project which will contain the read session. The
// session may exist in a different project than the table being read.
if *projectID == "" {
log.Fatalf("No parent project ID specified, please supply using the --project_id flag.")
}
// This example uses baby name data from the public datasets.
srcProjectID := "bigquery-public-data"
srcDatasetID := "usa_names"
srcTableID := "usa_1910_current"
readTable := fmt.Sprintf("projects/%s/datasets/%s/tables/%s",
srcProjectID,
srcDatasetID,
srcTableID,
)
// We limit the output columns to a subset of those allowed in the table,
// and set a simple filter to only report names from the state of
// Washington (WA).
tableReadOptions := &bqStoragepb.ReadSession_TableReadOptions{
SelectedFields: []string{"name", "number", "state"},
RowRestriction: `state = "WA"`,
}
createReadSessionRequest := &bqStoragepb.CreateReadSessionRequest{
Parent: fmt.Sprintf("projects/%s", *projectID),
ReadSession: &bqStoragepb.ReadSession{
Table: readTable,
// This API can also deliver data serialized in Apache Arrow format.
// This example leverages Apache Avro.
DataFormat: bqStoragepb.DataFormat_AVRO,
ReadOptions: tableReadOptions,
},
MaxStreamCount: 1,
}
// Set a snapshot time if it's been specified.
if *snapshotMillis > 0 {
ts, err := ptypes.TimestampProto(time.Unix(0, *snapshotMillis*1000))
if err != nil {
log.Fatalf("Invalid snapshot millis (%d): %v", *snapshotMillis, err)
}
createReadSessionRequest.ReadSession.TableModifiers = &bqStoragepb.ReadSession_TableModifiers{
SnapshotTime: ts,
}
}
// Create the session from the request.
session, err := bqReadClient.CreateReadSession(ctx, createReadSessionRequest, rpcOpts)
if err != nil {
log.Fatalf("CreateReadSession: %v", err)
}
fmt.Printf("Read session: %s\n", session.GetName())
if len(session.GetStreams()) == 0 {
log.Fatalf("no streams in session. if this was a small query result, consider writing to output to a named table.")
}
// We'll use only a single stream for reading data from the table. Because
// of dynamic sharding, this will yield all the rows in the table. However,
// if you wanted to fan out multiple readers you could do so by having a
// increasing the MaxStreamCount.
readStream := session.GetStreams()[0].Name
ch := make(chan *bqStoragepb.AvroRows)
// Use a waitgroup to coordinate the reading and decoding goroutines.
var wg sync.WaitGroup
// Start the reading in one goroutine.
wg.Add(1)
go func() {
defer wg.Done()
if err := processStream(ctx, bqReadClient, readStream, ch); err != nil {
log.Fatalf("processStream failure: %v", err)
}
close(ch)
}()
// Start Avro processing and decoding in another goroutine.
wg.Add(1)
go func() {
defer wg.Done()
err := processAvro(ctx, session.GetAvroSchema().GetSchema(), ch)
if err != nil {
log.Fatalf("Error processing avro: %v", err)
}
}()
// Wait until both the reading and decoding goroutines complete.
wg.Wait()
}
// printDatum prints the decoded row datum.
func printDatum(d interface{}) {
m, ok := d.(map[string]interface{})
if !ok {
log.Printf("failed type assertion: %v", d)
}
// Go's map implementation returns keys in a random ordering, so we sort
// the keys before accessing.
keys := make([]string, len(m))
i := 0
for k := range m {
keys[i] = k
i++
}
sort.Strings(keys)
for _, key := range keys {
fmt.Printf("%s: %-20v ", key, valueFromTypeMap(m[key]))
}
fmt.Println()
}
// valueFromTypeMap returns the first value/key in the type map. This function
// is only suitable for simple schemas, as complex typing such as arrays and
// records necessitate a more robust implementation. See the goavro library
// and the Avro specification for more information.
func valueFromTypeMap(field interface{}) interface{} {
m, ok := field.(map[string]interface{})
if !ok {
return nil
}
for _, v := range m {
// Return the first key encountered.
return v
}
return nil
}
// processStream reads rows from a single storage Stream, and sends the Avro
// data blocks to a channel. This function will retry on transient stream
// failures and bookmark progress to avoid re-reading data that's already been
// successfully transmitted.
func processStream(ctx context.Context, client *bqStorage.BigQueryReadClient, st string, ch chan<- *bqStoragepb.AvroRows) error {
var offset int64
// Streams may be long-running. Rather than using a global retry for the
// stream, implement a retry that resets once progress is made.
retryLimit := 3
for {
retries := 0
// Send the initiating request to start streaming row blocks.
rowStream, err := client.ReadRows(ctx, &bqStoragepb.ReadRowsRequest{
ReadStream: st,
Offset: offset,
}, rpcOpts)
if err != nil {
return fmt.Errorf("Couldn't invoke ReadRows: %v", err)
}
// Process the streamed responses.
for {
r, err := rowStream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
retries++
if retries >= retryLimit {
return fmt.Errorf("processStream retries exhausted: %v", err)
}
// break the inner loop, and try to recover by starting a new streaming
// ReadRows call at the last known good offset.
break
}
rc := r.GetRowCount()
if rc > 0 {
// Bookmark our progress in case of retries and send the rowblock on the channel.
offset = offset + rc
// We're making progress, reset retries.
retries = 0
ch <- r.GetAvroRows()
}
}
}
}
// processAvro receives row blocks from a channel, and uses the provided Avro
// schema to decode the blocks into individual row messages for printing. Will
// continue to run until the channel is closed or the provided context is
// cancelled.
func processAvro(ctx context.Context, schema string, ch <-chan *bqStoragepb.AvroRows) error {
// Establish a decoder that can process blocks of messages using the
// reference schema. All blocks share the same schema, so the decoder
// can be long-lived.
codec, err := goavro.NewCodec(schema)
if err != nil {
return fmt.Errorf("couldn't create codec: %v", err)
}
for {
select {
case <-ctx.Done():
// Context was cancelled. Stop.
return nil
case rows, ok := <-ch:
if !ok {
// Channel closed, no further avro messages. Stop.
return nil
}
undecoded := rows.GetSerializedBinaryRows()
for len(undecoded) > 0 {
datum, remainingBytes, err := codec.NativeFromBinary(undecoded)
if err != nil {
if err == io.EOF {
break
}
return fmt.Errorf("decoding error with %d bytes remaining: %v", len(undecoded), err)
}
printDatum(datum)
undecoded = remainingBytes
}
}
}
}
Python
Antes de testar essa amostra, siga as instruções de configuração para Python no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery em Python.
from google.cloud.bigquery_storage import BigQueryReadClient
from google.cloud.bigquery_storage import types
# TODO(developer): Set the project_id variable.
# project_id = 'your-project-id'
#
# The read session is created in this project. This project can be
# different from that which contains the table.
client = BigQueryReadClient()
# This example reads baby name data from the public datasets.
table = "projects/{}/datasets/{}/tables/{}".format(
"bigquery-public-data", "usa_names", "usa_1910_current"
)
requested_session = types.ReadSession()
requested_session.table = table
# This API can also deliver data serialized in Apache Arrow format.
# This example leverages Apache Avro.
requested_session.data_format = types.DataFormat.AVRO
# We limit the output columns to a subset of those allowed in the table,
# and set a simple filter to only report names from the state of
# Washington (WA).
requested_session.read_options.selected_fields = ["name", "number", "state"]
requested_session.read_options.row_restriction = 'state = "WA"'
# Set a snapshot time if it's been specified.
if snapshot_millis > 0:
snapshot_time = types.Timestamp()
snapshot_time.FromMilliseconds(snapshot_millis)
requested_session.table_modifiers.snapshot_time = snapshot_time
parent = "projects/{}".format(project_id)
session = client.create_read_session(
parent=parent,
read_session=requested_session,
# We'll use only a single stream for reading data from the table. However,
# if you wanted to fan out multiple readers you could do so by having a
# reader process each individual stream.
max_stream_count=1,
)
reader = client.read_rows(session.streams[0].name)
# The read stream contains blocks of Avro-encoded bytes. The rows() method
# uses the fastavro library to parse these blocks as an iterable of Python
# dictionaries. Install fastavro with the following command:
#
# pip install google-cloud-bigquery-storage[fastavro]
rows = reader.rows(session)
# Do any local processing by iterating over the rows. The
# google-cloud-bigquery-storage client reconnects to the API after any
# transient network errors or timeouts.
names = set()
states = set()
for row in rows:
names.add(row["name"])
states.add(row["state"])
print("Got {} unique names in states: {}".format(len(names), ", ".join(states)))
A seguir
Para pesquisar e filtrar exemplos de código de outros produtos do Google Cloud, consulte o navegador de exemplos do Google Cloud