Streampartitionen, -einträge und -abfragen ändern

Auf dieser Seite werden die folgenden Attribute von Änderungsstreams detailliert beschrieben:

  • Das geteilte Partitionierungsmodell
  • Format und Inhalt von Änderungsstream-Einträgen
  • Die Syntax auf niedriger Ebene, die zum Abfragen dieser Einträge verwendet wird
  • Beispiel für den Abfrage-Workflow

Die Informationen auf dieser Seite sind am wichtigsten für die Verwendung der Spanner API zur Abfrage von Änderungsstreams direkt. Anwendungen, die stattdessen Dataflow zum Lesen von Änderungsstreamdaten verwenden, müssen nicht direkt mit dem hier beschriebenen Datenmodell arbeiten.

Eine umfassendere Einführung zum Ändern von Streams findest du unter Streams ändern.

Streampartitionen ändern

Wenn eine Änderung in einer Tabelle stattfindet, die von einem Änderungsstream beobachtet wird, schreibt Cloud Spanner einen entsprechenden Änderungsstream-Eintrag in die Datenbank, synchron in derselben Transaktion wie die Datenänderung. Dies garantiert, dass die Änderung auch erfolgreich durchgeführt wurde, wenn die Transaktion erfolgreich war. Intern speichert Spanner den Datensatz des Änderungsstreams und die Datenänderung, sodass sie vom selben Server verarbeitet werden, um den Schreibaufwand zu minimieren.

Im Rahmen der DML an eine bestimmte Aufteilung fügt Spanner den Schreibvorgang an die entsprechende Änderungsstream-Datenaufteilung in derselben Transaktion hinzu. Aufgrund dieser Colocation wird durch die Änderungsstreams keine zusätzliche Koordination bei den Bereitstellungsressourcen hinzugefügt, was den Aufwand für den Commit für Transaktionen minimiert.

Image

Spanner skaliert durch dynamisches Aufteilen und Zusammenführen von Daten auf Grundlage der Datenbanklast und -größe und verteilt die Splits auf die Bereitstellungsressourcen.

Damit Änderungsstreams zum Skalieren und Lesen aktiviert werden, teilt Spanner den internen Änderungsstreamspeicher sowie die Datenbankdaten auf und vermeidet automatisch Hotspots. Damit Änderungsstream-Einträge nahezu in Echtzeit beim Lesen von Datenbanken gelesen werden können, ist die Spanner API darauf ausgelegt, gleichzeitig mithilfe von Änderungsstream-Partitionen einen Änderungsstream abzufragen. Streampartitionszuordnung ändern, um Streamdatenaufteilungen zu ändern, die die Streamstream-Einträge enthalten. Ein Änderungsstream ändert sich im Laufe der Zeit dynamisch und ist abhängig davon, wie die Datenbankdaten dynamisch von Spanner geteilt und zusammengeführt werden.

Eine Änderungsstream-Partition enthält Datensätze für einen unveränderlichen Schlüsselbereich für einen bestimmten Zeitraum. Jede Änderungsstream-Partition kann in eine oder mehrere Änderungsstream-Partitionen aufgeteilt oder mit anderen Änderungsstream-Partitionen zusammengeführt werden. Wenn diese Ereignisse aufgeteilt oder zusammengeführt werden, werden untergeordnete Partitionen erstellt, um die Änderungen für den jeweiligen unveränderlichen Schlüsselbereich des nächsten Zeitraums zu erfassen. Zusätzlich zu den Datenänderungseinträgen gibt eine Änderungsstream-Abfrage untergeordnete Partitionseinträge zurück, um Leser über neue Änderungsstream-Partitionen zu informieren, die abgefragt werden müssen, sowie Heartbeat-Einträge, um den Weiterleitungsfortschritt anzeigen zu lassen, wenn in letzter Zeit keine Schreibvorgänge stattgefunden haben.

Beim Abfragen einer bestimmten Änderungsstream-Partition werden die Änderungseinträge in der Commit-Zeitstempelreihenfolge zurückgegeben. Jeder Änderungseintrag wird genau einmal zurückgegeben. Es gibt keine Garantie für die Reihenfolge der Änderungsstream-Partitionen. Änderungseinträge für einen bestimmten Primärschlüssel werden nur in einer Partition für einen bestimmten Zeitraum zurückgegeben.

Aufgrund der Herkunft dieser hierarchischen Partition sollten die von untergeordneten Partitionen zurückgegebenen Datensätze erst verarbeitet werden, wenn die Einträge aller übergeordneten Partitionen verarbeitet wurden, um Änderungen für einen bestimmten Schlüssel zu verarbeiten.

Syntax der Streamabfrage ändern

Änderungsstreams werden mit der ExecuteStreamingSql API abgefragt. Zusammen mit dem Änderungsstream wird automatisch eine spezielle Tabellenwertfunktion (TV-Wert) erstellt. Sie haben dann Zugriff auf den Änderungsstream. Die TVF-Namenskonvention lautet READ_change_stream_name.

Wenn ein Änderungsstream SingersNameStream in der Datenbank vorhanden ist, lautet die Abfragesyntax so:

SELECT ChangeRecord
    FROM READ_SingersNameStream (
        start_timestamp,
        end_timestamp,
        partition_token,
        heartbeat_milliseconds
    )

Die Funktion akzeptiert die folgenden Argumente:

Name des Arguments Typ Erforderlich/Optional? Beschreibung
start_timestamp TIMESTAMP Erforderlich Gibt an, dass zurückgegeben werden soll, wenn commit_timestamp größer oder gleich start_timestamp ist. Der Wert muss innerhalb der Aufbewahrungsdauer für den Änderungsstream liegen und kleiner oder gleich der aktuellen Zeit und größer oder gleich dem Zeitstempel der Erstellung des Änderungsstreams sein.
end_timestamp TIMESTAMP Optional (Standardeinstellung: NULL) Gibt an, dass Einträge mit commit_timestamp zurückgegeben werden sollen, die kleiner oder gleich end_timestamp sind. Der Wert muss innerhalb der Aufbewahrungsdauer für den Änderungsstream liegen und größer oder gleich start_timestamp sein. Die Abfrage wird abgeschlossen, nachdem alle ChangeRecords bis zu end_timestamp zurückgegeben oder ein Satz von untergeordneten Partitionseinträgen zurückgegeben wurden. Wenn NULL nicht angegeben ist, wird die Abfrage ausgeführt, bis die aktuelle Partition abgeschlossen ist und alle ChangeRecords mit ihren festgelegten child_partition_record-Feldern zurückgegeben werden. Wenn Sie NULL für end_timestamp angeben, werden immer die neuesten Änderungen gelesen.
partition_token STRING Optional (Standardeinstellung: NULL) Gibt anhand des Inhalts der untergeordneten Partitioneneinträge an, welche Streampartition abgefragt werden soll. Wenn NULL nicht angegeben ist, bedeutet dies, dass der Leser den Änderungsstream zum ersten Mal abfragt und keine spezifischen Partitionstokens abgerufen hat.
heartbeat_milliseconds INT64 Erforderlich Legt fest, wie oft ein HeartRecord ChangeRecord zurückgegeben wird, wenn in dieser Partition keine Transaktionen ausgeführt werden. Der Wert muss zwischen 1000 (eine Sekunde) und 300000 (fünf Minuten) liegen.

Wir empfehlen, eine praktische Methode einzurichten, um den Text der TVF-Abfrage und die Bindungsparameter zu erstellen, wie im folgenden Beispiel gezeigt.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
    return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

Format des Streameintrags ändern

Der Änderungsstream-TVF gibt eine einzelne ChangeRecord-Spalte vom Typ ARRAY<STRUCT<...>> zurück. In jeder Zeile enthält dieses Array immer ein einzelnes Element.

Array-Elemente haben den folgenden Typ:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

Diese Struktur hat drei Felder: data_change_record, heartbeat_record und child_partitions_record mit jeweils ARRAY<STRUCT<...>>. In jeder Zeile, die von dem Änderungsstream-TVF zurückgegeben wird, enthält nur eines dieser drei Felder einen Wert. Die anderen beiden sind leer oder NULL. Diese Arrayfelder enthalten höchstens ein Element.

In den folgenden Abschnitten wird jeder dieser drei Eintragstypen untersucht.

Datensätze zur Datenänderung

Ein Datensatz mit Datenänderungen enthält eine Reihe von Änderungen an einer Tabelle mit demselben Änderungstyp (Einfügen, Aktualisieren oder Löschen), der für einen Commit-Zeitstempel in einer Änderungsstream-Partition für dieselbe Transaktion verwendet wird. Für dieselbe Transaktion können mehrere Datensätze zur Datenänderung über mehrere Änderungsstream-Partitionen zurückgegeben werden.

Alle Datensätze zu Datenänderungen haben die Felder commit_timestamp, server_transaction_id und record_sequence. Diese bestimmen zusammen die Reihenfolge im Änderungsstream für einen Streamdatensatz. Mit diesen drei Feldern kann die Reihenfolge der Änderungen abgeleitet und externe Konsistenz bereitgestellt werden.

Beachten Sie, dass mehrere Transaktionen denselben Commit-Zeitstempel haben können, wenn sie sich überlappende Daten berühren. Mit dem Feld server_transaction_id können Sie unterscheiden, welche Änderungen (möglicherweise über Änderungsstream-Partitionen) innerhalb derselben Transaktion vorgenommen wurden. Wenn du sie mit den Feldern record_sequence und number_of_records_in_transaction koppelst, kannst du auch alle Datensätze einer bestimmten Transaktion zwischenspeichern und sortieren.

Die Felder eines Datenänderungseintrags enthalten Folgendes:

Feld Typ Beschreibung
commit_timestamp TIMESTAMP Der Zeitstempel, in dem die Änderung übernommen wurde.
record_sequence STRING Die Sequenznummer für den Eintrag innerhalb der Transaktion. Die Sequenznummern innerhalb einer Transaktion werden garantiert eindeutig und monoton erhöht (aber nicht unbedingt zusammenhängend). Sortieren Sie die Einträge für dieselbe „server_transaction_id“ nach „record_sequenz“, um die Reihenfolge der Änderungen innerhalb der Transaktion zu rekonstruieren.
server_transaction_id STRING Ein global eindeutiger String, der die Transaktion darstellt, in der die Änderung vorgenommen wurde. Der Wert sollte nur im Zusammenhang mit der Verarbeitung von Änderungsstream-Einträgen verwendet werden und ist nicht mit der Transaktions-ID in der Spanner API verknüpft, z.B. „TransactionSelector.id“. Durch beide wird eine Transaktion eindeutig verglichen mit anderen Werten innerhalb desselben Kontexts (z.B. Änderung des Streams „data_change_records“ oder der Spanner API).
is_last_record_in_transaction_in_partition BOOL Gibt an, ob dies der letzte Eintrag für eine Transaktion in der aktuellen Partition ist.
table_name STRING Name der Tabelle, die von der Änderung betroffen ist.
value_capture_type STRING

Beschreibt den Werterfassungstyp, der in der Konfiguration des Änderungsstreams angegeben wurde, als diese Änderung erfasst wurde

Derzeit immer "OLD_AND_NEW_VALUES".

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
Der Name der Spalte, der Spaltentyp, ob es sich um einen Primärschlüssel handelt, und die Position der Spalte, die im Schema definiert ist („ordinal_position“). Die erste Spalte einer Tabelle im Schema hätte die Ordinalposition „1“. Der Spaltentyp kann für Arrayspalten verschachtelt sein. Das Format entspricht der Typstruktur in der Spanner API-Referenz.
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
Beschreibt die vorgenommenen Änderungen, einschließlich der Primärschlüsselwerte, sowie die alten und neuen Werte der geänderten Spalten, wenn der Änderungsstream mit „value_Capture_type“ konfiguriert ist. Die Felder new_values und old_values enthalten nur die Nicht-Schlüsselspalten.
mod_type STRING Beschreibt die Art der Änderung. Entweder INSERT, UPDATE oder DELETE.
number_of_records_in_transaction INT64 Die Anzahl der Datenänderungsdatensätze, die Teil dieser Transaktion in allen Änderungsstream-Partitionen sind.
number_of_partitions_in_transaction INT64 Die Anzahl der Partitionen, die Datenänderungseinträge für diese Transaktion zurückgeben.

Unten finden Sie zwei Beispiele für Datensätze zur Datenänderung. Sie beschreiben eine einzelne Transaktion, bei der eine Übertragung zwischen zwei Konten stattfindet. Beachten Sie, dass sich die beiden Konten in separaten Stream-Partitionen befinden.

data_change_record: {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
         "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
}
data_change_record: {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
}

Herzschlagdatensätze

Wenn ein Heartbeat-Eintrag zurückgegeben wird, bedeutet das, dass alle Änderungen mit commit_timestamp kleiner oder gleich dem Heartbeat-Eintrag zurückgegeben wurdentimestamp. Zukünftige Datensatz in dieser Partition müssen höhere Commit-Zeitstempel haben als der Datensatz, der vom Herzschlageintrag zurückgegeben wurde. Heartbeat-Einträge werden zurückgegeben, wenn keine Datenänderungen in eine Partition geschrieben werden. Wenn Daten in die Partition geändert werden, kann data_change_record.commit_timestamp anstelle von heartbeat_record.timestamp verwendet werden, um festzustellen, ob der Leser die Partition weiterlesen kann.

Sie können Heartbeat-Einträge für Partitionen verwenden, um Leser über alle Partitionen hinweg zu synchronisieren. Sobald alle Leser einen Herzschlag- Zahlungenwert von mindestens dem Zeitstempel A oder Daten oder untergeordnete Partitionseinträge erhalten haben, die größer oder gleich dem Zeitstempel A sind, wissen die Leser, dass sie alle Aufzeichnungen empfangen haben, die mit diesem Zeitstempel A oder früher verknüpft sind, und können die gepufferten Datensätze beispielsweise nach dem Zeitstempel sortieren und nach server_transaction_id gruppieren.

Ein Heartbeat-Eintrag enthält nur ein Feld:

Feld Typ Beschreibung
timestamp TIMESTAMP Der Heartbeat-Zeitstempel.

Beispiel für einen Heartbeat-Eintrag, der angibt, dass alle Datensätze mit einem Zeitstempel zurückgegeben werden, der kleiner oder gleich diesem Zeitstempel ist:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

Untergeordnete Partitioneneinträge

Ein Eintrag für untergeordnete Partitionen gibt Informationen zu untergeordneten Partitionen zurück: ihren Partitionstokens, den Tokens der übergeordneten Partitionen und dem start_timestamp, der den frühesten Zeitstempel für die untergeordneten Partitionen enthält. Datensätze, deren Commit-Zeitstempel unmittelbar vor dem child_partitions_record.start_timestamp liegen, werden in der aktuellen Partition zurückgegeben. Nachdem Sie alle untergeordneten Partitionen für diese Partition zurückgegeben haben, wird die Abfrage mit einem Erfolgsstatus zurückgegeben, der angibt, dass alle Einträge für diese Partition zurückgegeben wurden.

Die Felder eines untergeordneten Partitionseintrags enthalten Folgendes:

Feld Typ Beschreibung
start_timestamp TIMESTAMP Von den untergeordneten Partitionen in diesem untergeordneten Partitionseintrag zurückgegebene Datenänderungseinträge haben einen Commit-Zeitstempel, der größer oder gleich start_timestamp ist. Bei der Abfrage einer untergeordneten Partition sollte das untergeordnete Partitions-Token und ein start_timestamp-Wert größer oder gleich child_partitions_token.start_timestamp angegeben werden. Alle untergeordneten Partitionen, die von einer Partition zurückgegeben werden, haben denselben start_timestamp. Der Zeitstempel liegt immer zwischen den Abfragen start_timestamp und end_timestamp.
record_sequence STRING Eine monoton steigende Sequenznummer, mit der die Reihenfolge des untergeordneten Partitionseintrags definiert werden kann, wenn mehrere untergeordnete Partitionseinträge mit demselben start_timestamp in einer bestimmten Partition zurückgegeben werden. Das Partitionstoken start_timestamp und record_sequence kennzeichnen eindeutig einen untergeordneten Partitionseintrag.
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
Gibt eine Gruppe von untergeordneten Partitionen und die zugehörigen Informationen zurück. Dazu gehören der Partitions-Tokenstring, mit dem die untergeordnete Partition in Abfragen identifiziert wird, sowie die Tokens der übergeordneten Partitionen.

Beispiel für einen untergeordneten Partitionseintrag:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"],
    }
  ],
}

Workflow für Streamabfragen ändern

Abfragen für Änderungsstreams mithilfe der ExecuteStreamingSql API mit einer Einmal-schreibgeschützten Transaktion und einer starken Zeitstempelbindung ausführen. Mit dem Änderungs-Stream-TVF können Nutzer start_timestamp und end_timestamp für den gewünschten Zeitraum angeben. Auf alle Änderungseinträge innerhalb der Aufbewahrungsdauer kann mithilfe der starken schreibgeschützten Zeitstempelgrenze zugegriffen werden.

Alle anderen TransactionOptions sind für Änderungen des Streams ungültig. Wenn TransactionOptions.read_only.return_read_timestamp auf „true“ gesetzt ist, wird anstelle eines gültigen Lesezeitstempels der spezielle Wert kint64max - 1 in der Nachricht Transaction zurückgegeben, die die Transaktion beschreibt. Dieser spezielle Wert sollte verworfen und nicht für nachfolgende Abfragen verwendet werden.

Jede Änderungsstream-Abfrage kann eine beliebige Anzahl von Zeilen zurückgeben, die jeweils einen Datenänderungs-, einen Heartbeat-Eintrag oder einen untergeordneten Partitionseintrag enthalten. Es muss keine Frist für die Anfrage festgelegt werden.

Beispiel:

Der Streamingabfrage-Workflow beginnt mit der erstmaligen Änderung des Streams, indem partition_token auf NULL gesetzt wird. In der Abfrage müssen die TVF-Funktion für den Änderungsstream, den gewünschten Zeitstempel (Start und Ende) und das Heartbeat-Intervall angegeben werden. Wenn end_timestamp den Wert NULL hat, gibt die Abfrage so lange Datenänderungen zurück, bis die untergeordneten Partitionen erstellt wurden.

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:00-00",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

Verarbeiten Sie Dateneinträge aus dieser Abfrage, bis untergeordnete Partitionseinträge zurückgegeben werden. Im folgenden Beispiel werden zwei untergeordnete Partitionseinträge und drei Partitionstokens zurückgegeben. Anschließend wird die Abfrage beendet. Untergeordnete Partitionseinträge aus einer bestimmten Abfrage teilen immer denselben start_timestamp.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01 09:00:01-00",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
      // from the initial change stream queries.
      "parent_partition_tokens": [NULL],
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL],
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01 09:00:01-00",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL],
    }
  ],
}

Zur Verarbeitung zukünftiger Änderungen nach 2022-05-01 09:00:01-00 müssen Sie drei neue Abfragen erstellen und parallel ausführen. Die drei Abfragen geben später zukünftige Datenänderungen für den gleichen Schlüsselbereich zurück, den ihre übergeordneten Bereiche abdecken. Legen Sie start_timestamp immer auf start_timestamp im selben untergeordneten Partitionseintrag fest und verwenden Sie dasselbe end_timestamp- und Heartbeat-Intervall, um die Einträge einheitlich über alle Abfragen hinweg zu verarbeiten.

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:01-00",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000);
SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:01-00",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000);
SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:01-00",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000);

Wenn die Abfrage in child_token_2 nach einiger Zeit abgeschlossen ist, nachdem ein anderer untergeordneter Partitionseintrag zurückgegeben wurde, weist dieser Eintrag darauf hin, dass eine neue Partition ab dem 2022-05-01 09:30:15-00 künftige Änderungen für child_token_2 und child_token_3 abdeckt. Der exakt gleiche Eintrag wird von der Abfrage am child_token_3 zurückgegeben, da beide die übergeordneten Partitionen der neuen child_token_4 sind. Um eine strenge Reihenfolge der Datensätze für einen bestimmten Schlüssel zu gewährleisten, darf die Abfrage für child_token_4 erst nach Abschluss aller übergeordneten Elemente beginnen – in diesem Fall child_token_2 und child_token_3. Erstellen Sie für jedes untergeordnete Partitionstoken nur eine Abfrage. Beim Design des Abfrageworkflows sollte eine übergeordnete Person darauf warten, die Abfrage abzuwarten und für child_token_4 zu planen.

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01 09:30:15-00",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}
SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:30:15-00",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

Beispiele zum Verarbeiten und Parsen von Änderungsstream-Einträgen im Apache Beam SpannerIO-Connector auf GitHub