Streampartitions, Einträge und Abfragen ändern

Mit Sammlungen den Überblick behalten Sie können Inhalte basierend auf Ihren Einstellungen speichern und kategorisieren.

Auf dieser Seite werden die folgenden Attribute von Änderungsstreams im Detail beschrieben:

  • Split-basiertes Partitionierungsmodell
  • Format und Inhalt von Änderungsstream-Einträgen
  • Die allgemeine Syntax zum Abfragen dieser Datensätze
  • Beispiel für den Abfrageworkflow

Die Informationen auf dieser Seite sind besonders relevant, um die Spanner API für die direkte Abfrage von Änderungsstreams zu verwenden. Anwendungen, die stattdessen Dataflow zum Lesen von Änderungsstreamdaten verwenden, müssen nicht direkt mit dem hier beschriebenen Datenmodell arbeiten.

Eine umfassendere Einführung zum Ändern von Streams findest du unter Streams ändern.

Streampartitionen ändern

Wenn eine Änderung an einer Tabelle vorgenommen wird, die von einem Änderungsstream beobachtet wird, schreibt Cloud Spanner einen entsprechenden Änderungsstream-Datensatz in die Datenbank, und zwar synchron in derselben Transaktion wie die Datenänderung. Dadurch wird sichergestellt, dass Spanner die Transaktion auch erfolgreich erfasst und beibehalten hat, wenn die Transaktion erfolgreich ist. Intern speichert Spanner den Datensatz für den Änderungsstream und die Datenänderung, sodass sie vom selben Server verarbeitet werden, um den Schreibaufwand zu minimieren.

Im Rahmen der DML an einen bestimmten Split hängt Spanner den Schreibvorgang an die entsprechende Änderungsstream-Datenaufteilung in derselben Transaktion an. Aufgrund dieser Kollokation sorgen Änderungsstreams nicht für eine zusätzliche Koordination zwischen den Bereitstellungsressourcen, wodurch der Aufwand für den Transaktions-Commit minimiert wird.

Image

Spanner skaliert, indem Daten basierend auf der Datenbanklast und -größe dynamisch aufgeteilt und zusammengeführt werden. Außerdem werden die Splits auf die Bereitstellungsressourcen verteilt.

Zum Schreiben und Skalieren von Änderungsstreams teilt Spanner den internen Änderungsstream-Speicher zusammen mit den Datenbankdaten auf und vermeidet so automatisch Hotspots. Damit Änderungsstream-Einträge nahezu in Echtzeit beim Skalieren von Datenbankschreibvorgängen gelesen werden können, wurde die Spanner API so konzipiert, dass ein Änderungsstream mithilfe von Änderungsstream-Partitionen gleichzeitig abgefragt wird. Ändern Sie die Streampartitionszuordnung, um die Aufteilung der Streamdaten zu ändern, die die Streamstream-Einträge enthalten. Änderungsstreams ändern sich dynamisch im Laufe der Zeit und korrelieren damit, wie Spanner die Datenbankdaten dynamisch aufteilt und zusammenführt.

Eine Änderungsstream-Partition enthält Datensätze für einen unveränderlichen Schlüsselbereich für einen bestimmten Zeitraum. Jede Änderungsstream-Partition kann in eine oder mehrere Änderungsstream-Partitionen aufgeteilt oder mit anderen Änderungsstream-Partitionen zusammengeführt werden. Wenn diese Aufteilungs- oder Zusammenführungsereignisse auftreten, werden untergeordnete Partitionen erstellt, um die Änderungen für die entsprechenden unveränderlichen Schlüsselbereiche für den nächsten Zeitraum zu erfassen. Zusätzlich zu Datenänderungseinträgen gibt eine Änderungsstream-Abfrage untergeordnete Partitionseinträge zurück, um Leser über neue Abfragestream-Partitionen zu informieren, die abgefragt werden müssen. Außerdem geben sie Heartbeat-Einträge an, um den Weiterleitungsfortschritt anzuzeigen, wenn in letzter Zeit keine Schreibvorgänge stattgefunden haben.

Beim Abfragen einer bestimmten Änderungsstream-Partition werden die Änderungseinträge in der Reihenfolge des Commit-Zeitstempels zurückgegeben. Jeder Änderungseintrag wird genau einmal zurückgegeben. Bei Änderungsstream-Partitionen kann die Reihenfolge der Änderungsdatensätze nicht garantiert werden. Änderungseinträge für einen bestimmten Primärschlüssel werden nur für eine Partition für einen bestimmten Zeitraum zurückgegeben.

Aufgrund der Herkunft von über- und untergeordneten Partitionen sollten Datensätze, die von untergeordneten Partitionen zurückgegeben werden, erst verarbeitet werden, nachdem Datensätze aller übergeordneten Partitionen verarbeitet wurden.

Syntax der Streamabfrage ändern

Änderungsstreams werden mit der ExecuteStreamingSql API abgefragt. Automatisch wird zusammen mit dem Änderungsstream eine spezielle Tabellenwertfunktion erstellt. Sie erhalten Zugriff auf die Datensätze des Änderungsstreams. Die TVF-Namenskonvention lautet READ_change_stream_name.

Wenn ein Änderungsstream SingersNameStream in der Datenbank vorhanden ist, lautet die Abfragesyntax so:

SELECT ChangeRecord
    FROM READ_SingersNameStream (
        start_timestamp,
        end_timestamp,
        partition_token,
        heartbeat_milliseconds
    )

Die Funktion akzeptiert die folgenden Argumente:

Name des Arguments Typ Erforderlich/Optional? Beschreibung
start_timestamp TIMESTAMP Erforderlich Gibt an, dass Einträge mit commit_timestamp größer oder gleich start_timestamp zurückgegeben werden sollen. Der Wert muss innerhalb der Aufbewahrungsdauer für den Änderungsstream liegen und kleiner oder gleich der aktuellen Zeit und größer oder gleich dem Zeitstempel der Erstellung des Änderungsstreams sein.
end_timestamp TIMESTAMP Optional (Standardeinstellung: NULL) Gibt an, dass Einträge mit commit_timestamp zurückgegeben werden sollen, die kleiner oder gleich end_timestamp sind. Der Wert muss innerhalb der Aufbewahrungsdauer für den Änderungsstream liegen und größer oder gleich start_timestamp sein. Die Abfrage wird abgeschlossen, nachdem alle Änderungseinträge bis zu end_timestamp zurückgegeben wurden oder eine Reihe von untergeordneten Partitionseinträgen zurückgegeben wurde. Wenn NULL nicht angegeben ist, wird die Abfrage ausgeführt, bis die aktuelle Partition abgeschlossen ist und alle ChangeRecords mit festgelegten child_partition_record-Feldern zurückgegeben werden. Wenn Sie NULL für end_timestamp angeben, werden immer die neuesten Änderungen gelesen.
partition_token STRING Optional (Standardeinstellung: NULL) Gibt auf der Grundlage des Inhalts der untergeordneten Partitionseinträge an, welche Streampartition geändert werden soll. Wenn NULL nicht angegeben ist, bedeutet dies, dass der Leser zum ersten Mal den Änderungsstream abfragt und keine bestimmten Partitionstokens abgefragt hat.
heartbeat_milliseconds INT64 Erforderlich Legt fest, wie oft ein Heartbeat-ChangeRecord zurückgegeben wird, wenn in dieser Partition keine Commits ausgeführt werden. Der Wert muss zwischen 1000 (eine Sekunde) und 300000 (fünf Minuten) liegen.

Wir empfehlen, eine praktische Methode zu erstellen, um den Text der TVF-Abfrage und die Bindungsparameter zu erstellen, wie im folgenden Beispiel gezeigt.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
    return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

Format des Stream-Eintrags ändern

Der Änderungsstream TVF gibt eine einzelne ChangeRecord-Spalte vom Typ ARRAY<STRUCT<...>> zurück. In jeder Zeile enthält dieses Array immer ein einzelnes Element.

Die Array-Elemente haben den folgenden Typ:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

Diese Struktur hat drei Felder: data_change_record, heartbeat_record und child_partitions_record vom Typ ARRAY<STRUCT<...>>. In jeder Zeile, die vom Änderungsstream TVF zurückgegeben wird, enthält nur eines dieser drei Felder einen Wert. Die anderen beiden sind leer oder NULL. Diese Arrayfelder enthalten höchstens ein Element.

In den folgenden Abschnitten werden die drei Eintragstypen untersucht.

Datensätze zu Datenänderungen

Ein Datensatz für die Datenänderung enthält eine Reihe von Änderungen an einer Tabelle mit demselben Änderungstyp (Einfügen, Aktualisieren oder Löschen), für den in einem Änderungsstream-Partition für dieselbe Transaktion ein Commit-Zeitstempel verwendet wird. Für eine Transaktion können mehrere Datensätze zu Datenänderungen über mehrere Änderungsstream-Partitionen zurückgegeben werden.

Alle Datensätze zu Datenänderungen enthalten die Felder commit_timestamp, server_transaction_id und record_sequence. Diese bestimmen zusammen die Reihenfolge im Änderungsstream eines Streameintrags. Diese drei Felder sind ausreichend, um die Reihenfolge der Änderungen abzuleiten und externe Konsistenz bereitzustellen.

Beachten Sie, dass mehrere Transaktionen denselben Commit-Zeitstempel haben können, wenn sie sich nicht überschneidende Daten berühren. Das Feld server_transaction_id bietet die Möglichkeit zu unterscheiden, welche Änderungen (möglicherweise über Änderungsstream-Partitionen) innerhalb derselben Transaktion vorgenommen wurden. In Kombination mit den Feldern record_sequence und number_of_records_in_transaction können Sie außerdem alle Datensätze einer bestimmten Transaktion zwischenspeichern und sortieren.

Die Felder eines Datenänderungseintrags enthalten Folgendes:

Feld Typ Beschreibung
commit_timestamp TIMESTAMP Der Zeitstempel, in dem die Änderung übernommen wurde.
record_sequence STRING Die Sequenznummer für den Eintrag innerhalb der Transaktion. Die Sequenznummern sind innerhalb einer Transaktion garantiert eindeutig und erhöhen sich kontinuierlich (aber nicht unbedingt zusammenhängend). Sortieren Sie die Datensätze für dieselbe „server_transaction_id“ nach „record_sequence“, um die Reihenfolge der Änderungen innerhalb der Transaktion zu rekonstruieren.
server_transaction_id STRING Ein global eindeutiger String, der die Transaktion darstellt, in der die Änderung übernommen wurde. Der Wert sollte nur im Zusammenhang mit der Verarbeitung von Änderungsstream-Datensätzen verwendet werden und ist nicht mit der Transaktions-ID in der Spanner API, z.B. „TransactionSelector.id“, verbunden. Mit beiden wird eine Transaktion eindeutig identifiziert, wenn sie mit anderen Werten innerhalb desselben Kontexts (d.h. „stream_data_change_records“ oder Spanner API) verglichen wird.
is_last_record_in_transaction_in_partition BOOL Gibt an, ob dies der letzte Eintrag für eine Transaktion in der aktuellen Partition ist.
table_name STRING Name der Tabelle, die von der Änderung betroffen ist.
value_capture_type STRING

Beschreibt den Werterfassungstyp, der in der Konfiguration des Änderungsstreams beim Erfassen dieser Änderung angegeben wurde.

Derzeit immer "OLD_AND_NEW_VALUES".

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
Der Name der Spalte, der Spaltentyp, ob es sich um einen Primärschlüssel handelt, und die im Schema definierte Spalte („ordinal_position“). Die erste Spalte einer Tabelle im Schema hat die ordinale Position „1“. Der Spaltentyp kann für Arrayspalten verschachtelt sein. Das Format entspricht der Typstruktur, die in der Spanner API-Referenz beschrieben wird.
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
Beschreibt die vorgenommenen Änderungen, einschließlich der Primärschlüsselwerte. Außerdem werden die alten und neuen Werte der geänderten Spalten angegeben, wenn der Änderungsstream mit „value_Capture_type“ konfiguriert ist. Die Felder new_values und old_values enthalten nur die Nicht-Schlüsselspalten. Mitglieder eines JSON-Objekts werden lexikografisch sortiert.
mod_type STRING Beschreibt die Art der Änderung. Entweder INSERT, UPDATE oder DELETE.
number_of_records_in_transaction INT64 Die Anzahl der Datenänderungseinträge, die Teil dieser Transaktion in allen Änderungsstream-Partitionen sind.
number_of_partitions_in_transaction INT64 Die Anzahl der Partitionen, die Datenänderungsdatensätze für diese Transaktion zurückgeben.
transaction_tag STRING Das mit dieser Transaktion verknüpfte Transaktions-Tag.
is_system_transaction BOOL Gibt an, ob es sich bei der Transaktion um eine Systemtransaktion handelt.

Im Anschluss finden Sie ein Beispiel für Datensätze zu Datenänderungen. Sie beschreiben eine einzelne Transaktion, bei der eine Übertragung zwischen zwei Konten stattfindet. Die beiden Konten befinden sich in separaten Streams mit Änderungsstreams.

data_change_record: {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
         "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
data_change_record: {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

Herzschlagplatten

Wenn ein Heartbeat-Eintrag zurückgegeben wird, bedeutet dies, dass alle Änderungen mit commit_timestamp zurückgegeben wurden, die kleiner oder gleich dem Heartbeat-Datensatz timestamp sind, und dass zukünftige Datensätze in dieser Partition einen höheren Commit-Zeitstempel haben als der, der vom Heartbeat-Datensatz zurückgegeben wurde. Heartbeat-Einträge werden zurückgegeben, wenn keine Datenänderungen in eine Partition geschrieben wurden. Wenn in die Partition Daten geschrieben wurden, kann anstelle von heartbeat_record.timestamp data_change_record.commit_timestamp verwendet werden, um zu signalisieren, dass der Leser beim Lesen der Partition Fortschritte macht.

Sie können die auf Partitionen zurückgegebenen Heartbeat-Einträge verwenden, um Leser über alle Partitionen hinweg zu synchronisieren. Sobald alle Leser einen Herzschlag empfangen haben, der größer oder gleich einem Zeitstempel A ist, oder Daten oder untergeordnete Partitionsdatensätze erhalten haben, die größer oder gleich dem Zeitstempel A sind, wissen die Leser, dass sie alle Datensätze erhalten haben, die mit oder vor diesem Zeitstempel (A) festgeschrieben sind. Sie können damit beginnen, die zwischengespeicherten Datensätze nach Zeitstempel zu sortieren und nach server_transaction_id zu gruppieren.

Ein Heartbeat-Datensatz enthält nur ein Feld:

Feld Typ Beschreibung
timestamp TIMESTAMP Der Zeitstempel des Herzschlags.

Beispiel für einen Heartbeat-Datensatz, der mitteilt, dass alle Datensätze mit Zeitstempeln, die kleiner oder gleich diesem Zeitstempel sind, zurückgegeben wurden:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

Einträge untergeordneter Partitionen

Ein Eintrag für untergeordnete Partitionen gibt Informationen zu untergeordneten Partitionen zurück: ihren Partitionstokens, den Tokens der übergeordneten Partitionen und dem start_timestamp, der den frühesten Zeitstempel darstellt, für den die untergeordneten Partitionen Änderungsdatensätze enthalten. Datensätze, deren Commit-Zeitstempel unmittelbar vor child_partitions_record.start_timestamp liegen, werden in der aktuellen Partition zurückgegeben. Nachdem alle Einträge der untergeordneten Partitionen für diese Partition zurückgegeben wurden, wird diese Abfrage mit einem Erfolgsstatus zurückgegeben, der angibt, dass alle Einträge für diese Partition zurückgegeben wurden.

Die Felder eines Eintrags mit untergeordneten Partitionen enthalten Folgendes:

Feld Typ Beschreibung
start_timestamp TIMESTAMP Die Datenänderungseinträge, die von untergeordneten Partitionen in diesem untergeordneten Partitionseintrag zurückgegeben werden, haben einen Commit-Zeitstempel größer oder gleich start_timestamp. Beim Abfragen einer untergeordneten Partition sollten in der Abfrage das Token der untergeordneten Partition und eine start_timestamp größer oder gleich child_partitions_token.start_timestamp angegeben werden. Alle untergeordneten Partitionen, die von einer Partition zurückgegeben werden, haben dieselbe start_timestamp und der Zeitstempel liegt immer zwischen den angegebenen start_timestamp und end_timestamp der Abfrage.
record_sequence STRING Eine kontinuierlich ansteigende Sequenznummer, mit der die Reihenfolge des untergeordneten Partitionssatzes definiert werden kann, wenn mehrere untergeordnete Partitionseinträge mit demselben start_timestamp in einer bestimmten Partition zurückgegeben werden. Das Partitionstoken start_timestamp und record_sequence kennzeichnen einen untergeordneten Partitionseintrag eindeutig.
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
Gibt eine Reihe von untergeordneten Partitionen und die zugehörigen Informationen zurück. Dazu gehören der Partitions-Tokenstring, mit dem die untergeordnete Partition in Abfragen identifiziert wird, sowie die Tokens der übergeordneten Partitionen.

Beispiel für einen untergeordneten Partitionseintrag:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"],
    }
  ],
}

Abfrage-Workflow für Streams ändern

Führen Sie mit dem ExecuteStreamingSql API Abfragen zu Änderungsstreams aus. Sie haben dabei die Möglichkeit, eine einmalige Transaktion durchzuführen, die nur einmal verwendet wird, und haben eine starke Zeitstempelbindung. Mit dem Änderungsstream-TVF können Nutzer start_timestamp und end_timestamp für den gewünschten Zeitraum angeben. Auf alle Änderungsdatensätze innerhalb der Aufbewahrungsdauer kann mit der starken schreibgeschützten Zeitstempelgrenze zugegriffen werden.

Alle anderen TransactionOptions sind für Abfragen des Änderungsstreams ungültig. Wenn TransactionOptions.read_only.return_read_timestamp auf „true“ gesetzt ist, wird in der Transaction-Nachricht, die die Transaktion beschreibt, ein spezieller Wert von kint64max - 1 anstelle eines gültigen Lesezeitstempels zurückgegeben. Dieser Sonderwert sollte verworfen und nicht für nachfolgende Abfragen verwendet werden.

Jede Abfrage des Änderungsstreams kann eine beliebige Anzahl von Zeilen zurückgeben, die jeweils einen Datensatz mit Datenänderung, einen Herzschlag oder einen untergeordneten Partitionseintrag enthalten. Sie müssen keine Frist für die Anfrage festlegen.

Beispiel:

Der Workflow für Streamingabfragen beginnt mit der Abfrage des ersten Änderungsstreams. Dazu wird partition_token auf NULL festgelegt. In der Abfrage müssen die TVF-Funktion für den Änderungsstream, den gewünschten Zeitstempel für den Start und das Ende und das Herzschlagintervall angegeben werden. Wenn end_timestamp den Wert NULL hat, gibt die Abfrage weiterhin Datenänderungen zurück, bis untergeordnete Partitionen erstellt wurden.

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:00-00",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

Verarbeiten Sie Datensätze aus dieser Abfrage, bis Datensätze der untergeordneten Partition zurückgegeben werden. Im folgenden Beispiel werden zwei untergeordnete Partitionseinträge und drei Partitionstokens zurückgegeben und die Abfrage wird beendet. Untergeordnete Partitionseinträge einer bestimmten Abfrage haben immer denselben start_timestamp.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01 09:00:01-00",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
      // from the initial change stream queries.
      "parent_partition_tokens": [NULL],
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL],
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01 09:00:01-00",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL],
    }
  ],
}

Erstellen Sie drei neue Abfragen und führen Sie parallel sie aus, um zukünftige Änderungen nach 2022-05-01 09:00:01-00 zu verarbeiten. Bei diesen drei Abfragen werden zukünftige Datenänderungen für denselben Schlüsselbereich zurückgegeben, den ihre übergeordneten Abfragen abdecken. Setze start_timestamp immer auf start_timestamp im selben untergeordneten Partitionseintrag und verwende dasselbe end_timestamp- und Heartbeat-Intervall, um die Einträge über alle Abfragen hinweg einheitlich zu verarbeiten.

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:01-00",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000);
SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:01-00",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000);
SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:01-00",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000);

Nach einer Weile ist die Abfrage für child_token_2 abgeschlossen, nachdem ein anderer Eintrag für eine untergeordnete Partition zurückgegeben wurde. Dieser Eintrag gibt an, dass eine neue Partition zukünftige Änderungen für child_token_2 und child_token_3 ab 2022-05-01 09:30:15-00 abdeckt. Der genaue Eintrag wird von der Abfrage am child_token_3 zurückgegeben, da beide die übergeordneten Partitionen der neuen child_token_4 sind. Um eine strenge geordnete Verarbeitung von Datensätzen für einen bestimmten Schlüssel zu gewährleisten, darf die Abfrage für child_token_4 erst beginnen, nachdem alle übergeordneten Elemente abgeschlossen sind, in diesem Fall child_token_2 und child_token_3. Erstellen Sie nur eine Abfrage für jedes untergeordnete Partitionstoken. Das Abfrageworkflow-Design sollte ein übergeordnetes Element festlegen, das die Abfrage auf child_token_4 plant.

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01 09:30:15-00",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}
SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:30:15-00",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

Beispiele für die Verarbeitung und das Parsen von Änderungsstream-Einträgen im Dataflow-Connector von Apache Beam Spanner auf GitHub