Kafka 是一个开源分布式发布-订阅消息传递系统,用于处理大量、高吞吐量和实时的流式数据。您可以使用 Kafka 构建流式数据流水线,以在不同系统和应用之间可靠地移动数据来进行处理和分析。
本教程适用于想要在 Google Kubernetes Engine (GKE) 上部署高可用性 Kafka 集群的平台管理员、云架构师和运维专业人员。
目标
在本教程中,您将学习如何完成以下操作:- 使用 Terraform 创建区域级 GKE 集群。
- 部署高可用性 Kafka 集群。
- 升级 Kafka 二进制文件。
- 备份和恢复 Kafka 集群。
- 模拟 GKE 节点中断和 Kafka 代理故障切换。
架构
本部分介绍了您将在本教程中构建的解决方案的架构。
Kafka 集群是一组(包含一个或多个)服务器(称为“代理”),这些服务器协同工作来处理 Kafka 客户端(称为“使用方”)的传入数据流和发布-订阅消息传递。
Kafka 集群中的每个数据分区都有一个主要代理,并且可以包含一个或多个跟随代理。主要代理会处理对分区的所有读取和写入操作。每个跟随代理都会被动地复制主要代理。
在典型的 Kafka 设置中,您还需要使用一个名为 ZooKeeper 的开源服务来协调 Kafka 集群。此服务有助于在代理之间选举主要代理,并在发生故障时触发故障切换。
在本教程中,您需要将 Kafka 代理和 ZooKeeper 服务配置为单个 StatefulSets,从而在 GKE 上部署 Kafka 集群。为了预配高可用性 Kafka 集群并为灾难恢复做好准备,您需要将 Kafka 和 Zookeeper StatefulSet 配置为使用单独的节点池和可用区。
下图展示了 Kafka StatefulSet 如何在 GKE 集群中的多个节点和可用区上运行。
下图展示了 Zookeeper StatefulSet 如何在 GKE 集群中的多个节点和可用区上运行。
节点预配和 Pod 调度
如果您使用的是 Autopilot 集群,则 Autopilot 会为工作负载处理节点预配和 Pod 调度。您需要使用 Pod 反亲和性来确保不会在同一节点和同一可用区上调度同一 StatefulSet 的两个 Pod。
如果您使用的是 Standard 集群,则需要配置 Pod 容忍设置和节点亲和性。如需了解详情,请参阅在专用节点池中隔离工作负载。
费用
在本文档中,您将使用 Google Cloud 的以下收费组件:
您可使用价格计算器根据您的预计使用情况来估算费用。
完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理。
准备工作
设置项目
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, click Create project to begin creating a new Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM APIs.
-
In the Google Cloud console, on the project selector page, click Create project to begin creating a new Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM APIs.
设置角色
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
role/storage.objectViewer, role/logging.logWriter, role/artifactregistry.Admin, roles/container.clusterAdmin, role/container.serviceAgent, roles/iam.serviceAccountAdmin, roles/serviceusage.serviceUsageAdmin, roles/iam.serviceAccountAdmin
$ gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
设置您的环境
在本教程中,您将使用 Cloud Shell 来管理 Google Cloud 上托管的资源。Cloud Shell 预安装有本教程所需的软件,包括 Docker、kubectl
、gcloud CLI、Helm 和 Terraform。
如需使用 Cloud Shell 设置您的环境,请按照以下步骤操作:
点击 Google Cloud 控制台中的 激活 Cloud Shell,从 Google Cloud 控制台启动 Cloud Shell 会话。此操作会在 Google Cloud 控制台的底部窗格中启动会话。
设置环境变量。
export PROJECT_ID=PROJECT_ID export REGION=us-central1
替换以下值:
- PROJECT_ID:您的 Google Cloud 项目 ID。
设置默认环境变量。
gcloud config set project PROJECT_ID
克隆代码库。
git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
切换到工作目录。
cd kubernetes-engine-samples/streaming/gke-stateful-kafka
创建集群基础架构
在本部分中,您将运行 Terraform 脚本来创建两个区域级 GKE 集群。主要集群将部署在 us-central1
中。
如需创建集群,请运行以下命令:
Autopilot
在 Cloud Shell 中,运行以下命令:
terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID
出现提示时,请输入 yes
。
Standard
在 Cloud Shell 中,运行以下命令:
terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID
出现提示时,请输入 yes
。
Terraform 配置文件会创建以下资源来部署基础架构:
- 创建 Artifact Registry 制品库以存储 Docker 映像。
- 为虚拟机的网络接口创建 VPC 网络和子网。
- 创建两个 GKE 集群。
Terraform 会在两个区域中创建专用集群,并启用 Backup for GKE 以用于灾难恢复。
在集群上部署 Kafka
在本部分中,您将使用 Helm 图表在 GKE 上部署 Kafka。该操作会创建以下资源:
- Kafka 和 Zookeeper StatefulSet。
- Kafka 导出器部署。导出器会收集 Kafka 指标以供 Prometheus 使用。
- Pod 中断预算 (PDB),用于限制在主动中断期间离线 Pod 的数量。
如需使用 Helm 图表部署 Kafka,请按照以下步骤操作:
配置 Docker 访问权限。
gcloud auth configure-docker us-docker.pkg.dev
使用 Kafka 和 Zookeeper 映像填充 Artifact Registry。
./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0 ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52 ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41 ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
配置对主要集群的
kubectl
命令行访问权限。gcloud container clusters get-credentials gke-kafka-us-central1 \ --region=${REGION} \ --project=${PROJECT_ID}
创建命名空间。
export NAMESPACE=kafka kubectl create namespace $NAMESPACE
使用 Helm 图表 20.0.6 版安装 Kafka。
cd helm ../scripts/chart.sh kafka 20.0.6 && \ rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka . \ --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
输出类似于以下内容:
NAME: kafka LAST DEPLOYED: Thu Feb 16 03:29:39 2023 NAMESPACE: kafka STATUS: deployed REVISION: 1 TEST SUITE: None
验证 Kafka 副本是否正在运行(这可能需要几分钟的时间)。
kubectl get all -n kafka
输出内容类似如下:
--- NAME READY STATUS RESTARTS AGE pod/kafka-0 1/1 Running 2 (3m51s ago) 4m28s pod/kafka-1 1/1 Running 3 (3m41s ago) 4m28s pod/kafka-2 1/1 Running 2 (3m57s ago) 4m28s pod/kafka-zookeeper-0 1/1 Running 0 4m28s pod/kafka-zookeeper-1 1/1 Running 0 4m28s pod/kafka-zookeeper-2 1/1 Running 0 4m28s NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/kafka ClusterIP 192.168.112.124 <none> 9092/TCP 4m29s service/kafka-app ClusterIP 192.168.75.57 <none> 9092/TCP 35m service/kafka-app-headless ClusterIP None <none> 9092/TCP,9093/TCP 35m service/kafka-app-zookeeper ClusterIP 192.168.117.102 <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-app-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-headless ClusterIP None <none> 9092/TCP,9093/TCP 4m29s service/kafka-zookeeper ClusterIP 192.168.89.249 <none> 2181/TCP,2888/TCP,3888/TCP 4m29s service/kafka-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 4m29s NAME READY AGE statefulset.apps/kafka 3/3 4m29s statefulset.apps/kafka-zookeeper 3/3 4m29s
创建测试数据
在本部分中,您将测试 Kafka 应用并生成消息。
创建与 Kafka 应用进行交互的使用方客户端 Pod。
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
创建一个名为
topic1
的主题,其包含三个分区且复制因子为 3。kafka-topics.sh \ --create \ --topic topic1 \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
验证是否已在所有三个代理中复制主题分区。
kafka-topics.sh \ --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
输出内容类似如下:
Topic: topic1 TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: topic1 Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
在示例输出中,请注意
topic1
有三个分区,每个分区都有不同的主要代理和副本集。这是因为 Kafka 使用分区将数据分布到多个代理,从而实现更高的可伸缩性和容错能力。复制因子为 3 可确保每个分区都有三个副本,因此,即使一个或两个代理发生故障,数据也仍然可用。运行以下命令以将消息编号批量生成到
topic1
中。ALLOW_PLAINTEXT_LISTENER=yes for x in $(seq 0 200); do echo "$x: Message number $x" done | kafka-console-producer.sh \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \ --property parse.key=true \ --property key.separator=":"
运行以下命令以在所有分区中使用
topic1
。kafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;
按
CTRL+C
以停止使用方进程。
对 Kafka 进行基准化分析
如需准确为用例建模,您可以对集群上的预期负载运行模拟。如需测试性能,您需要使用 Kafka 软件包中包含的工具,即 bin
文件夹中的 kafka-producer-perf-test.sh
和 kafka-consumer-perf-test.sh
脚本。
创建一个用于基准测试的主题。
kafka-topics.sh \ --create \ --topic topic-benchmark \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
在 Kafka 集群上创建负载。
KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \ --topic topic-benchmark \ --num-records 10000000 \ --throughput -1 \ --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \ batch.size=16384 \ acks=all \ linger.ms=500 \ compression.type=none \ --record-size 100 \ --print-metrics
提供方会为
topic-benchmark
生成 1000 万条记录。输出内容类似如下:623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency. 1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency. 1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency. 2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
所有记录都发送后,您应该会在输出中看到其他指标,如下所示:
producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark} : 173316.233 producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark} : 10000000.000
如需退出监控,请按
CTRL + C
。退出 Pod shell。
exit
管理升级
Kafka 和 Kubernetes 的版本更新会定期发布。请遵循运营最佳实践来定期更新您的软件环境。
规划 Kafka 二进制文件升级
在本部分中,您将使用 Helm 更新 Kafka 映像,并验证主题是否仍然可用。
如需通过在集群上部署 Kafka 中使用的 Helm 图表从早期 Kafka 版本进行升级,请按照以下步骤操作:
使用以下映像填充 Artifact Registry:
../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2 ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61 ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49 ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
执行以下步骤以使用升级后的 Kafka 和 ZooKeeper 映像部署 Helm 图表。如需了解特定于版本的指导信息,请参阅有关版本升级的 Kafka 说明。
- 更新
Chart.yaml
依赖项版本:
../scripts/chart.sh kafka 20.1.0
使用新的 Kafka 和 Zookeeper 映像部署 Helm 图表,如以下示例所示:
rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka ./ \ --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
监控 Kafka Pod 是否升级:
kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
如需退出监控,请按
CTRL + C
。- 更新
使用客户端 Pod 连接到 Kafka 集群。
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
验证您是否可以访问来自
topic1
的消息。kafka-console-consumer.sh \ --topic topic1 \ --from-beginning \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
输出应显示上一步中生成的消息。按
CTRL+C
以退出进程。退出客户端 pod。
exit
为灾难恢复做好准备
为确保生产工作负载在发生服务中断事件时仍然可用,您应该准备灾难恢复 (DR) 规划。如需详细了解灾难恢复规划,请参阅灾难恢复规划指南。
如需在 GKE 集群上备份和恢复工作负载,您可以使用 Backup for GKE。
Kafka 备份和恢复场景示例
在本部分中,您将从 gke-kafka-us-central1
备份集群,并将备份恢复到 gke-kafka-us-west1
。您将使用 ProtectedApplication
自定义资源在应用范围内执行备份和恢复操作。
下图展示了灾难恢复解决方案的组件,以及相互之间的关系。
如需准备备份和恢复 Kafka 集群,请按照以下步骤操作:
设置环境变量。
export BACKUP_PLAN_NAME=kafka-protected-app export BACKUP_NAME=protected-app-backup-1 export RESTORE_PLAN_NAME=kafka-protected-app export RESTORE_NAME=protected-app-restore-1 export REGION=us-central1 export DR_REGION=us-west1 export CLUSTER_NAME=gke-kafka-$REGION export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
验证集群是否处于
RUNNING
状态。gcloud container clusters describe $CLUSTER_NAME --region us-central1 --format='value(status)'
创建备份方案。
gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \ --selected-applications=kafka/kafka,kafka/zookeeper \ --include-secrets \ --include-volume-data \ --cron-schedule="0 3 * * *" \ --backup-retain-days=7 \ --backup-delete-lock-days=0
手动创建备份。虽然计划备份通常受备份方案中的 cron-schedule 约束,但以下示例展示了如何启动一次性备份操作。
gcloud beta container backup-restore backups create $BACKUP_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=$BACKUP_PLAN_NAME \ --wait-for-completion
创建恢复方案。
gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \ --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \ --cluster-resource-conflict-policy=use-existing-version \ --namespaced-resource-restore-mode=delete-and-restore \ --volume-data-restore-policy=restore-volume-data-from-backup \ --selected-applications=kafka/kafka,kafka/zookeeper \ --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
通过备份手动进行恢复。
gcloud beta container backup-restore restores create $RESTORE_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --restore-plan=$RESTORE_PLAN_NAME \ --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
监控恢复的应用是否出现在备份集群中。所有 Pod 可能需要几分钟时间才能运行并准备就绪。
gcloud container clusters get-credentials gke-kafka-us-west1 \ --region us-west1 kubectl get pod -n kafka --watch
按
CTRL+C
以在所有 Pod 已启动并运行时退出监控。验证使用方是否可以提取之前的主题。
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
kafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;
输出内容类似如下:
192 : Message number 192 193 : Message number 193 197 : Message number 197 200 : Message number 200 Processed a total of 201 messages
按
CTRL+C
以退出进程。退出 Pod。
exit
模拟 Kafka 服务中断
在本部分中,您将通过替换托管代理的 Kubernetes 节点来模拟节点故障。本部分仅适用于 Standard。Autopilot 会为您管理节点,因此无法模拟节点故障。
创建客户端 Pod 以连接到 Kafka 应用。
kubectl run kafka-client -n kafka --restart='Never' -it \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
创建主题
topic-failover-test
并生成测试流量。kafka-topics.sh \ --create \ --topic topic-failover-test \ --partitions 1 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
确定哪个代理是
topic-failover-test
主题的主要代理。kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
输出内容类似如下:
Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
在上面的输出中,
Leader: 1
表示topic-failover-test
的主要代理是代理 1。此代理对应于 Podkafka-1
。打开新终端并连接到同一集群。
gcloud container clusters get-credentials gke-kafka-us-west1 --region us-west1 --project PROJECT_ID
查找 Pod
kafka-1
在哪个节点上运行。kubectl get pod -n kafka kafka-1 -o wide
输出内容类似如下:
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 1 (35m ago) 36m 192.168.132.4 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 <none> <none>
在上面的输出中,您可以看到 Pod
kafka-1
在节点gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72
上运行。排空该节点以逐出 Pod。
kubectl drain NODE \ --delete-emptydir-data \ --force \ --ignore-daemonsets
将 NODE 替换为 Pod kafka-1 在其上运行的节点。在此示例中,该节点为
gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72
。输出内容类似如下:
node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578 evicting pod kafka/kafka-1 evicting pod kube-system/kube-dns-7d48cb57b-j4d8f evicting pod kube-system/calico-typha-56995c8d85-5clph pod/calico-typha-56995c8d85-5clph evicted pod/kafka-1 evicted pod/kube-dns-7d48cb57b-j4d8f evicted node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
查找 Pod
kafka-1
在哪个节点上运行。kubectl get pod -n kafka kafka-1 -o wide
输出应类似如下所示:
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 0 2m49s 192.168.128.8 gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7 <none> <none>
从上面的输出中,您可以看到应用在新节点上运行。
在连接到
kafka-client
Pod 的终端中,确定哪个代理是topic-failover-test
的主要代理。kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
输出应类似如下所示:
Topic: topic-failover-test TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 0,2,1
在示例输出中,主要代理仍为 1。不过,它当前在新的节点上运行。
测试 Kafka 主要代理故障
在 Cloud Shell 中,连接到 Kafka 客户端,然后使用
describe
查看为topic1
中的每个分区选举的主要代理。kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
输出内容类似如下:
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: topic1 Partition: 1 Leader: 0 Replicas: 2,1,0 Isr: 0,2,1 Topic: topic1 Partition: 2 Leader: 0 Replicas: 1,0,2 Isr: 0,2,1
在未连接到 Kafka 客户端的 Cloud Shell 中,删除
kafka-0
主要代理以强制选举新的主要代理。您应该删除映射到上述输出中的一个主要代理的索引。kubectl delete pod -n kafka kafka-0 --force
输出内容类似如下:
pod "kafka-0" force deleted
在连接到 Kafka 客户端的 Cloud Shell 中,使用
describe
查看选举的主要代理。kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
输出内容类似如下:
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 2 Leader: 2 Replicas: 1,2,0 Isr: 2,0,1
在输出中,如果每个分区的新主要代理已分配给中断的主要代理 (
kafka-0
),则新主要代理会发生更改。这表示删除并重新创建 Pod 时,原始主要代理已被替换。
清理
为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。
删除项目
为避免支付费用,最简单的方法是删除您为本教程创建的项目。
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
后续步骤
- 如需了解全代管式可伸缩的消息传递服务,请参阅从 Kafka 迁移到 Pub/Sub。