Kafka ist ein verteiltes Open Source-Messaging-System für die Verarbeitung von großen Datenmengen mit hohem Durchsatz und Echtzeitstreaming. Mit Kafka können Sie Streamingdaten-Pipelines erstellen, die Daten zuverlässig zwischen verschiedenen Systemen und Anwendungen für die Verarbeitung und Analyse verschieben.
Diese Anleitung richtet sich an Plattformadministratoren, Cloud-Architekten und Betriebsexperten, die daran interessiert sind, hochverfügbare Kafka-Cluster in Google Kubernetes Engine (GKE) bereitzustellen.
Lernziele
In dieser Anleitung erhalten Sie Informationen zu folgenden Themen:- Verwenden Sie Terraform, um einen regionalen GKE-Cluster zu erstellen.
- Stellen Sie ein hochverfügbares Kafka-Cluster bereit.
- Aktualisieren Sie Kafka-Binärdateien.
- Sichern und stellen Sie den Kafka-Cluster wieder her.
- Simulieren Sie die Störung des GKE-Knotens und einen Failover des Kafka-Brokers.
Architektur
In diesem Abschnitt wird die Architektur der Lösung beschrieben, die Sie in dieser Anleitung erstellen.
Ein Kafka-Cluster ist eine Gruppe von einem oder mehreren Servern (so genannten Brokern), die zusammenarbeiten, um eingehende Datenstreams und Publish/Subscribe-Messaging für Kafka-Clients (so genannte Verbraucher) zu verarbeiten.
Jede Datenpartition in einem Kafka-Cluster hat einen Leader-Broker und kann einen oder mehrere Follower-Broker haben. Der Leader-Broker verarbeitet alle Lese- und Schreibvorgänge in der Partition. Jeder Follower-Broker repliziert den Leader-Broker passiv.
In einer typischen Kafka-Einrichtung verwenden Sie auch einen Open Source-Dienst namens ZooKeeper, um Ihre Kafka-Cluster zu koordinieren. Dieser Dienst hilft Ihnen, einen Leader unter den Brokern zu wählen und bei Fehlern ein Failover auszulösen.
In dieser Anleitung stellen Sie die Kafka-Cluster in GKE bereit. Dazu konfigurieren Sie die Kafka-Broker und den Zookeeper-Dienst als einzelne StatefulSets. Um hochverfügbare Kafka-Cluster bereitzustellen und die Notfallwiederherstellung vorzubereiten, konfigurieren Sie Ihre Kafka- und Zookeeper-StatefulSets für die Verwendung separater Knotenpools und Zonen.
Das folgende Diagramm zeigt, wie Ihr Kafka-StatefulSet auf mehreren Knoten und Zonen in Ihrem GKE-Cluster ausgeführt wird.
Das folgende Diagramm zeigt, wie Ihr Zookeeper-StatefulSet auf mehreren Knoten und Zonen in Ihrem GKE-Cluster ausgeführt wird.
Knotenbereitstellung und Pod-Planung
Wenn Sie Autopilot-Cluster verwenden, übernimmt Autopilot die Bereitstellung von Knoten und die Planung der Pods für Ihre Arbeitslasten. Sie verwenden die Anti-Affinität eines Pods, damit zwei Pods desselben StatefulSets nicht auf demselben Knoten und in derselben Zone geplant sind.
Wenn Sie Standardcluster verwenden, müssen Sie die Pod-Toleranz und die Knotenaffinität konfigurieren. Weitere Informationen finden Sie unter Arbeitslasten in dedizierten Knotenpools isolieren.
Kosten
In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:
Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.
Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.
Hinweis
Projekt einrichten
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, click Create project to begin creating a new Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM APIs.
-
In the Google Cloud console, on the project selector page, click Create project to begin creating a new Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM APIs.
Rollen einrichten
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
role/storage.objectViewer, role/logging.logWriter, role/artifactregistry.Admin, roles/container.clusterAdmin, role/container.serviceAgent, roles/iam.serviceAccountAdmin, roles/serviceusage.serviceUsageAdmin, roles/iam.serviceAccountAdmin
$ gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Umgebung einrichten
In dieser Anleitung verwenden Sie Cloud Shell zum Verwalten von Ressourcen, die in Google Cloud gehostet werden. Die Software, die Sie für diese Anleitung benötigen, ist in Cloud Shell vorinstalliert, einschließlich Docker, kubectl
und der gcloud CLI, Helm und Terraform.
So richten Sie Ihre Umgebung mit Cloud Shell ein:
Starten Sie eine Cloud Shell-Sitzung über die Google Cloud Console. Klicken Sie dazu in der Google Cloud Console auf Cloud Shell aktivieren. Dadurch wird im unteren Bereich der Google Cloud Console eine Sitzung gestartet.
Umgebungsvariablen festlegen
export PROJECT_ID=PROJECT_ID export REGION=us-central1
Ersetzen Sie die folgenden Werte:
- PROJECT_ID: Ihre Google Cloud-Projekt-ID.
Legen Sie die Standardumgebungsvariablen fest.
gcloud config set project PROJECT_ID
Klonen Sie das Code-Repository.
git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
Wechseln Sie in das Arbeitsverzeichnis.
cd kubernetes-engine-samples/streaming/gke-stateful-kafka
Clusterinfrastruktur erstellen
In diesem Abschnitt führen Sie ein Terraform-Skript aus, um zwei regionale GKE-Cluster zu erstellen.
Der primäre Cluster wird in us-central1
bereitgestellt.
So erstellen Sie den Cluster:
Autopilot
Führen Sie in Cloud Shell die folgenden Befehle aus:
terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID
Geben Sie bei Aufforderung yes
ein.
Standard
Führen Sie in Cloud Shell die folgenden Befehle aus:
terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID
Geben Sie bei Aufforderung yes
ein.
Die Terraform-Konfigurationsdateien erstellen die folgenden Ressourcen, um Ihre Infrastruktur bereitzustellen:
- Erstellen Sie ein Artifact Registry-Repository zum Speichern der Docker-Images.
- Erstellen Sie das VPC-Netzwerk und das Subnetz für die Netzwerkschnittstelle der VM.
- Erstellen Sie zwei GKE-Cluster.
Terraform erstellt einen privaten Cluster in beiden Regionen und aktiviert Backup for GKE für die Notfallwiederherstellung.
Kafka auf einem Cluster bereitstellen
In diesem Abschnitt stellen Sie Kafka in GKE mithilfe eines Helm-Diagramms bereit. Der Vorgang erstellt die folgenden Ressourcen:
- Die Kafka- und Zookeeper-StatefulSets.
- Eine Bereitstellung von Kafka-Exporter. Der Exporter erfasst Kafka-Messwerte für die Prometheus-Nutzung.
- Ein Budget für Pod-Störungen (PDB), das die Anzahl der Offline-Pods während einer freiwilligen Unterbrechung begrenzt.
So verwenden Sie das Helm-Diagramm, um Kafka bereitzustellen:
Konfigurieren Sie den Docker-Zugriff.
gcloud auth configure-docker us-docker.pkg.dev
Füllen Sie Artifact Registry mit den Kafka- und Zookeeper-Images.
./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0 ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52 ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41 ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
Konfigurieren Sie den Zugriff auf die
kubectl
-Befehlszeile für den primären Cluster.gcloud container clusters get-credentials gke-kafka-us-central1 \ --region=${REGION} \ --project=${PROJECT_ID}
Namespace erstellen
export NAMESPACE=kafka kubectl create namespace $NAMESPACE
Installieren Sie Kafka mit der Helm-Diagrammversion 20.0.6.
cd helm ../scripts/chart.sh kafka 20.0.6 && \ rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka . \ --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
Die Ausgabe sieht in etwa so aus:
NAME: kafka LAST DEPLOYED: Thu Feb 16 03:29:39 2023 NAMESPACE: kafka STATUS: deployed REVISION: 1 TEST SUITE: None
Prüfen Sie, ob Ihre Kafka-Replikate ausgeführt werden. Dies kann einige Minuten dauern.
kubectl get all -n kafka
Die Ausgabe sieht in etwa so aus:
--- NAME READY STATUS RESTARTS AGE pod/kafka-0 1/1 Running 2 (3m51s ago) 4m28s pod/kafka-1 1/1 Running 3 (3m41s ago) 4m28s pod/kafka-2 1/1 Running 2 (3m57s ago) 4m28s pod/kafka-zookeeper-0 1/1 Running 0 4m28s pod/kafka-zookeeper-1 1/1 Running 0 4m28s pod/kafka-zookeeper-2 1/1 Running 0 4m28s NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/kafka ClusterIP 192.168.112.124 <none> 9092/TCP 4m29s service/kafka-app ClusterIP 192.168.75.57 <none> 9092/TCP 35m service/kafka-app-headless ClusterIP None <none> 9092/TCP,9093/TCP 35m service/kafka-app-zookeeper ClusterIP 192.168.117.102 <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-app-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-headless ClusterIP None <none> 9092/TCP,9093/TCP 4m29s service/kafka-zookeeper ClusterIP 192.168.89.249 <none> 2181/TCP,2888/TCP,3888/TCP 4m29s service/kafka-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 4m29s NAME READY AGE statefulset.apps/kafka 3/3 4m29s statefulset.apps/kafka-zookeeper 3/3 4m29s
Testdaten erstellen
In diesem Abschnitt testen Sie die Kafka-Anwendung und generieren Nachrichten.
Erstellen Sie einen Consumer-Client-Pod für die Interaktion mit der Kafka-Anwendung.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
Erstellen Sie ein Thema mit dem Namen
topic1
mit drei Partitionen und einem Replikationsfaktor von drei.kafka-topics.sh \ --create \ --topic topic1 \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Prüfen Sie, ob die Themenpartitionen über alle drei Broker repliziert werden.
kafka-topics.sh \ --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Die Ausgabe sieht in etwa so aus:
Topic: topic1 TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: topic1 Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
In der Beispielausgabe sehen Sie, dass
topic1
drei Partitionen mit jeweils einem anderen Leader und Replikatsatz hat. Dies liegt daran, dass Kafka die Partitionierung verwendet, um die Daten auf mehrere Broker zu verteilen, was eine höhere Skalierbarkeit und Fehlertoleranz ermöglicht. Mit dem Replikationsfaktor von drei wird sichergestellt, dass jede Partition drei Replikate hat, sodass Daten auch dann verfügbar sind, wenn ein oder zwei Broker ausfallen.Führen Sie den folgenden Befehl aus, um Nachrichtennummern im Bulk in
topic1
zu generieren.ALLOW_PLAINTEXT_LISTENER=yes for x in $(seq 0 200); do echo "$x: Message number $x" done | kafka-console-producer.sh \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \ --property parse.key=true \ --property key.separator=":"
Führen Sie den folgenden Befehl aus, um
topic1
aus allen Partitionen zu nutzen.kafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;
Geben Sie
CTRL+C
ein, um den Nutzervorgang zu beenden.
Benchmark Kafka
Um einen Anwendungsfall genau zu modellieren, können Sie eine Simulation der erwarteten Last auf dem Cluster ausführen. Zum Testen der Leistung verwenden Sie die Tools im Kafka-Paket, nämlich die Skripts kafka-producer-perf-test.sh
und kafka-consumer-perf-test.sh
im Ordner bin
.
Erstellen Sie ein Thema für das Benchmarking.
kafka-topics.sh \ --create \ --topic topic-benchmark \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Erstellen Sie eine Last auf dem Kafka-Cluster.
KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \ --topic topic-benchmark \ --num-records 10000000 \ --throughput -1 \ --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \ batch.size=16384 \ acks=all \ linger.ms=500 \ compression.type=none \ --record-size 100 \ --print-metrics
Der Ersteller generiert 10.000.000 Einträge im
topic-benchmark
. Die Ausgabe sieht etwa so aus:623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency. 1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency. 1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency. 2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
Nachdem alle Einträge gesendet wurden, sollten Sie in der Ausgabe zusätzliche Messwerte sehen, ähnlich dem folgenden:
producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark} : 173316.233 producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark} : 10000000.000
Geben Sie
CTRL + C
ein, um die Smartwatch zu beenden.Beenden Sie die Pod-Shell.
exit
Upgrades verwalten
Die Versionsupdates für Kafka und Kubernetes werden in regelmäßigen Abständen veröffentlicht. Halten Sie sich an die operativen Best Practices, um Ihre Softwareumgebung regelmäßig zu aktualisieren.
Für Kafka-Binärupgrades planen
In diesem Abschnitt aktualisieren Sie das Kafka-Image mit Helm und prüfen, ob Ihre Themen noch verfügbar sind.
So führen Sie ein Upgrade von der früheren Kafka-Version aus dem Helm-Diagramm durch, das Sie unter Kafka auf einem Cluster bereitstellen verwendet haben:
Füllen Sie Artifact Registry mit dem folgenden Image aus:
../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2 ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61 ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49 ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
Führen Sie die folgenden Schritte aus, um ein Helm-Diagramm mit den aktualisierten Kafka- und Zookeeper-Images bereitzustellen. Eine versionsspezifische Anleitung finden Sie in der Kafka-Anleitung für Versionsupgrades.
- Aktualisieren Sie die Abhängigkeitsversion
Chart.yaml
:
../scripts/chart.sh kafka 20.1.0
Stellen Sie das Helm-Diagramm mit den neuen Kafka- und Zookeeper-Images bereit, wie im folgenden Beispiel gezeigt:
rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka ./ \ --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
Sehen Sie sich an, wie die Kafka-Pods aktualisiert werden:
kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
Geben Sie
CTRL + C
ein, um die Smartwatch zu beenden.- Aktualisieren Sie die Abhängigkeitsversion
Stellen Sie über einen Client-Pod eine Verbindung zum Kafka-Cluster her.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
Prüfen Sie, ob Sie von
topic1
auf Nachrichten zugreifen können.kafka-console-consumer.sh \ --topic topic1 \ --from-beginning \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Die Ausgabe sollte die generierten Nachrichten aus dem vorherigen Schritt enthalten. Geben Sie
CTRL+C
ein, um den Prozess zu beenden.Beenden Sie den Client-Pod.
exit
Vorbereitung für die Notfallwiederherstellung
Damit Ihre Produktionsarbeitslasten im Falle eines Dienstunterbrechungsereignisses verfügbar bleiben, sollten Sie einen Plan zur Notfallwiederherstellung (Disaster Recovery, DR) erstellen. Weitere Informationen zur Planung der Notfallwiederherstellung finden Sie im Leitfaden zur Planung der Notfallwiederherstellung.
Zum Sichern und Wiederherstellen Ihrer Arbeitslasten in GKE-Clustern können Sie Backup for GKE verwenden.
Beispielszenario für eine Kafka-Sicherung und -Wiederherstellung
In diesem Abschnitt erstellen Sie eine Sicherung des Clusters aus gke-kafka-us-central1
und stellen die Sicherung in gke-kafka-us-west1
wieder her. Sie führen den Sicherungs- und Wiederherstellungsvorgang auf der Anwendungsebene mit der benutzerdefinierten Ressource ProtectedApplication
aus.
Das folgende Diagramm veranschaulicht die Komponenten der Notfallwiederherstellungslösung und ihre Beziehung zueinander.
So bereiten Sie die Sicherung und Wiederherstellung Ihres Kafka-Clusters vor:
Richten Sie Umgebungsvariablen ein.
export BACKUP_PLAN_NAME=kafka-protected-app export BACKUP_NAME=protected-app-backup-1 export RESTORE_PLAN_NAME=kafka-protected-app export RESTORE_NAME=protected-app-restore-1 export REGION=us-central1 export DR_REGION=us-west1 export CLUSTER_NAME=gke-kafka-$REGION export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
Prüfen Sie, ob der Cluster den Status
RUNNING
hat.gcloud container clusters describe $CLUSTER_NAME --region us-central1 --format='value(status)'
Erstellen Sie einen Sicherungsplan.
gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \ --selected-applications=kafka/kafka,kafka/zookeeper \ --include-secrets \ --include-volume-data \ --cron-schedule="0 3 * * *" \ --backup-retain-days=7 \ --backup-delete-lock-days=0
Erstellen Sie manuell eine Sicherung. Geplante Sicherungen unterliegen normalerweise dem Cron-Schedule im Sicherungsplan. Das folgende Beispiel zeigt, wie Sie einen einmaligen Sicherungsvorgang initiieren.
gcloud beta container backup-restore backups create $BACKUP_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=$BACKUP_PLAN_NAME \ --wait-for-completion
Erstellen Sie einen Wiederherstellungsplan.
gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \ --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \ --cluster-resource-conflict-policy=use-existing-version \ --namespaced-resource-restore-mode=delete-and-restore \ --volume-data-restore-policy=restore-volume-data-from-backup \ --selected-applications=kafka/kafka,kafka/zookeeper \ --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
Stellen Sie manuell aus einer Sicherung wieder her.
gcloud beta container backup-restore restores create $RESTORE_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --restore-plan=$RESTORE_PLAN_NAME \ --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
Sehen Sie sich die wiederhergestellte Anwendung im Sicherungscluster an. Es kann einige Minuten dauern, bis alle Pods ausgeführt werden und bereit sind.
gcloud container clusters get-credentials gke-kafka-us-west1 \ --region us-west1 kubectl get pod -n kafka --watch
Geben Sie
CTRL+C
ein, um die Überwachung zu beenden, wenn alle Pods ausgeführt werden.Prüfen Sie, ob vorherige Themen von einem Nutzer abgerufen werden können.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
kafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;
Die Ausgabe sieht in etwa so aus:
192 : Message number 192 193 : Message number 193 197 : Message number 197 200 : Message number 200 Processed a total of 201 messages
Geben Sie
CTRL+C
ein, um den Prozess zu beenden.Beenden Sie den Pod.
exit
Kafka-Dienstunterbrechung simulieren
In diesem Abschnitt simulieren Sie einen Knotenfehler. Ersetzen Sie dazu einen Kubernetes-Knoten, der den Broker hostet. Dieser Abschnitt gilt nur für Standard. Autopilot verwaltet Ihre Knoten für Sie, sodass der Knotenfehler nicht simuliert werden kann.
Erstellen Sie einen Client-Pod, um eine Verbindung zur Kafka-Anwendung herzustellen.
kubectl run kafka-client -n kafka --restart='Never' -it \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
Erstellen Sie das Thema
topic-failover-test
und erstellen Sie Test-Traffic.kafka-topics.sh \ --create \ --topic topic-failover-test \ --partitions 1 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Ermitteln Sie, welcher Broker für das Thema
topic-failover-test
verantwortlich ist.kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Die Ausgabe sieht in etwa so aus:
Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
In der obigen Ausgabe bedeutet
Leader: 1
, dass der Leader fürtopic-failover-test
Broker 1 ist. Dies entspricht Podkafka-1
.Öffnen Sie ein neues Terminal und stellen Sie eine Verbindung zu demselben Cluster her.
gcloud container clusters get-credentials gke-kafka-us-west1 --region us-west1 --project PROJECT_ID
Suchen Sie nach dem Knoten-Pod
kafka-1
, auf dem der Pod ausgeführt wird.kubectl get pod -n kafka kafka-1 -o wide
Die Ausgabe sieht in etwa so aus:
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 1 (35m ago) 36m 192.168.132.4 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 <none> <none>
In der Ausgabe oben sehen Sie, dass der Pod
kafka-1
auf dem Knotengke-gke-kafka-us-west1-pool-system-a0d0d395-nx72
ausgeführt wird.Leeren Sie den Knoten, um die Pods zu entfernen.
kubectl drain NODE \ --delete-emptydir-data \ --force \ --ignore-daemonsets
Ersetzen Sie NODE durch den Knoten-Pod, auf dem kafka-1 ausgeführt wird. In diesem Beispiel ist der Knoten
gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72
.Die Ausgabe sieht in etwa so aus:
node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578 evicting pod kafka/kafka-1 evicting pod kube-system/kube-dns-7d48cb57b-j4d8f evicting pod kube-system/calico-typha-56995c8d85-5clph pod/calico-typha-56995c8d85-5clph evicted pod/kafka-1 evicted pod/kube-dns-7d48cb57b-j4d8f evicted node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
Suchen Sie nach dem Knoten-Pod
kafka-1
, auf dem der Pod ausgeführt wird.kubectl get pod -n kafka kafka-1 -o wide
Die Ausgabe sollte in etwa so aussehen:
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 0 2m49s 192.168.128.8 gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7 <none> <none>
In der obigen Ausgabe sehen Sie, dass die Anwendung auf einem neuen Knoten ausgeführt wird.
Ermitteln Sie in dem Terminal, das mit dem Pod
kafka-client
verbunden ist, ob der Broker fürtopic-failover-test
zuständig ist.kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Die Ausgabe sollte in etwa so aussehen:
Topic: topic-failover-test TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 0,2,1
In der Beispielausgabe ist der Leader immer noch 1 . Er wird jedoch jetzt auf einem neuen Knoten ausgeführt.
Fehler beim Kafka-Leader testen
Stellen Sie in Cloud Shell eine Verbindung zum Kafka-Client her und verwenden Sie
describe
, um den ausgewählten Leader für jede Partition intopic1
aufzurufen.kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Die Ausgabe sieht in etwa so aus:
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: topic1 Partition: 1 Leader: 0 Replicas: 2,1,0 Isr: 0,2,1 Topic: topic1 Partition: 2 Leader: 0 Replicas: 1,0,2 Isr: 0,2,1
Löschen Sie in der Cloud Shell, die nicht mit dem Kafka-Client verbunden ist, den Leader-Broker
kafka-0
, um eine neue Leader-Auswahl zu erzwingen. Sie sollten den Index löschen, der einem der Leader in der vorherigen Ausgabe zugeordnet ist.kubectl delete pod -n kafka kafka-0 --force
Die Ausgabe sieht in etwa so aus:
pod "kafka-0" force deleted
Verwenden Sie in der Cloud Shell die Verbindung zum Kafka-Client und verwenden Sie
describe
, um den ausgewählten Leader aufzurufen.kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Die Ausgabe sieht in etwa so aus:
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 2 Leader: 2 Replicas: 1,2,0 Isr: 2,0,1
In der Ausgabe ändert sich der neue Leader für jede Partition, wenn er dem Leader zugewiesen wurde, der unterbrochen wurde (
kafka-0
). Dies weist darauf hin, dass der ursprüngliche Leader beim Löschen und Neuerstellen des Pods ersetzt wurde.
Bereinigen
Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.
Projekt löschen
Sie vermeiden weitere Kosten am einfachsten, wenn Sie das für die Anleitung erstellte Projekt löschen.
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Nächste Schritte
- Einen vollständig verwalteten und skalierbaren Messaging-Dienst finden Sie unter Von Kafka zu Pub/Sub migrieren.