Kafka è un sistema di messaggistica distribuito open source di tipo publish-subscribe per la gestione di dati in streaming in tempo reale di elevato volume e con un'elevata velocità in transito. Puoi utilizzare Kafka per creare pipeline di dati in streaming che spostano i dati in modo affidabile tra diversi sistemi e applicazioni per l'elaborazione e l'analisi.
Questo tutorial è rivolto ad amministratori della piattaforma, architetti del cloud e professionisti delle operazioni interessati a eseguire il deployment di cluster Kafka a disponibilità elevata su Google Kubernetes Engine (GKE).
Obiettivi
In questo tutorial imparerai a:- Utilizza Terraform per creare un cluster GKE a livello di regione.
- Esegui il deployment di un cluster Kafka ad alta disponibilità.
- Esegui l'upgrade dei binari di Kafka.
- Esegui il backup e il ripristino del cluster Kafka.
- Simula l'interruzione del nodo GKE e il failover del broker Kafka.
Architettura
Questa sezione descrive l'architettura della soluzione che creerai in questo tutorial.
Un cluster Kafka è un gruppo di uno o più server (chiamati broker) che lavorano insieme per gestire i flussi di dati in entrata e la messaggistica publish-subscribe per i client Kafka (chiamati consumer).
Ogni partizione di dati in un cluster Kafka ha un broker leader e può avere uno o più broker follower. Il broker leader gestisce tutte le letture e le scritture nella partizione. Ogni broker follower replica passivamente il broker leader.
In una configurazione Kafka tipica, utilizzi anche un servizio open source chiamato ZooKeeper per coordinare i tuoi cluster Kafka. Questo servizio consente di eleggere un leader tra i broker e di attivare il failover in caso di errori.
In questo tutorial esegui il deployment dei cluster Kafka su GKE configurando i broker Kafka e il servizio Zookeeper come singoli StatefulSets. Per eseguire il provisioning di cluster Kafka ad alta disponibilità e prepararti al ripristino di emergenza, configurerai gli StatefulSet di Kafka e Zookeeper in modo che utilizzino pool di nodi e zone distinti.
Il seguente diagramma mostra come il StatefulSet Kafka viene eseguito su più nodi e zone nel cluster GKE.
Il seguente diagramma mostra come StatefulSet di Zookeeper viene eseguito su più nodi e zone nel cluster GKE.
Provisioning dei nodi e pianificazione dei pod
Se utilizzi i cluster Autopilot, Autopilot gestisce il provisioning dei nodi e la pianificazione dei pod per i tuoi carichi di lavoro. Utilizzerai l'anti-affinità dei pod per assicurarti che nessun altro pod dello stesso StatefulSet sia pianificato sullo stesso nodo e nella stessa zona.
Se utilizzi i cluster standard, devi configurare la tolleranza e l'affinità dei nodi dei pod. Per scoprire di più, consulta Isolare i workload in pool di nodi dedicati.
Costi
In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:
Per generare una stima dei costi in base all'utilizzo previsto,
utilizza il Calcolatore prezzi.
Al termine delle attività descritte in questo documento, puoi evitare la fatturazione continua eliminando le risorse che hai creato. Per ulteriori informazioni, consulta la sezione Pulizia.
Prima di iniziare
Configura il progetto
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, click Create project to begin creating a new Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM APIs.
-
In the Google Cloud console, on the project selector page, click Create project to begin creating a new Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM APIs.
Configurare i ruoli
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
role/storage.objectViewer, role/logging.logWriter, role/artifactregistry.Admin, roles/container.clusterAdmin, role/container.serviceAgent, roles/iam.serviceAccountAdmin, roles/serviceusage.serviceUsageAdmin, roles/iam.serviceAccountAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Configura l'ambiente
In questo tutorial utilizzerai Cloud Shell per gestire le risorse ospitate su Google Cloud. Cloud Shell è preinstallato con il software necessario per questo tutorial, tra cui Docker, kubectl
, gcloud CLI, Helm e Terraform.
Per configurare l'ambiente con Cloud Shell:
Avvia una sessione Cloud Shell dalla console Google Cloud facendo clic su Attiva Cloud Shell nella console Google Cloud. Viene avviata una sessione nel riquadro inferiore della console Google Cloud.
Imposta le variabili di ambiente.
export PROJECT_ID=PROJECT_ID export REGION=us-central1
Sostituisci i seguenti valori:
- PROJECT_ID: il tuo ID progetto Google Cloud.
Imposta le variabili di ambiente predefinite.
gcloud config set project PROJECT_ID
Clona il repository di codice.
git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
Passa alla directory di lavoro.
cd kubernetes-engine-samples/streaming/gke-stateful-kafka
Crea l'infrastruttura del cluster
In questa sezione eseguirai uno script Terraform per creare due
cluster GKE regionali.
Il deployment del cluster principale verrà eseguito in us-central1
.
Per creare il cluster:
Autopilot
In Cloud Shell, esegui i seguenti comandi:
terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID
Quando richiesto, digita yes
.
Standard
In Cloud Shell, esegui i seguenti comandi:
terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID
Quando richiesto, digita yes
.
I file di configurazione Terraform creano le seguenti risorse per eseguire il deployment dell'infrastruttura:
- Crea un repository Artifact Registry per archiviare le immagini Docker.
- Crea la rete e la subnet VPC per l'interfaccia di rete della VM.
- Crea due cluster GKE.
Terraform crea un cluster privato nelle due regioni e attiva Backup per GKE per il ripristino di emergenza.
Esegui il deployment di Kafka nel cluster
In questa sezione eseguirai il deployment di Kafka su GKE utilizzando un grafico Helm. L'operazione crea le seguenti risorse:
- I StatefulSet di Kafka e Zookeeper.
- Un deployment di un esportatore Kafka. L'esportatore raccoglie le metriche di Kafka per il consumo di Prometheus.
- Un budget per l'interruzione dei pod (PDB) che limita il numero di pod offline durante un'interruzione volontaria.
Per utilizzare il grafico Helm per eseguire il deployment di Kafka:
Configura l'accesso a Docker.
gcloud auth configure-docker us-docker.pkg.dev
Compila Artifact Registry con le immagini Kafka e Zookeeper.
./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0 ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52 ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41 ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
Configura l'accesso alla riga di comando
kubectl
al cluster principale.gcloud container clusters get-credentials gke-kafka-us-central1 \ --region=${REGION} \ --project=${PROJECT_ID}
Crea uno spazio dei nomi.
export NAMESPACE=kafka kubectl create namespace $NAMESPACE
Installa Kafka utilizzando la versione 20.0.6 del grafico Helm.
cd helm ../scripts/chart.sh kafka 20.0.6 && \ rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka . \ --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
L'output è simile al seguente:
NAME: kafka LAST DEPLOYED: Thu Feb 16 03:29:39 2023 NAMESPACE: kafka STATUS: deployed REVISION: 1 TEST SUITE: None
Verifica che le repliche di Kafka siano in esecuzione (l'operazione potrebbe richiedere alcuni minuti).
kubectl get all -n kafka
L'output è simile al seguente:
--- NAME READY STATUS RESTARTS AGE pod/kafka-0 1/1 Running 2 (3m51s ago) 4m28s pod/kafka-1 1/1 Running 3 (3m41s ago) 4m28s pod/kafka-2 1/1 Running 2 (3m57s ago) 4m28s pod/kafka-zookeeper-0 1/1 Running 0 4m28s pod/kafka-zookeeper-1 1/1 Running 0 4m28s pod/kafka-zookeeper-2 1/1 Running 0 4m28s NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/kafka ClusterIP 192.168.112.124 <none> 9092/TCP 4m29s service/kafka-app ClusterIP 192.168.75.57 <none> 9092/TCP 35m service/kafka-app-headless ClusterIP None <none> 9092/TCP,9093/TCP 35m service/kafka-app-zookeeper ClusterIP 192.168.117.102 <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-app-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-headless ClusterIP None <none> 9092/TCP,9093/TCP 4m29s service/kafka-zookeeper ClusterIP 192.168.89.249 <none> 2181/TCP,2888/TCP,3888/TCP 4m29s service/kafka-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 4m29s NAME READY AGE statefulset.apps/kafka 3/3 4m29s statefulset.apps/kafka-zookeeper 3/3 4m29s
Creare dati di test
In questa sezione, testerai l'applicazione Kafka e genererai messaggi.
Crea un pod client consumer per interagire con l'applicazione Kafka.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
Crea un argomento denominato
topic1
con tre partizioni e un fattore di replica di tre.kafka-topics.sh \ --create \ --topic topic1 \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Verifica che le partizioni degli argomenti siano replicate su tutti e tre i broker.
kafka-topics.sh \ --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
L'output è simile al seguente:
Topic: topic1 TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: topic1 Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Nell'output di esempio, tieni presente che
topic1
ha tre partizioni, ciascuna con un leader e un insieme di repliche diversi. Questo perché Kafka utilizza la suddivisione in parti per distribuire i dati su più broker, consentendo una maggiore scalabilità e tolleranza di errore. Il fattore di replica pari a tre garantisce che ogni partizione abbia tre repliche, in modo che i dati siano ancora disponibili anche se uno o due broker si arrestano in modo anomalo.Esegui il seguente comando per generare collettivamente i numeri di messaggio in
topic1
.ALLOW_PLAINTEXT_LISTENER=yes for x in $(seq 0 200); do echo "$x: Message number $x" done | kafka-console-producer.sh \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \ --property parse.key=true \ --property key.separator=":"
Esegui questo comando per consumare
topic1
da tutte le partizioni.kafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;
Digita
CTRL+C
per interrompere il processo consumer.
Benchmark Kafka
Per creare un modello accurato di un caso d'uso, puoi eseguire una simulazione del carico previsto sul cluster. Per testare le prestazioni, utilizzerai gli strumenti inclusi nel pacchetto Kafka, ovvero gli script kafka-producer-perf-test.sh
e kafka-consumer-perf-test.sh
nella cartella bin
.
Crea un argomento per il benchmarking.
kafka-topics.sh \ --create \ --topic topic-benchmark \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Creare un carico sul cluster Kafka.
KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \ --topic topic-benchmark \ --num-records 10000000 \ --throughput -1 \ --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \ batch.size=16384 \ acks=all \ linger.ms=500 \ compression.type=none \ --record-size 100 \ --print-metrics
Il produttore genererà 10.000.000 di record il giorno
topic-benchmark
. L'output è simile al seguente:623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency. 1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency. 1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency. 2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
Una volta inviati tutti i record, dovresti visualizzare altre metriche nell'output, simili alle seguenti:
producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark} : 173316.233 producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark} : 10000000.000
Per uscire dallo smartwatch, digita
CTRL + C
.Esci dalla shell del pod.
exit
Gestire gli upgrade
Gli aggiornamenti delle versioni di Kafka e Kubernetes vengono rilasciati con una frequenza regolare. Segui le best practice operative per eseguire regolarmente l'upgrade del tuo ambiente software.
Pianifica gli upgrade dei file binari di Kafka
In questa sezione aggiornerai l'immagine Kafka utilizzando Helm e verificherai che i temi siano ancora disponibili.
Per eseguire l'upgrade dalla versione precedente di Kafka dal grafico Helm che hai utilizzato in Eseguire il deployment di Kafka sul cluster, segui questi passaggi:
Compila Artifact Registry con la seguente immagine:
../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2 ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61 ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49 ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
Esegui questi passaggi per eseguire il deployment di un grafico Helm con le immagini Kafka e Zookeeper di cui è stato eseguito l'upgrade. Per indicazioni specifiche per la versione, consulta le istruzioni di Kafka per gli upgrade delle versioni.
- Aggiorna la versione della dipendenza
Chart.yaml
:
../scripts/chart.sh kafka 20.1.0
Esegui il deployment del grafico Helm con le nuove immagini Kafka e Zookeeper, come mostrato nell'esempio seguente:
rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka ./ \ --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
Guarda l'upgrade dei pod Kafka:
kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
Per uscire dallo smartwatch, digita
CTRL + C
.- Aggiorna la versione della dipendenza
Connettiti al cluster Kafka utilizzando un pod client.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
Verifica di poter accedere ai messaggi da
topic1
.kafka-console-consumer.sh \ --topic topic1 \ --from-beginning \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
L'output dovrebbe mostrare i messaggi generati dal passaggio precedente. Digita
CTRL+C
per uscire dalla procedura.Esci dal pod del client.
exit
Prepararsi al ripristino di emergenza
Per assicurarti che i tuoi carichi di lavoro di produzione rimangano disponibili in caso di un evento che interrompa il servizio, devi preparare un piano di ripristino di emergenza (RE). Per scoprire di più sulla pianificazione del RE, consulta la Guida alla pianificazione del ripristino di emergenza.
Per eseguire il backup e il ripristino dei carichi di lavoro sui cluster GKE, puoi utilizzare Backup per GKE.
Esempio di scenario di backup e ripristino di Kafka
In questa sezione, esegui un backup del cluster da gke-kafka-us-central1
e ripristinalo in gke-kafka-us-west1
. Eseguirai l'operazione di backup
e ripristino nell'ambito dell'applicazione utilizzando la
ProtectedApplication
risorsa personalizzata.
Il seguente diagramma illustra i componenti della soluzione di ripristino di emergenza e la loro interazione.
Per prepararti a eseguire il backup e il ripristino del cluster Kafka:
Configura le variabili di ambiente.
export BACKUP_PLAN_NAME=kafka-protected-app export BACKUP_NAME=protected-app-backup-1 export RESTORE_PLAN_NAME=kafka-protected-app export RESTORE_NAME=protected-app-restore-1 export REGION=us-central1 export DR_REGION=us-west1 export CLUSTER_NAME=gke-kafka-$REGION export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
Verifica che il cluster sia in uno stato
RUNNING
.gcloud container clusters describe $CLUSTER_NAME --region us-central1 --format='value(status)'
Crea un piano di backup.
gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \ --selected-applications=kafka/kafka,kafka/zookeeper \ --include-secrets \ --include-volume-data \ --cron-schedule="0 3 * * *" \ --backup-retain-days=7 \ --backup-delete-lock-days=0
Creare manualmente un backup. Sebbene i backup pianificati siano in genere regolati dalla pianificazione cron nel piano di backup, l'esempio seguente mostra come avviare un'operazione di backup una tantum.
gcloud beta container backup-restore backups create $BACKUP_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=$BACKUP_PLAN_NAME \ --wait-for-completion
Crea un piano di ripristino.
gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \ --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \ --cluster-resource-conflict-policy=use-existing-version \ --namespaced-resource-restore-mode=delete-and-restore \ --volume-data-restore-policy=restore-volume-data-from-backup \ --selected-applications=kafka/kafka,kafka/zookeeper \ --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
Ripristina manualmente da un backup.
gcloud beta container backup-restore restores create $RESTORE_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --restore-plan=$RESTORE_PLAN_NAME \ --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
Guarda l'applicazione ripristinata nel cluster di backup. Potrebbero essere necessari alcuni minuti prima che tutti i pod siano in esecuzione e pronti.
gcloud container clusters get-credentials gke-kafka-us-west1 \ --region us-west1 kubectl get pod -n kafka --watch
Digita
CTRL+C
per uscire dallo smartwatch quando tutti i pod sono attivi e in esecuzione.Verifica che gli argomenti precedenti possano essere recuperati da un consumatore.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
kafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;
L'output è simile al seguente:
192 : Message number 192 193 : Message number 193 197 : Message number 197 200 : Message number 200 Processed a total of 201 messages
Digita
CTRL+C
per uscire dalla procedura.Esci dal pod.
exit
Simula un'interruzione del servizio Kafka
In questa sezione simulerai un errore del nodo sostituendo un nodo Kubernetes che ospita il broker. Questa sezione si applica solo a Standard. Autopilot gestisce i tuoi nodi per te, pertanto non è possibile simulare un errore del nodo.
Crea un pod client per connetterti all'applicazione Kafka.
kubectl run kafka-client -n kafka --restart='Never' -it \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
Crea l'argomento
topic-failover-test
e genera traffico di test.kafka-topics.sh \ --create \ --topic topic-failover-test \ --partitions 1 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Determina quale broker è il leader per l'argomento
topic-failover-test
.kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
L'output è simile al seguente:
Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Nell'output riportato sopra,
Leader: 1
indica che il leader pertopic-failover-test
è il broker 1. Corrisponde al podkafka-1
.Apri un nuovo terminale e connettiti allo stesso cluster.
gcloud container clusters get-credentials gke-kafka-us-west1 --region us-west1 --project PROJECT_ID
Trova il nodo su cui è in esecuzione il pod
kafka-1
.kubectl get pod -n kafka kafka-1 -o wide
L'output è simile al seguente:
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 1 (35m ago) 36m 192.168.132.4 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 <none> <none>
Nell'output riportato sopra, puoi vedere che il pod
kafka-1
è in esecuzione sul nodogke-gke-kafka-us-west1-pool-system-a0d0d395-nx72
.Scarica il nodo per sfollare i pod.
kubectl drain NODE \ --delete-emptydir-data \ --force \ --ignore-daemonsets
Sostituisci NODE con il pod del nodo su cui è in esecuzione kafka-1. In questo esempio, il nodo è
gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72
.L'output è simile al seguente:
node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578 evicting pod kafka/kafka-1 evicting pod kube-system/kube-dns-7d48cb57b-j4d8f evicting pod kube-system/calico-typha-56995c8d85-5clph pod/calico-typha-56995c8d85-5clph evicted pod/kafka-1 evicted pod/kube-dns-7d48cb57b-j4d8f evicted node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
Trova il nodo su cui è in esecuzione il pod
kafka-1
.kubectl get pod -n kafka kafka-1 -o wide
L'output dovrebbe essere simile al seguente:
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 0 2m49s 192.168.128.8 gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7 <none> <none>
Dall'output riportato sopra, puoi vedere che l'applicazione è in esecuzione su un nuovo nodo.
Nel terminale collegato al pod
kafka-client
, determina quale broker è il leader pertopic-failover-test
.kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
L'output dovrebbe essere simile al seguente:
Topic: topic-failover-test TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 0,2,1
Nell'output di esempio, il leader è ancora 1 . Tuttavia, ora è in esecuzione su un nuovo nodo.
Test dell'errore del leader Kafka
In Cloud Shell, connettiti al client Kafka e utilizza
describe
per visualizzare il leader eletto per ogni partizione intopic1
.kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
L'output è simile al seguente:
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: topic1 Partition: 1 Leader: 0 Replicas: 2,1,0 Isr: 0,2,1 Topic: topic1 Partition: 2 Leader: 0 Replicas: 1,0,2 Isr: 0,2,1
In Cloud Shell non connesso al client Kafka, elimina il broker leader
kafka-0
per forzare l'elezione di un nuovo leader. Devi eliminare l'indice che mappa a uno dei leader nell'output precedente.kubectl delete pod -n kafka kafka-0 --force
L'output è simile al seguente:
pod "kafka-0" force deleted
In Cloud Shell, connettiti al client Kafka e utilizza
describe
per visualizzare il leader eletto.kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
L'output è simile al seguente:
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 2 Leader: 2 Replicas: 1,2,0 Isr: 2,0,1
Nell'output, il nuovo leader per ogni partizione cambia, se è stato assegnato al leader interrotto (
kafka-0
). Ciò indica che il leader originale è stato sostituito quando il pod è stato eliminato e ricreato.
Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial, elimina il progetto che contiene le risorse oppure mantieni il progetto ed elimina le singole risorse.
Elimina il progetto
Il modo più semplice per evitare la fatturazione è eliminare il progetto che hai creato per il tutorial.
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Passaggi successivi
- Per un servizio di messaggistica completamente gestito e scalabile, consulta Eseguire la migrazione da Kafka a Pub/Sub.