使用 Confluent 将 Apache Kafka 部署到 GKE


本指南介绍如何使用 Confluent for Kubernetes (CFK) 操作器在 Google Kubernetes Engine (GKE) 上部署 Apache Kafka 集群。

Kafka 是一个开源分布式发布-订阅消息传递系统,用于处理大量、高吞吐量和实时的流式数据。您可以使用 Kafka 构建流式数据流水线,以在不同系统和应用之间可靠地移动数据来进行处理和分析。

本指南适用于有意在 GKE 上部署 Kafka 集群的平台管理员、云架构师和运营专家。

您还可以使用 CFK 操作器部署 Confluent Platform 的其他组件,例如基于网络的 Confluent 控制中心、Schema Registry 或 KsqlDB。但是,本指南仅重点介绍 Kafka 部署。

目标

  • 为 Apache Kafka 规划和部署 GKE 基础架构
  • 部署和配置 CFK 操作器
  • 使用 CFK 操作器配置 Apache Kafka,以确保可用性、安全性、可观测性和性能

优势

CFK 具有以下优势:

  • 配置更改可自动滚动更新。
  • 自动进行滚动升级,不会影响 Kafka 可用性。
  • 如果发生故障,CFK 将恢复具有相同 Kafka 代理 ID、配置和永久性存储卷的 Kafka Pod。
  • 自动机架感知功能可将分区副本分布到不同的机架(或可用区),从而提高 Kafka 代理的可用性并限制数据丢失的风险。
  • 支持将汇总指标导出到 Prometheus。

部署架构

Kafka 集群中的每个数据分区都有一个主要代理,并且可以包含一个或多个跟随代理。主要代理会处理对分区的所有读取和写入操作。每个跟随代理都会被动地复制主要代理。

在典型的 Kafka 设置中,您还需要使用一个名为 ZooKeeper 的开源服务来协调 Kafka 集群。此服务可在代理之间选举主要代理,并在发生故障时触发故障切换,从而提供帮助。

您也可以通过激活 KRaft 模式,在不使用 Zookeeper 的情况下部署 Kafka 配置,但由于不支持 KafkaTopic 资源和凭据身份验证,此方法不被视为适合生产环境。

可用性和灾难恢复

本教程为 Kafka 和 ZooKeeper 集群使用单独的节点池可用区,以确保高可用性并为灾难恢复做好准备。

Google Cloud 中的高可用性 Kubernetes 集群依赖于跨多个节点和可用区的区域级集群。此配置可提高容错能力、可伸缩性和地理冗余。此配置还允许您执行滚动更新和维护,同时提供 SLA 以保证正常运行时间和可用性。如需了解详情,请参阅区域级集群

部署图示

下图显示了在 GKE 集群中的多个节点和可用区上运行的 Kafka 集群:

在该图中,Kafka StatefulSet 部署在三个不同可用区的三个节点上。您可以通过在 Kafka 自定义资源规范上设置必需的 Pod 亲和性拓扑扩展规则来控制此配置。

如果一个可用区发生故障,GKE 将使用推荐的配置在新节点上重新调度 Pod,并从剩余副本中复制 Kafka 和 Zookeeper 的数据。

下图显示了位于三个不同可用区的三个节点上的 ZooKeeper StatefulSet

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

须知事项

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the GKE, Backup for GKE, Compute Engine, Identity and Access Management, and Resource Manager APIs:

    gcloud services enable compute.googleapis.com iam.googleapis.com container.googleapis.com gkebackup.googleapis.com cloudresourcemanager.googleapis.com
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Make sure that billing is enabled for your Google Cloud project.

  11. Enable the GKE, Backup for GKE, Compute Engine, Identity and Access Management, and Resource Manager APIs:

    gcloud services enable compute.googleapis.com iam.googleapis.com container.googleapis.com gkebackup.googleapis.com cloudresourcemanager.googleapis.com
  12. Grant roles to your user account. Run the following command once for each of the following IAM roles: role/storage.objectViewer, role/logging.logWriter, roles/container.clusterAdmin, role/container.serviceAgent, roles/iam.serviceAccountAdmin, roles/serviceusage.serviceUsageAdmin, roles/iam.serviceAccountAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.

准备环境

在本教程中,您将使用 Cloud Shell 来管理 Google Cloud 上托管的资源。Cloud Shell 预安装了本教程所需的软件,包括 kubectlgcloud CLIHelmTerraform

如需使用 Cloud Shell 设置您的环境,请按照以下步骤操作:

  1. 点击 Google Cloud 控制台中的 Cloud Shell 激活图标 激活 Cloud Shell,从 Google Cloud 控制台启动 Cloud Shell 会话。此操作会在 Google Cloud 控制台的底部窗格中启动会话。

  2. 设置环境变量:

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=kafka
    export REGION=us-central1
    

    替换 PROJECT_ID:您的 Google Cloud 项目 ID

  3. 克隆 GitHub 代码库:

    git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  4. 切换到工作目录:

    cd kubernetes-engine-samples/streaming
    

创建集群基础架构

在本部分中,您将运行 Terraform 脚本以创建可用性高的专用区域级 GKE 集群。通过执行以下步骤,可以公开访问控制平面。如需限制访问权限,请创建专用集群

您可以使用 Standard 或 Autopilot 集群安装操作器。

Standard

下图显示了部署在三个不同可用区中的专用区域级 Standard GKE 集群:

若要部署此基础架构,请从 Cloud Shell 运行以下命令:

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-standard init
terraform -chdir=kafka/terraform/gke-standard apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

出现提示时,请输入 yes。完成此命令并使集群显示就绪状态可能需要几分钟时间。

Terraform 会创建以下资源:

  • Kubernetes 节点的 VPC 网络和专用子网。
  • 用于通过 NAT 访问互联网的路由器。
  • 专用 GKE 集群(在 us-central1 区域中)。
  • 2 个启用了自动扩缩功能的节点池(每个可用区 1-2 个节点,每个可用区至少 1 个节点)
  • 具有日志记录和监控权限的 ServiceAccount
  • 用于灾难恢复的 Backup for GKE
  • Google Cloud Managed Service for Prometheus,用于监控集群。

输出类似于以下内容:

...
Apply complete! Resources: 14 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"

Autopilot

下图显示了专用区域级 Autopilot GKE 集群:

若要部署此基础架构,请从 Cloud Shell 运行以下命令:

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-autopilot init
terraform -chdir=kafka/terraform/gke-autopilot apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

出现提示时,请输入 yes。完成此命令并使集群显示就绪状态可能需要几分钟时间。

Terraform 会创建以下资源:

  • Kubernetes 节点的 VPC 网络和专用子网。
  • 用于通过 NAT 访问互联网的路由器。
  • 专用 GKE 集群(在 us-central1 区域中)。
  • 具有日志记录和监控权限的 ServiceAccount
  • Google Cloud Managed Service for Prometheus,用于监控集群。

输出类似于以下内容:

...
Apply complete! Resources: 12 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"

连接到集群

配置 kubectl 以与集群通信:

gcloud container clusters get-credentials ${KUBERNETES_CLUSTER_PREFIX}-cluster --region ${REGION}

将 CFK 操作器部署到您的集群

在本部分中,您将使用 Helm Chart 部署 Confluent for Kubernetes (CFK) 操作器,然后部署 Kafka 集群。

  1. 添加 Confluent Helm 图表库:

    helm repo add confluentinc https://packages.confluent.io/helm
    
  2. 为 CFK 操作器和 Kafka 集群添加命名空间:

    kubectl create ns kafka
    
  3. 使用 Helm 部署 CFK 集群操作器:

    helm install confluent-operator confluentinc/confluent-for-kubernetes -n kafka
    

    要使 CFK 能够管理所有命名空间中的资源,请将参数 --set-namespaced=false 添加到 Helm 命令。

  4. 使用 Helm 验证 Confluent 操作器是否已成功部署:

    helm ls -n kafka
    

    输出类似于以下内容:

    NAME                  NAMESPACE  REVISION UPDATED                                  STATUS      CHART                                APP VERSION
    confluent-operator    kafka      1        2023-07-07 10:57:45.409158 +0200 CEST    deployed    confluent-for-kubernetes-0.771.13    2.6.0
    

部署 Kafka

在本部分中,您将在基本配置中部署 Kafka,然后尝试各种高级配置场景以满足可用性、安全性和可观测性要求。

基本配置

Kafka 实例的基本配置包括以下组成部分:

  • Kafka 代理的三个副本,至少需要两个可用副本来实现集群一致性。
  • ZooKeeper 节点的三个副本,形成一个集群。
  • 两个 Kafka 监听器:一个不使用身份验证,另一个利用 TLS 身份验证(使用 CFK 生成的证书)。
  • 对于 Kafka,Java MaxHeapSize 和 MinHeapSize 设置为 4 GB。
  • CPU 资源分配:1 个 CPU 请求和 2 个 CPU 限制,同时 Kafka 的内存请求和限制为 5 GB(主服务为 4 GB,指标导出器为 0.5 GB),Zookeeper 为 3 GB(主服务为 2 GB,指标导出为 0.5 GB)。
  • 使用 premium-rwo storageClass 为每个 Pod 分配 100 GB 存储空间(Kafka 数据分配 100 GB,Zookeeper 数据/日志分别分配 90 GB/10 GB)。
  • 为每个工作负载配置容忍、nodeAffinity 和 podAntiAffinity,利用其各自的节点池和不同的可用区,确保跨节点适当分布。
  • 集群内通信(使用您提供的证书授权机构通过自签名证书提供保护)。

此配置表示创建可直接用于生产环境的 Kafka 集群所需的最少设置。以下部分演示的自定义配置可解决集群安全、访问控制列表 (ACL)、主题管理、证书管理等方面的需要。

创建基本 Kafka 集群

  1. 生成 CA 对:

    openssl genrsa -out ca-key.pem 2048
    openssl req -new -key ca-key.pem -x509 \
      -days 1000 \
      -out ca.pem \
      -subj "/C=US/ST=CA/L=Confluent/O=Confluent/OU=Operator/CN=MyCA"
    

    Confluent for Kubernetes 为 Confluent Platform 组件提供了自动生成的证书,以用于 TLS 网络加密。您必须生成并提供证书授权机构 (CA)。

  2. 为证书授权机构创建 Kubernetes Secret:

    kubectl create secret tls ca-pair-sslcerts --cert=ca.pem --key=ca-key.pem -n kafka
    

    Secret 的名称是预定义的

  3. 使用基本配置创建新的 Kafka 集群:

    kubectl apply -n kafka -f kafka-confluent/manifests/01-basic-cluster/my-cluster.yaml
    

    此命令会创建 CFK 操作器的 Kafka 自定义资源和 Zookeeper 自定义资源,这些资源包含 CPU 和内存的请求与限制、块存储请求以及污点和亲和性,用于在 Kubernetes 节点之间分配预配的 Pod。

  4. 请等待几分钟,让 Kubernetes 启动所需的工作负载:

    kubectl wait pods -l app=my-cluster --for condition=Ready --timeout=300s -n kafka
    
  5. 验证是否已创建 Kafka 工作负载:

    kubectl get pod,svc,statefulset,deploy,pdb -n kafka
    

    输出类似于以下内容:

    NAME                                    READY   STATUS  RESTARTS   AGE
    pod/confluent-operator-864c74d4b4-fvpxs   1/1   Running   0        49m
    pod/my-cluster-0                        1/1   Running   0        17m
    pod/my-cluster-1                        1/1   Running   0        17m
    pod/my-cluster-2                        1/1   Running   0        17m
    pod/zookeeper-0                         1/1   Running   0        18m
    pod/zookeeper-1                         1/1   Running   0        18m
    pod/zookeeper-2                         1/1   Running   0        18m
    
    NAME                          TYPE      CLUSTER-IP   EXTERNAL-IP   PORT(S)                                                        AGE
    service/confluent-operator    ClusterIP   10.52.13.164   <none>      7778/TCP                                                       49m
    service/my-cluster            ClusterIP   None         <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-0-internal   ClusterIP   10.52.2.242  <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-1-internal   ClusterIP   10.52.7.98   <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-2-internal   ClusterIP   10.52.4.226  <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/zookeeper             ClusterIP   None         <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-0-internal  ClusterIP   10.52.8.52   <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-1-internal  ClusterIP   10.52.12.44  <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-2-internal  ClusterIP   10.52.12.134   <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    
    NAME                        READY   AGE
    statefulset.apps/my-cluster   3/3   17m
    statefulset.apps/zookeeper  3/3   18m
    
    NAME                               READY   UP-TO-DATE   AVAILABLE   AGE
    deployment.apps/confluent-operator   1/1   1          1         49m
    
    NAME                                  MIN AVAILABLE   MAX UNAVAILABLE   ALLOWED DISRUPTIONS   AGE
    poddisruptionbudget.policy/my-cluster   N/A           1               1                   17m
    poddisruptionbudget.policy/zookeeper  N/A           1               1                   18m
    

操作器创建以下资源:

  • 用于 Kafka 和 ZooKeeper 的两个 StatefulSet。
  • 用于 Kafka 代理副本的三个 Pod。
  • 用于 ZooKeeper 副本的三个 Pod。
  • 两个 PodDisruptionBudget 资源,确保最多只有一个不可用的副本以实现集群一致性。
  • Service my-cluster,用作从 Kubernetes 集群内连接的 Kafka 客户端的引导服务器。所有内部 Kafka 监听器在此 Service 中都可用。
  • Service zookeeper,允许 Kafka 代理作为客户端连接到 ZooKeeper 节点。

身份验证和用户管理

本部分介绍如何启用身份验证和授权,以保护 Kafka 监听器并与客户端共享凭据。

Confluent for Kubernetes 支持 Kafka 的各种身份验证方法,例如:

限制

  • CFK 不为用户管理提供自定义资源。但是,您可以将凭据存储在 Secret 中,并在监听器规范中引用 Secret。
  • 虽然没有直接管理 ACL 的自定义资源,但官方的 Confluent for Kubernetes 提供了有关使用 Kafka CLI 配置 ACL 的指导。

创建用户

本部分介绍如何部署可演示用户管理功能的 CFK 操作器,包括:

  • 在一个监听器上启用了基于密码的身份验证 (SASL/PLAIN) 的 Kafka 集群
  • 包含 3 个副本的 KafkaTopic
  • 具有读写权限的用户凭据
  1. 创建包含用户凭据的 Secret:

    export USERNAME=my-user
    export PASSWORD=$(openssl rand -base64 12)
    kubectl create secret generic my-user-credentials -n kafka \
      --from-literal=plain-users.json="{\"$USERNAME\":\"$PASSWORD\"}"
    

    凭据应按以下格式存储:

    {
    "username1": "password1",
    "username2": "password2",
    ...
    "usernameN": "passwordN"
    }
    
  2. 将 Kafka 集群配置为在端口 9094 上使用监听器,该监听器使用基于密码的身份验证 SCRAM-SHA-512 身份验证:

    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-cluster.yaml
    
  3. 设置主题和客户端 Pod,以便与 Kafka 集群进行交互并执行 Kafka 命令:

    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-topic.yaml
    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/kafkacat.yaml
    

    GKE 将 Secret my-user-credentials 作为装载到客户端 Pod。

  4. 客户端 Pod 准备就绪后,连接到它并开始使用提供的凭据生成和使用消息:

    kubectl wait pod kafkacat --for=condition=Ready --timeout=300s -n kafka
    kubectl exec -it kafkacat -n kafka -- /bin/sh
    
  5. 使用 my-user 凭据生成消息,然后使用该消息来验证其收据。

    export USERNAME=$(cat /my-user/plain-users.json|cut -d'"' -f 2)
    export PASSWORD=$(cat /my-user/plain-users.json|cut -d'"' -f 4)
    echo "Message from my-user" |kcat \
      -b my-cluster.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=PLAIN \
      -X sasl.username=$USERNAME \
      -X sasl.password=$PASSWORD  \
      -t my-topic -P
    kcat -b my-cluster.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=PLAIN \
      -X sasl.username=$USERNAME \
      -X sasl.password=$PASSWORD  \
      -t my-topic -C
    

    输出类似于以下内容:

    Message from my-user
    % Reached end of topic my-topic [1] at offset 1
    % Reached end of topic my-topic [2] at offset 0
    % Reached end of topic my-topic [0] at offset 0
    

    CTRL+C 以停止使用方进程。如果您收到 Connect refused 错误,请等待几分钟,然后重试。

  6. 退出 Pod shell

    exit
    

备份和灾难恢复

使用 Confluent 操作器,您可以按照某些模式实施高效的备份策略。

您可以使用 Backup for GKE 备份:

  • Kubernetes 资源清单。
  • 从正在备份的集群的 Kubernetes API 服务器中提取的 Confluent API 自定义资源及其定义。
  • 与清单中找到的 PersistentVolumeClaim 资源相对应的卷。

如需详细了解如何使用 Backup for GKE 备份和恢复 Kafka 集群,请参阅为灾难恢复做好准备

您还可以对 Kafka 集群执行手动备份。您应该备份:

  • Kafka 配置,其中包含 Confluent API 的所有自定义资源,例如 KafkaTopicsConnect
  • 数据,存储在 Kafka 代理的 PersistentVolume 中

通过将 Kubernetes 资源清单(包括 Confluent 配置)存储在 Git 代码库中,无需单独备份 Kafka 配置,因为可以根据需要将资源重新应用于新的 Kubernetes 集群。

为了在 Kafka 服务器实例或部署了 Kafka 的 Kubernetes 集群丢失的情况下保护 Kafka 数据恢复,我们建议您配置用于为 Kafka 代理预配卷的 Kubernetes 存储类别,并将 reclaimPolicy 选项设置为 Retain。我们还建议您截取 Kafka 代理卷的快照

以下清单描述了使用 reclaimPolicy 选项 Retain 的 StorageClass:

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: premium-rwo-retain
...
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer

以下示例展示了 StorageClass 被添加到了 Kafka 集群自定义资源的 spec 中:

...
spec:
  ...
  dataVolumeCapacity: 100Gi
  storageClass:
  name: premium-rwo-retain

使用此配置时,即使删除了相应的 PersistentVolumeClaim,使用该存储类别预配的 PersistentVolume 也不会被删除。

如需使用现有配置和代理实例数据在新的 Kubernetes 集群上恢复 Kafka 实例,请执行以下操作:

  1. 将现有 Confluent 自定义资源(KafkaKafkaTopicZookeeper 等)应用于新的 Kubernetes 集群
  2. 使用 PersistentVolumeClaim 上的 spec.volumeName 属性,将采用新 Kafka 代理实例名称的 PersistentVolumeClaim 更新为旧的 PersistentVolume。

清理

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

逐个删除资源

如果您使用的是现有项目,并且不想将其删除,请逐个删除资源。

  1. 设置环境变量:

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=kafka
    export REGION=us-central1
    
  2. 运行 terraform destroy 命令:

    export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
    terraform -chdir=kafka/terraform/FOLDER destroy -var project_id=${PROJECT_ID}   \
      -var region=${REGION}  \
      -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
    

    FOLDER 替换为 gke-autopilotgke-standard

    出现提示时,请输入 yes

  3. 查找所有未挂接的磁盘:

    export disk_list=$(gcloud compute disks list --filter="-users:* AND labels.name=${KUBERNETES_CLUSTER_PREFIX}-cluster" --format "value[separator=|](name,zone)")
    
  4. 删除磁盘:

    for i in $disk_list; do
      disk_name=$(echo $i| cut -d'|' -f1)
      disk_zone=$(echo $i| cut -d'|' -f2|sed 's|.*/||')
      echo "Deleting $disk_name"
      gcloud compute disks delete $disk_name --zone $disk_zone --quiet
    done
    

后续步骤