Partitionen, Datensätze und Abfragen von Streams ändern

Auf dieser Seite werden die folgenden Attribute von Änderungsstreams im Detail beschrieben:

  • Das aufteilungsbasierte Partitionierungsmodell
  • Format und Inhalt von Änderungsstreameinträgen
  • Die Low-Level-Syntax, die zum Abfragen dieser Datensätze verwendet wird
  • Beispiel für den Abfrageworkflow

Die Informationen auf dieser Seite sind am relevantesten für die Verwendung der Spanner API, um Änderungsstreams direkt abzufragen. Anwendungen, die stattdessen Dataflow zum Lesen des Änderungsstreams verwenden Daten nicht direkt mit dem Datenmodell arbeiten, beschrieben.

Eine umfassendere Einführung in Änderungsstreams finden Sie unter Streams ändern .

Streampartitionen ändern

Wenn eine Änderung an einer Tabelle auftritt, die von einem Änderungsstream überwacht wird, Spanner schreibt einen entsprechenden Änderungsstreameintrag in die Datenbank. synchron in derselben Transaktion erfolgen, in der sich die Daten ändern. Dieses garantiert, dass Spanner bei erfolgreicher Transaktion die Änderung erfolgreich erfasst und beibehalten hat. Intern Spanner bringt den Änderungsstreameintrag und die Datenänderung zusammen sodass sie vom selben Server verarbeitet werden, um den Schreib-Overhead zu minimieren.

Als Teil der DML für einen bestimmten Split kann Spanner Hängt den Schreibvorgang an die entsprechenden Änderungsstreamdaten an. in derselben Transaktion aufgeteilt. Aufgrund dieser Colocation Streams sorgen für keine zusätzliche Koordination zwischen den Bereitstellungsressourcen, den Commit-Overhead der Transaktion minimiert.

Image

Spanner skaliert durch dynamisches Aufteilen und Zusammenführen von Daten auf Datenbanklast und -größe sowie die Verteilung von Splits auf die Bereitstellungsressourcen.

Bis Ermöglicht das Skalieren von Änderungsstreams, Schreib- und Lesevorgängen, Spanner-Splits und führt den internen Änderungsstream-Speicher mit den Datenbankdaten zusammen. Hotspots werden automatisch vermieden. Um das Lesen von Änderungsstream-Datensätzen in nahezu in Echtzeit, wenn Datenbankschreibvorgänge skaliert werden, ist die Spanner API ist darauf ausgelegt, einen Änderungsstream gleichzeitig über den Änderungsstream abzufragen. Partitionen. Ändern Sie die Zuordnung von Streampartitionen, um Stream-Datenaufteilungen zu ändern, die Änderungsstream-Datensätze enthalten. Die Partitionen eines Änderungsstreams ändern sich dynamisch im Laufe der Zeit und korrelieren damit, wie Spanner die Datenbankdaten dynamisch aufteilt und zusammenführt.

Eine Änderungsstreampartition enthält Datensätze für einen unveränderlichen Schlüsselbereich für einen Zeitraum festlegen. Jede Änderungsstreampartition kann in eine oder mehrere Änderungsstreampartitionen aufgeteilt oder mit anderen Änderungsstreampartitionen zusammengeführt werden. Wenn diese geteilte oder zusammengeführte Ereignisse auftreten, werden untergeordnete Partitionen erstellt, um die Änderungen für ihre jeweiligen unveränderlichen Schlüsselbereiche für den nächsten Zeitraum. Außerdem gibt eine Änderungsstreamabfrage untergeordnete Partitionseinträge an Leser über neue Partitions im Änderungsstream informieren, die abgefragt werden müssen, als Heartbeat-Datensätze, um den Vorwärtsfortschritt anzuzeigen, wenn keine Schreibvorgänge stattgefunden haben vor Kurzem.

Beim Abfragen einer bestimmten Änderungsstreampartition sind die Änderungseinträge werden in der Reihenfolge der Commit-Zeitstempel zurückgegeben. Jeder Änderungseintrag wird genau einmal. Bei Änderungsstreampartitionen gibt es keine garantierte Reihenfolge der Änderungen. Datensätze. Änderungseinträge für einen bestimmten Primärschlüssel werden nur auf einem für einen bestimmten Zeitraum.

Aufgrund der Partitionsherkunft zwischen über- und untergeordneten Elementen bestimmter Schlüssel in der Commit-Zeitstempelreihenfolge, Datensätze, die vom untergeordneten Partitionen sollten erst nach Datensätzen von allen übergeordneten Elementen verarbeitet werden Partitionen verarbeitet wurden.

Lesefunktionen und Abfragesyntax des Streams ändern

GoogleSQL

Änderungsstreams abfragen Sie mit dem ExecuteStreamingSql der API erstellen. Spanner erstellt automatisch eine spezielle Lesefunktion mit dem Änderungsstream. Die Lesefunktion bietet Zugriff auf die Änderung die Datensätze des Streams. Die Namenskonvention für Lesefunktionen READ_change_stream_name

Unter der Annahme, dass in der Datenbank der Änderungsstream SingersNameStream vorhanden ist, gilt Folgendes: für GoogleSQL lautet die Abfragesyntax wie folgt:

SELECT ChangeRecord
FROM READ_SingersNameStream (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    read_options
)

Die Funktion read akzeptiert die folgenden Argumente:

Name des Arguments Typ Erforderlich/Optional? Beschreibung
start_timestamp TIMESTAMP Erforderlich Gibt an, dass Einträge mit commit_timestamp größer oder gleich start_timestamp sind zurückgegeben werden soll. Der Wert muss sich im Änderungsstream befinden und sollte kleiner oder gleich der aktuellen Zeit sein, und größer oder gleich dem Zeitstempel der Erstellung des Änderungsstreams ist.
end_timestamp TIMESTAMP Optional (Standardeinstellung: NULL) Gibt an, dass Einträge mit commit_timestamp weniger vorhanden sind oder gleich end_timestamp sollte zurückgegeben werden kann. Der Wert muss innerhalb der Aufbewahrungsfrist des Änderungsstreams liegen Punkt und größer oder gleich start_timestamp. Die Abfrage ist entweder nach Rückgabe aller ChangeRecords bis zu end_timestamp abgeschlossen oder der Nutzer die Verbindung trennt. Wenn NULL oder nicht wird die Abfrage ausgeführt, bis alle ChangeRecords zurückgegeben oder der bricht die Verbindung ab.
partition_token STRING Optional (Standardeinstellung: NULL) Gibt an, welche Änderungsstreampartition abgefragt werden soll, basierend auf dem Inhalt von untergeordneten Partitionen Datensätze. Wenn NULL oder nicht angegeben ist, bedeutet dies den den Änderungsstream zum ersten Mal abfragt und Es wurden keine spezifischen Partitionstokens für die Abfrage abgerufen.
heartbeat_milliseconds INT64 Erforderlich Legt fest, wie oft ein Heartbeat-ChangeRecord zurückgegeben wird falls in dieser Partition keine Transaktionen mit Commit durchgeführt werden. Der Wert muss zwischen 1,000 (eine Sekunde) und 300,000 (fünf) liegen. Minuten).
read_options ARRAY Optional (Standardeinstellung: NULL) Zusätzliche Leseoptionen, die für die zukünftige Verwendung reserviert sind. Derzeit ist der einzige zulässige Wert NULL.

Wir empfehlen, eine einfache Methode zum Erstellen des Textes für die Funktionsabfrage und Bindungsparameter daran lesen, wie im Folgenden gezeigt: Beispiel.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

PostgreSQL

Änderungsstreams abfragen Sie mit dem ExecuteStreamingSql API Spanner erstellt automatisch eine spezielle Lesefunktion mit dem Änderungsstream. Die Lesefunktion bietet Zugriff auf die Änderung die Datensätze des Streams. Die Namenskonvention für Lesefunktionen spanner.read_json_change_stream_name

Unter der Annahme, dass in der Datenbank der Änderungsstream SingersNameStream vorhanden ist, gilt Folgendes: für PostgreSQL sieht so aus:

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    null
)

Die Funktion read akzeptiert die folgenden Argumente:

Name des Arguments Typ Erforderlich/Optional? Beschreibung
start_timestamp timestamp with time zone Erforderlich Gibt an, dass Einträge mit commit_timestamp größer oder gleich start_timestamp geändert werden zurückgegeben werden soll. Der Wert muss sich im Änderungsstream befinden und sollte kleiner oder gleich der aktuellen Zeit sein, und größer oder gleich dem Zeitstempel der Erstellung des Änderungsstreams ist.
end_timestamp timestamp with timezone Optional (Standardeinstellung: NULL) Gibt an, dass Einträge mit commit_timestamp geändert werden kleiner oder gleich end_timestamp sollte zurückgegeben werden kann. Der Wert muss innerhalb der Aufbewahrungsfrist des Änderungsstreams liegen Punkt und größer oder gleich start_timestamp. Die Abfrage ist abgeschlossen, nachdem alle Änderungseinträge bis zu end_timestamp oder der Nutzer trennt die Verbindung. Mit NULL wird die Abfrage ausgeführt, bis alle Änderungseinträge zurückgegeben wurden oder der Nutzer die Verbindung trennt.
partition_token text Optional (Standardeinstellung: NULL) Gibt an, welche Änderungsstreampartition abgefragt werden soll, basierend auf dem Inhalt von untergeordneten Partitionen Datensätze. Wenn NULL oder nicht angegeben ist, bedeutet dies den den Änderungsstream zum ersten Mal abfragt und Es wurden keine spezifischen Partitionstokens für die Abfrage abgerufen.
heartbeat_milliseconds bigint Erforderlich Legt fest, wie oft ein Heartbeat-ChangeRecord zurückgegeben wird falls in dieser Partition keine Transaktionen mit Commit durchgeführt werden. Der Wert muss zwischen 1,000 (eine Sekunde) und 300,000 (fünf) liegen. Minuten).
null null Erforderlich Für zukünftige Verwendung reserviert

Wir empfehlen, eine einfache Methode zum Erstellen des Textes für die die Funktions- und Bindungsparameter, wie im Folgenden gezeigt: Beispiel.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
            + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {

  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("p1")
                    .to(startTimestamp)
                    .bind("p2")
                    .to(endTimestamp)
                    .bind("p3")
                    .to(partitionToken)
                    .bind("p4")
                    .to(heartbeatMillis)
                    .build();
}

Format von Streamaufzeichnungen ändern

GoogleSQL

Die Lesefunktion für Änderungsstreams gibt eine einzelne ChangeRecord-Spalte des Typs zurück ARRAY<STRUCT<...>>. In jeder Zeile enthält dieses Array immer ein einzelnes Element.

Die Array-Elemente haben folgenden Typ:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

Diese Struktur enthält drei Felder: data_change_record, heartbeat_record und child_partitions_record, jeweils vom Typ ARRAY<STRUCT<...>>. In jeder Zeile, die die Änderungsstream-Lesefunktion enthält gibt, enthält nur eines dieser drei Felder einen Wert; die anderen beiden sind leer oder NULL. Diese Array-Felder enthalten höchstens ein Element.

In den folgenden Abschnitten werden diese drei Eintragstypen erläutert.

PostgreSQL

Die Lesefunktion für Änderungsstreams gibt eine einzelne ChangeRecord-Spalte mit Geben Sie JSON mit der folgenden Struktur ein:

{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}

In diesem Objekt gibt es drei mögliche Schlüssel: data_change_record, heartbeat_record und child_partitions_record, der entsprechende Wert Typ ist JSON. In jeder Zeile, die die Lesefunktion des Änderungsstreams zurückgibt, einer dieser drei Schlüssel existiert.

In den folgenden Abschnitten werden diese drei Eintragstypen erläutert.

Einträge über Datenänderungen

Ein Datensatz für Datenänderung enthält eine Reihe von Änderungen an einer Tabelle mit dem denselben Änderungstyp (Einfügen, Aktualisieren oder Löschen), der zur selben Zeit Commit-Zeitstempel in einer Änderungsstreampartition für denselben Transaktion. Für denselben Datensatz können mehrere Datenänderungseinträge zurückgegeben werden. in mehreren Partitions des Änderungsstreams.

Alle Datenänderungseinträge haben commit_timestamp, server_transaction_id, und record_sequence, die zusammen die Reihenfolge der Änderung bestimmen. für einen Stream-Datensatz. Diese drei Felder reichen aus, um und für externe Konsistenz sorgen.

Mehrere Transaktionen können den gleichen Commit-Zeitstempel haben, nicht überlappende Daten berühren. Das Feld server_transaction_id lässt sich erkennen, welche Änderungen (möglicherweise Änderungsstreams) wurden innerhalb derselben Transaktion. Kopplung mit record_sequence und Mit number_of_records_in_transaction-Feldern können Sie puffern und sortieren alle Datensätze einer bestimmten Transaktion.

Zu den Feldern eines Datenänderungseintrags gehören:

GoogleSQL

Feld Typ Beschreibung
commit_timestamp TIMESTAMP Der Zeitstempel, an dem die Änderung festgeschrieben wurde.
record_sequence STRING Die Sequenznummer für den Datensatz in der Transaktion. Für Sequenznummern wird garantiert, eindeutig sein und innerhalb einer Transaktion kontinuierlich ansteigen (aber nicht notwendigerweise fortlaufend). Sortieren Sie die Datensätze server_transaction_id von record_sequence bis die Reihenfolge der Änderungen innerhalb der Transaktion zu rekonstruieren.
server_transaction_id STRING Ein global eindeutiger String, der die Transaktion in in dem die Änderung festgeschrieben wurde. Der Wert darf nur werden im Zusammenhang mit der Verarbeitung von Änderungsstream-Datensätzen verwendet. mit der Transaktions-ID in der Spanner-API korreliert.
is_last_record_in_transaction_in_partition BOOL Gibt an, ob dies der letzte Datensatz für eine Transaktion in der aktuellen Partition ist.
table_name STRING Name der Tabelle, die von der Änderung betroffen ist.
value_capture_type STRING

Beschreibt den Werterfassungstyp, der in der die Konfiguration des Änderungsstreams ändern, als diese Änderung erfasst wurde.

Der Werterfassungstyp kann "OLD_AND_NEW_VALUES" sein, "NEW_ROW", "NEW_VALUES" oder "NEW_ROW_AND_OLD_VALUES". Der Standardwert ist "OLD_AND_NEW_VALUES". Weitere Informationen Weitere Informationen zu den Arten der Werterfassung

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
Der Name der Spalte, der Spaltentyp, ob es ein Primärschlüssel ist, und die Position der Spalte im Schema definiert ("ordinal_position"). Die erste Spalte einer Tabelle im Schema eine Ordinalposition von "1" haben. Spaltentyp für Arrayspalten verschachtelt sein. Das Format entspricht der Typstruktur. in der Spanner API-Referenz beschrieben.
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
Beschreibt die vorgenommenen Änderungen, einschließlich des Primärschlüssels. Werte, die alten Werte und die neuen Werte der geänderten oder erfassten Spalten. Verfügbarkeit und Inhalt der alten und neuen Werte hängen vom konfigurierten Wert "value_capture_type" ab. Die Felder new_values und old_values enthalten nur die Nicht-Schlüsselspalten.
mod_type STRING Beschreibt die Art der Änderung. Entweder INSERT, UPDATE oder DELETE
number_of_records_in_transaction INT64 Die Anzahl der Datensätze zu Datenänderungen, die Teil dieses in allen Partitions des Änderungsstreams.
number_of_partitions_in_transaction INT64 Die Anzahl der Partitionen, für die Datenänderungseinträge zurückgegeben werden für diese Transaktion.
transaction_tag STRING <ph type="x-smartling-placeholder"></ph> Transaktions-Tag, das dieser Transaktion zugeordnet ist.
is_system_transaction BOOL Gibt an, ob die Transaktion eine Systemtransaktion ist.

PostgreSQL

Feld Typ Beschreibung
commit_timestamp STRING Der Zeitstempel, zu dem der Commit für die Änderung durchgeführt wurde.
record_sequence STRING Die Sequenznummer für den Datensatz in der Transaktion. Für Sequenznummern wird garantiert, eindeutig sein und innerhalb einer Transaktion kontinuierlich ansteigen (aber nicht notwendigerweise fortlaufend). Sortieren Sie die Datensätze „server_transaction_id“ durch „record_sequenz“, um die Reihenfolge der Änderungen innerhalb der Transaktion.
server_transaction_id STRING Ein global eindeutiger String, der die Transaktion in in dem die Änderung festgeschrieben wurde. Der Wert darf nur werden im Zusammenhang mit der Verarbeitung von Änderungsstream-Datensätzen verwendet. Korreliert mit der Transaktions-ID in der Spanner-API
is_last_record_in_transaction_in_partition BOOLEAN Gibt an, ob dies der letzte Datensatz für eine Transaktion in der aktuellen Partition ist.
table_name STRING Name der Tabelle, die von der Änderung betroffen ist.
value_capture_type STRING

Beschreibt den Werterfassungstyp, der in der die Konfiguration des Änderungsstreams ändern, als diese Änderung erfasst wurde.

Der Werterfassungstyp kann "OLD_AND_NEW_VALUES" sein, "NEW_ROW", "NEW_VALUES" oder "NEW_ROW_AND_OLD_VALUES". Der Standardwert ist "OLD_AND_NEW_VALUES". Weitere Informationen Weitere Informationen zu den Arten der Werterfassung

column_types
[
  {
      "name": <STRING>,
      "type": {
        "code": <STRING>
      },
      "is_primary_key": <BOOLEAN>,
      "ordinal_position": <NUMBER>
    },
    ...
]
Der Name der Spalte, der Spaltentyp, ob es ein Primärschlüssel ist, und die Position der Spalte im Schema definiert ("ordinal_position"). Die erste Spalte einer Tabelle im Schema eine Ordinalposition von "1" haben. Spaltentyp für Arrayspalten verschachtelt sein. Das Format entspricht der Typstruktur. in der Spanner API-Referenz beschrieben.
mods
[
  {
    "keys": {<STRING> : <STRING>},
    "new_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
    "old_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
  },
  [...]
]
Beschreibt die vorgenommenen Änderungen, einschließlich des Primärschlüssels. Werte, die alten Werte und die neuen Werte der geänderten oder erfassten Spalten. Verfügbarkeit und Inhalt der alten und neuen Werte hängen davon ab, für den konfigurierten Wert "value_capture_type" festgelegt. Die new_values und old_values-Felder enthalten nur die Nicht-Schlüsselspalten.
mod_type STRING Beschreibt die Art der Änderung. Entweder INSERT, UPDATE oder DELETE
number_of_records_in_transaction INT64 Die Anzahl der Datensätze zu Datenänderungen, die Teil dieses in allen Partitions des Änderungsstreams.
number_of_partitions_in_transaction NUMBER Die Anzahl der Partitionen, für die Datenänderungseinträge zurückgegeben werden für diese Transaktion.
transaction_tag STRING <ph type="x-smartling-placeholder"></ph> Transaktions-Tag, das dieser Transaktion zugeordnet ist.
is_system_transaction BOOLEAN Gibt an, ob die Transaktion eine Systemtransaktion ist.

Es folgt ein Paar Beispieldatensätze für Datenänderungseinträge. Sie beschreiben eine einzelne Transaktion in dem es eine zwischen zwei Konten übertragen. Die beiden Konten befinden sich separater Änderungsstream Partitionen.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

Der folgende Datensatz zur Datenänderung ist ein Beispiel für einen Datensatz mit dem Wert Erfassungstyp "NEW_VALUES". Beachten Sie, dass nur neue Werte ausgefüllt werden. Da nur die Spalte "LastUpdate" geändert wurde, ist nur diese Spalte zurückgegeben.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z"
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Der folgende Datensatz zur Datenänderung ist ein Beispiel für einen Datensatz mit dem Wert Erfassungstyp "NEW_ROW". Nur die "LastUpdate" wurde geändert, es werden jedoch alle beobachteten Spalten zurückgegeben.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Der folgende Datensatz zur Datenänderung ist ein Beispiel für einen Datensatz mit dem Wert Erfassungstyp "NEW_ROW_AND_OLD_VALUES". Nur die "LastUpdate" wurde geändert, es werden jedoch alle beobachteten Spalten zurückgegeben. Diese Werterfassung type erfasst den neuen und den alten Wert von LastUpdate.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z"
      }
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Heartbeat-Rekorde

Wenn ein Heartbeat-Datensatz zurückgegeben wird, zeigt dies an, dass alle Änderungen mit commit_timestamp kleiner oder gleich dem Wert des Heartbeat-Datensatzes timestamp wurden zurückgegeben. Zukünftige Datensätze in diesem Partition muss höhere Commit-Zeitstempel haben als die vom Heartbeat-Rekord. Heartbeat-Einträge werden zurückgegeben, wenn keine Daten vorhanden sind in eine Partition geschriebene Änderungen. Wenn Datenänderungen die Partition, stattdessen kann data_change_record.commit_timestamp verwendet werden von heartbeat_record.timestamp, um zu signalisieren, dass der Leser Fortschritte macht beim Lesen der Partition.

Sie können Heartbeat-Datensätze, die auf Partitionen zurückgegeben wurden, zur Synchronisierung verwenden. in allen Partitionen. Sobald alle Leser entweder eine Heartbeat größer oder gleich einem Zeitstempel A oder Daten oder untergeordnetes Element erhalten Partitionseinträge größer oder gleich dem Zeitstempel A sind, wissen die Leser, dass sie alle Datensätze, für die zum Zeitpunkt oder vor dem Zeitstempel A ein Commit durchgeführt wurde, und können beginnen Verarbeitung der zwischengespeicherten Datensätze, z. B. Sortieren der partitionsübergreifenden Datensätze nach Zeitstempel sortiert und nach server_transaction_id gruppiert.

Ein Heartbeat-Eintrag enthält nur ein Feld:

GoogleSQL

Feld Typ Beschreibung
timestamp TIMESTAMP Der Zeitstempel des Heartbeat-Datensatzes.

PostgreSQL

Feld Typ Beschreibung
timestamp STRING Der Zeitstempel des Heartbeat-Datensatzes.

Beispiel für einen Heartbeat-Datensatz, der kommuniziert, dass alle Datensätze mit Zeitstempeln Es wurden weniger oder gleich dem Zeitstempel dieses Eintrags zurückgegeben:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

Untergeordnete Partitionseinträge

Ein untergeordneter Partitionseintrag gibt Informationen über untergeordnete Partitionen zurück: ihre Partitionstokens, die Tokens ihrer übergeordneten Partitionen und den start_timestamp, der den frühesten Zeitstempel darstellt, den das untergeordnete Element enthalten Änderungseinträge. Einträge mit Commit-Zeitstempeln unmittelbar vor child_partitions_record.start_timestamp liegen, werden die in der aktuellen Partition zurückgegeben werden. Nachdem Sie alle untergeordneten Partitionen für diese Partition enthält, gibt diese Abfrage Erfolgsstatus, der angibt, dass alle Datensätze für diese -Partition an.

Die Felder eines untergeordneten Partitionseintrags enthalten Folgendes:

GoogleSQL

Feld Typ Beschreibung
start_timestamp TIMESTAMP Vom untergeordneten Element zurückgegebene Datenänderungseinträge Partitionen in diesem untergeordneten Partitionseintrag haben einen Commit-Zeitstempel ist größer als oder gleich start_timestamp. Bei der Abfrage einer untergeordneten Partition sollte die Abfrage Geben Sie das untergeordnete Partitionstoken und einen start_timestamp größer oder gleich child_partitions_token.start_timestamp. Alle untergeordneten Partitionen die von einer Partition zurückgegeben wurden, denselben start_timestamp und den gleichen Der Zeitstempel liegt immer zwischen dem in der Abfrage angegebenen start_timestamp und end_timestamp.
record_sequence STRING Eine kontinuierlich ansteigende Folge Zahl, mit der die Reihenfolge der wird der Datensatz von untergeordneten Partitionen aufgenommen, wenn mehrere Partitionsdatensätze, die mit demselben start_timestamp in einem einer bestimmten Partition. Das Partitionstoken, start_timestamp und record_sequence eindeutig identifiziert untergeordneten Partitionen ein.
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
Gibt eine Reihe von untergeordneten Partitionen und die zugehörigen Informationen zurück. Dazu gehört auch der String des Partitionstokens, mit dem das untergeordnete Element identifiziert wird Partition in Abfragen sowie die Tokens der übergeordneten Partitionen.

PostgreSQL

Feld Typ Beschreibung
start_timestamp STRING Vom untergeordneten Element zurückgegebene Datenänderungseinträge Partitionen in diesem untergeordneten Partitionseintrag haben einen Commit-Zeitstempel ist größer als oder gleich start_timestamp. Bei der Abfrage eines untergeordneten Elements -Partition muss in der Abfrage das untergeordnete Partitionstoken und ein start_timestamp größer oder gleich child_partitions_token.start_timestamp. Alle untergeordneten Partitionen Datensätze, die von einer Partition zurückgegeben werden, start_timestamp und der Zeitstempel liegt immer zwischen dem den angegebenen start_timestamp und end_timestamp.
record_sequence STRING Eine kontinuierlich ansteigende Folge Zahl, mit der die Reihenfolge der wird der Datensatz von untergeordneten Partitionen aufgenommen, wenn mehrere Partitionsdatensätze, die mit demselben start_timestamp in einem einer bestimmten Partition. Das Partitionstoken, start_timestamp und record_sequence eindeutig identifiziert untergeordneten Partitionen ein.
child_partitions
[
  {
    "token": <STRING>,
    "parent_partition_tokens": [<STRING>],
  }, [...]
]
Gibt ein Array von untergeordneten Partitionen und die zugehörigen Informationen zurück. Dazu gehört auch der String des Partitionstokens, mit dem das untergeordnete Element identifiziert wird Partition in Abfragen sowie die Tokens der übergeordneten Partitionen.

Das folgende Beispiel zeigt einen untergeordneten Partitionseintrag:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ],
}

Workflow für Abfrage von Änderungsstreams

Führen Sie mit dem ExecuteStreamingSql API, einmalig schreibgeschützt Transaktion und ein feste Zeitstempelgrenze. Die Änderung Mit der Stream-Lesefunktion können Sie start_timestamp und end_timestamp für den gewünschten Zeitraum. Alle Änderungseinträge innerhalb der Aufbewahrungsdauer sind über die stark schreibgeschützte Version Zeitstempelgrenze.

Alle anderen TransactionOptions sind ungültig für Änderungsstreamabfragen. Außerdem Wenn TransactionOptions.read_only.return_read_timestamp auf „true“ gesetzt ist, wird im Transaction ein spezieller Wert von kint64max - 1 zurückgegeben. Nachricht, die die Transaktion beschreibt, anstelle eines gültigen Lesevorgangs Zeitstempel. Dieser spezielle Wert sollte verworfen und für keine der für nachfolgende Abfragen.

Jede Abfrage des Änderungsstreams kann eine beliebige Anzahl von Zeilen zurückgeben, die jeweils entweder einen Datenänderungseintrag, einen Heartbeat-Eintrag oder untergeordnete Partitionen Datensatz. Es muss keine Frist für die Anfrage festgelegt werden.

Beispiel:

Der Workflow für Streamingabfragen beginnt mit der Ausgabe des allerersten Änderungsstreams. durch Angabe von partition_token auf NULL. In der Abfrage muss die Lesefunktion für den Änderungsstream, den Start- und Endzeitstempel von Interesse und das Heartbeat-Intervall. Wenn end_timestamp auf NULL gesetzt ist, behält die Abfrage und Datenänderungen zurückgeben, bis die Partition beendet ist.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
) ;

Datensätze von dieser Abfrage verarbeiten, bis die Datensätze untergeordneter Partition zurückgegeben. Im Beispiel unten sind zwei untergeordnete Partitionseinträge und drei Partitionseinträge werden zurückgegeben, wird die Abfrage beendet. Untergeordnete Partitionseinträge aus einem bestimmte Abfrage hat immer dieselbe start_timestamp.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
        // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ],
}

Wenn Sie zukünftige Änderungen nach dem 2022-05-01T09:00:01Z verarbeiten möchten, müssen Sie drei neue erstellen und führen sie parallel aus. Die drei Abfragen zusammen geben Änderungen an den Daten für denselben Schlüsselbereich, den ihr übergeordnetes Element abdeckt. Immer die start_timestamp zu start_timestamp im selben untergeordneten Partitionseintrag und dieselben Werte für end_timestamp und Heartbeat-Intervall zur Verarbeitung der Datensätze verwenden bei allen Suchanfragen einheitlich.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_1',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_2',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_3',
  10000,
  NULL
);

Nach einer Weile wird die Abfrage für child_token_2 beendet, nachdem eine andere Abfrage untergeordneten Partitionseintrag. Dieser Eintrag zeigt an, dass eine neue Partition in zukünftigen Änderungen für child_token_2 und child_token_3 ab dem 2022-05-01T09:30:15Z Derselbe Datensatz wird von der Abfrage für child_token_3, da beide die übergeordneten Partitionen des neuen child_token_4 sind. Um eine strikt geordnete Verarbeitung von Datensätzen für einen bestimmten Schlüssel zu gewährleisten, Die Abfrage für child_token_4 darf erst gestartet werden, wenn alle übergeordneten Elemente abgeschlossen sind. in diesem Fall child_token_2 und child_token_3. Nur eine Abfrage erstellen Für jedes untergeordnete Partitionstoken sollte im Design des Abfrage-Workflows ein Token festgelegt werden, Parent, um zu warten und die Abfrage für child_token_4 zu planen.

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z',
  NULL,
  'child_token_4',
  10000,
  NULL
);

Beispiele für die Verarbeitung und Analyse von Änderungsstreameinträgen finden Sie in der Apache Beam SpannerIO. Dataflow-Connector aktiviert GitHub