This document describes how to write data from Dataflow to BigQuery by using the Apache Beam BigQuery I/O connector.
The BigQuery I/O connector is available in the Apache Beam SDK. We recommend using the latest SDK version. For more information, see Apache Beam 2.x SDKs.
Cross-language support for Python is also available.
Overview
The BigQuery I/O connector supports the following methods for writing to BigQuery:
- STORAGE_WRITE_API. In this mode, the connector performs direct writes to BigQuery storage, using the BigQuery Storage Write API. The Storage Write API combines streaming ingestion and batch loading into a single high-performance API. This mode guarantees exactly-once semantics.
- STORAGE_API_AT_LEAST_ONCE. This mode also uses the Storage Write API, but provides at-least-once semantics. This mode results in lower latency for most pipelines. However, duplicate writes are possible.
- FILE_LOADS. In this mode, the connector writes the input data to staging files in Cloud Storage. Then it runs a BigQuery load job to load the data into BigQuery. This mode is the default for bounded PCollections, which are most commonly found in batch pipelines.
- STREAMING_INSERTS. In this mode, the connector uses the legacy streaming API. This mode is the default for unbounded PCollections, but is not recommended for new projects.
When choosing a write method, consider the following points:
- For streaming jobs, consider using STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE, because these modes write directly to BigQuery storage, without using intermediate staging files. (The sketch after this list shows how to select a write method in code.)
- If you run the pipeline using at-least-once streaming mode, set the write mode to STORAGE_API_AT_LEAST_ONCE. This setting is more efficient and matches the semantics of at-least-once streaming mode.
- File loads and the Storage Write API have different quotas and limits.
- Load jobs use either the shared BigQuery slot pool or reserved slots. To use reserved slots, run the load job in a project with a reservation assignment of type PIPELINE. Load jobs are free if you use the shared BigQuery slot pool. However, BigQuery does not make guarantees about the available capacity of the shared pool. For more information, see Introduction to reservations.
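The write method is selected when you configure the BigQueryIO.Write transform. The following minimal sketch, with a placeholder table name, selects the Storage Write API; the other modes are chosen the same way by passing a different Method value to withMethod:

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;

public class ChooseWriteMethod {
  // Builds a BigQuery sink that uses the Storage Write API. Replace
  // STORAGE_WRITE_API with STORAGE_API_AT_LEAST_ONCE, FILE_LOADS, or
  // STREAMING_INSERTS to select a different write method.
  static BigQueryIO.Write<TableRow> storageWriteApiSink() {
    return BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table") // placeholder destination
        .withMethod(Method.STORAGE_WRITE_API)
        // The destination table is assumed to already exist.
        .withCreateDisposition(CreateDisposition.CREATE_NEVER);
  }
}
```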
Parallelism
- For FILE_LOADS and STORAGE_WRITE_API in streaming pipelines, the connector shards the data to a number of files or streams. In general, we recommend calling withAutoSharding to enable auto-sharding, as shown in the sketch after this list.
- For FILE_LOADS in batch pipelines, the connector writes data to partitioned files, which are then loaded into BigQuery in parallel.
- For STORAGE_WRITE_API in batch pipelines, each worker creates one or more streams to write to BigQuery, determined by the total number of shards.
- For STORAGE_API_AT_LEAST_ONCE, there is a single default write stream. Multiple workers append to this stream.
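A streaming Storage Write API sink with auto-sharding might be configured as in the following sketch. The table name and the triggering frequency are placeholder values:

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.joda.time.Duration;

public class AutoShardingExample {
  // For an unbounded (streaming) input, lets the runner decide how many write
  // streams to use based on throughput.
  static BigQueryIO.Write<TableRow> autoShardedSink() {
    return BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table") // placeholder destination
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        // How often buffered records are committed to BigQuery.
        .withTriggeringFrequency(Duration.standardSeconds(5))
        .withAutoSharding()
        // The destination table is assumed to already exist.
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER);
  }
}
```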
Performance
The following table shows performance metrics for various BigQuery I/O write options. The workloads were run on one e2-standard-2 worker, using the Apache Beam SDK 2.49.0 for Java. They did not use Runner v2.
| 100 M records, 1 kB, 1 column | Throughput (bytes) | Throughput (elements) |
|---|---|---|
| Storage Write | 55 MBps | 54,000 elements per second |
| Avro Load | 78 MBps | 77,000 elements per second |
| Json Load | 54 MBps | 53,000 elements per second |
These metrics are based on simple batch pipelines. They are intended to compare performance between I/O connectors, and are not necessarily representative of real-world pipelines. Dataflow pipeline performance is complex, and is a function of VM type, the data being processed, the performance of external sources and sinks, and user code. Metrics are based on running the Java SDK, and aren't representative of the performance characteristics of other language SDKs. For more information, see Beam IO Performance.
Best practices
This section describes best practices for writing to BigQuery from Dataflow.
General considerations
- The Storage Write API has quota limits. The connector handles these limits for most pipelines. However, some scenarios can exhaust the available Storage Write API streams. For example, this issue might happen in a pipeline that uses auto-sharding and autoscaling with a large number of destinations, especially in long-running jobs with highly variable workloads. If this problem occurs, consider using STORAGE_API_AT_LEAST_ONCE, which avoids the issue.
- Use Google Cloud metrics to monitor your Storage Write API quota usage.
- When using file loads, Avro typically outperforms JSON. To use Avro, call withAvroFormatFunction.
- By default, load jobs run in the same project as the Dataflow job. To specify a different project, call withLoadJobProjectId.
- When using the Java SDK, consider creating a class that represents the schema of the BigQuery table. Then call useBeamSchema in your pipeline to automatically convert between Apache Beam Row and BigQuery TableRow types. For an example of a schema class, see ExampleModel.java; a minimal sketch also follows this list.
- If you load tables with complex schemas containing thousands of fields, consider calling withMaxBytesPerPartition to set a smaller maximum size for each load job.
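The following sketch shows the useBeamSchema approach with a hypothetical model class and table name. Because the class is annotated with a Beam schema, the connector can convert elements to TableRow values and derive the table schema without a format function:

```java
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

public class BeamSchemaExample {
  // Hypothetical class that mirrors the BigQuery table schema.
  @DefaultSchema(JavaFieldSchema.class)
  public static class UserRecord {
    public String name;
    public Long score;
  }

  // With useBeamSchema, the connector uses the inferred Beam schema to convert
  // elements to TableRow values and, if it creates the table, to define it.
  static BigQueryIO.Write<UserRecord> schemaBasedSink() {
    return BigQueryIO.<UserRecord>write()
        .to("my-project:my_dataset.user_scores") // placeholder destination
        .useBeamSchema();
  }
}
```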
Streaming pipelines
The following recommendations apply to streaming pipelines.
- For streaming pipelines, we recommend using the Storage Write API (STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE).
- A streaming pipeline can use file loads, but this approach has disadvantages:
  - It requires windowing in order to write the files. You can't use the global window.
  - BigQuery loads files on a best-effort basis when using the shared slot pool. There can be a significant delay between when a record is written and when it's available in BigQuery.
  - If a load job fails (for example, because of bad data or a schema mismatch), the entire pipeline fails.
- Consider using STORAGE_API_AT_LEAST_ONCE when possible. It can result in duplicate records being written to BigQuery, but is less expensive and more scalable than STORAGE_WRITE_API.
- In general, avoid using STREAMING_INSERTS. Streaming inserts are more expensive than the Storage Write API, and don't perform as well.
- Data sharding can improve performance in streaming pipelines. For most pipelines, auto-sharding is a good starting point. However, you can tune sharding as follows (see the sketch after this list):
  - For STORAGE_WRITE_API, call withNumStorageWriteApiStreams to set the number of write streams.
  - For FILE_LOADS, call withNumFileShards to set the number of file shards.
- If you use streaming inserts, we recommend setting retryTransientErrors as the retry policy.
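The following sketch illustrates two of these recommendations: a streaming Storage Write API sink with a fixed number of write streams, and a legacy streaming-inserts sink with the retryTransientErrors policy. The table names, stream count, and triggering frequency are placeholders:

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.joda.time.Duration;

public class StreamingTuningExample {
  // Storage Write API with an explicit number of write streams instead of
  // auto-sharding. Intended for an unbounded (streaming) input.
  static BigQueryIO.Write<TableRow> fixedStreamsSink() {
    return BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.events") // placeholder destination
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        .withTriggeringFrequency(Duration.standardSeconds(5))
        .withNumStorageWriteApiStreams(8)
        // The destination table is assumed to already exist.
        .withCreateDisposition(CreateDisposition.CREATE_NEVER);
  }

  // Legacy streaming inserts with the recommended retry policy.
  static BigQueryIO.Write<TableRow> streamingInsertsSink() {
    return BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.events") // placeholder destination
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
        .withCreateDisposition(CreateDisposition.CREATE_NEVER);
  }
}
```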
Batch pipelines
The following recommendations apply to batch pipelines.
- For most large batch pipelines, we recommend first trying FILE_LOADS. A batch pipeline can use STORAGE_WRITE_API, but it's likely to exceed quota limits at large scale (1,000+ vCPUs) or if concurrent pipelines are running. Apache Beam doesn't throttle the maximum number of write streams for batch STORAGE_WRITE_API jobs, so the job eventually reaches BigQuery Storage API limits.
- When using FILE_LOADS, you might exhaust either the shared BigQuery slot pool or your pool of reserved slots. If you encounter this kind of failure, try the following approaches (a FILE_LOADS configuration sketch follows this list):
  - Reduce the maximum number of workers or the worker size for the job.
  - Purchase more reserved slots.
  - Consider using STORAGE_WRITE_API.
- Small to medium pipelines (<1,000 vCPUs) might benefit from using STORAGE_WRITE_API. For these smaller jobs, consider using STORAGE_WRITE_API if you want a dead-letter queue or when the FILE_LOADS shared slot pool is not enough.
- If you can tolerate duplicate data, consider using STORAGE_API_AT_LEAST_ONCE. This mode can result in duplicate records being written to BigQuery, but might be less expensive than the STORAGE_WRITE_API option.
- Different write modes might perform differently based on the characteristics of your pipeline. Experiment to find the best write mode for your workload.
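As one possible starting point, the following sketch configures an explicit FILE_LOADS batch write that runs its load jobs in a separate project (for example, one with a PIPELINE reservation assignment) and caps the size of each load job. The project, table, and size values are placeholders:

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

public class BatchFileLoadsExample {
  static BigQueryIO.Write<TableRow> fileLoadsSink() {
    return BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table") // placeholder destination
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        // Run the BigQuery load jobs in a different project, for example one
        // that has reserved slots.
        .withLoadJobProjectId("my-reservation-project") // placeholder project
        // Split the data into smaller load jobs.
        .withMaxBytesPerPartition(512L * 1024 * 1024) // 512 MiB per load job
        // The destination table is assumed to already exist.
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER);
  }
}
```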
Handle row-level errors
This section describes how to handle errors that might happen at the row level, for example because of badly formed input data or schema mismatches.
For Storage Write API, any rows that can't be written are placed
into a separate PCollection. To get this collection, call
getFailedStorageApiInserts
on the WriteResult
object. For an example of this approach, see
Stream data to BigQuery.
It's a good practice to
send the errors to a dead-letter queue or table, for later processing. For more
information about this pattern, see
BigQueryIO
dead letter pattern.
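A minimal sketch of this pattern, with a placeholder destination table and a hypothetical dead-letter row layout, might look like the following:

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

public class FailedRowsExample {
  // Writes rows with the Storage Write API and turns any rejected rows into
  // TableRows that can be written to a dead-letter table.
  static PCollection<TableRow> writeAndCollectFailures(PCollection<TableRow> rows) {
    WriteResult result =
        rows.apply(
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.events") // placeholder destination
                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
                // The destination table is assumed to already exist. For an
                // unbounded input, also set withTriggeringFrequency.
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

    // Each failed insert carries the rejected row and an error message.
    return result
        .getFailedStorageApiInserts()
        .apply(
            MapElements.into(TypeDescriptor.of(TableRow.class))
                .via(
                    (BigQueryStorageApiInsertError error) ->
                        // Hypothetical dead-letter layout: the failed row as a
                        // string, plus the error message.
                        new TableRow()
                            .set("failed_row", error.getRow().toString())
                            .set("error_message", error.getErrorMessage())));
  }
}
```

The returned collection can then be written to a separate dead-letter table or another sink for later inspection.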
For FILE_LOADS, if an error occurs while loading the data, the load job fails
and the pipeline throws a runtime exception. You can view the error in the
Dataflow logs or look at the BigQuery job history.
The I/O connector does not return information about individual failed rows.
For more information about troubleshooting errors, see BigQuery connector errors.
Examples
The following examples show how to use Dataflow to write to BigQuery.
Write to an existing table
The following example creates a batch pipeline that writes a
PCollection<MyData>
to BigQuery, where MyData
is a custom
data type.
The BigQueryIO.write()
method returns a
BigQueryIO.Write<T>
type, which is used to configure the write
operation. For more information, see
Writing to a table
in the Apache Beam documentation. This code example writes to an existing
table (CREATE_NEVER) and appends the new rows to the table (WRITE_APPEND).
Java
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
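A minimal sketch of such a pipeline follows. The MyData fields (name and age), the example values, and the table name are illustrative placeholders:

```java
import com.google.api.services.bigquery.model.TableRow;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class WriteToExistingTable {
  // Hypothetical custom data type.
  public static class MyData implements Serializable {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    List<MyData> data = Arrays.asList(new MyData("Alice", 40L), new MyData("Bob", 30L));

    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    pipeline
        .apply(Create.of(data).withCoder(SerializableCoder.of(MyData.class)))
        .apply(
            BigQueryIO.<MyData>write()
                // Placeholder destination; replace with your project, dataset, and table.
                .to("my-project:my_dataset.my_table")
                // Convert each MyData element into a BigQuery TableRow.
                .withFormatFunction(
                    (MyData x) -> new TableRow().set("name", x.name).set("age", x.age))
                // The table must already exist; new rows are appended to it.
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND));

    pipeline.run().waitUntilFinish();
  }
}
```

Because the input PCollection is bounded, the connector uses FILE_LOADS by default for this pipeline.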
Write to a new or existing table
The following example creates a new table if the destination table does not
exist, by setting the
create disposition
to CREATE_IF_NEEDED. When you use this option, you must provide a table
schema. The connector uses this schema if it creates a new table.
Java
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
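A minimal sketch with a placeholder table name and an illustrative two-field schema:

```java
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class WriteToNewOrExistingTable {
  public static void main(String[] args) {
    // The schema that the connector uses if it needs to create the table.
    TableSchema schema =
        new TableSchema()
            .setFields(
                Arrays.asList(
                    new TableFieldSchema().setName("name").setType("STRING"),
                    new TableFieldSchema().setName("age").setType("INT64")));

    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    pipeline
        .apply(
            Create.of(new TableRow().set("name", "Alice").set("age", 40))
                .withCoder(TableRowJsonCoder.of()))
        .apply(
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table") // placeholder destination
                // Create the table with the schema above if it doesn't exist.
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withSchema(schema)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND));

    pipeline.run().waitUntilFinish();
  }
}
```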
Stream data to BigQuery
The following example shows how to stream data using exactly-once semantics, by
setting the write mode to STORAGE_WRITE_API.
Not all streaming pipelines require exactly-once semantics. For example, you
might be able to
manually remove duplicates
from the destination table. If the possibility of duplicate records is
acceptable for your scenario, consider using at-least-once semantics by setting
the write method to STORAGE_API_AT_LEAST_ONCE. This method is
generally more efficient and results in lower latency for most pipelines.
Java
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
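A minimal sketch of an exactly-once streaming write follows. It uses GenerateSequence as a stand-in for a real unbounded source, and the table name, field name, and triggering frequency are placeholders:

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;

public class StreamToBigQuery {
  public static void main(String[] args) {
    StreamingOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingOptions.class);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Unbounded source that emits one element per second, as a stand-in
        // for a real streaming source such as Pub/Sub.
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
        .apply(
            MapElements.into(TypeDescriptor.of(TableRow.class))
                .via((Long n) -> new TableRow().set("id", n)))
        .apply(
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table") // placeholder destination
                // Exactly-once semantics with the Storage Write API.
                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
                // How often buffered records are committed to BigQuery.
                .withTriggeringFrequency(Duration.standardSeconds(5))
                .withAutoSharding()
                // The destination table is assumed to already exist.
                .withCreateDisposition(CreateDisposition.CREATE_NEVER));

    pipeline.run();
  }
}
```

To use at-least-once semantics instead, change the write method to STORAGE_API_AT_LEAST_ONCE.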
What's next
- Learn more about the BigQuery I/O connector in the Apache Beam documentation.
- Read about Streaming data into BigQuery using Storage Write API (blog post).