In dieser Anleitung erfahren Sie, wie Sie mit dem Operator Confluent for Kubernetes (CFK) Apache Kafka in Google Kubernetes Engine (GKE) bereitstellen.
Kafka ist ein verteiltes Open Source-Messaging-System für die Verarbeitung von großen Datenmengen mit hohem Durchsatz und Echtzeitstreaming. Mit Kafka können Sie Streamingdaten-Pipelines erstellen, die Daten zuverlässig zwischen verschiedenen Systemen und Anwendungen für die Verarbeitung und Analyse verschieben.
Dieser Leitfaden richtet sich an Plattformadministratoren, Cloud-Architekten und Betriebsexperten, die an der Bereitstellung von Kafka-Clustern in GKE interessiert sind.
Sie können auch den CFK-Operator verwenden, um andere Komponenten der Confluent Platform bereitzustellen, z. B. das webbasierte Confluent Control-Center, Schema Registry oder KsqlDB. In diesem Leitfaden geht es jedoch nur um Kafka-Bereitstellungen.
Ziele
- GKE-Infrastruktur für Apache Kafka planen und bereitstellen
- CFK-Operator bereitstellen und konfigurieren
- Apache Kafka mit dem CFK-Operator konfigurieren, um Verfügbarkeit, Sicherheit, Beobachtbarkeit und Leistung zu gewährleisten
Vorteile
CFK bietet folgende Vorteile:
- Automatisierte Rolling Updates für Konfigurationsänderungen.
- Automatisierte Rolling Upgrades ohne Auswirkungen auf die Kafka-Verfügbarkeit.
- Wenn ein Fehler auftritt, stellt CFK einen Kafka-Pod mit derselben Kafka-Broker-ID, Konfiguration und nichtflüchtigen Speicher-Volumes wieder her.
- Automatisiertes Rack-Awareness, um Replikate einer Partition auf verschiedene Racks (oder Zonen) zu verteilen, die Verfügbarkeit von Kafka-Brokern zu verbessern und das Risiko eines Datenverlusts zu begrenzen.
- Unterstützung für den Export aggregierter Messwerte nach Prometheus.
Bereitstellungsarchitektur
Jede Datenpartition in einem Kafka-Cluster hat einen Leader-Broker und kann einen oder mehrere Follower-Broker haben. Der Leader-Broker verarbeitet alle Lese- und Schreibvorgänge in der Partition. Jeder Follower-Broker repliziert den Leader-Broker passiv.
In einer typischen Kafka-Einrichtung verwenden Sie auch einen Open Source-Dienst namens ZooKeeper, um Ihre Kafka-Cluster zu koordinieren. Dieser Dienst hilft Ihnen, einen Leader unter den Brokern zu wählen und bei Fehlern ein Failover auszulösen.
Sie können die Kafka-Konfiguration auch ohne Zookeeper bereitstellen. Aktivieren Sie dazu den KRaft-Modus. Diese Methode wird jedoch aufgrund der fehlenden Unterstützung für KafkaTopic-Ressourcen und der Authentifizierung mit Anmeldeinformationen nicht als produktionsreif angesehen.
Verfügbarkeit und Notfallwiederherstellung
In dieser Anleitung werden separate Knotenpools und Zonen für Cluster von Kafka und ZooKeeper verwendet, um die Hochverfügbarkeit sicherzustellen und sich auf Notfallwiederherstellung vorzubereiten.
Hochverfügbare Kubernetes-Cluster in Google Cloud basieren auf regionalen Clustern, die sich über mehrere Knoten und Verfügbarkeitszonen erstrecken. Diese Konfiguration verbessert Fehlertoleranz, Skalierbarkeit und geografische Redundanz. Mit dieser Konfiguration können Sie auch Rolling Updates und Wartungsaufgaben durchführen und gleichzeitig SLAs für Betriebszeit und Verfügbarkeit bereitstellen. Weitere Informationen finden Sie unter Regionale Cluster.
Bereitstellungsdiagramm
Das folgende Diagramm zeigt einen Kafka-Cluster, der auf mehreren Knoten und Zonen in einem GKE-Cluster ausgeführt wird:
Im Diagramm wird das Kafka-StatefulSet auf drei Knoten in drei verschiedenen Zonen bereitgestellt. Sie können diese Konfiguration steuern. Legen Sie dazu die erforderlichen Regeln für Pod-Affinität und Topologieverteilung auf die benutzerdefinierte Ressourcenspezifikation Kafka
fest.
Wenn bei der empfohlenen Konfiguration eine Zone ausfällt, plant GKE die Pods auf neue Knoten um und repliziert die Daten von den verbleibenden Replikaten, sowohl für Kafka als auch für Zookeeper.
Das folgende Diagramm zeigt ein ZooKeeper-StatefulSet
, das auf drei Knoten in drei verschiedenen Zonen bereitgestellt wird:
Kosten
In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:
Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.
Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.
Vorbereitung
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the GKE, Backup for GKE, Compute Engine, Identity and Access Management, and Resource Manager APIs:
gcloud services enable compute.googleapis.com
iam.googleapis.com container.googleapis.com gkebackup.googleapis.com cloudresourcemanager.googleapis.com - Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the GKE, Backup for GKE, Compute Engine, Identity and Access Management, and Resource Manager APIs:
gcloud services enable compute.googleapis.com
iam.googleapis.com container.googleapis.com gkebackup.googleapis.com cloudresourcemanager.googleapis.com -
Grant roles to your user account. Run the following command once for each of the following IAM roles:
role/storage.objectViewer, role/logging.logWriter, roles/container.clusterAdmin, role/container.serviceAgent, roles/iam.serviceAccountAdmin, roles/serviceusage.serviceUsageAdmin, roles/iam.serviceAccountAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Umgebung vorbereiten
In dieser Anleitung verwenden Sie Cloud Shell zum Verwalten von Ressourcen, die in Google Cloud gehostet werden. Die Software, die Sie für diese Anleitung benötigen, ist in Cloud Shell vorinstalliert, einschließlich kubectl
, gcloud CLI, Helm und Terraform.
So richten Sie Ihre Umgebung mit Cloud Shell ein:
Starten Sie eine Cloud Shell-Sitzung über die Google Cloud Console. Klicken Sie dazu in der Google Cloud Console auf Cloud Shell aktivieren. Dadurch wird im unteren Bereich der Google Cloud Console eine Sitzung gestartet.
Legen Sie Umgebungsvariablen fest:
export PROJECT_ID=PROJECT_ID export KUBERNETES_CLUSTER_PREFIX=kafka export REGION=us-central1
Ersetzen Sie
PROJECT_ID
: Ihre Google Cloud mit Ihrer Projekt-ID.Klonen Sie das GitHub-Repository:
git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
Wechseln Sie in das Arbeitsverzeichnis:
cd kubernetes-engine-samples/streaming
Clusterinfrastruktur erstellen
In diesem Abschnitt führen Sie ein Terraform-Skript aus, um einen privaten, hochverfügbaren regionalen GKE-Cluster zu erstellen. Die folgenden Schritte ermöglichen einen öffentlichen Zugriff auf die Steuerungsebene. Erstellen Sie einen privaten Cluster, um den Zugriff einzuschränken.
Sie können den Operator mit einem Standard- oder Autopilot-Cluster installieren.
Standard
Das folgende Diagramm zeigt einen privaten regionalen Standard-GKE-Cluster, der in drei verschiedenen Zonen bereitgestellt wird:
Führen Sie die folgenden Befehle in der Cloud Shell aus, um diese Infrastruktur bereitzustellen:
export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-standard init
terraform -chdir=kafka/terraform/gke-standard apply -var project_id=${PROJECT_ID} \
-var region=${REGION} \
-var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
Geben Sie bei Aufforderung yes
ein. Es kann einige Minuten dauern, bis dieser Befehl abgeschlossen ist und der Cluster den Status „Bereit“ anzeigt.
Terraform erstellt die folgenden Ressourcen:
- Ein VPC-Netzwerk und ein privates Subnetz für die Kubernetes-Knoten.
- Ein Router für den Zugriff auf das Internet über NAT.
- Ein privater GKE-Cluster in der Region
us-central1
. - 2 Knotenpools mit aktiviertem Autoscaling (1-2 Knoten pro Zone, mindestens 1 Knoten pro Zone)
- Ein
ServiceAccount
mit Logging- und Monitoring-Berechtigungen. - Backup for GKE zur Notfallwiederherstellung.
- Google Cloud Managed Service for Prometheus für das Clustermonitoring.
Die Ausgabe sieht in etwa so aus:
...
Apply complete! Resources: 14 added, 0 changed, 0 destroyed.
Outputs:
kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"
Autopilot
Das folgende Diagramm zeigt einen privaten regionalen Autopilot-GKE-Cluster:
Führen Sie die folgenden Befehle in der Cloud Shell aus, um die Infrastruktur bereitzustellen:
export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-autopilot init
terraform -chdir=kafka/terraform/gke-autopilot apply -var project_id=${PROJECT_ID} \
-var region=${REGION} \
-var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
Geben Sie bei Aufforderung yes
ein. Es kann einige Minuten dauern, bis dieser Befehl abgeschlossen ist und der Cluster den Status „Bereit“ anzeigt.
Terraform erstellt die folgenden Ressourcen:
- VPC-Netzwerk und privates Subnetz für die Kubernetes-Knoten.
- Ein Router für den Zugriff auf das Internet über NAT.
- Ein privater GKE-Cluster in der Region
us-central1
. - Ein
ServiceAccount
mit Logging- und Monitoring-Berechtigungen. - Google Cloud Managed Service for Prometheus für das Clustermonitoring.
Die Ausgabe sieht in etwa so aus:
...
Apply complete! Resources: 12 added, 0 changed, 0 destroyed.
Outputs:
kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"
Mit dem Cluster verbinden
Konfigurieren Sie kubectl
für die Kommunikation mit dem Cluster:
gcloud container clusters get-credentials ${KUBERNETES_CLUSTER_PREFIX}-cluster --region ${REGION}
Stellen Sie den CFK-Operator in Ihrem Cluster bereit:
In diesem Abschnitt stellen Sie den CFK-Operator (Confluent for Kubernetes) mit einem Helm-Diagramm bereit und stellen dann einen Kafka-Cluster bereit.
Fügen Sie das Confluent Helm-Diagramm-Repository hinzu:
helm repo add confluentinc https://packages.confluent.io/helm
Fügen Sie einen Namespace für den CFK-Operator und den Kafka-Cluster hinzu:
kubectl create ns kafka
Stellen Sie den CFK-Cluster-Operator mit Helm bereit:
helm install confluent-operator confluentinc/confluent-for-kubernetes -n kafka
Damit CFK Ressourcen in allen Namespaces verwalten kann, fügen Sie den Parameter
--set-namespaced=false
zum Helm-Befehl hinzu.Prüfen Sie mit Helm, ob der Confluent-Operator erfolgreich bereitgestellt wurde:
helm ls -n kafka
Die Ausgabe sieht in etwa so aus:
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION confluent-operator kafka 1 2023-07-07 10:57:45.409158 +0200 CEST deployed confluent-for-kubernetes-0.771.13 2.6.0
Kafka bereitstellen
In diesem Abschnitt stellen Sie Kafka in einer grundlegenden Konfiguration bereit und testen dann verschiedene erweiterte Konfigurationsszenarien, um Anforderungen an Verfügbarkeit, Sicherheit und Beobachtbarkeit zu erfüllen.
Grundlegende Konfiguration
Die grundlegende Konfiguration für die Kafka-Instanz umfasst die folgenden Komponenten:
- Drei Replikate von Kafka-Brokern, wobei mindestens zwei verfügbare Replikate für die Clusterkonsistenz erforderlich sind.
- Drei Replikate von ZooKeeper-Knoten, die einen Cluster bilden.
- Zwei Kafka-Listener: einer ohne Authentifizierung und einer mit TLS-Authentifizierung mit einem von CFK generierten Zertifikat.
- Java MaxHeapSize und MinHeapSize sind für Kafka auf 4 GB festgelegt.
- CPU-Ressourcenzuweisung von 1 CPU-Anfrage und 2 CPU-Limits sowie 5 GB Arbeitsspeicheranfragen und -limits für Kafka (4 GB für den Hauptdienst und 0,5 GB für den Messwertexporter) und 3 GB für Zookeeper (2 GB für den Hauptdienst und 0,5 GB für den Messwertexporter).
- 100 GB Speicher für jeden Pod unter Verwendung der
premium-rwo
storageClass, 100 für Kafka Data und 90/10 für Zookeeper Data/Log. - Toleranzen, nodeAffinities und podAntiAffinities für jede Arbeitslast, wodurch eine ordnungsgemäße Verteilung zwischen Knoten gewährleistet wird. Dabei werden die entsprechenden Knotenpools und verschiedene Zonen verwendet.
- Kommunikation im Cluster, die durch selbst signierte Zertifikate mit einer von Ihnen bereitgestellten Zertifizierungsstelle gesichert ist.
Diese Konfiguration stellt die minimale Einrichtung dar, die zum Erstellen eines produktionsfertigen Kafka-Clusters erforderlich ist. In den folgenden Abschnitten werden benutzerdefinierte Konfigurationen zu Aspekten wie Clustersicherheit, Access Control Lists (ACLs), Themenverwaltung, Zertifikatsverwaltung und mehr vorgestellt.
Einfachen Kafka-Cluster erstellen
Generieren Sie ein CA-Paar:
openssl genrsa -out ca-key.pem 2048 openssl req -new -key ca-key.pem -x509 \ -days 1000 \ -out ca.pem \ -subj "/C=US/ST=CA/L=Confluent/O=Confluent/OU=Operator/CN=MyCA"
Confluent für Kubernetes bietet automatisch generierte Zertifikate für Confluent Platform-Komponenten, die für die TLS-Netzwerkverschlüsselung verwendet werden können. Sie müssen eine Zertifizierungsstelle (CA) generieren und bereitstellen.
Erstellen Sie ein Kubernetes-Secret für die Zertifizierungsstelle:
kubectl create secret tls ca-pair-sslcerts --cert=ca.pem --key=ca-key.pem -n kafka
Der Name des Secrets ist vordefinierte
Erstellen Sie mithilfe der grundlegenden Konfiguration einen neuen Kafka-Cluster:
kubectl apply -n kafka -f kafka-confluent/manifests/01-basic-cluster/my-cluster.yaml
Mit diesem Befehl werden eine benutzerdefinierte Kafka-Ressource und eine benutzerdefinierte Zookeeper-Ressource des CFK-Operators erstellt, die CPU- und Speicheranfragen und -limits, Blockspeicheranfragen sowie Markierungen und Affinitäten enthält, um die bereitgestellten Pods auf Kubernetes-Knoten zu verteilen.
Warten Sie einige Minuten, während Kubernetes die erforderlichen Arbeitslasten startet:
kubectl wait pods -l app=my-cluster --for condition=Ready --timeout=300s -n kafka
Prüfen Sie, ob die Kafka-Arbeitslasten erstellt wurden:
kubectl get pod,svc,statefulset,deploy,pdb -n kafka
Die Ausgabe sieht in etwa so aus:
NAME READY STATUS RESTARTS AGE pod/confluent-operator-864c74d4b4-fvpxs 1/1 Running 0 49m pod/my-cluster-0 1/1 Running 0 17m pod/my-cluster-1 1/1 Running 0 17m pod/my-cluster-2 1/1 Running 0 17m pod/zookeeper-0 1/1 Running 0 18m pod/zookeeper-1 1/1 Running 0 18m pod/zookeeper-2 1/1 Running 0 18m NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/confluent-operator ClusterIP 10.52.13.164 <none> 7778/TCP 49m service/my-cluster ClusterIP None <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 17m service/my-cluster-0-internal ClusterIP 10.52.2.242 <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 17m service/my-cluster-1-internal ClusterIP 10.52.7.98 <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 17m service/my-cluster-2-internal ClusterIP 10.52.4.226 <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 17m service/zookeeper ClusterIP None <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 18m service/zookeeper-0-internal ClusterIP 10.52.8.52 <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 18m service/zookeeper-1-internal ClusterIP 10.52.12.44 <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 18m service/zookeeper-2-internal ClusterIP 10.52.12.134 <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 18m NAME READY AGE statefulset.apps/my-cluster 3/3 17m statefulset.apps/zookeeper 3/3 18m NAME READY UP-TO-DATE AVAILABLE AGE deployment.apps/confluent-operator 1/1 1 1 49m NAME MIN AVAILABLE MAX UNAVAILABLE ALLOWED DISRUPTIONS AGE poddisruptionbudget.policy/my-cluster N/A 1 1 17m poddisruptionbudget.policy/zookeeper N/A 1 1 18m
Der Operator erstellt die folgenden Ressourcen:
- Zwei StatefulSets für Kafka und ZooKeeper.
- Drei Pods für Kafka-Broker-Replikate.
- Drei Pods für ZooKeeper-Replikate.
- Zwei
PodDisruptionBudget
-Ressourcen, die maximal ein nicht verfügbares Replikat für die Clusterkonsistenz gewährleisten. - Der Dienst
my-cluster
, der als Bootstrap-Server für Kafka-Clients dient, die eine Verbindung innerhalb des Kubernetes-Clusters herstellen. In diesem Dienst sind alle internen Kafka-Listener verfügbar. - Der Dienst
zookeeper
, mit dem Kafka-Broker eine Verbindung zu ZooKeeper-Knoten als Clients herstellen können.
Authentifizierung und Nutzerverwaltung
In diesem Abschnitt erfahren Sie, wie Sie die Authentifizierung und Autorisierung aktivieren, um Kafka-Listener zu sichern und Anmeldedaten mit Clients zu teilen.
Confluent für Kubernetes unterstützt verschiedene Authentifizierungsmethoden für Kafka, z. B.:
- SASL/PLAIN-Authentifizierung: Clients verwenden für die Authentifizierung einen Nutzernamen und ein Passwort. Der Nutzername und das Passwort werden serverseitig in einem Kubernetes-Secret gespeichert.
- SASL/PLAIN mit LDAP-Authentifizierung: Clients verwenden einen Nutzernamen und ein Passwort für die Authentifizierung. Die Anmeldedaten werden auf einem LDAP-Server gespeichert.
- mTLS-Authentifizierung: Clients verwenden TLS-Zertifikate für die Authentifizierung.
Beschränkungen
- CFK bietet keine benutzerdefinierten Ressourcen für die Nutzerverwaltung. Sie können jedoch Anmeldedaten in Secrets speichern und in den Spezifikationen des Listeners auf sie verweisen.
- Obwohl es keine benutzerdefinierte Ressource zum direkten Verwalten von ACLs gibt, bietet das offizielle Confluent for Kubernetes eine Anleitung zum Konfigurieren von ACLs über die Kafka-Befehlszeile.
Nutzer erstellen
In diesem Abschnitt erfahren Sie, wie Sie einen CFK-Operator bereitstellen, der Funktionen zur Nutzerverwaltung zeigt, darunter:
- Einen Kafka-Cluster mit passwortbasierter Authentifizierung (SASL/PLAIN) für einen der Listener
- Ein
KafkaTopic
mit drei Replikaten - Nutzeranmeldedaten mit Lese- und Schreibberechtigungen
Erstellen Sie ein Secret mit Nutzeranmeldedaten:
export USERNAME=my-user export PASSWORD=$(openssl rand -base64 12) kubectl create secret generic my-user-credentials -n kafka \ --from-literal=plain-users.json="{\"$USERNAME\":\"$PASSWORD\"}"
Anmeldedaten sollten im folgenden Format gespeichert werden:
{ "username1": "password1", "username2": "password2", ... "usernameN": "passwordN" }
Konfigurieren Sie den Kafka-Cluster so, dass ein Listener mit passwortbasierter Authentifizierung von SCRAM-SHA-512 an Port 9094 verwendet wird:
kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-cluster.yaml
Richten Sie ein Thema und einen Client-Pod ein, um mit Ihrem Kafka-Cluster zu interagieren und Kafka-Befehle auszuführen:
kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-topic.yaml kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/kafkacat.yaml
GKE stellt das Secret
my-user-credentials
als Volume für den Client-Pod bereit.Wenn der Client-Pod bereit ist, stellen Sie eine Verbindung zu ihm her und erstellen und empfangen Sie Nachrichten mithilfe der bereitgestellten Anmeldedaten:
kubectl wait pod kafkacat --for=condition=Ready --timeout=300s -n kafka kubectl exec -it kafkacat -n kafka -- /bin/sh
Erzeugt eine Nachricht mit den
my-user
-Anmeldedaten und verarbeitet die Nachricht dann, um den Empfang zu prüfen.export USERNAME=$(cat /my-user/plain-users.json|cut -d'"' -f 2) export PASSWORD=$(cat /my-user/plain-users.json|cut -d'"' -f 4) echo "Message from my-user" |kcat \ -b my-cluster.kafka.svc.cluster.local:9094 \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=PLAIN \ -X sasl.username=$USERNAME \ -X sasl.password=$PASSWORD \ -t my-topic -P kcat -b my-cluster.kafka.svc.cluster.local:9094 \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=PLAIN \ -X sasl.username=$USERNAME \ -X sasl.password=$PASSWORD \ -t my-topic -C
Die Ausgabe sieht in etwa so aus:
Message from my-user % Reached end of topic my-topic [1] at offset 1 % Reached end of topic my-topic [2] at offset 0 % Reached end of topic my-topic [0] at offset 0
Geben Sie
CTRL+C
ein, um den Nutzervorgang zu beenden. Wenn Sie den FehlerConnect refused
erhalten, warten Sie einige Minuten und versuchen Sie es dann noch einmal.Beenden Sie die Pod-Shell
exit
Sicherungen und Notfallwiederherstellung
Mit dem Confluent-Operator können Sie effiziente Sicherungsstrategien anhand bestimmter Muster implementieren.
Sie können Backup for GKE verwenden, um Folgendes zu sichern:
- Kubernetes-Ressourcenmanifeste.
- Benutzerdefinierte API-Ressourcen von Confluent und ihre Definitionen, die vom Kubernetes API-Server des Clusters extrahiert wurden, der die Sicherung ausführt.
- Volumes, die PersistentVolumeClaim-Ressourcen in den Manifesten entsprechen.
Weitere Informationen zum Sichern und Wiederherstellen von Kafka-Clustern mit Backup for GKE finden Sie unter Notfallwiederherstellung vorbereiten.
Sie können auch eine manuelle Sicherung Ihres Kafka-Clusters ausführen. Sichern Sie Folgendes:
- Die Kafka-Konfiguration, die alle benutzerdefinierten Ressourcen der Confluent API enthält, z. B.
KafkaTopics
oderConnect
- Die Daten, die in den PersistentVolumes der Kafka-Broker gespeichert werden
Das Speichern von Kubernetes-Ressourcenmanifesten, einschließlich Confluent-Konfigurationen, in Git-Repositories kann dazu führen, dass keine separate Sicherung für Kafka-Konfigurationen erforderlich ist, da die Ressourcen bei Bedarf auf einen neuen Kubernetes-Cluster angewendet werden können.
Um die Wiederherstellung von Kafka-Daten in Szenarien zu gewährleisten, in denen eine Kafka-Serverinstanz oder ein Kubernetes-Cluster, in dem Kafka bereitgestellt wird, verloren geht, sollten Sie die Kubernetes-Speicherklasse, die für die Bereitstellung von Volumes für Kafka-Broker verwendet wird, mit der Option reclaimPolicy
mit dem Wert Retain
konfigurieren. Außerdem empfehlen wir, Snapshots von Kafka-Broker-Volumes zu erstellen.
Das folgende Manifest beschreibt eine StorageClass, die die Option reclaimPolicy
mit dem Wert Retain
verwendet:
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: premium-rwo-retain
...
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer
Das folgende Beispiel zeigt die StorageClass, die der spec
einer benutzerdefinierten Kafka-Clusterressource hinzugefügt wird:
...
spec:
...
dataVolumeCapacity: 100Gi
storageClass:
name: premium-rwo-retain
Bei dieser Konfiguration werden PersistentVolumes, die mit der Speicherklasse bereitgestellt werden, nicht gelöscht, auch wenn der entsprechende PersistentVolumeClaim gelöscht wird.
So stellen Sie die Kafka-Instanz auf einem neuen Kubernetes-Cluster mithilfe der vorhandenen Konfigurations- und Broker-Instanzdaten wieder her:
- Wenden Sie die vorhandenen benutzerdefinierten Confluent-Ressourcen (
Kafka
,KafkaTopic
,Zookeeper
usw.) auf einen neuen Kubernetes-Cluster an. - Aktualisieren Sie die PersistentVolumeClaims mit dem Namen der neuen Kafka-Broker-Instanzen auf die alten PersistentVolumes unter Verwendung des Attributs
spec.volumeName
im PersistentVolumeClaim.
Bereinigen
Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.
Projekt löschen
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Einzelne Ressourcen löschen
Wenn Sie ein vorhandenes Projekt verwendet haben und es nicht löschen möchten, können Sie die einzelnen Ressourcen löschen.
Legen Sie Umgebungsvariablen fest:
export PROJECT_ID=PROJECT_ID export KUBERNETES_CLUSTER_PREFIX=kafka export REGION=us-central1
Führen Sie den Befehl
terraform destroy
aus:export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token) terraform -chdir=kafka/terraform/FOLDER destroy -var project_id=${PROJECT_ID} \ -var region=${REGION} \ -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
Ersetzen Sie
FOLDER
durchgke-autopilot
odergke-standard
.Geben Sie bei Aufforderung
yes
ein.Alle nicht angehängten Laufwerke suchen:
export disk_list=$(gcloud compute disks list --filter="-users:* AND labels.name=${KUBERNETES_CLUSTER_PREFIX}-cluster" --format "value[separator=|](name,zone)")
Laufwerke löschen:
for i in $disk_list; do disk_name=$(echo $i| cut -d'|' -f1) disk_zone=$(echo $i| cut -d'|' -f2|sed 's|.*/||') echo "Deleting $disk_name" gcloud compute disks delete $disk_name --zone $disk_zone --quiet done
Nächste Schritte
- Referenzarchitekturen, Diagramme und Best Practices zu Google Cloud kennenlernen. Weitere Informationen zu Cloud Architecture Center