This page describes the following attributes of change streams in detail:
- Its split-based partitioning model
- The format and content of change stream records
- The low-level syntax used to query those records
- An example of the query workflow
The information on this page is most relevant for using the Spanner API to query change streams directly. Applications that instead use Dataflow to read change stream data do not need to work directly with the data model described here.
For a broader introductory guide to change streams, see Change streams overview.
Change stream partitions
When a change occurs on a table that is watched by a change stream, Spanner writes a corresponding change stream record in the database, synchronously in the same transaction as the data change. This guarantees that if the transaction succeeds, Spanner has also successfully captured and persisted the change. Internally, Spanner co-locates the change stream record and the data change so that they are processed by the same server to minimize write overhead.
As part of the DML to a particular split, Spanner appends the write to the corresponding change stream data split in the same transaction. Because of this colocation, change streams do not add extra coordination across serving resources, which minimizes the transaction commit overhead.
Spanner scales by dynamically splitting and merging data based on database load and size, and distributing splits across serving resources.
To enable change streams writes and reads to scale, Spanner splits and merges the internal change stream storage along with the database data, automatically avoiding hotspots. To support reading change stream records in near real-time as database writes scale, the Spanner API is designed for a change stream to be queried concurrently using change stream partitions. Change stream partitions map to change stream data splits that contain the change stream records. A change stream's partitions change dynamically over time and are correlated to how Spanner dynamically splits and merges the database data.
A change stream partition contains records for an immutable key range for a specific time range. Any change stream partition can split into one or more change stream partitions, or be merged with other change stream partitions. When these split or merge events happen, child partitions are created to capture the changes for their respective immutable key ranges for the next time range. In addition to data change records, a change stream query returns child partition records to notify readers of new change stream partitions that need to be queried, as well as heartbeat records to indicate forward progress when no writes have occurred recently.
When querying a particular change stream partition, the change records are returned in commit timestamp order. Each change record is returned exactly once. Across change stream partitions, there is no guaranteed ordering of change records. Change records for a particular primary key are returned only on one partition for a particular time range.
Due to the parent-child partition lineage, in order to process changes for a particular key in commit timestamp order, records returned from child partitions should be processed only after records from all parent partitions have been processed.
Change stream read functions and query syntax
GoogleSQL
You query change streams by using the
ExecuteStreamingSql
API. Spanner automatically creates a special read function along
with the change stream. The read function provides access to the change
stream's records. The read function naming convention is
READ_change_stream_name
.
Assuming a change stream SingersNameStream
exists in the database, the
query syntax for GoogleSQL is the following:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
The read function accepts the following arguments:
Argument name | Type | Required? | Description |
---|---|---|---|
start_timestamp |
TIMESTAMP |
Required | Specifies that records with commit_timestamp greater than or equal to start_timestamp
should be returned. The value must be within the change stream
retention period, and should be less than or equal to the current time,
and greater than or equal to the timestamp of the change stream's creation. |
end_timestamp |
TIMESTAMP |
Optional (Default: NULL ) |
Specifies that records with commit_timestamp less
than or equal to end_timestamp should
be returned. The value must be within the change stream retention
period and greater or equal than the start_timestamp . The query
finishes either after returning all ChangeRecords up to end_timestamp
or the user terminates the connection. If NULL or not
specified, the query executes until all ChangeRecords are returned or the
user terminates the connection. |
partition_token |
STRING |
Optional (Default: NULL ) |
Specifies which change stream partition to query, based on the
content of child partitions
records. If NULL or not specified, this means the
reader is querying the change stream for the first time, and has
not obtained any specific partition tokens to query from. |
heartbeat_milliseconds |
INT64 |
Required | Determines how frequently a heartbeat ChangeRecord is returned
in case there are no transactions committed in this partition.
The value must be between 1,000 (one second) and 300,000 (five
minutes). |
read_options |
ARRAY |
Optional (Default: NULL ) |
Additional read options reserved for the future use. Currently, the only allowed value is NULL . |
We recommend making a convenience method for building the text of the read function query and binding parameters to it, as shown in the following example.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
You query change streams by using the
ExecuteStreamingSql
API.
Spanner automatically creates a special read function along
with the change stream. The read function provides access to the change
stream's records. The read function naming convention is
spanner.read_json_change_stream_name
.
Assuming a change stream SingersNameStream
exists in the database, the
query syntax for PostgreSQL is the following:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
The read function accepts the following arguments:
Argument name | Type | Required? | Description |
---|---|---|---|
start_timestamp |
timestamp with time zone |
Required | Specifies that change records with commit_timestamp greater than or equal to start_timestamp
should be returned. The value must be within the change stream
retention period, and should be less than or equal to the current time,
and greater than or equal to the timestamp of the change stream's creation. |
end_timestamp |
timestamp with timezone |
Optional (Default: NULL ) |
Specifies that change records with commit_timestamp
less than or equal to end_timestamp should
be returned. The value must be within the change stream retention
period and greater or equal than the start_timestamp .
The query finishes either after returning all change records up to
end_timestamp or the user terminates the connection.
If NULL the query executes until all change records are returned
or the user terminates the connection. |
partition_token |
text |
Optional (Default: NULL ) |
Specifies which change stream partition to query, based on the
content of child partitions
records. If NULL or not specified, this means the
reader is querying the change stream for the first time, and has
not obtained any specific partition tokens to query from. |
heartbeat_milliseconds |
bigint |
Required | Determines how frequently a heartbeat ChangeRecord will be returned
in case there are no transactions committed in this partition.
The value must be between 1,000 (one second) and 300,000 (five
minutes). |
null |
null |
Required | Reserved for future use |
We recommend making a convenience method for building the text of the read function and binding parameters to it, as shown in the following example.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Change streams record format
GoogleSQL
The change streams read function returns a single ChangeRecord
column of type
ARRAY<STRUCT<...>>
. In each row, this array always contains a single element.
The array elements have the following type:
STRUCT <
data_change_record ARRAY<STRUCT<...>>,
heartbeat_record ARRAY<STRUCT<...>>,
child_partitions_record ARRAY<STRUCT<...>>
>
There are three fields in this struct: data_change_record
,
heartbeat_record
and child_partitions_record
, each of type
ARRAY<STRUCT<...>>
. In any row that the change stream read function
returns, only one of these three fields contains a value; the others two
are empty or NULL
. These array fields contain, at most, one element.
The following sections examine each of these three record types.
PostgreSQL
The change streams read function returns a single ChangeRecord
column of
type JSON
with the following structure:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
There are three possible keys in this object: data_change_record
,
heartbeat_record
and child_partitions_record
, the corresponding value
type is JSON
.
In any row that the change stream read function returns, only
one of these three keys exists.
The following sections examine each of these three record types.
Data change records
A data change record contains a set of changes to a table with the same modification type (insert, update, or delete) committed at the same commit timestamp in one change stream partition for the same transaction. Multiple data change records can be returned for the same transaction across multiple change stream partitions.
All data change records have commit_timestamp
, server_transaction_id
,
and record_sequence
fields, which together determine the order in the change
stream for a stream record. These three fields are sufficient to derive
the ordering of changes and provide external consistency.
Note that multiple transactions can have the same commit timestamp if
they touch non-overlapping data. The server_transaction_id
field
offers the ability to distinguish which set of changes (potentially
across change stream partitions) were issued within the same
transaction. Pairing it with the record_sequence
and
number_of_records_in_transaction
fields allows you buffer and order
all the records from a particular transaction, as well.
The fields of a data change record include the following:
GoogleSQL
Field | Type | Description |
---|---|---|
commit_timestamp |
TIMESTAMP |
The timestamp in which the change was committed. |
record_sequence |
STRING |
The sequence number for the record within the transaction. Sequence numbers are guaranteed to
be unique and monotonically increasing (but not necessarily contiguous) within a transaction. Sort the records for the same
server_transaction_id by record_sequence to
reconstruct the ordering of the changes within the transaction.
This ordering might be optimized by Spanner for better performances and it might not always match the original ordering that users provide. |
server_transaction_id |
STRING |
A globally unique string that represents the transaction in which the change was committed. The value should only be used in the context of processing change stream records and is not correlated with the transaction id in Spanner's API. |
is_last_record_in_transaction_in_partition |
BOOL |
Indicates whether this is the last record for a transaction in the current partition. |
table_name |
STRING |
Name of the table affected by the change. |
value_capture_type |
STRING |
Describes the value capture type that was specified in the change stream configuration when this change was captured. The value capture type can be |
column_types |
ARRAY<STRUCT< |
The name of the column, the column type, whether it is a primary key, and the position of the column as defined in the schema (`ordinal_position`). The first column of a table in the schema would have an ordinal position of `1`. The column type may be nested for array columns. The format matches the type structure described in the Spanner API reference. |
mods |
ARRAY<STRUCT< |
Describes the changes that were made, including the primary key
values, the old values, and the new values of the changed or tracked columns.
The availability and content of the old and new values will depend on the configured value_capture_type. The new_values and old_values fields only contain the non-key columns. |
mod_type |
STRING |
Describes the type of change. One of INSERT , UPDATE , or
DELETE . |
number_of_records_in_transaction |
INT64 |
The number of data change records that are part of this transaction across all change stream partitions. |
number_of_partitions_in_transaction |
INT64 |
The number of partitions that will return data change records for this transaction. |
transaction_tag |
STRING |
Transaction tag associated with this transaction. |
is_system_transaction |
BOOL |
Indicates whether the transaction is a system transaction. |
PostgreSQL
Field | Type | Description |
---|---|---|
commit_timestamp |
STRING |
The timestamp at which the change was committed. |
record_sequence |
STRING |
The sequence number for the record within the transaction. Sequence numbers are guaranteed to be unique and monotonically increasing (but not necessarily contiguous) within a transaction. Sort the records for the same `server_transaction_id` by `record_sequence` to reconstruct the ordering of the changes within the transaction. |
server_transaction_id |
STRING |
A globally unique string that represents the transaction in which the change was committed. The value should only be used in the context of processing change stream records and is not correlated with the transaction id in Spanner's API |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Indicates whether this is the last record for a transaction in the current partition. |
table_name |
STRING |
Name of the table affected by the change. |
value_capture_type |
STRING |
Describes the value capture type that was specified in the change stream configuration when this change was captured. The value capture type can be |
column_types |
[ { "name": <STRING>, "type": { "code": <STRING> }, "is_primary_key": <BOOLEAN>, "ordinal_position": <NUMBER> }, ... ] |
The name of the column, the column type, whether it is a primary key, and the position of the column as defined in the schema (`ordinal_position`). The first column of a table in the schema would have an ordinal position of `1`. The column type may be nested for array columns. The format matches the type structure described in the Spanner API reference. |
mods |
[ { "keys": {<STRING> : <STRING>}, "new_values": { <STRING> : <VALUE-TYPE>, [...] }, "old_values": { <STRING> : <VALUE-TYPE>, [...] }, }, [...] ] |
Describes the changes that were made, including the primary key
values, the old values, and the new values of the changed or tracked
columns. The availability and content of the old and new values will depend
on the configured value_capture_type. The new_values and
old_values fields only contain the non-key columns.
|
mod_type |
STRING |
Describes the type of change. One of INSERT , UPDATE , or
DELETE . |
number_of_records_in_transaction |
INT64 |
The number of data change records that are part of this transaction across all change stream partitions. |
number_of_partitions_in_transaction |
NUMBER |
The number of partitions that will return data change records for this transaction. |
transaction_tag |
STRING |
Transaction tag associated with this transaction. |
is_system_transaction |
BOOLEAN |
Indicates whether the transaction is a system transaction. |
A pair of example data change records follow. They describe a single transaction where there is a transfer between two accounts. Note that the two accounts are in separate change stream partitions.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
The following data change record is an example of a record with the value
capture type "NEW_VALUES"
. Note that only new values are populated.
Only the "LastUpdate"
column was modified, so only that column
was returned.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
The following data change record is an example of a record with the value
capture type "NEW_ROW"
. Only the "LastUpdate"
column was modified, but all tracked columns are returned.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
The following data change record is an example of a record with the value
capture type "NEW_ROW_AND_OLD_VALUES"
. Only the "LastUpdate"
column was modified, but all tracked columns are returned. This value capture
type captures the new value and old value of LastUpdate
.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Heartbeat records
When a heartbeat record is returned, it indicates that all changes with
commit_timestamp
less than or equal to the heartbeat record's
timestamp
have been returned, and future data records in this
partition must have higher commit timestamps than that returned by the
heartbeat record. Heartbeat records are returned when there are no data
changes written to a partition. When there are data changes written to
the partition, data_change_record.commit_timestamp
can be used instead
of heartbeat_record.timestamp
to tell that the reader is making forward
progress in reading the partition.
You can use heartbeat records returned on partitions to synchronize
readers across all partitions. Once all readers have received either a
heartbeat greater than or equal to some timestamp A
or have received data or child
partition records greater than or equal to timestamp A
, the readers know they have received
all records committed at or before that timestamp A
and can start
processing the buffered records—for example, sorting the cross-partition
records by timestamp and grouping them by server_transaction_id
.
A heartbeat record contains only one field:
GoogleSQL
Field | Type | Description |
---|---|---|
timestamp |
TIMESTAMP |
The heartbeat record's timestamp. |
PostgreSQL
Field | Type | Description |
---|---|---|
timestamp |
STRING |
The heartbeat record's timestamp. |
An example heartbeat record, communicating that all records with timestamps less or equal than this record's timestamp have been returned:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Child partitions records
A child partitions record returns information about child partitions: their partition tokens, the tokens of their parent partitions, and the
start_timestamp
that represents the earliest timestamp that the child
partitions contain change records for. Records whose commit timestamps
are immediately prior to the child_partitions_record.start_timestamp
are
returned in the current partition. After returning all the
child partitions records for this partition, this query will return with
a success status, indicating all records have been returned for this
partition.
The fields of a child partitions record include the following:
GoogleSQL
Field | Type | Description |
---|---|---|
start_timestamp |
TIMESTAMP |
Data change records returned from child
partitions in this child partition record have a commit timestamp
greater than or equal to start_timestamp . When querying a child partition, the query should
specify the child partition token and a start_timestamp greater than or equal to
child_partitions_token.start_timestamp . All child partitions records
returned by a partition have the same start_timestamp and the
timestamp always falls between the query's specified start_timestamp
and end_timestamp . |
record_sequence |
STRING |
A monotonically increasing sequence
number that can be used to define the ordering of the
child partitions record when there are multiple
child partitions records returned with the same start_timestamp in a
particular partition. The partition token,
start_timestamp and
record_sequence uniquely identify a
child partitions record. |
child_partitions |
ARRAY<STRUCT< |
Returns a set of child partitions and their associated information. This includes the partition token string used to identify the child partition in queries, as well as the tokens of its parent partitions. |
PostgreSQL
Field | Type | Description |
---|---|---|
start_timestamp |
STRING |
Data change records returned from child
partitions in this child partitions record have a commit timestamp
greater than or equal to start_timestamp . When querying a child
partition, the query should specify the child partition token and a
start_timestamp greater than or equal to
child_partitions_token.start_timestamp . All child partitions
records returned by a partition have the same
start_timestamp and the timestamp always falls between the
query's specified start_timestamp and
end_timestamp .
|
record_sequence |
STRING |
A monotonically increasing sequence
number that can be used to define the ordering of the
child partitions record when there are multiple
child partitions records returned with the same start_timestamp in a
particular partition. The partition token,
start_timestamp and
record_sequence uniquely identify a
child partitions record. |
child_partitions |
[ { "token": <STRING>, "parent_partition_tokens": [<STRING>], }, [...] ] |
Returns an array of child partitions and their associated information. This includes the partition token string used to identify the child partition in queries, as well as the tokens of its parent partitions. |
The following is an example of a child partition record:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Change streams query workflow
Run change stream queries using the
ExecuteStreamingSql
API, with a single-use
read-only
transaction and a
strong timestamp bound. The change
stream read function lets you specify the start_timestamp
and
end_timestamp
for the time range of interest. All change records
within the retention period are accessible using the strong read-only
timestamp bound.
All other
TransactionOptions
are invalid for change stream queries. In addition,
if TransactionOptions.read_only.return_read_timestamp
is set to true,
a special value of kint64max - 1
will be returned in the Transaction
message that describes the transaction, instead of a valid read
timestamp. This special value should be discarded and not used for any
subsequent queries.
Each change stream query can return any number of rows, each containing either a data change record, heartbeat record, or child partitions record. There is no need to set a deadline for the request.
Example:
The streaming query workflow begins with issuing the very first change stream
query by specifying the partition_token
to NULL
. The query needs to specify
the read function for the change stream, start and end timestamp of interest, and
the heartbeat interval. When the end_timestamp
is NULL
, the query keeps
returning data changes until the partition ends.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Process data records from this query until child partition records are
returned. In the example below, two child partition records and three partition
tokens are returned, then the query terminates. Child partition records from a
specific query always shares the same start_timestamp
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
To process future changes after 2022-05-01T09:00:01Z
, create three new
queries and run them in parallel. The three queries together return future
data changes for the same key range their parent covers. Always set the
start_timestamp
to the start_timestamp
in the same child partition record and
use the same end_timestamp
and heartbeat interval to process the records
consistently across all queries.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
After a while, the query on child_token_2
finishes after returning another
child partition record, this records indicates that a new partition will be
covering future changes for both child_token_2
and child_token_3
starting at
2022-05-01T09:30:15Z
. The exact same record will be returned by the query on
child_token_3
, because both are the parent partitions of the new child_token_4
.
To guarantee a strict ordered processing of data records for a particular key,
the query on child_token_4
must only start after all the parents have finished,
which in this case are child_token_2
and child_token_3
. Only create one query
for each child partition token, the query workflow design should appoint one
parent to wait and schedule the query on child_token_4
.
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Find examples of handling and parsing change stream records in the Apache Beam SpannerIO Dataflow connector on GitHub.