Data Analytics

Performing large-scale mutations in BigQuery

BigQuery is a fast, highly scalable, cost-effective, and fully managed enterprise data warehouse for analytics at any scale. As BigQuery has grown in popularity, one question that comes up often is how to perform mutations (updates and deletes) in an efficient and scalable manner. Unlike Online Transaction Processing (OLTP) systems, BigQuery limits the frequency of mutations that can be performed. This means that if you need to perform a large number of mutations on one or more of your tables, you cannot simply issue Data Manipulation Language (DML) statements one at a time. In this blog post, we’ll explore why DML statement quotas are in place and look at some approaches for accomplishing large-scale mutations without exceeding your DML quotas. For example, an increasingly common scenario that calls for bulk mutations involves personal data deletion and update requests from users (EU citizens, in particular) following the passage of the General Data Protection Regulation (GDPR) in the European Union.

Why quotas?

Firstly, BigQuery is not unique among OLAP databases in being constrained on mutation frequency, either explicitly (through quotas) or implicitly (resulting in significant performance degradation), since these types of databases are optimized for large scale ingestion and for analytical queries as opposed to transaction processing. Furthermore, BigQuery allows you to read the state of a table at any instant in time during the preceding 7 days. This look-back window requires retaining data that has already been deleted by the user. In order to support cost-effective and efficient queries at scale, we limit the frequency of mutations using quotas.

With that said, users will be glad to learn that BigQuery supports a large volume of operations in a single command: a single DML statement can operate on an unlimited number of rows within a table, all in one batch. In the following paragraphs, we’ll go over a few representative scenarios along with recommended approaches for performing large scale mutations within BigQuery.
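For instance, instead of issuing one UPDATE per affected row, you can fold all of the affected keys into a single statement. Here is a minimal sketch, assuming a hypothetical dataset.RetiredProducts table that lists the keys to change:

Language: SQL

  -- One statement updates every matching row in a single batch
UPDATE dataset.Inventory
SET quantity = 0
WHERE ProductID IN (SELECT ProductID FROM dataset.RetiredProducts)

This counts as one DML statement against your quota, regardless of how many rows it touches.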

Note: Before continuing, we should point out that if you still need to increase your quotas, you can reach out to us through your account team.

Batch mutations

A common scenario within OLAP systems involves updating existing data based on new information arriving from source systems (such as OLTP databases) on a periodic basis. In the retail business, inventory updates are typically done in this fashion. The following query demonstrates how to perform batch updates to the Inventory table based on the contents of another table (where new arrivals are kept) using the MERGE statement in BigQuery:

Language: SQL

  MERGE dataset.Inventory T
USING dataset.NewArrivals S
ON T.ProductID = S.ProductID
WHEN MATCHED THEN
  UPDATE SET quantity = T.quantity + S.quantity
WHEN NOT MATCHED THEN
  INSERT (ProductID, quantity) VALUES (ProductID, quantity)

You could run this query on a regular schedule (daily, for instance) and perform updates in a wholesale fashion without going over BigQuery’s DML quotas.
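If the NewArrivals table represents the complete desired state rather than a delta, the same MERGE can also remove rows that no longer appear in the source. The sketch below extends the statement above with BigQuery’s WHEN NOT MATCHED BY SOURCE clause; here the update overwrites the quantity rather than adding to it, since the source holds the full state:

Language: SQL

  MERGE dataset.Inventory T
USING dataset.NewArrivals S
ON T.ProductID = S.ProductID
WHEN MATCHED THEN
  UPDATE SET quantity = S.quantity
WHEN NOT MATCHED THEN
  INSERT (ProductID, quantity) VALUES (ProductID, quantity)
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

With this variant, inserts, updates, and deletes are all applied atomically in one statement.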

Correlated queries

Have you ever needed to delete inventory rows as entire product lines get retired? You can do that in BigQuery using a correlated query as shown below:

Language: SQL

  DELETE dataset.Inventory i1
WHERE NOT EXISTS
(
   SELECT * FROM dataset.NewArrivals i2 WHERE i1.ProductID = i2.ProductID
)

This single query deletes the appropriate rows in the Inventory table based on the result of a sub-query. You could also use this approach for performing updates.
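For example, a correlated UPDATE could zero out the quantity of retired product lines instead of removing their rows entirely. This sketch reuses the same two tables:

Language: SQL

  UPDATE dataset.Inventory i1
SET quantity = 0
WHERE NOT EXISTS
(
   SELECT * FROM dataset.NewArrivals i2 WHERE i1.ProductID = i2.ProductID
)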

Streaming inserts and timestamp-aware queries

As shown above, you can update a large number of rows in BigQuery by batching mutations. What if your workload cannot tolerate too much latency between the time that updates and deletes are made in your OLTP system and the time these updates are reflected in BigQuery (OLAP system)—akin to a Change Data Capture (CDC) workflow? In this case, you can use the BigQuery streaming API to insert rows containing new values paired with timestamps into the corresponding tables. In other words, you can stream the change log of updates to the corresponding BigQuery Table. For example, the following table, which is sorted in ascending order by RecordTime, shows multiple entries for a single product with ProductID: SKU341, with different values for the Quantity field. The oldest record appears at the top:

ProductID | Quantity | Description | IsDeleted | RecordTime
----------|----------|-------------|-----------|--------------------
SKU341    | 20       | Blue Guitar | FALSE     | 2018-06-10 12:00:00
SKU341    | 2        | Blue Guitar | FALSE     | 2018-06-11 08:12:50
SKU341    | 130      | Blue Guitar | FALSE     | 2018-06-14 10:54:17

Note: ProductID serves as the primary key in the source systems from which data is imported into BigQuery.

Once data has been streamed into the table, you can use the associated RecordTime field as shown in the sample query below to ensure that only the most recent values are used in the aggregation:

Language: SQL

  SELECT row.ProductID, SUM(row.quantity) AS TotalQuantity FROM 
(
   SELECT
      ARRAY_AGG(i1 ORDER BY i1.RecordTime DESC LIMIT 1)[OFFSET(0)] AS row
   FROM dataset.Inventory i1
      WHERE i1.RecordTime > '2018-06-14'
      GROUP BY i1.ProductID
)
WHERE NOT row.IsDeleted
GROUP BY row.ProductID

In this case, the Inventory table is partitioned by time on the RecordTime column, which helps keep costs down and improve performance by allowing us to apply a partition filter (WHERE i1.RecordTime > '2018-06-14') as shown in the query above. Partitioning also helps keep costs down during deletion of stale records, as described below.
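As a sketch, such a table could be created with a partitioning clause on RecordTime (the column names and types here simply mirror the example table above):

Language: SQL

  CREATE TABLE dataset.Inventory
(
   ProductID STRING,
   Quantity INT64,
   Description STRING,
   IsDeleted BOOL,
   RecordTime TIMESTAMP
)
PARTITION BY DATE(RecordTime)

With this layout, queries and deletes that filter on RecordTime only scan the relevant partitions.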

If your workload calls for large scale deletion of data with low latency then you can once again use streaming inserts to keep track of deleted records. Deleted records from the source can be inserted into the same table using the streaming API with the IsDeleted column set to TRUE along with the RecordTime. A filter can then be used to ensure that these (logically) deleted records are not included in the aggregation results. This is shown in the query above: WHERE NOT row.IsDeleted, as part of the outer query.
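To avoid repeating the deduplication and tombstone-filtering logic in every query, one option is to wrap it in a view. A sketch, assuming a hypothetical view name of CurrentInventory:

Language: SQL

  -- Expose only the latest, non-deleted version of each product
CREATE OR REPLACE VIEW dataset.CurrentInventory AS
SELECT row.*
FROM
(
   SELECT
      ARRAY_AGG(i1 ORDER BY i1.RecordTime DESC LIMIT 1)[OFFSET(0)] AS row
   FROM dataset.Inventory i1
   GROUP BY i1.ProductID
)
WHERE NOT row.IsDeleted

Downstream queries can then read from the view as though the table contained only current values.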

When using timestamps to keep track of updated and deleted records, it’s a good idea to periodically delete stale entries. To illustrate, the following pair of DML statements can be used to remove older versions as well as deleted records.

Language: SQL

  -- remove stale entries
DELETE dataset.Inventory i1
WHERE
i1.RecordTime < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 3 HOUR) AND
i1.RecordTime < (SELECT MAX(RecordTime) FROM dataset.Inventory i2 
WHERE 
i1.ProductID = i2.ProductID)

Language: SQL

  -- remove entries marked as deleted along with corresponding stale entries
DELETE dataset.Inventory i1
WHERE
i1.RecordTime < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 3 HOUR) AND
i1.RecordTime <= (SELECT MAX(RecordTime) FROM dataset.Inventory i2 
WHERE 
i1.ProductID = i2.ProductID AND i2.IsDeleted)

You’ll notice that the above DELETE statements don’t attempt to remove records that are newer than 3 hours. This is because data in BigQuery’s streaming buffer is not immediately available for UPDATE, DELETE, or MERGE operations, as described in DML Limitations. These queries assume that the values of RecordTime roughly match the actual ingestion time.

And lastly, while this example doesn’t explicitly deal with customer data, if your organization stores customer data, this approach can be easily adapted for handling “right to be forgotten” requests as mandated by GDPR.
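As a sketch, a single DML statement could purge all records belonging to users who have asked to be forgotten, assuming hypothetical dataset.UserData and dataset.DeletionRequests tables keyed by CustomerID:

Language: SQL

  -- Remove every record for users with a pending deletion request
DELETE FROM dataset.UserData
WHERE CustomerID IN (SELECT CustomerID FROM dataset.DeletionRequests)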

Note: All of the mutations described above are ACID-compliant, so there’s no risk of data being left in a half-applied state due to race conditions.

Summary

Mutations are a necessary part of many analytics workflows—especially in light of GDPR. And by following the guidelines above, you can perform large scale mutations in BigQuery without overrunning your DML quotas.