使用 Confluent 将 Apache Kafka 部署到 GKE


本指南介绍如何使用 Confluent for Kubernetes (CFK) 操作器在 Google Kubernetes Engine (GKE) 上部署 Apache Kafka 集群。

Kafka 是一个开源分布式发布-订阅消息传递系统,用于处理大量、高吞吐量和实时的流式数据。您可以使用 Kafka 构建流式数据流水线,以在不同系统和应用之间可靠地移动数据来进行处理和分析。

本指南适用于有意在 GKE 上部署 Kafka 集群的平台管理员、云架构师和运营专家。

您还可以使用 CFK 操作器部署 Confluent Platform 的其他组件,例如基于网络的 Confluent 控制中心、Schema Registry 或 KsqlDB。但是,本指南仅重点介绍 Kafka 部署。

目标

  • 为 Apache Kafka 规划和部署 GKE 基础架构
  • 部署和配置 CFK 操作器
  • 使用 CFK 操作器配置 Apache Kafka,以确保可用性、安全性、可观测性和性能

优势

CFK 具有以下优势:

  • 配置更改可自动滚动更新。
  • 自动进行滚动升级,不会影响 Kafka 可用性。
  • 如果发生故障,CFK 将恢复具有相同 Kafka 代理 ID、配置和永久性存储卷的 Kafka Pod。
  • 自动机架感知功能可将分区副本分布到不同的机架(或可用区),从而提高 Kafka 代理的可用性并限制数据丢失的风险。
  • 支持将汇总指标导出到 Prometheus。

部署架构

Kafka 集群中的每个数据分区都有一个主要代理,并且可以包含一个或多个跟随代理。主要代理会处理对分区的所有读取和写入操作。每个跟随代理都会被动地复制主要代理。

在典型的 Kafka 设置中,您还需要使用一个名为 ZooKeeper 的开源服务来协调 Kafka 集群。此服务可在代理之间选举主要代理,并在发生故障时触发故障切换,从而提供帮助。

您也可以通过激活 KRaft 模式,在不使用 Zookeeper 的情况下部署 Kafka 配置,但由于不支持 KafkaTopic 资源和凭据身份验证,此方法不被视为适合生产环境。

可用性和灾难恢复

本教程为 Kafka 和 ZooKeeper 集群使用单独的节点池可用区,以确保高可用性并为灾难恢复做好准备。

Google Cloud 中的高可用性 Kubernetes 集群依赖于跨多个节点和可用区的区域级集群。此配置可提高容错能力、可伸缩性和地理冗余。此配置还允许您执行滚动更新和维护,同时提供 SLA 以保证正常运行时间和可用性。如需了解详情,请参阅区域级集群

部署图示

下图显示了在 GKE 集群中的多个节点和可用区上运行的 Kafka 集群:

在该图中,Kafka StatefulSet 部署在三个不同可用区的三个节点上。您可以通过在 Kafka 自定义资源规范上设置必需的 Pod 亲和性拓扑扩展规则来控制此配置。

如果一个可用区发生故障,GKE 将使用推荐的配置在新节点上重新调度 Pod,并从剩余副本中复制 Kafka 和 Zookeeper 的数据。

下图显示了位于三个不同可用区的三个节点上的 ZooKeeper StatefulSet

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

须知事项

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 安装 Google Cloud CLI。
  3. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  4. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. Enable the GKE, Backup for GKE, Compute Engine, Identity and Access Management, and Resource Manager APIs:

    gcloud services enable compute.googleapis.com iam.googleapis.com container.googleapis.com gkebackup.googleapis.com cloudresourcemanager.googleapis.com
  7. 安装 Google Cloud CLI。
  8. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  9. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  10. 确保您的 Google Cloud 项目已启用结算功能

  11. Enable the GKE, Backup for GKE, Compute Engine, Identity and Access Management, and Resource Manager APIs:

    gcloud services enable compute.googleapis.com iam.googleapis.com container.googleapis.com gkebackup.googleapis.com cloudresourcemanager.googleapis.com
  12. 向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次: role/storage.objectViewer, role/logging.logWriter, roles/container.clusterAdmin, role/container.serviceAgent, roles/iam.serviceAccountAdmin, roles/serviceusage.serviceUsageAdmin, roles/iam.serviceAccountAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • EMAIL_ADDRESS 替换为您的电子邮件地址。
    • ROLE 替换为每个角色。

准备环境

在本教程中,您将使用 Cloud Shell 来管理 Google Cloud 上托管的资源。Cloud Shell 预安装了本教程所需的软件,包括 kubectlgcloud CLIHelmTerraform

如需使用 Cloud Shell 设置您的环境,请按照以下步骤操作:

  1. 点击 Google Cloud 控制台中的 Cloud Shell 激活图标 激活 Cloud Shell,从 Google Cloud 控制台启动 Cloud Shell 会话。此操作会在 Google Cloud 控制台的底部窗格中启动会话。

  2. 设置环境变量:

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=kafka
    export REGION=us-central1
    

    替换 PROJECT_ID:您的 Google Cloud 项目 ID

  3. 克隆 GitHub 代码库:

    git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  4. 切换到工作目录:

    cd kubernetes-engine-samples/streaming
    

创建集群基础架构

在本部分中,您将运行 Terraform 脚本以创建可用性高的专用区域级 GKE 集群。通过执行以下步骤,可以公开访问控制平面。如需限制访问权限,请创建专用集群

您可以使用 Standard 或 Autopilot 集群安装操作器。

Standard

下图显示了部署在三个不同可用区中的专用区域级 Standard GKE 集群:

若要部署此基础架构,请从 Cloud Shell 运行以下命令:

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-standard init
terraform -chdir=kafka/terraform/gke-standard apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

出现提示时,请输入 yes。完成此命令并使集群显示就绪状态可能需要几分钟时间。

Terraform 会创建以下资源:

  • Kubernetes 节点的 VPC 网络和专用子网。
  • 用于通过 NAT 访问互联网的路由器。
  • 专用 GKE 集群(在 us-central1 区域中)。
  • 2 个启用了自动扩缩功能的节点池(每个可用区 1-2 个节点,每个可用区至少 1 个节点)
  • 具有日志记录和监控权限的 ServiceAccount
  • 用于灾难恢复的 Backup for GKE
  • Google Cloud Managed Service for Prometheus,用于监控集群。

输出类似于以下内容:

...
Apply complete! Resources: 14 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"

Autopilot

下图显示了专用区域级 Autopilot GKE 集群:

若要部署此基础架构,请从 Cloud Shell 运行以下命令:

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-autopilot init
terraform -chdir=kafka/terraform/gke-autopilot apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

出现提示时,请输入 yes。完成此命令并使集群显示就绪状态可能需要几分钟时间。

Terraform 会创建以下资源:

  • Kubernetes 节点的 VPC 网络和专用子网。
  • 用于通过 NAT 访问互联网的路由器。
  • 专用 GKE 集群(在 us-central1 区域中)。
  • 具有日志记录和监控权限的 ServiceAccount
  • Google Cloud Managed Service for Prometheus,用于监控集群。

输出类似于以下内容:

...
Apply complete! Resources: 12 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"

连接到集群

配置 kubectl 以与集群通信:

gcloud container clusters get-credentials ${KUBERNETES_CLUSTER_PREFIX}-cluster --region ${REGION}

将 CFK 操作器部署到您的集群

在本部分中,您将使用 Helm Chart 部署 Confluent for Kubernetes (CFK) 操作器,然后部署 Kafka 集群。

  1. 添加 Confluent Helm 图表库:

    helm repo add confluentinc https://packages.confluent.io/helm
    
  2. 为 CFK 操作器和 Kafka 集群添加命名空间:

    kubectl create ns kafka
    
  3. 使用 Helm 部署 CFK 集群操作器:

    helm install confluent-operator confluentinc/confluent-for-kubernetes -n kafka
    

    要使 CFK 能够管理所有命名空间中的资源,请将参数 --set-namespaced=false 添加到 Helm 命令。

  4. 使用 Helm 验证 Confluent 操作器是否已成功部署:

    helm ls -n kafka
    

    输出类似于以下内容:

    NAME                  NAMESPACE  REVISION UPDATED                                  STATUS      CHART                                APP VERSION
    confluent-operator    kafka      1        2023-07-07 10:57:45.409158 +0200 CEST    deployed    confluent-for-kubernetes-0.771.13    2.6.0
    

部署 Kafka

在本部分中,您将在基本配置中部署 Kafka,然后尝试各种高级配置场景以满足可用性、安全性和可观测性要求。

基本配置

Kafka 实例的基本配置包括以下组成部分:

  • Kafka 代理的三个副本,至少需要两个可用副本来实现集群一致性。
  • ZooKeeper 节点的三个副本,形成一个集群。
  • 两个 Kafka 监听器:一个不使用身份验证,另一个利用 TLS 身份验证(使用 CFK 生成的证书)。
  • 对于 Kafka,Java MaxHeapSize 和 MinHeapSize 设置为 4 GB。
  • CPU 资源分配:1 个 CPU 请求和 2 个 CPU 限制,同时 Kafka 的内存请求和限制为 5 GB(主服务为 4 GB,指标导出器为 0.5 GB),Zookeeper 为 3 GB(主服务为 2 GB,指标导出为 0.5 GB)。
  • 使用 premium-rwo storageClass 为每个 Pod 分配 100 GB 存储空间(Kafka 数据分配 100 GB,Zookeeper 数据/日志分别分配 90 GB/10 GB)。
  • 为每个工作负载配置容忍、nodeAffinity 和 podAntiAffinity,利用其各自的节点池和不同的可用区,确保跨节点适当分布。
  • 集群内通信(使用您提供的证书授权机构通过自签名证书提供保护)。

此配置表示创建可直接用于生产环境的 Kafka 集群所需的最少设置。以下部分演示的自定义配置可解决集群安全、访问控制列表 (ACL)、主题管理、证书管理等方面的需要。

创建基本 Kafka 集群

  1. 生成 CA 对:

    openssl genrsa -out ca-key.pem 2048
    openssl req -new -key ca-key.pem -x509 \
      -days 1000 \
      -out ca.pem \
      -subj "/C=US/ST=CA/L=Confluent/O=Confluent/OU=Operator/CN=MyCA"
    

    Confluent for Kubernetes 为 Confluent Platform 组件提供了自动生成的证书,以用于 TLS 网络加密。您必须生成并提供证书授权机构 (CA)。

  2. 为证书授权机构创建 Kubernetes Secret:

    kubectl create secret tls ca-pair-sslcerts --cert=ca.pem --key=ca-key.pem -n kafka
    

    Secret 的名称是预定义的

  3. 使用基本配置创建新的 Kafka 集群:

    kubectl apply -n kafka -f kafka-confluent/manifests/01-basic-cluster/my-cluster.yaml
    

    此命令会创建 CFK 操作器的 Kafka 自定义资源和 Zookeeper 自定义资源,这些资源包含 CPU 和内存的请求与限制、块存储请求以及污点和亲和性,用于在 Kubernetes 节点之间分配预配的 Pod。

  4. 请等待几分钟,让 Kubernetes 启动所需的工作负载:

    kubectl wait pods -l app=my-cluster --for condition=Ready --timeout=300s -n kafka
    
  5. 验证是否已创建 Kafka 工作负载:

    kubectl get pod,svc,statefulset,deploy,pdb -n kafka
    

    输出类似于以下内容:

    NAME                                    READY   STATUS  RESTARTS   AGE
    pod/confluent-operator-864c74d4b4-fvpxs   1/1   Running   0        49m
    pod/my-cluster-0                        1/1   Running   0        17m
    pod/my-cluster-1                        1/1   Running   0        17m
    pod/my-cluster-2                        1/1   Running   0        17m
    pod/zookeeper-0                         1/1   Running   0        18m
    pod/zookeeper-1                         1/1   Running   0        18m
    pod/zookeeper-2                         1/1   Running   0        18m
    
    NAME                          TYPE      CLUSTER-IP   EXTERNAL-IP   PORT(S)                                                        AGE
    service/confluent-operator    ClusterIP   10.52.13.164   <none>      7778/TCP                                                       49m
    service/my-cluster            ClusterIP   None         <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-0-internal   ClusterIP   10.52.2.242  <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-1-internal   ClusterIP   10.52.7.98   <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-2-internal   ClusterIP   10.52.4.226  <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/zookeeper             ClusterIP   None         <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-0-internal  ClusterIP   10.52.8.52   <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-1-internal  ClusterIP   10.52.12.44  <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-2-internal  ClusterIP   10.52.12.134   <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    
    NAME                        READY   AGE
    statefulset.apps/my-cluster   3/3   17m
    statefulset.apps/zookeeper  3/3   18m
    
    NAME                               READY   UP-TO-DATE   AVAILABLE   AGE
    deployment.apps/confluent-operator   1/1   1          1         49m
    
    NAME                                  MIN AVAILABLE   MAX UNAVAILABLE   ALLOWED DISRUPTIONS   AGE
    poddisruptionbudget.policy/my-cluster   N/A           1               1                   17m
    poddisruptionbudget.policy/zookeeper  N/A           1               1                   18m
    

操作器创建以下资源:

  • 用于 Kafka 和 ZooKeeper 的两个 StatefulSet。
  • 用于 Kafka 代理副本的三个 Pod。
  • 用于 ZooKeeper 副本的三个 Pod。
  • 两个 PodDisruptionBudget 资源,确保最多只有一个不可用的副本以实现集群一致性。
  • Service my-cluster,用作从 Kubernetes 集群内连接的 Kafka 客户端的引导服务器。所有内部 Kafka 监听器在此 Service 中都可用。
  • Service zookeeper,允许 Kafka 代理作为客户端连接到 ZooKeeper 节点。

身份验证和用户管理

本部分介绍如何启用身份验证和授权,以保护 Kafka 监听器并与客户端共享凭据。

Confluent for Kubernetes 支持 Kafka 的各种身份验证方法,例如:

限制

  • CFK 不为用户管理提供自定义资源。但是,您可以将凭据存储在 Secret 中,并在监听器规范中引用 Secret。
  • 虽然没有直接管理 ACL 的自定义资源,但官方的 Confluent for Kubernetes 提供了有关使用 Kafka CLI 配置 ACL 的指导。

创建用户

本部分介绍如何部署可演示用户管理功能的 CFK 操作器,包括:

  • 在一个监听器上启用了基于密码的身份验证 (SASL/PLAIN) 的 Kafka 集群
  • 包含 3 个副本的 KafkaTopic
  • 具有读写权限的用户凭据
  1. 创建包含用户凭据的 Secret:

    export USERNAME=my-user
    export PASSWORD=$(openssl rand -base64 12)
    kubectl create secret generic my-user-credentials -n kafka \
      --from-literal=plain-users.json="{\"$USERNAME\":\"$PASSWORD\"}"
    

    凭据应按以下格式存储:

    {
    "username1": "password1",
    "username2": "password2",
    ...
    "usernameN": "passwordN"
    }
    
  2. 将 Kafka 集群配置为在端口 9094 上使用监听器,该监听器使用基于密码的身份验证 SCRAM-SHA-512 身份验证:

    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-cluster.yaml
    
  3. 设置主题和客户端 Pod,以便与 Kafka 集群进行交互并执行 Kafka 命令:

    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-topic.yaml
    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/kafkacat.yaml
    

    GKE 将 Secret my-user-credentials 作为装载到客户端 Pod。

  4. 客户端 Pod 准备就绪后,连接到它并开始使用提供的凭据生成和使用消息:

    kubectl wait pod kafkacat --for=condition=Ready --timeout=300s -n kafka
    kubectl exec -it kafkacat -n kafka -- /bin/sh
    
  5. 使用 my-user 凭据生成消息,然后使用该消息来验证其收据。

    export USERNAME=$(cat /my-user/plain-users.json|cut -d'"' -f 2)
    export PASSWORD=$(cat /my-user/plain-users.json|cut -d'"' -f 4)
    echo "Message from my-user" |kcat \
      -b my-cluster.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=PLAIN \
      -X sasl.username=$USERNAME \
      -X sasl.password=$PASSWORD  \
      -t my-topic -P
    kcat -b my-cluster.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=PLAIN \
      -X sasl.username=$USERNAME \
      -X sasl.password=$PASSWORD  \
      -t my-topic -C
    

    输出类似于以下内容:

    Message from my-user
    % Reached end of topic my-topic [1] at offset 1
    % Reached end of topic my-topic [2] at offset 0
    % Reached end of topic my-topic [0] at offset 0
    

    CTRL+C 以停止使用方进程。如果您收到 Connect refused 错误,请等待几分钟,然后重试。

  6. 退出 Pod shell

    exit
    

备份和灾难恢复

使用 Confluent 操作器,您可以按照某些模式实施高效的备份策略。

您可以使用 Backup for GKE 备份:

  • Kubernetes 资源清单。
  • 从正在备份的集群的 Kubernetes API 服务器中提取的 Confluent API 自定义资源及其定义。
  • 与清单中找到的 PersistentVolumeClaim 资源相对应的卷。

如需详细了解如何使用 Backup for GKE 备份和恢复 Kafka 集群,请参阅为灾难恢复做好准备

您还可以对 Kafka 集群执行手动备份。您应该备份:

  • Kafka 配置,其中包含 Confluent API 的所有自定义资源,例如 KafkaTopicsConnect
  • 数据,存储在 Kafka 代理的 PersistentVolume 中

通过将 Kubernetes 资源清单(包括 Confluent 配置)存储在 Git 代码库中,无需单独备份 Kafka 配置,因为可以根据需要将资源重新应用于新的 Kubernetes 集群。

为了在 Kafka 服务器实例或部署了 Kafka 的 Kubernetes 集群丢失的情况下保护 Kafka 数据恢复,我们建议您配置用于为 Kafka 代理预配卷的 Kubernetes 存储类别,并将 reclaimPolicy 选项设置为 Retain。我们还建议您截取 Kafka 代理卷的快照

以下清单描述了使用 reclaimPolicy 选项 Retain 的 StorageClass:

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: premium-rwo-retain
...
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer

以下示例展示了 StorageClass 被添加到了 Kafka 集群自定义资源的 spec 中:

...
spec:
  ...
  dataVolumeCapacity: 100Gi
  storageClass:
  name: premium-rwo-retain

使用此配置时,即使删除了相应的 PersistentVolumeClaim,使用该存储类别预配的 PersistentVolume 也不会被删除。

如需使用现有配置和代理实例数据在新的 Kubernetes 集群上恢复 Kafka 实例,请执行以下操作:

  1. 将现有 Confluent 自定义资源(KafkaKafkaTopicZookeeper 等)应用于新的 Kubernetes 集群
  2. 使用 PersistentVolumeClaim 上的 spec.volumeName 属性,将采用新 Kafka 代理实例名称的 PersistentVolumeClaim 更新为旧的 PersistentVolume。

清理

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

    删除 Google Cloud 项目:

    gcloud projects delete PROJECT_ID

逐个删除资源

如果您使用的是现有项目,并且不想将其删除,请逐个删除资源。

  1. 设置环境变量:

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=kafka
    export REGION=us-central1
    
  2. 运行 terraform destroy 命令:

    export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
    terraform -chdir=kafka/terraform/FOLDER destroy -var project_id=${PROJECT_ID}   \
      -var region=${REGION}  \
      -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
    

    FOLDER 替换为 gke-autopilotgke-standard

    出现提示时,请输入 yes

  3. 查找所有未挂接的磁盘:

    export disk_list=$(gcloud compute disks list --filter="-users:* AND labels.name=${KUBERNETES_CLUSTER_PREFIX}-cluster" --format "value[separator=|](name,zone)")
    
  4. 删除磁盘:

    for i in $disk_list; do
      disk_name=$(echo $i| cut -d'|' -f1)
      disk_zone=$(echo $i| cut -d'|' -f2|sed 's|.*/||')
      echo "Deleting $disk_name"
      gcloud compute disks delete $disk_name --zone $disk_zone --quiet
    done
    

后续步骤