Implementa un clúster de Kafka con alta disponibilidad en GKE


Kafka es un sistema de mensajería de publicación y suscripción distribuido y de código abierto para administrar datos de transmisión de gran volumen, alta capacidad de procesamiento y tiempo real. Puedes usar Kafka para compilar canalizaciones de datos de transmisión que muevan datos de manera confiable en diferentes sistemas y aplicaciones para su procesamiento y análisis.

Este instructivo está dirigido a los administradores de plataformas, arquitectos de nube y profesionales de operaciones interesados en implementar clústeres de Kafka con alta disponibilidad en Google Kubernetes Engine (GKE).

Objetivos

En este instructivo aprenderás realizar las siguientes tareas:

  • Usar Terraform para crear un clúster de GKE regional.
  • Implementa un clúster de Kafka con alta disponibilidad.
  • Actualizar los objetos binarios de Kafka.
  • Crear una copia de seguridad del clúster de Kafka y restablecerla.
  • Simular la interrupción de los nodos de GKE y la conmutación por error del agente de Kafka

Arquitectura

En esta sección, se describe la arquitectura de la solución que compilarás en este instructivo.

Un clúster de Kafka es un grupo de uno o más servidores (llamados agentes) que trabajan juntos para manejar transmisiones de datos entrantes y mensajes de publicación y suscripción de clientes de Kafka (llamados consumidores).

Cada partición de datos en un clúster de Kafka tiene un agente líder y puede tener uno o más agentes de seguidores. El agente líder maneja todas las operaciones de lectura y escritura en la partición. Cada agente de seguidores replica de forma pasiva el agente líder.

En una configuración típica de Kafka, también usas un servicio de código abierto llamado ZooKeeper para coordinar tus clústeres de Kafka. Este servicio ayuda a elegir un líder entre los agentes y activar la conmutación por error en caso de fallas.

En este instructivo, implementarás los clústeres de Kafka en GKE mediante la configuración de los agentes de Kafka y el servicio de Zookeeper como StatefulSets individuales. A fin de aprovisionar clústeres de Kafka con alta disponibilidad y prepararte para la recuperación ante desastres, configurarás los StatefulSets de Kafka y Zookeeper a fin de usar grupos y zonas de nodos independientes.

En el siguiente diagrama, se muestra cómo se ejecuta el StatefulSet de Kafka en varios nodos y zonas en tu clúster de GKE.

En el diagrama, se muestra una arquitectura de ejemplo de un StatefulSet de Kafka en GKE implementado en varias zonas.
Figura 1: Implementa tu StatefulSet de Kafka en nodos de GKE en tres zonas diferentes.

En el siguiente diagrama, se muestra cómo se ejecuta el StatefulSet de Zookeeper en varios nodos y zonas de tu clúster de GKE.

En el diagrama, se muestra un ejemplo de arquitectura de un StatefulSet de Zookeeper en GKE implementado en varias zonas.
Figura 2: Implementa tu Zookeeper de Kafka en nodos de GKE en tres zonas diferentes.

Aprovisionamiento de nodos y programación de Pods

Si usas clústeres de Autopilot, Autopilot controla los nodos de aprovisionamiento y programa los Pods para tus cargas de trabajo. Usarás la antiafinidad de Pods para asegurarte de que no haya dos Pods del mismo StatefulSet en el mismo nodo y la misma zona.

Si usas clústeres estándar, deberás configurar la tolerancia del Pod y la afinidad de nodos. Para obtener más información, consulta Aísla tus cargas de trabajo en grupos de nodos dedicados.

Costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.

Cuando finalices las tareas que se describen en este documento, puedes borrar los recursos que creaste para evitar que continúe la facturación. Para obtener más información, consulta Cómo realizar una limpieza.

Antes de comenzar

Configurar tu proyecto

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, click Create project to begin creating a new Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, click Create project to begin creating a new Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM APIs.

    Enable the APIs

Configura funciones

  1. Grant roles to your user account. Run the following command once for each of the following IAM roles: role/storage.objectViewer, role/logging.logWriter, role/artifactregistry.Admin, roles/container.clusterAdmin, role/container.serviceAgent, roles/iam.serviceAccountAdmin, roles/serviceusage.serviceUsageAdmin, roles/iam.serviceAccountAdmin

    $ gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.

Configure su entorno

En este instructivo, usarás Cloud Shell para administrar recursos alojados en Google Cloud. Cloud Shell tiene preinstalado el software que necesitarás para este instructivo, incluido Docker, kubectl y la CLI de gcloud, Helm y Terraform.

Para configurar tu entorno con Cloud Shell, sigue estos pasos:

  1. Para iniciar una sesión de Cloud Shell desde la consola de Google Cloud, haz clic en Ícono de activación de Cloud ShellActivar Cloud Shell en la consola de Google Cloud. Esto inicia una sesión en el panel inferior de la consola de Google Cloud.

  2. Configurar variables de entorno

    export PROJECT_ID=PROJECT_ID
    export REGION=us-central1
    

    Reemplaza los siguientes valores:

  3. Configura las variables de entorno predeterminadas.

    gcloud config set project PROJECT_ID
    
  4. Clona el repositorio de código.

    git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  5. Cambia al directorio de trabajo.

    cd kubernetes-engine-samples/streaming/gke-stateful-kafka
    

Crea la infraestructura del clúster

En esta sección, ejecutarás una secuencia de comandos de Terraform para crear dos clústeres regionales de GKE. El clúster principal se implementará en us-central1.

Sigue estos pasos para crear el clúster:

Autopilot

En Cloud Shell, ejecute los siguientes comandos:

terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID

Cuando se te solicite, escribe yes.

Standard

En Cloud Shell, ejecute los siguientes comandos:

terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID

Cuando se te solicite, escribe yes.

Los archivos de configuración de Terraform crean los siguientes recursos para implementar la infraestructura:

  • Crea un repositorio de Artifact Registry para almacenar las imágenes de Docker.
  • Crea la red de VPC y la subred para la interfaz de red de la VM.
  • Crea dos clústeres de GKE.

Terraform crea un clúster privado en las dos regiones y habilita la Copia de seguridad para GKE para la recuperación ante desastres.

Implementa Kafka en tu clúster

En esta sección, implementarás Kafka en GKE con un gráfico de Helm. La operación crea los siguientes recursos:

Para usar el gráfico de Helm para implementar Kafka, sigue estos pasos:

  1. Configura el acceso a Docker.

    gcloud auth configure-docker us-docker.pkg.dev
    
  2. Propaga Artifact Registry con las imágenes de Kafka y Zookeeper.

    ./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0
    ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52
    ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41
    ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
    
  3. Configura el acceso mediante la línea de comandos de kubectl al clúster principal.

    gcloud container clusters get-credentials gke-kafka-us-central1 \
        --region=${REGION} \
        --project=${PROJECT_ID}
    
  4. Crea un espacio de nombres.

    export NAMESPACE=kafka
    kubectl create namespace $NAMESPACE
    
  5. Instala Kafka con la versión 20.0.6 del gráfico de Helm.

    cd helm
    ../scripts/chart.sh kafka 20.0.6 && \
    rm -rf Chart.lock charts && \
    helm dependency update && \
    helm -n kafka upgrade --install kafka . \
    --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
    
    

    El resultado es similar al siguiente:

    NAME: kafka
    LAST DEPLOYED: Thu Feb 16 03:29:39 2023
    NAMESPACE: kafka
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    
  6. Verifica que las réplicas de Kafka estén en ejecución (esto puede tardar unos minutos).

    kubectl get all -n kafka
    

    El resultado es similar a este:

    ---
    NAME                    READY   STATUS    RESTARTS        AGE
    pod/kafka-0             1/1     Running   2 (3m51s ago)   4m28s
    pod/kafka-1             1/1     Running   3 (3m41s ago)   4m28s
    pod/kafka-2             1/1     Running   2 (3m57s ago)   4m28s
    pod/kafka-zookeeper-0   1/1     Running   0               4m28s
    pod/kafka-zookeeper-1   1/1     Running   0               4m28s
    pod/kafka-zookeeper-2   1/1     Running   0               4m28s
    
    NAME                                   TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)                      AGE
    service/kafka                          ClusterIP   192.168.112.124   <none>        9092/TCP                     4m29s
    service/kafka-app                      ClusterIP   192.168.75.57     <none>        9092/TCP                     35m
    service/kafka-app-headless             ClusterIP   None              <none>        9092/TCP,9093/TCP            35m
    service/kafka-app-zookeeper            ClusterIP   192.168.117.102   <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-app-zookeeper-headless   ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-headless                 ClusterIP   None              <none>        9092/TCP,9093/TCP            4m29s
    service/kafka-zookeeper                ClusterIP   192.168.89.249    <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    service/kafka-zookeeper-headless       ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    
    NAME                               READY   AGE
    statefulset.apps/kafka             3/3     4m29s
    statefulset.apps/kafka-zookeeper   3/3     4m29s
    

Cree datos de prueba

En esta sección, probarás la aplicación de Kafka y generarás mensajes.

  1. Crea un Pod de cliente para el consumidor para interactuar con la aplicación Kafka.

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
    
  2. Crea un tema llamado topic1 con tres particiones y un factor de replicación de tres.

    kafka-topics.sh \
        --create \
        --topic topic1 \
        --partitions 3  \
        --replication-factor 3 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. Verifica que las particiones del tema se repliquen en los tres agentes.

    kafka-topics.sh \
        --describe \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    El resultado es similar a este:

    Topic: topic1     TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
           Topic: topic1    Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
           Topic: topic1    Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
           Topic: topic1    Partition: 2    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
    

    En el resultado de ejemplo, observa que topic1 tiene tres particiones, cada una con un líder y un conjunto de réplicas diferentes. Esto se debe a que Kafka usa la partición para distribuir los datos en varios agentes, lo que permite una mayor escalabilidad y tolerancia a errores. El factor de replicación de tres garantiza que cada partición tenga tres réplicas, por lo que los datos aún estarán disponibles, incluso si uno o dos agentes fallan.

  4. Ejecuta el siguiente comando para generar números de mensajes de forma masiva en topic1.

    ALLOW_PLAINTEXT_LISTENER=yes
    for x in $(seq 0 200); do
      echo "$x: Message number $x"
    done | kafka-console-producer.sh \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \
        --property parse.key=true \
        --property key.separator=":"
    
  5. Ejecuta el siguiente comando para consumir topic1 de todas las particiones.

    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    Escribe CTRL+C para detener el proceso del consumidor.

Comparativa de Kafka

Para modelar con precisión un caso práctico, puedes ejecutar una simulación de la carga esperada en el clúster. Para probar el rendimiento, usa las herramientas incluidas en el paquete de Kafka, es decir, las secuencias de comandos kafka-producer-perf-test.sh y kafka-consumer-perf-test.sh en la carpeta bin.

  1. Crea un tema para realizar comparativas.

    kafka-topics.sh \
      --create \
      --topic topic-benchmark \
      --partitions 3  \
      --replication-factor 3 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  2. Crea una carga en el clúster de Kafka.

    KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \
        --topic topic-benchmark \
        --num-records 10000000 \
        --throughput -1 \
        --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \
              batch.size=16384 \
              acks=all \
              linger.ms=500 \
              compression.type=none \
        --record-size 100 \
        --print-metrics
    

    El productor generará 10,000,000 registros en topic-benchmark. La salida es similar a lo siguiente:

    623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency.
    1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency.
    1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency.
    2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
    

    Una vez que se hayan enviado todos los registros, deberías ver métricas adicionales en el resultado, similares a las siguientes:

    producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark}     : 173316.233
    producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark}    : 10000000.000
    

    Para salir de la visualización, escribe CTRL + C.

  3. Sal de la shell del Pod.

    exit
    

Administra las actualizaciones

Las actualizaciones de versión para Kafka y Kubernetes se lanzan con regularidad. Sigue las prácticas recomendadas operativas para actualizar el entorno de software con regularidad.

Planifica las actualizaciones binarias de Kafka

En esta sección, actualizarás la imagen de Kafka con Helm y verificarás que tus temas sigan disponibles.

Para actualizar desde la versión anterior de Kafka del gráfico de Helm que usaste en Implementa Kafka en tu clúster, sigue estos pasos:

  1. Propaga Artifact Registry con la siguiente imagen:

    ../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2
    ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61
    ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49
    ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
    
  2. Realiza estos pasos para implementar un gráfico de Helm con las imágenes actualizadas de Kafka y Zookeeper. Si deseas obtener orientación específica de la versión, consulta las instrucciones de Kafka para actualizaciones de versión.

    1. Actualiza la versión de dependencia Chart.yaml:
    ../scripts/chart.sh kafka 20.1.0
    
    
    1. Implementa el gráfico de Helm con las imágenes nuevas de Kafka y Zookeeper, como se muestra en el siguiente ejemplo:

      rm -rf Chart.lock charts && \
      helm dependency update && \
      helm -n kafka upgrade --install kafka ./ \
            --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
      

    Mira cómo se actualizan los Pods de Kafka:

    kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
    

    Para salir de la visualización, escribe CTRL + C.

  3. Conéctate al clúster de Kafka con un Pod de cliente.

    kubectl run kafka-client -n kafka --rm -ti \
      --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
    
  4. Verifica que puedas acceder a los mensajes de topic1.

    kafka-console-consumer.sh \
      --topic topic1 \
      --from-beginning \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    El resultado debe mostrar los mensajes generados del paso anterior. Escribe CTRL+C para salir del proceso.

  5. Sal del Pod del cliente.

    exit
    

Prepárate para la recuperación ante desastres

Para asegurarte de que las cargas de trabajo de producción permanezcan disponibles en caso de un evento que interrumpa el servicio, debes preparar un plan de recuperación ante desastres (DR). Para obtener más información sobre la planificación de DR, consulta la Guía de planificación de recuperación ante desastres.

Para crear una copia de seguridad de tus cargas de trabajo en clústeres de GKE y restablecerlas, puedes usar Copia de seguridad para GKE.

Ejemplo de situación de copia de seguridad y restablecimiento de Kafka

En esta sección, realizarás una copia de seguridad de tu clúster desde gke-kafka-us-central1 y restablecerás la copia de seguridad en gke-kafka-us-west1. Realizarás la operación de copia de seguridad y restablecimiento en el alcance de la aplicación con el recurso personalizado ProtectedApplication.

En el siguiente diagrama, se ilustran los componentes de la solución de recuperación ante desastres y cómo se relacionan entre sí.

En el diagrama, se muestra un ejemplo de solución de copia de seguridad y recuperación para un clúster de Kafka con alta disponibilidad.
Figura 3: Ejemplo de solución de copia de seguridad y recuperación para un clúster de Kafka con alta disponibilidad.

A fin de prepararte para crear una copia de seguridad y restablecer tu clúster de Kafka, sigue estos pasos:

  1. Configura las variables de entorno.

    export BACKUP_PLAN_NAME=kafka-protected-app
    export BACKUP_NAME=protected-app-backup-1
    export RESTORE_PLAN_NAME=kafka-protected-app
    export RESTORE_NAME=protected-app-restore-1
    export REGION=us-central1
    export DR_REGION=us-west1
    export CLUSTER_NAME=gke-kafka-$REGION
    export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
    
  2. Verifica que el clúster esté en estado RUNNING.

    gcloud container clusters describe $CLUSTER_NAME --region us-central1 --format='value(status)'
    
  3. Crear un plan de creación de copias de seguridad.

    gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --include-secrets \
        --include-volume-data \
        --cron-schedule="0 3 * * *" \
        --backup-retain-days=7 \
        --backup-delete-lock-days=0
    
  4. Crea una copia de seguridad de forma manual. Si bien las copias de seguridad programadas, por lo general, se rigen por el programa cron en el plan de copia de seguridad, en el siguiente ejemplo, se muestra cómo iniciar una operación de copia de seguridad única.

    gcloud beta container backup-restore backups create $BACKUP_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=$BACKUP_PLAN_NAME \
        --wait-for-completion
    
  5. Crea un plan de restablecimiento.

    gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \
        --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \
        --cluster-resource-conflict-policy=use-existing-version \
        --namespaced-resource-restore-mode=delete-and-restore \
        --volume-data-restore-policy=restore-volume-data-from-backup \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
    
  6. Realiza un restablecimiento manual desde una copia de seguridad.

    gcloud beta container backup-restore restores create $RESTORE_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --restore-plan=$RESTORE_PLAN_NAME \
        --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
    
  7. Mira cómo aparece la aplicación restablecida en el clúster de la copia de seguridad. Es posible que todos los Pods tarden unos minutos en ejecutarse y estar listos.

    gcloud container clusters get-credentials gke-kafka-us-west1 \
        --region us-west1
    kubectl get pod -n kafka --watch
    

    Escribe CTRL+C para salir del reloj cuando todos los Pods estén en funcionamiento.

  8. Verifica que un consumidor pueda recuperar los temas anteriores.

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    El resultado es similar a este:

    192 :  Message number 192
    193 :  Message number 193
    197 :  Message number 197
    200 :  Message number 200
    Processed a total of 201 messages
    

    Escribe CTRL+C para salir del proceso.

  9. Sal del Pod.

    exit
    

Simula una interrupción del servicio de Kafka

En esta sección, simularás una falla del nodo mediante el reemplazo de un nodo de Kubernetes que aloja el agente. Esta sección se aplica solo a la versión Estándar. Autopilot administra tus nodos, por lo que no se puede simular la falla de nodos.

  1. Crea un Pod de cliente para conectarte a la aplicación de Kafka.

    kubectl run kafka-client -n kafka --restart='Never' -it \
    --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
  2. Crea el tema topic-failover-test y produce tráfico de prueba.

    kafka-topics.sh \
      --create \
      --topic topic-failover-test \
      --partitions 1  \
      --replication-factor 3  \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. Determina qué agente es el líder del tema topic-failover-test.

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    El resultado es similar a este:

    Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
    

    En el resultado anterior, Leader: 1 significa que el líder de topic-failover-test es el agente 1. Esto corresponde al Pod kafka-1.

  4. Abre una terminal nueva y conéctate al mismo clúster.

    gcloud container clusters get-credentials gke-kafka-us-west1 --region us-west1 --project PROJECT_ID
    
  5. Descubre en qué nodo de Pod se está ejecutando kafka-1.

    kubectl get pod -n kafka kafka-1 -o wide
    

    El resultado es similar a este:

    NAME      READY   STATUS    RESTARTS      AGE   IP              NODE                                               NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   1 (35m ago)   36m   192.168.132.4   gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72   <none>           <none>
    

    En el resultado anterior, verás que el Pod kafka-1 se ejecuta en el nodo gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72.

  6. Vacía el nodo para expulsar los Pods.

    kubectl drain NODE \
      --delete-emptydir-data \
      --force \
      --ignore-daemonsets
    

    Reemplaza NODE por el Pod de nodo en el que se ejecuta kafka-1. En este ejemplo, el nodo es gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72.

    El resultado es similar a este:

    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned
    Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578
    evicting pod kafka/kafka-1
    evicting pod kube-system/kube-dns-7d48cb57b-j4d8f
    evicting pod kube-system/calico-typha-56995c8d85-5clph
    pod/calico-typha-56995c8d85-5clph evicted
    pod/kafka-1 evicted
    pod/kube-dns-7d48cb57b-j4d8f evicted
    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
    
  7. Descubre en qué nodo de Pod se está ejecutando kafka-1.

    kubectl get pod -n kafka kafka-1 -o wide
    

    El resultado debería ser similar al siguiente:

    NAME      READY   STATUS    RESTARTS   AGE     IP              NODE                                              NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   0          2m49s   192.168.128.8   gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7   <none>           <none>
    

    En el resultado anterior, verás que la aplicación se ejecuta en un nodo nuevo.

  8. En la terminal conectada al Pod kafka-client, determina qué agente es el líder para topic-failover-test.

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    El resultado debería ser similar al siguiente:

    Topic: topic-failover-test     TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 0,2,1
    

    En el resultado de ejemplo, el líder sigue siendo 1. Sin embargo, ahora se ejecuta en un nodo nuevo.

Prueba la falla del líder de Kafka

  1. En Cloud Shell, conéctate al cliente de Kafka y usa describe para ver el líder elegido de cada partición en topic1.

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    El resultado es similar a este:

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: topic1   Partition: 1    Leader: 0       Replicas: 2,1,0 Isr: 0,2,1
        Topic: topic1   Partition: 2    Leader: 0       Replicas: 1,0,2 Isr: 0,2,1
    
  2. En Cloud Shell que no está conectado al cliente de Kafka, borra el agente líder kafka-0 para forzar una nueva elección de líder. Debes borrar el índice que se asigna a uno de los líderes en el resultado anterior.

    kubectl delete pod -n kafka kafka-0 --force
    

    El resultado es similar a este:

    pod "kafka-0" force deleted
    
  3. En Cloud Shell que está conectado al cliente de Kafka, usa describe para ver el líder elegido.

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    El resultado es similar a este:

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,0,1
        Topic: topic1   Partition: 1    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: topic1   Partition: 2    Leader: 2       Replicas: 1,2,0 Isr: 2,0,1
    

    En el resultado, el nuevo líder para cada partición cambia, si se asignó al líder que se interrumpió (kafka-0). Esto indica que el líder original se reemplazó cuando se borró y se volvió a crear el Pod.

Realiza una limpieza

Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.

Borra el proyecto

La manera más fácil de evitar la facturación es borrar el proyecto que creaste para el instructivo.

Delete a Google Cloud project:

gcloud projects delete PROJECT_ID

¿Qué sigue?