Usa cloud.google.com/go/bigquery/storage/managedwriter para anexar dados com uma mensagem definida estaticamente.
Mais informações
Para ver a documentação detalhada que inclui este exemplo de código, consulte:
Exemplo de código
Go
Antes de testar esta amostra, siga as instruções de configuração do Go no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery em Go.
Para autenticar no BigQuery, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
import (
"context"
"fmt"
"io"
"math/rand"
"time"
"cloud.google.com/go/bigquery/storage/managedwriter"
"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
"github.com/GoogleCloudPlatform/golang-samples/bigquery/snippets/managedwriter/exampleproto"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
"google.golang.org/protobuf/proto"
)
// generateExampleMessages builds numMessages rows of example data from the statically defined,
// compiled protocol buffer message exampleproto.SampleData, and returns each row in its binary
// serialized form.
//
// Every row carries a StructList holding between one and five fully populated SampleStruct
// entries. If a row cannot be marshaled, a nil slice and an error naming the row index are returned.
func generateExampleMessages(numMessages int) ([][]byte, error) {
	// One generator is shared by every row; seeding a fresh generator from the clock per row
	// can produce identical seeds when the clock resolution is coarse.
	random := rand.New(rand.NewSource(time.Now().UnixNano()))

	msgs := make([][]byte, numMessages)
	for i := range msgs {
		// Our example data embeds an array of structs, so we'll construct that first.
		// The length is drawn exactly once and the slice is sized to match, so the
		// repeated field never contains nil elements.
		numStructs := int(random.Int63n(5)) + 1
		sList := make([]*exampleproto.SampleStruct, numStructs)
		for j := range sList {
			sList[j] = &exampleproto.SampleStruct{
				SubIntCol: proto.Int64(random.Int63()),
			}
		}

		// Capture the clock once so the DATE and TIMESTAMP columns of a row agree.
		now := time.Now()

		m := &exampleproto.SampleData{
			BoolCol:    proto.Bool(true),
			BytesCol:   []byte("some bytes"),
			Float64Col: proto.Float64(3.14),
			Int64Col:   proto.Int64(123),
			StringCol:  proto.String("example string value"),

			// These types require special encoding/formatting to transmit.

			// DATE values are number of days since the Unix epoch.
			DateCol: proto.Int32(int32(now.UnixNano() / 86400000000000)),

			// DATETIME uses the literal format.
			DatetimeCol: proto.String("2022-01-01 12:13:14.000000"),

			// GEOGRAPHY uses Well-Known-Text (WKT) format.
			GeographyCol: proto.String("POINT(-122.350220 47.649154)"),

			// NUMERIC and BIGNUMERIC can be passed as string, or more efficiently
			// using a packed byte representation.
			NumericCol:    proto.String("99999999999999999999999999999.999999999"),
			BignumericCol: proto.String("578960446186580977117854925043439539266.34992332820282019728792003956564819967"),

			// TIME also uses literal format.
			TimeCol: proto.String("12:13:14.000000"),

			// TIMESTAMP uses microseconds since Unix epoch.
			TimestampCol: proto.Int64(now.UnixNano() / 1000),

			// Int64List is an array of INT64 types.
			Int64List: []int64{2, 4, 6, 8},

			// This is a required field, and thus must be present.
			RowNum: proto.Int64(23),

			// StructCol is a single nested message.
			StructCol: &exampleproto.SampleStruct{
				SubIntCol: proto.Int64(random.Int63()),
			},

			// StructList is a repeated array of a nested message.
			StructList: sList,
		}

		b, err := proto.Marshal(m)
		if err != nil {
			return nil, fmt.Errorf("error generating message %d: %w", i, err)
		}
		msgs[i] = b
	}
	return msgs, nil
}
// appendToPendingStream walks through the lifecycle of a pending write stream using the
// managedwriter package: it creates the stream, appends three batches of example rows,
// finalizes the stream, and commits it to the destination table.
//
// Progress messages are written to w. The first failing step aborts the function and its
// error is returned, wrapped with the name of that step.
func appendToPendingStream(w io.Writer, projectID, datasetID, tableID string) error {
	// projectID := "myproject"
	// datasetID := "mydataset"
	// tableID := "mytable"

	ctx := context.Background()

	// All interaction with the service goes through a managedwriter client.
	client, err := managedwriter.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("managedwriter.NewClient: %w", err)
	}
	// Release the client once this function returns.
	defer client.Close()

	// Explicitly create the pending stream; its name is what the writer attaches to below.
	createReq := &storagepb.CreateWriteStreamRequest{
		Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID),
		WriteStream: &storagepb.WriteStream{
			Type: storagepb.WriteStream_PENDING,
		},
	}
	pendingStream, err := client.CreateWriteStream(ctx, createReq)
	if err != nil {
		return fmt.Errorf("CreateWriteStream: %w", err)
	}

	// The service needs the descriptor of the protocol buffer message being sent, which is
	// analogous to the "schema" for the message. SampleData and SampleStruct are two distinct
	// messages in the compiled proto file, so adapt.NormalizeDescriptor is used to fold them
	// into a single self-contained descriptor representation.
	rowDescriptor := (&exampleproto.SampleData{}).ProtoReflect().Descriptor()
	descriptorProto, err := adapt.NormalizeDescriptor(rowDescriptor)
	if err != nil {
		return fmt.Errorf("NormalizeDescriptor: %w", err)
	}

	// A ManagedStream takes care of low level details like connection state and offers
	// extras such as a future-like result for each append. NewManagedStream could create
	// the stream itself, but this example keeps stream creation explicit.
	managedStream, err := client.NewManagedStream(
		ctx,
		managedwriter.WithStreamName(pendingStream.GetName()),
		managedwriter.WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		return fmt.Errorf("NewManagedStream: %w", err)
	}
	defer managedStream.Close()

	var (
		// nextOffset is the stream offset at which the next batch will be written.
		nextOffset int64
		// Appends are asynchronous, so their results are collected and inspected at the end.
		pending []*managedwriter.AppendResult
	)

	// Batch one: a single row.
	rows, err := generateExampleMessages(1)
	if err != nil {
		return fmt.Errorf("generateExampleMessages: %w", err)
	}
	res, err := managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(nextOffset))
	if err != nil {
		return fmt.Errorf("AppendRows first call error: %w", err)
	}
	pending = append(pending, res)
	nextOffset++

	// Batch two: three rows in one request.
	rows, err = generateExampleMessages(3)
	if err != nil {
		return fmt.Errorf("generateExampleMessages: %w", err)
	}
	res, err = managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(nextOffset))
	if err != nil {
		return fmt.Errorf("AppendRows second call error: %w", err)
	}
	pending = append(pending, res)
	nextOffset += 3

	// Batch three: two final rows.
	rows, err = generateExampleMessages(2)
	if err != nil {
		return fmt.Errorf("generateExampleMessages: %w", err)
	}
	res, err = managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(nextOffset))
	if err != nil {
		return fmt.Errorf("AppendRows third call error: %w", err)
	}
	pending = append(pending, res)

	// Confirm that each of the three appends completed successfully.
	// This monitoring could equally be done out of band in a goroutine.
	for idx, appendResult := range pending {
		// GetResult blocks until the API has responded for this append.
		recvOffset, err := appendResult.GetResult(ctx)
		if err != nil {
			return fmt.Errorf("append %d returned error: %w", idx, err)
		}
		fmt.Fprintf(w, "Successfully appended data at offset %d.\n", recvOffset)
	}

	// No more data will be written: finalize the pending stream, which blocks further appends.
	rowCount, err := managedStream.Finalize(ctx)
	if err != nil {
		return fmt.Errorf("error during Finalize: %w", err)
	}
	streamName := managedStream.StreamName()
	fmt.Fprintf(w, "Stream %s finalized with %d rows.\n", streamName, rowCount)

	// Committing makes the data part of the table. Several streams can be committed
	// atomically as a group; here only the single stream is committed.
	commitReq := &storagepb.BatchCommitWriteStreamsRequest{
		Parent:       managedwriter.TableParentFromStreamName(streamName),
		WriteStreams: []string{streamName},
	}
	resp, err := client.BatchCommitWriteStreams(ctx, commitReq)
	if err != nil {
		return fmt.Errorf("client.BatchCommit: %w", err)
	}
	if streamErrs := resp.GetStreamErrors(); len(streamErrs) > 0 {
		return fmt.Errorf("stream errors present: %v", streamErrs)
	}

	committedAt := resp.GetCommitTime().AsTime().Format(time.RFC3339Nano)
	fmt.Fprintf(w, "Table data committed at %s\n", committedAt)
	return nil
}
A seguir
Para pesquisar e filtrar exemplos de código de outros produtos do Google Cloud, consulte o navegador de exemplos do Google Cloud.