本指南介绍如何使用 Confluent for Kubernetes (CFK) 操作器在 Google Kubernetes Engine (GKE) 上部署 Apache Kafka 集群。
Kafka 是一个开源分布式发布-订阅消息传递系统,用于处理大量、高吞吐量和实时的流式数据。您可以使用 Kafka 构建流式数据流水线,以在不同系统和应用之间可靠地移动数据来进行处理和分析。
本指南适用于有意在 GKE 上部署 Kafka 集群的平台管理员、云架构师和运营专家。
您还可以使用 CFK 操作器部署 Confluent Platform 的其他组件,例如基于网络的 Confluent 控制中心、Schema Registry 或 KsqlDB。但是,本指南仅重点介绍 Kafka 部署。
目标
- 为 Apache Kafka 规划和部署 GKE 基础架构
- 部署和配置 CFK 操作器
- 使用 CFK 操作器配置 Apache Kafka,以确保可用性、安全性、可观测性和性能
优势
CFK 具有以下优势:
- 配置更改可自动滚动更新。
- 自动进行滚动升级,不会影响 Kafka 可用性。
- 如果发生故障,CFK 将恢复具有相同 Kafka 代理 ID、配置和永久性存储卷的 Kafka Pod。
- 自动机架感知功能可将分区副本分布到不同的机架(或可用区),从而提高 Kafka 代理的可用性并限制数据丢失的风险。
- 支持将汇总指标导出到 Prometheus。
部署架构
Kafka 集群中的每个数据分区都有一个主要代理,并且可以包含一个或多个跟随代理。主要代理会处理对分区的所有读取和写入操作。每个跟随代理都会被动地复制主要代理。
在典型的 Kafka 设置中,您还需要使用一个名为 ZooKeeper 的开源服务来协调 Kafka 集群。此服务可在代理之间选举主要代理,并在发生故障时触发故障切换,从而提供帮助。
您也可以通过激活 KRaft 模式,在不使用 Zookeeper 的情况下部署 Kafka 配置,但由于不支持 KafkaTopic 资源和凭据身份验证,此方法不被视为适合生产环境。
可用性和灾难恢复
本教程为 Kafka 和 ZooKeeper 集群使用单独的节点池和可用区,以确保高可用性并为灾难恢复做好准备。
Google Cloud 中的高可用性 Kubernetes 集群依赖于跨多个节点和可用区的区域级集群。此配置可提高容错能力、可伸缩性和地理冗余。此配置还允许您执行滚动更新和维护,同时提供 SLA 以保证正常运行时间和可用性。如需了解详情,请参阅区域级集群。
部署图示
下图显示了在 GKE 集群中的多个节点和可用区上运行的 Kafka 集群:
在该图中,Kafka StatefulSet 部署在三个不同可用区的三个节点上。您可以通过在 Kafka
自定义资源规范上设置必需的 Pod 亲和性和拓扑扩展规则来控制此配置。
如果一个可用区发生故障,GKE 将使用推荐的配置在新节点上重新调度 Pod,并从剩余副本中复制 Kafka 和 Zookeeper 的数据。
下图显示了位于三个不同可用区的三个节点上的 ZooKeeper StatefulSet
:
费用
在本文档中,您将使用 Google Cloud 的以下收费组件:
您可使用价格计算器根据您的预计使用情况来估算费用。
完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理。
准备工作
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the GKE, Backup for GKE, Compute Engine, Identity and Access Management, and Resource Manager APIs:
gcloud services enable compute.googleapis.com
iam.googleapis.com container.googleapis.com gkebackup.googleapis.com cloudresourcemanager.googleapis.com - Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the GKE, Backup for GKE, Compute Engine, Identity and Access Management, and Resource Manager APIs:
gcloud services enable compute.googleapis.com
iam.googleapis.com container.googleapis.com gkebackup.googleapis.com cloudresourcemanager.googleapis.com -
Grant roles to your user account. Run the following command once for each of the following IAM roles:
role/storage.objectViewer, role/logging.logWriter, roles/container.clusterAdmin, role/container.serviceAgent, roles/iam.serviceAccountAdmin, roles/serviceusage.serviceUsageAdmin, roles/iam.serviceAccountAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
准备环境
在本教程中,您将使用 Cloud Shell 来管理 Google Cloud 上托管的资源。Cloud Shell 预安装了本教程所需的软件,包括 kubectl
、gcloud CLI、Helm 和 Terraform。
如需使用 Cloud Shell 设置您的环境,请按照以下步骤操作:
点击 Google Cloud 控制台中的 激活 Cloud Shell,从 Google Cloud 控制台启动 Cloud Shell 会话。此操作会在 Google Cloud 控制台的底部窗格中启动会话。
设置环境变量:
export PROJECT_ID=PROJECT_ID export KUBERNETES_CLUSTER_PREFIX=kafka export REGION=us-central1
替换
PROJECT_ID
:您的 Google Cloud 项目 ID。克隆 GitHub 代码库:
git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
切换到工作目录:
cd kubernetes-engine-samples/streaming
创建集群基础架构
在本部分中,您将运行 Terraform 脚本以创建可用性高的专用区域级 GKE 集群。通过执行以下步骤,可以公开访问控制平面。如需限制访问权限,请创建专用集群。
您可以使用 Standard 或 Autopilot 集群安装操作器。
标准
下图显示了部署在三个不同可用区中的专用区域级 Standard GKE 集群:
若要部署此基础架构,请从 Cloud Shell 运行以下命令:
export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-standard init
terraform -chdir=kafka/terraform/gke-standard apply -var project_id=${PROJECT_ID} \
-var region=${REGION} \
-var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
出现提示时,请输入 yes
。完成此命令并使集群显示就绪状态可能需要几分钟时间。
Terraform 会创建以下资源:
- Kubernetes 节点的 VPC 网络和专用子网。
- 用于通过 NAT 访问互联网的路由器。
- 专用 GKE 集群(在
us-central1
区域中)。 - 2 个启用了自动扩缩功能的节点池(每个可用区 1-2 个节点,每个可用区至少 1 个节点)
- 具有日志记录和监控权限的
ServiceAccount
。 - 用于灾难恢复的 Backup for GKE
- Google Cloud Managed Service for Prometheus,用于监控集群。
输出类似于以下内容:
...
Apply complete! Resources: 14 added, 0 changed, 0 destroyed.
Outputs:
kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"
Autopilot
下图显示了专用区域级 Autopilot GKE 集群:
若要部署此基础架构,请从 Cloud Shell 运行以下命令:
export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-autopilot init
terraform -chdir=kafka/terraform/gke-autopilot apply -var project_id=${PROJECT_ID} \
-var region=${REGION} \
-var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
出现提示时,请输入 yes
。完成此命令并使集群显示就绪状态可能需要几分钟时间。
Terraform 会创建以下资源:
- Kubernetes 节点的 VPC 网络和专用子网。
- 用于通过 NAT 访问互联网的路由器。
- 专用 GKE 集群(在
us-central1
区域中)。 - 具有日志记录和监控权限的
ServiceAccount
- Google Cloud Managed Service for Prometheus,用于监控集群。
输出类似于以下内容:
...
Apply complete! Resources: 12 added, 0 changed, 0 destroyed.
Outputs:
kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"
连接到集群
配置 kubectl
以与集群通信:
gcloud container clusters get-credentials ${KUBERNETES_CLUSTER_PREFIX}-cluster --region ${REGION}
将 CFK 操作器部署到您的集群
在本部分中,您将使用 Helm Chart 部署 Confluent for Kubernetes (CFK) 操作器,然后部署 Kafka 集群。
添加 Confluent Helm 图表库:
helm repo add confluentinc https://packages.confluent.io/helm
为 CFK 操作器和 Kafka 集群添加命名空间:
kubectl create ns kafka
使用 Helm 部署 CFK 集群操作器:
helm install confluent-operator confluentinc/confluent-for-kubernetes -n kafka
要使 CFK 能够管理所有命名空间中的资源,请将参数
--set-namespaced=false
添加到 Helm 命令。使用 Helm 验证 Confluent 操作器是否已成功部署:
helm ls -n kafka
输出类似于以下内容:
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION confluent-operator kafka 1 2023-07-07 10:57:45.409158 +0200 CEST deployed confluent-for-kubernetes-0.771.13 2.6.0
部署 Kafka
在本部分中,您将在基本配置中部署 Kafka,然后尝试各种高级配置场景以满足可用性、安全性和可观测性要求。
基本配置
Kafka 实例的基本配置包括以下组成部分:
- Kafka 代理的三个副本,至少需要两个可用副本来实现集群一致性。
- ZooKeeper 节点的三个副本,形成一个集群。
- 两个 Kafka 监听器:一个不使用身份验证,另一个利用 TLS 身份验证(使用 CFK 生成的证书)。
- 对于 Kafka,Java MaxHeapSize 和 MinHeapSize 设置为 4 GB。
- CPU 资源分配:1 个 CPU 请求和 2 个 CPU 限制,同时 Kafka 的内存请求和限制为 5 GB(主服务为 4 GB,指标导出器为 0.5 GB),Zookeeper 为 3 GB(主服务为 2 GB,指标导出为 0.5 GB)。
- 使用
premium-rwo
storageClass 为每个 Pod 分配 100 GB 存储空间(Kafka 数据分配 100 GB,Zookeeper 数据/日志分别分配 90 GB/10 GB)。 - 为每个工作负载配置容忍、nodeAffinity 和 podAntiAffinity,利用其各自的节点池和不同的可用区,确保跨节点适当分布。
- 集群内通信(使用您提供的证书授权机构通过自签名证书提供保护)。
此配置表示创建可直接用于生产环境的 Kafka 集群所需的最少设置。以下部分演示的自定义配置可解决集群安全、访问控制列表 (ACL)、主题管理、证书管理等方面的需要。
创建基本 Kafka 集群
生成 CA 对:
openssl genrsa -out ca-key.pem 2048 openssl req -new -key ca-key.pem -x509 \ -days 1000 \ -out ca.pem \ -subj "/C=US/ST=CA/L=Confluent/O=Confluent/OU=Operator/CN=MyCA"
Confluent for Kubernetes 为 Confluent Platform 组件提供了自动生成的证书,以用于 TLS 网络加密。您必须生成并提供证书授权机构 (CA)。
为证书授权机构创建 Kubernetes Secret:
kubectl create secret tls ca-pair-sslcerts --cert=ca.pem --key=ca-key.pem -n kafka
Secret 的名称是预定义的
使用基本配置创建新的 Kafka 集群:
kubectl apply -n kafka -f kafka-confluent/manifests/01-basic-cluster/my-cluster.yaml
此命令会创建 CFK 操作器的 Kafka 自定义资源和 Zookeeper 自定义资源,这些资源包含 CPU 和内存的请求与限制、块存储请求以及污点和亲和性,用于在 Kubernetes 节点之间分配预配的 Pod。
请等待几分钟,让 Kubernetes 启动所需的工作负载:
kubectl wait pods -l app=my-cluster --for condition=Ready --timeout=300s -n kafka
验证是否已创建 Kafka 工作负载:
kubectl get pod,svc,statefulset,deploy,pdb -n kafka
输出类似于以下内容:
NAME READY STATUS RESTARTS AGE pod/confluent-operator-864c74d4b4-fvpxs 1/1 Running 0 49m pod/my-cluster-0 1/1 Running 0 17m pod/my-cluster-1 1/1 Running 0 17m pod/my-cluster-2 1/1 Running 0 17m pod/zookeeper-0 1/1 Running 0 18m pod/zookeeper-1 1/1 Running 0 18m pod/zookeeper-2 1/1 Running 0 18m NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/confluent-operator ClusterIP 10.52.13.164 <none> 7778/TCP 49m service/my-cluster ClusterIP None <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 17m service/my-cluster-0-internal ClusterIP 10.52.2.242 <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 17m service/my-cluster-1-internal ClusterIP 10.52.7.98 <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 17m service/my-cluster-2-internal ClusterIP 10.52.4.226 <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 17m service/zookeeper ClusterIP None <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 18m service/zookeeper-0-internal ClusterIP 10.52.8.52 <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 18m service/zookeeper-1-internal ClusterIP 10.52.12.44 <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 18m service/zookeeper-2-internal ClusterIP 10.52.12.134 <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 18m NAME READY AGE statefulset.apps/my-cluster 3/3 17m statefulset.apps/zookeeper 3/3 18m NAME READY UP-TO-DATE AVAILABLE AGE deployment.apps/confluent-operator 1/1 1 1 49m NAME MIN AVAILABLE MAX UNAVAILABLE ALLOWED DISRUPTIONS AGE poddisruptionbudget.policy/my-cluster N/A 1 1 17m poddisruptionbudget.policy/zookeeper N/A 1 1 18m
操作器创建以下资源:
- 用于 Kafka 和 ZooKeeper 的两个 StatefulSet。
- 用于 Kafka 代理副本的三个 Pod。
- 用于 ZooKeeper 副本的三个 Pod。
- 两个
PodDisruptionBudget
资源,确保最多只有一个不可用的副本以实现集群一致性。 - Service
my-cluster
,用作从 Kubernetes 集群内连接的 Kafka 客户端的引导服务器。所有内部 Kafka 监听器在此 Service 中都可用。 - Service
zookeeper
,允许 Kafka 代理作为客户端连接到 ZooKeeper 节点。
身份验证和用户管理
本部分介绍如何启用身份验证和授权,以保护 Kafka 监听器并与客户端共享凭据。
Confluent for Kubernetes 支持 Kafka 的各种身份验证方法,例如:
- SASL/PLAIN 身份验证:客户端使用用户名和密码进行身份验证。用户名和密码存储在服务器端的 Kubernetes Secret 中。
- 使用 LDAP 的 SASL/PLAIN 身份验证:客户端使用用户名和密码进行身份验证。凭据存储在 LDAP 服务器中。
- mTLS 身份验证:客户端使用 TLS 证书进行身份验证。
限制
- CFK 不为用户管理提供自定义资源。但是,您可以将凭据存储在 Secret 中,并在监听器规范中引用 Secret。
- 虽然没有直接管理 ACL 的自定义资源,但官方的 Confluent for Kubernetes 提供了有关使用 Kafka CLI 配置 ACL 的指导。
创建用户
本部分介绍如何部署可演示用户管理功能的 CFK 操作器,包括:
- 在一个监听器上启用了基于密码的身份验证 (SASL/PLAIN) 的 Kafka 集群
- 包含 3 个副本的
KafkaTopic
- 具有读写权限的用户凭据
创建包含用户凭据的 Secret:
export USERNAME=my-user export PASSWORD=$(openssl rand -base64 12) kubectl create secret generic my-user-credentials -n kafka \ --from-literal=plain-users.json="{\"$USERNAME\":\"$PASSWORD\"}"
凭据应按以下格式存储:
{ "username1": "password1", "username2": "password2", ... "usernameN": "passwordN" }
将 Kafka 集群配置为在端口 9094 上使用监听器,该监听器使用基于密码的身份验证 SCRAM-SHA-512 身份验证:
kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-cluster.yaml
设置主题和客户端 Pod,以便与 Kafka 集群进行交互并执行 Kafka 命令:
kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-topic.yaml kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/kafkacat.yaml
GKE 将 Secret
my-user-credentials
作为卷装载到客户端 Pod。客户端 Pod 准备就绪后,连接到它并开始使用提供的凭据生成和使用消息:
kubectl wait pod kafkacat --for=condition=Ready --timeout=300s -n kafka kubectl exec -it kafkacat -n kafka -- /bin/sh
使用
my-user
凭据生成消息,然后使用该消息来验证其收据。export USERNAME=$(cat /my-user/plain-users.json|cut -d'"' -f 2) export PASSWORD=$(cat /my-user/plain-users.json|cut -d'"' -f 4) echo "Message from my-user" |kcat \ -b my-cluster.kafka.svc.cluster.local:9094 \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=PLAIN \ -X sasl.username=$USERNAME \ -X sasl.password=$PASSWORD \ -t my-topic -P kcat -b my-cluster.kafka.svc.cluster.local:9094 \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=PLAIN \ -X sasl.username=$USERNAME \ -X sasl.password=$PASSWORD \ -t my-topic -C
输出类似于以下内容:
Message from my-user % Reached end of topic my-topic [1] at offset 1 % Reached end of topic my-topic [2] at offset 0 % Reached end of topic my-topic [0] at offset 0
按
CTRL+C
以停止使用方进程。如果您收到Connect refused
错误,请等待几分钟,然后重试。退出 Pod shell
exit
备份和灾难恢复
使用 Confluent 操作器,您可以按照某些模式实施高效的备份策略。
您可以使用 Backup for GKE 备份:
- Kubernetes 资源清单。
- 从正在备份的集群的 Kubernetes API 服务器中提取的 Confluent API 自定义资源及其定义。
- 与清单中找到的 PersistentVolumeClaim 资源相对应的卷。
如需详细了解如何使用 Backup for GKE 备份和恢复 Kafka 集群,请参阅为灾难恢复做好准备。
您还可以对 Kafka 集群执行手动备份。您应该备份:
- Kafka 配置,其中包含 Confluent API 的所有自定义资源,例如
KafkaTopics
或Connect
- 数据,存储在 Kafka 代理的 PersistentVolume 中
通过将 Kubernetes 资源清单(包括 Confluent 配置)存储在 Git 代码库中,无需单独备份 Kafka 配置,因为可以根据需要将资源重新应用于新的 Kubernetes 集群。
为了在 Kafka 服务器实例或部署了 Kafka 的 Kubernetes 集群丢失的情况下保护 Kafka 数据恢复,我们建议您配置用于为 Kafka 代理预配卷的 Kubernetes 存储类别,并将 reclaimPolicy
选项设置为 Retain
。我们还建议您截取 Kafka 代理卷的快照。
以下清单描述了使用 reclaimPolicy
选项 Retain
的 StorageClass:
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: premium-rwo-retain
...
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer
以下示例展示了 StorageClass 被添加到了 Kafka 集群自定义资源的 spec
中:
...
spec:
...
dataVolumeCapacity: 100Gi
storageClass:
name: premium-rwo-retain
使用此配置时,即使删除了相应的 PersistentVolumeClaim,使用该存储类别预配的 PersistentVolume 也不会被删除。
如需使用现有配置和代理实例数据在新的 Kubernetes 集群上恢复 Kafka 实例,请执行以下操作:
- 将现有 Confluent 自定义资源(
Kafka
、KafkaTopic
、Zookeeper
等)应用于新的 Kubernetes 集群 - 使用 PersistentVolumeClaim 上的
spec.volumeName
属性,将采用新 Kafka 代理实例名称的 PersistentVolumeClaim 更新为旧的 PersistentVolume。
清理
为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。
删除项目
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
逐个删除资源
如果您使用的是现有项目,并且不想将其删除,请逐个删除资源。
设置环境变量:
export PROJECT_ID=PROJECT_ID export KUBERNETES_CLUSTER_PREFIX=kafka export REGION=us-central1
运行
terraform destroy
命令:export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token) terraform -chdir=kafka/terraform/FOLDER destroy -var project_id=${PROJECT_ID} \ -var region=${REGION} \ -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
将
FOLDER
替换为gke-autopilot
或gke-standard
。出现提示时,请输入
yes
。查找所有未挂接的磁盘:
export disk_list=$(gcloud compute disks list --filter="-users:* AND labels.name=${KUBERNETES_CLUSTER_PREFIX}-cluster" --format "value[separator=|](name,zone)")
删除磁盘:
for i in $disk_list; do disk_name=$(echo $i| cut -d'|' -f1) disk_zone=$(echo $i| cut -d'|' -f2|sed 's|.*/||') echo "Deleting $disk_name" gcloud compute disks delete $disk_name --zone $disk_zone --quiet done
后续步骤
- 探索有关 Google Cloud 的参考架构、图表和最佳做法。查看我们的 Cloud 架构中心。