Auf dieser Seite werden die folgenden Attribute von Änderungsstreams im Detail beschrieben:
- Split-basiertes Partitionierungsmodell
- Format und Inhalt von Änderungsstream-Einträgen
- Die allgemeine Syntax zum Abfragen dieser Datensätze
- Beispiel für den Abfrageworkflow
Die Informationen auf dieser Seite sind am relevantesten für die Verwendung der Spanner API zum direkten Abfragen von Änderungsstreams. Anwendungen, die stattdessen Dataflow zum Lesen von Änderungsstreamdaten verwenden, müssen nicht direkt mit dem hier beschriebenen Datenmodell arbeiten.
Eine umfassendere Einführung zum Ändern von Streams finden Sie unter Streams ändern.
Streampartitionen ändern
Wenn eine Änderung an einer Tabelle stattfindet, die von einem Änderungsstream beobachtet wird, schreibt Cloud Spanner einen entsprechenden Änderungsstream-Datensatz in die Datenbank, und zwar synchron in derselben Transaktion wie die Datenänderung. Dies garantiert, dass Spanner die Transaktion auch dann erfolgreich speichert und speichert, wenn die Transaktion erfolgreich ist. Spanner speichert intern den Änderungsstreamdatensatz und die Datenänderung, sodass sie vom selben Server verarbeitet werden, um den Schreibaufwand zu minimieren.
Im Rahmen der DML an einen bestimmten Split hängt Spanner den Schreibvorgang an die entsprechende Änderungsstream-Datenaufteilung in derselben Transaktion an. Aufgrund dieser Kollokation verursachen Änderungsstreams keine zusätzliche Koordination über Bereitstellungsressourcen, wodurch der Overhead für Transaktions-Commits minimiert wird.
Spanner skaliert, indem Daten basierend auf der Datenbanklast und -größe dynamisch aufgeteilt und zusammengeführt sowie auf mehrere Bereitstellungsressourcen verteilt werden.
Zur Aktivierung von skalierbaren Änderungsstreams und -lesevorgängen teilt Spanner den internen Änderungsstreamspeicher mit den Datenbankdaten zusammen und vermeidet automatisch Hotspots. Damit Änderungsstreameinträge nahezu in Echtzeit beim Lesen von Datenbankschreibvorgängen gelesen werden können, ist die Spanner API so konzipiert, dass ein Änderungsstream mithilfe von Änderungsstream-Partitionen gleichzeitig abgefragt wird. Ändern Sie die Zuordnung der Stream-Partitionen, um Stream-Datenaufteilungen zu ändern, die die Stream-Einträge enthalten. Die Partitionen eines Änderungsstreams ändern sich im Laufe der Zeit dynamisch und stehen in einem Zusammenhang mit der dynamischen Aufteilung und Zusammenführung der Datenbankdaten durch Spanner.
Eine Änderungsstream-Partition enthält Datensätze für einen unveränderlichen Schlüsselbereich für einen bestimmten Zeitraum. Jede Änderungsstream-Partition kann in eine oder mehrere Änderungsstream-Partitionen aufgeteilt oder mit anderen Änderungsstream-Partitionen zusammengeführt werden. Wenn diese Aufteilungs- oder Zusammenführungsereignisse stattfinden, werden untergeordnete Partitionen erstellt, um die Änderungen für die jeweiligen unveränderlichen Schlüsselbereiche für den nächsten Zeitraum zu erfassen. Zusätzlich zu den Datenänderungseinträgen gibt eine Änderungsstream-Abfrage untergeordnete Partitionseinträge zurück, um Leser über neue abzufragende Änderungsstream-Partitionen zu informieren, sowie Heartbeat-Einträge, die den Vorwärtsfortschritt anzeigen, wenn in letzter Zeit keine Schreibvorgänge stattgefunden haben.
Beim Abfragen einer bestimmten Änderungsstream-Partition werden die Änderungseinträge in der Reihenfolge des Commit-Zeitstempels zurückgegeben. Jeder Änderungseintrag wird genau einmal zurückgegeben. Für Änderungsstream-Partitionen gibt es keine garantierte Reihenfolge der Änderungsdatensätze. Änderungseinträge für einen bestimmten Primärschlüssel werden nur für eine Partition für einen bestimmten Zeitraum zurückgegeben.
Aufgrund der Herkunft einer über-/untergeordneten Partition sollten Datensätze, die von untergeordneten Partitionen zurückgegeben werden, erst verarbeitet werden, nachdem die Datensätze von allen übergeordneten Partitionen verarbeitet wurden.
Syntax der Streamabfrage ändern
Änderungsstreams werden mit der ExecuteStreamingSql
API abgefragt. Neben dem Änderungsstream wird automatisch eine spezielle Tabellenwertfunktion (Table-Value, TVF) erstellt. Sie bietet Zugriff auf die Einträge des Änderungsstreams. Die TVF-Namenskonvention lautet READ_change_stream_name
.
Wenn ein Änderungsstream SingersNameStream
in der Datenbank vorhanden ist, lautet die Abfragesyntax so:
SELECT ChangeRecord FROM READ_SingersNameStream ( start_timestamp, end_timestamp, partition_token, heartbeat_milliseconds )
Die Funktion akzeptiert die folgenden Argumente:
Name des Arguments | Typ | Erforderlich/Optional? | Beschreibung |
---|---|---|---|
start_timestamp |
TIMESTAMP |
Erforderlich | Gibt an, dass Datensätze mit commit_timestamp größer oder gleich start_timestamp zurückgegeben werden sollen. Der Wert muss innerhalb der Aufbewahrungsdauer des Änderungsstreams liegen und kleiner oder gleich der aktuellen Zeit und größer oder gleich dem Zeitstempel der Erstellung des Änderungsstreams sein. |
end_timestamp |
TIMESTAMP |
Optional (Standardeinstellung: NULL ) |
Gibt an, dass Datensätze mit commit_timestamp , die kleiner oder gleich end_timestamp sind, zurückgegeben werden sollen. Der Wert muss innerhalb der Aufbewahrungsdauer für den Änderungsstream liegen und größer oder gleich start_timestamp sein. Die Abfrage wird abgeschlossen, nachdem alle ChangeRecords-Daten bis „end_timestamp“ zurückgegeben wurden oder eine Reihe von untergeordneten Partitionseinträgen zurückgegeben wurde. Wenn NULL nicht angegeben ist, wird die Abfrage ausgeführt, bis die aktuelle Partition abgeschlossen ist und alle ChangeRecords mit ihren festgelegten child_partition_record -Feldern zurückgegeben werden. Wenn Sie NULL für end_timestamp angeben, werden immer die neuesten Änderungen gelesen. |
partition_token |
STRING |
Optional (Standardeinstellung: NULL ) |
Gibt auf der Grundlage des Inhalts von untergeordneten Partitionseinträgen an, welche Streampartition geändert werden soll. Wenn NULL nicht angegeben ist, bedeutet dies, dass der Leser den Änderungsstream zum ersten Mal abfragt und keine bestimmten Partitionstokens abgefragt hat. |
heartbeat_milliseconds |
INT64 |
Erforderlich | Legt fest, wie oft ein HeartRecord ChangeRecord zurückgegeben wird, wenn in dieser Partition keine Transaktionen ausgeführt werden.
Der Wert muss zwischen 1000 (eine Sekunde) und 300000 (fünf Minuten) liegen. |
Wir empfehlen, eine praktische Methode zum Erstellen des Texts der TVF-Abfrage und der Bindungsparameter zu erstellen, wie im folgenden Beispiel gezeigt.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
Format des Stream-Eintrags ändern
Der Änderungsstream TVF gibt eine einzelne ChangeRecord-Spalte vom Typ ARRAY<STRUCT<...>>
zurück. In jeder Zeile enthält dieses Array immer ein einzelnes Element.
Die Array-Elemente haben den folgenden Typ:
STRUCT <
data_change_record ARRAY<STRUCT<...>>,
heartbeat_record ARRAY<STRUCT<...>>,
child_partitions_record ARRAY<STRUCT<...>>
>
Diese Struktur hat drei Felder: data_change_record
, heartbeat_record
und child_partitions_record
vom Typ ARRAY<STRUCT<...>>
. In jeder Zeile, die vom Änderungsstream TVF zurückgegeben wird, enthält nur eines dieser drei Felder einen Wert. Die anderen beiden sind leer oder NULL
. Diese Arrayfelder enthalten höchstens ein Element.
In den folgenden Abschnitten werden diese drei Eintragstypen untersucht.
Datensätze zur Datenänderung
Ein Datensatz mit Datenänderung enthält eine Reihe von Änderungen an einer Tabelle, für die derselbe Änderungstyp (einfügen, aktualisieren oder löschen) mit demselben Commit-Zeitstempel in einer Änderungsstream-Partition für dieselbe Transaktion übernommen wurde. Es können mehrere Datenänderungsdatensätze für dieselbe Transaktion über mehrere Änderungsstream-Partitionen zurückgegeben werden.
Alle Datenänderungseinträge haben die Felder commit_timestamp
, server_transaction_id
und record_sequence
, die zusammen die Reihenfolge im Änderungsstream für einen Streamdatensatz festlegen. Diese drei Felder reichen aus, um die Reihenfolge der Änderungen abzuleiten und externe Konsistenz bereitzustellen.
Beachten Sie, dass mehrere Transaktionen denselben Commit-Zeitstempel haben können, wenn sie sich nicht überschneidende Daten berühren. Mit dem Feld server_transaction_id
können Sie unterscheiden, welche Änderungen (möglicherweise über Änderungsstream-Partitionen) innerhalb derselben Transaktion vorgenommen wurden. Wenn Sie sie mit den Feldern record_sequence
und number_of_records_in_transaction
kombinieren, können Sie auch alle Datensätze einer bestimmten Transaktion puffern und sortieren.
Die Felder eines Datenänderungseintrags umfassen Folgendes:
Feld | Typ | Beschreibung |
---|---|---|
commit_timestamp |
TIMESTAMP |
Der Zeitstempel, in dem die Änderung übernommen wurde. |
record_sequence |
STRING |
Die Sequenznummer für den Datensatz innerhalb der Transaktion. Die Sequenznummern sind innerhalb einer Transaktion garantiert eindeutig und monoton steigen (aber nicht unbedingt zusammenhängend). Sortieren Sie die Einträge für dieselbe „server_transaction_id“ nach „record_sequence“, um die Reihenfolge der Änderungen innerhalb der Transaktion zu rekonstruieren. |
server_transaction_id |
STRING |
Ein global eindeutiger String, der die Transaktion darstellt, in der die Änderung übernommen wurde. Der Wert sollte nur im Zusammenhang mit der Verarbeitung von Änderungsstream-Einträgen verwendet werden und steht nicht im Zusammenhang mit der Transaktions-ID in der Spanner API, z.B. „TransactionSelector.id“. Mit beiden wird eine Transaktion im Vergleich zu anderen Werten innerhalb desselben Kontexts (d.h. Change Stream „data_change_records“ oder Spanner API) eindeutig identifiziert. |
is_last_record_in_transaction_in_partition |
BOOL |
Gibt an, ob dies der letzte Eintrag für eine Transaktion in der aktuellen Partition ist. |
table_name |
STRING |
Name der Tabelle, die von der Änderung betroffen ist. |
value_capture_type |
STRING |
Beschreibt den Werterfassungstyp, der bei der Erfassung dieser Änderung in der Änderungsstreamkonfiguration angegeben wurde. Der Werterfassungstyp kann |
column_types |
ARRAY<STRUCT< |
Der Name der Spalte, der Spaltentyp, ob er ein Primärschlüssel ist, und die Position der Spalte, die im Schema definiert ist (ordinal_position). Die erste Spalte einer Tabelle im Schema hat die ordinale Position „1“. Der Spaltentyp kann für Arrayspalten verschachtelt sein. Das Format entspricht der Typstruktur, die in der Spanner API-Referenz beschrieben wird. |
mods |
ARRAY<STRUCT< |
Beschreibt die vorgenommenen Änderungen, einschließlich der Primärschlüsselwerte, der alten Werte und der neuen Werte der geänderten oder erfassten Spalten.
Die Verfügbarkeit und der Inhalt der alten und neuen Werte hängen vom konfigurierten Wert für „Capture_type“ ab. Die Felder new_values und old_values enthalten nur die Spalten ohne Schlüssel. |
mod_type |
STRING |
Beschreibt die Art der Änderung. Entweder INSERT , UPDATE oder DELETE . |
number_of_records_in_transaction |
INT64 |
Die Anzahl der Datenänderungseinträge, die Teil dieser Transaktion in allen Änderungsstream-Partitionen sind. |
number_of_partitions_in_transaction |
INT64 |
Die Anzahl der Partitionen, die Datenänderungseinträge für diese Transaktion zurückgeben. |
transaction_tag |
STRING |
Mit dieser Transaktion verknüpftes Transaktions-Tag. |
is_system_transaction |
BOOL |
Gibt an, ob die Transaktion eine Systemtransaktion ist. |
Im Folgenden finden Sie zwei Beispiele für Änderungsdatensätze. Sie beschreiben eine einzelne Transaktion, bei der eine Übertragung zwischen zwei Konten stattfindet. Die beiden Konten befinden sich in separaten Änderungsstream-Partitionen.
data_change_record: {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
data_change_record: {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
Der folgende Datensatz mit Datenänderungen ist ein Beispiel für einen Eintrag mit dem Werterfassungstyp "NEW_VALUES"
. Es werden nur neue Werte eingefügt.
Nur die Spalte "LastUpdate"
wurde geändert, sodass nur diese Spalte zurückgegeben wurde.
data_change_record: {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Der folgende Datensatz mit Datenänderungen ist ein Beispiel für einen Eintrag mit dem Werterfassungstyp "NEW_ROW"
. Nur die Spalte "LastUpdate"
wurde geändert, aber es werden alle erfassten Spalten zurückgegeben.
data_change_record: {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Herzschlagaufzeichnungen
Wenn ein Heartbeat-Eintrag zurückgegeben wird, gibt dies an, dass alle Änderungen mit commit_timestamp
, die kleiner oder gleich dem timestamp
des Heartbeat-Eintrags sind, zurückgegeben wurden und dass zukünftige Datensätze in dieser Partition höhere Commit-Zeitstempel haben müssen als die, die vom Heartbeat-Eintrag zurückgegeben werden. Heartbeat-Einträge werden zurückgegeben, wenn keine Datenänderungen in eine Partition geschrieben wurden. Wenn in die Partition geschriebene Datenänderungen vorhanden sind, kann data_change_record.commit_timestamp
anstelle von heartbeat_record.timestamp
verwendet werden, um anzuzeigen, dass der Leser beim Lesen der Partition Fortschritte macht.
Mit Heartbeat-Einträgen für Partitionen können Sie Leser über alle Partitionen synchronisieren. Sobald alle Leser einen Herzschlag empfangen haben, der größer als oder gleich einem Zeitstempel A
ist, oder Daten oder Datensätze mit untergeordneten Partitionen, die größer oder gleich dem Zeitstempel A
sind, wissen die Leser, dass sie alle Datensätze erhalten haben, die mit oder vor diesem Zeitstempel A
versehen sind, und können die Verarbeitung der gepufferten Datensätze beispielsweise starten und sie beispielsweise nach server_transaction_id
sortieren.
Ein Heartbeat-Datensatz enthält nur ein Feld:
Feld | Typ | Beschreibung |
---|---|---|
timestamp |
TIMESTAMP |
Der Zeitstempel des Heartbeat-Datensatzes. |
Ein Beispiel für einen Heartbeat-Eintrag, der darüber informiert, dass alle Datensätze mit Zeitstempeln kleiner oder gleich dem Zeitstempel dieses Datensatzes zurückgegeben wurden:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Einträge untergeordneter Partitionen
Ein Eintrag für untergeordnete Partitionen gibt Informationen zu untergeordneten Partitionen zurück: ihren Partitionstokens, den Tokens ihrer übergeordneten Partitionen und dem start_timestamp
, der den frühesten Zeitstempel darstellt, für den die untergeordneten Partitionen Änderungseinträge enthalten. Datensätze, deren Commit-Zeitstempel unmittelbar vor child_partitions_record.start_timestamp
liegen, werden in der aktuellen Partition zurückgegeben. Nachdem alle Datensätze der untergeordneten Partitionen für diese Partition zurückgegeben wurden, wird diese Abfrage mit einem Erfolgsstatus zurückgegeben. Dies bedeutet, dass alle Einträge für diese Partition zurückgegeben wurden.
Die Felder eines untergeordneten Partitionseintrags umfassen Folgendes:
Feld | Typ | Beschreibung |
---|---|---|
start_timestamp |
TIMESTAMP |
Datenänderungseinträge, die von untergeordneten Partitionen in diesem untergeordneten Partitionseintrag zurückgegeben werden, haben einen Commit-Zeitstempel größer oder gleich start_timestamp . Beim Abfragen einer untergeordneten Partition sollten in der Abfrage das Token der untergeordneten Partition und ein start_timestamp -Wert größer oder gleich child_partitions_token.start_timestamp angegeben werden. Alle von einer Partition zurückgegebenen Einträge der untergeordneten Partitionen haben denselben start_timestamp und der Zeitstempel liegt immer zwischen den für die Abfrage angegebenen start_timestamp und end_timestamp . |
record_sequence |
STRING |
Eine monoton steigende Sequenznummer, mit der die Reihenfolge des untergeordneten Partitionseintrags definiert werden kann, wenn in einer bestimmten Partition mehrere untergeordnete Partitionseinträge mit demselben start_timestamp zurückgegeben werden. Das Partitionstoken start_timestamp und record_sequence kennzeichnen einen untergeordneten Partitionseintrag eindeutig. |
child_partitions |
ARRAY<STRUCT< |
Gibt eine Reihe von untergeordneten Partitionen und die zugehörigen Informationen zurück. Dazu gehören der Partitionstoken-String, mit dem die untergeordnete Partition in Abfragen identifiziert wird, sowie die Tokens der übergeordneten Partitionen. |
Beispiel für einen untergeordneten Partitionseintrag:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"],
}
],
}
Abfrage-Workflow für Streams ändern
Führen Sie Änderungsstream-Abfragen mit der ExecuteStreamingSql
API mit einer Einmal-schreibgeschützten Transaktion und einer starken Zeitstempelgrenze aus. Mit dem Änderungsstream TVF können Nutzer die start_timestamp
und end_timestamp
für den gewünschten Zeitraum angeben. Auf alle Änderungseinträge innerhalb der Aufbewahrungsdauer kann anhand der starken schreibgeschützten Zeitstempelgrenze zugegriffen werden.
Alle anderen TransactionOptions
sind für Abfragen des Änderungsstreams ungültig. Wenn TransactionOptions.read_only.return_read_timestamp
auf „true“ gesetzt ist, wird in der Nachricht Transaction
, die die Transaktion beschreibt, anstelle eines gültigen Lesezeitstempels der Wert kint64max - 1
zurückgegeben. Dieser spezielle Wert sollte verworfen und nicht für nachfolgende Abfragen verwendet werden.
Jede Abfrage für einen Änderungsstream kann eine beliebige Anzahl von Zeilen zurückgeben, die jeweils einen Datensatz mit Datenänderung, einen Heartbeat-Eintrag oder einen untergeordneten Partitionseintrag enthalten. Sie müssen keine Frist für die Anfrage festlegen.
Beispiel:
Der Workflow für Streamingabfragen beginnt mit der allerersten Änderungsstreamabfrage. Dazu wird partition_token
auf NULL
gesetzt. In der Abfrage müssen die TVF-Funktion für den Änderungsstream, den gewünschten Start- und End-Zeitstempel sowie das Herzschlagintervall angegeben werden. Wenn end_timestamp
den Wert NULL
hat, gibt die Abfrage so lange Datenänderungen zurück, bis untergeordnete Partitionen erstellt wurden.
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01 09:00:00-00",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
Verarbeiten Sie Datensätze aus dieser Abfrage, bis Datensätze der untergeordneten Partition zurückgegeben werden. Im folgenden Beispiel werden zwei untergeordnete Partitionseinträge und drei Partitionstokens zurückgegeben. Anschließend wird die Abfrage beendet. Untergeordnete Partitionseinträge einer bestimmten Abfrage teilen sich immer denselben start_timestamp
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01 09:00:01-00",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL],
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL],
}
],
}
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01 09:00:01-00",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL],
}
],
}
Erstellen Sie drei neue Abfragen und führen Sie sie parallel aus, um zukünftige Änderungen nach 2022-05-01 09:00:01-00
zu verarbeiten. Die drei Abfragen geben zusammen zukünftige Datenänderungen für denselben Schlüsselbereich zurück, den ihr übergeordnetes Element abdeckt. Setze start_timestamp
immer im selben untergeordneten Partitionseintrag auf start_timestamp
und verwende dasselbe end_timestamp
- und Herzschlagintervall, um die Einträge für alle Abfragen einheitlich zu verarbeiten.
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01 09:00:01-00",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000);
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01 09:00:01-00",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000);
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01 09:00:01-00",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000);
Nach einiger Zeit ist die Abfrage zu child_token_2
abgeschlossen, nachdem ein weiterer Eintrag für eine untergeordnete Partition zurückgegeben wurde. Dieser Eintrag weist darauf hin, dass eine neue Partition zukünftige Änderungen für child_token_2
und child_token_3
ab 2022-05-01 09:30:15-00
abdeckt. Der exakt gleiche Eintrag wird von der Abfrage am child_token_3
zurückgegeben, da beide die übergeordneten Partitionen des neuen child_token_4
sind.
Für eine strikt geordnete Verarbeitung von Datensätzen für einen bestimmten Schlüssel muss die Abfrage in child_token_4
erst beginnen, nachdem alle übergeordneten Elemente abgeschlossen sind, in diesem Fall child_token_2
und child_token_3
. Erstellen Sie für jedes untergeordnete Partitionstoken nur eine Abfrage. Im Design des Abfrageworkflows sollte ein übergeordnetes Element festgelegt werden, das die Abfrage auf child_token_4
plant und plant.
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01 09:30:15-00",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01 09:30:15-00",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
Beispiele zum Verarbeiten und Parsen von Änderungsstream-Einträgen im Apache Beam SpannerIO-Dataflow-Connector auf GitHub