在 GKE 上部署高可用性 Kafka 集群


Kafka 是一个开源分布式发布-订阅消息传递系统,用于处理大量、高吞吐量和实时的流式数据。您可以使用 Kafka 构建流式数据流水线,以在不同系统和应用之间可靠地移动数据来进行处理和分析。

本教程适用于想要在 Google Kubernetes Engine (GKE) 上部署高可用性 Kafka 集群的平台管理员、云架构师和运维专业人员。

目标

在本教程中,您将学习如何完成以下操作:

  • 使用 Terraform 创建区域级 GKE 集群。
  • 部署高可用性 Kafka 集群。
  • 升级 Kafka 二进制文件。
  • 备份和恢复 Kafka 集群。
  • 模拟 GKE 节点中断和 Kafka 代理故障切换。

架构

本部分介绍了您将在本教程中构建的解决方案的架构。

Kafka 集群是一组(包含一个或多个)服务器(称为“代理”),这些服务器协同工作来处理 Kafka 客户端(称为“使用方”)的传入数据流和发布-订阅消息传递。

Kafka 集群中的每个数据分区都有一个主要代理,并且可以包含一个或多个跟随代理。主要代理会处理对分区的所有读取和写入操作。每个跟随代理都会被动地复制主要代理。

在典型的 Kafka 设置中,您还需要使用一个名为 ZooKeeper 的开源服务来协调 Kafka 集群。此服务有助于在代理之间选举主要代理,并在发生故障时触发故障切换。

在本教程中,您需要将 Kafka 代理和 ZooKeeper 服务配置为单个 StatefulSet,从而在 GKE 上部署 Kafka 集群。如需预配高可用性 Kafka 集群并为灾难恢复做好准备,请将 Kafka 和 ZooKeeper StatefulSet 配置为使用单独的节点池可用区

下图展示了 Kafka StatefulSet 如何在 GKE 集群中的多个节点和可用区上运行。

图表展示了 GKE 上跨多个可用区部署的 Kafka StatefulSet 的示例架构。
图 1:在三个不同可用区中的 GKE 节点上部署 Kafka StatefulSet。

下图展示了 Zookeeper StatefulSet 如何在 GKE 集群中的多个节点和可用区上运行。

图表展示了 GKE 上跨多个可用区部署的 Zookeeper StatefulSet 的示例架构。
图 2:在三个不同可用区中的 GKE 节点上部署 Kafka Zookeeper。

节点预配和 Pod 调度

如果您使用的是 Autopilot 集群,则 Autopilot 会为工作负载处理节点预配和 Pod 调度。您需要使用 Pod 反亲和性来确保不会在同一节点和同一可用区上调度同一 StatefulSet 的两个 Pod。

如果您使用的是 Standard 集群,则需要配置 Pod 容忍设置节点亲和性。如需了解详情,请参阅在专用节点池中隔离工作负载

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

设置项目

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. In the Google Cloud console, on the project selector page, click Create project to begin creating a new Google Cloud project.

    Go to project selector

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 启用 Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM API。

    启用 API

  5. In the Google Cloud console, on the project selector page, click Create project to begin creating a new Google Cloud project.

    Go to project selector

  6. 确保您的 Google Cloud 项目已启用结算功能

  7. 启用 Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM API。

    启用 API

设置角色

  1. 向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次: role/storage.objectViewer, role/logging.logWriter, role/artifactregistry.Admin, roles/container.clusterAdmin, role/container.serviceAgent, roles/iam.serviceAccountAdmin, roles/serviceusage.serviceUsageAdmin, roles/iam.serviceAccountAdmin

    $ gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • EMAIL_ADDRESS 替换为您的电子邮件地址。
    • ROLE 替换为每个角色。

设置您的环境

在本教程中,您将使用 Cloud Shell 来管理 Google Cloud 上托管的资源。Cloud Shell 预安装有本教程所需的软件,包括 Dockerkubectlgcloud CLIHelmTerraform

如需使用 Cloud Shell 设置您的环境,请按照以下步骤操作:

  1. 点击 Google Cloud 控制台中的 Cloud Shell 激活图标 激活 Cloud Shell,从 Google Cloud 控制台启动 Cloud Shell 会话。此操作会在 Google Cloud 控制台的底部窗格中启动会话。

  2. 设置环境变量。

    export PROJECT_ID=PROJECT_ID
    export REGION=us-central1
    

    替换以下值:

    • PROJECT_ID:您的 Google Cloud 项目 ID
  3. 设置默认环境变量。

    gcloud config set project PROJECT_ID
    
  4. 克隆代码库。

    git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  5. 切换到工作目录。

    cd kubernetes-engine-samples/streaming/gke-stateful-kafka
    

创建集群基础架构

在本部分中,您将运行 Terraform 脚本来创建两个区域级 GKE 集群。主要集群将部署在 us-central1 中。

如需创建集群,请运行以下命令:

Autopilot

在 Cloud Shell 中,运行以下命令:

terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID

出现提示时,请输入 yes

标准版

在 Cloud Shell 中,运行以下命令:

terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID

出现提示时,请输入 yes

Terraform 配置文件会创建以下资源来部署基础架构:

  • 创建 Artifact Registry 制品库以存储 Docker 映像。
  • 为虚拟机的网络接口创建 VPC 网络和子网。
  • 创建两个 GKE 集群。

Terraform 会在两个区域中创建专用集群,并启用 Backup for GKE 以用于灾难恢复。

在集群上部署 Kafka

在本部分中,您将使用 Helm 图表在 GKE 上部署 Kafka。该操作会创建以下资源:

  • Kafka 和 Zookeeper StatefulSet。
  • Kafka 导出器部署。导出器会收集 Kafka 指标以供 Prometheus 使用。
  • Pod 中断预算 (PDB),用于限制自愿中断期间的离线 Pod 数量。

如需使用 Helm 图表部署 Kafka,请按照以下步骤操作:

  1. 配置 Docker 访问权限。

    gcloud auth configure-docker us-docker.pkg.dev
    
  2. 使用 Kafka 和 Zookeeper 映像填充 Artifact Registry。

    ./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0
    ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52
    ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41
    ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
    
  3. 配置对主要集群的 kubectl 命令行访问权限。

    gcloud container clusters get-credentials gke-kafka-us-central1 \
        --region=${REGION} \
        --project=${PROJECT_ID}
    
  4. 创建命名空间。

    export NAMESPACE=kafka
    kubectl create namespace $NAMESPACE
    
  5. 使用 Helm 图表 20.0.6 版安装 Kafka。

    cd helm
    ../scripts/chart.sh kafka 20.0.6 && \
    rm -rf Chart.lock charts && \
    helm dependency update && \
    helm -n kafka upgrade --install kafka . \
    --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
    
    

    输出类似于以下内容:

    NAME: kafka
    LAST DEPLOYED: Thu Feb 16 03:29:39 2023
    NAMESPACE: kafka
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    
  6. 验证 Kafka 副本是否正在运行(这可能需要几分钟的时间)。

    kubectl get all -n kafka
    

    输出内容类似如下:

    ---
    NAME                    READY   STATUS    RESTARTS        AGE
    pod/kafka-0             1/1     Running   2 (3m51s ago)   4m28s
    pod/kafka-1             1/1     Running   3 (3m41s ago)   4m28s
    pod/kafka-2             1/1     Running   2 (3m57s ago)   4m28s
    pod/kafka-zookeeper-0   1/1     Running   0               4m28s
    pod/kafka-zookeeper-1   1/1     Running   0               4m28s
    pod/kafka-zookeeper-2   1/1     Running   0               4m28s
    
    NAME                                   TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)                      AGE
    service/kafka                          ClusterIP   192.168.112.124   <none>        9092/TCP                     4m29s
    service/kafka-app                      ClusterIP   192.168.75.57     <none>        9092/TCP                     35m
    service/kafka-app-headless             ClusterIP   None              <none>        9092/TCP,9093/TCP            35m
    service/kafka-app-zookeeper            ClusterIP   192.168.117.102   <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-app-zookeeper-headless   ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-headless                 ClusterIP   None              <none>        9092/TCP,9093/TCP            4m29s
    service/kafka-zookeeper                ClusterIP   192.168.89.249    <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    service/kafka-zookeeper-headless       ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    
    NAME                               READY   AGE
    statefulset.apps/kafka             3/3     4m29s
    statefulset.apps/kafka-zookeeper   3/3     4m29s
    

创建测试数据

在本部分中,您将测试 Kafka 应用并生成消息。

  1. 创建与 Kafka 应用进行交互的使用方客户端 Pod。

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
    
  2. 创建一个名为 topic1 的主题,其包含三个分区且复制因子为 3。

    kafka-topics.sh \
        --create \
        --topic topic1 \
        --partitions 3  \
        --replication-factor 3 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. 验证主题分区是否已在所有三个代理中复制。

    kafka-topics.sh \
        --describe \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    输出内容类似如下:

    Topic: topic1     TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
           Topic: topic1    Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
           Topic: topic1    Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
           Topic: topic1    Partition: 2    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
    

    在示例输出中,请注意 topic1 有三个分区,每个分区都有不同的主要代理和副本集。这是因为 Kafka 使用分区将数据分布到多个代理,从而实现更高的可伸缩性和容错能力。复制因子为 3 可确保每个分区都有三个副本,因此,即使一个或两个代理发生故障,数据也仍然可用。

  4. 运行以下命令以将消息编号批量生成到 topic1 中。

    ALLOW_PLAINTEXT_LISTENER=yes
    for x in $(seq 0 200); do
      echo "$x: Message number $x"
    done | kafka-console-producer.sh \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \
        --property parse.key=true \
        --property key.separator=":"
    
  5. 运行以下命令以在所有分区中使用 topic1

    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    CTRL+C 以停止使用方进程。

对 Kafka 进行基准化分析

如需准确为用例建模,您可以对集群上的预期负载运行模拟。如需测试性能,您需要使用 Kafka 软件包中包含的工具,即 bin 文件夹中的 kafka-producer-perf-test.shkafka-consumer-perf-test.sh 脚本。

  1. 创建一个用于基准化分析的主题。

    kafka-topics.sh \
      --create \
      --topic topic-benchmark \
      --partitions 3  \
      --replication-factor 3 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  2. 在 Kafka 集群上创建负载。

    KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \
        --topic topic-benchmark \
        --num-records 10000000 \
        --throughput -1 \
        --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \
              batch.size=16384 \
              acks=all \
              linger.ms=500 \
              compression.type=none \
        --record-size 100 \
        --print-metrics
    

    提供方会为 topic-benchmark 生成 1000 万条记录。输出内容类似如下:

    623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency.
    1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency.
    1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency.
    2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
    

    所有记录都发送后,您应该会在输出中看到其他指标,如下所示:

    producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark}     : 173316.233
    producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark}    : 10000000.000
    

    如需退出监控,请按 CTRL + C

  3. 退出 Pod shell。

    exit
    

管理升级

Kafka 和 Kubernetes 的版本更新会定期发布。请遵循运营最佳实践来定期更新您的软件环境。

规划 Kafka 二进制文件升级

在本部分中,您将使用 Helm 更新 Kafka 映像,并验证主题是否仍然可用。

如需通过在集群上部署 Kafka 中使用的 Helm 图表从早期 Kafka 版本进行升级,请按照以下步骤操作:

  1. 使用以下映像填充 Artifact Registry:

    ../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2
    ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61
    ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49
    ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
    
  2. 执行以下步骤以使用升级后的 Kafka 和 ZooKeeper 映像部署 Helm 图表。如需了解特定于版本的指导信息,请参阅有关版本升级的 Kafka 说明

    1. 更新 Chart.yaml 依赖项版本:
    ../scripts/chart.sh kafka 20.1.0
    
    
    1. 使用新的 Kafka 和 Zookeeper 映像部署 Helm 图表,如以下示例所示:

      rm -rf Chart.lock charts && \
      helm dependency update && \
      helm -n kafka upgrade --install kafka ./ \
            --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
      

    监控 Kafka Pod 是否升级:

    kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
    

    如需退出监控,请按 CTRL + C

  3. 使用客户端 Pod 连接到 Kafka 集群。

    kubectl run kafka-client -n kafka --rm -ti \
      --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
    
  4. 验证您是否可以访问来自 topic1 的消息。

    kafka-console-consumer.sh \
      --topic topic1 \
      --from-beginning \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    输出应显示上一步中生成的消息。按 CTRL+C 以退出进程。

  5. 退出客户端 Pod。

    exit
    

为灾难恢复做好准备

为确保生产工作负载在发生服务中断事件时仍然可用,您应该准备灾难恢复 (DR) 规划。如需详细了解灾难恢复规划,请参阅灾难恢复规划指南

如需在 GKE 集群上备份和恢复工作负载,您可以使用 Backup for GKE

Kafka 备份和恢复场景示例

在本部分中,您将从 gke-kafka-us-central1 备份集群,并将备份恢复到 gke-kafka-us-west1。您将使用 ProtectedApplication 自定义资源在应用范围内执行备份和恢复操作。

下图展示了灾难恢复解决方案的组件,以及相互之间的关系。

图表展示了高可用性 PostgreSQL 集群的备份和恢复解决方案示例。
图 3:高可用性 Kafka 集群的备份和恢复解决方案示例。

如需准备备份和恢复 Kafka 集群,请按照以下步骤操作:

  1. 设置环境变量。

    export BACKUP_PLAN_NAME=kafka-protected-app
    export BACKUP_NAME=protected-app-backup-1
    export RESTORE_PLAN_NAME=kafka-protected-app
    export RESTORE_NAME=protected-app-restore-1
    export REGION=us-central1
    export DR_REGION=us-west1
    export CLUSTER_NAME=gke-kafka-$REGION
    export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
    
  2. 验证集群是否处于 RUNNING 状态。

    gcloud container clusters describe $CLUSTER_NAME --region us-central1 --format='value(status)'
    
  3. 创建备份方案。

    gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --include-secrets \
        --include-volume-data \
        --cron-schedule="0 3 * * *" \
        --backup-retain-days=7 \
        --backup-delete-lock-days=0
    
  4. 手动创建备份。虽然计划备份通常受备份方案中的 Cron 时间表约束,但以下示例展示了如何启动一次性备份操作。

    gcloud beta container backup-restore backups create $BACKUP_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=$BACKUP_PLAN_NAME \
        --wait-for-completion
    
  5. 创建恢复方案。

    gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \
        --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \
        --cluster-resource-conflict-policy=use-existing-version \
        --namespaced-resource-restore-mode=delete-and-restore \
        --volume-data-restore-policy=restore-volume-data-from-backup \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
    
  6. 通过备份手动进行恢复。

    gcloud beta container backup-restore restores create $RESTORE_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --restore-plan=$RESTORE_PLAN_NAME \
        --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
    
  7. 监控恢复的应用是否出现在备份集群中。所有 Pod 可能需要几分钟时间才能运行并准备就绪。

    gcloud container clusters get-credentials gke-kafka-us-west1 \
        --region us-west1
    kubectl get pod -n kafka --watch
    

    CTRL+C 以在所有 Pod 已启动并运行时退出监控。

  8. 验证使用方是否可以提取之前的主题。

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    输出内容类似如下:

    192 :  Message number 192
    193 :  Message number 193
    197 :  Message number 197
    200 :  Message number 200
    Processed a total of 201 messages
    

    CTRL+C 以退出进程。

  9. 退出 Pod。

    exit
    

模拟 Kafka 服务中断

在本部分中,您将通过替换托管代理的 Kubernetes 节点来模拟节点故障。本部分仅适用于 Standard。Autopilot 会为您管理节点,因此无法模拟节点故障。

  1. 创建客户端 Pod 以连接到 Kafka 应用。

    kubectl run kafka-client -n kafka --restart='Never' -it \
    --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
  2. 创建主题 topic-failover-test 并生成测试流量。

    kafka-topics.sh \
      --create \
      --topic topic-failover-test \
      --partitions 1  \
      --replication-factor 3  \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. 确定哪个代理是 topic-failover-test 主题的主要代理。

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    输出内容类似如下:

    Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
    

    在上面的输出中,Leader: 1 表示 topic-failover-test 的主要代理是代理 1。此代理对应于 Pod kafka-1

  4. 打开新终端并连接到同一集群。

    gcloud container clusters get-credentials gke-kafka-us-west1 --region us-west1 --project PROJECT_ID
    
  5. 查找 Pod kafka-1 在哪个节点上运行。

    kubectl get pod -n kafka kafka-1 -o wide
    

    输出内容类似如下:

    NAME      READY   STATUS    RESTARTS      AGE   IP              NODE                                               NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   1 (35m ago)   36m   192.168.132.4   gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72   <none>           <none>
    

    在上面的输出中,您可以看到 Pod kafka-1 在节点 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 上运行。

  6. 排空该节点以逐出 Pod。

    kubectl drain NODE \
      --delete-emptydir-data \
      --force \
      --ignore-daemonsets
    

    NODE 替换为 Pod kafka-1 在其上运行的节点。在此示例中,该节点为 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72

    输出内容类似如下:

    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned
    Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578
    evicting pod kafka/kafka-1
    evicting pod kube-system/kube-dns-7d48cb57b-j4d8f
    evicting pod kube-system/calico-typha-56995c8d85-5clph
    pod/calico-typha-56995c8d85-5clph evicted
    pod/kafka-1 evicted
    pod/kube-dns-7d48cb57b-j4d8f evicted
    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
    
  7. 查找 Pod kafka-1 在哪个节点上运行。

    kubectl get pod -n kafka kafka-1 -o wide
    

    输出应类似如下所示:

    NAME      READY   STATUS    RESTARTS   AGE     IP              NODE                                              NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   0          2m49s   192.168.128.8   gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7   <none>           <none>
    

    从上面的输出中,您可以看到应用在新节点上运行。

  8. 在连接到 kafka-client Pod 的终端中,确定哪个代理是 topic-failover-test 的主要代理。

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    输出应类似如下所示:

    Topic: topic-failover-test     TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 0,2,1
    

    在示例输出中,主要代理仍为 1。不过,它现在在新节点上运行。

测试 Kafka 主要代理故障

  1. 在 Cloud Shell 中,连接到 Kafka 客户端,然后使用 describe 查看为 topic1 中的每个分区选举的主要代理。

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    输出内容类似如下:

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: topic1   Partition: 1    Leader: 0       Replicas: 2,1,0 Isr: 0,2,1
        Topic: topic1   Partition: 2    Leader: 0       Replicas: 1,0,2 Isr: 0,2,1
    
  2. 在未连接到 Kafka 客户端的 Cloud Shell 中,删除 kafka-0 主要代理以强制选举新的主要代理。您应该删除映射到上述输出中的一个主要代理的索引。

    kubectl delete pod -n kafka kafka-0 --force
    

    输出内容类似如下:

    pod "kafka-0" force deleted
    
  3. 在连接到 Kafka 客户端的 Cloud Shell 中,使用 describe 查看选举的主要代理。

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    输出内容类似如下:

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,0,1
        Topic: topic1   Partition: 1    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: topic1   Partition: 2    Leader: 2       Replicas: 1,2,0 Isr: 2,0,1
    

    在输出中,如果每个分区的新主要代理已分配给中断的主要代理 (kafka-0),则新主要代理会发生更改。这表示删除并重新创建 Pod 时,原始主要代理已被替换。

清理

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

为避免支付费用,最简单的方法是删除您为本教程创建的项目。

删除 Google Cloud 项目:

gcloud projects delete PROJECT_ID

后续步骤