Package (v1.21.0)

Package managedwriter provides an EXPERIMENTAL thick client around the BigQuery storage API's BigQueryWriteClient. More information about this new write client may also be found in the public documentation:

It is EXPERIMENTAL and subject to change or removal without notice. This library is in a pre-alpha state, and breaking changes are frequent.

Currently, this client targets the BigQueryWriteClient present in the v1beta2 endpoint, and is intended as a more feature-rich successor to the classic BigQuery streaming interface, which is presented as the Inserter abstraction in, and the tabledata.insertAll method if you're more familiar with the BigQuery v2 REST methods.

Appending data is accomplished through the use of streams. There are four stream types, each targetting slightly different use cases and needs. See the StreamType documentation for more details about each stream type.

This API uses serialized protocol buffer messages for appending data to streams. For users who don't have predefined protocol buffer messages for sending data, the subpackage includes functionality for defining protocol buffer messages dynamically using table schema information, which enables users to do things like using protojson to convert json text into a protocol buffer.



const DetectProjectID = "*detect-project-id*"

DetectProjectID is a sentinel value that instructs NewClient to detect the project ID. It is given in place of the projectID argument. NewClient will use the project ID from the given credentials or the default credentials ( if no credentials were provided. When providing credentials, not all options will allow NewClient to extract the project ID. Specifically a JWT does not have the project ID encoded.


const NoStreamOffset int64 = -1

NoStreamOffset is a sentinel value for signalling we're not tracking stream offset (e.g. a default stream which allows simultaneous append streams).


AppendRequests, AppendBytes, AppendResponses, FlushRequests, AppendClientOpenCount, AppendClientOpenRetryCount

var (
	// AppendRequests is a measure of the number of append requests sent.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless)

	// AppendBytes is a measure of the bytes sent as append requests.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendBytes = stats.Int64(statsPrefix+"append_bytes", "Number of bytes sent as append requests", stats.UnitBytes)

	// AppendResponses is a measure of the number of append responses received.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless)

	// FlushRequests is a measure of the number of FlushRows requests sent.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless)

	// AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless)

	// AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless)

AppendRequestsView, AppendBytesView, AppendResponsesView, FlushRequestsView, AppendClientOpenView, AppendClientOpenRetryView

var (
	// AppendRequestsView is a cumulative sum of AppendRequests.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestsView *view.View

	// AppendBytesView is a cumulative sum of AppendBytes.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendBytesView *view.View

	// AppendResponsesView is a cumulative sum of AppendResponses.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendResponsesView *view.View

	// FlushRequestsView is a cumulative sum of FlushRequests.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	FlushRequestsView *view.View

	// AppendClientOpenView is a cumulative sum of AppendClientOpenCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendClientOpenView *view.View

	// AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendClientOpenRetryView *view.View


func TableParentFromStreamName

func TableParentFromStreamName(streamName string) string

TableParentFromStreamName is a utility function for extracting the parent table prefix from a stream name. When an invalid stream ID is passed, this simply returns the original stream name.


type AppendResult struct {
	// contains filtered or unexported fields

AppendResult tracks the status of a single row of data.

func (*AppendResult) GetResult

func (ar *AppendResult) GetResult(ctx context.Context) (int64, error)

GetResult returns the optional offset of this row, or the associated error. It blocks until the result is ready.

func (*AppendResult) Ready

func (ar *AppendResult) Ready() <-chan>

Ready blocks until the append request has reached a completed state, which may be a successful append or an error.


type Client struct {
	// contains filtered or unexported fields

Client is a managed BigQuery Storage write client scoped to a single project.

func NewClient

func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error)

NewClient instantiates a new client.

func (*Client) BatchCommit

func (c *Client) BatchCommit(ctx context.Context, parentTable string, streamNames []string) (*storagepb.BatchCommitWriteStreamsResponse, error)

BatchCommit is used to commit one or more PendingStream streams belonging to the same table as a single transaction. Streams must be finalized before committing.

Format of the parentTable is: projects/{project}/datasets/{dataset}/tables/{table} and the utility function TableParentFromStreamName can be used to derive this from a Stream's name.

If the returned response contains stream errors, this indicates that the batch commit failed and no data was committed.

TODO: currently returns the raw response. Determine how we want to surface StreamErrors.

func (*Client) Close

func (c *Client) Close() error

Close releases resources held by the client.

func (*Client) NewManagedStream

func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error)

NewManagedStream establishes a new managed stream for appending data into a table.

Context here is retained for use by the underlying streaming connections the managed stream may create.


type ManagedStream struct {
	// contains filtered or unexported fields

ManagedStream is the abstraction over a single write stream.

func (*ManagedStream) AppendRows

func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) ([]*AppendResult, error)

AppendRows sends the append requests to the service, and returns one AppendResult per row. The format of the row data is binary serialized protocol buffer bytes, and and the message must adhere to the format of the schema Descriptor passed in when creating the managed stream.

func (*ManagedStream) Close

func (ms *ManagedStream) Close() error

Close closes a managed stream.

func (*ManagedStream) Finalize

func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error)

Finalize is used to mark a stream as complete, and thus ensure no further data can be appended to the stream. You cannot finalize a DefaultStream, as it always exists.

Finalizing does not advance the current offset of a BufferedStream, nor does it commit data in a PendingStream.

func (*ManagedStream) FlushRows

func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, error)

FlushRows advances the offset at which rows in a BufferedStream are visible. Calling this method for other stream types yields an error.

func (*ManagedStream) StreamName

func (ms *ManagedStream) StreamName() string

StreamName returns the corresponding write stream ID being managed by this writer.

func (*ManagedStream) StreamType

func (ms *ManagedStream) StreamType() StreamType

StreamType returns the configured type for this stream.


type StreamType string

StreamType indicates the type of stream this write client is managing.

DefaultStream, CommittedStream, BufferedStream, PendingStream

var (
	// DefaultStream most closely mimics the legacy bigquery
	// tabledata.insertAll semantics.  Successful inserts are
	// committed immediately, and there's no tracking offsets as
	// all writes go into a "default" stream that always exists
	// for a table.
	DefaultStream StreamType = "DEFAULT"

	// CommittedStream appends data immediately, but creates a
	// discrete stream for the work so that offset tracking can
	// be used to track writes.
	CommittedStream StreamType = "COMMITTED"

	// BufferedStream is a form of checkpointed stream, that allows
	// you to advance the offset of visible rows via Flush operations.
	// NOTE: Buffered Streams are currently in limited preview, and as such
	// methods like FlushRows() may yield errors for non-enrolled projects.
	BufferedStream StreamType = "BUFFERED"

	// PendingStream is a stream in which no data is made visible to
	// readers until the stream is finalized and committed explicitly.
	PendingStream StreamType = "PENDING"


type WriterOption func(*ManagedStream)

WriterOption are variadic options used to configure a ManagedStream instance.

func WithDataOrigin

func WithDataOrigin(dataOrigin string) WriterOption

WithDataOrigin is used to attach an origin context to the instrumentation metrics emitted by the library.

func WithDestinationTable

func WithDestinationTable(destTable string) WriterOption

WithDestinationTable specifies the destination table to which a created stream will append rows. Format of the table:


func WithMaxInflightBytes

func WithMaxInflightBytes(n int) WriterOption

WithMaxInflightBytes bounds the inflight append request bytes on the write connection.

func WithMaxInflightRequests

func WithMaxInflightRequests(n int) WriterOption

WithMaxInflightRequests bounds the inflight appends on the write connection.

func WithSchemaDescriptor

func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption

WithSchemaDescriptor describes the format of the serialized data being sent by AppendRows calls on the stream.

func WithStreamName

func WithStreamName(name string) WriterOption

WithStreamName allows users to set the stream name this writer will append to explicitly. By default, the managed client will create the stream when instantiated if necessary.

Note: Supplying this option causes other options which affect stream construction such as WithStreamType and WithDestinationTable to be ignored.

func WithTraceID

func WithTraceID(traceID string) WriterOption

WithTraceID allows instruments requests to the service with a custom trace prefix. This is generally for diagnostic purposes only.

func WithType

func WithType(st StreamType) WriterOption

WithType sets the stream type for the managed stream.