Partitionen, Datensätze und Abfragen von Streams ändern

Auf dieser Seite werden die folgenden Attribute von Änderungsstreams im Detail beschrieben:

  • Das aufteilungsbasierte Partitionierungsmodell
  • Format und Inhalt von Änderungsstreameinträgen
  • Die Low-Level-Syntax, die zum Abfragen dieser Datensätze verwendet wird
  • Beispiel für den Abfrageworkflow

Die Informationen auf dieser Seite sind am relevantesten für die Verwendung der Spanner API, um Änderungsstreams direkt abzufragen. Anwendungen, die stattdessen Dataflow zum Lesen von Änderungsstreamdaten verwenden, müssen nicht direkt mit dem hier beschriebenen Datenmodell arbeiten.

Eine umfassendere Einführung in Änderungsstreams finden Sie unter Übersicht über Änderungsstreams.

Streampartitionen ändern

Wenn eine Änderung an einer Tabelle auftritt, die von einem Änderungsstream überwacht wird, schreibt Spanner einen entsprechenden Änderungsstreameintrag in die Datenbank, synchron in derselben Transaktion wie die Datenänderung. Dies garantiert, dass Spanner die Änderung bei erfolgreicher Transaktion auch erfolgreich erfasst und beibehalten hat. Intern lagert Spanner den Datensatz des Änderungsstreams und die Datenänderungen zusammen, sodass sie vom selben Server verarbeitet werden, um den Schreibaufwand zu minimieren.

Als Teil der DML an einen bestimmten Split hängt Spanner den Schreibvorgang an den entsprechenden Änderungsstream-Daten-Split in derselben Transaktion an. Aufgrund dieser Colocation sorgen Änderungsstreams nicht für zusätzliche Koordination zwischen den Bereitstellungsressourcen, wodurch der Commit-Aufwand für Transaktionen minimiert wird.

Image

Spanner skaliert durch die dynamische Aufteilung und Zusammenführung von Daten basierend auf Datenbanklast und -größe und Verteilen von Splits auf Bereitstellungsressourcen.

Um das Skalieren von Schreib- und Lesevorgängen für Änderungsstreams zu ermöglichen, teilt Spanner den internen Änderungsstreamspeicher auf und führt ihn mit den Datenbankdaten zusammen. Dadurch werden Hotspots automatisch vermieden. Um das Lesen von Änderungsstream-Einträgen nahezu in Echtzeit zu unterstützen, wenn Datenbankschreibvorgänge skaliert werden, ist die Spanner API so konzipiert, dass ein Änderungsstream gleichzeitig mit Änderungsstreampartitionen abgefragt wird. Änderungsstreampartitionen werden so zugeordnet, dass Änderungsstreamdatenaufteilungen, die die Änderungsstreameinträge enthalten, geändert werden. Die Partitionen eines Änderungsstreams ändern sich im Laufe der Zeit dynamisch und hängen davon ab, wie Spanner die Datenbankdaten dynamisch aufteilt und zusammenführt.

Eine Änderungsstreampartition enthält Datensätze für einen unveränderlichen Schlüsselbereich für einen bestimmten Zeitraum. Jede Änderungsstreampartition kann in eine oder mehrere Änderungsstreampartitionen aufgeteilt oder mit anderen Änderungsstreampartitionen zusammengeführt werden. Wenn diese Aufteilungs- oder Zusammenführungsereignisse eintreten, werden untergeordnete Partitionen erstellt, um die Änderungen für ihre jeweiligen unveränderlichen Schlüsselbereiche für den nächsten Zeitraum zu erfassen. Neben Datenänderungseinträgen gibt eine Änderungsstreamabfrage untergeordnete Partitionseinträge zurück, um Leser über neue Änderungsstreampartitionen zu informieren, die abgefragt werden müssen, sowie Heartbeat-Einträge, um den Vorwärtsfortschritt anzuzeigen, wenn in letzter Zeit keine Schreibvorgänge stattgefunden haben.

Beim Abfragen einer bestimmten Änderungsstreampartition werden die Änderungseinträge in der Reihenfolge der Commit-Zeitstempel zurückgegeben. Jeder Änderungseintrag wird genau einmal zurückgegeben. Bei Änderungsstreampartitionen gibt es keine garantierte Reihenfolge der Änderungseinträge. Änderungseinträge für einen bestimmten Primärschlüssel werden nur für eine Partition für einen bestimmten Zeitraum zurückgegeben.

Aufgrund der Partitionsherkunft zwischen über- und untergeordneten Partitionen sollten zum Verarbeiten von Änderungen für einen bestimmten Schlüssel in der Commit-Zeitstempelreihenfolge Datensätze, die von untergeordneten Partitionen zurückgegeben werden, erst verarbeitet werden, nachdem Datensätze aus allen übergeordneten Partitionen verarbeitet wurden.

Lesefunktionen und Abfragesyntax des Streams ändern

GoogleSQL

Änderungsstreams werden mit der ExecuteStreamingSql API abgefragt. Spanner erstellt automatisch eine spezielle Lesefunktion zusammen mit dem Änderungsstream. Die Lesefunktion bietet Zugriff auf die Datensätze des Änderungsstreams. Die Namenskonvention für die Lesefunktion lautet READ_change_stream_name.

Unter der Annahme, dass der Änderungsstream SingersNameStream in der Datenbank vorhanden ist, sieht die Abfragesyntax für GoogleSQL so aus:

SELECT ChangeRecord
FROM READ_SingersNameStream (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    read_options
)

Die Funktion read akzeptiert die folgenden Argumente:

Name des Arguments Typ Erforderlich/Optional? Beschreibung
start_timestamp TIMESTAMP Erforderlich Gibt an, dass Datensätze mit commit_timestamp größer oder gleich start_timestamp zurückgegeben werden sollen. Der Wert muss innerhalb der Aufbewahrungsdauer des Änderungsstreams liegen und kleiner oder gleich der aktuellen Zeit sowie größer oder gleich dem Zeitstempel für die Erstellung des Änderungsstreams sein.
end_timestamp TIMESTAMP Optional (Standardeinstellung: NULL) Gibt an, dass Datensätze mit commit_timestamp kleiner oder gleich end_timestamp zurückgegeben werden sollen. Der Wert muss innerhalb der Aufbewahrungsdauer des Änderungsstreams und größer oder gleich start_timestamp sein. Die Abfrage ist abgeschlossen, nachdem alle ChangeRecords bis zu end_timestamp zurückgegeben wurden oder der Nutzer die Verbindung beendet. Wenn NULL angegeben oder nicht angegeben ist, wird die Abfrage ausgeführt, bis alle ChangeRecords zurückgegeben wurden oder der Nutzer die Verbindung beendet.
partition_token STRING Optional (Standardeinstellung: NULL) Gibt an, welche Änderungsstreampartition basierend auf dem Inhalt von untergeordneten Partitionen abgefragt werden soll. Wenn NULL oder nicht angegeben ist, fragt der Leser den Änderungsstream zum ersten Mal ab und hat keine bestimmten Partitionstokens für die Abfrage erhalten.
heartbeat_milliseconds INT64 Erforderlich Bestimmt, wie oft ein Heartbeat-ChangeRecord zurückgegeben wird, wenn in dieser Partition keine Transaktionen übergeben werden. Der Wert muss zwischen 1,000 (eine Sekunde) und 30,0000 (fünf Minuten) liegen.
read_options ARRAY Optional (Standardeinstellung: NULL) Zusätzliche Leseoptionen, die für die zukünftige Verwendung reserviert sind. Der einzige zulässige Wert ist derzeit NULL.

Wir empfehlen, eine bequeme Methode zum Erstellen des Textes der Lesefunktionsabfrage zu entwickeln und daran zu binden, wie im folgenden Beispiel gezeigt.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

PostgreSQL

Änderungsstreams werden mit der ExecuteStreamingSql API abgefragt. Spanner erstellt automatisch eine spezielle Lesefunktion zusammen mit dem Änderungsstream. Die Lesefunktion bietet Zugriff auf die Datensätze des Änderungsstreams. Die Namenskonvention für die Lesefunktion lautet spanner.read_json_change_stream_name.

Wenn in der Datenbank der Änderungsstream SingersNameStream vorhanden ist, lautet die Abfragesyntax für PostgreSQL so:

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    null
)

Die Funktion read akzeptiert die folgenden Argumente:

Name des Arguments Typ Erforderlich/Optional? Beschreibung
start_timestamp timestamp with time zone Erforderlich Gibt an, dass Änderungseinträge mit commit_timestamp größer oder gleich start_timestamp zurückgegeben werden sollen. Der Wert muss innerhalb der Aufbewahrungsdauer des Änderungsstreams liegen und kleiner oder gleich der aktuellen Zeit sowie größer oder gleich dem Zeitstempel für die Erstellung des Änderungsstreams sein.
end_timestamp timestamp with timezone Optional (Standardeinstellung: NULL) Gibt an, dass Änderungseinträge zurückgegeben werden sollen, deren commit_timestamp kleiner oder gleich end_timestamp ist. Der Wert muss innerhalb der Aufbewahrungsdauer des Änderungsstreams und größer oder gleich start_timestamp sein. Die Abfrage ist abgeschlossen, nachdem alle Änderungseinträge bis zum end_timestamp zurückgegeben wurden oder der Nutzer die Verbindung beendet. Bei NULL wird die Abfrage ausgeführt, bis alle Änderungseinträge zurückgegeben wurden oder der Nutzer die Verbindung beendet.
partition_token text Optional (Standardeinstellung: NULL) Gibt an, welche Änderungsstreampartition basierend auf dem Inhalt von untergeordneten Partitionen abgefragt werden soll. Wenn NULL oder nicht angegeben ist, fragt der Leser den Änderungsstream zum ersten Mal ab und hat keine bestimmten Partitionstokens für die Abfrage erhalten.
heartbeat_milliseconds bigint Erforderlich Legt fest, wie oft ein Heartbeat-ChangeRecord zurückgegeben wird, wenn in dieser Partition keine Transaktionen übergeben werden. Der Wert muss zwischen 1,000 (eine Sekunde) und 300,000 (fünf Minuten) liegen.
null null Erforderlich Für zukünftige Verwendung reserviert

Wir empfehlen, eine einfache Methode zum Erstellen des Texts der Lesefunktion zu entwickeln und dafür Parameter zu binden, wie im folgenden Beispiel gezeigt.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
            + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {

  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("p1")
                    .to(startTimestamp)
                    .bind("p2")
                    .to(endTimestamp)
                    .bind("p3")
                    .to(partitionToken)
                    .bind("p4")
                    .to(heartbeatMillis)
                    .build();
}

Format von Streamaufzeichnungen ändern

GoogleSQL

Die Lesefunktion für Änderungsstreams gibt eine einzelne ChangeRecord-Spalte vom Typ ARRAY<STRUCT<...>> zurück. In jeder Zeile enthält dieses Array immer ein einzelnes Element.

Die Array-Elemente haben folgenden Typ:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

Diese Struktur enthält drei Felder: data_change_record, heartbeat_record und child_partitions_record, die jeweils vom Typ ARRAY<STRUCT<...>> sind. In jeder Zeile, die die Lesefunktion des Änderungsstreams zurückgibt, enthält nur eines dieser drei Felder einen Wert. Die anderen beiden sind leer oder NULL. Diese Array-Felder enthalten höchstens ein Element.

In den folgenden Abschnitten werden diese drei Eintragstypen erläutert.

PostgreSQL

Die Lesefunktion für Änderungsstreams gibt eine einzelne ChangeRecord-Spalte vom Typ JSON mit der folgenden Struktur zurück:

{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}

In diesem Objekt gibt es drei mögliche Schlüssel: data_change_record, heartbeat_record und child_partitions_record. Der entsprechende Werttyp ist JSON. In jeder Zeile, die die Lesefunktion des Änderungsstreams zurückgibt, ist nur einer dieser drei Schlüssel vorhanden.

In den folgenden Abschnitten werden diese drei Eintragstypen erläutert.

Einträge über Datenänderungen

Ein Datenänderungseintrag enthält eine Reihe von Änderungen an einer Tabelle mit demselben Änderungstyp (Einfügen, Aktualisieren oder Löschen), die mit demselben Commit-Zeitstempel in einer Änderungsstreampartition für dieselbe Transaktion durchgeführt wurden. Für dieselbe Transaktion können mehrere Datenänderungseinträge über mehrere Partitions des Änderungsstreams zurückgegeben werden.

Alle Datenänderungseinträge haben die Felder commit_timestamp, server_transaction_id und record_sequence, die zusammen die Reihenfolge im Änderungsstream für einen Stream-Eintrag festlegen. Diese drei Felder reichen aus, um die Reihenfolge der Änderungen abzuleiten und für externe Konsistenz zu sorgen.

Beachten Sie, dass mehrere Transaktionen denselben Commit-Zeitstempel haben können, wenn sie nicht überlappende Daten berühren. Mit dem Feld server_transaction_id kann unterschieden werden, welche Änderungen (möglicherweise über Änderungsstreampartitionen hinweg) mit derselben Transaktion vorgenommen wurden. Durch die Kopplung mit den Feldern record_sequence und number_of_records_in_transaction können Sie auch alle Datensätze einer bestimmten Transaktion zwischenspeichern und ordnen.

Zu den Feldern eines Datenänderungseintrags gehören:

GoogleSQL

Feld Typ Beschreibung
commit_timestamp TIMESTAMP Der Zeitstempel, an dem die Änderung festgeschrieben wurde.
record_sequence STRING Die Sequenznummer für den Datensatz in der Transaktion. Sequenznummern sind innerhalb einer Transaktion garantiert eindeutig und kontinuierlich ansteigend (aber nicht unbedingt fortlaufend). Sortieren Sie die Datensätze für denselben server_transaction_id nach record_sequence, um die Reihenfolge der Änderungen innerhalb der Transaktion zu rekonstruieren.
server_transaction_id STRING Ein global eindeutiger String, der die Transaktion darstellt, in der für die Änderung ein Commit durchgeführt wurde. Der Wert sollte nur im Kontext der Verarbeitung von Änderungsstreameinträgen verwendet werden und steht in keinem Zusammenhang mit der Transaktions-ID in der Spanner-API.
is_last_record_in_transaction_in_partition BOOL Gibt an, ob dies der letzte Datensatz für eine Transaktion in der aktuellen Partition ist.
table_name STRING Name der Tabelle, die von der Änderung betroffen ist.
value_capture_type STRING

Beschreibt den Werterfassungstyp, der in der Konfiguration des Änderungsstreams angegeben wurde, als diese Änderung erfasst wurde.

Der Werterfassungstyp kann "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES" oder "NEW_ROW_AND_OLD_VALUES" sein. Der Standardwert ist "OLD_AND_NEW_VALUES". Weitere Informationen finden Sie unter Werterfassungstypen.

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
Der Name der Spalte, der Spaltentyp, ob es ein Primärschlüssel ist und die Position der Spalte gemäß der Definition im Schema ("ordinal_position"). Die erste Spalte einer Tabelle im Schema hat die ordinale Position "1". Der Spaltentyp kann für Arrayspalten verschachtelt sein. Das Format entspricht der Typstruktur, die in der Referenz zur Spanner API beschrieben wird.
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
Beschreibt die vorgenommenen Änderungen, einschließlich der Primärschlüsselwerte, der alten Werte und der neuen Werte der geänderten oder nachverfolgten Spalten. Verfügbarkeit und Inhalt der alten und neuen Werte hängen vom konfigurierten Wert "value_capture_type" ab. Die Felder new_values und old_values enthalten nur die Nicht-Schlüsselspalten.
mod_type STRING Beschreibt die Art der Änderung. Entweder INSERT, UPDATE oder DELETE.
number_of_records_in_transaction INT64 Die Anzahl der Datensätze zu Datenänderungen, die Teil dieser Transaktion in allen Änderungsstreampartitionen sind.
number_of_partitions_in_transaction INT64 Die Anzahl der Partitionen, die Datenänderungseinträge für diese Transaktion zurückgeben.
transaction_tag STRING Transaktions-Tag, das dieser Transaktion zugeordnet ist.
is_system_transaction BOOL Gibt an, ob die Transaktion eine Systemtransaktion ist.

PostgreSQL

Feld Typ Beschreibung
commit_timestamp STRING Der Zeitstempel, zu dem der Commit für die Änderung durchgeführt wurde.
record_sequence STRING Die Sequenznummer für den Datensatz in der Transaktion. Sequenznummern sind innerhalb einer Transaktion garantiert eindeutig und kontinuierlich ansteigend (aber nicht unbedingt fortlaufend). Sortieren Sie die Datensätze für dieselbe Servertransaktions-ID nach "Datensatz_Sequenz", um die Reihenfolge der Änderungen innerhalb der Transaktion zu rekonstruieren.
server_transaction_id STRING Ein global eindeutiger String, der die Transaktion darstellt, in der für die Änderung ein Commit durchgeführt wurde. Der Wert sollte nur im Kontext der Verarbeitung von Änderungsstreameinträgen verwendet werden und steht in keinem Zusammenhang mit der Transaktions-ID in der Spanner-API
is_last_record_in_transaction_in_partition BOOLEAN Gibt an, ob dies der letzte Datensatz für eine Transaktion in der aktuellen Partition ist.
table_name STRING Name der Tabelle, die von der Änderung betroffen ist.
value_capture_type STRING

Beschreibt den Werterfassungstyp, der in der Konfiguration des Änderungsstreams angegeben wurde, als diese Änderung erfasst wurde.

Der Werterfassungstyp kann "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES" oder "NEW_ROW_AND_OLD_VALUES" sein. Der Standardwert ist "OLD_AND_NEW_VALUES". Weitere Informationen finden Sie unter Werterfassungstypen.

column_types
[
  {
      "name": <STRING>,
      "type": {
        "code": <STRING>
      },
      "is_primary_key": <BOOLEAN>,
      "ordinal_position": <NUMBER>
    },
    ...
]
Der Name der Spalte, der Spaltentyp, ob es ein Primärschlüssel ist und die Position der Spalte gemäß der Definition im Schema ("ordinal_position"). Die erste Spalte einer Tabelle im Schema hat die ordinale Position "1". Der Spaltentyp kann für Arrayspalten verschachtelt sein. Das Format entspricht der Typstruktur, die in der Referenz zur Spanner API beschrieben wird.
mods
[
  {
    "keys": {<STRING> : <STRING>},
    "new_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
    "old_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
  },
  [...]
]
Beschreibt die vorgenommenen Änderungen, einschließlich der Primärschlüsselwerte, der alten Werte und der neuen Werte der geänderten oder nachverfolgten Spalten. Die Verfügbarkeit und der Inhalt der alten und neuen Werte hängen vom konfigurierten Wert "value_capture_type" ab. Die Felder new_values und old_values enthalten nur die Nicht-Schlüsselspalten.
mod_type STRING Beschreibt die Art der Änderung. Entweder INSERT, UPDATE oder DELETE.
number_of_records_in_transaction INT64 Die Anzahl der Datensätze zu Datenänderungen, die Teil dieser Transaktion in allen Änderungsstreampartitionen sind.
number_of_partitions_in_transaction NUMBER Die Anzahl der Partitionen, die Datenänderungseinträge für diese Transaktion zurückgeben.
transaction_tag STRING Transaktions-Tag, das dieser Transaktion zugeordnet ist.
is_system_transaction BOOLEAN Gibt an, ob die Transaktion eine Systemtransaktion ist.

Es folgt ein Paar Beispieldatensätze für Datenänderungseinträge. Sie beschreiben eine einzelne Transaktion, bei der eine Übertragung zwischen zwei Konten erfolgt. Beachten Sie, dass sich die beiden Konten in separaten Änderungsstreampartitionen befinden.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

Der folgende Datensatz zur Datenänderung ist ein Beispiel für einen Datensatz mit dem Werterfassungstyp "NEW_VALUES". Beachten Sie, dass nur neue Werte ausgefüllt werden. Da nur die Spalte "LastUpdate" geändert wurde, wurde nur diese Spalte zurückgegeben.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z"
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Der folgende Datensatz zur Datenänderung ist ein Beispiel für einen Datensatz mit dem Werterfassungstyp "NEW_ROW". Nur die Spalte "LastUpdate" wurde geändert, aber alle nachverfolgten Spalten werden zurückgegeben.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Der folgende Datensatz zur Datenänderung ist ein Beispiel für einen Datensatz mit dem Werterfassungstyp "NEW_ROW_AND_OLD_VALUES". Nur die Spalte "LastUpdate" wurde geändert, aber alle nachverfolgten Spalten werden zurückgegeben. Dieser Werterfassungstyp erfasst den neuen und alten Wert von LastUpdate.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z"
      }
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Heartbeat-Rekorde

Wenn ein Heartbeat-Datensatz zurückgegeben wird, zeigt dies an, dass alle Änderungen mit commit_timestamp, die kleiner oder gleich dem timestamp des Heartbeat-Datensatzes sind, zurückgegeben wurden und dass zukünftige Datensätze in dieser Partition höhere Commit-Zeitstempel haben müssen als die, die vom Heartbeat-Datensatz zurückgegeben werden. Heartbeat-Einträge werden zurückgegeben, wenn keine Datenänderungen in eine Partition geschrieben werden. Wenn Datenänderungen in die Partition geschrieben werden, kann data_change_record.commit_timestamp anstelle von heartbeat_record.timestamp verwendet werden, um anzugeben, dass der Leser beim Lesen der Partition voranschreitet.

Sie können Heartbeat-Einträge verwenden, die in Partitionen zurückgegeben wurden, um Leser in allen Partitionen zu synchronisieren. Sobald alle Leser entweder einen Heartbeat-Wert größer oder gleich einem Zeitstempel A oder Daten- oder untergeordnete Partitionsdatensätze größer oder gleich dem Zeitstempel A erhalten haben, wissen die Leser, dass sie alle Datensätze empfangen haben, die zum oder vor diesem Zeitstempel A übergeben wurden, und mit der Verarbeitung der zwischengespeicherten Datensätze beginnen, z. B. durch Sortieren der partitionsübergreifenden Datensätze nach Zeitstempel und Gruppierung nach server_transaction_id.

Ein Heartbeat-Eintrag enthält nur ein Feld:

GoogleSQL

Feld Typ Beschreibung
timestamp TIMESTAMP Der Zeitstempel des Heartbeat-Datensatzes.

PostgreSQL

Feld Typ Beschreibung
timestamp STRING Der Zeitstempel des Heartbeat-Datensatzes.

Ein Beispiel für einen Heartbeat-Datensatz, der darüber informiert, dass alle Datensätze zurückgegeben wurden, deren Zeitstempel kleiner oder gleich dem Zeitstempel dieses Eintrags sind:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

Untergeordnete Partitionseinträge

Ein untergeordneter Partitionseintrag gibt Informationen zu untergeordneten Partitionen zurück: ihre Partitionstokens, die Tokens ihrer übergeordneten Partitionen und den start_timestamp, der den frühesten Zeitstempel darstellt, für den die untergeordneten Partitionen Änderungseinträge enthalten. Datensätze, deren Commit-Zeitstempel unmittelbar vor child_partitions_record.start_timestamp liegt, werden in der aktuellen Partition zurückgegeben. Nachdem alle untergeordneten Partitionen für diese Partition zurückgegeben wurden, gibt diese Abfrage einen Erfolgsstatus zurück, der angibt, dass alle Datensätze für diese Partition zurückgegeben wurden.

Die Felder eines untergeordneten Partitionseintrags enthalten Folgendes:

GoogleSQL

Feld Typ Beschreibung
start_timestamp TIMESTAMP Datenänderungseinträge, die von untergeordneten Partitionen in diesem untergeordneten Partitionseintrag zurückgegeben werden, haben einen Commit-Zeitstempel größer oder gleich start_timestamp. Bei der Abfrage einer untergeordneten Partition sollte in der Abfrage das untergeordnete Partitionstoken und ein start_timestamp größer oder gleich child_partitions_token.start_timestamp angegeben werden. Alle untergeordneten Partitionen, die von einer Partition zurückgegeben werden, haben denselben start_timestamp und der Zeitstempel liegt immer zwischen den in der Abfrage angegebenen start_timestamp und end_timestamp.
record_sequence STRING Eine kontinuierlich ansteigende Sequenznummer, mit der die Reihenfolge des Datensatzes untergeordneter Partitionen definiert werden kann, wenn in einer bestimmten Partition mehrere untergeordnete Partitionseinträge mit demselben start_timestamp zurückgegeben werden. Das Partitionstoken start_timestamp und record_sequence identifizieren einen untergeordneten Partitionseintrag eindeutig.
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
Gibt eine Reihe von untergeordneten Partitionen und die zugehörigen Informationen zurück. Dazu gehören der String des Partitionstokens, mit dem die untergeordnete Partition in Abfragen identifiziert wird, sowie die Tokens der übergeordneten Partitionen.

PostgreSQL

Feld Typ Beschreibung
start_timestamp STRING Datenänderungseinträge, die von untergeordneten Partitionen in diesem untergeordneten Partitionseintrag zurückgegeben wurden, haben einen Commit-Zeitstempel größer oder gleich start_timestamp. Bei der Abfrage einer untergeordneten Partition sollte die Abfrage das untergeordnete Partitionstoken und einen start_timestamp angeben, der größer oder gleich child_partitions_token.start_timestamp ist. Alle untergeordneten Partitionen, die von einer Partition zurückgegeben werden, haben denselben start_timestamp und der Zeitstempel liegt immer zwischen den in der Abfrage angegebenen start_timestamp und end_timestamp.
record_sequence STRING Eine kontinuierlich ansteigende Sequenznummer, mit der die Reihenfolge des Datensatzes untergeordneter Partitionen definiert werden kann, wenn in einer bestimmten Partition mehrere untergeordnete Partitionseinträge mit demselben start_timestamp zurückgegeben werden. Das Partitionstoken start_timestamp und record_sequence identifizieren einen untergeordneten Partitionseintrag eindeutig.
child_partitions
[
  {
    "token": <STRING>,
    "parent_partition_tokens": [<STRING>],
  }, [...]
]
Gibt ein Array von untergeordneten Partitionen und die zugehörigen Informationen zurück. Dazu gehören der String des Partitionstokens, mit dem die untergeordnete Partition in Abfragen identifiziert wird, sowie die Tokens der übergeordneten Partitionen.

Das folgende Beispiel zeigt einen untergeordneten Partitionseintrag:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ],
}

Workflow für Abfrage von Änderungsstreams

Führen Sie mit der ExecuteStreamingSql API Abfragen für Änderungsstreams mit einer schreibgeschützten Transaktion zur einmaligen Verwendung und einer starken Zeitstempelgrenze aus. Mit der Funktion zum Lesen des Änderungsstreams können Sie start_timestamp und end_timestamp für den gewünschten Zeitraum angeben. Alle Änderungsdatensätze innerhalb der Aufbewahrungsdauer sind mithilfe der starken schreibgeschützten Zeitstempelgrenze zugänglich.

Alle anderen TransactionOptions sind für Abfrage des Änderungsstreams ungültig. Wenn TransactionOptions.read_only.return_read_timestamp auf „true“ gesetzt ist, wird anstelle eines gültigen Lesezeitstempels ein spezieller Wert von kint64max - 1 in der Transaction-Nachricht zurückgegeben, die die Transaktion beschreibt. Dieser spezielle Wert sollte verworfen und nicht für nachfolgende Abfragen verwendet werden.

Jede Änderungsstreamabfrage kann eine beliebige Anzahl von Zeilen zurückgeben, die jeweils entweder einen Datensatz für die Datenänderung, einen Heartbeat-Datensatz oder einen Datensatz für untergeordnete Partitionen enthalten. Es muss keine Frist für die Anfrage festgelegt werden.

Beispiel:

Der Workflow der Streamingabfrage beginnt mit der ersten Änderungsstreamabfrage. Dazu wird für partition_token der Wert NULL angegeben. In der Abfrage muss die Lesefunktion für den Änderungsstream, der gewünschte Start- und Endzeitstempel sowie das Heartbeat-Intervall angegeben werden. Ist end_timestamp auf NULL gesetzt, gibt die Abfrage weiterhin Datenänderungen zurück, bis die Partition beendet ist.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
) ;

Verarbeiten Sie Datensätze von dieser Abfrage, bis untergeordnete Partitionsdatensätze zurückgegeben werden. Im folgenden Beispiel werden zwei untergeordnete Partitionseinträge und drei Partitionstokens zurückgegeben. Anschließend wird die Abfrage beendet. Untergeordnete Partitionseinträge einer bestimmten Abfrage haben immer dieselbe start_timestamp.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
        // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ],
}

Erstellen Sie drei neue Abfragen und führen Sie sie parallel aus, um zukünftige Änderungen nach 2022-05-01T09:00:01Z zu verarbeiten. Die drei Abfragen zusammen geben zukünftige Datenänderungen für denselben Schlüsselbereich zurück, den das übergeordnete Element abdeckt. Legen Sie für start_timestamp immer die start_timestamp im selben untergeordneten Partitionseintrag fest und verwenden Sie denselben end_timestamp-Wert und dasselbe Heartbeat-Intervall, um die Datensätze über alle Abfragen hinweg einheitlich zu verarbeiten.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_1',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_2',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_3',
  10000,
  NULL
);

Nach einer Weile ist die Abfrage für child_token_2 abgeschlossen, nachdem ein weiterer untergeordneter Partitionseintrag zurückgegeben wurde. Dieser Eintrag zeigt an, dass eine neue Partition zukünftige Änderungen sowohl für child_token_2 als auch für child_token_3 ab 2022-05-01T09:30:15Z abdeckt. Bei der Abfrage von child_token_3 wird genau derselbe Eintrag zurückgegeben, da beide übergeordneten Partitionen des neuen child_token_4-Elements sind. Um eine strikt geordnete Verarbeitung von Datensätzen für einen bestimmten Schlüssel zu gewährleisten, darf die Abfrage für child_token_4 erst gestartet werden, nachdem alle übergeordneten Elemente abgeschlossen sind, in diesem Fall child_token_2 und child_token_3. Erstellen Sie nur eine Abfrage für jedes untergeordnete Partitionstoken. Im Design des Abfrageworkflows sollte ein übergeordnetes Element festgelegt werden, das die Abfrage wartet und die Abfrage für child_token_4 plant.

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z',
  NULL,
  'child_token_4',
  10000,
  NULL
);

Beispiele für die Verarbeitung und Analyse von Änderungsstreameinträgen finden Sie im Dataflow-Connector von Apache Beam SpannerIO auf GitHub.