Sharding of timestamp-ordered data in Cloud Spanner
Karthi Thyagarajan
Senior Staff Solutions Architect, Spanner
Cloud Spanner was designed from the ground up to offer horizontal scalability and a developer-friendly SQL interface. As a managed service, Google Cloud handles most database management tasks, but it’s up to you to ensure that there are no hotspots, as described in Schema Design Best Practices and Optimizing Schema Design for Cloud Spanner. In this article, we’ll look at how to efficiently insert and retrieve records with timestamp ordering. We’ll start with the high-level guidance provided in Anti-pattern: timestamp ordering and explore the scenario in more detail with a concrete example.
Scenario
Let’s say we’re building an app that logs user activity along with timestamps and also allows users to query this activity by user id and time range. A good primary key for the table storing user activity (let’s call it LogEntries) is (UserId, Timestamp), as this gives us a uniform distribution of activity logs. Cloud Spanner inserts log entries sequentially, but they’re naturally sharded by UserId, resulting in uniform key distribution.Table LogEntries
UserId (PK) | Timestamp (PK) | LogEntry |
15b7bd1f-8473 | 2018-05-01T15:16:03.386257Z |
Here’s a sample query to retrieve a list of log entries by user and time range:
This query takes advantage of the primary key and thus performs well.
Now let’s make things more interesting. What if we wanted to group users by the company they work for so we can segment reports by company? This is a fairly common use case for Cloud Spanner, especially with multi-tenant SaaS applications. To support this, we create a table with the following schema.
Table LogEntries
CompanyId (PK) | UserId (PK) | Timestamp (PK) | LogEntry |
Acme | 15b7bd1f-8473 | 2018-05-01T15:16:03.386257Z |
And here’s the corresponding query to retrieve the log entries:
Here’s the query to retrieve log entries by CompanyId and time range (user field not specified):
To support the above query, we add a separate, secondary index. Initially, we include just two columns:
Challenge: hotspots during inserts
The challenge here is that some companies may have a lot more (orders of magnitude more) users than others, resulting in a very skewed distribution of log entries. The challenge is particularly acute during inserts as described in the opening paragraph above. And even if Cloud Spanner helps out by creating additional splits, nodes that service new splits become hotspots due to uneven key distribution.
The above diagram depicts a scenario where Company B has three times more users than Company A or Company C. Therefore, log entries corresponding to Company B grow at a higher rate, resulting in the hotspotting of nodes that service the splits where Company B’s log entries are being inserted.
Hotspot mitigation
There are multiple aspects to our hotspot mitigation strategy: schema design, index design and querying. Let’s look at each of these below.Schema and index design
As described in Anti-pattern: timestamp ordering, we’ll use application-level sharding to distribute data evenly. Let’s look at one particular approach for our scenario: instead of (CompanyId, UserId, Timestamp), we’ll use (UserId, CompanyId, Timestamp).Table LogEntries (reorder columns CompanyId and UserId in Primary Key)
UserId (PK) | CompanyId (PK) | Timestamp (PK) | LogEntry |
15b7bd1f-8473 | Acme | 2018-05-01T15:16:03.386257Z |
By placing UserId before CompanyId in the primary key, we can mitigate the hotspots caused by the non-uniform distribution of log entries across companies.
Now let’s look at the secondary index on CompanyId and timestamp. Since this index is meant to support queries that specify just CompanyId and timestamp, we cannot address the distribution problem by simply incorporating UserId. Keep in mind that indexes are also susceptible to hotspots and we need to design them so that their distribution is uniform.
To address this, we’ll add a new column, EntryShardId, where (in pseudo-code):
The hash function here could be a simple crc32 operation. Here’s a python snippet illustrating how to calculate this hash function before a log entry is inserted:
In this case, num_shards = 10. You can adjust this value based on the characteristics of your workload. For instance, if one company in our scenario generates 100 times more log entries on average than the other companies, then we would pick 100 for num_shards in order to achieve a uniform distribution across entries from all companies.
This hashing approach essentially takes the sequential, timestamp-ordered LogEntriesByCompany index entries for a particular company and distributes them across multiple application (or logical) shards. In this case, we have 10 such shards per company, resulting from the crc32 and modulo operations shown above.
Table LogEntries (with EntryShardId added)
CompanyId (PK) | UserId (PK) | Timestamp (PK) | EntryShardId | LogEntry |
‘Acme’ | 1 | 2018-05-01T15:16:03.386257Z | 8 |
And the index:
Querying
Evenly distributing data using a sharding approach is great for inserts but how does it affect retrieval? Application-level sharding is no good to us if we cannot retrieve the data efficiently. Let’s look at how we would query for a list of log entries by CompanyId and time range, but without UserId:The above query illustrates how to perform a timestamp range retrieval while taking sharding into account. By including the ShardedEntryId in the query above, we tell Spanner to ‘look’ in all 10 logical shards to retrieve the timestamp entries for CompanyId ‘Acme’ for a particular range.
Cloud Spanner is a full-featured relational database service that relieves you of most—but not all—database management tasks. For more information on Cloud Spanner management best practices, check out the recommended reading.
Anti-pattern: timestamp ordering
Optimizing Schema Design for Cloud Spanner
Best Practices for Schema Design