Streampartitionen, Einträge und Abfragen ändern

Auf dieser Seite werden die folgenden Attribute von Änderungsstreams ausführlich beschrieben:

  • Das aufgeteilte Partitionierungsmodell
  • Format und Inhalt von Änderungsstreameinträgen
  • Die Low-Level-Syntax, die zum Abfragen dieser Datensätze verwendet wird
  • Beispiel für den Abfrageworkflow

Die Informationen auf dieser Seite sind am wichtigsten für die Verwendung der Spanner API, um Änderungsstreams direkt abzufragen. Anwendungen, die stattdessen Dataflow zum Lesen von Änderungsstreamdaten verwenden, müssen nicht direkt mit dem hier beschriebenen Datenmodell arbeiten.

Einen umfassenderen Leitfaden zu Änderungsstreams finden Sie unter Änderungsstreams – Übersicht.

Streampartitionen ändern

Wenn eine Änderung an einer Tabelle erfolgt, die von einem Änderungsstream beobachtet wird, schreibt Spanner einen entsprechenden Änderungsstream-Eintrag in der Datenbank, synchron in derselben Transaktion wie die Datenänderung. Dadurch wird sichergestellt, dass bei einer erfolgreichen Transaktion auch Spanner die Änderung erfolgreich erfasst und beibehalten hat. Spanner platziert den Änderungsstream-Eintrag und die Datenänderung intern so, dass sie von demselben Server verarbeitet werden, um den Schreibaufwand zu minimieren.

Im Rahmen der DML an einen bestimmten Split hängt Spanner den Schreibvorgang an die entsprechende Datenaufteilung des Änderungsstreams in derselben Transaktion an. Aufgrund dieser Colocation fügen Änderungsstreams keine zusätzliche Koordination über die Bereitstellungsressourcen hinweg hinzu, wodurch der Commit-Overhead für Transaktionen minimiert wird.

Image

Spanner skaliert, indem Daten basierend auf Datenbanklast und -größe dynamisch aufgeteilt und zusammengeführt sowie Splits auf Bereitstellungsressourcen verteilt werden.

Damit Änderungsstreams zum Skalieren schreiben und lesen können, teilt Spanner den internen Änderungsstreamspeicher mit den Datenbankdaten auf und führt ihn zusammen. Dadurch werden Hotspots automatisch vermieden. Damit Änderungsstreameinträge bei Skalierung der Datenbankschreibvorgänge nahezu in Echtzeit gelesen werden können, ist die Spanner API so konzipiert, dass ein Änderungsstream gleichzeitig mit Änderungsstreampartitionen abgefragt werden kann. Änderungsstreampartitionen werden den Änderungsstreamdatenaufteilungen zugeordnet, die die Änderungsstreameinträge enthalten. Die Partitionen eines Änderungsstreams ändern sich im Laufe der Zeit dynamisch und korrelieren damit, wie Spanner die Datenbankdaten dynamisch teilt und zusammenführt.

Eine Änderungsstreampartition enthält Datensätze für einen unveränderlichen Schlüsselbereich für einen bestimmten Zeitraum. Jede Änderungsstreampartition kann in eine oder mehrere Änderungsstreampartitionen aufgeteilt oder mit anderen Änderungsstreampartitionen zusammengeführt werden. Wenn diese Aufteilungs- oder Zusammenführungsereignisse auftreten, werden untergeordnete Partitionen erstellt, um die Änderungen für ihre jeweiligen unveränderlichen Schlüsselbereiche für den nächsten Zeitraum zu erfassen. Zusätzlich zu Datenänderungseinträgen gibt eine Änderungsstream-Abfrage Datensätze der untergeordneten Partition zurück, um Leser über neue abzufragende Änderungsstreampartitionen zu informieren, sowie Heartbeat-Datensätze, um den Vorwärtsfortschritt anzuzeigen, wenn in letzter Zeit keine Schreibvorgänge aufgetreten sind.

Beim Abfragen einer bestimmten Änderungsstreampartition werden die Änderungseinträge in der Reihenfolge des Commit-Zeitstempels zurückgegeben. Jeder Änderungsdatensatz wird genau einmal zurückgegeben. Die Reihenfolge von Änderungseinträgen ist in den Änderungsstreampartitionen nicht garantiert. Änderungseinträge für einen bestimmten Primärschlüssel werden nur für eine Partition für einen bestimmten Zeitraum zurückgegeben.

Aufgrund der Herkunft der über- und untergeordneten Partitionen sollten Änderungen an einem bestimmten Schlüssel in der Reihenfolge des Commit-Zeitstempels erst verarbeitet werden, nachdem die Datensätze aller übergeordneten Partitionen verarbeitet wurden.

Stream-Lesefunktionen und Abfragesyntax ändern

GoogleSQL

Änderungsstreams werden mit der ExecuteStreamingSql API abgefragt. Spanner erstellt zusammen mit dem Änderungsstream automatisch eine spezielle Lesefunktion. Die Funktion „read“ bietet Zugriff auf die Datensätze des Änderungsstreams. Die Namenskonvention für Lesefunktionen lautet READ_change_stream_name.

Wenn der Änderungsstream SingersNameStream in der Datenbank vorhanden ist, lautet die Abfragesyntax für GoogleSQL so:

SELECT ChangeRecord
FROM READ_SingersNameStream (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    read_options
)

Die read-Funktion akzeptiert die folgenden Argumente:

Name des Arguments Typ Erforderlich/Optional? Beschreibung
start_timestamp TIMESTAMP Erforderlich Gibt an, dass Datensätze mit commit_timestamp größer oder gleich start_timestamp zurückgegeben werden sollen. Der Wert muss innerhalb der Aufbewahrungsdauer des Änderungsstreams liegen und kleiner oder gleich der aktuellen Zeit und größer oder gleich dem Zeitstempel der Erstellung des Änderungsstreams sein.
end_timestamp TIMESTAMP Optional (Standardeinstellung: NULL) Gibt an, dass Datensätze mit commit_timestamp kleiner oder gleich end_timestamp zurückgegeben werden sollen. Der Wert muss innerhalb der Aufbewahrungsdauer des Änderungsstreams liegen und größer oder gleich start_timestamp sein. Die Abfrage ist entweder beendet, nachdem alle ChangeRecords bis end_timestamp zurückgegeben wurden oder der Nutzer die Verbindung beendet. Ist NULL oder nicht angegeben, wird die Abfrage ausgeführt, bis alle ChangeRecords zurückgegeben sind oder der Nutzer die Verbindung beendet.
partition_token STRING Optional (Standardeinstellung: NULL) Gibt an, welche Änderungsstreampartition basierend auf dem Inhalt der Einträge untergeordneter Partitionen abgefragt werden soll. Wenn NULL oder nicht angegeben ist, fragt der Leser zum ersten Mal den Änderungsstream ab und hat keine bestimmten Partitionstokens abgerufen, von denen er abgefragt werden kann.
heartbeat_milliseconds INT64 Erforderlich Bestimmt, wie häufig ein Heartbeat-ChangeRecord zurückgegeben wird, falls in dieser Partition keine Transaktionen mit Commit durchgeführt wurden. Der Wert muss zwischen 1,000 (eine Sekunde) und 30,0000 (fünf Minuten) liegen.
read_options ARRAY Optional (Standardeinstellung: NULL) Zusätzliche Leseoptionen, die für eine zukünftige Verwendung reserviert sind. Der einzige zulässige Wert ist derzeit NULL.

Wir empfehlen, wie im folgenden Beispiel eine praktische Methode zum Erstellen des Textes der Lesefunktionsabfrage und zum Binden von Parametern zu erstellen.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

PostgreSQL

Änderungsstreams werden mit der ExecuteStreamingSql API abgefragt. Spanner erstellt zusammen mit dem Änderungsstream automatisch eine spezielle Lesefunktion. Die Funktion „read“ bietet Zugriff auf die Datensätze des Änderungsstreams. Die Namenskonvention für Lesefunktionen lautet spanner.read_json_change_stream_name.

Wenn der Änderungsstream SingersNameStream in der Datenbank vorhanden ist, sieht die Abfragesyntax für PostgreSQL so aus:

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    null
)

Die read-Funktion akzeptiert die folgenden Argumente:

Name des Arguments Typ Erforderlich/Optional? Beschreibung
start_timestamp timestamp with time zone Erforderlich Gibt an, dass Änderungseinträge mit commit_timestamp größer oder gleich start_timestamp zurückgegeben werden sollen. Der Wert muss innerhalb der Aufbewahrungsdauer des Änderungsstreams liegen und kleiner oder gleich der aktuellen Zeit und größer oder gleich dem Zeitstempel der Erstellung des Änderungsstreams sein.
end_timestamp timestamp with timezone Optional (Standardeinstellung: NULL) Gibt an, dass Änderungseinträge mit commit_timestamp kleiner oder gleich end_timestamp zurückgegeben werden sollen. Der Wert muss innerhalb der Aufbewahrungsdauer des Änderungsstreams liegen und größer oder gleich start_timestamp sein. Die Abfrage wird abgeschlossen, nachdem alle Änderungseinträge bis end_timestamp zurückgegeben wurden oder der Nutzer die Verbindung beendet. Bei NULL wird die Abfrage ausgeführt, bis alle Änderungseinträge zurückgegeben werden oder der Nutzer die Verbindung beendet.
partition_token text Optional (Standardeinstellung: NULL) Gibt an, welche Änderungsstreampartition basierend auf dem Inhalt der Einträge untergeordneter Partitionen abgefragt werden soll. Wenn NULL oder nicht angegeben ist, fragt der Leser zum ersten Mal den Änderungsstream ab und hat keine bestimmten Partitionstokens abgerufen, von denen er abgefragt werden kann.
heartbeat_milliseconds bigint Erforderlich Bestimmt, wie häufig ein Heartbeat-ChangeRecord zurückgegeben wird, falls in dieser Partition keine Transaktionen mit Commit durchgeführt wurden. Der Wert muss zwischen 1,000 (eine Sekunde) und 300,000 (fünf Minuten) liegen.
null null Erforderlich Für zukünftige Verwendung reserviert

Wir empfehlen, wie im folgenden Beispiel eine praktische Methode zum Erstellen des Textes der Lesefunktion und zum Binden von Parametern zu erstellen.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
            + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {

  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("p1")
                    .to(startTimestamp)
                    .bind("p2")
                    .to(endTimestamp)
                    .bind("p3")
                    .to(partitionToken)
                    .bind("p4")
                    .to(heartbeatMillis)
                    .build();
}

Format des Stream-Eintrags ändern

GoogleSQL

Die Lesefunktion „Änderungsstreams“ gibt eine einzelne ChangeRecord-Spalte vom Typ ARRAY<STRUCT<...>> zurück. Dieses Array in jeder Zeile enthält immer ein einzelnes Element.

Die Array-Elemente haben den folgenden Typ:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

Diese Struktur enthält drei Felder: data_change_record, heartbeat_record und child_partitions_record, jeweils vom Typ ARRAY<STRUCT<...>>. In jeder Zeile, die die Lesefunktion des Änderungsstreams zurückgibt, enthält nur eines dieser drei Felder einen Wert. Die anderen beiden Felder sind leer oder NULL. Diese Array-Felder enthalten höchstens ein Element.

In den folgenden Abschnitten werden diese drei Eintragstypen genauer untersucht.

PostgreSQL

Die Lesefunktion des Änderungsstreams gibt eine einzelne ChangeRecord-Spalte vom Typ JSON mit folgender Struktur zurück:

{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}

In diesem Objekt gibt es drei mögliche Schlüssel: data_change_record, heartbeat_record und child_partitions_record. Der entsprechende Werttyp ist JSON. In jeder Zeile, die die Änderungsstream-Lesefunktion zurückgibt, ist nur einer dieser drei Schlüssel vorhanden.

In den folgenden Abschnitten werden diese drei Eintragstypen genauer untersucht.

Datensätze zu Datenänderungen

Ein Datenänderungseintrag enthält eine Reihe von Änderungen an einer Tabelle mit demselben Änderungstyp (Einfügen, Aktualisieren oder Löschen), der für denselben Commit-Zeitstempel in einer Änderungsstreampartition für dieselbe Transaktion durchgeführt wird. Für dieselbe Transaktion können mehrere Datenänderungseinträge über mehrere Änderungsstreampartitionen zurückgegeben werden.

Alle Datenänderungseinträge haben die Felder commit_timestamp, server_transaction_id und record_sequence, die zusammen die Reihenfolge im Änderungsstream für einen Streamdatensatz festlegen. Diese drei Felder reichen aus, um die Reihenfolge von Änderungen abzuleiten und externe Konsistenz zu bieten.

Beachten Sie, dass mehrere Transaktionen denselben Commit-Zeitstempel haben können, wenn sie nicht überlappende Daten berühren. Mit dem Feld server_transaction_id lässt sich unterscheiden, welche Änderungen (möglicherweise über Änderungsstreampartitionen) innerhalb derselben Transaktion vorgenommen wurden. Wenn Sie sie mit den Feldern record_sequence und number_of_records_in_transaction kombinieren, können Sie auch alle Datensätze einer bestimmten Transaktion puffern und anordnen.

Die Felder eines Datenänderungseintrags umfassen Folgendes:

GoogleSQL

Feld Typ Beschreibung
commit_timestamp TIMESTAMP Der Zeitstempel, zu dem für die Änderung ein Commit durchgeführt wurde.
record_sequence STRING Die Sequenznummer für den Datensatz innerhalb der Transaktion. Sequenznummern sind innerhalb einer Transaktion garantiert eindeutig und monoton ansteigen (aber nicht unbedingt fortlaufend). Sortieren Sie die Datensätze für dieselbe server_transaction_id nach record_sequence, um die Reihenfolge der Änderungen innerhalb der Transaktion zu rekonstruieren.
server_transaction_id STRING Ein global eindeutiger String, der die Transaktion darstellt, in der für die Änderung ein Commit durchgeführt wurde. Der Wert sollte nur im Kontext der Verarbeitung von Änderungsstreameinträgen verwendet werden und steht nicht in Beziehung mit der Transaktions-ID in der Spanner API.
is_last_record_in_transaction_in_partition BOOL Gibt an, ob dies der letzte Eintrag für eine Transaktion in der aktuellen Partition ist.
table_name STRING Name der Tabelle, die von der Änderung betroffen ist
value_capture_type STRING

Beschreibt den Werterfassungstyp, der bei der Erfassung dieser Änderung in der Konfiguration des Änderungsstreams angegeben wurde.

Der Werterfassungstyp kann "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES" oder "NEW_ROW_AND_OLD_VALUES" sein. Der Standardwert ist "OLD_AND_NEW_VALUES". Weitere Informationen finden Sie unter Werterfassungstypen.

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
Der Name der Spalte, der Spaltentyp, ob es ein Primärschlüssel ist, und die Position der Spalte, wie im Schema definiert („ordinal_position“). Die erste Spalte einer Tabelle im Schema hätte die Ordinalposition „1“. Der Spaltentyp kann für Arrayspalten verschachtelt sein. Das Format entspricht der Typstruktur, die in der Spanner API-Referenz beschrieben wird.
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
Beschreibt die vorgenommenen Änderungen, einschließlich der Primärschlüsselwerte, der alten Werte und der neuen Werte der geänderten oder nachverfolgten Spalten. Die Verfügbarkeit und der Inhalt der alten und neuen Werte hängen vom konfigurierten Werterfassungstyp (Wert-Erfassungstyp) ab. Die Felder new_values und old_values enthalten nur die Nicht-Schlüsselspalten.
mod_type STRING Beschreibt die Art der Änderung. Entweder INSERT, UPDATE oder DELETE.
number_of_records_in_transaction INT64 Die Anzahl der Datenänderungseinträge, die Teil dieser Transaktion in allen Änderungsstreampartitionen sind.
number_of_partitions_in_transaction INT64 Die Anzahl der Partitionen, die Datenänderungseinträge für diese Transaktion zurückgeben.
transaction_tag STRING Ein Transaktions-Tag, das dieser Transaktion zugeordnet ist.
is_system_transaction BOOL Gibt an, ob die Transaktion eine Systemtransaktion ist.

PostgreSQL

Feld Typ Beschreibung
commit_timestamp STRING Der Zeitstempel, zu dem für die Änderung ein Commit durchgeführt wurde.
record_sequence STRING Die Sequenznummer für den Datensatz innerhalb der Transaktion. Sequenznummern sind innerhalb einer Transaktion garantiert eindeutig und monoton ansteigen (aber nicht unbedingt fortlaufend). Sortieren Sie die Datensätze für dieselbe „server_transaction_id“ nach „record_Sequence“, um die Reihenfolge der Änderungen innerhalb der Transaktion zu rekonstruieren.
server_transaction_id STRING Ein global eindeutiger String, der die Transaktion darstellt, in der für die Änderung ein Commit durchgeführt wurde. Der Wert sollte nur im Kontext der Verarbeitung von Änderungsstreameinträgen verwendet werden und steht nicht in Beziehung mit der Transaktions-ID in der Spanner API
is_last_record_in_transaction_in_partition BOOLEAN Gibt an, ob dies der letzte Eintrag für eine Transaktion in der aktuellen Partition ist.
table_name STRING Name der Tabelle, die von der Änderung betroffen ist
value_capture_type STRING

Beschreibt den Werterfassungstyp, der bei der Erfassung dieser Änderung in der Konfiguration des Änderungsstreams angegeben wurde.

Der Werterfassungstyp kann "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES" oder "NEW_ROW_AND_OLD_VALUES" sein. Der Standardwert ist "OLD_AND_NEW_VALUES". Weitere Informationen finden Sie unter Werterfassungstypen.

column_types

[
  {
      "name": <STRING>,
      "type": {
        "code": <STRING>
      },
      "is_primary_key": <BOOLEAN>,
      "ordinal_position": <NUMBER>
    },
    ...
]
Der Name der Spalte, der Spaltentyp, ob es ein Primärschlüssel ist, und die Position der Spalte, wie im Schema definiert („ordinal_position“). Die erste Spalte einer Tabelle im Schema hätte die Ordinalposition „1“. Der Spaltentyp kann für Arrayspalten verschachtelt sein. Das Format entspricht der Typstruktur, die in der Spanner API-Referenz beschrieben wird.
mods

[
  {
    "keys": {<STRING> : <STRING>},
    "new_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
    "old_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
  },
  [...]
]
Beschreibt die vorgenommenen Änderungen, einschließlich der Primärschlüsselwerte, der alten Werte und der neuen Werte der geänderten oder verfolgten Spalten. Die Verfügbarkeit und der Inhalt der alten und neuen Werte hängen vom konfigurierten Werterfassungstyp (Wert-Erfassungstyp) ab. Die Felder new_values und old_values enthalten nur die Nicht-Schlüsselspalten.
mod_type STRING Beschreibt die Art der Änderung. Entweder INSERT, UPDATE oder DELETE.
number_of_records_in_transaction INT64 Die Anzahl der Datenänderungseinträge, die Teil dieser Transaktion in allen Änderungsstreampartitionen sind.
number_of_partitions_in_transaction NUMBER Die Anzahl der Partitionen, die Datenänderungseinträge für diese Transaktion zurückgeben.
transaction_tag STRING Ein Transaktions-Tag, das dieser Transaktion zugeordnet ist.
is_system_transaction BOOLEAN Gibt an, ob die Transaktion eine Systemtransaktion ist.

Es folgt ein Paar beispielhafter Datenänderungseinträge. Sie beschreiben eine einzelne Transaktion, bei der eine Übertragung zwischen zwei Konten stattfindet. Die beiden Konten befinden sich in separaten Änderungsstream-Partitionen.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

Der folgende Datenänderungseintrag ist ein Beispiel für einen Datensatz mit dem Werterfassungstyp "NEW_VALUES". Es werden nur neue Werte eingefügt. Es wurde nur die Spalte "LastUpdate" geändert, sodass nur diese Spalte zurückgegeben wurde.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z"
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Der folgende Datenänderungseintrag ist ein Beispiel für einen Datensatz mit dem Werterfassungstyp "NEW_ROW". Nur die Spalte "LastUpdate" wurde geändert, aber alle erfassten Spalten werden zurückgegeben.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Der folgende Datenänderungseintrag ist ein Beispiel für einen Datensatz mit dem Werterfassungstyp "NEW_ROW_AND_OLD_VALUES". Nur die Spalte "LastUpdate" wurde geändert, aber alle erfassten Spalten werden zurückgegeben. Mit diesem Werterfassungstyp werden der neue und der alte Wert von LastUpdate erfasst.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z"
      }
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Heartbeat-Rekorde

Wenn ein Heartbeat-Datensatz zurückgegeben wird, bedeutet dies, dass alle Änderungen mit commit_timestamp, die kleiner oder gleich dem timestamp des Heartbeat-Datensatzes sind, zurückgegeben wurden. Zukünftige Datensätze in dieser Partition müssen höhere Commit-Zeitstempel haben als die, die vom Heartbeat-Datensatz zurückgegeben werden. Heartbeat-Einträge werden zurückgegeben, wenn keine Datenänderungen in eine Partition geschrieben wurden. Wenn Datenänderungen in die Partition geschrieben werden, kann data_change_record.commit_timestamp anstelle von heartbeat_record.timestamp verwendet werden, um zu signalisieren, dass der Leser beim Lesen der Partition voranschreitet.

Sie können Heartbeat-Einträge verwenden, die für Partitionen zurückgegeben werden, um Leser in allen Partitionen zu synchronisieren. Sobald alle Leser entweder einen Herzschlag größer oder gleich einem Zeitstempel A oder Daten oder untergeordnete Partitionseinträge empfangen haben, die größer oder gleich dem Zeitstempel A sind, wissen die Leser, dass sie alle Datensätze empfangen haben, die am oder vor diesem Zeitstempel A festgeschrieben sind, und mit der Verarbeitung der zwischengespeicherten Datensätze beginnen, z. B. indem die partitionierten Datensätze nach Zeitstempel sortiert und nach server_transaction_id gruppiert werden.

Ein Heartbeat-Eintrag enthält nur ein Feld:

GoogleSQL

Feld Typ Beschreibung
timestamp TIMESTAMP Der Zeitstempel des Heartbeat-Eintrags.

PostgreSQL

Feld Typ Beschreibung
timestamp STRING Der Zeitstempel des Heartbeat-Eintrags.

Ein Beispiel für einen Heartbeat-Datensatz, der angibt, dass alle Datensätze mit Zeitstempeln zurückgegeben wurden, die kleiner oder gleich dem Zeitstempel dieses Eintrags sind:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

Einträge für untergeordnete Partitionen

Ein Datensatz untergeordneter Partitionen gibt Informationen über untergeordnete Partitionen zurück: die zugehörigen Partitionstokens, die Tokens der übergeordneten Partitionen und die start_timestamp für den frühesten Zeitstempel, für den die untergeordneten Partitionen Änderungseinträge enthalten. Datensätze, deren Commit-Zeitstempel unmittelbar vor dem child_partitions_record.start_timestamp liegen, werden in der aktuellen Partition zurückgegeben. Nachdem alle Datensätze der untergeordneten Partitionen für diese Partition zurückgegeben wurden, gibt diese Abfrage einen Erfolgsstatus zurück, der angibt, dass alle Datensätze für diese Partition zurückgegeben wurden.

Zu den Feldern eines untergeordneten Partitionseintrags gehören:

GoogleSQL

Feld Typ Beschreibung
start_timestamp TIMESTAMP Datenänderungseinträge, die von untergeordneten Partitionen in diesem Datensatz der untergeordneten Partition zurückgegeben werden, haben einen Commit-Zeitstempel, der größer oder gleich start_timestamp ist. Beim Abfragen einer untergeordneten Partition sollten das Token der untergeordneten Partition und ein start_timestamp größer oder gleich child_partitions_token.start_timestamp angegeben werden. Alle Datensätze untergeordneter Partitionen, die von einer Partition zurückgegeben werden, haben denselben start_timestamp und der Zeitstempel liegt immer zwischen dem in der Abfrage angegebenen start_timestamp und end_timestamp.
record_sequence STRING Eine monoton ansteigende Sequenznummer, mit der die Reihenfolge der Datensätze untergeordneter Partitionen definiert werden kann, wenn mehrere Datensätze der untergeordneten Partitionen mit demselben start_timestamp in einer bestimmten Partition zurückgegeben werden. Das Partitionstoken start_timestamp und record_sequence identifiziert einen untergeordneten Partitionseintrag eindeutig.
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
Gibt eine Reihe von untergeordneten Partitionen und die zugehörigen Informationen zurück. Dazu gehören der Partitionstoken-String, mit dem die untergeordnete Partition in Abfragen identifiziert wird, sowie die Tokens ihrer übergeordneten Partitionen.

PostgreSQL

Feld Typ Beschreibung
start_timestamp STRING Datenänderungseinträge, die von untergeordneten Partitionen in diesem Datensatz der untergeordneten Partitionen zurückgegeben werden, haben einen Commit-Zeitstempel größer oder gleich start_timestamp. Beim Abfragen einer untergeordneten Partition müssen in der Abfrage das Token der untergeordneten Partition und ein start_timestamp angegeben werden, der größer oder gleich child_partitions_token.start_timestamp ist. Alle Datensätze der untergeordneten Partitionen, die von einer Partition zurückgegeben werden, haben denselben start_timestamp und der Zeitstempel liegt immer zwischen dem in der Abfrage angegebenen start_timestamp und end_timestamp.
record_sequence STRING Eine monoton ansteigende Sequenznummer, mit der die Reihenfolge der Datensätze untergeordneter Partitionen definiert werden kann, wenn mehrere Datensätze der untergeordneten Partitionen mit demselben start_timestamp in einer bestimmten Partition zurückgegeben werden. Das Partitionstoken start_timestamp und record_sequence identifiziert einen untergeordneten Partitionseintrag eindeutig.
child_partitions

[
  {
    "token": <STRING>,
    "parent_partition_tokens": [<STRING>],
  }, [...]
]
Gibt ein Array untergeordneter Partitionen und die zugehörigen Informationen zurück. Dazu gehören der Partitionstoken-String, mit dem die untergeordnete Partition in Abfragen identifiziert wird, sowie die Tokens ihrer übergeordneten Partitionen.

Das folgende Beispiel zeigt einen Datensatz für eine untergeordnete Partition:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ],
}

Abfrageworkflow für Streams ändern

Führen Sie Änderungsstreamabfragen mit der ExecuteStreamingSql API mit einer schreibgeschützten Transaktion zur einmaligen Verwendung und einer starken Zeitstempelbindung aus. Mit der Funktion zum Lesen des Änderungsstreams können Sie start_timestamp und end_timestamp für den gewünschten Zeitraum angeben. Alle Änderungseinträge innerhalb der Aufbewahrungsdauer sind über die starke schreibgeschützte Zeitstempelgrenze zugänglich.

Alle anderen TransactionOptions sind für Änderungsstreamabfragen ungültig. Wenn TransactionOptions.read_only.return_read_timestamp auf „true“ gesetzt ist, wird außerdem in der Transaction-Nachricht, die die Transaktion beschreibt, anstelle eines gültigen Lesezeitstempels ein spezieller Wert von kint64max - 1 zurückgegeben. Dieser spezielle Wert sollte verworfen und nicht für nachfolgende Abfragen verwendet werden.

Jede Änderungsstreamabfrage kann eine beliebige Anzahl von Zeilen zurückgeben, die jeweils entweder einen Datenänderungseintrag, einen Heartbeat-Eintrag oder einen Datensatz für untergeordnete Partitionen enthalten. Für die Anfrage muss keine Frist festgelegt werden.

Beispiel:

Der Workflow für Streamingabfragen beginnt mit der Ausgabe der allerersten Änderungsstreamabfrage. Dazu wird partition_token als NULL angegeben. Die Abfrage muss die Lesefunktion für den Änderungsstream, den gewünschten Start- und Endzeitstempel und das Heartbeat-Intervall angeben. Wenn end_timestamp den Wert NULL hat, gibt die Abfrage so lange Datenänderungen zurück, bis die Partition endet.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
) ;

Verarbeiten Sie Datensätze aus dieser Abfrage, bis Datensätze der untergeordneten Partition zurückgegeben werden. Im folgenden Beispiel werden zwei Datensätze der untergeordneten Partition und drei Partitionstokens zurückgegeben. Die Abfrage wird dann beendet. Datensätze der untergeordneten Partition aus einer bestimmten Abfrage haben immer denselben start_timestamp.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
        // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ],
}

Damit zukünftige Änderungen nach 2022-05-01T09:00:01Z verarbeitet werden können, müssen Sie drei neue Abfragen erstellen und parallel ausführen. Die drei Abfragen geben zusammen zukünftige Datenänderungen für denselben Schlüsselbereich zurück, den ihr übergeordnetes Element abdeckt. Setzen Sie start_timestamp immer auf den start_timestamp im selben untergeordneten Partitionseintrag und verwenden Sie dasselbe end_timestamp und dasselbe Heartbeat-Intervall, um die Datensätze über alle Abfragen hinweg konsistent zu verarbeiten.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_1',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_2',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_3',
  10000,
  NULL
);

Nachdem ein weiterer untergeordneter Partitionseintrag zurückgegeben wurde, wird die Abfrage für child_token_2 eine Weile abgeschlossen. Dieser Eintrag zeigt an, dass eine neue Partition zukünftige Änderungen für child_token_2 und child_token_3 ab 2022-05-01T09:30:15Z übernimmt. Die Abfrage für child_token_3 gibt genau denselben Eintrag zurück, da beide Partitionen die übergeordneten Partitionen des neuen child_token_4 sind. Um eine strikt geordnete Verarbeitung von Datensätzen für einen bestimmten Schlüssel zu gewährleisten, darf die Abfrage für child_token_4 erst beginnen, nachdem alle übergeordneten Elemente abgeschlossen sind. In diesem Fall sind das child_token_2 und child_token_3. Erstellen Sie nur eine Abfrage für jedes untergeordnete Partitionstoken. Das Design des Abfrageworkflows sollte ein übergeordnetes Element bestimmen, das warten und die Abfrage für child_token_4 planen soll.

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z',
  NULL,
  'child_token_4',
  10000,
  NULL
);

Beispiele zum Umgang und Parsen von Änderungsstreameinträgen im Apache Beam SpannerIO-Dataflow-Connector auf GitHub finden