Partitionen, Datensätze und Abfragen von Änderungsstreams

Auf dieser Seite werden die folgenden Attribute von Änderungsstreams ausführlich beschrieben:

  • Das aufteilungsbasierte Partitionierungsmodell
  • Format und Inhalt von Änderungsstreameinträgen
  • Die Low-Level-Syntax, die zum Abfragen dieser Einträge verwendet wird
  • Beispiel für den Abfrageworkflow

Die Informationen auf dieser Seite sind für die Verwendung der Spanner API zur direkten Abfrage von Änderungsstreams relevant. Anwendungen, die stattdessen Dataflow zum Lesen des Änderungsstreams verwenden Daten nicht direkt mit dem Datenmodell arbeiten, beschrieben.

Eine umfassendere Einführung in Änderungsstreams finden Sie unter Streams ändern .

Streampartitionen ändern

Wenn eine Änderung an einer Tabelle auftritt, die von einem Änderungsstream beobachtet wird, schreibt Spanner einen entsprechenden Änderungsstreameintrag in die Datenbank, synchron in derselben Transaktion wie die Datenänderung. Dieses garantiert, dass Spanner bei erfolgreicher Transaktion die Änderung erfolgreich erfasst und beibehalten hat. Intern Spanner bringt den Änderungsstreameintrag und die Datenänderung zusammen sodass sie vom selben Server verarbeitet werden, um den Schreib-Overhead zu minimieren.

Im Rahmen der DML für eine bestimmte Teilung hängt Spanner den Schreibvorgang in derselben Transaktion an den entsprechenden Datensplit des Änderungsstreams an. Aufgrund dieser Colocation Streams sorgen für keine zusätzliche Koordination zwischen den Bereitstellungsressourcen, den Commit-Overhead der Transaktion minimiert.

Image

Spanner skaliert durch dynamisches Aufteilen und Zusammenführen von Daten auf Datenbanklast und -größe sowie die Verteilung von Splits auf die Bereitstellungsressourcen.

Bis Ermöglicht das Skalieren von Änderungsstreams, Schreib- und Lesevorgängen, Spanner-Splits und führt den internen Änderungsstream-Speicher mit den Datenbankdaten zusammen. Hotspots werden automatisch vermieden. Um das Lesen von Änderungsstream-Datensätzen in nahezu in Echtzeit, wenn Datenbankschreibvorgänge skaliert werden, ist die Spanner API ist darauf ausgelegt, einen Änderungsstream gleichzeitig über den Änderungsstream abzufragen. Partitionen. Änderungsstream-Partitionen werden Änderungsstream-Datensplits zugeordnet, die die Änderungsstream-Datensätze enthalten. Die Partitionen eines Änderungsstreams ändern sich dynamisch im Laufe der Zeit und korrelieren damit, wie Spanner die Datenbankdaten dynamisch aufteilt und zusammenführt.

Eine Änderungsstream-Partition enthält Einträge für einen unveränderlichen Schlüsselbereich für einen bestimmten Zeitraum. Jede Change-Stream-Partition kann in eine oder mehrere Change-Stream-Partitionen aufgeteilt oder mit anderen Change-Stream-Partitionen zusammengeführt werden. Wenn diese geteilte oder zusammengeführte Ereignisse auftreten, werden untergeordnete Partitionen erstellt, um die Änderungen für ihre jeweiligen unveränderlichen Schlüsselbereiche für den nächsten Zeitraum. Außerdem Datenänderungseinträge gibt eine Änderungsstreamabfrage Datensätze untergeordneter Partitionen an Leser über neue Partitions des Änderungsstreams informieren, die abgefragt werden müssen, als Heartbeat-Datensätze, um den Vorwärtsfortschritt anzuzeigen, wenn keine Schreibvorgänge stattgefunden haben vor Kurzem.

Wenn Sie eine bestimmte Change-Stream-Partition abfragen, werden die Änderungssätze in der Reihenfolge des Commit-Zeitstempels zurückgegeben. Jeder Änderungseintrag wird genau einmal. Zwischen den Partitionen eines Änderungsstreams ist keine Reihenfolge der Änderungssätze garantiert. Änderungseinträge für einen bestimmten Primärschlüssel werden nur für eine Partition für einen bestimmten Zeitraum zurückgegeben.

Aufgrund der übergeordneten und untergeordneten Partitionsabfolge müssen Einträge, die von untergeordneten Partitionen zurückgegeben werden, erst verarbeitet werden, nachdem Einträge aus allen übergeordneten Partitionen verarbeitet wurden, damit Änderungen für einen bestimmten Schlüssel in der Reihenfolge des Commit-Zeitstempels verarbeitet werden können.

Funktionen für Änderungsstream-Lesevorgänge und Abfragesyntax

GoogleSQL

Änderungsstreams werden mithilfe der Methode ExecuteStreamingSql der API erstellen. Spanner erstellt zusammen mit dem Änderungsstream automatisch eine spezielle Lesefunktion. Die Lesefunktion bietet Zugriff auf die Datensätze des Änderungsstreams. Die Namenskonvention für Lesefunktionen lautet READ_change_stream_name.

Angenommen, in der Datenbank ist ein Änderungsstream SingersNameStream vorhanden, lautet die Abfragesyntax für GoogleSQL:

SELECT ChangeRecord
FROM READ_SingersNameStream (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    read_options
)

Die Funktion read akzeptiert die folgenden Argumente:

Name des Arguments Typ Erforderlich/Optional? Beschreibung
start_timestamp TIMESTAMP Erforderlich Gibt an, dass Datensätze mit commit_timestamp größer oder gleich start_timestamp zurückgegeben werden sollen. Der Wert muss innerhalb der Aufbewahrungsdauer des Änderungsstreams liegen und kleiner oder gleich der aktuellen Uhrzeit und größer oder gleich dem Zeitstempel der Erstellung des Änderungsstreams sein.
end_timestamp TIMESTAMP Optional (Standard: NULL) Gibt an, dass Einträge mit commit_timestamp kleiner oder gleich end_timestamp zurückgegeben werden sollen. Der Wert muss innerhalb des Zeitrahmens für die Aufbewahrung des Änderungsstreams liegen und größer oder gleich start_timestamp sein. Die Abfrage ist entweder nach Rückgabe aller ChangeRecords bis zu end_timestamp abgeschlossen oder der Nutzer die Verbindung trennt. Wenn NULL oder nicht wird die Abfrage ausgeführt, bis alle ChangeRecords zurückgegeben oder der bricht die Verbindung ab.
partition_token STRING Optional (Standard: NULL) Gibt an, welche Änderungsstreampartition abgefragt werden soll, basierend auf dem Inhalt von untergeordneten Partitionen Datensätze. Wenn NULL oder nicht angegeben ist, bedeutet dies den den Änderungsstream zum ersten Mal abfragt und Es wurden keine bestimmten Partitionstokens für die Abfrage abgerufen.
heartbeat_milliseconds INT64 Erforderlich Legt fest, wie oft ein Heartbeat-ChangeRecord zurückgegeben wird falls keine Transaktionen in dieser Partition übergeben werden. Der Wert muss zwischen 1,000 (eine Sekunde) und 300,000 (fünf) liegen. Minuten).
read_options ARRAY Optional (Standard: NULL) Zusätzliche Leseoptionen, die für die zukünftige Verwendung reserviert sind. Derzeit ist NULL der einzige zulässige Wert.

Wir empfehlen, eine einfache Methode zum Erstellen des Textes für die Funktionsabfrage und Bindungsparameter daran lesen, wie im Folgenden gezeigt: Beispiel.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

PostgreSQL

Änderungsstreams abfragen Sie mit dem ExecuteStreamingSql API Spanner erstellt zusammen mit dem Änderungsstream automatisch eine spezielle Lesefunktion. Die Lesefunktion bietet Zugriff auf die Datensätze des Änderungsstreams. Die Namenskonvention für Lesefunktionen lautet spanner.read_json_change_stream_name.

Unter der Annahme, dass in der Datenbank der Änderungsstream SingersNameStream vorhanden ist, gilt Folgendes: für PostgreSQL sieht so aus:

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    null
)

Die Funktion read akzeptiert die folgenden Argumente:

Name des Arguments Typ Erforderlich/Optional? Beschreibung
start_timestamp timestamp with time zone Erforderlich Gibt an, dass Einträge mit commit_timestamp größer oder gleich start_timestamp geändert werden zurückgegeben werden soll. Der Wert muss sich im Änderungsstream befinden und sollte kleiner oder gleich der aktuellen Zeit sein, und größer oder gleich dem Zeitstempel der Erstellung des Änderungsstreams ist.
end_timestamp timestamp with timezone Optional (Standardeinstellung: NULL) Gibt an, dass Einträge mit commit_timestamp geändert werden kleiner oder gleich end_timestamp sollte zurückgegeben werden. Der Wert muss innerhalb der Aufbewahrungsfrist des Änderungsstreams liegen Punkt und größer oder gleich start_timestamp. Die Abfrage wird entweder beendet, nachdem alle Änderungseinträge bis zu end_timestamp zurückgegeben wurden, oder der Nutzer beendet die Verbindung. Mit NULL wird die Abfrage ausgeführt, bis alle Änderungseinträge zurückgegeben wurden oder der Nutzer die Verbindung trennt.
partition_token text Optional (Standard: NULL) Gibt an, welche Änderungsstreampartition abgefragt werden soll, basierend auf dem Inhalt von untergeordneten Partitionen Datensätze. Wenn NULL oder nicht angegeben ist, bedeutet dies den den Änderungsstream zum ersten Mal abfragt und Es wurden keine bestimmten Partitionstokens für die Abfrage abgerufen.
heartbeat_milliseconds bigint Erforderlich Legt fest, wie oft ein Heartbeat-ChangeRecord zurückgegeben wird falls keine Transaktionen in dieser Partition übergeben werden. Der Wert muss zwischen 1,000 (eine Sekunde) und 300,000 (fünf) liegen. Minuten).
null null Erforderlich Reserviert für zukünftige Verwendung

Wir empfehlen, eine praktische Methode zum Erstellen des Texts der Lesefunktion zu erstellen und Parameter daran zu binden, wie im folgenden Beispiel gezeigt.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
            + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {

  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("p1")
                    .to(startTimestamp)
                    .bind("p2")
                    .to(endTimestamp)
                    .bind("p3")
                    .to(partitionToken)
                    .bind("p4")
                    .to(heartbeatMillis)
                    .build();
}

Format von Streams-Eintrags ändern

GoogleSQL

Die Lesefunktion für Änderungsstreams gibt eine einzelne ChangeRecord-Spalte des Typs zurück ARRAY<STRUCT<...>>. In jeder Zeile enthält dieses Array immer ein einzelnes Element.

Die Arrayelemente haben folgenden Typ:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

Diese Struktur enthält drei Felder: data_change_record, heartbeat_record und child_partitions_record, jeweils vom Typ ARRAY<STRUCT<...>>. In jeder Zeile, die die Lesefunktion des Änderungsstreams zurückgibt, enthält nur eines dieser drei Felder einen Wert. Die anderen beiden sind leer oder enthalten den Wert NULL. Diese Arrayfelder enthalten höchstens ein Element.

In den folgenden Abschnitten werden die einzelnen Datensatztypen beschrieben.

PostgreSQL

Die Lesefunktion für Änderungsstreams gibt eine einzelne ChangeRecord-Spalte vom Typ JSON mit der folgenden Struktur zurück:

{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}

Dieses Objekt hat drei mögliche Schlüssel: data_change_record, heartbeat_record und child_partitions_record. Der entsprechende Werttyp ist JSON. In jeder Zeile, die die Lesefunktion des Änderungsstreams zurückgibt, einer dieser drei Schlüssel existiert.

In den folgenden Abschnitten werden diese drei Eintragstypen erläutert.

Datenänderungseinträge

Ein Datensatz für Datenänderung enthält eine Reihe von Änderungen an einer Tabelle mit dem denselben Änderungstyp (Einfügen, Aktualisieren oder Löschen), der zur selben Zeit Commit-Zeitstempel in einer Änderungsstreampartition für denselben Transaktion. Für denselben Datensatz können mehrere Datenänderungseinträge zurückgegeben werden. in mehreren Partitions des Änderungsstreams.

Alle Datenänderungsdatensätze haben die Felder commit_timestamp, server_transaction_id und record_sequence, die zusammen die Reihenfolge im Änderungsstream für einen Streamdatensatz bestimmen. Diese drei Felder reichen aus, um und für externe Konsistenz sorgen.

Hinweis: Mehrere Transaktionen können denselben Commit-Zeitstempel haben, wenn sie sich nicht überschneidende Daten betreffen. Das Feld server_transaction_id lässt sich erkennen, welche Änderungen (möglicherweise Änderungsstreams) wurden innerhalb derselben Transaktion. Kopplung mit record_sequence und Mit number_of_records_in_transaction-Feldern können Sie puffern und sortieren alle Datensätze einer bestimmten Transaktion.

Zu den Feldern eines Datenänderungseintrags gehören:

GoogleSQL

Feld Typ Beschreibung
commit_timestamp TIMESTAMP Der Zeitstempel, zu dem die Änderung vorgenommen wurde.
record_sequence STRING Die Sequenznummer für den Datensatz innerhalb der Transaktion. Sequenznummern sind innerhalb einer Transaktion garantiert eindeutig und kontinuierlich ansteigend (aber nicht unbedingt fortlaufend). Sortieren Sie die Datensätze server_transaction_id von record_sequence bis die Reihenfolge der Änderungen innerhalb der Transaktion zu rekonstruieren. Diese Sortierung kann von Spanner für eine bessere Leistung optimiert werden und entspricht möglicherweise nicht immer der ursprünglichen Sortierung, die Nutzer angeben.
server_transaction_id STRING Ein global eindeutiger String, der die Transaktion in in dem die Änderung festgeschrieben wurde. Der Wert sollte nur im Zusammenhang mit der Verarbeitung von Änderungsstream-Datensätzen verwendet werden und ist nicht mit der Transaktions-ID in der Spanner API korreliert.
is_last_record_in_transaction_in_partition BOOL Gibt an, ob dies der letzte Datensatz für eine Transaktion in der aktuellen Partition ist.
table_name STRING Name der Tabelle, die von der Änderung betroffen ist.
value_capture_type STRING

Beschreibt den Werterfassungstyp, der in der die Konfiguration des Änderungsstreams ändern, als diese Änderung erfasst wurde.

Der Werterfassungstyp kann "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES" oder "NEW_ROW_AND_OLD_VALUES" sein. Der Standardwert ist "OLD_AND_NEW_VALUES". Weitere Informationen finden Sie unter Arten der Werterfassung.

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
Der Name der Spalte, der Spaltentyp, ob es ein Primärschlüssel ist, und die Position der Spalte im Schema definiert ("ordinal_position"). Die erste Spalte einer Tabelle im Schema hätte eine Ordinalposition von "1". Spaltentyp für Arrayspalten verschachtelt sein. Das Format entspricht der Typstruktur. in der Spanner API-Referenz beschrieben.
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
Beschreibt die vorgenommenen Änderungen, einschließlich des Primärschlüssels. Werte, die alten Werte und die neuen Werte der geänderten oder erfassten Spalten. Verfügbarkeit und Inhalt der alten und neuen Werte hängen vom konfigurierten „value_capture_type“ ab. Die Felder new_values und old_values enthalten nur die Nicht-Schlüsselspalten.
mod_type STRING Beschreibt die Art der Änderung. Entweder INSERT, UPDATE oder DELETE.
number_of_records_in_transaction INT64 Die Anzahl der Datensätze zu Datenänderungen, die Teil dieses in allen Partitions des Änderungsstreams.
number_of_partitions_in_transaction INT64 Die Anzahl der Partitionen, die Datenänderungsdatensätze für diese Transaktion zurückgeben.
transaction_tag STRING Transaktions-Tag, das dieser Transaktion zugeordnet ist.
is_system_transaction BOOL Gibt an, ob die Transaktion eine Systemtransaktion ist.

PostgreSQL

Feld Typ Beschreibung
commit_timestamp STRING Der Zeitstempel, zu dem die Änderung übernommen wurde.
record_sequence STRING Die Sequenznummer für den Datensatz innerhalb der Transaktion. Sequenznummern sind innerhalb einer Transaktion garantiert eindeutig und kontinuierlich ansteigend (aber nicht unbedingt fortlaufend). Sortieren Sie die Datensätze mit derselben „server_transaction_id“ nach „record_sequence“, um die Reihenfolge der Änderungen innerhalb der Transaktion zu rekonstruieren.
server_transaction_id STRING Ein global eindeutiger String, der die Transaktion darstellt, in der die Änderung per Commit übergeben wurde. Der Wert sollte nur im Zusammenhang mit der Verarbeitung von Änderungsstream-Datensätzen verwendet werden und ist nicht mit der Transaktions-ID in der Spanner API korreliert.
is_last_record_in_transaction_in_partition BOOLEAN Gibt an, ob dies der letzte Datensatz für eine Transaktion in der aktuellen Partition ist.
table_name STRING Name der Tabelle, die von der Änderung betroffen ist.
value_capture_type STRING

Beschreibt den Werterfassungstyp, der in der die Konfiguration des Änderungsstreams ändern, als diese Änderung erfasst wurde.

Der Werterfassungstyp kann "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES" oder "NEW_ROW_AND_OLD_VALUES" sein. Der Standardwert ist "OLD_AND_NEW_VALUES". Weitere Informationen finden Sie unter Arten der Werterfassung.

column_types
[
  {
      "name": <STRING>,
      "type": {
        "code": <STRING>
      },
      "is_primary_key": <BOOLEAN>,
      "ordinal_position": <NUMBER>
    },
    ...
]
Der Name der Spalte, der Spaltentyp, ob es ein Primärschlüssel ist, und die Position der Spalte im Schema definiert ("ordinal_position"). Die erste Spalte einer Tabelle im Schema hätte eine Ordinalposition von "1". Spaltentyp für Arrayspalten verschachtelt sein. Das Format entspricht der Typstruktur. in der Spanner API-Referenz beschrieben.
mods
[
  {
    "keys": {<STRING> : <STRING>},
    "new_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
    "old_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
  },
  [...]
]
Beschreibt die vorgenommenen Änderungen, einschließlich des Primärschlüssels. Werte, die alten Werte und die neuen Werte der geänderten oder erfassten Spalten. Verfügbarkeit und Inhalt der alten und neuen Werte hängen vom konfigurierten „value_capture_type“ ab. Die Felder new_values und old_values enthalten nur die Nicht-Schlüsselspalten.
mod_type STRING Beschreibt die Art der Änderung. Entweder INSERT, UPDATE oder DELETE
number_of_records_in_transaction INT64 Die Anzahl der Datensätze zu Datenänderungen, die Teil dieses in allen Partitions des Änderungsstreams.
number_of_partitions_in_transaction NUMBER Die Anzahl der Partitionen, für die Datenänderungseinträge zurückgegeben werden für diese Transaktion.
transaction_tag STRING Transaktions-Tag, das dieser Transaktion zugeordnet ist.
is_system_transaction BOOLEAN Gibt an, ob es sich bei der Transaktion um eine Systemtransaktion handelt.

Im Folgenden finden Sie zwei Beispieldatensätze für Datenänderungen. Sie beschreiben eine einzelne Transaktion in dem es eine zwischen zwei Konten übertragen. Die beiden Konten befinden sich separater Änderungsstream Partitionen.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

Der folgende Datensatz zur Datenänderung ist ein Beispiel für einen Datensatz mit dem Wert Erfassungstyp "NEW_VALUES". Es werden nur neue Werte eingefügt. Da nur die Spalte "LastUpdate" geändert wurde, ist nur diese Spalte zurückgegeben.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z"
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Der folgende Datenänderungseintrag ist ein Beispiel für einen Datensatz mit dem Erfassungstyp "NEW_ROW". Es wurde nur die Spalte "LastUpdate" geändert, aber alle erfassten Spalten werden zurückgegeben.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Der folgende Datenänderungseintrag ist ein Beispiel für einen Datensatz mit dem Erfassungstyp "NEW_ROW_AND_OLD_VALUES". Nur die "LastUpdate" wurde geändert, es werden jedoch alle beobachteten Spalten zurückgegeben. Diese Werterfassung type erfasst den neuen und den alten Wert von LastUpdate.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z"
      }
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Heartbeat-Einträge

Wenn ein Heartbeat-Eintrag zurückgegeben wird, bedeutet das, dass alle Änderungen mit einem commit_timestamp, der kleiner oder gleich dem timestamp des Heartbeat-Eintrags ist, zurückgegeben wurden. Zukünftige Datensätze in dieser Partition müssen höhere Commit-Zeitstempel haben als der vom Heartbeat-Eintrag zurückgegebene. Heartbeat-Einträge werden zurückgegeben, wenn keine Datenänderungen in eine Partition geschrieben wurden. Wenn Datenänderungen in die Partition geschrieben werden, kann anstelle von heartbeat_record.timestamp die Zeichenfolge data_change_record.commit_timestamp verwendet werden, um anzugeben, dass der Leser beim Lesen der Partition Fortschritte macht.

Sie können Heartbeat-Datensätze, die auf Partitionen zurückgegeben wurden, zur Synchronisierung verwenden. in allen Partitionen. Sobald alle Leser entweder eine Heartbeat größer oder gleich einem Zeitstempel A oder Daten oder untergeordnetes Element erhalten Partitionseinträge größer oder gleich dem Zeitstempel A sind, wissen die Leser, dass sie alle Datensätze, für die zum Zeitpunkt oder vor dem Zeitstempel A ein Commit durchgeführt wurde, und können beginnen Verarbeitung der zwischengespeicherten Datensätze, z. B. Sortieren der partitionsübergreifenden Datensätze nach Zeitstempel sortiert und nach server_transaction_id gruppiert.

Ein Heartbeat-Eintrag enthält nur ein Feld:

GoogleSQL

Feld Typ Beschreibung
timestamp TIMESTAMP Der Zeitstempel des Heartbeat-Eintrags.

PostgreSQL

Feld Typ Beschreibung
timestamp STRING Der Zeitstempel des Heartbeat-Eintrags.

Beispiel für einen Heartbeat-Eintrag, der angibt, dass alle Einträge mit Zeitstempeln, die kleiner oder gleich dem Zeitstempel dieses Eintrags sind, zurückgegeben wurden:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

Datensätze für untergeordnete Partitionen

Ein untergeordneter Partitionseintrag gibt Informationen zu untergeordneten Partitionen zurück: ihre Partitionstokens, die Tokens ihrer übergeordneten Partitionen und den start_timestamp, der den frühesten Zeitstempel darstellt, den das untergeordnete Element enthalten Änderungseinträge. Einträge mit Commit-Zeitstempeln unmittelbar vor child_partitions_record.start_timestamp liegen, werden die in der aktuellen Partition zurückgegeben werden. Nachdem Sie alle untergeordneten Partitionen für diese Partition enthält, gibt diese Abfrage Erfolgsstatus, der angibt, dass alle Datensätze für diese -Partition an.

Die Felder eines untergeordneten Partitionseintrags enthalten Folgendes:

GoogleSQL

Feld Typ Beschreibung
start_timestamp TIMESTAMP Vom untergeordneten Element zurückgegebene Datenänderungseinträge Partitionen in diesem untergeordneten Partitionseintrag haben einen Commit-Zeitstempel ist größer als oder gleich start_timestamp. Bei der Abfrage einer untergeordneten Partition sollte die Abfrage Geben Sie das untergeordnete Partitionstoken und einen start_timestamp größer oder gleich child_partitions_token.start_timestamp. Alle von einer Partition zurückgegebenen Datensätze für untergeordnete Partitionen haben denselben start_timestamp-Wert und der Zeitstempel liegt immer zwischen dem angegebenen start_timestamp und end_timestamp der Abfrage.
record_sequence STRING Eine kontinuierlich ansteigende Folge Zahl, mit der die Reihenfolge der wird der Datensatz von untergeordneten Partitionen aufgenommen, wenn mehrere Partitionsdatensätze, die mit demselben start_timestamp in einem einer bestimmten Partition. Das Partitionstoken, start_timestamp und record_sequence identifizieren einen untergeordneten Partitionseintrag eindeutig.
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
Gibt eine Reihe von untergeordneten Partitionen und die zugehörigen Informationen zurück. Dazu gehört auch der String des Partitionstokens, mit dem das untergeordnete Element identifiziert wird Partition in Abfragen sowie die Tokens der übergeordneten Partitionen.

PostgreSQL

Feld Typ Beschreibung
start_timestamp STRING Vom untergeordneten Element zurückgegebene Datenänderungseinträge Partitionen in diesem untergeordneten Partitionseintrag haben einen Commit-Zeitstempel ist größer als oder gleich start_timestamp. Bei der Abfrage einer untergeordneten Partition muss die Abfrage das Token der untergeordneten Partition und einen start_timestamp-Wert enthalten, der größer oder gleich child_partitions_token.start_timestamp ist. Alle von einer Partition zurückgegebenen untergeordneten Partitionseinträge haben dieselbe start_timestamp und der Zeitstempel liegt immer zwischen dem in der Abfrage angegebenen start_timestamp und end_timestamp.
record_sequence STRING Eine monoton steigende Sequenznummer, mit der die Reihenfolge des Datensatzes für untergeordnete Partitionen definiert werden kann, wenn in einer bestimmten Partition mehrere Datensätze für untergeordnete Partitionen mit demselben Startzeitstempel zurückgegeben werden. Das Partitionstoken, start_timestamp und record_sequence eindeutig identifiziert untergeordneten Partitionen ein.
child_partitions
[
  {
    "token": <STRING>,
    "parent_partition_tokens": [<STRING>],
  }, [...]
]
Gibt ein Array von untergeordneten Partitionen und die zugehörigen Informationen zurück. Dazu gehört auch der String des Partitionstokens, mit dem das untergeordnete Element identifiziert wird Partition in Abfragen sowie die Tokens der übergeordneten Partitionen.

Das folgende Beispiel zeigt einen untergeordneten Partitionseintrag:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ],
}

Workflow für Abfrage von Änderungsstreams

Führen Sie Änderungsstreams-Abfragen mit der ExecuteStreamingSql API mit einer einmaligen schreibgeschützten Transaktion und einer starken Zeitstempelbindung aus. Die Änderung Mit der Stream-Lesefunktion können Sie start_timestamp und end_timestamp für den gewünschten Zeitraum. Auf alle Änderungsdatensätze innerhalb des Aufbewahrungszeitraums kann über die starke schreibgeschützte Zeitstempelbindung zugegriffen werden.

Alle anderen TransactionOptions sind für Änderungsstreamabfragen ungültig. Wenn TransactionOptions.read_only.return_read_timestamp auf „wahr“ festgelegt ist, wird in der Transaction-Nachricht, die die Transaktion beschreibt, anstelle eines gültigen Lesezeitstempels ein spezieller Wert von kint64max - 1 zurückgegeben. Dieser spezielle Wert sollte verworfen und für keine der für nachfolgende Abfragen.

Jede Abfrage des Änderungsstreams kann eine beliebige Anzahl von Zeilen zurückgeben, die jeweils entweder einen Datenänderungseintrag, einen Heartbeat-Eintrag oder untergeordnete Partitionen Datensatz. Es muss keine Frist für die Anfrage festgelegt werden.

Beispiel:

Der Workflow für Streamingabfragen beginnt mit der Ausgabe der allerersten Abfrage für den Änderungsstream. Geben Sie dazu partition_token bis NULL an. In der Abfrage muss die Lesefunktion für den Änderungsstream, den Start- und Endzeitstempel von Interesse und das Heartbeat-Intervall. Wenn end_timestamp NULL ist, gibt die Abfrage so lange Datenänderungen zurück, bis die Partition endet.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
) ;

Datensätze von dieser Abfrage verarbeiten, bis die Datensätze der untergeordneten Partition zurückgegeben. Im Beispiel unten sind zwei untergeordnete Partitionseinträge und drei Partitionseinträge werden zurückgegeben, wird die Abfrage beendet. Untergeordnete Partitionseinträge aus einem bestimmte Abfrage hat immer dieselbe start_timestamp.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
        // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ],
}

Wenn Sie zukünftige Änderungen nach dem 2022-05-01T09:00:01Z verarbeiten möchten, erstellen Sie drei neue Abfragen und führen Sie sie parallel aus. Die drei Abfragen zusammen geben Änderungen an den Daten für denselben Schlüsselbereich, den ihr übergeordnetes Element abdeckt. Immer die start_timestamp zu start_timestamp im selben untergeordneten Partitionseintrag und dieselben Werte für end_timestamp und Heartbeat-Intervall zur Verarbeitung der Datensätze verwenden bei allen Suchanfragen einheitlich.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_1',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_2',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_3',
  10000,
  NULL
);

Nach einer Weile wird die Abfrage für child_token_2 beendet, nachdem eine andere Abfrage untergeordneten Partitionseintrag. Dieser Eintrag zeigt an, dass eine neue Partition in zukünftigen Änderungen für child_token_2 und child_token_3 ab dem 2022-05-01T09:30:15Z Genau derselbe Datensatz wird von der Abfrage auf child_token_3 zurückgegeben, da beide die übergeordneten Partitionen der neuen child_token_4 sind. Um eine strikt geordnete Verarbeitung von Datensätzen für einen bestimmten Schlüssel zu gewährleisten, darf die Abfrage für child_token_4 erst gestartet werden, wenn alle übergeordneten Elemente abgeschlossen sind. In diesem Fall sind das child_token_2 und child_token_3. Nur eine Abfrage erstellen Für jedes untergeordnete Partitionstoken sollte im Design des Abfrage-Workflows ein Token festgelegt werden, Parent, um zu warten und die Abfrage für child_token_4 zu planen.

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z',
  NULL,
  'child_token_4',
  10000,
  NULL
);

Beispiele für die Verarbeitung und Analyse von Änderungsstreameinträgen finden Sie in der Apache Beam SpannerIO. Dataflow-Connector aktiviert GitHub