Apache Kafka mit Confluent in GKE bereitstellen


In dieser Anleitung erfahren Sie, wie Sie mit dem Operator Confluent for Kubernetes (CFK) Apache Kafka in Google Kubernetes Engine (GKE) bereitstellen.

Kafka ist ein verteiltes Open Source-Messaging-System für die Verarbeitung von großen Datenmengen mit hohem Durchsatz und Echtzeitstreaming. Mit Kafka können Sie Streamingdaten-Pipelines erstellen, die Daten zuverlässig zwischen verschiedenen Systemen und Anwendungen für die Verarbeitung und Analyse verschieben.

Dieser Leitfaden richtet sich an Plattformadministratoren, Cloud-Architekten und Betriebsexperten, die an der Bereitstellung von Kafka-Clustern in GKE interessiert sind.

Sie können auch den CFK-Operator verwenden, um andere Komponenten der Confluent Platform bereitzustellen, z. B. das webbasierte Confluent Control-Center, Schema Registry oder KsqlDB. In diesem Leitfaden geht es jedoch nur um Kafka-Bereitstellungen.

Ziele

  • GKE-Infrastruktur für Apache Kafka planen und bereitstellen
  • CFK-Operator bereitstellen und konfigurieren
  • Apache Kafka mit dem CFK-Operator konfigurieren, um Verfügbarkeit, Sicherheit, Beobachtbarkeit und Leistung zu gewährleisten

Vorteile

CFK bietet folgende Vorteile:

  • Automatisierte Rolling Updates für Konfigurationsänderungen.
  • Automatisierte Rolling Upgrades ohne Auswirkungen auf die Kafka-Verfügbarkeit.
  • Wenn ein Fehler auftritt, stellt CFK einen Kafka-Pod mit derselben Kafka-Broker-ID, Konfiguration und nichtflüchtigen Speicher-Volumes wieder her.
  • Automatisiertes Rack-Awareness, um Replikate einer Partition auf verschiedene Racks (oder Zonen) zu verteilen, die Verfügbarkeit von Kafka-Brokern zu verbessern und das Risiko eines Datenverlusts zu begrenzen.
  • Unterstützung für den Export aggregierter Messwerte nach Prometheus.

Bereitstellungsarchitektur

Jede Datenpartition in einem Kafka-Cluster hat einen Leader-Broker und kann einen oder mehrere Follower-Broker haben. Der Leader-Broker verarbeitet alle Lese- und Schreibvorgänge in der Partition. Jeder Follower-Broker repliziert den Leader-Broker passiv.

In einer typischen Kafka-Einrichtung verwenden Sie auch einen Open Source-Dienst namens ZooKeeper, um Ihre Kafka-Cluster zu koordinieren. Dieser Dienst hilft Ihnen, einen Leader unter den Brokern zu wählen und bei Fehlern ein Failover auszulösen.

Sie können die Kafka-Konfiguration auch ohne Zookeeper bereitstellen. Aktivieren Sie dazu den KRaft-Modus. Diese Methode wird jedoch aufgrund der fehlenden Unterstützung für KafkaTopic-Ressourcen und der Authentifizierung mit Anmeldeinformationen nicht als produktionsreif angesehen.

Verfügbarkeit und Notfallwiederherstellung

In dieser Anleitung werden separate Knotenpools und Zonen für Cluster von Kafka und ZooKeeper verwendet, um die Hochverfügbarkeit sicherzustellen und sich auf Notfallwiederherstellung vorzubereiten.

Hochverfügbare Kubernetes-Cluster in Google Cloud basieren auf regionalen Clustern, die sich über mehrere Knoten und Verfügbarkeitszonen erstrecken. Diese Konfiguration verbessert Fehlertoleranz, Skalierbarkeit und geografische Redundanz. Mit dieser Konfiguration können Sie auch Rolling Updates und Wartungsaufgaben durchführen und gleichzeitig SLAs für Betriebszeit und Verfügbarkeit bereitstellen. Weitere Informationen finden Sie unter Regionale Cluster.

Bereitstellungsdiagramm

Das folgende Diagramm zeigt einen Kafka-Cluster, der auf mehreren Knoten und Zonen in einem GKE-Cluster ausgeführt wird:

Im Diagramm wird das Kafka-StatefulSet auf drei Knoten in drei verschiedenen Zonen bereitgestellt. Sie können diese Konfiguration steuern. Legen Sie dazu die erforderlichen Regeln für Pod-Affinität und Topologieverteilung auf die benutzerdefinierte Ressourcenspezifikation Kafka fest.

Wenn bei der empfohlenen Konfiguration eine Zone ausfällt, plant GKE die Pods auf neue Knoten um und repliziert die Daten von den verbleibenden Replikaten, sowohl für Kafka als auch für Zookeeper.

Das folgende Diagramm zeigt ein ZooKeeper-StatefulSet, das auf drei Knoten in drei verschiedenen Zonen bereitgestellt wird:

Kosten

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

Vorbereitung

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the GKE, Backup for GKE, Compute Engine, Identity and Access Management, and Resource Manager APIs:

    gcloud services enable compute.googleapis.com iam.googleapis.com container.googleapis.com gkebackup.googleapis.com cloudresourcemanager.googleapis.com
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Make sure that billing is enabled for your Google Cloud project.

  11. Enable the GKE, Backup for GKE, Compute Engine, Identity and Access Management, and Resource Manager APIs:

    gcloud services enable compute.googleapis.com iam.googleapis.com container.googleapis.com gkebackup.googleapis.com cloudresourcemanager.googleapis.com
  12. Grant roles to your user account. Run the following command once for each of the following IAM roles: role/storage.objectViewer, role/logging.logWriter, roles/container.clusterAdmin, role/container.serviceAgent, roles/iam.serviceAccountAdmin, roles/serviceusage.serviceUsageAdmin, roles/iam.serviceAccountAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.

Umgebung vorbereiten

In dieser Anleitung verwenden Sie Cloud Shell zum Verwalten von Ressourcen, die in Google Cloud gehostet werden. Die Software, die Sie für diese Anleitung benötigen, ist in Cloud Shell vorinstalliert, einschließlich kubectl, gcloud CLI, Helm und Terraform.

So richten Sie Ihre Umgebung mit Cloud Shell ein:

  1. Starten Sie eine Cloud Shell-Sitzung über die Google Cloud Console. Klicken Sie dazu in der Google Cloud Console auf Symbol für die Cloud Shell-Aktivierung Cloud Shell aktivieren. Dadurch wird im unteren Bereich der Google Cloud Console eine Sitzung gestartet.

  2. Legen Sie Umgebungsvariablen fest:

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=kafka
    export REGION=us-central1
    

    Ersetzen Sie PROJECT_ID: Ihre Google Cloud mit Ihrer Projekt-ID.

  3. Klonen Sie das GitHub-Repository:

    git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  4. Wechseln Sie in das Arbeitsverzeichnis:

    cd kubernetes-engine-samples/streaming
    

Clusterinfrastruktur erstellen

In diesem Abschnitt führen Sie ein Terraform-Skript aus, um einen privaten, hochverfügbaren regionalen GKE-Cluster zu erstellen. Die folgenden Schritte ermöglichen einen öffentlichen Zugriff auf die Steuerungsebene. Erstellen Sie einen privaten Cluster, um den Zugriff einzuschränken.

Sie können den Operator mit einem Standard- oder Autopilot-Cluster installieren.

Standard

Das folgende Diagramm zeigt einen privaten regionalen Standard-GKE-Cluster, der in drei verschiedenen Zonen bereitgestellt wird:

Führen Sie die folgenden Befehle in der Cloud Shell aus, um diese Infrastruktur bereitzustellen:

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-standard init
terraform -chdir=kafka/terraform/gke-standard apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

Geben Sie bei Aufforderung yes ein. Es kann einige Minuten dauern, bis dieser Befehl abgeschlossen ist und der Cluster den Status „Bereit“ anzeigt.

Terraform erstellt die folgenden Ressourcen:

  • Ein VPC-Netzwerk und ein privates Subnetz für die Kubernetes-Knoten.
  • Ein Router für den Zugriff auf das Internet über NAT.
  • Ein privater GKE-Cluster in der Region us-central1.
  • 2 Knotenpools mit aktiviertem Autoscaling (1-2 Knoten pro Zone, mindestens 1 Knoten pro Zone)
  • Ein ServiceAccount mit Logging- und Monitoring-Berechtigungen.
  • Backup for GKE zur Notfallwiederherstellung.
  • Google Cloud Managed Service for Prometheus für das Clustermonitoring.

Die Ausgabe sieht in etwa so aus:

...
Apply complete! Resources: 14 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"

Autopilot

Das folgende Diagramm zeigt einen privaten regionalen Autopilot-GKE-Cluster:

Führen Sie die folgenden Befehle in der Cloud Shell aus, um die Infrastruktur bereitzustellen:

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-autopilot init
terraform -chdir=kafka/terraform/gke-autopilot apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

Geben Sie bei Aufforderung yes ein. Es kann einige Minuten dauern, bis dieser Befehl abgeschlossen ist und der Cluster den Status „Bereit“ anzeigt.

Terraform erstellt die folgenden Ressourcen:

  • VPC-Netzwerk und privates Subnetz für die Kubernetes-Knoten.
  • Ein Router für den Zugriff auf das Internet über NAT.
  • Ein privater GKE-Cluster in der Region us-central1.
  • Ein ServiceAccount mit Logging- und Monitoring-Berechtigungen.
  • Google Cloud Managed Service for Prometheus für das Clustermonitoring.

Die Ausgabe sieht in etwa so aus:

...
Apply complete! Resources: 12 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"

Mit dem Cluster verbinden

Konfigurieren Sie kubectl für die Kommunikation mit dem Cluster:

gcloud container clusters get-credentials ${KUBERNETES_CLUSTER_PREFIX}-cluster --region ${REGION}

Stellen Sie den CFK-Operator in Ihrem Cluster bereit:

In diesem Abschnitt stellen Sie den CFK-Operator (Confluent for Kubernetes) mit einem Helm-Diagramm bereit und stellen dann einen Kafka-Cluster bereit.

  1. Fügen Sie das Confluent Helm-Diagramm-Repository hinzu:

    helm repo add confluentinc https://packages.confluent.io/helm
    
  2. Fügen Sie einen Namespace für den CFK-Operator und den Kafka-Cluster hinzu:

    kubectl create ns kafka
    
  3. Stellen Sie den CFK-Cluster-Operator mit Helm bereit:

    helm install confluent-operator confluentinc/confluent-for-kubernetes -n kafka
    

    Damit CFK Ressourcen in allen Namespaces verwalten kann, fügen Sie den Parameter --set-namespaced=false zum Helm-Befehl hinzu.

  4. Prüfen Sie mit Helm, ob der Confluent-Operator erfolgreich bereitgestellt wurde:

    helm ls -n kafka
    

    Die Ausgabe sieht in etwa so aus:

    NAME                  NAMESPACE  REVISION UPDATED                                  STATUS      CHART                                APP VERSION
    confluent-operator    kafka      1        2023-07-07 10:57:45.409158 +0200 CEST    deployed    confluent-for-kubernetes-0.771.13    2.6.0
    

Kafka bereitstellen

In diesem Abschnitt stellen Sie Kafka in einer grundlegenden Konfiguration bereit und testen dann verschiedene erweiterte Konfigurationsszenarien, um Anforderungen an Verfügbarkeit, Sicherheit und Beobachtbarkeit zu erfüllen.

Grundlegende Konfiguration

Die grundlegende Konfiguration für die Kafka-Instanz umfasst die folgenden Komponenten:

  • Drei Replikate von Kafka-Brokern, wobei mindestens zwei verfügbare Replikate für die Clusterkonsistenz erforderlich sind.
  • Drei Replikate von ZooKeeper-Knoten, die einen Cluster bilden.
  • Zwei Kafka-Listener: einer ohne Authentifizierung und einer mit TLS-Authentifizierung mit einem von CFK generierten Zertifikat.
  • Java MaxHeapSize und MinHeapSize sind für Kafka auf 4 GB festgelegt.
  • CPU-Ressourcenzuweisung von 1 CPU-Anfrage und 2 CPU-Limits sowie 5 GB Arbeitsspeicheranfragen und -limits für Kafka (4 GB für den Hauptdienst und 0,5 GB für den Messwertexporter) und 3 GB für Zookeeper (2 GB für den Hauptdienst und 0,5 GB für den Messwertexporter).
  • 100 GB Speicher für jeden Pod unter Verwendung der premium-rwo storageClass, 100 für Kafka Data und 90/10 für Zookeeper Data/Log.
  • Toleranzen, nodeAffinities und podAntiAffinities für jede Arbeitslast, wodurch eine ordnungsgemäße Verteilung zwischen Knoten gewährleistet wird. Dabei werden die entsprechenden Knotenpools und verschiedene Zonen verwendet.
  • Kommunikation im Cluster, die durch selbst signierte Zertifikate mit einer von Ihnen bereitgestellten Zertifizierungsstelle gesichert ist.

Diese Konfiguration stellt die minimale Einrichtung dar, die zum Erstellen eines produktionsfertigen Kafka-Clusters erforderlich ist. In den folgenden Abschnitten werden benutzerdefinierte Konfigurationen zu Aspekten wie Clustersicherheit, Access Control Lists (ACLs), Themenverwaltung, Zertifikatsverwaltung und mehr vorgestellt.

Einfachen Kafka-Cluster erstellen

  1. Generieren Sie ein CA-Paar:

    openssl genrsa -out ca-key.pem 2048
    openssl req -new -key ca-key.pem -x509 \
      -days 1000 \
      -out ca.pem \
      -subj "/C=US/ST=CA/L=Confluent/O=Confluent/OU=Operator/CN=MyCA"
    

    Confluent für Kubernetes bietet automatisch generierte Zertifikate für Confluent Platform-Komponenten, die für die TLS-Netzwerkverschlüsselung verwendet werden können. Sie müssen eine Zertifizierungsstelle (CA) generieren und bereitstellen.

  2. Erstellen Sie ein Kubernetes-Secret für die Zertifizierungsstelle:

    kubectl create secret tls ca-pair-sslcerts --cert=ca.pem --key=ca-key.pem -n kafka
    

    Der Name des Secrets ist vordefinierte

  3. Erstellen Sie mithilfe der grundlegenden Konfiguration einen neuen Kafka-Cluster:

    kubectl apply -n kafka -f kafka-confluent/manifests/01-basic-cluster/my-cluster.yaml
    

    Mit diesem Befehl werden eine benutzerdefinierte Kafka-Ressource und eine benutzerdefinierte Zookeeper-Ressource des CFK-Operators erstellt, die CPU- und Speicheranfragen und -limits, Blockspeicheranfragen sowie Markierungen und Affinitäten enthält, um die bereitgestellten Pods auf Kubernetes-Knoten zu verteilen.

  4. Warten Sie einige Minuten, während Kubernetes die erforderlichen Arbeitslasten startet:

    kubectl wait pods -l app=my-cluster --for condition=Ready --timeout=300s -n kafka
    
  5. Prüfen Sie, ob die Kafka-Arbeitslasten erstellt wurden:

    kubectl get pod,svc,statefulset,deploy,pdb -n kafka
    

    Die Ausgabe sieht in etwa so aus:

    NAME                                    READY   STATUS  RESTARTS   AGE
    pod/confluent-operator-864c74d4b4-fvpxs   1/1   Running   0        49m
    pod/my-cluster-0                        1/1   Running   0        17m
    pod/my-cluster-1                        1/1   Running   0        17m
    pod/my-cluster-2                        1/1   Running   0        17m
    pod/zookeeper-0                         1/1   Running   0        18m
    pod/zookeeper-1                         1/1   Running   0        18m
    pod/zookeeper-2                         1/1   Running   0        18m
    
    NAME                          TYPE      CLUSTER-IP   EXTERNAL-IP   PORT(S)                                                        AGE
    service/confluent-operator    ClusterIP   10.52.13.164   <none>      7778/TCP                                                       49m
    service/my-cluster            ClusterIP   None         <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-0-internal   ClusterIP   10.52.2.242  <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-1-internal   ClusterIP   10.52.7.98   <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-2-internal   ClusterIP   10.52.4.226  <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/zookeeper             ClusterIP   None         <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-0-internal  ClusterIP   10.52.8.52   <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-1-internal  ClusterIP   10.52.12.44  <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-2-internal  ClusterIP   10.52.12.134   <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    
    NAME                        READY   AGE
    statefulset.apps/my-cluster   3/3   17m
    statefulset.apps/zookeeper  3/3   18m
    
    NAME                               READY   UP-TO-DATE   AVAILABLE   AGE
    deployment.apps/confluent-operator   1/1   1          1         49m
    
    NAME                                  MIN AVAILABLE   MAX UNAVAILABLE   ALLOWED DISRUPTIONS   AGE
    poddisruptionbudget.policy/my-cluster   N/A           1               1                   17m
    poddisruptionbudget.policy/zookeeper  N/A           1               1                   18m
    

Der Operator erstellt die folgenden Ressourcen:

  • Zwei StatefulSets für Kafka und ZooKeeper.
  • Drei Pods für Kafka-Broker-Replikate.
  • Drei Pods für ZooKeeper-Replikate.
  • Zwei PodDisruptionBudget-Ressourcen, die maximal ein nicht verfügbares Replikat für die Clusterkonsistenz gewährleisten.
  • Der Dienst my-cluster, der als Bootstrap-Server für Kafka-Clients dient, die eine Verbindung innerhalb des Kubernetes-Clusters herstellen. In diesem Dienst sind alle internen Kafka-Listener verfügbar.
  • Der Dienst zookeeper, mit dem Kafka-Broker eine Verbindung zu ZooKeeper-Knoten als Clients herstellen können.

Authentifizierung und Nutzerverwaltung

In diesem Abschnitt erfahren Sie, wie Sie die Authentifizierung und Autorisierung aktivieren, um Kafka-Listener zu sichern und Anmeldedaten mit Clients zu teilen.

Confluent für Kubernetes unterstützt verschiedene Authentifizierungsmethoden für Kafka, z. B.:

  • SASL/PLAIN-Authentifizierung: Clients verwenden für die Authentifizierung einen Nutzernamen und ein Passwort. Der Nutzername und das Passwort werden serverseitig in einem Kubernetes-Secret gespeichert.
  • SASL/PLAIN mit LDAP-Authentifizierung: Clients verwenden einen Nutzernamen und ein Passwort für die Authentifizierung. Die Anmeldedaten werden auf einem LDAP-Server gespeichert.
  • mTLS-Authentifizierung: Clients verwenden TLS-Zertifikate für die Authentifizierung.

Beschränkungen

  • CFK bietet keine benutzerdefinierten Ressourcen für die Nutzerverwaltung. Sie können jedoch Anmeldedaten in Secrets speichern und in den Spezifikationen des Listeners auf sie verweisen.
  • Obwohl es keine benutzerdefinierte Ressource zum direkten Verwalten von ACLs gibt, bietet das offizielle Confluent for Kubernetes eine Anleitung zum Konfigurieren von ACLs über die Kafka-Befehlszeile.

Nutzer erstellen

In diesem Abschnitt erfahren Sie, wie Sie einen CFK-Operator bereitstellen, der Funktionen zur Nutzerverwaltung zeigt, darunter:

  • Einen Kafka-Cluster mit passwortbasierter Authentifizierung (SASL/PLAIN) für einen der Listener
  • Ein KafkaTopic mit drei Replikaten
  • Nutzeranmeldedaten mit Lese- und Schreibberechtigungen
  1. Erstellen Sie ein Secret mit Nutzeranmeldedaten:

    export USERNAME=my-user
    export PASSWORD=$(openssl rand -base64 12)
    kubectl create secret generic my-user-credentials -n kafka \
      --from-literal=plain-users.json="{\"$USERNAME\":\"$PASSWORD\"}"
    

    Anmeldedaten sollten im folgenden Format gespeichert werden:

    {
    "username1": "password1",
    "username2": "password2",
    ...
    "usernameN": "passwordN"
    }
    
  2. Konfigurieren Sie den Kafka-Cluster so, dass ein Listener mit passwortbasierter Authentifizierung von SCRAM-SHA-512 an Port 9094 verwendet wird:

    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-cluster.yaml
    
  3. Richten Sie ein Thema und einen Client-Pod ein, um mit Ihrem Kafka-Cluster zu interagieren und Kafka-Befehle auszuführen:

    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-topic.yaml
    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/kafkacat.yaml
    

    GKE stellt das Secret my-user-credentials als Volume für den Client-Pod bereit.

  4. Wenn der Client-Pod bereit ist, stellen Sie eine Verbindung zu ihm her und erstellen und empfangen Sie Nachrichten mithilfe der bereitgestellten Anmeldedaten:

    kubectl wait pod kafkacat --for=condition=Ready --timeout=300s -n kafka
    kubectl exec -it kafkacat -n kafka -- /bin/sh
    
  5. Erzeugt eine Nachricht mit den my-user-Anmeldedaten und verarbeitet die Nachricht dann, um den Empfang zu prüfen.

    export USERNAME=$(cat /my-user/plain-users.json|cut -d'"' -f 2)
    export PASSWORD=$(cat /my-user/plain-users.json|cut -d'"' -f 4)
    echo "Message from my-user" |kcat \
      -b my-cluster.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=PLAIN \
      -X sasl.username=$USERNAME \
      -X sasl.password=$PASSWORD  \
      -t my-topic -P
    kcat -b my-cluster.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=PLAIN \
      -X sasl.username=$USERNAME \
      -X sasl.password=$PASSWORD  \
      -t my-topic -C
    

    Die Ausgabe sieht in etwa so aus:

    Message from my-user
    % Reached end of topic my-topic [1] at offset 1
    % Reached end of topic my-topic [2] at offset 0
    % Reached end of topic my-topic [0] at offset 0
    

    Geben Sie CTRL+C ein, um den Nutzervorgang zu beenden. Wenn Sie den Fehler Connect refused erhalten, warten Sie einige Minuten und versuchen Sie es dann noch einmal.

  6. Beenden Sie die Pod-Shell

    exit
    

Sicherungen und Notfallwiederherstellung

Mit dem Confluent-Operator können Sie effiziente Sicherungsstrategien anhand bestimmter Muster implementieren.

Sie können Backup for GKE verwenden, um Folgendes zu sichern:

  • Kubernetes-Ressourcenmanifeste.
  • Benutzerdefinierte API-Ressourcen von Confluent und ihre Definitionen, die vom Kubernetes API-Server des Clusters extrahiert wurden, der die Sicherung ausführt.
  • Volumes, die PersistentVolumeClaim-Ressourcen in den Manifesten entsprechen.

Weitere Informationen zum Sichern und Wiederherstellen von Kafka-Clustern mit Backup for GKE finden Sie unter Notfallwiederherstellung vorbereiten.

Sie können auch eine manuelle Sicherung Ihres Kafka-Clusters ausführen. Sichern Sie Folgendes:

  • Die Kafka-Konfiguration, die alle benutzerdefinierten Ressourcen der Confluent API enthält, z. B. KafkaTopics oder Connect
  • Die Daten, die in den PersistentVolumes der Kafka-Broker gespeichert werden

Das Speichern von Kubernetes-Ressourcenmanifesten, einschließlich Confluent-Konfigurationen, in Git-Repositories kann dazu führen, dass keine separate Sicherung für Kafka-Konfigurationen erforderlich ist, da die Ressourcen bei Bedarf auf einen neuen Kubernetes-Cluster angewendet werden können.

Um die Wiederherstellung von Kafka-Daten in Szenarien zu gewährleisten, in denen eine Kafka-Serverinstanz oder ein Kubernetes-Cluster, in dem Kafka bereitgestellt wird, verloren geht, sollten Sie die Kubernetes-Speicherklasse, die für die Bereitstellung von Volumes für Kafka-Broker verwendet wird, mit der Option reclaimPolicy mit dem Wert Retain konfigurieren. Außerdem empfehlen wir, Snapshots von Kafka-Broker-Volumes zu erstellen.

Das folgende Manifest beschreibt eine StorageClass, die die Option reclaimPolicy mit dem Wert Retain verwendet:

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: premium-rwo-retain
...
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer

Das folgende Beispiel zeigt die StorageClass, die der spec einer benutzerdefinierten Kafka-Clusterressource hinzugefügt wird:

...
spec:
  ...
  dataVolumeCapacity: 100Gi
  storageClass:
  name: premium-rwo-retain

Bei dieser Konfiguration werden PersistentVolumes, die mit der Speicherklasse bereitgestellt werden, nicht gelöscht, auch wenn der entsprechende PersistentVolumeClaim gelöscht wird.

So stellen Sie die Kafka-Instanz auf einem neuen Kubernetes-Cluster mithilfe der vorhandenen Konfigurations- und Broker-Instanzdaten wieder her:

  1. Wenden Sie die vorhandenen benutzerdefinierten Confluent-Ressourcen (Kafka, KafkaTopic, Zookeeper usw.) auf einen neuen Kubernetes-Cluster an.
  2. Aktualisieren Sie die PersistentVolumeClaims mit dem Namen der neuen Kafka-Broker-Instanzen auf die alten PersistentVolumes unter Verwendung des Attributs spec.volumeName im PersistentVolumeClaim.

Bereinigen

Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.

Projekt löschen

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Einzelne Ressourcen löschen

Wenn Sie ein vorhandenes Projekt verwendet haben und es nicht löschen möchten, können Sie die einzelnen Ressourcen löschen.

  1. Legen Sie Umgebungsvariablen fest:

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=kafka
    export REGION=us-central1
    
  2. Führen Sie den Befehl terraform destroy aus:

    export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
    terraform -chdir=kafka/terraform/FOLDER destroy -var project_id=${PROJECT_ID}   \
      -var region=${REGION}  \
      -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
    

    Ersetzen Sie FOLDER durch gke-autopilot oder gke-standard.

    Geben Sie bei Aufforderung yes ein.

  3. Alle nicht angehängten Laufwerke suchen:

    export disk_list=$(gcloud compute disks list --filter="-users:* AND labels.name=${KUBERNETES_CLUSTER_PREFIX}-cluster" --format "value[separator=|](name,zone)")
    
  4. Laufwerke löschen:

    for i in $disk_list; do
      disk_name=$(echo $i| cut -d'|' -f1)
      disk_zone=$(echo $i| cut -d'|' -f2|sed 's|.*/||')
      echo "Deleting $disk_name"
      gcloud compute disks delete $disk_name --zone $disk_zone --quiet
    done
    

Nächste Schritte

  • Referenzarchitekturen, Diagramme und Best Practices zu Google Cloud kennenlernen. Weitere Informationen zu Cloud Architecture Center