Auf dieser Seite werden die folgenden Attribute von Änderungsstreams im Detail beschrieben:
- Das aufteilungsbasierte Partitionierungsmodell
- Format und Inhalt von Änderungsstreameinträgen
- Die Low-Level-Syntax, die zum Abfragen dieser Datensätze verwendet wird
- Beispiel für den Abfrageworkflow
Die Informationen auf dieser Seite sind am relevantesten für die Verwendung der Spanner API, um Änderungsstreams direkt abzufragen. Anwendungen, die stattdessen Dataflow zum Lesen des Änderungsstreams verwenden Daten nicht direkt mit dem Datenmodell arbeiten, beschrieben.
Eine umfassendere Einführung in Änderungsstreams finden Sie unter Streams ändern .
Streampartitionen ändern
Wenn eine Änderung an einer Tabelle auftritt, die von einem Änderungsstream überwacht wird, Spanner schreibt einen entsprechenden Änderungsstreameintrag in die Datenbank. synchron in derselben Transaktion erfolgen, in der sich die Daten ändern. Dieses garantiert, dass Spanner bei erfolgreicher Transaktion die Änderung erfolgreich erfasst und beibehalten hat. Intern Spanner bringt den Änderungsstreameintrag und die Datenänderung zusammen sodass sie vom selben Server verarbeitet werden, um den Schreib-Overhead zu minimieren.
Als Teil der DML für einen bestimmten Split kann Spanner Hängt den Schreibvorgang an die entsprechenden Änderungsstreamdaten an. in derselben Transaktion aufgeteilt. Aufgrund dieser Colocation Streams sorgen für keine zusätzliche Koordination zwischen den Bereitstellungsressourcen, den Commit-Overhead der Transaktion minimiert.
Spanner skaliert durch dynamisches Aufteilen und Zusammenführen von Daten auf Datenbanklast und -größe sowie die Verteilung von Splits auf die Bereitstellungsressourcen.
Bis Ermöglicht das Skalieren von Änderungsstreams, Schreib- und Lesevorgängen, Spanner-Splits und führt den internen Änderungsstream-Speicher mit den Datenbankdaten zusammen. Hotspots werden automatisch vermieden. Um das Lesen von Änderungsstream-Datensätzen in nahezu in Echtzeit, wenn Datenbankschreibvorgänge skaliert werden, ist die Spanner API ist darauf ausgelegt, einen Änderungsstream gleichzeitig über den Änderungsstream abzufragen. Partitionen. Ändern Sie die Zuordnung von Streampartitionen, um Stream-Datenaufteilungen zu ändern, die Änderungsstream-Datensätze enthalten. Die Partitionen eines Änderungsstreams ändern sich dynamisch im Laufe der Zeit und korrelieren damit, wie Spanner die Datenbankdaten dynamisch aufteilt und zusammenführt.
Eine Änderungsstreampartition enthält Datensätze für einen unveränderlichen Schlüsselbereich für einen Zeitraum festlegen. Jede Änderungsstreampartition kann in eine oder mehrere Änderungsstreampartitionen aufgeteilt oder mit anderen Änderungsstreampartitionen zusammengeführt werden. Wenn diese geteilte oder zusammengeführte Ereignisse auftreten, werden untergeordnete Partitionen erstellt, um die Änderungen für ihre jeweiligen unveränderlichen Schlüsselbereiche für den nächsten Zeitraum. Außerdem Datenänderungseinträge gibt eine Änderungsstreamabfrage Datensätze untergeordneter Partitionen an Leser über neue Partitions im Änderungsstream informieren, die abgefragt werden müssen, als Heartbeat-Datensätze, um den Vorwärtsfortschritt anzuzeigen, wenn keine Schreibvorgänge stattgefunden haben vor Kurzem.
Beim Abfragen einer bestimmten Änderungsstreampartition sind die Änderungseinträge werden in der Reihenfolge der Commit-Zeitstempel zurückgegeben. Jeder Änderungseintrag wird genau einmal. Über Änderungsstreampartitionen hinweg gibt es keine garantierte Reihenfolge der Änderungen. Datensätze. Änderungseinträge für einen bestimmten Primärschlüssel werden nur auf einem für einen bestimmten Zeitraum.
Aufgrund der Partitionsherkunft zwischen über- und untergeordneten Elementen bestimmter Schlüssel in der Commit-Zeitstempelreihenfolge, Datensätze, die vom untergeordneten Partitionen sollten erst nach Datensätzen von allen übergeordneten Elementen verarbeitet werden Partitionen verarbeitet wurden.
Lesefunktionen und Abfragesyntax des Streams ändern
GoogleSQL
Änderungsstreams abfragen Sie mit dem
ExecuteStreamingSql
der API erstellen. Spanner erstellt automatisch eine spezielle Lesefunktion
mit dem Änderungsstream. Die Lesefunktion bietet Zugriff auf die Änderung
die Datensätze des Streams. Die Namenskonvention für Lesefunktionen
READ_change_stream_name
Unter der Annahme, dass in der Datenbank der Änderungsstream SingersNameStream
vorhanden ist, gilt Folgendes:
für GoogleSQL lautet die Abfragesyntax wie folgt:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
Die Funktion read akzeptiert die folgenden Argumente:
Name des Arguments | Typ | Erforderlich/Optional? | Beschreibung |
---|---|---|---|
start_timestamp |
TIMESTAMP |
Erforderlich | Gibt an, dass Einträge mit commit_timestamp größer oder gleich start_timestamp sind
zurückgegeben werden soll. Der Wert muss sich im Änderungsstream befinden
und sollte kleiner oder gleich
der aktuellen Zeit sein,
und größer oder gleich dem Zeitstempel der Erstellung des Änderungsstreams ist. |
end_timestamp |
TIMESTAMP |
Optional (Standardeinstellung: NULL ) |
Gibt an, dass Einträge mit commit_timestamp weniger vorhanden sind
oder gleich end_timestamp sollte
zurückgegeben werden kann. Der Wert muss innerhalb der Aufbewahrungsfrist des Änderungsstreams liegen
Punkt und größer oder gleich start_timestamp . Die Abfrage
ist entweder nach Rückgabe aller ChangeRecords bis zu end_timestamp abgeschlossen
oder der Nutzer die Verbindung trennt. Wenn NULL oder nicht
wird die Abfrage ausgeführt, bis alle ChangeRecords zurückgegeben oder der
bricht die Verbindung ab. |
partition_token |
STRING |
Optional (Standardeinstellung: NULL ) |
Gibt an, welche Änderungsstreampartition abgefragt werden soll, basierend auf dem
Inhalt von untergeordneten Partitionen
Datensätze. Wenn NULL oder nicht angegeben ist, bedeutet dies den
den Änderungsstream zum ersten Mal abfragt und
Es wurden keine spezifischen Partitionstokens für die Abfrage abgerufen. |
heartbeat_milliseconds |
INT64 |
Erforderlich | Legt fest, wie oft ein Heartbeat-ChangeRecord zurückgegeben wird
falls in dieser Partition
keine Transaktionen mit Commit durchgeführt werden.
Der Wert muss zwischen 1,000 (eine Sekunde) und 300,000 (fünf) liegen.
Minuten). |
read_options |
ARRAY |
Optional (Standardeinstellung: NULL ) |
Zusätzliche Leseoptionen, die für die zukünftige Verwendung reserviert sind. Derzeit ist der einzige zulässige Wert NULL . |
Wir empfehlen, eine einfache Methode zum Erstellen des Textes für die Funktionsabfrage und Bindungsparameter daran lesen, wie im Folgenden gezeigt: Beispiel.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
Änderungsstreams abfragen Sie mit dem
ExecuteStreamingSql
API
Spanner erstellt automatisch eine spezielle Lesefunktion
mit dem Änderungsstream. Die Lesefunktion bietet Zugriff auf die Änderung
die Datensätze des Streams. Die Namenskonvention für Lesefunktionen
spanner.read_json_change_stream_name
Unter der Annahme, dass in der Datenbank der Änderungsstream SingersNameStream
vorhanden ist, gilt Folgendes:
für PostgreSQL sieht so aus:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
Die Funktion read akzeptiert die folgenden Argumente:
Name des Arguments | Typ | Erforderlich/Optional? | Beschreibung |
---|---|---|---|
start_timestamp |
timestamp with time zone |
Erforderlich | Gibt an, dass Einträge mit commit_timestamp größer oder gleich start_timestamp geändert werden
zurückgegeben werden soll. Der Wert muss sich im Änderungsstream befinden
und sollte kleiner oder gleich
der aktuellen Zeit sein,
und größer oder gleich dem Zeitstempel der Erstellung des Änderungsstreams ist. |
end_timestamp |
timestamp with timezone |
Optional (Standardeinstellung: NULL ) |
Gibt an, dass Einträge mit commit_timestamp geändert werden
kleiner oder gleich end_timestamp sollte
zurückgegeben werden kann. Der Wert muss innerhalb der Aufbewahrungsfrist des Änderungsstreams liegen
Punkt und größer oder gleich start_timestamp .
Die Abfrage ist abgeschlossen, nachdem alle Änderungseinträge bis zu
end_timestamp oder der Nutzer trennt die Verbindung.
Mit NULL wird die Abfrage ausgeführt, bis alle Änderungseinträge zurückgegeben wurden
oder der Nutzer die Verbindung trennt. |
partition_token |
text |
Optional (Standardeinstellung: NULL ) |
Gibt an, welche Änderungsstreampartition abgefragt werden soll, basierend auf dem
Inhalt von untergeordneten Partitionen
Datensätze. Wenn NULL oder nicht angegeben ist, bedeutet dies den
den Änderungsstream zum ersten Mal abfragt und
Es wurden keine spezifischen Partitionstokens für die Abfrage abgerufen. |
heartbeat_milliseconds |
bigint |
Erforderlich | Legt fest, wie oft ein Heartbeat-ChangeRecord zurückgegeben wird
falls in dieser Partition
keine Transaktionen mit Commit durchgeführt werden.
Der Wert muss zwischen 1,000 (eine Sekunde) und 300,000 (fünf) liegen.
Minuten). |
null |
null |
Erforderlich | Für zukünftige Verwendung reserviert |
Wir empfehlen, eine einfache Methode zum Erstellen des Textes für die die Funktions- und Bindungsparameter, wie im Folgenden gezeigt: Beispiel.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Format von Streamaufzeichnungen ändern
GoogleSQL
Die Lesefunktion für Änderungsstreams gibt eine einzelne ChangeRecord
-Spalte des Typs zurück
ARRAY<STRUCT<...>>
. In jeder Zeile enthält dieses Array immer ein einzelnes Element.
Die Array-Elemente haben folgenden Typ:
STRUCT <
data_change_record ARRAY<STRUCT<...>>,
heartbeat_record ARRAY<STRUCT<...>>,
child_partitions_record ARRAY<STRUCT<...>>
>
Diese Struktur enthält drei Felder: data_change_record
,
heartbeat_record
und child_partitions_record
, jeweils vom Typ
ARRAY<STRUCT<...>>
. In jeder Zeile, die die Änderungsstream-Lesefunktion enthält
gibt, enthält nur eines dieser drei Felder einen Wert; die anderen beiden
sind leer oder NULL
. Diese Array-Felder enthalten höchstens ein Element.
In den folgenden Abschnitten werden diese drei Eintragstypen erläutert.
PostgreSQL
Die Lesefunktion für Änderungsstreams gibt eine einzelne ChangeRecord
-Spalte mit
Geben Sie JSON
mit der folgenden Struktur ein:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
In diesem Objekt gibt es drei mögliche Schlüssel: data_change_record
,
heartbeat_record
und child_partitions_record
, der entsprechende Wert
Typ ist JSON
.
In jeder Zeile, die die Lesefunktion des Änderungsstreams zurückgibt,
einer dieser drei Schlüssel existiert.
In den folgenden Abschnitten werden diese drei Eintragstypen erläutert.
Einträge über Datenänderungen
Ein Datensatz für Datenänderung enthält eine Reihe von Änderungen an einer Tabelle mit dem denselben Änderungstyp (Einfügen, Aktualisieren oder Löschen), der zur selben Zeit Commit-Zeitstempel in einer Änderungsstreampartition für denselben Transaktion. Für denselben Datensatz können mehrere Datenänderungseinträge zurückgegeben werden. in mehreren Partitions des Änderungsstreams.
Alle Datenänderungseinträge haben commit_timestamp
, server_transaction_id
,
und record_sequence
, die zusammen die Reihenfolge der Änderung bestimmen.
für einen Stream-Datensatz. Diese drei Felder reichen aus, um
und für externe Konsistenz sorgen.
Mehrere Transaktionen können
den gleichen Commit-Zeitstempel haben,
nicht überlappende Daten berühren. Das Feld server_transaction_id
lässt sich erkennen, welche Änderungen (möglicherweise
Änderungsstreams) wurden innerhalb derselben
Transaktion. Kopplung mit record_sequence
und
Mit number_of_records_in_transaction
-Feldern können Sie puffern und sortieren
alle Datensätze einer bestimmten Transaktion.
Zu den Feldern eines Datenänderungseintrags gehören:
GoogleSQL
Feld | Typ | Beschreibung |
---|---|---|
commit_timestamp |
TIMESTAMP |
Der Zeitstempel, an dem die Änderung festgeschrieben wurde. |
record_sequence |
STRING |
Die Sequenznummer für den Datensatz in der Transaktion. Für Sequenznummern wird garantiert,
eindeutig sein und innerhalb einer Transaktion kontinuierlich ansteigen (aber nicht notwendigerweise fortlaufend). Sortieren Sie die Datensätze
server_transaction_id von record_sequence bis
die Reihenfolge der Änderungen innerhalb der Transaktion zu rekonstruieren. |
server_transaction_id |
STRING |
Ein global eindeutiger String, der die Transaktion in in dem die Änderung festgeschrieben wurde. Der Wert darf nur werden im Zusammenhang mit der Verarbeitung von Änderungsstream-Datensätzen verwendet. mit der Transaktions-ID in der Spanner-API korreliert. |
is_last_record_in_transaction_in_partition |
BOOL |
Gibt an, ob dies der letzte Datensatz für eine Transaktion in der aktuellen Partition ist. |
table_name |
STRING |
Name der Tabelle, die von der Änderung betroffen ist. |
value_capture_type |
STRING |
Beschreibt den Werterfassungstyp, der in der die Konfiguration des Änderungsstreams ändern, als diese Änderung erfasst wurde. Der Werterfassungstyp kann |
column_types |
ARRAY<STRUCT< |
Der Name der Spalte, der Spaltentyp, ob es ein Primärschlüssel ist, und die Position der Spalte im Schema definiert ("ordinal_position"). Die erste Spalte einer Tabelle im Schema eine Ordinalposition von "1" haben. Spaltentyp für Arrayspalten verschachtelt sein. Das Format entspricht der Typstruktur. in der Spanner API-Referenz beschrieben. |
mods |
ARRAY<STRUCT< |
Beschreibt die vorgenommenen Änderungen, einschließlich des Primärschlüssels.
Werte, die alten Werte und die neuen Werte der geänderten oder erfassten Spalten.
Verfügbarkeit und Inhalt der alten und neuen Werte hängen vom konfigurierten Wert "value_capture_type" ab. Die Felder new_values und old_values enthalten nur die Nicht-Schlüsselspalten. |
mod_type |
STRING |
Beschreibt die Art der Änderung. Entweder INSERT , UPDATE oder
DELETE |
number_of_records_in_transaction |
INT64 |
Die Anzahl der Datensätze zu Datenänderungen, die Teil dieses in allen Partitions des Änderungsstreams. |
number_of_partitions_in_transaction |
INT64 |
Die Anzahl der Partitionen, für die Datenänderungseinträge zurückgegeben werden für diese Transaktion. |
transaction_tag |
STRING |
Transaktions-Tag, das dieser Transaktion zugeordnet ist. |
is_system_transaction |
BOOL |
Gibt an, ob die Transaktion eine Systemtransaktion ist. |
PostgreSQL
Feld | Typ | Beschreibung |
---|---|---|
commit_timestamp |
STRING |
Der Zeitstempel, zu dem der Commit für die Änderung durchgeführt wurde. |
record_sequence |
STRING |
Die Sequenznummer für den Datensatz in der Transaktion. Für Sequenznummern wird garantiert, eindeutig sein und innerhalb einer Transaktion kontinuierlich ansteigen (aber nicht notwendigerweise fortlaufend). Sortieren Sie die Datensätze „server_transaction_id“ durch „record_sequenz“, um die Reihenfolge der Änderungen innerhalb der Transaktion. |
server_transaction_id |
STRING |
Ein global eindeutiger String, der die Transaktion in in dem die Änderung festgeschrieben wurde. Der Wert darf nur werden im Zusammenhang mit der Verarbeitung von Änderungsstream-Datensätzen verwendet. Korreliert mit der Transaktions-ID in der Spanner-API |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Gibt an, ob dies der letzte Datensatz für eine Transaktion in der aktuellen Partition ist. |
table_name |
STRING |
Name der Tabelle, die von der Änderung betroffen ist. |
value_capture_type |
STRING |
Beschreibt den Werterfassungstyp, der in der die Konfiguration des Änderungsstreams ändern, als diese Änderung erfasst wurde. Der Werterfassungstyp kann |
column_types |
[ { "name": <STRING>, "type": { "code": <STRING> }, "is_primary_key": <BOOLEAN>, "ordinal_position": <NUMBER> }, ... ] |
Der Name der Spalte, der Spaltentyp, ob es ein Primärschlüssel ist, und die Position der Spalte im Schema definiert ("ordinal_position"). Die erste Spalte einer Tabelle im Schema eine Ordinalposition von "1" haben. Spaltentyp für Arrayspalten verschachtelt sein. Das Format entspricht der Typstruktur. in der Spanner API-Referenz beschrieben. |
mods |
[ { "keys": {<STRING> : <STRING>}, "new_values": { <STRING> : <VALUE-TYPE>, [...] }, "old_values": { <STRING> : <VALUE-TYPE>, [...] }, }, [...] ] |
Beschreibt die vorgenommenen Änderungen, einschließlich des Primärschlüssels.
Werte, die alten Werte und die neuen Werte der geänderten oder erfassten
Spalten. Verfügbarkeit und Inhalt der alten und neuen Werte hängen davon ab,
für den konfigurierten Wert "value_capture_type" festgelegt. Die new_values und
old_values -Felder enthalten nur die Nicht-Schlüsselspalten.
|
mod_type |
STRING |
Beschreibt die Art der Änderung. Entweder INSERT , UPDATE oder
DELETE |
number_of_records_in_transaction |
INT64 |
Die Anzahl der Datensätze zu Datenänderungen, die Teil dieses in allen Partitions des Änderungsstreams. |
number_of_partitions_in_transaction |
NUMBER |
Die Anzahl der Partitionen, für die Datenänderungseinträge zurückgegeben werden für diese Transaktion. |
transaction_tag |
STRING |
Transaktions-Tag, das dieser Transaktion zugeordnet ist. |
is_system_transaction |
BOOLEAN |
Gibt an, ob die Transaktion eine Systemtransaktion ist. |
Es folgt ein Paar Beispieldatensätze für Datenänderungseinträge. Sie beschreiben eine einzelne Transaktion in dem es eine zwischen zwei Konten übertragen. Die beiden Konten befinden sich separater Änderungsstream Partitionen.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
Der folgende Datensatz zur Datenänderung ist ein Beispiel für einen Datensatz mit dem Wert
Erfassungstyp "NEW_VALUES"
. Beachten Sie, dass nur neue Werte ausgefüllt werden.
Da nur die Spalte "LastUpdate"
geändert wurde, ist nur diese Spalte
zurückgegeben.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Der folgende Datensatz zur Datenänderung ist ein Beispiel für einen Datensatz mit dem Wert
Erfassungstyp "NEW_ROW"
. Nur die "LastUpdate"
wurde geändert, es werden jedoch alle beobachteten Spalten zurückgegeben.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Der folgende Datensatz zur Datenänderung ist ein Beispiel für einen Datensatz mit dem Wert
Erfassungstyp "NEW_ROW_AND_OLD_VALUES"
. Nur die "LastUpdate"
wurde geändert, es werden jedoch alle beobachteten Spalten zurückgegeben. Diese Werterfassung
type erfasst den neuen und den alten Wert von LastUpdate
.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Heartbeat-Rekorde
Wenn ein Heartbeat-Datensatz zurückgegeben wird, zeigt dies an, dass alle Änderungen mit
commit_timestamp
kleiner oder gleich dem Wert des Heartbeat-Datensatzes
timestamp
wurden zurückgegeben. Zukünftige Datensätze in diesem
Partition muss höhere Commit-Zeitstempel haben als die vom
Heartbeat-Rekord. Heartbeat-Einträge werden zurückgegeben, wenn keine Daten vorhanden sind
in eine Partition geschriebene Änderungen. Wenn Datenänderungen
die Partition, stattdessen kann data_change_record.commit_timestamp
verwendet werden
von heartbeat_record.timestamp
, um zu signalisieren, dass der Leser Fortschritte macht
beim Lesen der Partition.
Sie können Heartbeat-Datensätze, die auf Partitionen zurückgegeben wurden, zur Synchronisierung verwenden.
in allen Partitionen. Sobald alle Leser entweder eine
Heartbeat größer oder gleich einem Zeitstempel A
oder Daten oder untergeordnetes Element erhalten
Partitionseinträge größer oder gleich dem Zeitstempel A
sind, wissen die Leser, dass sie
alle Datensätze, für die zum Zeitpunkt oder vor dem Zeitstempel A
ein Commit durchgeführt wurde, und können beginnen
Verarbeitung der zwischengespeicherten Datensätze, z. B. Sortieren der partitionsübergreifenden
Datensätze nach Zeitstempel sortiert und nach server_transaction_id
gruppiert.
Ein Heartbeat-Eintrag enthält nur ein Feld:
GoogleSQL
Feld | Typ | Beschreibung |
---|---|---|
timestamp |
TIMESTAMP |
Der Zeitstempel des Heartbeat-Datensatzes. |
PostgreSQL
Feld | Typ | Beschreibung |
---|---|---|
timestamp |
STRING |
Der Zeitstempel des Heartbeat-Datensatzes. |
Beispiel für einen Heartbeat-Datensatz, der kommuniziert, dass alle Datensätze mit Zeitstempeln Es wurden weniger oder gleich dem Zeitstempel dieses Eintrags zurückgegeben:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Untergeordnete Partitionseinträge
Ein untergeordneter Partitionseintrag gibt Informationen über untergeordnete Partitionen zurück: ihre Partitionstokens, die Tokens ihrer übergeordneten Partitionen und den
start_timestamp
, der den frühesten Zeitstempel darstellt, den das untergeordnete Element
enthalten Änderungseinträge. Einträge mit Commit-Zeitstempeln
unmittelbar vor child_partitions_record.start_timestamp
liegen, werden
die in der aktuellen Partition zurückgegeben werden. Nachdem Sie alle
untergeordneten Partitionen für diese Partition enthält, gibt diese Abfrage
Erfolgsstatus, der angibt, dass alle Datensätze für diese
-Partition an.
Die Felder eines untergeordneten Partitionseintrags enthalten Folgendes:
GoogleSQL
Feld | Typ | Beschreibung |
---|---|---|
start_timestamp |
TIMESTAMP |
Vom untergeordneten Element zurückgegebene Datenänderungseinträge
Partitionen in diesem untergeordneten Partitionseintrag haben einen Commit-Zeitstempel
ist größer als oder gleich start_timestamp . Bei der Abfrage einer untergeordneten Partition sollte die Abfrage
Geben Sie das untergeordnete Partitionstoken und einen start_timestamp größer oder gleich
child_partitions_token.start_timestamp . Alle untergeordneten Partitionen
die von einer Partition zurückgegeben wurden, denselben start_timestamp und den gleichen
Der Zeitstempel liegt immer zwischen dem in der Abfrage angegebenen start_timestamp
und end_timestamp . |
record_sequence |
STRING |
Eine kontinuierlich ansteigende Folge
Zahl, mit der die Reihenfolge der
wird der Datensatz von untergeordneten Partitionen
aufgenommen, wenn mehrere
Partitionsdatensätze, die mit demselben start_timestamp in einem
einer bestimmten Partition. Das Partitionstoken,
start_timestamp und
record_sequence eindeutig identifiziert
Child-Partitions-Datensatz. |
child_partitions |
ARRAY<STRUCT< |
Gibt eine Reihe von untergeordneten Partitionen und die zugehörigen Informationen zurück. Dazu gehört auch der String des Partitionstokens, mit dem das untergeordnete Element identifiziert wird Partition in Abfragen sowie die Tokens der übergeordneten Partitionen. |
PostgreSQL
Feld | Typ | Beschreibung |
---|---|---|
start_timestamp |
STRING |
Vom untergeordneten Element zurückgegebene Datenänderungseinträge
Partitionen in diesem untergeordneten Partitionseintrag haben einen Commit-Zeitstempel
ist größer als oder gleich start_timestamp . Bei der Abfrage eines untergeordneten Elements
-Partition muss in der Abfrage das untergeordnete Partitionstoken und ein
start_timestamp größer oder gleich
child_partitions_token.start_timestamp . Alle untergeordneten Partitionen
Datensätze, die von einer Partition zurückgegeben werden,
start_timestamp und der Zeitstempel liegt immer zwischen dem
den angegebenen start_timestamp und
end_timestamp .
|
record_sequence |
STRING |
Eine kontinuierlich ansteigende Folge
Zahl, mit der die Reihenfolge der
wird der Datensatz von untergeordneten Partitionen
aufgenommen, wenn mehrere
Partitionsdatensätze, die mit demselben start_timestamp in einem
einer bestimmten Partition. Das Partitionstoken,
start_timestamp und
record_sequence eindeutig identifiziert
Child-Partitions-Datensatz. |
child_partitions |
[ { "token": <STRING>, "parent_partition_tokens": [<STRING>], }, [...] ] |
Gibt ein Array von untergeordneten Partitionen und die zugehörigen Informationen zurück. Dazu gehört auch der String des Partitionstokens, mit dem das untergeordnete Element identifiziert wird Partition in Abfragen sowie die Tokens der übergeordneten Partitionen. |
Das folgende Beispiel zeigt einen untergeordneten Partitionseintrag:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Workflow für Abfrage von Änderungsstreams
Führen Sie mit dem
ExecuteStreamingSql
API, einmalig
schreibgeschützt
Transaktion und ein
feste Zeitstempelgrenze. Die Änderung
Mit der Stream-Lesefunktion können Sie start_timestamp
und
end_timestamp
für den gewünschten Zeitraum. Alle Änderungseinträge
innerhalb der Aufbewahrungsdauer sind über die stark schreibgeschützte Version
Zeitstempelgrenze.
Alle anderen
TransactionOptions
sind ungültig für Änderungsstreamabfragen. Außerdem
Wenn TransactionOptions.read_only.return_read_timestamp
auf „true“ gesetzt ist,
wird im Transaction
ein spezieller Wert von kint64max - 1
zurückgegeben.
Nachricht, die die Transaktion beschreibt, anstelle eines gültigen Lesevorgangs
Zeitstempel. Dieser spezielle Wert sollte verworfen und für keine der
für nachfolgende Abfragen.
Jede Abfrage des Änderungsstreams kann eine beliebige Anzahl von Zeilen zurückgeben, die jeweils entweder einen Datenänderungseintrag, einen Heartbeat-Eintrag oder untergeordnete Partitionen Datensatz. Es muss keine Frist für die Anfrage festgelegt werden.
Beispiel:
Der Workflow für Streamingabfragen beginnt mit der Ausgabe des allerersten Änderungsstreams.
durch Angabe von partition_token
auf NULL
. In der Abfrage muss
die Lesefunktion für den Änderungsstream, den Start- und Endzeitstempel von Interesse und
das Heartbeat-Intervall. Wenn end_timestamp
auf NULL
gesetzt ist, behält die Abfrage
und Datenänderungen zurückgeben,
bis die Partition beendet ist.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Datensätze von dieser Abfrage verarbeiten, bis die Datensätze untergeordneter Partition
zurückgegeben. Im Beispiel unten sind zwei untergeordnete Partitionseinträge und drei Partitionseinträge
werden zurückgegeben, wird die Abfrage beendet. Untergeordnete Partitionseinträge aus einem
bestimmte Abfrage hat immer dieselbe start_timestamp
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
Wenn Sie zukünftige Änderungen nach dem 2022-05-01T09:00:01Z
verarbeiten möchten, müssen Sie drei neue erstellen
und führen sie parallel aus. Die drei Abfragen zusammen geben
Änderungen an den Daten für denselben Schlüsselbereich, den ihr übergeordnetes Element abdeckt. Immer die
start_timestamp
zu start_timestamp
im selben untergeordneten Partitionseintrag und
dieselben Werte für end_timestamp
und Heartbeat-Intervall zur Verarbeitung der Datensätze verwenden
bei allen Suchanfragen einheitlich.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
Nach einer Weile wird die Abfrage für child_token_2
beendet, nachdem eine andere Abfrage
untergeordneten Partitionseintrag. Dieser Eintrag zeigt an, dass eine neue Partition
in zukünftigen Änderungen für child_token_2
und child_token_3
ab dem
2022-05-01T09:30:15Z
Derselbe Datensatz wird von der Abfrage für
child_token_3
, da beide die übergeordneten Partitionen des neuen child_token_4
sind.
Um eine strikt geordnete Verarbeitung
von Datensätzen für einen bestimmten Schlüssel zu gewährleisten,
Die Abfrage für child_token_4
darf erst gestartet werden, wenn alle übergeordneten Elemente abgeschlossen sind.
in diesem Fall child_token_2
und child_token_3
. Nur eine Abfrage erstellen
Für jedes untergeordnete Partitionstoken sollte im Design des Abfrage-Workflows ein Token festgelegt werden,
Parent, um zu warten und die Abfrage für child_token_4
zu planen.
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Beispiele für die Verarbeitung und Analyse von Änderungsstreameinträgen finden Sie in der Apache Beam SpannerIO. Dataflow-Connector aktiviert GitHub