Package cloud.google.com/go/bigquery/storage/managedwriter (v1.60.0)

Package managedwriter provides a thick client around the BigQuery storage API's BigQueryWriteClient. More information about this new write client may also be found in the public documentation: https://cloud.google.com/bigquery/docs/write-api

Currently, this client targets the BigQueryWriteClient present in the v1 endpoint, and is intended as a more feature-rich successor to the classic BigQuery streaming interface, which is presented as the Inserter abstraction in cloud.google.com/go/bigquery, and the tabledata.insertAll method if you're more familiar with the BigQuery v2 REST methods.

Creating a Client

To start working with this package, create a client:

ctx := context.Background()
client, err := managedwriter.NewClient(ctx, projectID)
if err != nil {
    // TODO: Handle error.
}

Defining the Protocol Buffer Schema

The write functionality of BigQuery Storage requires data to be sent as protocol buffer messages encoded in the proto2 wire format. Because an encoded protocol buffer is not self-describing, you must also provide the protocol buffer schema. This is communicated using a DescriptorProto message, defined within the protocol buffer libraries: https://pkg.go.dev/google.golang.org/protobuf/types/descriptorpb#DescriptorProto

More information about protocol buffers can be found in the proto2 language guide: https://developers.google.com/protocol-buffers/docs/proto

Details about data type conversions between BigQuery and protocol buffers can be found in the public documentation: https://cloud.google.com/bigquery/docs/write-api#data_type_conversions

For cases where the protocol buffer is compiled from a static ".proto" definition, this process is straightforward. Instantiate an example message, then convert the descriptor into a descriptor proto:

m := &myprotopackage.MyCompiledMessage{}
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

If the message uses advanced protocol buffer features like nested messages/groups, or enums, the cloud.google.com/go/bigquery/storage/managedwriter/adapt subpackage contains functionality to normalize the descriptor into a self-contained definition:

m := &myprotopackage.MyCompiledMessage{}
descriptorProto, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
if err != nil {
    // TODO: Handle error.
}

The adapt subpackage also contains functionality for generating a DescriptorProto using a BigQuery table's schema directly.

Constructing a ManagedStream

The ManagedStream handles management of the underlying write connection to the BigQuery Storage service. You can either create a write session explicitly and pass it in, or create the write stream while setting up the ManagedStream.

It's easiest to register the protocol buffer descriptor you'll be using to send data when setting up the managed stream using the WithSchemaDescriptor option, though you can also set/change the schema as part of an append request once the ManagedStream is created.

// Create a ManagedStream using an explicit stream identifier, either a default
// stream or one explicitly created by CreateWriteStream.
managedStream, err := client.NewManagedStream(ctx,
    managedwriter.WithStreamName(streamName),
    managedwriter.WithSchemaDescriptor(descriptorProto))
if err != nil {
    // TODO: Handle error.
}

In addition, NewManagedStream can create new streams implicitly:

// Alternately, allow the ManagedStream to handle stream construction by supplying
// additional options.
tableName := fmt.Sprintf("projects/%s/datasets/%s/tables/%s", myProject, myDataset, myTable)
managedStream, err := client.NewManagedStream(ctx,
    managedwriter.WithDestinationTable(tableName),
    managedwriter.WithType(managedwriter.BufferedStream),
    managedwriter.WithSchemaDescriptor(descriptorProto))
if err != nil {
    // TODO: Handle error.
}

Writing Data

Use the AppendRows function to write one or more serialized proto messages to a stream. You can choose to specify an offset in the stream to handle de-duplication for user-created streams, but a "default" stream neither accepts nor reports offsets.

AppendRows returns a future-like object that blocks until the write is successful or yields an error.

// Define a couple of messages.
mesgs := []*myprotopackage.MyCompiledMessage{
    {
        UserName:        proto.String("johndoe"),
        EmailAddress:    proto.String("jd@mycompany.mydomain"),
        FavoriteNumbers: []int64{1, 42, 12345},
    },
    {
        UserName:        proto.String("janesmith"),
        EmailAddress:    proto.String("smith@othercompany.otherdomain"),
        FavoriteNumbers: []int64{1, 3, 5, 7, 9},
    },
}

// Encode the messages into binary format.
encoded := make([][]byte, len(mesgs))
for k, v := range mesgs {
    b, err := proto.Marshal(v)
    if err != nil {
        // TODO: Handle error.
    }
    encoded[k] = b
}

// Send the rows to the service, and specify an offset for managing deduplication.
result, err := managedStream.AppendRows(ctx, encoded, managedwriter.WithOffset(0))
if err != nil {
    // TODO: Handle error.
}

// Block until the write is complete and return the result.
returnedOffset, err := result.GetResult(ctx)
if err != nil {
    // TODO: Handle error.
}
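To make the role of offsets in de-duplication more concrete, here is a minimal in-memory sketch. It is not the service's implementation: the offsetStream type and both error values are hypothetical stand-ins, and the sketch assumes that an append is accepted only when its offset equals the current end of the stream.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical errors standing in for the service's responses.
var (
	errAlreadyExists = errors.New("offset already written")
	errOutOfRange    = errors.New("offset beyond end of stream")
)

// offsetStream is a hypothetical in-memory model of a stream that tracks offsets.
type offsetStream struct {
	rows [][]byte
}

// appendAt accepts rows only when offset equals the current end of the
// stream, and returns the offset at which the rows were written.
func (s *offsetStream) appendAt(offset int64, rows [][]byte) (int64, error) {
	next := int64(len(s.rows))
	switch {
	case offset < next:
		return 0, errAlreadyExists
	case offset > next:
		return 0, errOutOfRange
	}
	s.rows = append(s.rows, rows...)
	return offset, nil
}

func main() {
	s := &offsetStream{}
	batch := [][]byte{[]byte("row-a"), []byte("row-b")}

	off, err := s.appendAt(0, batch)
	fmt.Println(off, err)

	// Re-sending the same batch at the same offset is rejected, so a
	// retried write cannot duplicate rows.
	_, err = s.appendAt(0, batch)
	fmt.Println(err)

	// The next batch continues from the end of the stream.
	off, err = s.appendAt(2, batch)
	fmt.Println(off, err)
}
```

In this model, a writer that is unsure whether an earlier append landed can resend it with the same offset and treat the "already written" response as confirmation.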

Buffered Stream Management

For Buffered streams, users control when data is made visible in the destination table/stream independently of when it is written. Use FlushRows on the ManagedStream to advance the flush point ahead in the stream.

// We've written 1500+ rows in the stream, and want to advance the flush point
// ahead to make the first 1000 rows available.
flushOffset, err := managedStream.FlushRows(ctx, 1000)
if err != nil {
    // TODO: Handle error.
}

Pending Stream Management

Pending streams allow users to commit data from multiple streams together once the streams have been finalized, meaning they'll no longer allow further data writes.

// First, finalize the stream we're writing into.
totalRows, err := managedStream.Finalize(ctx)
if err != nil {
    // TODO: Handle error.
}

req := &storagepb.BatchCommitWriteStreamsRequest{
    Parent: parentName,
    WriteStreams: []string{managedStream.StreamName()},
}
// Using the client, we can commit data from multiple streams to the same
// table atomically.
resp, err := client.BatchCommitWriteStreams(ctx, req)
if err != nil {
    // TODO: Handle error.
}

Error Handling and Automatic Retries

Like other Google Cloud services, this API relies on common components that can provide an enhanced set of errors when communicating about the results of API interactions.

Specifically, the apierror package (https://pkg.go.dev/github.com/googleapis/gax-go/v2/apierror) provides convenience methods for extracting structured information about errors.

The BigQuery Storage API service augments applicable errors with service-specific details in the form of a StorageError message. The StorageError message is accessed via the ExtractProtoMessage method in the apierror package. Note that the StorageError message does not implement Go's error interface.

An example of accessing the structured error details:

// By way of example, let's assume the response from an append call returns an error.
_, err := result.GetResult(ctx)
if err != nil {
    if apiErr, ok := apierror.FromError(err); ok {
        // We now have an instance of APIError, which directly exposes more specific
        // details about multiple failure conditions, including transport-level errors.
        storageErr := &storagepb.StorageError{}
        if e := apiErr.Details().ExtractProtoMessage(storageErr); e == nil {
            // storageErr now contains service-specific information about the error.
            log.Printf("Received service-specific error code %s", storageErr.GetCode().String())
        }
    }
}

This library supports the ability to retry failed append requests, but this functionality is not enabled by default. You can enable it via the EnableWriteRetries option when constructing a new managed stream. Use of automatic retries can impact correctness when attempting certain exactly-once write patterns, but is generally recommended for workloads that only need at-least-once writing.

With write retries enabled, failed writes will be automatically attempted a finite number of times (currently 4) if the failure is considered retriable.

In support of the retry changes, the AppendResult returned as part of an append call now includes TotalAttempts(), which returns the number of times that specific append was enqueued to the service. Values larger than 1 are indicative of a specific append being enqueued multiple times.

Usage of Contexts

The underlying rpc mechanism used to transmit requests and responses between this client and the service uses a gRPC bidirectional streaming protocol, and the context provided when invoking NewClient to instantiate the client is used to maintain those background connections.

This package also accepts a context when instantiating a new writer (NewManagedStream), and a per-request context when invoking the AppendRows function to send a set of rows. If the writer's context becomes invalid, all subsequent AppendRows requests will be blocked.

Finally, there is a per-request context supplied as part of the AppendRows call on the ManagedStream writer itself, useful for bounding individual requests.

Connection Sharing (Multiplexing)

Note: This feature is EXPERIMENTAL and subject to change.

The BigQuery Write API enforces a limit on the number of concurrent open connections, documented here: https://cloud.google.com/bigquery/quotas#write-api-limits

Users can now choose to enable connection sharing (multiplexing) when using ManagedStream writers that use default streams. The intent of this feature is to simplify connection management for users who wish to write to many tables, at a cardinality beyond the open connection quota. Please note that explicit streams (Committed, Buffered, and Pending) cannot leverage the connection sharing feature.

Multiplexing features are controlled by the package-specific custom ClientOption options exposed within this package. Additionally, some of the connection-related WriterOptions that can be specified when constructing ManagedStream writers are ignored for writers that leverage the shared multiplex connections.

At a high level, multiplexing uses heuristics based on the flow control of the shared connections to infer whether the pool should add connections, up to a user-specified limit per region, and attempts to balance traffic from writers across those connections.

To enable multiplexing for writes to default streams, simply instantiate the client with the desired options:

ctx := context.Background()
client, err := managedwriter.NewClient(ctx, projectID,
    managedwriter.WithMultiplexing(),
    managedwriter.WithMultiplexPoolLimit(3),
)
if err != nil {
    // TODO: Handle error.
}

Special Consideration: The gRPC architecture is capable of its own sharing of underlying HTTP/2 connections. Users who are sending significant traffic on multiple writers (whether or not they are leveraging multiplexing) may also wish to consider further tuning of this behavior. The managedwriter library sets a reasonable default, but this can be tuned further by leveraging the WithGRPCConnectionPool ClientOption, documented here: https://pkg.go.dev/google.golang.org/api/option#WithGRPCConnectionPool

A reasonable upper bound for the connection pool size is the number of concurrent writers for explicit streams plus the configured size of the multiplex pool.
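As a worked example of that upper bound, the following sketch simply adds the two quantities. The poolSizeUpperBound helper is hypothetical and not part of this package, and the sketch assumes all writers target a single region (the multiplex pool limit applies per region).

```go
package main

import "fmt"

// poolSizeUpperBound is a hypothetical helper that applies the rule of thumb
// above: writers on explicit streams plus the multiplex pool limit.
func poolSizeUpperBound(explicitStreamWriters, multiplexPoolLimit int) int {
	return explicitStreamWriters + multiplexPoolLimit
}

func main() {
	// Five writers on explicit streams, plus a multiplex pool limit of 3.
	fmt.Println(poolSizeUpperBound(5, 3))
}
```

The resulting value is the kind of number you might pass to the WithGRPCConnectionPool ClientOption, treating it as a ceiling rather than a recommended setting.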

Writing JSON Data

As an example, you can refer to this integration test that demonstrates writing JSON data to a stream: https://github.com/googleapis/google-cloud-go/blob/7a46b5428f239871993d66be2c7c667121f60a6f/bigquery/storage/managedwriter/integration_test.go#L397

This integration test assumes the destination table already exists. In addition, it relies upon having a definition of a BigQuery schema that is compatible with this table (for this example the schema is defined here: https://github.com/googleapis/google-cloud-go/blob/2020edff24e3ffe127248cf9a90c67593c303e18/bigquery/storage/managedwriter/testdata/schemas.go#L31).

Given the schema, this test first utilizes the function setupDynamicDescriptors() to derive both a MessageDescriptor and DescriptorProto from the schema. This function is defined here: https://github.com/googleapis/google-cloud-go/blob/7a46b5428f239871993d66be2c7c667121f60a6f/bigquery/storage/managedwriter/integration_test.go#L100

The test initializes the ManagedStream it will write to with the derived DescriptorProto. The test then iterates through each of the JSON rows to be written. For each row, it first dynamically creates an empty Message based on the derived MessageDescriptor. Then it loads the JSON row into the Message. Finally it generates protocol buffer bytes from the Message. These bytes are then sent to the ManagedStream within an AppendRows request.

Constants

DetectProjectID

const DetectProjectID = "*detect-project-id*"

DetectProjectID is a sentinel value that instructs NewClient to detect the project ID. It is given in place of the projectID argument. NewClient will use the project ID from the given credentials or the default credentials (https://developers.google.com/accounts/docs/application-default-credentials) if no credentials were provided. When providing credentials, not all options will allow NewClient to extract the project ID. Specifically a JWT does not have the project ID encoded.

NoStreamOffset

const NoStreamOffset int64 = -1

NoStreamOffset is a sentinel value for signalling we're not tracking stream offset (e.g. a default stream which allows simultaneous append streams).

Variables

AppendClientOpenCount, AppendClientOpenRetryCount, AppendRequests, AppendRequestBytes, AppendRequestErrors, AppendRequestReconnects, AppendRequestRows, AppendResponses, AppendResponseErrors, AppendRetryCount, FlushRequests

var (
	// AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless)

	// AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless)

	// AppendRequests is a measure of the number of append requests sent.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless)

	// AppendRequestBytes is a measure of the bytes sent as append requests.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestBytes = stats.Int64(statsPrefix+"append_request_bytes", "Number of bytes sent as append requests", stats.UnitBytes)

	// AppendRequestErrors is a measure of the number of append requests that errored on send.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestErrors = stats.Int64(statsPrefix+"append_request_errors", "Number of append requests that yielded immediate error", stats.UnitDimensionless)

	// AppendRequestReconnects is a measure of the number of times that sending an append request triggered reconnect.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestReconnects = stats.Int64(statsPrefix+"append_reconnections", "Number of append rows reconnections", stats.UnitDimensionless)

	// AppendRequestRows is a measure of the number of append rows sent.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestRows = stats.Int64(statsPrefix+"append_rows", "Number of append rows sent", stats.UnitDimensionless)

	// AppendResponses is a measure of the number of append responses received.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless)

	// AppendResponseErrors is a measure of the number of append responses received with an error attached.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendResponseErrors = stats.Int64(statsPrefix+"append_response_errors", "Number of append responses with errors attached", stats.UnitDimensionless)

	// AppendRetryCount is a measure of the number of appends that were automatically retried by the library
	// after receiving a non-successful response.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRetryCount = stats.Int64(statsPrefix+"append_retry_count", "Number of appends that were retried", stats.UnitDimensionless)

	// FlushRequests is a measure of the number of FlushRows requests sent.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless)
)

AppendClientOpenView, AppendClientOpenRetryView, AppendRequestsView, AppendRequestBytesView, AppendRequestErrorsView, AppendRequestReconnectsView, AppendRequestRowsView, AppendResponsesView, AppendResponseErrorsView, AppendRetryView, FlushRequestsView

var (

	// AppendClientOpenView is a cumulative sum of AppendClientOpenCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendClientOpenView *view.View

	// AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendClientOpenRetryView *view.View

	// AppendRequestsView is a cumulative sum of AppendRequests.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestsView *view.View

	// AppendRequestBytesView is a cumulative sum of AppendRequestBytes.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestBytesView *view.View

	// AppendRequestErrorsView is a cumulative sum of AppendRequestErrors.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestErrorsView *view.View

	// AppendRequestReconnectsView is a cumulative sum of AppendRequestReconnects.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestReconnectsView *view.View

	// AppendRequestRowsView is a cumulative sum of AppendRequestRows.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestRowsView *view.View

	// AppendResponsesView is a cumulative sum of AppendResponses.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendResponsesView *view.View

	// AppendResponseErrorsView is a cumulative sum of AppendResponseErrors.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendResponseErrorsView *view.View

	// AppendRetryView is a cumulative sum of AppendRetryCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRetryView *view.View

	// FlushRequestsView is a cumulative sum of FlushRequests.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	FlushRequestsView *view.View
)

DefaultOpenCensusViews

var DefaultOpenCensusViews []*view.View

DefaultOpenCensusViews retains the set of all opencensus views that this library has instrumented, to add view registration for exporters.

Functions

func TableParentFromParts

func TableParentFromParts(projectID, datasetID, tableID string) string

TableParentFromParts constructs a table identifier using individual identifiers and returns a string in the form "projects/{project}/datasets/{dataset}/tables/{table}".
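As a rough illustration of the identifier format described above, this standalone sketch builds a string of the same shape with fmt.Sprintf. The tableParent helper is hypothetical and is not the package's source; in real code, call TableParentFromParts directly.

```go
package main

import "fmt"

// tableParent is a hypothetical stand-in that mirrors the documented
// "projects/{project}/datasets/{dataset}/tables/{table}" format.
func tableParent(projectID, datasetID, tableID string) string {
	return fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID)
}

func main() {
	fmt.Println(tableParent("my-project", "my_dataset", "my_table"))
}
```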

func TableParentFromStreamName

func TableParentFromStreamName(streamName string) string

TableParentFromStreamName is a utility function for extracting the parent table prefix from a stream name. When an invalid stream ID is passed, this simply returns the original stream name.
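The documented behavior can be sketched without the library. The parentFromStreamName function below is a hypothetical stand-in, not the package's implementation; it assumes stream names take the form "projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}" and treats any name with fewer path segments as invalid.

```go
package main

import (
	"fmt"
	"strings"
)

// parentFromStreamName is a hypothetical stand-in for the documented
// behavior: it keeps the first six path segments (the table path) of a
// stream name, and returns the input unchanged when the name has too few
// segments to contain a table path plus a stream suffix.
func parentFromStreamName(streamName string) string {
	parts := strings.SplitN(streamName, "/", 7)
	if len(parts) < 7 {
		return streamName
	}
	return strings.Join(parts[:6], "/")
}

func main() {
	fmt.Println(parentFromStreamName("projects/p/datasets/d/tables/t/streams/_default"))
	fmt.Println(parentFromStreamName("not-a-stream-name"))
}
```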

func WithDefaultAppendRowsCallOption

func WithDefaultAppendRowsCallOption(o gax.CallOption) option.ClientOption

WithDefaultAppendRowsCallOption is an EXPERIMENTAL ClientOption for controlling the gax.CallOptions passed when opening the underlying AppendRows bidi stream connections used by this library to communicate with the BigQuery Storage service. This option is propagated to all connections created by the instantiated Client.

Note: the WithAppendRowsCallOption WriterOption can still be used to control the behavior for individual ManagedStream writers that don't participate in multiplexing.

This ClientOption is EXPERIMENTAL and subject to change.

func WithDefaultInflightBytes

func WithDefaultInflightBytes(n int) option.ClientOption

WithDefaultInflightBytes is an EXPERIMENTAL ClientOption for controlling the default limit on the total bytes of AppendRows write requests that can be in flight on a connection at a time. This limit is enforced on all connections created by the instantiated Client.

Note: the WithMaxInflightBytes WriterOption can still be used to control the behavior for individual ManagedStream writers when not using multiplexing.

This ClientOption is EXPERIMENTAL and subject to change.

func WithDefaultInflightRequests

func WithDefaultInflightRequests(n int) option.ClientOption

WithDefaultInflightRequests is an EXPERIMENTAL ClientOption for controlling the default limit of how many individual AppendRows write requests can be in flight on a connection at a time. This limit is enforced on all connections created by the instantiated Client.

Note: the WithMaxInflightRequests WriterOption can still be used to control the behavior for individual ManagedStream writers when not using multiplexing.

This ClientOption is EXPERIMENTAL and subject to change.

func WithMultiplexPoolLimit

func WithMultiplexPoolLimit(maxSize int) option.ClientOption

WithMultiplexPoolLimit is an EXPERIMENTAL option that sets the maximum shared multiplex pool size when instantiating the Client. If multiplexing is not enabled, this setting is ignored. By default, the limit is a single shared connection. This limit is applied per destination region.

This ClientOption is EXPERIMENTAL and subject to change.

func WithMultiplexing

func WithMultiplexing() option.ClientOption

WithMultiplexing is an EXPERIMENTAL option that controls connection sharing when instantiating the Client. Only writes to default streams can leverage the multiplex pool. Internally, the client maintains a pool of connections per BigQuery destination region, and will grow the pool to its maximum allowed size if there's sufficient traffic on the shared connection(s).

This ClientOption is EXPERIMENTAL and subject to change.

AppendOption

type AppendOption func(*pendingWrite)

AppendOption are options that can be passed when appending data with a managed stream instance.

func UpdateDefaultMissingValueInterpretation

func UpdateDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) AppendOption

UpdateDefaultMissingValueInterpretation updates the default interpretation setting for the stream, and is retained for subsequent writes. See the WithDefaultMissingValueInterpretation WriterOption for more details.

func UpdateMissingValueInterpretations

func UpdateMissingValueInterpretations(mvi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) AppendOption

UpdateMissingValueInterpretations updates the per-column missing-value interpretation settings, and is retained for subsequent writes. See the WithMissingValueInterpretations WriterOption for more details.

func UpdateSchemaDescriptor

func UpdateSchemaDescriptor(schema *descriptorpb.DescriptorProto) AppendOption

UpdateSchemaDescriptor is used to update the descriptor message schema associated with a given stream.

func WithOffset

func WithOffset(offset int64) AppendOption

WithOffset sets an explicit offset value for this append request.

AppendResult

type AppendResult struct {
	// contains filtered or unexported fields
}

AppendResult tracks the status of a batch of data rows.

func (*AppendResult) FullResponse

func (ar *AppendResult) FullResponse(ctx context.Context) (*storagepb.AppendRowsResponse, error)

FullResponse returns the full content of the AppendRowsResponse, and any error encountered while processing the append.

The AppendRowsResponse may contain an embedded error. An embedded error in the response will be converted and returned as the error response, so this method may return both the AppendRowsResponse and an error.

This call blocks until the result is ready, or context is no longer valid.

func (*AppendResult) GetResult

func (ar *AppendResult) GetResult(ctx context.Context) (int64, error)

GetResult returns the optional offset of this row, as well as any error encountered while processing the append.

This call blocks until the result is ready, or context is no longer valid.

func (*AppendResult) Ready

func (ar *AppendResult) Ready() <-chan struct{}

Ready returns a channel; receiving from it blocks until the append request has reached a completed state, which may be a successful append or an error.

func (*AppendResult) TotalAttempts

func (ar *AppendResult) TotalAttempts(ctx context.Context) (int, error)

TotalAttempts returns the number of times this write was attempted.

This call blocks until the result is ready, or context is no longer valid.

func (*AppendResult) UpdatedSchema

func (ar *AppendResult) UpdatedSchema(ctx context.Context) (*storagepb.TableSchema, error)

UpdatedSchema returns the updated schema for a table if supplied by the backend as part of the append response.

This call blocks until the result is ready, or context is no longer valid.

Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a managed BigQuery Storage write client scoped to a single project.

func NewClient

func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error)

NewClient instantiates a new client.

The context provided here is retained and used for background connection management between the client and the BigQuery Storage service.

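func (*Client) BatchCommitWriteStreams

func (c *Client) BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error)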

BatchCommitWriteStreams atomically commits a group of PENDING streams that belong to the same parent table.

Streams must be finalized before commit and cannot be committed multiple times. Once a stream is committed, data in the stream becomes available for read operations.

func (*Client) Close

func (c *Client) Close() error

Close releases resources held by the client.

func (*Client) CreateWriteStream

func (c *Client) CreateWriteStream(ctx context.Context, req *storagepb.CreateWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error)

CreateWriteStream creates a write stream to the given table. Additionally, every table has a special stream named ‘_default’ to which data can be written. This stream doesn’t need to be created using CreateWriteStream. It is a stream that can be used simultaneously by any number of clients. Data written to this stream is considered committed as soon as an acknowledgement is received.

func (*Client) GetWriteStream

func (c *Client) GetWriteStream(ctx context.Context, req *storagepb.GetWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error)

GetWriteStream returns information about a given WriteStream.

func (*Client) NewManagedStream

func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error)

NewManagedStream establishes a new managed stream for appending data into a table.

Context here is retained for use by the underlying streaming connections the managed stream may create.

ManagedStream

type ManagedStream struct {
	// contains filtered or unexported fields
}

ManagedStream is the abstraction over a single write stream.

func (*ManagedStream) AppendRows

func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...AppendOption) (*AppendResult, error)

AppendRows sends the append requests to the service, and returns a single AppendResult for tracking the set of data.

The format of the row data is binary serialized protocol buffer bytes. The message must be compatible with the schema currently set for the stream.

Use the WithOffset() AppendOption to set an explicit offset for this append. Setting an offset for a default stream is unsupported.

The size of a single request must be less than 10 MB in size. Requests larger than this return an error, typically INVALID_ARGUMENT.
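One way to stay under a per-request size limit is to split encoded rows into several batches before calling AppendRows. The following is a minimal sketch of that idea: splitBatches is a hypothetical helper, and it counts only the row payload bytes, so a real budget should leave headroom below the limit for the rest of the request.

```go
package main

import "fmt"

// splitBatches groups rows into batches whose summed row sizes stay at or
// below maxBytes. A single row larger than maxBytes is placed in a batch by
// itself, leaving the caller to decide how to handle it.
func splitBatches(rows [][]byte, maxBytes int) [][][]byte {
	var batches [][][]byte
	var current [][]byte
	size := 0
	for _, row := range rows {
		// Start a new batch when adding this row would exceed the budget.
		if len(current) > 0 && size+len(row) > maxBytes {
			batches = append(batches, current)
			current, size = nil, 0
		}
		current = append(current, row)
		size += len(row)
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches
}

func main() {
	rows := [][]byte{make([]byte, 4), make([]byte, 4), make([]byte, 4)}
	for i, b := range splitBatches(rows, 8) {
		fmt.Printf("batch %d: %d rows\n", i, len(b))
	}
}
```

Each resulting batch would then be sent as its own append request; when offsets are in use, the offset for each batch advances by the number of rows in the batches before it.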

func (*ManagedStream) Close

func (ms *ManagedStream) Close() error

Close closes a managed stream.

func (*ManagedStream) Finalize

func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) (int64, error)

Finalize is used to mark a stream as complete, and thus ensure no further data can be appended to the stream. You cannot finalize a DefaultStream, as it always exists.

Finalizing does not advance the current offset of a BufferedStream, nor does it commit data in a PendingStream.

func (*ManagedStream) FlushRows

func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64, opts ...gax.CallOption) (int64, error)

FlushRows advances the offset at which rows in a BufferedStream are visible. Calling this method for other stream types yields an error.

func (*ManagedStream) StreamName

func (ms *ManagedStream) StreamName() string

StreamName returns the corresponding write stream ID being managed by this writer.

func (*ManagedStream) StreamType

func (ms *ManagedStream) StreamType() StreamType

StreamType returns the configured type for this stream.

StreamType

type StreamType string

StreamType indicates the type of stream this write client is managing.

DefaultStream, CommittedStream, BufferedStream, PendingStream

var (
	// DefaultStream most closely mimics the legacy bigquery
	// tabledata.insertAll semantics.  Successful inserts are
	// committed immediately, and there's no tracking offsets as
	// all writes go into a "default" stream that always exists
	// for a table.
	DefaultStream StreamType = "DEFAULT"

	// CommittedStream appends data immediately, but creates a
	// discrete stream for the work so that offset tracking can
	// be used to track writes.
	CommittedStream StreamType = "COMMITTED"

	// BufferedStream is a form of checkpointed stream, that allows
	// you to advance the offset of visible rows via Flush operations.
	BufferedStream StreamType = "BUFFERED"

	// PendingStream is a stream in which no data is made visible to
	// readers until the stream is finalized and committed explicitly.
	PendingStream StreamType = "PENDING"
)

WriterOption

type WriterOption func(*ManagedStream)

WriterOption are variadic options used to configure a ManagedStream instance.

func EnableWriteRetries

func EnableWriteRetries(enable bool) WriterOption

EnableWriteRetries enables ManagedStream to automatically retry failed appends.

Enabling retries is best suited for cases where users want to achieve at-least-once append semantics. Use of automatic retries may complicate patterns where the user is designing for exactly-once append semantics.

func WithAppendRowsCallOption

func WithAppendRowsCallOption(o gax.CallOption) WriterOption

WithAppendRowsCallOption is used to supply additional call options to the ManagedStream when it opens the underlying append stream.

Note: See the WithDefaultAppendRowsCallOption ClientOption for setting defaults when instantiating a client, rather than setting this option per-writer. This WriterOption is ignored for ManagedStream writers that participate in multiplexing.

func WithDataOrigin

func WithDataOrigin(dataOrigin string) WriterOption

WithDataOrigin is used to attach an origin context to the instrumentation metrics emitted by the library.

func WithDefaultMissingValueInterpretation

func WithDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) WriterOption

WithDefaultMissingValueInterpretation controls how missing values are interpreted for a given stream. See WithMissingValueInterpretations for more information about missing values.

WithMissingValueInterpretations set for individual columns can override the default chosen with this option.

For example, if you want to write NULL instead of using default values for some columns, you can set default_missing_value_interpretation to DEFAULT_VALUE and at the same time, set missing_value_interpretations to NULL_VALUE on those columns.

func WithDestinationTable

func WithDestinationTable(destTable string) WriterOption

WithDestinationTable specifies the destination table to which a created stream will append rows. Format of the table:

projects/{projectid}/datasets/{dataset}/tables/{table}

func WithMaxInflightBytes

func WithMaxInflightBytes(n int) WriterOption

WithMaxInflightBytes bounds the inflight append request bytes on the write connection.

Note: See the WithDefaultInflightBytes ClientOption for setting a default when instantiating a client, rather than setting this limit per-writer. This WriterOption is ignored for ManagedStreams that participate in multiplexing.

func WithMaxInflightRequests

func WithMaxInflightRequests(n int) WriterOption

WithMaxInflightRequests bounds the inflight appends on the write connection.

Note: See the WithDefaultInflightRequests ClientOption for setting a default when instantiating a client, rather than setting this limit per-writer. This WriterOption is ignored for ManagedStreams that participate in multiplexing.
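The effect of bounding in-flight requests can be illustrated with a counting semaphore built on a buffered channel. This is only a sketch of the general flow-control idea, not the library's mechanism; the limiter type and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// limiter is a hypothetical counting semaphore that bounds concurrent work.
type limiter struct{ slots chan struct{} }

func newLimiter(max int) *limiter { return &limiter{slots: make(chan struct{}, max)} }

// acquire blocks while the maximum number of requests is already in flight.
func (l *limiter) acquire() { l.slots <- struct{}{} }

// release frees a slot once a response has been received.
func (l *limiter) release() { <-l.slots }

// inFlight reports how many slots are currently held.
func (l *limiter) inFlight() int { return len(l.slots) }

func main() {
	lim := newLimiter(3)
	var (
		mu      sync.Mutex
		current int
		peak    int
		wg      sync.WaitGroup
	)
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			lim.acquire()
			defer lim.release()

			mu.Lock()
			current++
			if current > peak {
				peak = current
			}
			mu.Unlock()

			// A real writer would send the request and await the response here.

			mu.Lock()
			current--
			mu.Unlock()
		}()
	}
	wg.Wait()
	fmt.Println("peak in flight <= 3:", peak <= 3)
}
```

A lower limit reduces memory held by unacknowledged requests at the cost of throughput, which is the trade-off to weigh when tuning this kind of setting.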

func WithMissingValueInterpretations

func WithMissingValueInterpretations(mvi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) WriterOption

WithMissingValueInterpretations controls how missing values are interpreted for individual columns.

You must provide a map to indicate how to interpret missing value for some fields. Missing values are fields present in user schema but missing in rows. The key is the field name. The value is the interpretation of missing values for the field.

For example, the following option would indicate that missing values in the "foo" column are treated as the default value, whereas missing values in the "bar" column are interpreted as null:

WithMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{
    "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE,
    "bar": storagepb.AppendRowsRequest_NULL_VALUE,
})

If a field is not in this map and has missing values, the missing values in this field are interpreted as NULL unless overridden with a default missing value interpretation.

Currently, a field name can only be a top-level column name; it can't be a struct field path like 'foo.bar'.
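The precedence between the per-column map and the stream-wide default can be summarized in a few lines. The sketch below models the rules described above with a hypothetical interpretation type and resolve function; the real enum lives in storagepb and the resolution is performed by the service, not by client code.

```go
package main

import "fmt"

// interpretation is a hypothetical stand-in for the storagepb enum.
type interpretation string

const (
	unspecified  interpretation = "UNSPECIFIED"
	nullValue    interpretation = "NULL_VALUE"
	defaultValue interpretation = "DEFAULT_VALUE"
)

// resolve returns how a missing value in column is treated: a per-column
// entry wins, then the stream-wide default, and otherwise NULL.
func resolve(column string, perColumn map[string]interpretation, streamDefault interpretation) interpretation {
	if v, ok := perColumn[column]; ok && v != unspecified {
		return v
	}
	if streamDefault != unspecified {
		return streamDefault
	}
	return nullValue
}

func main() {
	perColumn := map[string]interpretation{"bar": nullValue}

	// "foo" has no per-column entry, so the stream default applies.
	fmt.Println(resolve("foo", perColumn, defaultValue))
	// "bar" has a per-column entry that overrides the stream default.
	fmt.Println(resolve("bar", perColumn, defaultValue))
	// With nothing configured, missing values are treated as NULL.
	fmt.Println(resolve("baz", nil, unspecified))
}
```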

func WithSchemaDescriptor

func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption

WithSchemaDescriptor describes the format of the serialized data being sent by AppendRows calls on the stream.

func WithStreamName

func WithStreamName(name string) WriterOption

WithStreamName allows users to set the stream name this writer will append to explicitly. By default, the managed client will create the stream when instantiated if necessary.

Note: Supplying this option causes other options which affect stream construction, such as WithType and WithDestinationTable, to be ignored.

func WithTraceID

func WithTraceID(traceID string) WriterOption

WithTraceID instruments requests to the service with a custom trace prefix. This is generally for diagnostic purposes only.

func WithType

func WithType(st StreamType) WriterOption

WithType sets the stream type for the managed stream.