Reliability: Import data
This document addresses how to build reliable import solutions with BigQuery.
Managed storage
The core of BigQuery is composed of a query engine and a managed storage system that is optimized for analytics. Data is organized into datasets and tables, and is stored in a columnar format.
As a baseline, BigQuery managed storage provides regional availability and durability by synchronously writing data into two different zones. While one zone is designated as the primary zone at any moment in time, BigQuery transparently and seamlessly fails over to the other zone without any action on your part. This is how BigQuery is able to offer a 99.99% availability SLA.
Choosing an import method
There are multiple methods of importing data into BigQuery's managed storage:
- Load jobs: Asynchronous, file-oriented ingestion. This is the most efficient method of ingesting data in bulk, but the data does not become available until the asynchronous job completes.
- Storage Write API: Real-time, row-oriented ingestion using streaming RPCs. Rows ingested with the Storage Write API can be synchronously appended to the table and are immediately queryable if desired. It also supports both stream-level transactions and multi-stream transactions, where records are appended atomically only upon commit.
- Legacy streaming API: Real-time, row-oriented ingestion using unary RPCs. Rows ingested by the streaming API are synchronously appended to the table and immediately queryable. The legacy streaming API pre-dates the Storage Write API and is not recommended for new applications due to lower efficiency and higher cost.
While application requirements will play a large role in which data import method is best, there are some reliability considerations as well:
- Scale: Batch ingestion with load jobs is more resource efficient than streaming and supports large-scale data ingestion out of the box. While the scalability of BigQuery streaming should not be a concern, default quotas might need to be increased; customers can stream in tens of GB per second and more than 10 million rows per second. In addition, real-time streaming ingestion requires more BigQuery resources, which is reflected in the cost of the service.
- Frequency: Batch ingestion is limited to 1,500 load jobs per table per day to prevent table fragmentation that can degrade query performance. Therefore, if you need to ingest more frequently than about once a minute, the Storage Write API is the right choice.
- Latency: While load jobs typically complete in a few minutes, BigQuery does not provide any latency guarantees. Therefore, if timely ingestion of data is crucial for your business, the Storage Write API is the right choice.
Import using load jobs
Achieve exactly-once semantics
If necessary for your application, you can ensure exactly-once insertion semantics with load jobs. To do so, follow the best practices in Error Handling.
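For example, one way to make load job creation idempotent is to supply a client-generated job ID, so that a retried request reuses the existing job instead of submitting the data a second time. The following is a minimal sketch using the Python client library; the project, bucket, and table names are illustrative:

```python
import uuid

from google.api_core import exceptions
from google.cloud import bigquery

client = bigquery.Client()
table_id = "my-project.my_dataset.events"  # Placeholder destination table.
uri = "gs://my-bucket/events/2024-05-01.csv"  # Placeholder source file.

# Derive a deterministic job ID from the source file so that retrying the same
# file reuses the same job instead of loading the data twice.
job_id = "load-events-" + uuid.uuid5(uuid.NAMESPACE_URL, uri).hex

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
)

try:
    job = client.load_table_from_uri(uri, table_id, job_id=job_id, job_config=job_config)
except exceptions.Conflict:
    # The job already exists from a previous attempt; look it up instead of
    # submitting a duplicate load.
    job = client.get_job(job_id)

job.result()  # Waits for completion and raises if the load failed.
```

Note that a job ID cannot be reused, so if the job itself genuinely fails, a retry needs a fresh job ID, for example by appending an attempt counter.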
Handle unavailability
Since load jobs typically import data that is already persisted in files, dealing with extended service unavailability is generally straightforward: there is no risk of data loss as long as the source files are retained until a load job successfully ingests them. You do, however, need a mechanism to retry failed loads later, whether that is a thread performing exponential backoff with jitter, a Pub/Sub topic of failed loads that can be retried later, or some other tracking mechanism.
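As an illustration, the following sketch retries a load job with exponential backoff and jitter before giving up; in practice you might instead record the failed file in Pub/Sub or a database for later processing. The names and the set of retried exceptions are assumptions, not a definitive list:

```python
import random
import time

from google.api_core import exceptions
from google.cloud import bigquery

client = bigquery.Client()

def load_with_backoff(uri, table_id, max_attempts=6):
    delay = 2.0  # Initial backoff in seconds.
    for attempt in range(1, max_attempts + 1):
        try:
            job = client.load_table_from_uri(uri, table_id)
            job.result()  # Raises if the load job failed.
            return job
        except (exceptions.TooManyRequests,
                exceptions.InternalServerError,
                exceptions.ServiceUnavailable):
            if attempt == max_attempts:
                # Give up for now; the caller should record the file so it can
                # be retried later (for example, via a Pub/Sub topic).
                raise
            # Exponential backoff with jitter to avoid synchronized retries.
            time.sleep(delay * random.uniform(0.5, 1.5))
            delay = min(delay * 2, 120)

# Placeholder bucket and table names.
load_with_backoff("gs://my-bucket/events/2024-05-01.csv", "my-project.my_dataset.events")
```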
Manage quotas and limits
Load jobs are subject to a number of limits and quotas. Most notable are the limit of 1,500 load jobs per table per day and the default quota of 100,000 load jobs per project per day. The former is a limit imposed by BigQuery to avoid table fragmentation that can hurt query performance, while the latter is a default quota to ensure BigQuery has sufficient capacity. Such capacity-motivated quotas are large enough for most users, but can be increased upon request. While these are expressed as daily quotas, they are incrementally refilled throughout the day, so that suddenly exhausting them in a traffic spike does not cause an extended multi-hour outage. If your application is anywhere near these limits and quotas for load jobs, consider using the Storage Write API instead.
There are also limits on the shape of the import (column count, row bytes, file count, file bytes, etc.). Exceeding one of these limits can create a situation where a load job persistently fails and blocks your ingestion pipeline from moving on to the next set of files. Having a fallback plan for dealing with such "poison records" will make your ingestion pipeline more robust.
There are two ways to monitor your usage. The first is Cloud Monitoring; for more information, see BigQuery metrics. Specifically, you can monitor the amount of data and the number of rows uploaded to a specific table. If your load jobs write to a specific table, these metrics can serve as a proxy for load job data usage.
The second method is to use the INFORMATION_SCHEMA.JOBS view. For example, the following query returns the number of load jobs by day, dataset, and table, so that you can determine how much of the daily job quota is used.
```sql
SELECT
  DATE(creation_time) AS day,
  destination_table.project_id AS project_id,
  destination_table.dataset_id AS dataset_id,
  destination_table.table_id AS table_id,
  COUNT(job_id) AS load_job_count
FROM
  `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE
  creation_time BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 8 DAY) AND CURRENT_TIMESTAMP()
  AND job_type = "LOAD"
GROUP BY
  day, project_id, dataset_id, table_id
ORDER BY
  day DESC
```
Import using Storage Write API
Achieve exactly-once semantics
The BigQuery Storage Write API offers exactly-once delivery semantics through the use of stream offsets when writing in committed mode. If the client provides an offset when appending a record, then BigQuery guarantees that the record is appended to the table only once; retrying the same append at the same offset does not create a duplicate. Unlike the legacy streaming API, which offers only best-effort deduplication, this is a guarantee.
The Storage Write API offers a pending mode whereby rows inserted by a stream are atomically appended to the table only when the stream is committed. If the commit operation fails, it can be retried. A stream can only be committed once, so retries will never cause rows to be appended a second time.
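The following is a minimal sketch of pending mode with the Python client library, appending at an explicit offset. It assumes a compiled protobuf module record_pb2 whose Record message mirrors the table schema; the project, dataset, and table names are placeholders:

```python
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types, writer
from google.protobuf import descriptor_pb2

import record_pb2  # Hypothetical module compiled from a .proto matching the table schema.

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("my-project", "my_dataset", "events")

# Create a PENDING stream: rows are buffered and only become visible when the
# stream is committed.
stream = write_client.create_write_stream(
    parent=parent,
    write_stream=types.WriteStream(type_=types.WriteStream.Type.PENDING),
)

# The first request on the connection carries the writer schema.
proto_descriptor = descriptor_pb2.DescriptorProto()
record_pb2.Record.DESCRIPTOR.CopyToProto(proto_descriptor)
request_template = types.AppendRowsRequest(
    write_stream=stream.name,
    proto_rows=types.AppendRowsRequest.ProtoData(
        writer_schema=types.ProtoSchema(proto_descriptor=proto_descriptor)
    ),
)
append_stream = writer.AppendRowsStream(write_client, request_template)

# Append a batch of serialized rows at an explicit offset. If this request is
# retried, BigQuery rejects the duplicate because the offset was already written.
proto_rows = types.ProtoRows()
proto_rows.serialized_rows.append(record_pb2.Record(name="alice").SerializeToString())
request = types.AppendRowsRequest(
    offset=0,
    proto_rows=types.AppendRowsRequest.ProtoData(rows=proto_rows),
)
append_stream.send(request).result()
append_stream.close()

# Finalize, then commit atomically. Retrying the commit is safe: a finalized
# stream can only be committed once.
write_client.finalize_write_stream(name=stream.name)
write_client.batch_commit_write_streams(
    types.BatchCommitWriteStreamsRequest(parent=parent, write_streams=[stream.name])
)
```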
Handle unavailability
Retrying with exponential backoff can mitigate random errors and brief periods of service unavailability, but avoiding dropped rows during extended unavailability requires more thought. In particular, if a client is persistently unable to insert a row, what should it do?
The answer depends on your requirements. For example, if BigQuery is being used for operational analytics where some missing rows are acceptable, then the client can give up after a few retries and discard the data. If, instead, every row is crucial to the business, such as with financial data, then you need to have a strategy to persist the data until it can be inserted later.
One common way to deal with persistent errors is to publish the rows to a Pub/Sub topic for later evaluation and possible insertion. Another common method is to temporarily persist the data on the client. Both methods can keep clients unblocked while at the same time ensuring that all rows can be inserted once availability is restored.
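For example, a client might park rows that persistently fail on a Pub/Sub topic and move on; a separate consumer can replay them once availability is restored. The project and topic names below are illustrative:

```python
import json

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# Placeholder project and topic; the topic holds rows awaiting a later retry.
topic_path = publisher.topic_path("my-project", "bq-insert-retry")

def park_failed_row(row: dict, error: str) -> None:
    future = publisher.publish(
        topic_path,
        data=json.dumps(row).encode("utf-8"),
        error=error,  # Attached as a message attribute for later triage.
    )
    future.result()  # Block until Pub/Sub acknowledges the publish.
```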
Manage quotas and limits
The Storage Write API is subject to a number of limits and quotas. You can view your current usage against these quotas in the Google Cloud console.
Import using streaming API
Achieve exactly-once semantics
The BigQuery streaming API offers only best-effort deduplication of rows, in the form of an insertId that acts as a row key. Because this deduplication cannot be relied upon, its use is discouraged. If you need exactly-once semantics, use the Storage Write API instead.
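For illustration, with the Python client library the insertId is supplied through the row_ids argument of insert_rows_json; the table and field names below are placeholders, and the deduplication remains best effort:

```python
from google.cloud import bigquery

client = bigquery.Client()
table_id = "my-project.my_dataset.events"  # Placeholder table.

rows = [{"event_id": "a1", "value": 42}, {"event_id": "a2", "value": 7}]

# Reusing a stable key (here the event_id) as the insertId means a retried
# request for the same rows is likely to deduplicate against the first attempt.
errors = client.insert_rows_json(table_id, rows, row_ids=[r["event_id"] for r in rows])
if errors:
    print("Rows that failed to insert:", errors)
```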
Handle unavailability
The strategies for handling unavailability of the BigQuery streaming API are similar to those described for the Storage Write API above.
Manage quotas and limits
The streaming API is subject to a number of limits and quotas. All of the rate quotas are defaults that should be sufficient for most users but can be raised upon request; their purpose is to ensure proper provisioning and configuration on the BigQuery side. For example, while the maximum bytes per second per project defaults to 1 GB in the multi-regions, BigQuery can scale to more than 100 GB/s.
There are also limits on row size and request size. Exceeding one of these limits can create a situation where inserting a row persistently fails. For example, if a row exceeds the 5 MB limit, no amount of backing off and retrying will help. You need to have a plan for how to deal with such poison rows, such as a Pub/Sub dead-letter topic.
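As a sketch, a client can check the serialized size of a row before inserting it and send rows that can never succeed straight to a dead-letter topic instead of retrying them. The size constant, project, topic, and table names below are illustrative:

```python
import json

from google.cloud import bigquery, pubsub_v1

MAX_ROW_BYTES = 5 * 1024 * 1024  # Illustrative threshold matching the row size limit.

client = bigquery.Client()
publisher = pubsub_v1.PublisherClient()
dead_letter_topic = publisher.topic_path("my-project", "bq-dead-letter")
table_id = "my-project.my_dataset.events"

def insert_or_dead_letter(row: dict) -> None:
    payload = json.dumps(row).encode("utf-8")
    if len(payload) > MAX_ROW_BYTES:
        # No amount of backing off will make this row fit; park it for investigation.
        publisher.publish(dead_letter_topic, data=payload, reason="row too large")
        return
    errors = client.insert_rows_json(table_id, [row])
    if errors:
        publisher.publish(dead_letter_topic, data=payload, reason=str(errors))
```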
Project-level quotas can be viewed in the Google Cloud console under IAM > Quotas. You can see dataset-level usage metrics in Cloud Monitoring. For more detailed data, you can get streaming metadata by using the INFORMATION_SCHEMA.STREAMING_TIMELINE* views.
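For example, the following sketch aggregates recent streaming traffic per table from the STREAMING_TIMELINE_BY_PROJECT view; the region qualifier and the columns you choose to aggregate depend on your project:

```python
from google.cloud import bigquery

client = bigquery.Client()
query = """
    SELECT
      dataset_id,
      table_id,
      SUM(total_rows) AS rows_streamed,
      SUM(total_input_bytes) AS bytes_streamed
    FROM `region-us`.INFORMATION_SCHEMA.STREAMING_TIMELINE_BY_PROJECT
    WHERE start_timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    GROUP BY dataset_id, table_id
    ORDER BY bytes_streamed DESC
"""
# Print a per-table summary of the last day of streaming traffic.
for row in client.query(query).result():
    print(row.dataset_id, row.table_id, row.rows_streamed, row.bytes_streamed)
```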
Use cases for importing data
Real-time analytics
The first use case is a pipeline processing event data from endpoint logs. Events are generated continuously and need to be available for querying in BigQuery as soon as possible. Because data freshness is paramount for this use case, the Storage Write API is the best choice for ingesting data into BigQuery. A recommended architecture to keep these endpoints lean is to send events to Pub/Sub, from which they are consumed by a streaming Dataflow pipeline that writes directly to BigQuery, as sketched below.
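A minimal sketch of such a pipeline with the Apache Beam Python SDK might look as follows, assuming a recent SDK version in which WriteToBigQuery supports the Storage Write API method; the subscription, table, and schema are placeholders, and a production pipeline would also parse messages and route failures to a dead-letter destination:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Streaming pipeline: Pub/Sub -> (parse) -> BigQuery via the Storage Write API.
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | "ReadEvents" >> beam.io.ReadFromPubSub(
            subscription="projects/my-project/subscriptions/events-sub")
        | "Parse" >> beam.Map(lambda msg: {"payload": msg.decode("utf-8")})
        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
            table="my-project:my_dataset.events",
            schema="payload:STRING",
            method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
        )
    )
```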
A primary reliability concern for this architecture is how to deal with failing to insert a record into BigQuery. If each record is important and cannot be lost, data needs to be buffered before attempting to insert. In the recommended architecture above, Pub/Sub can play the role of a buffer with its message retention capabilities. The Dataflow pipeline should be configured to retry BigQuery streaming inserts with truncated exponential backoff. Once the capacity of Pub/Sub as a buffer is exhausted, for example in the case of prolonged unavailability of BigQuery or a network failure, data needs to be persisted on the client and the client needs a mechanism to resume inserting persisted records once availability is restored. For more information about how to handle this situation, see the Google Pub/Sub Reliability Guide blog post.
Another failure case to handle is that of a poison record. A poison record is either a record that BigQuery rejects with a non-retryable error or a record that has not been successfully inserted after the maximum number of retries. Both types of records should be stored in a "dead letter queue" by the Dataflow pipeline for further investigation.
If exactly-once semantics are required, streams should be written in committed mode, with record offsets provided by the client. This avoids duplicates, as the write operation is only performed if the offset value matches the next append offset. Not providing an offset means records are appended to the current end of the stream and retrying a failed append could result in the record appearing more than once in the stream.
If exactly-once guarantees are not required, writing to the default stream allows for a higher throughput and also does not count against the quota limit on creating write streams.
By default, the quota for streaming inserts is 1 GB/s per project. Assuming the goal is to collect logs from 100 million endpoints, each producing a 1.5 kB log record once a minute, you can estimate the required throughput as 100M * 1.5 kB / 60 s = 2.5 GB/s. Ensure in advance that you have adequate quota to serve this kind of throughput.
If your workload is generating or processing data at a very uneven rate, then try to smooth out any load spikes on the client and stream into BigQuery with a constant throughput. This can simplify your capacity planning. If that is not possible, ensure you are prepared to handle 429 (resource exhausted) errors if and when your throughput goes over quota during short spikes.
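For example, a retry policy can be scoped to quota and availability errors only, so that short spikes are absorbed while permanent errors surface immediately. The following sketch uses the Retry helper from google-api-core; the exception list and timing values are assumptions, not recommendations:

```python
from google.api_core import exceptions, retry

# Retry only quota and availability errors, with exponential backoff capped at
# one minute and an overall ten-minute budget.
on_transient_error = retry.if_exception_type(
    exceptions.TooManyRequests,     # HTTP 429
    exceptions.ResourceExhausted,   # gRPC equivalent of 429
    exceptions.ServiceUnavailable,  # HTTP 503
)
quota_spike_retry = retry.Retry(
    predicate=on_transient_error,
    initial=1.0,
    multiplier=2.0,
    maximum=60.0,
    timeout=600.0,
)

# Usage: wrap an insert call, for example
#   quota_spike_retry(client.insert_rows_json)(table_id, rows)
```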
Batch data processing
The second use case is a nightly batch processing pipeline that needs to be completed by a fixed deadline. Data needs to be available by this deadline for further processing by another batch process to generate reports to be sent to a regulator. This use case is common in regulated industries such as finance.
Batch loading of data with load jobs is the right approach for this use case because latency is not a concern provided the deadline can be met. Ensure your Cloud Storage buckets meet the location requirements for loading data into the BigQuery dataset.
The result of a BigQuery load job is atomic: either all records get inserted or none are. When inserting all data in a single load job, it makes sense to always create a new table by using the WRITE_TRUNCATE disposition of the JobConfigurationLoad. This is especially important when retrying a failed load job, because the client might not always be able to tell whether the job actually failed or whether, for example, only the communication of the success state back to the client failed.
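The following is a minimal sketch of such an idempotent nightly load with the Python client library, assuming newline-delimited JSON exports in Cloud Storage and placeholder bucket, project, and table names:

```python
from google.cloud import bigquery

client = bigquery.Client()

# WRITE_TRUNCATE replaces the table contents on every (re)run, so a retry after
# an ambiguous failure cannot double-insert rows.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

job = client.load_table_from_uri(
    "gs://my-bucket/exports/2024-05-01/*.json",
    "my-project.my_dataset.daily_report",
    job_config=job_config,
)
job.result()  # Wait for the load to finish; raises on failure.
```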
Assuming data to be ingested has been successfully copied to Cloud Storage already, retrying with exponential backoff is sufficient to address ingestion failures.
A nightly batch job should never hit the limit of 1,500 load jobs per table per day, even with retries. When loading data incrementally, this limit is sufficient to run a load job every 5 minutes and still leaves headroom for at least one retry per job, on average.
Next steps
- Introduction to reliability
- Reliability: Querying data
- Reliability: Reading data
- Reliability: Disaster recovery