Änderungen aus Cloud Spanner mithilfe von Debezium von BigQuery erfassen

In dieser Anleitung wird gezeigt, wie Sie mit Debezium Änderungen in Cloud Spanner erfassen. In dieser Anleitung verwenden Sie Debezium, um eine Cloud Spanner-Datenbank auf alle neuen oder bearbeiteten Daten abzufragen. Debezium ist ein Open-Source-Tool zum Nachverfolgen und Erfassen von Datenbankänderungen. Es basiert auf Apache Kafka. In dieser Anleitung verwenden Sie Kafka Connect, um Daten in Kafka und zurück zu streamen. Kafka Connect bietet mehrere integrierte Connectors, mit denen Sie Daten in oder aus beliebten Datenbanken streamen können.

Diese Anleitung richtet sich an Datenbankadministratoren und -entwickler, die die Designmuster definieren und verwalten, die Datenänderungen in relationalen Datenbanken bestimmen (und verfolgen). Es wird davon ausgegangen, dass Sie die Konzepte von Cloud Spanner, BigQuery und Kafka Connect verstehen.

Hintergrund

Mit Change Data Capture (CDC) können Sie jedes Ereignis aus einer Datenbank streamen. Im Allgemeinen verwenden Datenbanken ein Transaktionslog, um einen Datensatz jedes Datenbankereignisses zu speichern. Diese Logdateien werden je nach Datenbank Geschmack auch als binlog-, Redo-Log oder Write-Ahead-Log bezeichnet. CDC-Tools verwenden dieses Transaktionslog, um diese Ereignisse zu lesen.

Hier ein typisches CDC-Log:

{
  "before": {
    "ID": 1,
    "Firstname": "john",
    "Lastname": "doe",
    "LastUpdateTime": 1544000706000000
  },
  "after": {
    "ID": 1,
    "Firstname": "johny",
    "Lastname": "doe",
    "LastUpdateTime": 1544000742000000
  },
}

Bei einer Datenbankabfrage werden die aktuellen Werte aus der Datenbank erfasst.

Hier eine typische Datenbankumfrage:

{
    "ID": 1,
    "Firstname": "johny",
    "Lastname": "doe",
    "LastUpdateTime": 1544000742000000
}

Ein Änderungsprotokoll der Datenbank erfasst alle Datenänderungen in ihrer exakten Reihenfolge. Diese Liste kann wichtig sein, wenn Sie den vollständigen Verlauf der an einer Datenbank vorgenommenen Änderungen sehen möchten. Im Gegensatz dazu kann der Einsatz einer Abfrage, die auf Abfragen beruht, jedoch dazu führen, dass Datenänderungen verloren gehen, die zwischen zwei Durchläufen der Abfrageschleife auftreten. Wenn beispielsweise ein Datensatz zwischen zwei Umfragen eingefügt und gelöscht wird, wird diese anhand eines abfragebasierten Logs durchgeführten Änderung nicht erfasst.

Und so gehts. Die Datenbankabfrage wird mit einer festgelegten Häufigkeit durchgeführt, erfasst Werte basierend auf einem Zeitstempel und alle Änderungen, die nach der letzten Abfrage vorgenommen wurden – aus den Zeilen „select * aus der Tabelle, in der gilt LastUpdateTime > last_poll_timestamp.“

Übersicht

Connectors in Kafka Connect-Ausführungsjobs. Diese Jobs definieren, wo die Connectors Daten kopieren und von wo sie kopiert werden. Zum Ausführen der Jobs planen Sie Connectors und Aufgaben auf Kafka connect workers. Diese Jobs enthalten Aufgaben, mit denen die Daten kopiert werden. Ein Connector kann einen einzelnen Job in viele Aufgaben unterteilen. Auf diese Weise können Connectors Jobs parallel ausführen. Das parallele Ausführen von Jobs skaliert die Kopierfähigkeit des Connectors. Kafka verwendet die Konfigurationsdaten Zookeeper. Zookeeper überwacht den Status der Kafka-Clusterknoten, -themen, -partitionen und andere Daten.

Wenn diese Jobs ausgeführt werden, werden mehrere Streams einzelner Datensätze erstellt. Kafka braucht eine effiziente Methode, um das Schema für jeden ankommenden Datensatz zu bestimmen. Sie können kein Schema zusammen mit einem Datensatz senden, da dessen Serialisierung zu beachten ist. Schemas sind oft größer als die Datensätze selbst. Schema Registry verwaltet Avro-Schemata für Kafka-Produzenten und -Nutzer. Mit Avro-Schemas können Sie Kompatibilitätseinstellungen konfigurieren, um die Schema Evolution zu unterstützen.

Das folgende Diagramm zeigt eine allgemeine Ansicht der Architektur, die Sie in dieser Anleitung bereitstellen.

Allgemeine Ansicht der Architektur, die Sie in dieser Anleitung bereitstellen.

Im Folgenden finden Sie eine ausführlichere Beschreibung der Vorgänge im Diagramm:

  • Um eine Verbindung zu Spanner herzustellen, konfigurieren Sie einen Quellconnector, der den Open-Source-JDBC-Treiber von Cloud Spanner verwendet.
  • Um Daten in BigQuery zu schreiben, konfigurieren Sie einen Sink-Connector, der einen Out-of-Box-Connector von Confluent verwendet.
  • Um Zeilen mit Werten in der Spalte LastUpdateTime zu identifizieren, die älter sind als der zuletzt abgerufene Zeitstempel, gibt es die Quelltabellen der Quell-Connector in Spanner.
  • Zur Vorbereitung der Datenübertragung speichert der Quell-Connector alle qualifizierten Datensätze in Kafka-Themen.
  • Zum Speichern der qualifizierenden Datensätze ruft der Senken-Connector die Datensätze aus den Kafka-Themen ab und schreibt sie in BigQuery.

Systemdesign und Beschränkungen

Bevor Sie diese Einrichtung in der Produktion verwenden, lesen Sie die folgenden Hinweise zu Einschränkungen und Einschränkungen des Systemdesigns.

Kaufbereitschaft Anmerkungen
Leistung Diese Lösung eignet sich für eine inkrementelle Batchreplikation mit Schreibtraffic von bis zu einigen Tausend Aktualisierungen pro Sekunde.
Hohe Verfügbarkeit und Fehlertoleranz Unterstützt.
Diese Lösung führt Kafka Connect im verteilten Modus aus. Connector-Jobs sind über den Cluster verteilt und bieten eine hohe Verfügbarkeit und Fehlertoleranz. Weitere Informationen finden Sie unter Kafka behandeln Fehler.
Dynamische Schemaaktualisierungen Unterstützt.
Weitere Informationen finden Sie unter Entwicklung des Schemas überprüfen.
Aufbewahrung der Transaktionsreihenfolge
Zeilenaktualisierungen
Deduplizierung
Genau einmalige Zustellung
Nicht unterstützt.
Diese Lösung unterstützt die Anfügereplikation. Bei einer Replikation, die nur für Anfügungen zulässig ist, werden neu replizierte Daten an das Ende einer Tabelle angehängt. Bestehende Zeilen werden nicht aktualisiert. Aktualisierungen werden am Ende der Tabelle als neue Zeilen hinzugefügt.
Löschvorgänge Nicht unterstützt.
Diese Lösung unterstützt vorläufige Löschvorgänge oder Torsteine in Form von Zeilenaktualisierungen.
Auswirkungen auf das Datenschema Alle Tabellen, die Sie replizieren möchten, müssen eine Spalte mit dem Commit-Zeitstempel von Spanner enthalten, um den Zeitpunkt der letzten Aktualisierung zu erfassen.
Auswirkungen auf eine vorhandene Anwendung Wenn Sie Datensätze in Tabellen ändern, die Sie replizieren möchten, müssen Sie den Spanner-Zeitstempel commit festlegen oder aktualisieren. Wenn die Anwendung Zeilen löscht, ändern Sie diese in einen weichen Löschvorgang oder einen Tombstone.
Unterstützung für Senken Mit Kafka Connect können Sie das Datensenkesystem flexibel ändern, ohne den Code für die Streamverarbeitung ändern zu müssen. Weitere Informationen finden Sie unter Unterstützte Connectors in der Kafka Connect-Dokumentation.

Ziele

  • Cloud Spanner mit einer Testdatenbank einrichten
  • Richten Sie BigQuery mit einem Test-Dataset ein.
  • Einen Google Kubernetes Engine-Cluster (GKE) erstellen
  • Debezium-Cluster in GKE bereitstellen
  • Konfigurieren Sie einen Job so, dass Daten aus der Spanner-Datenbank erfasst werden.
  • Konfigurieren Sie einen Job, mit dem Daten in eine BigQuery-Tabelle geschrieben werden.
  • Daten in Cloud Spanner einfügen und aktualisieren
  • Prüfen Sie, ob Änderungen in Kafka und BigQuery erfasst werden.
  • Tabellenschema in Cloud Spanner ändern.
  • Die Schemaentwicklung in BigQuery überprüfen.

Kosten

In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloud verwendet:

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Nach Abschluss dieser Anleitung können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

Hinweis

  1. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  2. Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für Ihr Projekt aktiviert ist.

  3. Aktivieren Sie Cloud Shell in der Cloud Console.

    Cloud Shell aktivieren

  4. Suchen Sie Ihre Projekt-ID und legen Sie sie in Cloud Shell fest. Ersetzen Sie YOUR_PROJECT_ID durch Ihre Projekt-ID.

    gcloud config set project YOUR_PROJECT_ID
    
  5. Erstellen Sie Umgebungsvariablen für das Cloud-Projekt, die Region, die Zone, den Dienstkontonamen und andere Werte, die in dieser Anleitung benötigt werden:

    export PROJECT=$(gcloud config get-value project)
    export REGION="us-central1"
    export ZONE="us-central1-a"
    export CLUSTER="test-cluster"
    export SPANNER_INSTANCE="test-spanner-instance"
    export SPANNER_DB="test-db"
    export SERVICE_ACCOUNT="test-sa"
    export BQ_DATASET="test_bq"
    export SA_EMAIL=${SERVICE_ACCOUNT}@${PROJECT}.iam.gserviceaccount.com
    
  6. Aktivieren Sie die APIs für Compute Engine, GKE, Cloud Spanner, Container Analysis und Container Registry:

    gcloud services enable \
    cloudbuild.googleapis.com \
    compute.googleapis.com \
    container.googleapis.com \
    spanner.googleapis.com \
    bigquery.googleapis.com \
    containeranalysis.googleapis.com \
    containerregistry.googleapis.com
    

Cloud Spanner einrichten

  1. Erstellen Sie in Cloud Shell eine Cloud Spanner-Instanz:

    gcloud spanner instances create $SPANNER_INSTANCE \
        --config=regional-$REGION \
        --description="test" \
        --nodes=1
    
  2. Erstellen Sie eine Datenbank und eine Testtabelle:

    gcloud spanner databases create $SPANNER_DB \
        --instance=$SPANNER_INSTANCE \
        --ddl='CREATE TABLE Test (Id INT64 NOT NULL, Firstname STRING(1024), Lastname STRING(1024), LastUpdateTime TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp = true),) PRIMARY KEY(Id)'
    

Die obige Anweisung erstellt die Testtabelle mithilfe der Option allow_commit_timestamp für eine TIMESTAMP-Spalte. Mit der Spaltenoption allow_commit_timestamp können Sie den Commit-Zeitstempel in kleinstmöglicher Form speichern. Wenn der Abfragedienst ausgeführt wird, verwendet das System Commit-Zeitstempel, um Datenbankmutationen zu bestimmen.

BigQuery einrichten

  • Erstellen Sie in Cloud Shell ein Test-Dataset in BigQuery:

    bq --location=US mk --dataset $PROJECT:$BQ_DATASET
    

In BigQuery werden die Tabellen oder Tabellen aus der Quelldatenbank repliziert.

GKE-Cluster erstellen

  1. Erstellen Sie in Cloud Shell einen GKE-Cluster:

    gcloud container clusters create $CLUSTER --zone $ZONE
    
  2. Klonen Sie das Repository mit den GKE-Clusterkonfigurationsdateien und den Kafka-Quell- und -Konfigurationsdateien für diese Anleitung:

    git clone https://github.com/GoogleCloudPlatform/spanner-debezium-change-capture
    cd spanner-debezium-change-capture
    

Authentifizierungstoken generieren

  1. Erstellen Sie in Cloud Shell ein Google Cloud-Dienstkonto:

    gcloud iam service-accounts create $SERVICE_ACCOUNT \
      --display-name $SERVICE_ACCOUNT
    

    Später in dieser Anleitung generieren Sie ein Authentifizierungstoken für dieses Konto.

  2. Weisen Sie dem Dienstkonto die Rolle „Spanner-Datenbank-Leser“ zu:

    gcloud projects add-iam-policy-binding $PROJECT \
        --member serviceAccount:$SA_EMAIL \
        --role roles/spanner.databaseReader
    
  3. Weisen Sie dem Dienstkonto die Rolle „BigQuery-Dateninhaber“ zu:

    gcloud projects add-iam-policy-binding $PROJECT \
        --member serviceAccount:$SA_EMAIL \
        --role roles/bigquery.dataOwner
    
  4. Generieren Sie die JSON-Datei für die Anmeldedaten zur Authentifizierung:

    gcloud iam service-accounts keys create credentials.json \
        --iam-account=$SA_EMAIL
    

Debezium in GKE bereitstellen

  1. Ersetzen Sie in Cloud Shell den Platzhalter [PROJECT_ID] durch Ihre Cloud-Projekt-ID in den Dateien source.json und sink.json:

    sed -i -e "s/\[PROJECT_ID\]/$PROJECT/g" source.json
    sed -i -e "s/\[PROJECT_ID\]/$PROJECT/g" sink.json
    
  2. Erstellen Sie Kubernetes-Secrets aus der Datei credentials.json:

    gcloud container clusters get-credentials test-cluster \
        --zone "${ZONE}" \
        --project "${PROJECT}" && \
    kubectl create secret generic service-account-creds \
        --from-file credentials.json
    
  3. Erstellen Sie zum Einrichten des Abfragejobs für Spanner einen configmap aus der Datei source.json:

    kubectl create configmap connector-source --from-file source.json
    
  4. Erstellen Sie zum Einrichten des Schreibconnectors für BigQuery einen configmap aus der Datei sink.json:

    kubectl create configmap connector-sink --from-file sink.json
    
  5. Stellen Sie Debezium-Dienste im GKE-Cluster bereit:

    kubectl apply -f kube/
    
  6. Prüfen Sie, ob alle Debezium-Dienste bereitgestellt sind:

    kubectl get pods -w
    

    Es dauert einige Zeit, bis alle Dienste ausgeführt sind. Warten Sie, bis alle Pods ausgeführt sind, bevor Sie mit dem nächsten Schritt fortfahren.

    Die finale Ausgabe sieht etwa so aus:

    NAME                         READY   STATUS    RESTARTS   AGE
    connect-5999cdb4f6-zcfw7     1/1     Running   0          1m
    kafka-0                      1/1     Running   2          1m
    schema-registry-0            1/1     Running   0          1m
    zookeeper-0                  1/1     Running   0          1m
    

    Drücken Sie Strg+C bzw. Befehlstaste+C auf einem Mac, um die Warteschleife zu beenden.

Kafka, Zookeeper und Schema-Registry werden in GKE als StatefulSets bereitgestellt.

In dieser Anleitung erstellen Sie ein System, das eine einzelne Spanner-Tabelle abfragt. In der Praxis müssen bei der Einrichtung mehrerer Workflows die Aufnahmezeiten reduziert werden, wenn mehrere Tabellen abgefragt werden. Weitere Informationen finden Sie unter Erste Schritte mit Kafka Connect.

Lesevorgänge aus Cloud Spanner einrichten

  1. Rufen Sie in Cloud Shell den Pod-Namen von Kafka Connect ab:

    POD_NAME=$(kubectl get pods \
        --selector=service=connect -o jsonpath='{.items..metadata.name}')
    
  2. Rufen Sie die IP-Adresse des Kafka Connect-Dienstes ab:

    POD_IP=$(kubectl get pods \
        --selector=service=connect -o jsonpath='{.items..podIP}')
    
  3. Konfigurieren Sie den Abfragejob:

    kubectl exec -ti $POD_NAME -- curl -X POST \
        http://$POD_IP:8083/connectors \
        -H "Content-Type: application/json" \
        -d @/config/source.json
    

    Der Connector spanner-connector ist so konfiguriert, dass er jede Sekunde die Tabelle Test in Cloud Spanner abfragt. Die Spalte LastUpdateTime wird verwendet, um die Daten schrittweise abzurufen. Weitere Informationen zum Einrichten des Connectors finden Sie unter Konfigurationsparameter.

  4. Konfiguration prüfen:

    kubectl exec -ti $POD_NAME -- curl http://$POD_IP:8083/connectors/spanner-connector
    

    Die Ausgabe sieht etwa so aus:

    {"name":"spanner-connector","config":{..."mode":"timestamp","timestamp.column.name":"LastUpdateTime","table.whitelist":"Test"...},"type":"source"...}
    
  5. Prüfen Sie, ob der Kafka-Connector ausgeführt wird:

    kubectl exec -ti $POD_NAME -- curl http://$POD_IP:8083/connectors/spanner-connector/status
    

    Die Ausgabe sieht etwa so aus:

    {"name":"spanner-connector","connector":{"state":"RUNNING","worker_id":"10.8.1.6:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.8.1.6:8083"}],"type":"source"}
    

Schreibvorgänge in BigQuery einrichten

  1. Konfigurieren Sie in Cloud Shell den BigQuery-Connector:

    kubectl exec -ti $POD_NAME -- curl -X POST \
        http://$POD_IP:8083/connectors \
        -H "Content-Type: application/json" \
        -d @/config/sink.json
    

    Der Connector bq-connector ist so konfiguriert, dass automatisch eine spanner_Test-Tabelle im Datasettest_bq erstellt wird. Alle Daten aus dem Kafka-Thema spanner_Test werden in BigQuery in die Tabelle spanner_Test geschrieben.

  2. Konfiguration prüfen:

    kubectl exec -ti $POD_NAME -- curl http://$POD_IP:8083/connectors/bq-connector
    

    Die Ausgabe sieht etwa so aus:

    {"name":"bq-connector","config":{..."autoCreateTables":"true","topics":"spanner_Test","project":"<Project-Id>","datasets":".*=test_bq"...},"type":"sink"...}
    
  3. Überprüfen Sie, ob der Connector ausgeführt wird:

    kubectl exec -ti $POD_NAME -- curl http://$POD_IP:8083/connectors/bq-connector/status
    

    Die Ausgabe sieht etwa so aus:

    {"name":"bq-connector","connector":{"state":"RUNNING","worker_id":"10.8.1.6:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.8.1.6:8083"}],"type":"sink"}
    

Daten in Cloud Spanner einfügen und aktualisieren

Nachdem Sie die Lesevorgänge von Cloud Spanner konfiguriert und in BigQuery geschrieben haben, können Sie einen Datensatz in Cloud Spanner einfügen und aktualisieren.

  1. Fügen Sie in Cloud Shell eine Zeile in die Tabelle ein:

    gcloud spanner databases execute-sql $SPANNER_DB \
      --instance=$SPANNER_INSTANCE \
      --sql="INSERT Test (Id, Firstname, Lastname, LastUpdateTime) VALUES (1, 'john', 'doe', PENDING_COMMIT_TIMESTAMP())"
    
  2. Aktualisieren Sie die Zeile:

    gcloud spanner databases execute-sql $SPANNER_DB \
      --instance=$SPANNER_INSTANCE \
      --sql="UPDATE Test SET Firstname = 'johny', LastUpdateTime = PENDING_COMMIT_TIMESTAMP() where Id = 1"
    

In BigQuery erfasste Änderungen prüfen

In diesem Abschnitt überprüfen Sie, ob die vorgenommenen Änderungen im Thema spanner_Test Kafka erfasst und in die Tabelle spanner_Test in BigQuery geschrieben werden.

  1. Prüfen Sie in Cloud Shell, ob die Daten im Kafka-Thema erfasst werden:

    kubectl exec -ti schema-registry-0 \
        -- kafka-avro-console-consumer \
        --bootstrap-server kafka-0.kafka.default.svc.cluster.local:9092 \
        --topic spanner_Test --from-beginning \
        --property schema.registry.url=http://schema-registry-0.schema-registry.default.svc.cluster.local:8081
    

    Die Ausgabe sieht etwa so aus:

    {"Id":{"long":1},"Firstname":{"string":"john"},"Lastname":{"string":"doe"},"LastUpdateTime":{"long":1585758520535}}
    {"Id":{"long":1},"Firstname":{"string":"johny"},"Lastname":{"string":"doe"},"LastUpdateTime":{"long":1585758545824}}
    

    Drücken Sie Strg + C oder Mac + C auf einem Mac, um die Präsentation zu beenden.

  2. Überprüfen Sie, ob die BigQuery-Tabelle spanner_Test das richtige Schema hat:

    bq show --format=prettyjson $PROJECT:$BQ_DATASET.spanner_Test | jq '.schema.fields'
    

    Die Ausgabe sieht etwa so aus:

    [
      {
      "mode": "NULLABLE",
      "name": "Id",
      "type": "INTEGER"
      },
    {
      "mode": "NULLABLE",
      "name": "Firstname",
      "type": "STRING"
    },
      {
      "mode": "NULLABLE",
      "name": "Lastname",
      "type": "STRING"
    },
      {
      "mode": "NULLABLE",
      "name": "LastUpdateTime",
      "type": "TIMESTAMP"
      }
    ]
    
  3. Prüfen Sie, ob die Daten in die Tabelle spanner_Test geschrieben sind:

    bq --location=US query \
        --use_legacy_sql=false 'select * from test_bq.spanner_Test'
    

    Die Ausgabe sieht etwa so aus:

    +----+-----------+----------+---------------------+
    | Id | Firstname | Lastname |   LastUpdateTime    |
    +----+-----------+----------+---------------------+
    |  1 | john      | doe      | 2020-04-01 16:28:40 |
    |  1 | johny     | doe      | 2020-04-01 16:29:05 |
    +----+-----------+----------+---------------------+
    

Schemaentwicklung überprüfen

In diesem Abschnitt ändern Sie das Tabellenschema in Cloud Spanner und prüfen, ob die Schemaänderungen korrekt an die BigQuery-Tabelle weitergegeben wurden.

  1. Fügen Sie in Cloud Shell die Spalte SoftDelete in die Spanner-Tabelle ein, um das vorläufige Löschen der Zeile zu erfassen:

    gcloud spanner databases ddl update $SPANNER_DB \
        --instance=$SPANNER_INSTANCE \
        --ddl='ALTER TABLE Test ADD COLUMN SoftDelete BOOL'
    
  2. Löschen Sie eine Zeile in der Tabelle:

    gcloud spanner databases execute-sql $SPANNER_DB \
        --instance=$SPANNER_INSTANCE \
        --sql="UPDATE Test SET SoftDelete = TRUE, LastUpdateTime = PENDING_COMMIT_TIMESTAMP() where Id = 1"
    

    Statt einen harten Löschvorgang auszuführen, geben Sie den Befehl update ein, der die Spalte SoftDelete auf True festlegt.

  3. Prüfen Sie, ob die Schemaänderungen in der BigQuery-Tabelle übernommen werden:

    bq show --format=prettyjson $PROJECT:$BQ_DATASET.spanner_Test | jq '.schema.fields'
    

    Die Ausgabe sieht etwa so aus:

    [
    ...
      {
      "mode": "NULLABLE",
      "name": "SoftDelete",
      "type": "BOOLEAN"
      }
    ]
    
  4. Prüfen Sie, ob das vorläufige Löschen in der Tabelle angezeigt wird:

    bq  --location=US query \
        --use_legacy_sql=false 'select * from test_bq.spanner_Test'
    

    Die Ausgabe sieht etwa so aus:

    +----+-----------+----------+---------------------+------------+
    | Id | Firstname | Lastname |   LastUpdateTime    | SoftDelete |
    +----+-----------+----------+---------------------+------------+
    |  1 | john      | doe      | 2020-04-01 16:28:40 |       NULL |
    |  1 | johny     | doe      | 2020-04-01 16:29:05 |       NULL |
    |  1 | johny     | doe      | 2020-04-01 16:31:04 |       true |
    +----+-----------+----------+---------------------+------------+
    

Sie haben Debezium erfolgreich eingerichtet, um alle Änderungen in Cloud Spanner zu erfassen und an BigQuery zu senden.

Fehlerbehebung bei der Kafka Connect-Einrichtung

Wenn in der Anleitung Fehler aufgetreten sind, verwenden Sie die folgenden Vorschläge, um Fehler bei der Kafka Connect-Einrichtung zu beheben:

  • Kafka speichert alle Werte in Themen. Führen Sie den folgenden Befehl aus, um diese Themen aufzurufen:

    kubectl exec -ti kafka-0 -- bin/kafka-topics.sh \
        --list --zookeeper zookeeper-0.zookeeper.default.svc.cluster.local:2181
    
  • Der Connector kann so konfiguriert werden, dass mehrere Aufgaben generiert werden. Wenn z. B. mehrere Tabellen in der Datenbank vorhanden sind, kann pro Tabelle eine Aufgabe pro Abfrage zugewiesen werden. Sie können die für einen Connector konfigurierten Aufgaben mit dem folgenden Befehl anzeigen:

    kubectl exec -ti $POD_NAME -- curl http://$POD_IP:8083/connectors/spanner-connector/tasks
    

    Die Ausgabe sieht etwa so aus:

    [{"id":{"connector":"spanner-connector","task":0},"config":{...}}]
    
  • Kafka Connect erstellt automatisch interne Themen, wenn ein Job konfiguriert ist. In diesen Themen werden Informationen dazu gespeichert, wo der Job zuletzt im Quellsystem gelesen wurde. Führen Sie den folgenden Befehl aus, um den neuesten Offset abzurufen:

    kubectl exec -ti kafka-0 -- bin/kafka-console-consumer.sh
        --bootstrap-server kafka-0.kafka.default.svc.cluster.local:9092
        --from-beginning --topic my_connect_offsets
    

    Die Ausgabe sieht etwa so aus:

    {"timestamp_nanos":923115000,"timestamp":1584535152923}
    

Nach dem Neustart einer Connector-Aufgabe kann die Verarbeitung ab dem letzten Offset fortgesetzt werden. Wenn Sie den Offset manuell ändern möchten, folgen Sie dieser Anleitung. Das Ändern des Offsets ist nützlich, wenn Sie Änderungen nicht von Anfang an streamen möchten.

Alternative Ansätze

In dieser Anleitung erfahren Sie, wie Sie die inkrementelle Replikation von Spanner in eine Zieldatenbank einrichten. Je nach Ihren Anforderungen müssen Sie Alternativen berücksichtigen. Diese Anforderungen können Replikationen nahezu in Echtzeit, genau nach der Datenübermittlung oder zur Aufbewahrung von Transaktionsaufträgen umfassen. Im Folgenden finden Sie eine kurze, nicht exklusive Liste von Alternativen:

Bereinigen

Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.

Am einfachsten können Sie die Abrechnung deaktivieren, wenn Sie das Cloud-Projekt löschen, das Sie für die Anleitung erstellt haben. Alternativ haben Sie die Möglichkeit, die einzelnen Ressourcen zu löschen.

Projekt löschen

  1. Wechseln Sie in der Cloud Console zur Seite Ressourcen verwalten.

    Zur Seite „Ressourcen verwalten“

  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie dann auf Löschen.
  3. Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Shut down (Beenden), um das Projekt zu löschen.

Ressourcen löschen

Wenn Sie das in dieser Anleitung verwendete Cloud-Projekt beibehalten möchten, löschen Sie die einzelnen Ressourcen:

  1. Löschen Sie den GKE-Cluster in Cloud Shell:

    gcloud container clusters delete $CLUSTER --zone $ZONE --async
    
  2. Löschen Sie den heruntergeladenen Code, die Artefakte, den privaten Schlüssel des Dienstkontos und andere Abhängigkeiten:

    cd .. && rm -rf spanner-debezium-change-capture
    
  3. Löschen Sie das Image in Container Registry:

    gcloud container images list-tags \
    gcr.io/$PROJECT/connect \
        --format 'value(digest)' | \
    xargs -I {} gcloud container images delete \
        --force-delete-tags --quiet \
    gcr.io/${PROJECT}/connect@sha256:{}
    
  4. Löschen Sie das Dienstkonto:

    gcloud iam service-accounts delete $SA_EMAIL
    
  5. Löschen Sie die Spanner-Instanz:

    gcloud spanner instances delete $SPANNER_INSTANCE`
    
  6. Löschen Sie das BigQuery-Dataset:

    bq rm -r -d $BQ_DATASET
    

Nächste Schritte