Auf dieser Seite werden die folgenden Attribute von Änderungsstreams ausführlich beschrieben:
- Das aufteilungsbasierte Partitionierungsmodell
- Format und Inhalt von Änderungsstreameinträgen
- Die Low-Level-Syntax, die zum Abfragen dieser Einträge verwendet wird
- Beispiel für den Abfrageworkflow
Die Informationen auf dieser Seite sind für die Verwendung der Spanner API zur direkten Abfrage von Änderungsstreams relevant. Anwendungen, die stattdessen Dataflow zum Lesen des Änderungsstreams verwenden Daten nicht direkt mit dem Datenmodell arbeiten, beschrieben.
Eine umfassendere Einführung in Änderungsstreams finden Sie unter Streams ändern .
Streampartitionen ändern
Wenn eine Änderung an einer Tabelle auftritt, die von einem Änderungsstream beobachtet wird, schreibt Spanner einen entsprechenden Änderungsstreameintrag in die Datenbank, synchron in derselben Transaktion wie die Datenänderung. Dieses garantiert, dass Spanner bei erfolgreicher Transaktion die Änderung erfolgreich erfasst und beibehalten hat. Intern Spanner bringt den Änderungsstreameintrag und die Datenänderung zusammen sodass sie vom selben Server verarbeitet werden, um den Schreib-Overhead zu minimieren.
Im Rahmen der DML für eine bestimmte Teilung hängt Spanner den Schreibvorgang in derselben Transaktion an den entsprechenden Datensplit des Änderungsstreams an. Aufgrund dieser Colocation Streams sorgen für keine zusätzliche Koordination zwischen den Bereitstellungsressourcen, den Commit-Overhead der Transaktion minimiert.
Spanner skaliert durch dynamisches Aufteilen und Zusammenführen von Daten auf Datenbanklast und -größe sowie die Verteilung von Splits auf die Bereitstellungsressourcen.
Bis Ermöglicht das Skalieren von Änderungsstreams, Schreib- und Lesevorgängen, Spanner-Splits und führt den internen Änderungsstream-Speicher mit den Datenbankdaten zusammen. Hotspots werden automatisch vermieden. Um das Lesen von Änderungsstream-Datensätzen in nahezu in Echtzeit, wenn Datenbankschreibvorgänge skaliert werden, ist die Spanner API ist darauf ausgelegt, einen Änderungsstream gleichzeitig über den Änderungsstream abzufragen. Partitionen. Änderungsstream-Partitionen werden Änderungsstream-Datensplits zugeordnet, die die Änderungsstream-Datensätze enthalten. Die Partitionen eines Änderungsstreams ändern sich dynamisch im Laufe der Zeit und korrelieren damit, wie Spanner die Datenbankdaten dynamisch aufteilt und zusammenführt.
Eine Änderungsstream-Partition enthält Einträge für einen unveränderlichen Schlüsselbereich für einen bestimmten Zeitraum. Jede Change-Stream-Partition kann in eine oder mehrere Change-Stream-Partitionen aufgeteilt oder mit anderen Change-Stream-Partitionen zusammengeführt werden. Wenn diese geteilte oder zusammengeführte Ereignisse auftreten, werden untergeordnete Partitionen erstellt, um die Änderungen für ihre jeweiligen unveränderlichen Schlüsselbereiche für den nächsten Zeitraum. Außerdem Datenänderungseinträge gibt eine Änderungsstreamabfrage Datensätze untergeordneter Partitionen an Leser über neue Partitions des Änderungsstreams informieren, die abgefragt werden müssen, als Heartbeat-Datensätze, um den Vorwärtsfortschritt anzuzeigen, wenn keine Schreibvorgänge stattgefunden haben vor Kurzem.
Wenn Sie eine bestimmte Change-Stream-Partition abfragen, werden die Änderungssätze in der Reihenfolge des Commit-Zeitstempels zurückgegeben. Jeder Änderungseintrag wird genau einmal. Zwischen den Partitionen eines Änderungsstreams ist keine Reihenfolge der Änderungssätze garantiert. Änderungseinträge für einen bestimmten Primärschlüssel werden nur für eine Partition für einen bestimmten Zeitraum zurückgegeben.
Aufgrund der übergeordneten und untergeordneten Partitionsabfolge müssen Einträge, die von untergeordneten Partitionen zurückgegeben werden, erst verarbeitet werden, nachdem Einträge aus allen übergeordneten Partitionen verarbeitet wurden, damit Änderungen für einen bestimmten Schlüssel in der Reihenfolge des Commit-Zeitstempels verarbeitet werden können.
Funktionen für Änderungsstream-Lesevorgänge und Abfragesyntax
GoogleSQL
Änderungsstreams werden mithilfe der Methode
ExecuteStreamingSql
der API erstellen. Spanner erstellt zusammen mit dem Änderungsstream automatisch eine spezielle Lesefunktion. Die Lesefunktion bietet Zugriff auf die Datensätze des Änderungsstreams. Die Namenskonvention für Lesefunktionen lautet READ_change_stream_name
.
Angenommen, in der Datenbank ist ein Änderungsstream SingersNameStream
vorhanden, lautet die Abfragesyntax für GoogleSQL:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
Die Funktion read akzeptiert die folgenden Argumente:
Name des Arguments | Typ | Erforderlich/Optional? | Beschreibung |
---|---|---|---|
start_timestamp |
TIMESTAMP |
Erforderlich | Gibt an, dass Datensätze mit commit_timestamp größer oder gleich start_timestamp zurückgegeben werden sollen. Der Wert muss innerhalb der Aufbewahrungsdauer des Änderungsstreams liegen und kleiner oder gleich der aktuellen Uhrzeit und größer oder gleich dem Zeitstempel der Erstellung des Änderungsstreams sein. |
end_timestamp |
TIMESTAMP |
Optional (Standard: NULL ) |
Gibt an, dass Einträge mit commit_timestamp kleiner oder gleich end_timestamp zurückgegeben werden sollen. Der Wert muss innerhalb des Zeitrahmens für die Aufbewahrung des Änderungsstreams liegen und größer oder gleich start_timestamp sein. Die Abfrage
ist entweder nach Rückgabe aller ChangeRecords bis zu end_timestamp abgeschlossen
oder der Nutzer die Verbindung trennt. Wenn NULL oder nicht
wird die Abfrage ausgeführt, bis alle ChangeRecords zurückgegeben oder der
bricht die Verbindung ab. |
partition_token |
STRING |
Optional (Standard: NULL ) |
Gibt an, welche Änderungsstreampartition abgefragt werden soll, basierend auf dem
Inhalt von untergeordneten Partitionen
Datensätze. Wenn NULL oder nicht angegeben ist, bedeutet dies den
den Änderungsstream zum ersten Mal abfragt und
Es wurden keine bestimmten Partitionstokens für die Abfrage abgerufen. |
heartbeat_milliseconds |
INT64 |
Erforderlich | Legt fest, wie oft ein Heartbeat-ChangeRecord zurückgegeben wird
falls keine Transaktionen in dieser Partition übergeben werden.
Der Wert muss zwischen 1,000 (eine Sekunde) und 300,000 (fünf) liegen.
Minuten). |
read_options |
ARRAY |
Optional (Standard: NULL ) |
Zusätzliche Leseoptionen, die für die zukünftige Verwendung reserviert sind. Derzeit ist NULL der einzige zulässige Wert. |
Wir empfehlen, eine einfache Methode zum Erstellen des Textes für die Funktionsabfrage und Bindungsparameter daran lesen, wie im Folgenden gezeigt: Beispiel.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
Änderungsstreams abfragen Sie mit dem
ExecuteStreamingSql
API
Spanner erstellt zusammen mit dem Änderungsstream automatisch eine spezielle Lesefunktion. Die Lesefunktion bietet Zugriff auf die Datensätze des Änderungsstreams. Die Namenskonvention für Lesefunktionen lautet spanner.read_json_change_stream_name
.
Unter der Annahme, dass in der Datenbank der Änderungsstream SingersNameStream
vorhanden ist, gilt Folgendes:
für PostgreSQL sieht so aus:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
Die Funktion read akzeptiert die folgenden Argumente:
Name des Arguments | Typ | Erforderlich/Optional? | Beschreibung |
---|---|---|---|
start_timestamp |
timestamp with time zone |
Erforderlich | Gibt an, dass Einträge mit commit_timestamp größer oder gleich start_timestamp geändert werden
zurückgegeben werden soll. Der Wert muss sich im Änderungsstream befinden
und sollte kleiner oder gleich
der aktuellen Zeit sein,
und größer oder gleich dem Zeitstempel der Erstellung des Änderungsstreams ist. |
end_timestamp |
timestamp with timezone |
Optional (Standardeinstellung: NULL ) |
Gibt an, dass Einträge mit commit_timestamp geändert werden
kleiner oder gleich end_timestamp sollte
zurückgegeben werden. Der Wert muss innerhalb der Aufbewahrungsfrist des Änderungsstreams liegen
Punkt und größer oder gleich start_timestamp .
Die Abfrage wird entweder beendet, nachdem alle Änderungseinträge bis zu end_timestamp zurückgegeben wurden, oder der Nutzer beendet die Verbindung.
Mit NULL wird die Abfrage ausgeführt, bis alle Änderungseinträge zurückgegeben wurden
oder der Nutzer die Verbindung trennt. |
partition_token |
text |
Optional (Standard: NULL ) |
Gibt an, welche Änderungsstreampartition abgefragt werden soll, basierend auf dem
Inhalt von untergeordneten Partitionen
Datensätze. Wenn NULL oder nicht angegeben ist, bedeutet dies den
den Änderungsstream zum ersten Mal abfragt und
Es wurden keine bestimmten Partitionstokens für die Abfrage abgerufen. |
heartbeat_milliseconds |
bigint |
Erforderlich | Legt fest, wie oft ein Heartbeat-ChangeRecord zurückgegeben wird
falls keine Transaktionen in dieser Partition übergeben werden.
Der Wert muss zwischen 1,000 (eine Sekunde) und 300,000 (fünf) liegen.
Minuten). |
null |
null |
Erforderlich | Reserviert für zukünftige Verwendung |
Wir empfehlen, eine praktische Methode zum Erstellen des Texts der Lesefunktion zu erstellen und Parameter daran zu binden, wie im folgenden Beispiel gezeigt.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Format von Streams-Eintrags ändern
GoogleSQL
Die Lesefunktion für Änderungsstreams gibt eine einzelne ChangeRecord
-Spalte des Typs zurück
ARRAY<STRUCT<...>>
. In jeder Zeile enthält dieses Array immer ein einzelnes Element.
Die Arrayelemente haben folgenden Typ:
STRUCT <
data_change_record ARRAY<STRUCT<...>>,
heartbeat_record ARRAY<STRUCT<...>>,
child_partitions_record ARRAY<STRUCT<...>>
>
Diese Struktur enthält drei Felder: data_change_record
,
heartbeat_record
und child_partitions_record
, jeweils vom Typ
ARRAY<STRUCT<...>>
. In jeder Zeile, die die Lesefunktion des Änderungsstreams zurückgibt, enthält nur eines dieser drei Felder einen Wert. Die anderen beiden sind leer oder enthalten den Wert NULL
. Diese Arrayfelder enthalten höchstens ein Element.
In den folgenden Abschnitten werden die einzelnen Datensatztypen beschrieben.
PostgreSQL
Die Lesefunktion für Änderungsstreams gibt eine einzelne ChangeRecord
-Spalte vom Typ JSON
mit der folgenden Struktur zurück:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
Dieses Objekt hat drei mögliche Schlüssel: data_change_record
, heartbeat_record
und child_partitions_record
. Der entsprechende Werttyp ist JSON
.
In jeder Zeile, die die Lesefunktion des Änderungsstreams zurückgibt,
einer dieser drei Schlüssel existiert.
In den folgenden Abschnitten werden diese drei Eintragstypen erläutert.
Datenänderungseinträge
Ein Datensatz für Datenänderung enthält eine Reihe von Änderungen an einer Tabelle mit dem denselben Änderungstyp (Einfügen, Aktualisieren oder Löschen), der zur selben Zeit Commit-Zeitstempel in einer Änderungsstreampartition für denselben Transaktion. Für denselben Datensatz können mehrere Datenänderungseinträge zurückgegeben werden. in mehreren Partitions des Änderungsstreams.
Alle Datenänderungsdatensätze haben die Felder commit_timestamp
, server_transaction_id
und record_sequence
, die zusammen die Reihenfolge im Änderungsstream für einen Streamdatensatz bestimmen. Diese drei Felder reichen aus, um
und für externe Konsistenz sorgen.
Hinweis: Mehrere Transaktionen können denselben Commit-Zeitstempel haben, wenn sie sich nicht überschneidende Daten betreffen. Das Feld server_transaction_id
lässt sich erkennen, welche Änderungen (möglicherweise
Änderungsstreams) wurden innerhalb derselben
Transaktion. Kopplung mit record_sequence
und
Mit number_of_records_in_transaction
-Feldern können Sie puffern und sortieren
alle Datensätze einer bestimmten Transaktion.
Zu den Feldern eines Datenänderungseintrags gehören:
GoogleSQL
Feld | Typ | Beschreibung |
---|---|---|
commit_timestamp |
TIMESTAMP |
Der Zeitstempel, zu dem die Änderung vorgenommen wurde. |
record_sequence |
STRING |
Die Sequenznummer für den Datensatz innerhalb der Transaktion. Sequenznummern sind innerhalb einer Transaktion garantiert eindeutig und kontinuierlich ansteigend (aber nicht unbedingt fortlaufend). Sortieren Sie die Datensätze
server_transaction_id von record_sequence bis
die Reihenfolge der Änderungen innerhalb der Transaktion zu rekonstruieren.
Diese Sortierung kann von Spanner für eine bessere Leistung optimiert werden und entspricht möglicherweise nicht immer der ursprünglichen Sortierung, die Nutzer angeben. |
server_transaction_id |
STRING |
Ein global eindeutiger String, der die Transaktion in in dem die Änderung festgeschrieben wurde. Der Wert sollte nur im Zusammenhang mit der Verarbeitung von Änderungsstream-Datensätzen verwendet werden und ist nicht mit der Transaktions-ID in der Spanner API korreliert. |
is_last_record_in_transaction_in_partition |
BOOL |
Gibt an, ob dies der letzte Datensatz für eine Transaktion in der aktuellen Partition ist. |
table_name |
STRING |
Name der Tabelle, die von der Änderung betroffen ist. |
value_capture_type |
STRING |
Beschreibt den Werterfassungstyp, der in der die Konfiguration des Änderungsstreams ändern, als diese Änderung erfasst wurde. Der Werterfassungstyp kann |
column_types |
ARRAY<STRUCT< |
Der Name der Spalte, der Spaltentyp, ob es ein Primärschlüssel ist, und die Position der Spalte im Schema definiert ("ordinal_position"). Die erste Spalte einer Tabelle im Schema hätte eine Ordinalposition von "1". Spaltentyp für Arrayspalten verschachtelt sein. Das Format entspricht der Typstruktur. in der Spanner API-Referenz beschrieben. |
mods |
ARRAY<STRUCT< |
Beschreibt die vorgenommenen Änderungen, einschließlich des Primärschlüssels.
Werte, die alten Werte und die neuen Werte der geänderten oder erfassten Spalten.
Verfügbarkeit und Inhalt der alten und neuen Werte hängen vom konfigurierten „value_capture_type“ ab. Die Felder new_values und old_values enthalten nur die Nicht-Schlüsselspalten. |
mod_type |
STRING |
Beschreibt die Art der Änderung. Entweder INSERT , UPDATE oder DELETE . |
number_of_records_in_transaction |
INT64 |
Die Anzahl der Datensätze zu Datenänderungen, die Teil dieses in allen Partitions des Änderungsstreams. |
number_of_partitions_in_transaction |
INT64 |
Die Anzahl der Partitionen, die Datenänderungsdatensätze für diese Transaktion zurückgeben. |
transaction_tag |
STRING |
Transaktions-Tag, das dieser Transaktion zugeordnet ist. |
is_system_transaction |
BOOL |
Gibt an, ob die Transaktion eine Systemtransaktion ist. |
PostgreSQL
Feld | Typ | Beschreibung |
---|---|---|
commit_timestamp |
STRING |
Der Zeitstempel, zu dem die Änderung übernommen wurde. |
record_sequence |
STRING |
Die Sequenznummer für den Datensatz innerhalb der Transaktion. Sequenznummern sind innerhalb einer Transaktion garantiert eindeutig und kontinuierlich ansteigend (aber nicht unbedingt fortlaufend). Sortieren Sie die Datensätze mit derselben „server_transaction_id“ nach „record_sequence“, um die Reihenfolge der Änderungen innerhalb der Transaktion zu rekonstruieren. |
server_transaction_id |
STRING |
Ein global eindeutiger String, der die Transaktion darstellt, in der die Änderung per Commit übergeben wurde. Der Wert sollte nur im Zusammenhang mit der Verarbeitung von Änderungsstream-Datensätzen verwendet werden und ist nicht mit der Transaktions-ID in der Spanner API korreliert. |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Gibt an, ob dies der letzte Datensatz für eine Transaktion in der aktuellen Partition ist. |
table_name |
STRING |
Name der Tabelle, die von der Änderung betroffen ist. |
value_capture_type |
STRING |
Beschreibt den Werterfassungstyp, der in der die Konfiguration des Änderungsstreams ändern, als diese Änderung erfasst wurde. Der Werterfassungstyp kann |
column_types |
[ { "name": <STRING>, "type": { "code": <STRING> }, "is_primary_key": <BOOLEAN>, "ordinal_position": <NUMBER> }, ... ] |
Der Name der Spalte, der Spaltentyp, ob es ein Primärschlüssel ist, und die Position der Spalte im Schema definiert ("ordinal_position"). Die erste Spalte einer Tabelle im Schema hätte eine Ordinalposition von "1". Spaltentyp für Arrayspalten verschachtelt sein. Das Format entspricht der Typstruktur. in der Spanner API-Referenz beschrieben. |
mods |
[ { "keys": {<STRING> : <STRING>}, "new_values": { <STRING> : <VALUE-TYPE>, [...] }, "old_values": { <STRING> : <VALUE-TYPE>, [...] }, }, [...] ] |
Beschreibt die vorgenommenen Änderungen, einschließlich des Primärschlüssels.
Werte, die alten Werte und die neuen Werte der geänderten oder erfassten
Spalten. Verfügbarkeit und Inhalt der alten und neuen Werte hängen vom konfigurierten „value_capture_type“ ab. Die Felder new_values und old_values enthalten nur die Nicht-Schlüsselspalten.
|
mod_type |
STRING |
Beschreibt die Art der Änderung. Entweder INSERT , UPDATE oder
DELETE |
number_of_records_in_transaction |
INT64 |
Die Anzahl der Datensätze zu Datenänderungen, die Teil dieses in allen Partitions des Änderungsstreams. |
number_of_partitions_in_transaction |
NUMBER |
Die Anzahl der Partitionen, für die Datenänderungseinträge zurückgegeben werden für diese Transaktion. |
transaction_tag |
STRING |
Transaktions-Tag, das dieser Transaktion zugeordnet ist. |
is_system_transaction |
BOOLEAN |
Gibt an, ob es sich bei der Transaktion um eine Systemtransaktion handelt. |
Im Folgenden finden Sie zwei Beispieldatensätze für Datenänderungen. Sie beschreiben eine einzelne Transaktion in dem es eine zwischen zwei Konten übertragen. Die beiden Konten befinden sich separater Änderungsstream Partitionen.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
Der folgende Datensatz zur Datenänderung ist ein Beispiel für einen Datensatz mit dem Wert
Erfassungstyp "NEW_VALUES"
. Es werden nur neue Werte eingefügt.
Da nur die Spalte "LastUpdate"
geändert wurde, ist nur diese Spalte
zurückgegeben.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Der folgende Datenänderungseintrag ist ein Beispiel für einen Datensatz mit dem Erfassungstyp "NEW_ROW"
. Es wurde nur die Spalte "LastUpdate"
geändert, aber alle erfassten Spalten werden zurückgegeben.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Der folgende Datenänderungseintrag ist ein Beispiel für einen Datensatz mit dem Erfassungstyp "NEW_ROW_AND_OLD_VALUES"
. Nur die "LastUpdate"
wurde geändert, es werden jedoch alle beobachteten Spalten zurückgegeben. Diese Werterfassung
type erfasst den neuen und den alten Wert von LastUpdate
.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Heartbeat-Einträge
Wenn ein Heartbeat-Eintrag zurückgegeben wird, bedeutet das, dass alle Änderungen mit einem commit_timestamp
, der kleiner oder gleich dem timestamp
des Heartbeat-Eintrags ist, zurückgegeben wurden. Zukünftige Datensätze in dieser Partition müssen höhere Commit-Zeitstempel haben als der vom Heartbeat-Eintrag zurückgegebene. Heartbeat-Einträge werden zurückgegeben, wenn keine Datenänderungen in eine Partition geschrieben wurden. Wenn Datenänderungen in die Partition geschrieben werden, kann anstelle von heartbeat_record.timestamp
die Zeichenfolge data_change_record.commit_timestamp
verwendet werden, um anzugeben, dass der Leser beim Lesen der Partition Fortschritte macht.
Sie können Heartbeat-Datensätze, die auf Partitionen zurückgegeben wurden, zur Synchronisierung verwenden.
in allen Partitionen. Sobald alle Leser entweder eine
Heartbeat größer oder gleich einem Zeitstempel A
oder Daten oder untergeordnetes Element erhalten
Partitionseinträge größer oder gleich dem Zeitstempel A
sind, wissen die Leser, dass sie
alle Datensätze, für die zum Zeitpunkt oder vor dem Zeitstempel A
ein Commit durchgeführt wurde, und können beginnen
Verarbeitung der zwischengespeicherten Datensätze, z. B. Sortieren der partitionsübergreifenden
Datensätze nach Zeitstempel sortiert und nach server_transaction_id
gruppiert.
Ein Heartbeat-Eintrag enthält nur ein Feld:
GoogleSQL
Feld | Typ | Beschreibung |
---|---|---|
timestamp |
TIMESTAMP |
Der Zeitstempel des Heartbeat-Eintrags. |
PostgreSQL
Feld | Typ | Beschreibung |
---|---|---|
timestamp |
STRING |
Der Zeitstempel des Heartbeat-Eintrags. |
Beispiel für einen Heartbeat-Eintrag, der angibt, dass alle Einträge mit Zeitstempeln, die kleiner oder gleich dem Zeitstempel dieses Eintrags sind, zurückgegeben wurden:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Datensätze für untergeordnete Partitionen
Ein untergeordneter Partitionseintrag gibt Informationen zu untergeordneten Partitionen zurück: ihre Partitionstokens, die Tokens ihrer übergeordneten Partitionen und den
start_timestamp
, der den frühesten Zeitstempel darstellt, den das untergeordnete Element
enthalten Änderungseinträge. Einträge mit Commit-Zeitstempeln
unmittelbar vor child_partitions_record.start_timestamp
liegen, werden
die in der aktuellen Partition zurückgegeben werden. Nachdem Sie alle
untergeordneten Partitionen für diese Partition enthält, gibt diese Abfrage
Erfolgsstatus, der angibt, dass alle Datensätze für diese
-Partition an.
Die Felder eines untergeordneten Partitionseintrags enthalten Folgendes:
GoogleSQL
Feld | Typ | Beschreibung |
---|---|---|
start_timestamp |
TIMESTAMP |
Vom untergeordneten Element zurückgegebene Datenänderungseinträge
Partitionen in diesem untergeordneten Partitionseintrag haben einen Commit-Zeitstempel
ist größer als oder gleich start_timestamp . Bei der Abfrage einer untergeordneten Partition sollte die Abfrage
Geben Sie das untergeordnete Partitionstoken und einen start_timestamp größer oder gleich
child_partitions_token.start_timestamp . Alle von einer Partition zurückgegebenen Datensätze für untergeordnete Partitionen haben denselben start_timestamp -Wert und der Zeitstempel liegt immer zwischen dem angegebenen start_timestamp und end_timestamp der Abfrage. |
record_sequence |
STRING |
Eine kontinuierlich ansteigende Folge
Zahl, mit der die Reihenfolge der
wird der Datensatz von untergeordneten Partitionen
aufgenommen, wenn mehrere
Partitionsdatensätze, die mit demselben start_timestamp in einem
einer bestimmten Partition. Das Partitionstoken, start_timestamp und record_sequence identifizieren einen untergeordneten Partitionseintrag eindeutig. |
child_partitions |
ARRAY<STRUCT< |
Gibt eine Reihe von untergeordneten Partitionen und die zugehörigen Informationen zurück. Dazu gehört auch der String des Partitionstokens, mit dem das untergeordnete Element identifiziert wird Partition in Abfragen sowie die Tokens der übergeordneten Partitionen. |
PostgreSQL
Feld | Typ | Beschreibung |
---|---|---|
start_timestamp |
STRING |
Vom untergeordneten Element zurückgegebene Datenänderungseinträge
Partitionen in diesem untergeordneten Partitionseintrag haben einen Commit-Zeitstempel
ist größer als oder gleich start_timestamp . Bei der Abfrage einer untergeordneten Partition muss die Abfrage das Token der untergeordneten Partition und einen start_timestamp -Wert enthalten, der größer oder gleich child_partitions_token.start_timestamp ist. Alle von einer Partition zurückgegebenen untergeordneten Partitionseinträge haben dieselbe start_timestamp und der Zeitstempel liegt immer zwischen dem in der Abfrage angegebenen start_timestamp und end_timestamp .
|
record_sequence |
STRING |
Eine monoton steigende Sequenznummer, mit der die Reihenfolge des Datensatzes für untergeordnete Partitionen definiert werden kann, wenn in einer bestimmten Partition mehrere Datensätze für untergeordnete Partitionen mit demselben Startzeitstempel zurückgegeben werden. Das Partitionstoken,
start_timestamp und
record_sequence eindeutig identifiziert
untergeordneten Partitionen ein. |
child_partitions |
[ { "token": <STRING>, "parent_partition_tokens": [<STRING>], }, [...] ] |
Gibt ein Array von untergeordneten Partitionen und die zugehörigen Informationen zurück. Dazu gehört auch der String des Partitionstokens, mit dem das untergeordnete Element identifiziert wird Partition in Abfragen sowie die Tokens der übergeordneten Partitionen. |
Das folgende Beispiel zeigt einen untergeordneten Partitionseintrag:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Workflow für Abfrage von Änderungsstreams
Führen Sie Änderungsstreams-Abfragen mit der ExecuteStreamingSql
API mit einer einmaligen schreibgeschützten Transaktion und einer starken Zeitstempelbindung aus. Die Änderung
Mit der Stream-Lesefunktion können Sie start_timestamp
und
end_timestamp
für den gewünschten Zeitraum. Auf alle Änderungsdatensätze innerhalb des Aufbewahrungszeitraums kann über die starke schreibgeschützte Zeitstempelbindung zugegriffen werden.
Alle anderen TransactionOptions
sind für Änderungsstreamabfragen ungültig. Wenn TransactionOptions.read_only.return_read_timestamp
auf „wahr“ festgelegt ist, wird in der Transaction
-Nachricht, die die Transaktion beschreibt, anstelle eines gültigen Lesezeitstempels ein spezieller Wert von kint64max - 1
zurückgegeben. Dieser spezielle Wert sollte verworfen und für keine der
für nachfolgende Abfragen.
Jede Abfrage des Änderungsstreams kann eine beliebige Anzahl von Zeilen zurückgeben, die jeweils entweder einen Datenänderungseintrag, einen Heartbeat-Eintrag oder untergeordnete Partitionen Datensatz. Es muss keine Frist für die Anfrage festgelegt werden.
Beispiel:
Der Workflow für Streamingabfragen beginnt mit der Ausgabe der allerersten Abfrage für den Änderungsstream. Geben Sie dazu partition_token
bis NULL
an. In der Abfrage muss
die Lesefunktion für den Änderungsstream, den Start- und Endzeitstempel von Interesse und
das Heartbeat-Intervall. Wenn end_timestamp
NULL
ist, gibt die Abfrage so lange Datenänderungen zurück, bis die Partition endet.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Datensätze von dieser Abfrage verarbeiten, bis die Datensätze der untergeordneten Partition
zurückgegeben. Im Beispiel unten sind zwei untergeordnete Partitionseinträge und drei Partitionseinträge
werden zurückgegeben, wird die Abfrage beendet. Untergeordnete Partitionseinträge aus einem
bestimmte Abfrage hat immer dieselbe start_timestamp
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
Wenn Sie zukünftige Änderungen nach dem 2022-05-01T09:00:01Z
verarbeiten möchten, erstellen Sie drei neue Abfragen und führen Sie sie parallel aus. Die drei Abfragen zusammen geben
Änderungen an den Daten für denselben Schlüsselbereich, den ihr übergeordnetes Element abdeckt. Immer die
start_timestamp
zu start_timestamp
im selben untergeordneten Partitionseintrag und
dieselben Werte für end_timestamp
und Heartbeat-Intervall zur Verarbeitung der Datensätze verwenden
bei allen Suchanfragen einheitlich.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
Nach einer Weile wird die Abfrage für child_token_2
beendet, nachdem eine andere Abfrage
untergeordneten Partitionseintrag. Dieser Eintrag zeigt an, dass eine neue Partition
in zukünftigen Änderungen für child_token_2
und child_token_3
ab dem
2022-05-01T09:30:15Z
Genau derselbe Datensatz wird von der Abfrage auf child_token_3
zurückgegeben, da beide die übergeordneten Partitionen der neuen child_token_4
sind.
Um eine strikt geordnete Verarbeitung von Datensätzen für einen bestimmten Schlüssel zu gewährleisten, darf die Abfrage für child_token_4
erst gestartet werden, wenn alle übergeordneten Elemente abgeschlossen sind. In diesem Fall sind das child_token_2
und child_token_3
. Nur eine Abfrage erstellen
Für jedes untergeordnete Partitionstoken sollte im Design des Abfrage-Workflows ein Token festgelegt werden,
Parent, um zu warten und die Abfrage für child_token_4
zu planen.
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Beispiele für die Verarbeitung und Analyse von Änderungsstreameinträgen finden Sie in der Apache Beam SpannerIO. Dataflow-Connector aktiviert GitHub