Auf dieser Seite werden Änderungsstreams in Spanner für Datenbanken mit Google SQL- und PostgreSQL-Dialekt beschrieben. Dazu gehören:
- Das splitbasierte Partitionierungsmodell
- Format und Inhalt von Änderungsstream-Eintrags
- Die Low-Level-Syntax, die zum Abfragen dieser Einträge verwendet wird
- Beispiel für den Abfragevorgang
Mit der Spanner API können Sie Änderungsstreams direkt abfragen. Bei Anwendungen, die stattdessen Dataflow zum Lesen von Änderungsstreamdaten verwenden, muss nicht direkt mit dem hier beschriebenen Datenmodell gearbeitet werden.
Eine umfassende Einführung in Änderungsstreams finden Sie unter Änderungsstreams – Übersicht.
Streampartitionen ändern
Wenn eine Änderung an einer Tabelle auftritt, die von einem Änderungsstream beobachtet wird, schreibt Spanner einen entsprechenden Änderungsstream-Eintrag synchron in derselben Transaktion wie die Datenänderung in die Datenbank. Wenn die Transaktion erfolgreich ist, hat Spanner die Änderung also auch erfolgreich erfasst und gespeichert. Intern platziert Spanner den Änderungsstream-Eintrag und die Datenänderung an derselben Stelle, damit sie vom selben Server verarbeitet werden, um den Schreibaufwand zu minimieren.
Im Rahmen der DML für eine bestimmte Teilung hängt Spanner den Schreibvorgang in derselben Transaktion an den entsprechenden Datensplit des Änderungsstreams an. Aufgrund dieser Co-Lokation erfordern Änderungsstreams keine zusätzliche Koordination zwischen den Bereitstellungsressourcen, wodurch der Transaktions-Commit-Overhead minimiert wird.
Spanner skaliert, indem Daten dynamisch basierend auf der Datenbanklast und ‑größe aufgeteilt und zusammengeführt und die Aufteilungen auf Bereitstellungsressourcen verteilt werden.
Damit Schreib- und Lesevorgänge für Änderungsstreams skaliert werden können, teilt und verschmilzt Spanner den internen Speicher für Änderungsstreams mit den Datenbankdaten und vermeidet so automatisch Hotspots. Um das Lesen von Änderungsstream-Einträgen nahezu in Echtzeit zu unterstützen, wenn Datenbankeinträge skaliert werden, ist die Spanner API so konzipiert, dass ein Änderungsstream gleichzeitig mithilfe von Änderungsstream-Partitionen abgefragt werden kann. Änderungsstream-Partitionen werden Änderungsstream-Datensplits zugeordnet, die die Änderungsstream-Datensätze enthalten. Die Partitionen eines Änderungsstreams ändern sich dynamisch im Laufe der Zeit und hängen davon ab, wie Spanner die Datenbankdaten dynamisch aufteilt und zusammenführt.
Eine Änderungsstream-Partition enthält Einträge für einen unveränderlichen Schlüsselbereich für einen bestimmten Zeitraum. Jede Change-Stream-Partition kann in eine oder mehrere Change-Stream-Partitionen aufgeteilt oder mit anderen Change-Stream-Partitionen zusammengeführt werden. Wenn diese Ereignisse auftreten, werden untergeordnete Partitionen erstellt, um die Änderungen für die jeweiligen unveränderlichen Schlüsselbereiche für den nächsten Zeitraum zu erfassen. Neben Datenänderungssätzen gibt eine Änderungsstreamabfrage auch untergeordnete Partitionssätze zurück, um Leser über neue Änderungsstreampartitionen zu informieren, die abgefragt werden müssen, sowie Heartbeat-Einträge, um den Fortschritt anzuzeigen, wenn in letzter Zeit keine Schreibvorgänge stattgefunden haben.
Wenn Sie eine bestimmte Change-Stream-Partition abfragen, werden die Änderungssätze in der Reihenfolge des Commit-Zeitstempels zurückgegeben. Jeder Änderungseintrag wird genau einmal zurückgegeben. Die Reihenfolge der Änderungsdatensätze ist nicht für alle Änderungsstream-Partitionen garantiert. Änderungseinträge für einen bestimmten Primärschlüssel werden nur für eine Partition für einen bestimmten Zeitraum zurückgegeben.
Aufgrund der übergeordneten und untergeordneten Partitionsabstammung müssen Einträge, die von untergeordneten Partitionen zurückgegeben werden, erst verarbeitet werden, nachdem Einträge aus allen übergeordneten Partitionen verarbeitet wurden, damit Änderungen für einen bestimmten Schlüssel in der Reihenfolge des Commit-Zeitstempels verarbeitet werden können.
Funktionen für Änderungsstream-Lesevorgänge und Abfragesyntax
Verwenden Sie die ExecuteStreamingSql
API, um Änderungsstreams abzufragen. Spanner erstellt zusammen mit dem Änderungsstream automatisch eine spezielle Lesefunktion. Die Lesefunktion bietet Zugriff auf die Datensätze des Änderungsstreams. Die Namenskonvention für Lesefunktionen lautet READ_change_stream_name
.
Angenommen, in der Datenbank ist ein Änderungsstream SingersNameStream
vorhanden, lautet die Abfragesyntax für GoogleSQL:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
Die Funktion „read“ akzeptiert die folgenden Argumente:
Name des Arguments | Typ | Erforderlich? | Beschreibung |
---|---|---|---|
start_timestamp |
TIMESTAMP |
Erforderlich | Gibt an, dass Einträge mit commit_timestamp größer oder gleich start_timestamp zurückgegeben werden sollen. Der Wert muss innerhalb des Zeitfensters für die Aufbewahrung des Änderungsstreams liegen und kleiner oder gleich der aktuellen Uhrzeit und größer oder gleich dem Zeitstempel der Erstellung des Änderungsstreams sein. |
end_timestamp |
TIMESTAMP |
Optional (Standard: NULL ) |
Gibt an, dass Einträge mit einem commit_timestamp -Wert zurückgegeben werden sollen, der kleiner oder gleich end_timestamp ist. Der Wert muss innerhalb des Zeitraums für die Aufbewahrung des Änderungsstreams liegen und größer oder gleich start_timestamp sein. Die Abfrage wird entweder beendet, nachdem alle ChangeRecords bis zur end_timestamp zurückgegeben wurden, oder wenn Sie die Verbindung beenden. Wenn end_timestamp auf NULL festgelegt ist oder nicht angegeben ist, wird die Ausführung der Abfrage fortgesetzt, bis alle ChangeRecords zurückgegeben wurden oder Sie die Verbindung beenden. |
partition_token |
STRING |
Optional (Standard: NULL ) |
Gibt an, welche Änderungsstream-Partition abgefragt werden soll, basierend auf dem Inhalt der Datensätze der untergeordneten Partitionen. Wenn NULL oder nichts angegeben ist, bedeutet das, dass der Leser den Änderungsstream zum ersten Mal abfragt und keine bestimmten Partitionstokens für die Abfrage erhalten hat. |
heartbeat_milliseconds |
INT64 |
Erforderlich | Bestimmt, wie oft ein Heartbeat ChangeRecord zurückgegeben wird, wenn in dieser Partition keine Transaktionen verbindlich gemacht wurden.
Der Wert muss zwischen 1,000 (eine Sekunde) und 300,000 (fünf Minuten) liegen. |
read_options |
ARRAY |
Optional (Standard: NULL ) |
Es werden Leseoptionen hinzugefügt, die für die zukünftige Verwendung reserviert sind. Der einzige zulässige Wert ist NULL . |
Wir empfehlen, eine Hilfsmethode zum Erstellen des Textes der Abfrage der Lesefunktion zu erstellen und Parameter daran zu binden, wie im folgenden Beispiel gezeigt.
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
Verwenden Sie die ExecuteStreamingSql
API, um Änderungsstreams abzufragen. Spanner erstellt zusammen mit dem Änderungsstream automatisch eine spezielle Lesefunktion. Die Lesefunktion bietet Zugriff auf die Datensätze des Änderungsstreams. Die Namenskonvention für Lesefunktionen lautet spanner.read_json_change_stream_name
.
Angenommen, in der Datenbank ist ein Änderungsstream SingersNameStream
vorhanden, lautet die Abfragesyntax für PostgreSQL:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
Die Funktion „read“ akzeptiert die folgenden Argumente:
Name des Arguments | Typ | Erforderlich? | Beschreibung |
---|---|---|---|
start_timestamp |
timestamp with time zone |
Erforderlich | Gibt an, dass Änderungseinträge mit commit_timestamp größer oder gleich start_timestamp zurückgegeben werden sollen. Der Wert muss innerhalb der Aufbewahrungsdauer des Änderungsstreams liegen und kleiner oder gleich der aktuellen Uhrzeit und größer oder gleich dem Zeitstempel der Erstellung des Änderungsstreams sein. |
end_timestamp |
timestamp with timezone |
Optional (Standard: NULL ) |
Gibt an, dass Änderungseinträge mit einem commit_timestamp -Wert kleiner oder gleich end_timestamp zurückgegeben werden sollen. Der Wert muss innerhalb des Zeitraums für die Aufbewahrung des Änderungsstreams liegen und größer oder gleich start_timestamp sein.
Die Abfrage wird entweder beendet, nachdem alle Änderungseinträge bis zum end_timestamp zurückgegeben wurden, oder sobald Sie die Verbindung beenden.
Bei NULL wird die Abfrage fortgesetzt, bis alle Änderungseinträge zurückgegeben wurden oder Sie die Verbindung beenden. |
partition_token |
text |
Optional (Standard: NULL ) |
Gibt an, welche Änderungsstream-Partition abgefragt werden soll, basierend auf dem Inhalt der Datensätze der untergeordneten Partitionen. Wenn NULL oder nichts angegeben ist, bedeutet das, dass der Leser den Änderungsstream zum ersten Mal abfragt und keine bestimmten Partitionstokens für die Abfrage erhalten hat. |
heartbeat_milliseconds |
bigint |
Erforderlich | Bestimmt, wie oft ein Heartbeat ChangeRecord zurückgegeben wird, wenn in dieser Partition keine Transaktionen verbindlich gemacht wurden.
Der Wert muss zwischen 1,000 (eine Sekunde) und 300,000 (fünf Minuten) liegen. |
null |
null |
Erforderlich | Reserviert für zukünftige Verwendung |
Wir empfehlen, eine Hilfsmethode zum Erstellen des Texts der Lesefunktion zu erstellen und Parameter daran zu binden, wie im folgenden Beispiel gezeigt.
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Format von Streams-Eintrags ändern
Die Lesefunktion für Änderungsstreams gibt eine einzelne ChangeRecord
-Spalte vom Typ ARRAY<STRUCT<...>>
zurück. In jeder Zeile enthält dieses Array immer ein einzelnes Element.
Die Arrayelemente haben folgenden Typ:
STRUCT < data_change_record ARRAY<STRUCT<...>>, heartbeat_record ARRAY<STRUCT<...>>, child_partitions_record ARRAY<STRUCT<...>> >
Diese STRUCT
enthält drei Felder: data_change_record
, heartbeat_record
und child_partitions_record
, alle vom Typ ARRAY<STRUCT<...>>
. In jeder Zeile, die die Lesefunktion des Änderungsstreams zurückgibt, enthält nur eines dieser drei Felder einen Wert. Die anderen beiden sind leer oder enthalten NULL
. Diese Arrayfelder enthalten höchstens ein Element.
In den folgenden Abschnitten werden die einzelnen Datensatztypen beschrieben.
Die Lesefunktion für Änderungsstreams gibt eine einzelne ChangeRecord
-Spalte vom Typ JSON
mit der folgenden Struktur zurück:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
Dieses Objekt hat drei mögliche Schlüssel: data_change_record
, heartbeat_record
und child_partitions_record
. Der entsprechende Werttyp ist JSON
. In jeder Zeile, die die Lesefunktion des Änderungsstreams zurückgibt, ist nur einer dieser drei Schlüssel vorhanden.
In den folgenden Abschnitten werden die einzelnen Datensatztypen beschrieben.
Datenänderungseinträge
Ein Datenänderungssatz enthält eine Reihe von Änderungen an einer Tabelle mit demselben Änderungstyp (Einfügen, Aktualisieren oder Löschen), die zum selben Commit-Zeitstempel in einer Change-Stream-Partition für dieselbe Transaktion committet wurden. Für dieselbe Transaktion können mehrere Datenänderungseinträge über mehrere Änderungsstreampartitionen zurückgegeben werden.
Alle Datenänderungsdatensätze haben die Felder commit_timestamp
, server_transaction_id
und record_sequence
, die zusammen die Reihenfolge im Änderungsstream für einen Streamdatensatz bestimmen. Diese drei Felder reichen aus, um die Reihenfolge der Änderungen abzuleiten und für externe Konsistenz zu sorgen.
Hinweis: Mehrere Transaktionen können denselben Commit-Zeitstempel haben, wenn sie sich nicht überschneidende Daten betreffen. Mit dem Feld server_transaction_id
können Sie unterscheiden, welche Änderungen (potenziell über mehrere Änderungsstream-Partitionen hinweg) innerhalb derselben Transaktion vorgenommen wurden. In Kombination mit den Feldern record_sequence
und number_of_records_in_transaction
können Sie auch alle Einträge einer bestimmten Transaktion puffern und sortieren.
Die Felder eines Datenänderungs-Eintrags umfassen Folgendes:
Feld | Typ | Beschreibung |
---|---|---|
commit_timestamp |
TIMESTAMP |
Der Zeitstempel, zu dem die Änderung vorgenommen wurde. |
record_sequence |
STRING |
Gibt die Sequenznummer für den Datensatz innerhalb der Transaktion an.
Sequenznummern sind innerhalb einer Transaktion eindeutig und kontinuierlich ansteigend (aber nicht unbedingt fortlaufend). Sortieren Sie die Einträge für dieselbe server_transaction_id nach record_sequence , um die Reihenfolge der Änderungen innerhalb der Transaktion zu rekonstruieren.
Spanner kann diese Sortierung für eine bessere Leistung optimieren und sie entspricht möglicherweise nicht immer der von Ihnen angegebenen ursprünglichen Sortierung. |
server_transaction_id |
STRING |
Ein global eindeutiger String, der die Transaktion darstellt, in der die Änderung per Commit übergeben wurde. Der Wert sollte nur im Zusammenhang mit der Verarbeitung von Änderungsstream-Datensätzen verwendet werden und ist nicht mit der Transaktions-ID in der Spanner API korreliert. |
is_last_record_in_transaction_in_partition |
BOOL |
Gibt an, ob dies der letzte Datensatz für eine Transaktion in der aktuellen Partition ist. |
table_name |
STRING |
Name der Tabelle, die von der Änderung betroffen ist. |
value_capture_type |
STRING |
Beschreibt den Typ der Werterfassung, der in der Änderungsstreamkonfiguration angegeben wurde, als diese Änderung erfasst wurde. Der Werterfassungstyp kann einer der folgenden sein:
Der Standardwert ist |
column_types |
[ { "name": " |
Gibt den Namen der Spalte, den Spaltentyp, ob es sich um einen Primärschlüssel handelt, und die Position der Spalte gemäß Definition im Schema (ordinal_position ) an. Die erste Spalte einer Tabelle im Schema hat die Ordnungsposition 1 . Der Spaltentyp kann bei Arrayspalten verschachtelt sein. Das Format entspricht der in der Spanner API-Referenz beschriebenen Typenstruktur.
|
mods |
[ { "keys": {" |
Hier werden die vorgenommenen Änderungen beschrieben, einschließlich der Primärschlüsselwerte, der alten Werte und der neuen Werte der geänderten oder erfassten Spalten.
Verfügbarkeit und Inhalt der alten und neuen Werte hängen von der konfigurierten value_capture_type ab. Die Felder new_values und old_values enthalten nur die Nicht-Schlüsselspalten. |
mod_type |
STRING |
Beschreibt die Art der Änderung. Entweder INSERT , UPDATE oder DELETE . |
number_of_records_in_transaction |
INT64 |
Gibt die Anzahl der Datenänderungs-Datensätze an, die Teil dieser Transaktion in allen Änderungsstream-Partitionen sind. |
number_of_partitions_in_transaction |
INT64 |
Gibt die Anzahl der Partitionen an, die Datenänderungsdatensätze für diese Transaktion zurückgeben. |
transaction_tag |
STRING |
Das Transaktions-Tag, das mit dieser Transaktion verknüpft ist. |
is_system_transaction |
BOOL |
Gibt an, ob es sich bei der Transaktion um eine Systemtransaktion handelt. |
Feld | Typ | Beschreibung |
---|---|---|
commit_timestamp |
STRING |
Der Zeitstempel, zu dem die Änderung übernommen wurde. |
record_sequence |
STRING |
Gibt die Sequenznummer für den Datensatz innerhalb der Transaktion an.
Sequenznummern sind innerhalb einer Transaktion eindeutig und kontinuierlich ansteigend (aber nicht unbedingt fortlaufend). Sortieren Sie die Einträge für dieselbe server_transaction_id nach record_sequence , um die Reihenfolge der Änderungen innerhalb der Transaktion zu rekonstruieren. |
server_transaction_id |
STRING |
Ein global eindeutiger String, der die Transaktion darstellt, in der die Änderung per Commit übergeben wurde. Der Wert sollte nur im Zusammenhang mit der Verarbeitung von Änderungsstream-Datensätzen verwendet werden und ist nicht mit der Transaktions-ID in der Spanner API korreliert. |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Gibt an, ob dies der letzte Datensatz für eine Transaktion in der aktuellen Partition ist. |
table_name |
STRING |
Der Name der Tabelle, die von der Änderung betroffen ist. |
value_capture_type |
STRING |
Beschreibt den Typ der Werterfassung, der in der Änderungsstreamkonfiguration angegeben wurde, als diese Änderung erfasst wurde. Der Werterfassungstyp kann einer der folgenden sein:
Der Standardwert ist |
column_types |
[ { "name": " |
Gibt den Namen der Spalte, den Spaltentyp, ob es sich um einen Primärschlüssel handelt, und die Position der Spalte gemäß Definition im Schema (ordinal_position ) an. Die erste Spalte einer Tabelle im Schema hat die Ordnungsposition 1 . Der Spaltentyp kann bei Arrayspalten verschachtelt sein. Das Format entspricht der in der Spanner API-Referenz beschriebenen Typenstruktur.
|
mods |
[ { "keys": {" |
Hier werden die vorgenommenen Änderungen beschrieben, einschließlich der Primärschlüsselwerte, der alten Werte und der neuen Werte der geänderten oder erfassten Spalten. Verfügbarkeit und Inhalt der alten und neuen Werte hängen vom konfigurierten value_capture_type ab. Die Felder new_values und old_values enthalten nur die nicht primären Spalten.
|
mod_type |
STRING |
Beschreibt die Art der Änderung. Entweder INSERT , UPDATE oder DELETE . |
number_of_records_in_transaction |
INT64 |
Gibt die Anzahl der Datenänderungs-Datensätze an, die Teil dieser Transaktion in allen Änderungsstream-Partitionen sind. |
number_of_partitions_in_transaction |
NUMBER |
Gibt die Anzahl der Partitionen an, die Datenänderungsdatensätze für diese Transaktion zurückgeben. |
transaction_tag |
STRING |
Das Transaktions-Tag, das mit dieser Transaktion verknüpft ist. |
is_system_transaction |
BOOLEAN |
Gibt an, ob es sich bei der Transaktion um eine Systemtransaktion handelt. |
Beispiel für einen Datenänderungsdatensatz
Im Folgenden finden Sie zwei Beispieldatensätze für Datenänderungen. Sie beschreiben eine einzelne Transaktion, bei der eine Überweisung zwischen zwei Konten erfolgt. Die beiden Konten befinden sich in separaten Partitionen für Änderungsstreams.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
Der folgende Datenänderungseintrag ist ein Beispiel für einen Datensatz mit dem Erfassungstyp NEW_VALUES
. Es werden nur neue Werte eingefügt.
Da nur die Spalte LastUpdate
geändert wurde, wurde nur diese Spalte zurückgegeben.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Der folgende Datenänderungseintrag ist ein Beispiel für einen Datensatz mit dem Erfassungstyp NEW_ROW
. Es wurde nur die Spalte LastUpdate
geändert, aber alle erfassten Spalten werden zurückgegeben.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Der folgende Datenänderungseintrag ist ein Beispiel für einen Datensatz mit dem Erfassungstyp NEW_ROW_AND_OLD_VALUES
. Es wurde nur die Spalte LastUpdate
geändert, aber alle erfassten Spalten werden zurückgegeben. Bei diesem Werterfassungstyp werden der neue und der alte Wert von LastUpdate
erfasst.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Heartbeat-Einträge
Wenn ein Heartbeat-Eintrag zurückgegeben wird, bedeutet das, dass alle Änderungen mit einem commit_timestamp
, der kleiner oder gleich dem timestamp
des Heartbeat-Eintrags ist, zurückgegeben wurden. Zukünftige Datensätze in dieser Partition müssen höhere Commit-Zeitstempel haben als der vom Heartbeat-Eintrag zurückgegebene. Heartbeat-Einträge werden zurückgegeben, wenn keine Datenänderungen in eine Partition geschrieben wurden. Wenn Datenänderungen in die Partition geschrieben werden, kann anstelle von heartbeat_record.timestamp
die Zeichenfolge data_change_record.commit_timestamp
verwendet werden, um anzugeben, dass der Leser beim Lesen der Partition Fortschritte macht.
Sie können Heartbeat-Einträge, die in Partitionen zurückgegeben werden, verwenden, um Leser über alle Partitionen hinweg zu synchronisieren. Sobald alle Leser entweder einen Heartbeat erhalten haben, der größer oder gleich einem bestimmten Zeitstempel A
ist, oder Daten oder untergeordnete Partitionsdatensätze erhalten haben, die größer oder gleich dem Zeitstempel A
sind, wissen sie, dass sie alle Datensätze erhalten haben, die bis zu diesem Zeitstempel A
verbindlich gemacht wurden. Sie können dann mit der Verarbeitung der zwischengespeicherten Datensätze beginnen, z. B. die partitionenübergreifenden Datensätze nach Zeitstempel sortieren und nach server_transaction_id
gruppieren.
Ein Heartbeat-Eintrag enthält nur ein Feld:
Feld | Typ | Beschreibung |
---|---|---|
timestamp |
TIMESTAMP |
Gibt den Zeitstempel des Heartbeat-Eintrags an. |
Feld | Typ | Beschreibung |
---|---|---|
timestamp |
STRING |
Gibt den Zeitstempel des Heartbeat-Eintrags an. |
Beispiel für einen Heartbeat-Eintrag
Beispiel für einen Heartbeat-Eintrag, der angibt, dass alle Einträge mit Zeitstempeln, die kleiner oder gleich dem Zeitstempel dieses Eintrags sind, zurückgegeben wurden:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Untergeordnete Partitionseinträge
Mit Untergeordnete Partitionsdatensätze werden Informationen zu untergeordneten Partitionen zurückgegeben: ihre Partitionstokens, die Tokens ihrer übergeordneten Partitionen und die start_timestamp
, die den frühesten Zeitstempel darstellt, für den die untergeordneten Partitionen Änderungsdatensätze enthalten. Einträge, deren Commit-Zeitstempel unmittelbar vor dem child_partitions_record.start_timestamp
liegen, werden in der aktuellen Partition zurückgegeben. Nachdem alle untergeordneten Partitionseinträge für diese Partition zurückgegeben wurden, gibt diese Abfrage den Status „Erfolg“ zurück, was bedeutet, dass alle Einträge für diese Partition zurückgegeben wurden.
Die Felder eines untergeordneten Partitionseintrags umfassen Folgendes:
Feld | Typ | Beschreibung |
---|---|---|
start_timestamp |
TIMESTAMP |
Gibt an, dass die Datenänderungseinträge, die von untergeordneten Partitionen in diesem untergeordneten Partitionseintrag zurückgegeben wurden, einen Commit-Zeitstempel haben, der mindestens start_timestamp ist. Bei der Abfrage einer untergeordneten Partition muss die Abfrage das Token der untergeordneten Partition und einen start_timestamp -Wert enthalten, der größer oder gleich child_partitions_token.start_timestamp ist. Alle von einer Partition zurückgegebenen Datensätze für untergeordnete Partitionen haben dieselbe start_timestamp und der Zeitstempel liegt immer zwischen dem in der Abfrage angegebenen start_timestamp und end_timestamp . |
record_sequence |
STRING |
Gibt eine monoton steigende Sequenznummer an, mit der die Reihenfolge der untergeordneten Partitionseinträge definiert werden kann, wenn in einer bestimmten Partition mehrere untergeordnete Partitionseinträge mit derselben start_timestamp zurückgegeben werden. Das Partitionstoken, start_timestamp und record_sequence identifizieren einen untergeordneten Partitionseintrag eindeutig.
|
child_partitions |
[ { "token" : " |
Gibt eine Reihe von untergeordneten Partitionen und die zugehörigen Informationen zurück. Dazu gehören der Partitionstoken-String, mit dem die untergeordnete Partition in Abfragen identifiziert wird, sowie die Tokens der übergeordneten Partitionen. |
Feld | Typ | Beschreibung |
---|---|---|
start_timestamp |
STRING |
Gibt an, dass die Datenänderungseinträge, die von untergeordneten Partitionen in diesem untergeordneten Partitionseintrag zurückgegeben wurden, einen Commit-Zeitstempel haben, der mindestens start_timestamp ist. Bei der Abfrage einer untergeordneten Partition muss die Abfrage das Token der untergeordneten Partition und einen start_timestamp -Wert enthalten, der größer oder gleich child_partitions_token.start_timestamp ist. Alle von einer Partition zurückgegebenen Datensätze für untergeordnete Partitionen haben dieselbe start_timestamp und der Zeitstempel liegt immer zwischen dem in der Abfrage angegebenen start_timestamp und end_timestamp .
|
record_sequence |
STRING |
Gibt eine monoton steigende Sequenznummer an, mit der die Reihenfolge der untergeordneten Partitionseinträge definiert werden kann, wenn in einer bestimmten Partition mehrere untergeordnete Partitionseinträge mit derselben start_timestamp zurückgegeben werden. Das Partitionstoken, start_timestamp und record_sequence identifizieren einen untergeordneten Partitionseintrag eindeutig.
|
child_partitions |
[ { "token": " |
Gibt ein Array von untergeordneten Partitionen und die zugehörigen Informationen zurück. Dazu gehören der Partitionstoken-String, mit dem die untergeordnete Partition in Abfragen identifiziert wird, sowie die Tokens der übergeordneten Partitionen. |
Beispiel für einen untergeordneten Partitionseintrag
Im folgenden Beispiel wird ein untergeordneter Partitionseintrag gezeigt:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Workflow für Abfragen von Änderungsstreams
Führen Sie Änderungsstreamabfragen mit der ExecuteStreamingSql
API mit einer einmaligen schreibgeschützten Transaktion und einer starken Zeitstempelbindung aus. Mit der Lesefunktion für den Änderungsstream können Sie start_timestamp
und end_timestamp
für den gewünschten Zeitraum angeben. Auf alle Änderungsdatensätze innerhalb des Aufbewahrungszeitraums kann über die starke schreibgeschützte Zeitstempelbindung zugegriffen werden.
Alle anderen TransactionOptions
sind für Änderungsstreamabfragen ungültig. Wenn TransactionOptions.read_only.return_read_timestamp
auf true
festgelegt ist, wird in der Transaction
-Nachricht, die die Transaktion beschreibt, anstelle eines gültigen Lesezeitstempels ein spezieller Wert von kint64max - 1
zurückgegeben. Dieser spezielle Wert sollte verworfen und nicht für nachfolgende Abfragen verwendet werden.
Jede Änderungsstreamabfrage kann eine beliebige Anzahl von Zeilen zurückgeben, die jeweils einen Datenänderungseintrag, einen Herzschlageintrag oder einen Eintrag für untergeordnete Partitionen enthalten. Sie müssen keine Frist für die Anfrage festlegen.
Beispiel für einen Workflow für Abfragen zu Änderungsstreams
Der Workflow für Streamingabfragen beginnt mit der Ausgabe der allerersten Abfrage für den Änderungsstream. Geben Sie dazu partition_token
bis NULL
an. In der Abfrage müssen die Lesefunktion für den Änderungsstream, der gewünschte Start- und Endzeitstempel sowie das Heartbeat-Intervall angegeben werden. Wenn end_timestamp
NULL
ist, gibt die Abfrage so lange Datenänderungen zurück, bis die Partition endet.
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Verarbeiten Sie Datensätze aus dieser Abfrage, bis alle untergeordneten Partitionseinträge zurückgegeben werden. Im folgenden Beispiel werden zwei untergeordnete Partitionseinträge und drei Partitionstokens zurückgegeben, bevor die Abfrage beendet wird. Untergeordnete Partitionseinträge aus einer bestimmten Abfrage haben immer dieselbe start_timestamp
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
Wenn Sie Änderungen nach dem 2022-05-01T09:00:01Z
verarbeiten möchten, erstellen Sie drei neue Abfragen und führen Sie sie parallel aus. Zusammengenommen geben die drei Abfragen Datenänderungen für denselben Schlüsselbereich zurück, den auch das übergeordnete Element abdeckt. Legen Sie start_timestamp
immer auf start_timestamp
im selben untergeordneten Partitionseintrag fest und verwenden Sie dieselbe end_timestamp
und dasselbe Heartbeat-Intervall, um die Einträge in allen Abfragen konsistent zu verarbeiten.
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
Die Abfrage für child_token_2
wird abgeschlossen, nachdem ein weiterer untergeordneter Partitionseintrag zurückgegeben wurde. Dieser Eintrag gibt an, dass eine neue Partition Änderungen sowohl für child_token_2
als auch für child_token_3
ab 2022-05-01T09:30:15Z
abdeckt. Die Abfrage für child_token_3
gibt genau denselben Datensatz zurück, da beide die übergeordneten Partitionen der neuen child_token_4
sind. Um eine strikte Verarbeitung von Datensätzen für einen bestimmten Schlüssel zu gewährleisten, muss die Abfrage auf child_token_4
gestartet werden, nachdem alle übergeordneten Elemente abgeschlossen sind. In diesem Fall sind die übergeordneten Elemente child_token_2
und child_token_3
. Erstellen Sie nur eine Abfrage pro untergeordnetem Partitionstoken. Im Abfrage-Workflow-Design sollte ein übergeordnetes Element zum Warten und Planen der Abfrage am child_token_4
festgelegt werden.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Beispiele für die Verarbeitung und das Parsen von Änderungsstream-Einträgen finden Sie im Apache Beam SpannerIO Dataflow-Connector auf GitHub.