Änderungsstream-Partitionen, -Datensätze und -Abfragen

Auf dieser Seite werden Änderungsstreams in Spanner für Datenbanken mit Google SQL- und PostgreSQL-Dialekt beschrieben. Dazu gehören:

  • Das splitbasierte Partitionierungsmodell
  • Format und Inhalt von Änderungsstream-Eintrags
  • Die Low-Level-Syntax, die zum Abfragen dieser Einträge verwendet wird
  • Beispiel für den Abfragevorgang

Mit der Spanner API können Sie Änderungsstreams direkt abfragen. Bei Anwendungen, die stattdessen Dataflow zum Lesen von Änderungsstreamdaten verwenden, muss nicht direkt mit dem hier beschriebenen Datenmodell gearbeitet werden.

Eine umfassende Einführung in Änderungsstreams finden Sie unter Änderungsstreams – Übersicht.

Streampartitionen ändern

Wenn eine Änderung an einer Tabelle auftritt, die von einem Änderungsstream beobachtet wird, schreibt Spanner einen entsprechenden Änderungsstream-Eintrag synchron in derselben Transaktion wie die Datenänderung in die Datenbank. Wenn die Transaktion erfolgreich ist, hat Spanner die Änderung also auch erfolgreich erfasst und gespeichert. Intern platziert Spanner den Änderungsstream-Eintrag und die Datenänderung an derselben Stelle, damit sie vom selben Server verarbeitet werden, um den Schreibaufwand zu minimieren.

Im Rahmen der DML für eine bestimmte Teilung hängt Spanner den Schreibvorgang in derselben Transaktion an den entsprechenden Datensplit des Änderungsstreams an. Aufgrund dieser Co-Lokation erfordern Änderungsstreams keine zusätzliche Koordination zwischen den Bereitstellungsressourcen, wodurch der Transaktions-Commit-Overhead minimiert wird.

Image

Spanner skaliert, indem Daten dynamisch basierend auf der Datenbanklast und ‑größe aufgeteilt und zusammengeführt und die Aufteilungen auf Bereitstellungsressourcen verteilt werden.

Damit Schreib- und Lesevorgänge für Änderungsstreams skaliert werden können, teilt und verschmilzt Spanner den internen Speicher für Änderungsstreams mit den Datenbankdaten und vermeidet so automatisch Hotspots. Um das Lesen von Änderungsstream-Einträgen nahezu in Echtzeit zu unterstützen, wenn Datenbankeinträge skaliert werden, ist die Spanner API so konzipiert, dass ein Änderungsstream gleichzeitig mithilfe von Änderungsstream-Partitionen abgefragt werden kann. Änderungsstream-Partitionen werden Änderungsstream-Datensplits zugeordnet, die die Änderungsstream-Datensätze enthalten. Die Partitionen eines Änderungsstreams ändern sich dynamisch im Laufe der Zeit und hängen davon ab, wie Spanner die Datenbankdaten dynamisch aufteilt und zusammenführt.

Eine Änderungsstream-Partition enthält Einträge für einen unveränderlichen Schlüsselbereich für einen bestimmten Zeitraum. Jede Change-Stream-Partition kann in eine oder mehrere Change-Stream-Partitionen aufgeteilt oder mit anderen Change-Stream-Partitionen zusammengeführt werden. Wenn diese Ereignisse auftreten, werden untergeordnete Partitionen erstellt, um die Änderungen für die jeweiligen unveränderlichen Schlüsselbereiche für den nächsten Zeitraum zu erfassen. Neben Datenänderungssätzen gibt eine Änderungsstreamabfrage auch untergeordnete Partitionssätze zurück, um Leser über neue Änderungsstreampartitionen zu informieren, die abgefragt werden müssen, sowie Heartbeat-Einträge, um den Fortschritt anzuzeigen, wenn in letzter Zeit keine Schreibvorgänge stattgefunden haben.

Wenn Sie eine bestimmte Change-Stream-Partition abfragen, werden die Änderungssätze in der Reihenfolge des Commit-Zeitstempels zurückgegeben. Jeder Änderungseintrag wird genau einmal zurückgegeben. Die Reihenfolge der Änderungsdatensätze ist nicht für alle Änderungsstream-Partitionen garantiert. Änderungseinträge für einen bestimmten Primärschlüssel werden nur für eine Partition für einen bestimmten Zeitraum zurückgegeben.

Aufgrund der übergeordneten und untergeordneten Partitionsabstammung müssen Einträge, die von untergeordneten Partitionen zurückgegeben werden, erst verarbeitet werden, nachdem Einträge aus allen übergeordneten Partitionen verarbeitet wurden, damit Änderungen für einen bestimmten Schlüssel in der Reihenfolge des Commit-Zeitstempels verarbeitet werden können.

Funktionen für Änderungsstream-Lesevorgänge und Abfragesyntax

Verwenden Sie die ExecuteStreamingSql API, um Änderungsstreams abzufragen. Spanner erstellt zusammen mit dem Änderungsstream automatisch eine spezielle Lesefunktion. Die Lesefunktion bietet Zugriff auf die Datensätze des Änderungsstreams. Die Namenskonvention für Lesefunktionen lautet READ_change_stream_name.

Angenommen, in der Datenbank ist ein Änderungsstream SingersNameStream vorhanden, lautet die Abfragesyntax für GoogleSQL:

SELECT ChangeRecord
FROM READ_SingersNameStream (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    read_options
)

Die Funktion „read“ akzeptiert die folgenden Argumente:

Name des Arguments Typ Erforderlich? Beschreibung
start_timestamp TIMESTAMP Erforderlich Gibt an, dass Einträge mit commit_timestamp größer oder gleich start_timestamp zurückgegeben werden sollen. Der Wert muss innerhalb des Zeitfensters für die Aufbewahrung des Änderungsstreams liegen und kleiner oder gleich der aktuellen Uhrzeit und größer oder gleich dem Zeitstempel der Erstellung des Änderungsstreams sein.
end_timestamp TIMESTAMP Optional (Standard: NULL) Gibt an, dass Einträge mit einem commit_timestamp-Wert zurückgegeben werden sollen, der kleiner oder gleich end_timestamp ist. Der Wert muss innerhalb des Zeitraums für die Aufbewahrung des Änderungsstreams liegen und größer oder gleich start_timestamp sein. Die Abfrage wird entweder beendet, nachdem alle ChangeRecords bis zur end_timestamp zurückgegeben wurden, oder wenn Sie die Verbindung beenden. Wenn end_timestamp auf NULL festgelegt ist oder nicht angegeben ist, wird die Ausführung der Abfrage fortgesetzt, bis alle ChangeRecords zurückgegeben wurden oder Sie die Verbindung beenden.
partition_token STRING Optional (Standard: NULL) Gibt an, welche Änderungsstream-Partition abgefragt werden soll, basierend auf dem Inhalt der Datensätze der untergeordneten Partitionen. Wenn NULL oder nichts angegeben ist, bedeutet das, dass der Leser den Änderungsstream zum ersten Mal abfragt und keine bestimmten Partitionstokens für die Abfrage erhalten hat.
heartbeat_milliseconds INT64 Erforderlich Bestimmt, wie oft ein Heartbeat ChangeRecord zurückgegeben wird, wenn in dieser Partition keine Transaktionen verbindlich gemacht wurden.

Der Wert muss zwischen 1,000 (eine Sekunde) und 300,000 (fünf Minuten) liegen.
read_options ARRAY Optional (Standard: NULL) Es werden Leseoptionen hinzugefügt, die für die zukünftige Verwendung reserviert sind. Der einzige zulässige Wert ist NULL.

Wir empfehlen, eine Hilfsmethode zum Erstellen des Textes der Abfrage der Lesefunktion zu erstellen und Parameter daran zu binden, wie im folgenden Beispiel gezeigt.

Java
    private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
    "SELECT ChangeRecord FROM READ_SingersNameStream"
        + "("
        + "   start_timestamp => @startTimestamp,"
        + "   end_timestamp => @endTimestamp,"
        + "   partition_token => @partitionToken,"
        + "   heartbeat_milliseconds => @heartbeatMillis"
        + ")";

    // Helper method to conveniently create change stream query texts and
    // bind parameters.
    public static Statement getChangeStreamQuery(
          String partitionToken,
          Timestamp startTimestamp,
          Timestamp endTimestamp,
          long heartbeatMillis) {
      return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                        .bind("startTimestamp")
                        .to(startTimestamp)
                        .bind("endTimestamp")
                        .to(endTimestamp)
                        .bind("partitionToken")
                        .to(partitionToken)
                        .bind("heartbeatMillis")
                        .to(heartbeatMillis)
                        .build();
    }
    

Verwenden Sie die ExecuteStreamingSql API, um Änderungsstreams abzufragen. Spanner erstellt zusammen mit dem Änderungsstream automatisch eine spezielle Lesefunktion. Die Lesefunktion bietet Zugriff auf die Datensätze des Änderungsstreams. Die Namenskonvention für Lesefunktionen lautet spanner.read_json_change_stream_name.

Angenommen, in der Datenbank ist ein Änderungsstream SingersNameStream vorhanden, lautet die Abfragesyntax für PostgreSQL:

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    null
)

Die Funktion „read“ akzeptiert die folgenden Argumente:

Name des Arguments Typ Erforderlich? Beschreibung
start_timestamp timestamp with time zone Erforderlich Gibt an, dass Änderungseinträge mit commit_timestamp größer oder gleich start_timestamp zurückgegeben werden sollen. Der Wert muss innerhalb der Aufbewahrungsdauer des Änderungsstreams liegen und kleiner oder gleich der aktuellen Uhrzeit und größer oder gleich dem Zeitstempel der Erstellung des Änderungsstreams sein.
end_timestamp timestamp with timezone Optional (Standard: NULL) Gibt an, dass Änderungseinträge mit einem commit_timestamp-Wert kleiner oder gleich end_timestamp zurückgegeben werden sollen. Der Wert muss innerhalb des Zeitraums für die Aufbewahrung des Änderungsstreams liegen und größer oder gleich start_timestamp sein. Die Abfrage wird entweder beendet, nachdem alle Änderungseinträge bis zum end_timestamp zurückgegeben wurden, oder sobald Sie die Verbindung beenden. Bei NULL wird die Abfrage fortgesetzt, bis alle Änderungseinträge zurückgegeben wurden oder Sie die Verbindung beenden.
partition_token text Optional (Standard: NULL) Gibt an, welche Änderungsstream-Partition abgefragt werden soll, basierend auf dem Inhalt der Datensätze der untergeordneten Partitionen. Wenn NULL oder nichts angegeben ist, bedeutet das, dass der Leser den Änderungsstream zum ersten Mal abfragt und keine bestimmten Partitionstokens für die Abfrage erhalten hat.
heartbeat_milliseconds bigint Erforderlich Bestimmt, wie oft ein Heartbeat ChangeRecord zurückgegeben wird, wenn in dieser Partition keine Transaktionen verbindlich gemacht wurden. Der Wert muss zwischen 1,000 (eine Sekunde) und 300,000 (fünf Minuten) liegen.
null null Erforderlich Reserviert für zukünftige Verwendung

Wir empfehlen, eine Hilfsmethode zum Erstellen des Texts der Lesefunktion zu erstellen und Parameter daran zu binden, wie im folgenden Beispiel gezeigt.

Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
            + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and
// bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {

  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("p1")
                    .to(startTimestamp)
                    .bind("p2")
                    .to(endTimestamp)
                    .bind("p3")
                    .to(partitionToken)
                    .bind("p4")
                    .to(heartbeatMillis)
                    .build();
}

Format von Streams-Eintrags ändern

Die Lesefunktion für Änderungsstreams gibt eine einzelne ChangeRecord-Spalte vom Typ ARRAY<STRUCT<...>> zurück. In jeder Zeile enthält dieses Array immer ein einzelnes Element.

Die Arrayelemente haben folgenden Typ:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

Diese STRUCT enthält drei Felder: data_change_record, heartbeat_record und child_partitions_record, alle vom Typ ARRAY<STRUCT<...>>. In jeder Zeile, die die Lesefunktion des Änderungsstreams zurückgibt, enthält nur eines dieser drei Felder einen Wert. Die anderen beiden sind leer oder enthalten NULL. Diese Arrayfelder enthalten höchstens ein Element.

In den folgenden Abschnitten werden die einzelnen Datensatztypen beschrieben.

Die Lesefunktion für Änderungsstreams gibt eine einzelne ChangeRecord-Spalte vom Typ JSON mit der folgenden Struktur zurück:

{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}

Dieses Objekt hat drei mögliche Schlüssel: data_change_record, heartbeat_record und child_partitions_record. Der entsprechende Werttyp ist JSON. In jeder Zeile, die die Lesefunktion des Änderungsstreams zurückgibt, ist nur einer dieser drei Schlüssel vorhanden.

In den folgenden Abschnitten werden die einzelnen Datensatztypen beschrieben.

Datenänderungseinträge

Ein Datenänderungssatz enthält eine Reihe von Änderungen an einer Tabelle mit demselben Änderungstyp (Einfügen, Aktualisieren oder Löschen), die zum selben Commit-Zeitstempel in einer Change-Stream-Partition für dieselbe Transaktion committet wurden. Für dieselbe Transaktion können mehrere Datenänderungseinträge über mehrere Änderungsstreampartitionen zurückgegeben werden.

Alle Datenänderungsdatensätze haben die Felder commit_timestamp, server_transaction_id und record_sequence, die zusammen die Reihenfolge im Änderungsstream für einen Streamdatensatz bestimmen. Diese drei Felder reichen aus, um die Reihenfolge der Änderungen abzuleiten und für externe Konsistenz zu sorgen.

Hinweis: Mehrere Transaktionen können denselben Commit-Zeitstempel haben, wenn sie sich nicht überschneidende Daten betreffen. Mit dem Feld server_transaction_id können Sie unterscheiden, welche Änderungen (potenziell über mehrere Änderungsstream-Partitionen hinweg) innerhalb derselben Transaktion vorgenommen wurden. In Kombination mit den Feldern record_sequence und number_of_records_in_transaction können Sie auch alle Einträge einer bestimmten Transaktion puffern und sortieren.

Die Felder eines Datenänderungs-Eintrags umfassen Folgendes:

Feld Typ Beschreibung
commit_timestamp TIMESTAMP Der Zeitstempel, zu dem die Änderung vorgenommen wurde.
record_sequence STRING Gibt die Sequenznummer für den Datensatz innerhalb der Transaktion an. Sequenznummern sind innerhalb einer Transaktion eindeutig und kontinuierlich ansteigend (aber nicht unbedingt fortlaufend). Sortieren Sie die Einträge für dieselbe server_transaction_id nach record_sequence, um die Reihenfolge der Änderungen innerhalb der Transaktion zu rekonstruieren. Spanner kann diese Sortierung für eine bessere Leistung optimieren und sie entspricht möglicherweise nicht immer der von Ihnen angegebenen ursprünglichen Sortierung.
server_transaction_id STRING Ein global eindeutiger String, der die Transaktion darstellt, in der die Änderung per Commit übergeben wurde. Der Wert sollte nur im Zusammenhang mit der Verarbeitung von Änderungsstream-Datensätzen verwendet werden und ist nicht mit der Transaktions-ID in der Spanner API korreliert.
is_last_record_in_transaction_in_partition BOOL Gibt an, ob dies der letzte Datensatz für eine Transaktion in der aktuellen Partition ist.
table_name STRING Name der Tabelle, die von der Änderung betroffen ist.
value_capture_type STRING

Beschreibt den Typ der Werterfassung, der in der Änderungsstreamkonfiguration angegeben wurde, als diese Änderung erfasst wurde.

Der Werterfassungstyp kann einer der folgenden sein:

  • OLD_AND_NEW_VALUES
  • NEW_ROW
  • NEW_VALUES
  • NEW_ROW_AND_OLD_VALUES

Der Standardwert ist OLD_AND_NEW_VALUES. Weitere Informationen finden Sie unter Arten der Werterfassung.

column_types
[
  {
      "name": "STRING",
      "type": {
        "code": "STRING"
      },
      "is_primary_key": BOOLEAN
      "ordinal_position": NUMBER
    },
    ...
]
Gibt den Namen der Spalte, den Spaltentyp, ob es sich um einen Primärschlüssel handelt, und die Position der Spalte gemäß Definition im Schema (ordinal_position) an. Die erste Spalte einer Tabelle im Schema hat die Ordnungsposition 1. Der Spaltentyp kann bei Arrayspalten verschachtelt sein. Das Format entspricht der in der Spanner API-Referenz beschriebenen Typenstruktur.
mods
[
  {
    "keys": {"STRING" : "STRING"},
    "new_values": {
      "STRING" : "VALUE-TYPE",
      [...]
    },
    "old_values": {
      "STRING" : "VALUE-TYPE",
      [...]
    },
  },
  [...]
]
Hier werden die vorgenommenen Änderungen beschrieben, einschließlich der Primärschlüsselwerte, der alten Werte und der neuen Werte der geänderten oder erfassten Spalten. Verfügbarkeit und Inhalt der alten und neuen Werte hängen von der konfigurierten value_capture_type ab. Die Felder new_values und old_values enthalten nur die Nicht-Schlüsselspalten.
mod_type STRING Beschreibt die Art der Änderung. Entweder INSERT, UPDATE oder DELETE.
number_of_records_in_transaction INT64 Gibt die Anzahl der Datenänderungs-Datensätze an, die Teil dieser Transaktion in allen Änderungsstream-Partitionen sind.
number_of_partitions_in_transaction INT64 Gibt die Anzahl der Partitionen an, die Datenänderungsdatensätze für diese Transaktion zurückgeben.
transaction_tag STRING Das Transaktions-Tag, das mit dieser Transaktion verknüpft ist.
is_system_transaction BOOL Gibt an, ob es sich bei der Transaktion um eine Systemtransaktion handelt.
Feld Typ Beschreibung
commit_timestamp STRING Der Zeitstempel, zu dem die Änderung übernommen wurde.
record_sequence STRING Gibt die Sequenznummer für den Datensatz innerhalb der Transaktion an. Sequenznummern sind innerhalb einer Transaktion eindeutig und kontinuierlich ansteigend (aber nicht unbedingt fortlaufend). Sortieren Sie die Einträge für dieselbe server_transaction_id nach record_sequence, um die Reihenfolge der Änderungen innerhalb der Transaktion zu rekonstruieren.
server_transaction_id STRING Ein global eindeutiger String, der die Transaktion darstellt, in der die Änderung per Commit übergeben wurde. Der Wert sollte nur im Zusammenhang mit der Verarbeitung von Änderungsstream-Datensätzen verwendet werden und ist nicht mit der Transaktions-ID in der Spanner API korreliert.
is_last_record_in_transaction_in_partition BOOLEAN Gibt an, ob dies der letzte Datensatz für eine Transaktion in der aktuellen Partition ist.
table_name STRING Der Name der Tabelle, die von der Änderung betroffen ist.
value_capture_type STRING

Beschreibt den Typ der Werterfassung, der in der Änderungsstreamkonfiguration angegeben wurde, als diese Änderung erfasst wurde.

Der Werterfassungstyp kann einer der folgenden sein:

  • OLD_AND_NEW_VALUES
  • NEW_ROW
  • NEW_VALUES
  • NEW_ROW_AND_OLD_VALUES

Der Standardwert ist OLD_AND_NEW_VALUES. Weitere Informationen finden Sie unter Arten der Werterfassung.

column_types
[
  {
      "name": "STRING",
      "type": {
        "code": "STRING"
      },
      "is_primary_key": BOOLEAN
      "ordinal_position": NUMBER
    },
    ...
]
Gibt den Namen der Spalte, den Spaltentyp, ob es sich um einen Primärschlüssel handelt, und die Position der Spalte gemäß Definition im Schema (ordinal_position) an. Die erste Spalte einer Tabelle im Schema hat die Ordnungsposition 1. Der Spaltentyp kann bei Arrayspalten verschachtelt sein. Das Format entspricht der in der Spanner API-Referenz beschriebenen Typenstruktur.
mods
[
  {
    "keys": {"STRING" : "STRING"},
    "new_values": {
      "STRING" : "VALUE-TYPE",
      [...]
    },
    "old_values": {
      "STRING" : "VALUE-TYPE",
      [...]
    },
  },
  [...]
]
Hier werden die vorgenommenen Änderungen beschrieben, einschließlich der Primärschlüsselwerte, der alten Werte und der neuen Werte der geänderten oder erfassten Spalten. Verfügbarkeit und Inhalt der alten und neuen Werte hängen vom konfigurierten value_capture_type ab. Die Felder new_values und old_values enthalten nur die nicht primären Spalten.
mod_type STRING Beschreibt die Art der Änderung. Entweder INSERT, UPDATE oder DELETE.
number_of_records_in_transaction INT64 Gibt die Anzahl der Datenänderungs-Datensätze an, die Teil dieser Transaktion in allen Änderungsstream-Partitionen sind.
number_of_partitions_in_transaction NUMBER Gibt die Anzahl der Partitionen an, die Datenänderungsdatensätze für diese Transaktion zurückgeben.
transaction_tag STRING Das Transaktions-Tag, das mit dieser Transaktion verknüpft ist.
is_system_transaction BOOLEAN Gibt an, ob es sich bei der Transaktion um eine Systemtransaktion handelt.

Beispiel für einen Datenänderungsdatensatz

Im Folgenden finden Sie zwei Beispieldatensätze für Datenänderungen. Sie beschreiben eine einzelne Transaktion, bei der eine Überweisung zwischen zwei Konten erfolgt. Die beiden Konten befinden sich in separaten Partitionen für Änderungsstreams.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

Der folgende Datenänderungseintrag ist ein Beispiel für einen Datensatz mit dem Erfassungstyp NEW_VALUES. Es werden nur neue Werte eingefügt. Da nur die Spalte LastUpdate geändert wurde, wurde nur diese Spalte zurückgegeben.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z"
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Der folgende Datenänderungseintrag ist ein Beispiel für einen Datensatz mit dem Erfassungstyp NEW_ROW. Es wurde nur die Spalte LastUpdate geändert, aber alle erfassten Spalten werden zurückgegeben.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Der folgende Datenänderungseintrag ist ein Beispiel für einen Datensatz mit dem Erfassungstyp NEW_ROW_AND_OLD_VALUES. Es wurde nur die Spalte LastUpdate geändert, aber alle erfassten Spalten werden zurückgegeben. Bei diesem Werterfassungstyp werden der neue und der alte Wert von LastUpdate erfasst.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z"
      }
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Heartbeat-Einträge

Wenn ein Heartbeat-Eintrag zurückgegeben wird, bedeutet das, dass alle Änderungen mit einem commit_timestamp, der kleiner oder gleich dem timestamp des Heartbeat-Eintrags ist, zurückgegeben wurden. Zukünftige Datensätze in dieser Partition müssen höhere Commit-Zeitstempel haben als der vom Heartbeat-Eintrag zurückgegebene. Heartbeat-Einträge werden zurückgegeben, wenn keine Datenänderungen in eine Partition geschrieben wurden. Wenn Datenänderungen in die Partition geschrieben werden, kann anstelle von heartbeat_record.timestamp die Zeichenfolge data_change_record.commit_timestamp verwendet werden, um anzugeben, dass der Leser beim Lesen der Partition Fortschritte macht.

Sie können Heartbeat-Einträge, die in Partitionen zurückgegeben werden, verwenden, um Leser über alle Partitionen hinweg zu synchronisieren. Sobald alle Leser entweder einen Heartbeat erhalten haben, der größer oder gleich einem bestimmten Zeitstempel A ist, oder Daten oder untergeordnete Partitionsdatensätze erhalten haben, die größer oder gleich dem Zeitstempel A sind, wissen sie, dass sie alle Datensätze erhalten haben, die bis zu diesem Zeitstempel A verbindlich gemacht wurden. Sie können dann mit der Verarbeitung der zwischengespeicherten Datensätze beginnen, z. B. die partitionenübergreifenden Datensätze nach Zeitstempel sortieren und nach server_transaction_id gruppieren.

Ein Heartbeat-Eintrag enthält nur ein Feld:

Feld Typ Beschreibung
timestamp TIMESTAMP Gibt den Zeitstempel des Heartbeat-Eintrags an.
Feld Typ Beschreibung
timestamp STRING Gibt den Zeitstempel des Heartbeat-Eintrags an.

Beispiel für einen Heartbeat-Eintrag

Beispiel für einen Heartbeat-Eintrag, der angibt, dass alle Einträge mit Zeitstempeln, die kleiner oder gleich dem Zeitstempel dieses Eintrags sind, zurückgegeben wurden:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

Untergeordnete Partitionseinträge

Mit Untergeordnete Partitionsdatensätze werden Informationen zu untergeordneten Partitionen zurückgegeben: ihre Partitionstokens, die Tokens ihrer übergeordneten Partitionen und die start_timestamp, die den frühesten Zeitstempel darstellt, für den die untergeordneten Partitionen Änderungsdatensätze enthalten. Einträge, deren Commit-Zeitstempel unmittelbar vor dem child_partitions_record.start_timestamp liegen, werden in der aktuellen Partition zurückgegeben. Nachdem alle untergeordneten Partitionseinträge für diese Partition zurückgegeben wurden, gibt diese Abfrage den Status „Erfolg“ zurück, was bedeutet, dass alle Einträge für diese Partition zurückgegeben wurden.

Die Felder eines untergeordneten Partitionseintrags umfassen Folgendes:

Feld Typ Beschreibung
start_timestamp TIMESTAMP Gibt an, dass die Datenänderungseinträge, die von untergeordneten Partitionen in diesem untergeordneten Partitionseintrag zurückgegeben wurden, einen Commit-Zeitstempel haben, der mindestens start_timestamp ist. Bei der Abfrage einer untergeordneten Partition muss die Abfrage das Token der untergeordneten Partition und einen start_timestamp-Wert enthalten, der größer oder gleich child_partitions_token.start_timestamp ist. Alle von einer Partition zurückgegebenen Datensätze für untergeordnete Partitionen haben dieselbe start_timestamp und der Zeitstempel liegt immer zwischen dem in der Abfrage angegebenen start_timestamp und end_timestamp.
record_sequence STRING Gibt eine monoton steigende Sequenznummer an, mit der die Reihenfolge der untergeordneten Partitionseinträge definiert werden kann, wenn in einer bestimmten Partition mehrere untergeordnete Partitionseinträge mit derselben start_timestamp zurückgegeben werden. Das Partitionstoken, start_timestamp und record_sequence identifizieren einen untergeordneten Partitionseintrag eindeutig.
child_partitions
[
  {
    "token" : "STRING",
    "parent_partition_tokens" : ["STRING"]
  }
]
Gibt eine Reihe von untergeordneten Partitionen und die zugehörigen Informationen zurück. Dazu gehören der Partitionstoken-String, mit dem die untergeordnete Partition in Abfragen identifiziert wird, sowie die Tokens der übergeordneten Partitionen.
Feld Typ Beschreibung
start_timestamp STRING Gibt an, dass die Datenänderungseinträge, die von untergeordneten Partitionen in diesem untergeordneten Partitionseintrag zurückgegeben wurden, einen Commit-Zeitstempel haben, der mindestens start_timestamp ist. Bei der Abfrage einer untergeordneten Partition muss die Abfrage das Token der untergeordneten Partition und einen start_timestamp-Wert enthalten, der größer oder gleich child_partitions_token.start_timestamp ist. Alle von einer Partition zurückgegebenen Datensätze für untergeordnete Partitionen haben dieselbe start_timestamp und der Zeitstempel liegt immer zwischen dem in der Abfrage angegebenen start_timestamp und end_timestamp.
record_sequence STRING Gibt eine monoton steigende Sequenznummer an, mit der die Reihenfolge der untergeordneten Partitionseinträge definiert werden kann, wenn in einer bestimmten Partition mehrere untergeordnete Partitionseinträge mit derselben start_timestamp zurückgegeben werden. Das Partitionstoken, start_timestamp und record_sequence identifizieren einen untergeordneten Partitionseintrag eindeutig.
child_partitions
[
  {
    "token": "STRING",
    "parent_partition_tokens": ["STRING"],
  }, [...]
]
Gibt ein Array von untergeordneten Partitionen und die zugehörigen Informationen zurück. Dazu gehören der Partitionstoken-String, mit dem die untergeordnete Partition in Abfragen identifiziert wird, sowie die Tokens der übergeordneten Partitionen.

Beispiel für einen untergeordneten Partitionseintrag

Im folgenden Beispiel wird ein untergeordneter Partitionseintrag gezeigt:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ],
}

Workflow für Abfragen von Änderungsstreams

Führen Sie Änderungsstreamabfragen mit der ExecuteStreamingSql API mit einer einmaligen schreibgeschützten Transaktion und einer starken Zeitstempelbindung aus. Mit der Lesefunktion für den Änderungsstream können Sie start_timestamp und end_timestamp für den gewünschten Zeitraum angeben. Auf alle Änderungsdatensätze innerhalb des Aufbewahrungszeitraums kann über die starke schreibgeschützte Zeitstempelbindung zugegriffen werden.

Alle anderen TransactionOptions sind für Änderungsstreamabfragen ungültig. Wenn TransactionOptions.read_only.return_read_timestamp auf true festgelegt ist, wird in der Transaction-Nachricht, die die Transaktion beschreibt, anstelle eines gültigen Lesezeitstempels ein spezieller Wert von kint64max - 1 zurückgegeben. Dieser spezielle Wert sollte verworfen und nicht für nachfolgende Abfragen verwendet werden.

Jede Änderungsstreamabfrage kann eine beliebige Anzahl von Zeilen zurückgeben, die jeweils einen Datenänderungseintrag, einen Herzschlageintrag oder einen Eintrag für untergeordnete Partitionen enthalten. Sie müssen keine Frist für die Anfrage festlegen.

Beispiel für einen Workflow für Abfragen zu Änderungsstreams

Der Workflow für Streamingabfragen beginnt mit der Ausgabe der allerersten Abfrage für den Änderungsstream. Geben Sie dazu partition_token bis NULL an. In der Abfrage müssen die Lesefunktion für den Änderungsstream, der gewünschte Start- und Endzeitstempel sowie das Heartbeat-Intervall angegeben werden. Wenn end_timestamp NULL ist, gibt die Abfrage so lange Datenänderungen zurück, bis die Partition endet.

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
) ;

Verarbeiten Sie Datensätze aus dieser Abfrage, bis alle untergeordneten Partitionseinträge zurückgegeben werden. Im folgenden Beispiel werden zwei untergeordnete Partitionseinträge und drei Partitionstokens zurückgegeben, bevor die Abfrage beendet wird. Untergeordnete Partitionseinträge aus einer bestimmten Abfrage haben immer dieselbe start_timestamp.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
        // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ],
}
child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ],
}

Wenn Sie Änderungen nach dem 2022-05-01T09:00:01Z verarbeiten möchten, erstellen Sie drei neue Abfragen und führen Sie sie parallel aus. Zusammengenommen geben die drei Abfragen Datenänderungen für denselben Schlüsselbereich zurück, den auch das übergeordnete Element abdeckt. Legen Sie start_timestamp immer auf start_timestamp im selben untergeordneten Partitionseintrag fest und verwenden Sie dieselbe end_timestamp und dasselbe Heartbeat-Intervall, um die Einträge in allen Abfragen konsistent zu verarbeiten.

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_1',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_2',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_3',
  10000,
  NULL
);

Die Abfrage für child_token_2 wird abgeschlossen, nachdem ein weiterer untergeordneter Partitionseintrag zurückgegeben wurde. Dieser Eintrag gibt an, dass eine neue Partition Änderungen sowohl für child_token_2 als auch für child_token_3 ab 2022-05-01T09:30:15Z abdeckt. Die Abfrage für child_token_3 gibt genau denselben Datensatz zurück, da beide die übergeordneten Partitionen der neuen child_token_4 sind. Um eine strikte Verarbeitung von Datensätzen für einen bestimmten Schlüssel zu gewährleisten, muss die Abfrage auf child_token_4 gestartet werden, nachdem alle übergeordneten Elemente abgeschlossen sind. In diesem Fall sind die übergeordneten Elemente child_token_2 und child_token_3. Erstellen Sie nur eine Abfrage pro untergeordnetem Partitionstoken. Im Abfrage-Workflow-Design sollte ein übergeordnetes Element zum Warten und Planen der Abfrage am child_token_4 festgelegt werden.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}
SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z',
  NULL,
  'child_token_4',
  10000,
  NULL
);

Beispiele für die Verarbeitung und das Parsen von Änderungsstream-Einträgen finden Sie im Apache Beam SpannerIO Dataflow-Connector auf GitHub.