Hochverfügbaren Kafka-Cluster in GKE bereitstellen


Kafka ist ein verteiltes Open Source-Messaging-System für die Verarbeitung von großen Datenmengen mit hohem Durchsatz und Echtzeitstreaming. Mit Kafka können Sie Streamingdaten-Pipelines erstellen, die Daten zuverlässig zwischen verschiedenen Systemen und Anwendungen für die Verarbeitung und Analyse verschieben.

Diese Anleitung richtet sich an Plattformadministratoren, Cloud-Architekten und Betriebsexperten, die daran interessiert sind, hochverfügbare Kafka-Cluster in Google Kubernetes Engine (GKE) bereitzustellen.

Lernziele

In dieser Anleitung erhalten Sie Informationen zu folgenden Themen:

  • Verwenden Sie Terraform, um einen regionalen GKE-Cluster zu erstellen.
  • Stellen Sie ein hochverfügbares Kafka-Cluster bereit.
  • Aktualisieren Sie Kafka-Binärdateien.
  • Sichern und stellen Sie den Kafka-Cluster wieder her.
  • Simulieren Sie die Störung des GKE-Knotens und einen Failover des Kafka-Brokers.

Architektur

In diesem Abschnitt wird die Architektur der Lösung beschrieben, die Sie in dieser Anleitung erstellen.

Ein Kafka-Cluster ist eine Gruppe von einem oder mehreren Servern (so genannten Brokern), die zusammenarbeiten, um eingehende Datenstreams und Publish/Subscribe-Messaging für Kafka-Clients (so genannte Verbraucher) zu verarbeiten.

Jede Datenpartition in einem Kafka-Cluster hat einen Leader-Broker und kann einen oder mehrere Follower-Broker haben. Der Leader-Broker verarbeitet alle Lese- und Schreibvorgänge in der Partition. Jeder Follower-Broker repliziert den Leader-Broker passiv.

In einer typischen Kafka-Einrichtung verwenden Sie auch einen Open Source-Dienst namens ZooKeeper, um Ihre Kafka-Cluster zu koordinieren. Dieser Dienst hilft Ihnen, einen Leader unter den Brokern zu wählen und bei Fehlern ein Failover auszulösen.

In dieser Anleitung stellen Sie die Kafka-Cluster in GKE bereit. Dazu konfigurieren Sie die Kafka-Broker und den Zookeeper-Dienst als einzelne StatefulSets. Um hochverfügbare Kafka-Cluster bereitzustellen und die Notfallwiederherstellung vorzubereiten, konfigurieren Sie Ihre Kafka- und Zookeeper-StatefulSets für die Verwendung separater Knotenpools und Zonen.

Das folgende Diagramm zeigt, wie Ihr Kafka-StatefulSet auf mehreren Knoten und Zonen in Ihrem GKE-Cluster ausgeführt wird.

Das Diagramm zeigt eine Beispielarchitektur eines Kafka-StatefulSets in GKE, das in mehreren Zonen bereitgestellt wird.
Abbildung 1: Kafka-StatefulSet auf GKE-Knoten in drei verschiedenen Zonen bereitstellen.

Das folgende Diagramm zeigt, wie Ihr Zookeeper-StatefulSet auf mehreren Knoten und Zonen in Ihrem GKE-Cluster ausgeführt wird.

Das Diagramm zeigt eine Beispielarchitektur eines Zookeeper-StatefulSets in GKE, das in mehreren Zonen bereitgestellt wird.
Abbildung 2: Kafka Zookeeper auf GKE-Knoten in drei verschiedenen Zonen bereitstellen

Knotenbereitstellung und Pod-Planung

Wenn Sie Autopilot-Cluster verwenden, übernimmt Autopilot die Bereitstellung von Knoten und die Planung der Pods für Ihre Arbeitslasten. Sie verwenden die Anti-Affinität eines Pods, damit zwei Pods desselben StatefulSets nicht auf demselben Knoten und in derselben Zone geplant sind.

Wenn Sie Standardcluster verwenden, müssen Sie die Pod-Toleranz und die Knotenaffinität konfigurieren. Weitere Informationen finden Sie unter Arbeitslasten in dedizierten Knotenpools isolieren.

Kosten

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

Hinweis

Projekt einrichten

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, click Create project to begin creating a new Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, click Create project to begin creating a new Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM APIs.

    Enable the APIs

Rollen einrichten

  1. Grant roles to your user account. Run the following command once for each of the following IAM roles: role/storage.objectViewer, role/logging.logWriter, role/artifactregistry.Admin, roles/container.clusterAdmin, role/container.serviceAgent, roles/iam.serviceAccountAdmin, roles/serviceusage.serviceUsageAdmin, roles/iam.serviceAccountAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.

Umgebung einrichten

In dieser Anleitung verwenden Sie Cloud Shell zum Verwalten von Ressourcen, die in Google Cloud gehostet werden. Die Software, die Sie für diese Anleitung benötigen, ist in Cloud Shell vorinstalliert, einschließlich Docker, kubectl und der gcloud CLI, Helm und Terraform.

So richten Sie Ihre Umgebung mit Cloud Shell ein:

  1. Starten Sie eine Cloud Shell-Sitzung über die Google Cloud Console. Klicken Sie dazu in der Google Cloud Console auf Symbol für die Cloud Shell-Aktivierung Cloud Shell aktivieren. Dadurch wird im unteren Bereich der Google Cloud Console eine Sitzung gestartet.

  2. Umgebungsvariablen festlegen

    export PROJECT_ID=PROJECT_ID
    export REGION=us-central1
    

    Ersetzen Sie die folgenden Werte:

  3. Legen Sie die Standardumgebungsvariablen fest.

    gcloud config set project PROJECT_ID
    
  4. Klonen Sie das Code-Repository.

    git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  5. Wechseln Sie in das Arbeitsverzeichnis.

    cd kubernetes-engine-samples/streaming/gke-stateful-kafka
    

Clusterinfrastruktur erstellen

In diesem Abschnitt führen Sie ein Terraform-Skript aus, um zwei regionale GKE-Cluster zu erstellen. Der primäre Cluster wird in us-central1 bereitgestellt.

So erstellen Sie den Cluster:

Autopilot

Führen Sie in Cloud Shell die folgenden Befehle aus:

terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID

Geben Sie bei Aufforderung yes ein.

Standard

Führen Sie in Cloud Shell die folgenden Befehle aus:

terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID

Geben Sie bei Aufforderung yes ein.

Die Terraform-Konfigurationsdateien erstellen die folgenden Ressourcen, um Ihre Infrastruktur bereitzustellen:

  • Erstellen Sie ein Artifact Registry-Repository zum Speichern der Docker-Images.
  • Erstellen Sie das VPC-Netzwerk und das Subnetz für die Netzwerkschnittstelle der VM.
  • Erstellen Sie zwei GKE-Cluster.

Terraform erstellt einen privaten Cluster in beiden Regionen und aktiviert Backup for GKE für die Notfallwiederherstellung.

Kafka auf einem Cluster bereitstellen

In diesem Abschnitt stellen Sie Kafka in GKE mithilfe eines Helm-Diagramms bereit. Der Vorgang erstellt die folgenden Ressourcen:

  • Die Kafka- und Zookeeper-StatefulSets.
  • Eine Bereitstellung von Kafka-Exporter. Der Exporter erfasst Kafka-Messwerte für die Prometheus-Nutzung.
  • Ein Budget für Pod-Störungen (PDB), das die Anzahl der Offline-Pods während einer freiwilligen Unterbrechung begrenzt.

So verwenden Sie das Helm-Diagramm, um Kafka bereitzustellen:

  1. Konfigurieren Sie den Docker-Zugriff.

    gcloud auth configure-docker us-docker.pkg.dev
    
  2. Füllen Sie Artifact Registry mit den Kafka- und Zookeeper-Images.

    ./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0
    ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52
    ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41
    ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
    
  3. Konfigurieren Sie den Zugriff auf die kubectl-Befehlszeile für den primären Cluster.

    gcloud container clusters get-credentials gke-kafka-us-central1 \
        --region=${REGION} \
        --project=${PROJECT_ID}
    
  4. Namespace erstellen

    export NAMESPACE=kafka
    kubectl create namespace $NAMESPACE
    
  5. Installieren Sie Kafka mit der Helm-Diagrammversion 20.0.6.

    cd helm
    ../scripts/chart.sh kafka 20.0.6 && \
    rm -rf Chart.lock charts && \
    helm dependency update && \
    helm -n kafka upgrade --install kafka . \
    --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
    
    

    Die Ausgabe sieht in etwa so aus:

    NAME: kafka
    LAST DEPLOYED: Thu Feb 16 03:29:39 2023
    NAMESPACE: kafka
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    
  6. Prüfen Sie, ob Ihre Kafka-Replikate ausgeführt werden. Dies kann einige Minuten dauern.

    kubectl get all -n kafka
    

    Die Ausgabe sieht in etwa so aus:

    ---
    NAME                    READY   STATUS    RESTARTS        AGE
    pod/kafka-0             1/1     Running   2 (3m51s ago)   4m28s
    pod/kafka-1             1/1     Running   3 (3m41s ago)   4m28s
    pod/kafka-2             1/1     Running   2 (3m57s ago)   4m28s
    pod/kafka-zookeeper-0   1/1     Running   0               4m28s
    pod/kafka-zookeeper-1   1/1     Running   0               4m28s
    pod/kafka-zookeeper-2   1/1     Running   0               4m28s
    
    NAME                                   TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)                      AGE
    service/kafka                          ClusterIP   192.168.112.124   <none>        9092/TCP                     4m29s
    service/kafka-app                      ClusterIP   192.168.75.57     <none>        9092/TCP                     35m
    service/kafka-app-headless             ClusterIP   None              <none>        9092/TCP,9093/TCP            35m
    service/kafka-app-zookeeper            ClusterIP   192.168.117.102   <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-app-zookeeper-headless   ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-headless                 ClusterIP   None              <none>        9092/TCP,9093/TCP            4m29s
    service/kafka-zookeeper                ClusterIP   192.168.89.249    <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    service/kafka-zookeeper-headless       ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    
    NAME                               READY   AGE
    statefulset.apps/kafka             3/3     4m29s
    statefulset.apps/kafka-zookeeper   3/3     4m29s
    

Testdaten erstellen

In diesem Abschnitt testen Sie die Kafka-Anwendung und generieren Nachrichten.

  1. Erstellen Sie einen Consumer-Client-Pod für die Interaktion mit der Kafka-Anwendung.

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
    
  2. Erstellen Sie ein Thema mit dem Namen topic1 mit drei Partitionen und einem Replikationsfaktor von drei.

    kafka-topics.sh \
        --create \
        --topic topic1 \
        --partitions 3  \
        --replication-factor 3 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. Prüfen Sie, ob die Themenpartitionen über alle drei Broker repliziert werden.

    kafka-topics.sh \
        --describe \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    Die Ausgabe sieht in etwa so aus:

    Topic: topic1     TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
           Topic: topic1    Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
           Topic: topic1    Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
           Topic: topic1    Partition: 2    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
    

    In der Beispielausgabe sehen Sie, dass topic1 drei Partitionen mit jeweils einem anderen Leader und Replikatsatz hat. Dies liegt daran, dass Kafka die Partitionierung verwendet, um die Daten auf mehrere Broker zu verteilen, was eine höhere Skalierbarkeit und Fehlertoleranz ermöglicht. Mit dem Replikationsfaktor von drei wird sichergestellt, dass jede Partition drei Replikate hat, sodass Daten auch dann verfügbar sind, wenn ein oder zwei Broker ausfallen.

  4. Führen Sie den folgenden Befehl aus, um Nachrichtennummern im Bulk in topic1 zu generieren.

    ALLOW_PLAINTEXT_LISTENER=yes
    for x in $(seq 0 200); do
      echo "$x: Message number $x"
    done | kafka-console-producer.sh \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \
        --property parse.key=true \
        --property key.separator=":"
    
  5. Führen Sie den folgenden Befehl aus, um topic1 aus allen Partitionen zu nutzen.

    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    Geben Sie CTRL+C ein, um den Nutzervorgang zu beenden.

Benchmark Kafka

Um einen Anwendungsfall genau zu modellieren, können Sie eine Simulation der erwarteten Last auf dem Cluster ausführen. Zum Testen der Leistung verwenden Sie die Tools im Kafka-Paket, nämlich die Skripts kafka-producer-perf-test.sh und kafka-consumer-perf-test.sh im Ordner bin.

  1. Erstellen Sie ein Thema für das Benchmarking.

    kafka-topics.sh \
      --create \
      --topic topic-benchmark \
      --partitions 3  \
      --replication-factor 3 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  2. Erstellen Sie eine Last auf dem Kafka-Cluster.

    KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \
        --topic topic-benchmark \
        --num-records 10000000 \
        --throughput -1 \
        --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \
              batch.size=16384 \
              acks=all \
              linger.ms=500 \
              compression.type=none \
        --record-size 100 \
        --print-metrics
    

    Der Ersteller generiert 10.000.000 Einträge im topic-benchmark. Die Ausgabe sieht etwa so aus:

    623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency.
    1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency.
    1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency.
    2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
    

    Nachdem alle Einträge gesendet wurden, sollten Sie in der Ausgabe zusätzliche Messwerte sehen, ähnlich dem folgenden:

    producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark}     : 173316.233
    producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark}    : 10000000.000
    

    Geben Sie CTRL + C ein, um die Smartwatch zu beenden.

  3. Beenden Sie die Pod-Shell.

    exit
    

Upgrades verwalten

Die Versionsupdates für Kafka und Kubernetes werden in regelmäßigen Abständen veröffentlicht. Halten Sie sich an die operativen Best Practices, um Ihre Softwareumgebung regelmäßig zu aktualisieren.

Für Kafka-Binärupgrades planen

In diesem Abschnitt aktualisieren Sie das Kafka-Image mit Helm und prüfen, ob Ihre Themen noch verfügbar sind.

So führen Sie ein Upgrade von der früheren Kafka-Version aus dem Helm-Diagramm durch, das Sie unter Kafka auf einem Cluster bereitstellen verwendet haben:

  1. Füllen Sie Artifact Registry mit dem folgenden Image aus:

    ../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2
    ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61
    ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49
    ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
    
  2. Führen Sie die folgenden Schritte aus, um ein Helm-Diagramm mit den aktualisierten Kafka- und Zookeeper-Images bereitzustellen. Eine versionsspezifische Anleitung finden Sie in der Kafka-Anleitung für Versionsupgrades.

    1. Aktualisieren Sie die Abhängigkeitsversion Chart.yaml:
    ../scripts/chart.sh kafka 20.1.0
    
    
    1. Stellen Sie das Helm-Diagramm mit den neuen Kafka- und Zookeeper-Images bereit, wie im folgenden Beispiel gezeigt:

      rm -rf Chart.lock charts && \
      helm dependency update && \
      helm -n kafka upgrade --install kafka ./ \
            --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
      

    Sehen Sie sich an, wie die Kafka-Pods aktualisiert werden:

    kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
    

    Geben Sie CTRL + C ein, um die Smartwatch zu beenden.

  3. Stellen Sie über einen Client-Pod eine Verbindung zum Kafka-Cluster her.

    kubectl run kafka-client -n kafka --rm -ti \
      --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
    
  4. Prüfen Sie, ob Sie von topic1 auf Nachrichten zugreifen können.

    kafka-console-consumer.sh \
      --topic topic1 \
      --from-beginning \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    Die Ausgabe sollte die generierten Nachrichten aus dem vorherigen Schritt enthalten. Geben Sie CTRL+C ein, um den Prozess zu beenden.

  5. Beenden Sie den Client-Pod.

    exit
    

Vorbereitung für die Notfallwiederherstellung

Damit Ihre Produktionsarbeitslasten im Falle eines Dienstunterbrechungsereignisses verfügbar bleiben, sollten Sie einen Plan zur Notfallwiederherstellung (Disaster Recovery, DR) erstellen. Weitere Informationen zur Planung der Notfallwiederherstellung finden Sie im Leitfaden zur Planung der Notfallwiederherstellung.

Zum Sichern und Wiederherstellen Ihrer Arbeitslasten in GKE-Clustern können Sie Backup for GKE verwenden.

Beispielszenario für eine Kafka-Sicherung und -Wiederherstellung

In diesem Abschnitt erstellen Sie eine Sicherung des Clusters aus gke-kafka-us-central1 und stellen die Sicherung in gke-kafka-us-west1 wieder her. Sie führen den Sicherungs- und Wiederherstellungsvorgang auf der Anwendungsebene mit der benutzerdefinierten Ressource ProtectedApplication aus.

Das folgende Diagramm veranschaulicht die Komponenten der Notfallwiederherstellungslösung und ihre Beziehung zueinander.

Das Diagramm zeigt ein Beispiel für eine Sicherungs- und Wiederherstellungslösung für einen hochverfügbaren Kafka-Cluster.
Abbildung 3: Beispiel für eine Sicherungs- und Wiederherstellungslösung für einen hochverfügbaren Kafka-Cluster.

So bereiten Sie die Sicherung und Wiederherstellung Ihres Kafka-Clusters vor:

  1. Richten Sie Umgebungsvariablen ein.

    export BACKUP_PLAN_NAME=kafka-protected-app
    export BACKUP_NAME=protected-app-backup-1
    export RESTORE_PLAN_NAME=kafka-protected-app
    export RESTORE_NAME=protected-app-restore-1
    export REGION=us-central1
    export DR_REGION=us-west1
    export CLUSTER_NAME=gke-kafka-$REGION
    export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
    
  2. Prüfen Sie, ob der Cluster den Status RUNNING hat.

    gcloud container clusters describe $CLUSTER_NAME --region us-central1 --format='value(status)'
    
  3. Erstellen Sie einen Sicherungsplan.

    gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --include-secrets \
        --include-volume-data \
        --cron-schedule="0 3 * * *" \
        --backup-retain-days=7 \
        --backup-delete-lock-days=0
    
  4. Erstellen Sie manuell eine Sicherung. Geplante Sicherungen unterliegen normalerweise dem Cron-Schedule im Sicherungsplan. Das folgende Beispiel zeigt, wie Sie einen einmaligen Sicherungsvorgang initiieren.

    gcloud beta container backup-restore backups create $BACKUP_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=$BACKUP_PLAN_NAME \
        --wait-for-completion
    
  5. Erstellen Sie einen Wiederherstellungsplan.

    gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \
        --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \
        --cluster-resource-conflict-policy=use-existing-version \
        --namespaced-resource-restore-mode=delete-and-restore \
        --volume-data-restore-policy=restore-volume-data-from-backup \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
    
  6. Stellen Sie manuell aus einer Sicherung wieder her.

    gcloud beta container backup-restore restores create $RESTORE_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --restore-plan=$RESTORE_PLAN_NAME \
        --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
    
  7. Sehen Sie sich die wiederhergestellte Anwendung im Sicherungscluster an. Es kann einige Minuten dauern, bis alle Pods ausgeführt werden und bereit sind.

    gcloud container clusters get-credentials gke-kafka-us-west1 \
        --region us-west1
    kubectl get pod -n kafka --watch
    

    Geben Sie CTRL+C ein, um die Überwachung zu beenden, wenn alle Pods ausgeführt werden.

  8. Prüfen Sie, ob vorherige Themen von einem Nutzer abgerufen werden können.

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    Die Ausgabe sieht in etwa so aus:

    192 :  Message number 192
    193 :  Message number 193
    197 :  Message number 197
    200 :  Message number 200
    Processed a total of 201 messages
    

    Geben Sie CTRL+C ein, um den Prozess zu beenden.

  9. Beenden Sie den Pod.

    exit
    

Kafka-Dienstunterbrechung simulieren

In diesem Abschnitt simulieren Sie einen Knotenfehler. Ersetzen Sie dazu einen Kubernetes-Knoten, der den Broker hostet. Dieser Abschnitt gilt nur für Standard. Autopilot verwaltet Ihre Knoten für Sie, sodass der Knotenfehler nicht simuliert werden kann.

  1. Erstellen Sie einen Client-Pod, um eine Verbindung zur Kafka-Anwendung herzustellen.

    kubectl run kafka-client -n kafka --restart='Never' -it \
    --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
  2. Erstellen Sie das Thema topic-failover-test und erstellen Sie Test-Traffic.

    kafka-topics.sh \
      --create \
      --topic topic-failover-test \
      --partitions 1  \
      --replication-factor 3  \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. Ermitteln Sie, welcher Broker für das Thema topic-failover-test verantwortlich ist.

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    Die Ausgabe sieht in etwa so aus:

    Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
    

    In der obigen Ausgabe bedeutet Leader: 1, dass der Leader für topic-failover-test Broker 1 ist. Dies entspricht Pod kafka-1.

  4. Öffnen Sie ein neues Terminal und stellen Sie eine Verbindung zu demselben Cluster her.

    gcloud container clusters get-credentials gke-kafka-us-west1 --region us-west1 --project PROJECT_ID
    
  5. Suchen Sie nach dem Knoten-Pod kafka-1, auf dem der Pod ausgeführt wird.

    kubectl get pod -n kafka kafka-1 -o wide
    

    Die Ausgabe sieht in etwa so aus:

    NAME      READY   STATUS    RESTARTS      AGE   IP              NODE                                               NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   1 (35m ago)   36m   192.168.132.4   gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72   <none>           <none>
    

    In der Ausgabe oben sehen Sie, dass der Pod kafka-1 auf dem Knoten gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 ausgeführt wird.

  6. Leeren Sie den Knoten, um die Pods zu entfernen.

    kubectl drain NODE \
      --delete-emptydir-data \
      --force \
      --ignore-daemonsets
    

    Ersetzen Sie NODE durch den Knoten-Pod, auf dem kafka-1 ausgeführt wird. In diesem Beispiel ist der Knoten gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72.

    Die Ausgabe sieht in etwa so aus:

    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned
    Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578
    evicting pod kafka/kafka-1
    evicting pod kube-system/kube-dns-7d48cb57b-j4d8f
    evicting pod kube-system/calico-typha-56995c8d85-5clph
    pod/calico-typha-56995c8d85-5clph evicted
    pod/kafka-1 evicted
    pod/kube-dns-7d48cb57b-j4d8f evicted
    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
    
  7. Suchen Sie nach dem Knoten-Pod kafka-1, auf dem der Pod ausgeführt wird.

    kubectl get pod -n kafka kafka-1 -o wide
    

    Die Ausgabe sollte in etwa so aussehen:

    NAME      READY   STATUS    RESTARTS   AGE     IP              NODE                                              NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   0          2m49s   192.168.128.8   gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7   <none>           <none>
    

    In der obigen Ausgabe sehen Sie, dass die Anwendung auf einem neuen Knoten ausgeführt wird.

  8. Ermitteln Sie in dem Terminal, das mit dem Pod kafka-client verbunden ist, ob der Broker für topic-failover-test zuständig ist.

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    Die Ausgabe sollte in etwa so aussehen:

    Topic: topic-failover-test     TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 0,2,1
    

    In der Beispielausgabe ist der Leader immer noch 1 . Er wird jedoch jetzt auf einem neuen Knoten ausgeführt.

Fehler beim Kafka-Leader testen

  1. Stellen Sie in Cloud Shell eine Verbindung zum Kafka-Client her und verwenden Sie describe, um den ausgewählten Leader für jede Partition in topic1 aufzurufen.

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    Die Ausgabe sieht in etwa so aus:

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: topic1   Partition: 1    Leader: 0       Replicas: 2,1,0 Isr: 0,2,1
        Topic: topic1   Partition: 2    Leader: 0       Replicas: 1,0,2 Isr: 0,2,1
    
  2. Löschen Sie in der Cloud Shell, die nicht mit dem Kafka-Client verbunden ist, den Leader-Broker kafka-0, um eine neue Leader-Auswahl zu erzwingen. Sie sollten den Index löschen, der einem der Leader in der vorherigen Ausgabe zugeordnet ist.

    kubectl delete pod -n kafka kafka-0 --force
    

    Die Ausgabe sieht in etwa so aus:

    pod "kafka-0" force deleted
    
  3. Verwenden Sie in der Cloud Shell die Verbindung zum Kafka-Client und verwenden Sie describe, um den ausgewählten Leader aufzurufen.

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    Die Ausgabe sieht in etwa so aus:

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,0,1
        Topic: topic1   Partition: 1    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: topic1   Partition: 2    Leader: 2       Replicas: 1,2,0 Isr: 2,0,1
    

    In der Ausgabe ändert sich der neue Leader für jede Partition, wenn er dem Leader zugewiesen wurde, der unterbrochen wurde (kafka-0). Dies weist darauf hin, dass der ursprüngliche Leader beim Löschen und Neuerstellen des Pods ersetzt wurde.

Bereinigen

Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.

Projekt löschen

Sie vermeiden weitere Kosten am einfachsten, wenn Sie das für die Anleitung erstellte Projekt löschen.

Delete a Google Cloud project:

gcloud projects delete PROJECT_ID

Nächste Schritte