This page describes change streams in Spanner for GoogleSQL-dialect databases and PostgreSQL-dialect databases, including:
- The split-based partitioning model
- The format and content of change stream records
- The low-level syntax used to query those records
- An example of the query workflow
You use the Spanner API to query change streams directly. Applications that instead use Dataflow to read change stream data don't need to work directly with the data model described here.
For a broader introductory guide to change streams, see Change streams overview.
Change stream partitions
When a change occurs on a table that is watched by a change stream, Spanner writes a corresponding change stream record in the database, synchronously in the same transaction as the data change. This means that if the transaction succeeds, Spanner has also successfully captured and persisted the change. Internally, Spanner co-locates the change stream record and the data change so that they are processed by the same server to minimize write overhead.
As part of the DML to a particular split, Spanner appends the write to the corresponding change stream data split in the same transaction. Because of this colocation, change streams don't add extra coordination across serving resources, which minimizes the transaction commit overhead.
Spanner scales by dynamically splitting and merging data based on database load and size, and distributing splits across serving resources.
To enable change streams writes and reads to scale, Spanner splits and merges the internal change stream storage along with the database data, automatically avoiding hotspots. To support reading change stream records in near real-time as database writes scale, the Spanner API is designed for a change stream to be queried concurrently using change stream partitions. Change stream partitions map to change stream data splits that contain the change stream records. A change stream's partitions change dynamically over time and are correlated to how Spanner dynamically splits and merges the database data.
A change stream partition contains records for an immutable key range for a specific time range. Any change stream partition can split into one or more change stream partitions, or be merged with other change stream partitions. When these split or merge events happen, child partitions are created to capture the changes for their respective immutable key ranges for the next time range. In addition to data change records, a change stream query returns child partition records to notify readers of new change stream partitions that need to be queried, as well as heartbeat records to indicate forward progress when no writes have occurred recently.
When querying a particular change stream partition, the change records are returned in commit timestamp order. Each change record is returned exactly once. Across change stream partitions, change record ordering is not guaranteed. Change records for a particular primary key are returned only on one partition for a particular time range.
Due to the parent-child partition lineage, in order to process changes for a particular key in commit timestamp order, records returned from child partitions should be processed only after records from all parent partitions have been processed.
Change stream read functions and query syntax
GoogleSQL
To query change streams, use the
ExecuteStreamingSql
API. Spanner automatically creates a special read function
along with the change stream. The read function provides access to the
change stream's records. The read function naming convention is
READ_change_stream_name
.
Assuming a change stream SingersNameStream
exists in the database, the
query syntax for GoogleSQL is the following:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
The read function accepts the following arguments:
Argument name | Type | Required? | Description |
---|---|---|---|
start_timestamp |
TIMESTAMP |
Required | Specifies that records with commit_timestamp greater
than or equal to start_timestamp should be returned. The
value must be within the change stream retention period, and should be
less than or equal to the current time, and greater than or equal to
the timestamp of the change stream's creation. |
end_timestamp |
TIMESTAMP |
Optional (Default: NULL ) |
Specifies that records with a commit_timestamp less
than or equal to end_timestamp should be returned. The
value must be within the change stream retention
period and greater or equal than the start_timestamp . The
query finishes either after returning all ChangeRecords
up to the end_timestamp or when you terminate the
connection. If end_timestamp is set to NULL
or isn't specified, the query continues execution until all
ChangeRecords are returned or until you terminate the
connection. |
partition_token |
STRING |
Optional (Default: NULL ) |
Specifies which change stream partition to query, based on the
content of child partitions
records. If NULL or not specified, this means the
reader is querying the change stream for the first time, and has
not obtained any specific partition tokens to query from. |
heartbeat_milliseconds |
INT64 |
Required | Determines how frequently a heartbeat ChangeRecord is
returned in case there are no transactions committed in this
partition.
The value must be between 1,000 (one second) and
300,000 (five minutes). |
read_options |
ARRAY |
Optional (Default: NULL ) |
Adds read options reserved for future use. The only
permitted value is NULL . |
We recommend making a helper method for building the text of the read function query and binding parameters to it, as shown in the following example.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
To query change streams, use the
ExecuteStreamingSql
API. Spanner automatically creates a special read function
along with the change stream. The read function provides access to the
change stream's records. The read function naming convention is
spanner.read_json_change_stream_name
.
Assuming a change stream SingersNameStream
exists in the database, the
query syntax for PostgreSQL is the following:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
The read function accepts the following arguments:
Argument name | Type | Required? | Description |
---|---|---|---|
start_timestamp |
timestamp with time zone |
Required | Specifies that change records with commit_timestamp
greater than or equal to start_timestamp
should be returned. The value must be within the change stream
retention period, and should be less than or equal to the current
time, and greater than or equal to the timestamp of the change
stream's creation. |
end_timestamp |
timestamp with timezone |
Optional (Default: NULL ) |
Specifies that change records with commit_timestamp
less than or equal to end_timestamp should
be returned. The value must be within the change stream retention
period and greater or equal than the start_timestamp .
The query finishes either after returning all change records up to
the end_timestamp or until you terminate the connection.
If NULL , the query continues execution until all change
records are returned or until you terminate the connection. |
partition_token |
text |
Optional (Default: NULL ) |
Specifies which change stream partition to query, based on the
content of child partitions
records. If NULL or not specified, this means the
reader is querying the change stream for the first time, and has
not obtained any specific partition tokens to query from. |
heartbeat_milliseconds |
bigint |
Required | Determines how frequently a heartbeat ChangeRecord is
returned when there are no transactions committed in this
partition.
The value must be between 1,000 (one second) and
300,000 (five minutes). |
null |
null |
Required | Reserved for future use |
We recommend making a helper method for building the text of the read function and binding parameters to it, as shown in the following example.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Change streams record format
GoogleSQL
The change streams read function returns a single ChangeRecord
column of
type ARRAY<STRUCT<...>>
. In each row, this array always contains a single
element.
The array elements have the following type:
STRUCT < data_change_record ARRAY<STRUCT<...>>, heartbeat_record ARRAY<STRUCT<...>>, child_partitions_record ARRAY<STRUCT<...>> >
There are three fields in this STRUCT
: data_change_record
,
heartbeat_record
and child_partitions_record
, each of type
ARRAY<STRUCT<...>>
. In any row that the change stream read function
returns, only one of these three fields contains a value; the others two
are empty or NULL
. These array fields contain, at most, one element.
The following sections examine each of these three record types.
PostgreSQL
The change streams read function returns a single ChangeRecord
column of
type JSON
with the following structure:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
There are three possible keys in this object: data_change_record
,
heartbeat_record
and child_partitions_record
, the corresponding value
type is JSON
. In any row that the change stream read function returns,
only one of these three keys exists.
The following sections examine each of these three record types.
Data change records
A data change record contains a set of changes to a table with the same modification type (insert, update, or delete) committed at the same commit timestamp in one change stream partition for the same transaction. Multiple data change records can be returned for the same transaction across multiple change stream partitions.
All data change records have commit_timestamp
, server_transaction_id
,
and record_sequence
fields, which together determine the order in the change
stream for a stream record. These three fields are sufficient to derive
the ordering of changes and provide external consistency.
Note that multiple transactions can have the same commit timestamp if
they touch non-overlapping data. The server_transaction_id
field
offers the ability to distinguish which set of changes (potentially
across change stream partitions) were issued within the same
transaction. Pairing it with the record_sequence
and
number_of_records_in_transaction
fields allows you buffer and order
all the records from a particular transaction, as well.
The fields of a data change record include the following:
GoogleSQL
Field | Type | Description |
---|---|---|
commit_timestamp |
TIMESTAMP |
Indicates the timestamp in which the change was committed. |
record_sequence |
STRING |
Indicates the sequence number for the record within the transaction.
Sequence numbers are unique and monotonically increasing (but not
necessarily contiguous) within a transaction. Sort the records for the same
server_transaction_id by record_sequence to
reconstruct the ordering of the changes within the transaction.
Spanner might optimize this ordering for better performances
and it might not always match the original ordering that you provide. |
server_transaction_id |
STRING |
Provides a globally unique string that represents the transaction in which the change was committed. The value should only be used in the context of processing change stream records and is not correlated with the transaction id in Spanner's API. |
is_last_record_in_transaction_in_partition |
BOOL |
Indicates whether this is the last record for a transaction in the current partition. |
table_name |
STRING |
Name of the table affected by the change. |
value_capture_type |
STRING |
Describes the value capture type that was specified in the change stream configuration when this change was captured. The value capture type can be one of the following:
By default, it is |
column_types |
[ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN "ordinal_position": NUMBER }, ... ] |
Indicates the name of the column, the column type,
whether it is a primary key, and the position of the column as
defined in the schema (ordinal_position ). The first column of a
table in the schema would have an ordinal position of 1 . The
column type may be nested for array columns. The format matches the type
structure described in the Spanner API reference.
|
mods |
[ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] }, }, [...] ] |
Describes the changes that were made, including the primary key
values, the old values, and the new values of the changed or tracked columns.
The availability and content of the old and new values depends on the
configured value_capture_type . The new_values and
old_values fields only contain the non-key columns. |
mod_type |
STRING |
Describes the type of change. One of INSERT ,
UPDATE , or DELETE . |
number_of_records_in_transaction |
INT64 |
Indicates the number of data change records that are part of this transaction across all change stream partitions. |
number_of_partitions_in_transaction |
INT64 |
Indicates the number of partitions that return data change records for this transaction. |
transaction_tag |
STRING |
Indicates the Transaction tag associated with this transaction. |
is_system_transaction |
BOOL |
Indicates whether the transaction is a system transaction. |
PostgreSQL
Field | Type | Description |
---|---|---|
commit_timestamp |
STRING |
Indicates the timestamp at which the change was committed. |
record_sequence |
STRING |
Indicates the sequence number for the record within the transaction.
Sequence numbers are unique and monotonically increasing (but not
necessarily contiguous) within a transaction. Sort the records for the same
server_transaction_id by record_sequence to
reconstruct the ordering of the changes within the transaction. |
server_transaction_id |
STRING |
Provides a globally unique string that represents the transaction in which the change was committed. The value should only be used in the context of processing change stream records and is not correlated with the transaction id in Spanner's API |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Indicates whether this is the last record for a transaction in the current partition. |
table_name |
STRING |
Indicates the name of the table affected by the change. |
value_capture_type |
STRING |
Describes the value capture type that was specified in the change stream configuration when this change was captured. The value capture type can be one of the following:
By default, it is |
column_types |
[ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN "ordinal_position": NUMBER }, ... ] |
Indicates the name of the column, the column type,
whether it's a primary key, and the position of the column as
defined in the schema (ordinal_position ). The first column of a
table in the schema would have an ordinal position of 1 . The
column type may be nested for array columns. The format matches the type
structure described in the
Spanner API reference.
|
mods |
[ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] }, }, [...] ] |
Describes the changes that were made, including the primary key
values, the old values, and the new values of the changed or tracked
columns. The availability and content of the old and new values depends
on the configured value_capture_type . The
new_values and old_values fields only contain the
non-key columns.
|
mod_type |
STRING |
Describes the type of change. One of INSERT ,
UPDATE , or DELETE . |
number_of_records_in_transaction |
INT64 |
Indicates the number of data change records that are part of this transaction across all change stream partitions. |
number_of_partitions_in_transaction |
NUMBER |
Indicates the number of partitions that return data change records for this transaction. |
transaction_tag |
STRING |
Indicates the Transaction tag associated with this transaction. |
is_system_transaction |
BOOLEAN |
Indicates whether the transaction is a system transaction. |
Example data change record
A pair of example data change records follow. They describe a single transaction where there is a transfer between two accounts. The two accounts are in separate change stream partitions.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
The following data change record is an example of a record with the value
capture type NEW_VALUES
. Note that only new values are populated.
Only the LastUpdate
column was modified, so only that column
was returned.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
The following data change record is an example of a record with the value
capture type NEW_ROW
. Only the LastUpdate
column was modified, but all tracked columns are returned.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
The following data change record is an example of a record with the value
capture type NEW_ROW_AND_OLD_VALUES
. Only the LastUpdate
column was
modified, but all tracked columns are returned. This value capture type captures
the new value and old value of LastUpdate
.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Heartbeat records
When a heartbeat record is returned, it indicates that all changes with
commit_timestamp
less than or equal to the heartbeat record's
timestamp
have been returned, and future data records in this
partition must have higher commit timestamps than that returned by the
heartbeat record. Heartbeat records are returned when there are no data
changes written to a partition. When there are data changes written to
the partition, data_change_record.commit_timestamp
can be used instead
of heartbeat_record.timestamp
to tell that the reader is making forward
progress in reading the partition.
You can use heartbeat records returned on partitions to synchronize
readers across all partitions. Once all readers have received either a
heartbeat greater than or equal to some timestamp A
or have received data or
child partition records greater than or equal to timestamp A
, the readers know
they have received all records committed at or before that timestamp A
and can
start processing the buffered records—for example, sorting the cross-partition
records by timestamp and grouping them by server_transaction_id
.
A heartbeat record contains only one field:
GoogleSQL
Field | Type | Description |
---|---|---|
timestamp |
TIMESTAMP |
Indicates the heartbeat record's timestamp. |
PostgreSQL
Field | Type | Description |
---|---|---|
timestamp |
STRING |
Indicates the heartbeat record's timestamp. |
Example heartbeat record
An example heartbeat record, communicating that all records with timestamps less or equal than this record's timestamp have been returned:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Child partition records
Child partition records returns information about child partitions: their
partition tokens, the tokens of their parent partitions, and the
start_timestamp
that represents the earliest timestamp that the child
partitions contain change records for. Records whose commit timestamps
are immediately prior to the child_partitions_record.start_timestamp
are
returned in the current partition. After returning all the
child partition records for this partition, this query returns with
a success status, indicating all records have been returned for this
partition.
The fields of a child partition record includes the following:
GoogleSQL
Field | Type | Description |
---|---|---|
start_timestamp |
TIMESTAMP |
Indicates that the data change records returned from child
partitions in this child partition record have a commit timestamp
greater than or equal to start_timestamp . When querying a child
partition, the query should specify the child partition token and a
start_timestamp greater than or equal to
child_partitions_token.start_timestamp . All child partitions
records returned by a partition have the same start_timestamp
and the timestamp always falls between the query's specified
start_timestamp and end_timestamp . |
record_sequence |
STRING |
Indicates a monotonically increasing sequence number that can be used to
define the ordering of the child partition records when there are multiple
child partition records returned with the same start_timestamp
in a particular partition. The partition token, start_timestamp
and record_sequence uniquely identify a child partition record.
|
child_partitions |
[ { "token" : "STRING", "parent_partition_tokens" : ["STRING"] } ] |
Returns a set of child partitions and their associated information. This includes the partition token string used to identify the child partition in queries, as well as the tokens of its parent partitions. |
PostgreSQL
Field | Type | Description |
---|---|---|
start_timestamp |
STRING |
Indicates that the data change records returned from child
partitions in this child partition record have a commit timestamp
greater than or equal to start_timestamp . When querying a child
partition, the query should specify the child partition token and a
start_timestamp greater than or equal to
child_partitions_token.start_timestamp . All child partitions
records returned by a partition have the same start_timestamp
and the timestamp always falls between the query's specified
start_timestamp and end_timestamp .
|
record_sequence |
STRING |
Indicates a monotonically increasing sequence number that can be used to
define the ordering of the child partition records when there are multiple
child partition records returned with the same start_timestamp
in a particular partition. The partition token, start_timestamp
and record_sequence uniquely identify a child partition record.
|
child_partitions |
[ { "token": "STRING", "parent_partition_tokens": ["STRING"], }, [...] ] |
Returns an array of child partitions and their associated information. This includes the partition token string used to identify the child partition in queries, as well as the tokens of its parent partitions. |
Example child partition record
The following is an example of a child partition record:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Change streams query workflow
Run change stream queries using the
ExecuteStreamingSql
API, with a single-use
read-only
transaction and a
strong timestamp bound. The change
stream read function lets you specify the start_timestamp
and
end_timestamp
for the time range of interest. All change records
within the retention period are accessible using the strong read-only
timestamp bound.
All other
TransactionOptions
are invalid for change stream queries. In addition,
if TransactionOptions.read_only.return_read_timestamp
is set to true
,
a special value of kint64max - 1
is returned in the Transaction
message that describes the transaction, instead of a valid read
timestamp. This special value should be discarded and not used for any
subsequent queries.
Each change stream query can return any number of rows, each containing either a data change record, heartbeat record, or child partitions record. There is no need to set a deadline for the request.
Example change stream query workflow
The streaming query workflow begins with issuing the very first change stream
query by specifying the partition_token
to NULL
. The query needs to specify
the read function for the change stream, start and end timestamp of interest,
and the heartbeat interval. When the end_timestamp
is NULL
, the query keeps
returning data changes until the partition ends.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Process data records from this query until all child partition records are
returned. In the following example, two child partition records and three
partition tokens are returned, then the query terminates. Child partition
records from a specific query always shares the same start_timestamp
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
To process changes after 2022-05-01T09:00:01Z
, create three new queries and
run them in parallel. Used together, the three queries return data changes for
the same key range their parent covers. Always set the start_timestamp
to the
start_timestamp
in the same child partition record and use the same
end_timestamp
and heartbeat interval to process the records consistently
across all queries.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
The query on child_token_2
finishes after returning another child partition
record. This record indicates that a new partition is covering changes for both
child_token_2
and child_token_3
starting at 2022-05-01T09:30:15Z
. The
exact same record is returned by the query on child_token_3
, because both are
the parent partitions of the new child_token_4
. To ensure a strict ordered
processing of data records for a particular key, the query on child_token_4
must start after all the parents have finished. In this case, the parents are
child_token_2
and child_token_3
. Only create one query for each child
partition token. The query workflow design should appoint one parent to wait and
schedule the query on child_token_4
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Find examples of handling and parsing change stream records in the Apache Beam SpannerIO Dataflow connector on GitHub.