Ereignisse und Streams

Überblick

Die Datenhierarchie in Datastream besteht aus folgenden Elementen:

  • Ein Stream, der aus einer Datenquelle und einem Ziel besteht.
  • Objekt: ein Teil eines Streams, z. B. eine Tabelle aus einer bestimmten Datenbank.
  • Ereignis: eine einzelne Änderung, die von einem bestimmten Objekt generiert wird, z. B. eine Datenbankeinfügung.

Streams, Objekte und Ereignisse sind mit Daten und Metadaten verknüpft. Diese Daten und Metadaten können für verschiedene Zwecke verwendet werden.

Über Ereignisse

Jedes Ereignis besteht aus drei Datentypen:

  • Ereignisdaten:Die Änderung des Objekts an den Daten aus der Streamquelle. Jedes Ereignis enthält die gesamte Zeile, die geändert wurde.
  • Allgemeine Metadaten: Diese Metadaten werden bei jedem von Datastream generierten Ereignis angezeigt, das für Aktionen verwendet wird, z. B. um doppelte Daten am Ziel zu entfernen.
  • Quellenspezifische Metadaten:Diese Metadaten werden für jedes Ereignis angezeigt, das von einer bestimmten Streamquelle generiert wird. Diese Metadaten variieren je nach Quelle.

Ereignisdaten

Ereignisdaten sind die Nutzlast jeder Änderung aus einem bestimmten Objekt, das aus einer Streamquelle stammt.

Ereignisse haben entweder das Avro- oder JSON-Format. Bei dem Avro-Format enthält das Ereignis in jeder Spalte den Spaltenindex und den Spaltenwert. Mit dem Spaltenindex können der Spaltenname und der einheitliche Typ aus dem Schema im Avro-Header abgerufen werden.

Bei Verwendung des JSON-Formats enthält das Ereignis für jede Spalte den Spaltennamen und -wert.

Ereignismetadaten können verwendet werden, um Informationen zum Ursprung des Ereignisses zu erfassen und doppelte Daten am Ziel zu entfernen und Ereignisse nach dem nachgelagerten Nutzer zu sortieren.

In den folgenden Tabellen werden die Felder und Datentypen für allgemeine und quellenspezifische Ereignismetadaten aufgelistet und beschrieben.

Allgemeine Metadaten

Diese Metadaten sind für alle Streams aller Typen einheitlich.

Feld Avro-Typ JSON-Typ Beschreibung
stream_name String String Der eindeutige Stream-Name, wie er bei der Erstellung festgelegt wurde.
read_method String String

Gibt an, ob die Daten aus der Quelle mit einer CDC-Methode (Change Data Capture) als Teil des verlaufsbezogenen Backfills oder als Teil einer Ergänzungsaufgabe gelesen wurden, die beim Rollback einer Transaktion während der CDC-Replikation erstellt wird.

Zulässige Werte:

  • oracle-cdc-logminer
  • oracle-backfill
  • oracle-supplementation
  • mysql-cdc-binlog
  • mysql-backfill-incremental
  • mysql-backfill-fulldump
  • postgres-cdc-wal
  • postgresql-backfill
object String String Der Name, mit dem verschiedene Ereignistypen gruppiert werden, in der Regel der Name der Tabelle oder des Objekts in der Quelle.
schema_key String String Die eindeutige Kennung für das einheitliche Schema des Ereignisses.
uuid String String Eine eindeutige Kennung für das von Datastream generierte Ereignis.
read_timestamp timestamp-millis String Der Zeitstempel (UTC), wenn der Datensatz von Datastream gelesen wurde (der Epochenzeitstempel in Millisekunden).
source_timestamp timestamp-millis String Der Zeitstempel (UTC), wenn sich der Eintrag in der Quelle geändert hat (der Epochenzeitstempel in Millisekunden).
sort_keys {"type": "array", "items": ["string", "long"]} array Ein Array von Werten, mit denen die Ereignisse in der Reihenfolge sortiert werden können, in der sie aufgetreten sind.

Quellspezifische Metadaten

Diese Metadaten sind CDC- und Backfill-Ereignissen aus einer Quelldatenbank zugeordnet. Wählen Sie im Drop-down-Menü unten eine Quelle aus, um diese Metadaten aufzurufen.

Quelle Feld Avro-Typ JSON-Typ Beschreibung
MySQL log_file String String Die Logdatei, aus der Datastream Ereignisse bei der CDC-Replikation abruft.
MySQL log_position long long Die Logposition (Offset) im binären MySQL-Log.
MySQL primary_keys String-Array String-Array Die Liste mit einem oder mehreren Spaltennamen, aus denen der Primärschlüssel der Tabellen besteht. Wenn die Tabelle keinen Primärschlüssel hat, ist dieses Feld leer.
MySQL is_deleted boolean boolean
  • Der Wert true gibt an, dass die Zeile in der Quelle gelöscht wurde.
  • Der Wert false gibt an, dass die Zeile nicht gelöscht wurde.
MySQL database String String Die dem Ereignis zugeordnete Datenbank.
MySQL table String String Die dem Ereignis zugeordnete Tabelle.
MySQL change_type String String

Die Art der Änderung (INSERT, UPDATE-INSERT, UPDATE-DELETE und DELETE), die das Ereignis darstellt.

Oracle log_file String String Die Logdatei, aus der Datastream Ereignisse bei der CDC-Replikation abruft.
Oracle scn long long Die Logposition (Offset) im Oracle-Transaktionslog.
Oracle row_id String String row_id von Oracle.
Oracle is_deleted boolean boolean
  • Der Wert true gibt an, dass die Zeile in der Quelle gelöscht wurde.
  • Der Wert false gibt an, dass die Zeile nicht gelöscht wurde.
Oracle database String String Die dem Ereignis zugeordnete Datenbank.
Oracle schema String String Das Schema, das der Tabelle aus dem Ereignis zugeordnet ist.
Oracle table String String Die dem Ereignis zugeordnete Tabelle.
Oracle change_type String String

Die Art der Änderung (INSERT, UPDATE-INSERT, UPDATE-DELETE und DELETE), die das Ereignis darstellt.

Oracle tx_id String String Die Transaktions-ID, zu der das Ereignis gehört.
Oracle rs_id String String Die Datensatz-ID. Durch die Kopplung von rs_id und ssn wird eine Zeile in V$LOGMNR_CONTENTS eindeutig identifiziert. rs_id identifiziert eindeutig den Wiederholungsdatensatz, der die Zeile generiert hat.
Oracle ssn long long Eine SQL-Sequenznummer. Diese Zahl wird mit rs_id verwendet und kennzeichnet eine Zeile in V$LOGMNR_CONTENTS eindeutig.
PostgreSQL schema String String Das Schema, das der Tabelle aus dem Ereignis zugeordnet ist.
PostgreSQL table String String Die dem Ereignis zugeordnete Tabelle.
PostgreSQL is_deleted boolean boolean
  • Der Wert true gibt an, dass die Zeile in der Quelle gelöscht wurde.
  • Der Wert false gibt an, dass die Zeile nicht gelöscht wurde.
PostgreSQL change_type String String Die Art der Änderung (INSERT, UPDATE, DELETE), die das Ereignis darstellt.
PostgreSQL tx_id String String Die Transaktions-ID, zu der das Ereignis gehört.
PostgreSQL lsn String String Die Logsequenznummer für den aktuellen Eintrag.
PostgreSQL primary_keys String-Array String-Array Die Liste mit einem oder mehreren Spaltennamen, aus denen der Primärschlüssel der Tabellen besteht. Wenn die Tabelle keinen Primärschlüssel hat, ist dieses Feld leer.
SQL Server table String String Die dem Ereignis zugeordnete Tabelle.
SQL Server database long long Die dem Ereignis zugeordnete Datenbank.
SQL Server schema String-Array String-Array Das Schema, das der Tabelle aus dem Ereignis zugeordnet ist.
SQL Server is_deleted boolean boolean
  • Der Wert „true“ gibt an, dass die Zeile in der Quelle gelöscht wurde.
  • Ein falscher Wert bedeutet, dass die Zeile nicht gelöscht wurde.
SQL Server lsn String String Die Logsequenznummer für das Ereignis.
SQL Server tx_id String String Die Transaktions-ID, zu der das Ereignis gehört.
SQL Server physical_location Ganzzahl-Array Ganzzahl-Array Physischer Speicherort des Protokolleintrags, angegeben durch drei Ganzzahlen: Datei-ID, Seiten-ID und Slot-ID des Eintrags.
SQL Server replication_index String-Array String-Array Die Liste der Spaltennamen eines Index, die eine Zeile in der Tabelle eindeutig identifizieren können.
SQL Server change_type String String

Die Art der Änderung („INSERT“, „UPDATE“, „DELETE“), die das Ereignis darstellt.

Beispiel für einen Ereignisfluss

Dieser Ablauf veranschaulicht die Ereignisse, die von drei aufeinanderfolgenden Vorgängen generiert werden: INSERT, UPDATE und DELETE in einer einzelnen Zeile in einer SAMPLE-Tabelle für eine Quelldatenbank.

UHRZEIT THIS_IS_MY_PK (int) FIELD1 (nchar, Nullwerte zulässig) FIELD2 (nchar non-null)>
0 1231535353 foo TLV
1 1231535353 NULL TLV

INSERT (T0)

Die Nachrichtennutzlast besteht aus der Gesamtheit der neuen Zeile.

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "d7989206-380f-0e81-8056-240501101100",
  "read_timestamp": "2019-11-07T07:37:16.808Z",
  "source_timestamp": "2019-11-07T02:15:39",
  "source_metadata": {
    "log_file": ""
    "scn": 15869116216871,
    "row_id": "AAAPwRAALAAMzMBABD",
    "is_deleted": false,
    "database": "DB1",
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "INSERT",
    "tx_id":
    "rs_id": "0x0073c9.000a4e4c.01d0",
    "ssn": 67,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": "foo",
    "FIELD2": "TLV",
  }
}

UPDATE (T1)

Die Nachrichtennutzlast besteht aus der Gesamtheit der neuen Zeile. Vorherige Werte werden nicht berücksichtigt.

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "e6067366-1efc-0a10-a084-0d8701101101",
  "read_timestamp": "2019-11-07T07:37:18.808Z",
  "source_timestamp": "2019-11-07T02:17:39",
  "source_metadata": {
    "log_file":
    "scn": 15869150473224,
    "row_id": "AAAGYPAATAAPIC5AAB",
    "is_deleted": false,
    "database":
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "UPDATE",
    "tx_id":
    "rs_id": "0x006cf4.00056b26.0010",
    "ssn": 0,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": null,
    "FIELD2": "TLV",
  }
}

DELETE (T2)

Die Nachrichtennutzlast besteht aus der Gesamtheit der neuen Zeile.

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "c504f4bc-0ffc-4a1a-84df-6aba382fa651",
  "read_timestamp": "2019-11-07T07:37:20.808Z",
  "source_timestamp": "2019-11-07T02:19:39",
  "source_metadata": {
    "log_file":
    "scn": 158691504732555,
    "row_id": "AAAGYPAATAAPIC5AAC",
    "is_deleted": true,
    "database":
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "DELETE",
    "tx_id":
    "rs_id": "0x006cf4.00056b26.0011",
    "ssn": 0,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": null,
    "FIELD2": "TLV",
  }
}

Sortierung und Konsistenz

In diesem Abschnitt wird erläutert, wie Datastream die Sortierung und Konsistenz verarbeitet.

Bestellung

Datastream garantiert die Reihenfolge nicht, aber jedes Ereignis enthält die vollständige Datenzeile und den Zeitstempel des Zeitpunkts, zu dem die Daten in die Quelle geschrieben wurden. In BigQuery werden Ereignisse in falscher Reihenfolge automatisch in der richtigen Reihenfolge zusammengeführt. BigQuery verwendet die Ereignismetadaten und eine interne Änderungssequenznummer (Change Sequence Number, CSN), um die Ereignisse in der richtigen Reihenfolge auf die Tabelle anzuwenden. In Cloud Storage können sich Ereignisse aus demselben Zeitpunkt über mehrere Dateien erstrecken.

In der falschen Reihenfolge generierte Ereignisse erfolgen standardmäßig, wenn ein Backfill für den ersten Backfill der Daten durchgeführt wird, die beim Initiieren des Streams erstellt wurden.

Die Reihenfolge kann von Quelle zu Quelle abgeleitet werden.

Quelle Beschreibung
MySQL

Ereignisse, die Teil des ersten Backfills sind, haben das Feld read_method, das mit mysql-backfill beginnt. Es gibt keine Auswirkungen auf die Reihenfolge, in der Ereignisse innerhalb des Backfills empfangen werden, da sie in beliebiger Reihenfolge verwendet werden können.

Für Ereignisse, die Teil der laufenden Replikation sind, ist das Feld read_method auf mysql-cdc-binlog festgelegt.

Die Reihenfolge lässt sich aus der Kombination des Felds log_file und des Felds log_position ableiten, das den Abstand zu der Logdatei hat. Diese Kombination liefert eine eindeutige, schrittweise ansteigende Zahl, die die Reihenfolge der Vorgänge in der Datenbank angibt.

Oracle

Ereignisse, die Teil des ersten Backfills sind, haben das Feld read_method, das mit oracle-backfill beginnt. Es gibt keine Auswirkungen auf die Reihenfolge, in der Ereignisse innerhalb des Backfills empfangen werden, da sie in beliebiger Reihenfolge verwendet werden können.

Für Ereignisse, die Teil der laufenden Replikation sind, ist das Feld read_method auf oracle-cdc-logminer festgelegt.

Die Reihenfolge kann aus der Kombination des Felds rs_id (Datensatz-ID) und ssn (einer SQL-Sequenznummer) abgeleitet werden. Diese Kombination liefert eine eindeutige, schrittweise ansteigende Zahl, die die Reihenfolge der Vorgänge in der Datenbank angibt.

PostgreSQL

Ereignisse, die Teil des ersten Backfills sind, haben das Feld read_method, das mit postgresql-backfill beginnt. Es gibt keine Auswirkungen auf die Reihenfolge, in der Ereignisse innerhalb des Backfills empfangen werden, da sie in beliebiger Reihenfolge verwendet werden können.

Für Ereignisse, die Teil der laufenden Replikation sind, ist das Feld read_method auf postgres-cdc-wal festgelegt.

Die Reihenfolge kann aus der Kombination des Felds source_timestamp und des Felds lsn (Logsequenznummer) abgeleitet werden. Diese Kombination liefert eine eindeutige, schrittweise ansteigende Zahl, die die Reihenfolge der Vorgänge in der Datenbank angibt.

SQL Server

Ereignisse, die Teil des ersten Backfills sind, haben das Feld read_method, das mit sqlserver-backfill beginnt. Es gibt keine Auswirkungen auf die Reihenfolge, in der Ereignisse innerhalb des Backfills empfangen werden, da sie in beliebiger Reihenfolge verwendet werden können.

Für Ereignisse, die Teil der laufenden Replikation sind, ist das Feld read_method auf sqlserver-cdc festgelegt.

Die Reihenfolge kann aus der Kombination des Felds source_timestamp und des Felds lsn (Logsequenznummer) abgeleitet werden. Diese Kombination liefert eine eindeutige, schrittweise ansteigende Zahl, die die Reihenfolge der Vorgänge in der Datenbank angibt.

Konsistenz

Datastream garantiert, dass die Daten aus der Quelldatenbank mindestens einmal an das Ziel gesendet werden. Es werden keine Ereignisse ausgelassen. Es ist aber möglich, dass im Stream doppelte Ereignisse vorliegen. Das Zeitfenster für doppelte Ereignisse sollte in der Größenordnung von Minuten angegeben werden und die UUID (Universally Unique Identifier) des Ereignisses in den Ereignismetadaten kann verwendet werden, um Duplikate zu erkennen.

Wenn Datenbank-Logdateien ohne Commit durchgeführt wurden und ein Rollback von Transaktionen durchgeführt wird, spiegelt die Datenbank dies in den Protokolldateien als DML-Vorgänge (Data Manipulation Language) wider. Ein Rollback für INSERT-Vorgänge hat beispielsweise einen entsprechenden DELETE-Vorgang. Datastream liest diese Vorgänge aus den Logdateien.

Streams

Jeder Stream hat Metadaten, die sowohl den Stream als auch die Quelle beschreiben, aus der Daten abgerufen werden. Diese Metadaten enthalten Informationen wie den Streamnamen, die Quell- und Zielverbindungsprofile usw.

Die vollständige Definition des Stream-Objekts finden Sie in der Dokumentation API-Referenz.

Streamzustand und -status

Ein Stream kann einen der folgenden Status haben:

  • Not started
  • Starting
  • Running
  • Draining
  • Paused
  • Failed
  • Failed permanently

In den Logs finden Sie zusätzliche Statusinformationen, z. B. den Tabellen-Backfill und die Anzahl der verarbeiteten Zeilen. Sie können auch die FetchStreamErrors API verwenden, um Fehler abzurufen.

Mit der Discover API verfügbare Objektmetadaten

Die Discover API gibt Objekte zurück, die die Struktur der Objekte darstellen, die in der Datenquelle oder im Ziel definiert sind, die durch das Verbindungsprofil dargestellt wird. Jedes Objekt verfügt über Metadaten zum Objekt selbst sowie zu jedem abgerufenen Datenfeld. Diese Metadaten sind über die Discover API verfügbar.