In dieser Anleitung wird gezeigt, wie Sie mit Debezium Änderungen in Cloud Spanner erfassen. In dieser Anleitung verwenden Sie Debezium, um eine Cloud Spanner-Datenbank auf alle neuen oder bearbeiteten Daten abzufragen. Debezium ist ein Open-Source-Tool zum Nachverfolgen und Erfassen von Datenbankänderungen. Es basiert auf Apache Kafka. In dieser Anleitung verwenden Sie Kafka Connect, um Daten in Kafka und zurück zu streamen. Kafka Connect bietet mehrere integrierte Connectors, mit denen Sie Daten in oder aus beliebten Datenbanken streamen können.
Diese Anleitung richtet sich an Datenbankadministratoren und -entwickler, die die Designmuster definieren und verwalten, die Datenänderungen in relationalen Datenbanken bestimmen (und verfolgen). Es wird davon ausgegangen, dass Sie die Konzepte von Cloud Spanner, BigQuery und Kafka Connect verstehen.
Hintergrund
Mit Change Data Capture (CDC) können Sie jedes Ereignis aus einer Datenbank streamen. Im Allgemeinen verwenden Datenbanken ein Transaktionslog, um einen Datensatz jedes Datenbankereignisses zu speichern. Diese Logdateien werden je nach Datenbank Geschmack auch als binlog-, Redo-Log oder Write-Ahead-Log bezeichnet. CDC-Tools verwenden dieses Transaktionslog, um diese Ereignisse zu lesen.
Hier ein typisches CDC-Log:
{
"before": {
"ID": 1,
"Firstname": "john",
"Lastname": "doe",
"LastUpdateTime": 1544000706000000
},
"after": {
"ID": 1,
"Firstname": "johny",
"Lastname": "doe",
"LastUpdateTime": 1544000742000000
},
}
Bei einer Datenbankabfrage werden die aktuellen Werte aus der Datenbank erfasst.
Hier eine typische Datenbankumfrage:
{
"ID": 1,
"Firstname": "johny",
"Lastname": "doe",
"LastUpdateTime": 1544000742000000
}
Ein Änderungsprotokoll der Datenbank erfasst alle Datenänderungen in ihrer exakten Reihenfolge. Diese Liste kann wichtig sein, wenn Sie den vollständigen Verlauf der an einer Datenbank vorgenommenen Änderungen sehen möchten. Im Gegensatz dazu kann der Einsatz einer Abfrage, die auf Abfragen beruht, jedoch dazu führen, dass Datenänderungen verloren gehen, die zwischen zwei Durchläufen der Abfrageschleife auftreten. Wenn beispielsweise ein Datensatz zwischen zwei Umfragen eingefügt und gelöscht wird, wird diese anhand eines abfragebasierten Logs durchgeführten Änderung nicht erfasst.
Und so gehts. Die Datenbankabfrage wird mit einer festgelegten Häufigkeit durchgeführt, erfasst Werte basierend auf einem Zeitstempel und alle Änderungen, die nach der letzten Abfrage vorgenommen wurden – aus den Zeilen „select * aus der Tabelle, in der gilt LastUpdateTime
>
last_poll_timestamp
.“
Übersicht
Connectors in Kafka Connect-Ausführungsjobs. Diese Jobs definieren, wo die Connectors Daten kopieren und von wo sie kopiert werden. Zum Ausführen der Jobs planen Sie Connectors und Aufgaben auf Kafka connect workers. Diese Jobs enthalten Aufgaben, mit denen die Daten kopiert werden. Ein Connector kann einen einzelnen Job in viele Aufgaben unterteilen. Auf diese Weise können Connectors Jobs parallel ausführen. Das parallele Ausführen von Jobs skaliert die Kopierfähigkeit des Connectors. Kafka verwendet die Konfigurationsdaten Zookeeper. Zookeeper überwacht den Status der Kafka-Clusterknoten, -themen, -partitionen und andere Daten.
Wenn diese Jobs ausgeführt werden, werden mehrere Streams einzelner Datensätze erstellt. Kafka braucht eine effiziente Methode, um das Schema für jeden ankommenden Datensatz zu bestimmen. Sie können kein Schema zusammen mit einem Datensatz senden, da dessen Serialisierung zu beachten ist. Schemas sind oft größer als die Datensätze selbst. Schema Registry verwaltet Avro-Schemata für Kafka-Produzenten und -Nutzer. Mit Avro-Schemas können Sie Kompatibilitätseinstellungen konfigurieren, um die Schema Evolution zu unterstützen.
Das folgende Diagramm zeigt eine allgemeine Ansicht der Architektur, die Sie in dieser Anleitung bereitstellen.
Im Folgenden finden Sie eine ausführlichere Beschreibung der Vorgänge im Diagramm:
- Um eine Verbindung zu Spanner herzustellen, konfigurieren Sie einen Quellconnector, der den Open-Source-JDBC-Treiber von Cloud Spanner verwendet.
- Um Daten in BigQuery zu schreiben, konfigurieren Sie einen Sink-Connector, der einen Out-of-Box-Connector von Confluent verwendet.
- Um Zeilen mit Werten in der Spalte
LastUpdateTime
zu identifizieren, die älter sind als der zuletzt abgerufene Zeitstempel, gibt es die Quelltabellen der Quell-Connector in Spanner. - Zur Vorbereitung der Datenübertragung speichert der Quell-Connector alle qualifizierten Datensätze in Kafka-Themen.
- Zum Speichern der qualifizierenden Datensätze ruft der Senken-Connector die Datensätze aus den Kafka-Themen ab und schreibt sie in BigQuery.
Systemdesign und Beschränkungen
Bevor Sie diese Einrichtung in der Produktion verwenden, lesen Sie die folgenden Hinweise zu Einschränkungen und Einschränkungen des Systemdesigns.
Kaufbereitschaft | Anmerkungen |
---|---|
Leistung | Diese Lösung eignet sich für eine inkrementelle Batchreplikation mit Schreibtraffic von bis zu einigen Tausend Aktualisierungen pro Sekunde. |
Hohe Verfügbarkeit und Fehlertoleranz | Unterstützt. Diese Lösung führt Kafka Connect im verteilten Modus aus. Connector-Jobs sind über den Cluster verteilt und bieten eine hohe Verfügbarkeit und Fehlertoleranz. Weitere Informationen finden Sie unter Kafka behandeln Fehler. |
Dynamische Schemaaktualisierungen | Unterstützt. Weitere Informationen finden Sie unter Entwicklung des Schemas überprüfen. |
Aufbewahrung der Transaktionsreihenfolge Zeilenaktualisierungen Deduplizierung Genau einmalige Zustellung |
Nicht unterstützt. Diese Lösung unterstützt die Anfügereplikation. Bei einer Replikation, die nur für Anfügungen zulässig ist, werden neu replizierte Daten an das Ende einer Tabelle angehängt. Bestehende Zeilen werden nicht aktualisiert. Aktualisierungen werden am Ende der Tabelle als neue Zeilen hinzugefügt. |
Löschvorgänge | Nicht unterstützt. Diese Lösung unterstützt vorläufige Löschvorgänge oder Torsteine in Form von Zeilenaktualisierungen. |
Auswirkungen auf das Datenschema | Alle Tabellen, die Sie replizieren möchten, müssen eine Spalte mit dem Commit-Zeitstempel von Spanner enthalten, um den Zeitpunkt der letzten Aktualisierung zu erfassen. |
Auswirkungen auf eine vorhandene Anwendung | Wenn Sie Datensätze in Tabellen ändern, die Sie replizieren möchten, müssen Sie den Spanner-Zeitstempel commit festlegen oder aktualisieren. Wenn die Anwendung Zeilen löscht, ändern Sie diese in einen weichen Löschvorgang oder einen Tombstone. |
Unterstützung für Senken | Mit Kafka Connect können Sie das Datensenkesystem flexibel ändern, ohne den Code für die Streamverarbeitung ändern zu müssen. Weitere Informationen finden Sie unter Unterstützte Connectors in der Kafka Connect-Dokumentation. |
Ziele
- Cloud Spanner mit einer Testdatenbank einrichten
- Richten Sie BigQuery mit einem Test-Dataset ein.
- Einen Google Kubernetes Engine-Cluster (GKE) erstellen
- Debezium-Cluster in GKE bereitstellen
- Konfigurieren Sie einen Job so, dass Daten aus der Spanner-Datenbank erfasst werden.
- Konfigurieren Sie einen Job, mit dem Daten in eine BigQuery-Tabelle geschrieben werden.
- Daten in Cloud Spanner einfügen und aktualisieren
- Prüfen Sie, ob Änderungen in Kafka und BigQuery erfasst werden.
- Tabellenschema in Cloud Spanner ändern.
- Die Schemaentwicklung in BigQuery überprüfen.
Kosten
In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloud verwendet:
Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.
Nach Abschluss dieser Anleitung können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.
Hinweis
-
Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.
-
Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für Ihr Projekt aktiviert ist.
-
Aktivieren Sie Cloud Shell in der Cloud Console.
Suchen Sie Ihre Projekt-ID und legen Sie sie in Cloud Shell fest. Ersetzen Sie
YOUR_PROJECT_ID
durch Ihre Projekt-ID.gcloud config set project YOUR_PROJECT_ID
Erstellen Sie Umgebungsvariablen für das Cloud-Projekt, die Region, die Zone, den Dienstkontonamen und andere Werte, die in dieser Anleitung benötigt werden:
export PROJECT=$(gcloud config get-value project) export REGION="us-central1" export ZONE="us-central1-a" export CLUSTER="test-cluster" export SPANNER_INSTANCE="test-spanner-instance" export SPANNER_DB="test-db" export SERVICE_ACCOUNT="test-sa" export BQ_DATASET="test_bq" export SA_EMAIL=${SERVICE_ACCOUNT}@${PROJECT}.iam.gserviceaccount.com
Aktivieren Sie die APIs für Compute Engine, GKE, Cloud Spanner, Container Analysis und Container Registry:
gcloud services enable \ cloudbuild.googleapis.com \ compute.googleapis.com \ container.googleapis.com \ spanner.googleapis.com \ bigquery.googleapis.com \ containeranalysis.googleapis.com \ containerregistry.googleapis.com
Cloud Spanner einrichten
Erstellen Sie in Cloud Shell eine Cloud Spanner-Instanz:
gcloud spanner instances create $SPANNER_INSTANCE \ --config=regional-$REGION \ --description="test" \ --nodes=1
Erstellen Sie eine Datenbank und eine Testtabelle:
gcloud spanner databases create $SPANNER_DB \ --instance=$SPANNER_INSTANCE \ --ddl='CREATE TABLE Test (Id INT64 NOT NULL, Firstname STRING(1024), Lastname STRING(1024), LastUpdateTime TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp = true),) PRIMARY KEY(Id)'
Die obige Anweisung erstellt die Testtabelle mithilfe der Option allow_commit_timestamp
für eine TIMESTAMP
-Spalte. Mit der Spaltenoption allow_commit_timestamp
können Sie den Commit-Zeitstempel in kleinstmöglicher Form speichern. Wenn der Abfragedienst ausgeführt wird, verwendet das System Commit-Zeitstempel, um Datenbankmutationen zu bestimmen.
BigQuery einrichten
Erstellen Sie in Cloud Shell ein Test-Dataset in BigQuery:
bq --location=US mk --dataset $PROJECT:$BQ_DATASET
In BigQuery werden die Tabellen oder Tabellen aus der Quelldatenbank repliziert.
GKE-Cluster erstellen
Erstellen Sie in Cloud Shell einen GKE-Cluster:
gcloud container clusters create $CLUSTER --zone $ZONE
Klonen Sie das Repository mit den GKE-Clusterkonfigurationsdateien und den Kafka-Quell- und -Konfigurationsdateien für diese Anleitung:
git clone https://github.com/GoogleCloudPlatform/spanner-debezium-change-capture cd spanner-debezium-change-capture
Authentifizierungstoken generieren
Erstellen Sie in Cloud Shell ein Google Cloud-Dienstkonto:
gcloud iam service-accounts create $SERVICE_ACCOUNT \ --display-name $SERVICE_ACCOUNT
Später in dieser Anleitung generieren Sie ein Authentifizierungstoken für dieses Konto.
Weisen Sie dem Dienstkonto die Rolle „Spanner-Datenbank-Leser“ zu:
gcloud projects add-iam-policy-binding $PROJECT \ --member serviceAccount:$SA_EMAIL \ --role roles/spanner.databaseReader
Weisen Sie dem Dienstkonto die Rolle „BigQuery-Dateninhaber“ zu:
gcloud projects add-iam-policy-binding $PROJECT \ --member serviceAccount:$SA_EMAIL \ --role roles/bigquery.dataOwner
Generieren Sie die JSON-Datei für die Anmeldedaten zur Authentifizierung:
gcloud iam service-accounts keys create credentials.json \ --iam-account=$SA_EMAIL
Debezium in GKE bereitstellen
Ersetzen Sie in Cloud Shell den Platzhalter
[PROJECT_ID]
durch Ihre Cloud-Projekt-ID in den Dateiensource.json
undsink.json
:sed -i -e "s/\[PROJECT_ID\]/$PROJECT/g" source.json sed -i -e "s/\[PROJECT_ID\]/$PROJECT/g" sink.json
Erstellen Sie Kubernetes-Secrets aus der Datei
credentials.json
:gcloud container clusters get-credentials test-cluster \ --zone "${ZONE}" \ --project "${PROJECT}" && \ kubectl create secret generic service-account-creds \ --from-file credentials.json
Erstellen Sie zum Einrichten des Abfragejobs für Spanner einen
configmap
aus der Dateisource.json
:kubectl create configmap connector-source --from-file source.json
Erstellen Sie zum Einrichten des Schreibconnectors für BigQuery einen
configmap
aus der Dateisink.json
:kubectl create configmap connector-sink --from-file sink.json
Stellen Sie Debezium-Dienste im GKE-Cluster bereit:
kubectl apply -f kube/
Prüfen Sie, ob alle Debezium-Dienste bereitgestellt sind:
kubectl get pods -w
Es dauert einige Zeit, bis alle Dienste ausgeführt sind. Warten Sie, bis alle Pods ausgeführt sind, bevor Sie mit dem nächsten Schritt fortfahren.
Die finale Ausgabe sieht etwa so aus:
NAME READY STATUS RESTARTS AGE connect-5999cdb4f6-zcfw7 1/1 Running 0 1m kafka-0 1/1 Running 2 1m schema-registry-0 1/1 Running 0 1m zookeeper-0 1/1 Running 0 1m
Drücken Sie Strg+C bzw. Befehlstaste+C auf einem Mac, um die Warteschleife zu beenden.
Kafka, Zookeeper und Schema-Registry werden in GKE als StatefulSets bereitgestellt.
In dieser Anleitung erstellen Sie ein System, das eine einzelne Spanner-Tabelle abfragt. In der Praxis müssen bei der Einrichtung mehrerer Workflows die Aufnahmezeiten reduziert werden, wenn mehrere Tabellen abgefragt werden. Weitere Informationen finden Sie unter Erste Schritte mit Kafka Connect.
Lesevorgänge aus Cloud Spanner einrichten
Rufen Sie in Cloud Shell den Pod-Namen von Kafka Connect ab:
POD_NAME=$(kubectl get pods \ --selector=service=connect -o jsonpath='{.items..metadata.name}')
Rufen Sie die IP-Adresse des Kafka Connect-Dienstes ab:
POD_IP=$(kubectl get pods \ --selector=service=connect -o jsonpath='{.items..podIP}')
Konfigurieren Sie den Abfragejob:
kubectl exec -ti $POD_NAME -- curl -X POST \ http://$POD_IP:8083/connectors \ -H "Content-Type: application/json" \ -d @/config/source.json
Der Connector
spanner-connector
ist so konfiguriert, dass er jede Sekunde die TabelleTest
in Cloud Spanner abfragt. Die SpalteLastUpdateTime
wird verwendet, um die Daten schrittweise abzurufen. Weitere Informationen zum Einrichten des Connectors finden Sie unter Konfigurationsparameter.Konfiguration prüfen:
kubectl exec -ti $POD_NAME -- curl http://$POD_IP:8083/connectors/spanner-connector
Die Ausgabe sieht etwa so aus:
{"name":"spanner-connector","config":{..."mode":"timestamp","timestamp.column.name":"LastUpdateTime","table.whitelist":"Test"...},"type":"source"...}
Prüfen Sie, ob der Kafka-Connector ausgeführt wird:
kubectl exec -ti $POD_NAME -- curl http://$POD_IP:8083/connectors/spanner-connector/status
Die Ausgabe sieht etwa so aus:
{"name":"spanner-connector","connector":{"state":"RUNNING","worker_id":"10.8.1.6:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.8.1.6:8083"}],"type":"source"}
Schreibvorgänge in BigQuery einrichten
Konfigurieren Sie in Cloud Shell den BigQuery-Connector:
kubectl exec -ti $POD_NAME -- curl -X POST \ http://$POD_IP:8083/connectors \ -H "Content-Type: application/json" \ -d @/config/sink.json
Der Connector
bq-connector
ist so konfiguriert, dass automatisch einespanner_Test
-Tabelle im Datasettest_bq
erstellt wird. Alle Daten aus dem Kafka-Themaspanner_Test
werden in BigQuery in die Tabellespanner_Test
geschrieben.Konfiguration prüfen:
kubectl exec -ti $POD_NAME -- curl http://$POD_IP:8083/connectors/bq-connector
Die Ausgabe sieht etwa so aus:
{"name":"bq-connector","config":{..."autoCreateTables":"true","topics":"spanner_Test","project":"<Project-Id>","datasets":".*=test_bq"...},"type":"sink"...}
Überprüfen Sie, ob der Connector ausgeführt wird:
kubectl exec -ti $POD_NAME -- curl http://$POD_IP:8083/connectors/bq-connector/status
Die Ausgabe sieht etwa so aus:
{"name":"bq-connector","connector":{"state":"RUNNING","worker_id":"10.8.1.6:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.8.1.6:8083"}],"type":"sink"}
Daten in Cloud Spanner einfügen und aktualisieren
Nachdem Sie die Lesevorgänge von Cloud Spanner konfiguriert und in BigQuery geschrieben haben, können Sie einen Datensatz in Cloud Spanner einfügen und aktualisieren.
Fügen Sie in Cloud Shell eine Zeile in die Tabelle ein:
gcloud spanner databases execute-sql $SPANNER_DB \ --instance=$SPANNER_INSTANCE \ --sql="INSERT Test (Id, Firstname, Lastname, LastUpdateTime) VALUES (1, 'john', 'doe', PENDING_COMMIT_TIMESTAMP())"
Aktualisieren Sie die Zeile:
gcloud spanner databases execute-sql $SPANNER_DB \ --instance=$SPANNER_INSTANCE \ --sql="UPDATE Test SET Firstname = 'johny', LastUpdateTime = PENDING_COMMIT_TIMESTAMP() where Id = 1"
In BigQuery erfasste Änderungen prüfen
In diesem Abschnitt überprüfen Sie, ob die vorgenommenen Änderungen im Thema spanner_Test
Kafka erfasst und in die Tabelle spanner_Test
in BigQuery geschrieben werden.
Prüfen Sie in Cloud Shell, ob die Daten im Kafka-Thema erfasst werden:
kubectl exec -ti schema-registry-0 \ -- kafka-avro-console-consumer \ --bootstrap-server kafka-0.kafka.default.svc.cluster.local:9092 \ --topic spanner_Test --from-beginning \ --property schema.registry.url=http://schema-registry-0.schema-registry.default.svc.cluster.local:8081
Die Ausgabe sieht etwa so aus:
{"Id":{"long":1},"Firstname":{"string":"john"},"Lastname":{"string":"doe"},"LastUpdateTime":{"long":1585758520535}} {"Id":{"long":1},"Firstname":{"string":"johny"},"Lastname":{"string":"doe"},"LastUpdateTime":{"long":1585758545824}}
Drücken Sie Strg + C oder Mac + C auf einem Mac, um die Präsentation zu beenden.
Überprüfen Sie, ob die BigQuery-Tabelle
spanner_Test
das richtige Schema hat:bq show --format=prettyjson $PROJECT:$BQ_DATASET.spanner_Test | jq '.schema.fields'
Die Ausgabe sieht etwa so aus:
[ { "mode": "NULLABLE", "name": "Id", "type": "INTEGER" }, { "mode": "NULLABLE", "name": "Firstname", "type": "STRING" }, { "mode": "NULLABLE", "name": "Lastname", "type": "STRING" }, { "mode": "NULLABLE", "name": "LastUpdateTime", "type": "TIMESTAMP" } ]
Prüfen Sie, ob die Daten in die Tabelle
spanner_Test
geschrieben sind:bq --location=US query \ --use_legacy_sql=false 'select * from test_bq.spanner_Test'
Die Ausgabe sieht etwa so aus:
+----+-----------+----------+---------------------+ | Id | Firstname | Lastname | LastUpdateTime | +----+-----------+----------+---------------------+ | 1 | john | doe | 2020-04-01 16:28:40 | | 1 | johny | doe | 2020-04-01 16:29:05 | +----+-----------+----------+---------------------+
Schemaentwicklung überprüfen
In diesem Abschnitt ändern Sie das Tabellenschema in Cloud Spanner und prüfen, ob die Schemaänderungen korrekt an die BigQuery-Tabelle weitergegeben wurden.
Fügen Sie in Cloud Shell die Spalte
SoftDelete
in die Spanner-Tabelle ein, um das vorläufige Löschen der Zeile zu erfassen:gcloud spanner databases ddl update $SPANNER_DB \ --instance=$SPANNER_INSTANCE \ --ddl='ALTER TABLE Test ADD COLUMN SoftDelete BOOL'
Löschen Sie eine Zeile in der Tabelle:
gcloud spanner databases execute-sql $SPANNER_DB \ --instance=$SPANNER_INSTANCE \ --sql="UPDATE Test SET SoftDelete = TRUE, LastUpdateTime = PENDING_COMMIT_TIMESTAMP() where Id = 1"
Statt einen harten Löschvorgang auszuführen, geben Sie den Befehl
update
ein, der die SpalteSoftDelete
aufTrue
festlegt.Prüfen Sie, ob die Schemaänderungen in der BigQuery-Tabelle übernommen werden:
bq show --format=prettyjson $PROJECT:$BQ_DATASET.spanner_Test | jq '.schema.fields'
Die Ausgabe sieht etwa so aus:
[ ... { "mode": "NULLABLE", "name": "SoftDelete", "type": "BOOLEAN" } ]
Prüfen Sie, ob das vorläufige Löschen in der Tabelle angezeigt wird:
bq --location=US query \ --use_legacy_sql=false 'select * from test_bq.spanner_Test'
Die Ausgabe sieht etwa so aus:
+----+-----------+----------+---------------------+------------+ | Id | Firstname | Lastname | LastUpdateTime | SoftDelete | +----+-----------+----------+---------------------+------------+ | 1 | john | doe | 2020-04-01 16:28:40 | NULL | | 1 | johny | doe | 2020-04-01 16:29:05 | NULL | | 1 | johny | doe | 2020-04-01 16:31:04 | true | +----+-----------+----------+---------------------+------------+
Sie haben Debezium erfolgreich eingerichtet, um alle Änderungen in Cloud Spanner zu erfassen und an BigQuery zu senden.
Fehlerbehebung bei der Kafka Connect-Einrichtung
Wenn in der Anleitung Fehler aufgetreten sind, verwenden Sie die folgenden Vorschläge, um Fehler bei der Kafka Connect-Einrichtung zu beheben:
Kafka speichert alle Werte in Themen. Führen Sie den folgenden Befehl aus, um diese Themen aufzurufen:
kubectl exec -ti kafka-0 -- bin/kafka-topics.sh \ --list --zookeeper zookeeper-0.zookeeper.default.svc.cluster.local:2181
Der Connector kann so konfiguriert werden, dass mehrere Aufgaben generiert werden. Wenn z. B. mehrere Tabellen in der Datenbank vorhanden sind, kann pro Tabelle eine Aufgabe pro Abfrage zugewiesen werden. Sie können die für einen Connector konfigurierten Aufgaben mit dem folgenden Befehl anzeigen:
kubectl exec -ti $POD_NAME -- curl http://$POD_IP:8083/connectors/spanner-connector/tasks
Die Ausgabe sieht etwa so aus:
[{"id":{"connector":"spanner-connector","task":0},"config":{...}}]
Kafka Connect erstellt automatisch interne Themen, wenn ein Job konfiguriert ist. In diesen Themen werden Informationen dazu gespeichert, wo der Job zuletzt im Quellsystem gelesen wurde. Führen Sie den folgenden Befehl aus, um den neuesten Offset abzurufen:
kubectl exec -ti kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server kafka-0.kafka.default.svc.cluster.local:9092 --from-beginning --topic my_connect_offsets
Die Ausgabe sieht etwa so aus:
{"timestamp_nanos":923115000,"timestamp":1584535152923}
Nach dem Neustart einer Connector-Aufgabe kann die Verarbeitung ab dem letzten Offset fortgesetzt werden. Wenn Sie den Offset manuell ändern möchten, folgen Sie dieser Anleitung. Das Ändern des Offsets ist nützlich, wenn Sie Änderungen nicht von Anfang an streamen möchten.
Alternative Ansätze
In dieser Anleitung erfahren Sie, wie Sie die inkrementelle Replikation von Spanner in eine Zieldatenbank einrichten. Je nach Ihren Anforderungen müssen Sie Alternativen berücksichtigen. Diese Anforderungen können Replikationen nahezu in Echtzeit, genau nach der Datenübermittlung oder zur Aufbewahrung von Transaktionsaufträgen umfassen. Im Folgenden finden Sie eine kurze, nicht exklusive Liste von Alternativen:
- Verwenden Sie das verwaltete Angebot von Striim, um eine Batchreplikation aus Cloud Spanner einzurichten.
- Verwenden Sie Spanner Change Watcher.
Bereinigen
Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.
Am einfachsten können Sie die Abrechnung deaktivieren, wenn Sie das Cloud-Projekt löschen, das Sie für die Anleitung erstellt haben. Alternativ haben Sie die Möglichkeit, die einzelnen Ressourcen zu löschen.
Projekt löschen
- Wechseln Sie in der Cloud Console zur Seite Ressourcen verwalten.
- Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie dann auf Löschen.
- Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Shut down (Beenden), um das Projekt zu löschen.
Ressourcen löschen
Wenn Sie das in dieser Anleitung verwendete Cloud-Projekt beibehalten möchten, löschen Sie die einzelnen Ressourcen:
Löschen Sie den GKE-Cluster in Cloud Shell:
gcloud container clusters delete $CLUSTER --zone $ZONE --async
Löschen Sie den heruntergeladenen Code, die Artefakte, den privaten Schlüssel des Dienstkontos und andere Abhängigkeiten:
cd .. && rm -rf spanner-debezium-change-capture
Löschen Sie das Image in Container Registry:
gcloud container images list-tags \ gcr.io/$PROJECT/connect \ --format 'value(digest)' | \ xargs -I {} gcloud container images delete \ --force-delete-tags --quiet \ gcr.io/${PROJECT}/connect@sha256:{}
Löschen Sie das Dienstkonto:
gcloud iam service-accounts delete $SA_EMAIL
Löschen Sie die Spanner-Instanz:
gcloud spanner instances delete $SPANNER_INSTANCE`
Löschen Sie das BigQuery-Dataset:
bq rm -r -d $BQ_DATASET
Nächste Schritte
- Andere Alternativen zum Einrichten eines Änderungslogs in Cloud Spanner
- Weitere Informationen zum Ausführen von Kafka in GKE
- Weitere Informationen über eine Partnerlösung (Striim) zum Exportieren von Daten aus Cloud Spanner.
Mehr zum Erstellen eines eigenen ereignisbasierten Systems mit Cloud Spanner
Weitere Google Cloud-Features mit unseren Anleitungen testen