在 GKE 上部署高可用性 Kafka 叢集

Kafka 是開放原始碼的分散式發布/訂閱訊息傳遞系統,可處理大量、高輸送量和即時串流資料。您可以使用 Kafka 建構串流資料管道,在不同系統和應用程式之間穩定移動資料,以進行處理和分析。

本教學課程的適用對象為平台管理員、雲端架構師和營運專員,他們有興趣在 Google Kubernetes Engine (GKE) 上部署高可用性的 Kafka 叢集。

建立叢集基礎架構

在本節中,您將執行 Terraform 指令碼,建立兩個地區 GKE 叢集。主要叢集會部署在 us-central1 中。

如要建立叢集,請按照下列步驟操作:

Autopilot

在 Cloud Shell 中執行下列指令:

terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID

系統顯示提示訊息時,請輸入 yes

標準

在 Cloud Shell 中執行下列指令:

terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID

系統顯示提示訊息時,請輸入 yes

Terraform 設定檔會建立下列資源,以部署基礎架構:

  • 建立 Artifact Registry 存放區,用於儲存 Docker 映像檔。
  • 為 VM 的網路介面建立虛擬私有雲網路和子網路。
  • 建立兩個 GKE 叢集。

Terraform 會在這兩個區域中建立私人叢集,並啟用 Backup for GKE 進行災難復原。

在叢集上部署 Kafka

在本節中,您將使用 Helm 資訊套件在 GKE 上部署 Kafka。這項作業會建立下列資源:

  • Kafka 和 Zookeeper StatefulSet。
  • Kafka 匯出工具部署作業。匯出工具會收集 Kafka 指標,供 Prometheus 使用。
  • Pod 中斷預算 (PDB): 限制自願中斷期間離線的 Pod 數量。

如要使用 Helm 圖表部署 Kafka,請按照下列步驟操作:

  1. 設定 Docker 存取權。

    gcloud auth configure-docker us-docker.pkg.dev
    
  2. 在 Artifact Registry 中填入 Kafka 和 Zookeeper 映像檔。

    ./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0
    ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52
    ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41
    ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
    
  3. 設定主要叢集的 kubectl 指令列存取權。

    gcloud container clusters get-credentials gke-kafka-us-central1 \
        --location=${REGION} \
        --project=${PROJECT_ID}
    
  4. 建立命名空間。

    export NAMESPACE=kafka
    kubectl create namespace $NAMESPACE
    
  5. 使用 Helm 資訊套件 20.0.6 版安裝 Kafka。

    cd helm
    ../scripts/chart.sh kafka 20.0.6 && \
    rm -rf Chart.lock charts && \
    helm dependency update && \
    helm -n kafka upgrade --install kafka . \
    --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
    
    

    輸出結果會與下列內容相似:

    NAME: kafka
    LAST DEPLOYED: Thu Feb 16 03:29:39 2023
    NAMESPACE: kafka
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    
  6. 確認 Kafka 副本正在執行 (可能需要幾分鐘)。

    kubectl get all -n kafka
    

    輸出結果會與下列內容相似:

    ---
    NAME                    READY   STATUS    RESTARTS        AGE
    pod/kafka-0             1/1     Running   2 (3m51s ago)   4m28s
    pod/kafka-1             1/1     Running   3 (3m41s ago)   4m28s
    pod/kafka-2             1/1     Running   2 (3m57s ago)   4m28s
    pod/kafka-zookeeper-0   1/1     Running   0               4m28s
    pod/kafka-zookeeper-1   1/1     Running   0               4m28s
    pod/kafka-zookeeper-2   1/1     Running   0               4m28s
    
    NAME                                   TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)                      AGE
    service/kafka                          ClusterIP   192.168.112.124   <none>        9092/TCP                     4m29s
    service/kafka-app                      ClusterIP   192.168.75.57     <none>        9092/TCP                     35m
    service/kafka-app-headless             ClusterIP   None              <none>        9092/TCP,9093/TCP            35m
    service/kafka-app-zookeeper            ClusterIP   192.168.117.102   <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-app-zookeeper-headless   ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-headless                 ClusterIP   None              <none>        9092/TCP,9093/TCP            4m29s
    service/kafka-zookeeper                ClusterIP   192.168.89.249    <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    service/kafka-zookeeper-headless       ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    
    NAME                               READY   AGE
    statefulset.apps/kafka             3/3     4m29s
    statefulset.apps/kafka-zookeeper   3/3     4m29s
    

建立測試資料

在本節中,您將測試 Kafka 應用程式並產生訊息。

  1. 建立消費者端 Pod,與 Kafka 應用程式互動。

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
    
  2. 建立名為 topic1 的主題,其中包含三個分割區,以及三個複寫因數。

    kafka-topics.sh \
        --create \
        --topic topic1 \
        --partitions 3  \
        --replication-factor 3 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. 確認主題分區已在所有三個代理程式中複製。

    kafka-topics.sh \
        --describe \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    輸出結果會與下列內容相似:

    Topic: topic1     TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
           Topic: topic1    Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
           Topic: topic1    Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
           Topic: topic1    Partition: 2    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
    

    在範例輸出中,請注意 topic1 有三個分割區,每個分割區都有不同的領導者和副本集。這是因為 Kafka 會使用分割功能,將資料分配到多個代理程式,進而提升擴充性和容錯能力。複製係數為 3 可確保每個分割區都有三個副本,即使一或兩個代理程式發生故障,資料仍可供使用。

  4. 執行下列指令,將大量訊息號碼產生至 topic1

    ALLOW_PLAINTEXT_LISTENER=yes
    for x in $(seq 0 200); do
      echo "$x: Message number $x"
    done | kafka-console-producer.sh \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \
        --property parse.key=true \
        --property key.separator=":"
    
  5. 執行下列指令,從所有分割區取用 topic1

    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    輸入 CTRL+C 即可停止消費者程序。

Kafka 基準

如要準確模擬用途,您可以對叢集預期負載執行模擬作業。如要測試效能,請使用 Kafka 套件隨附的工具,也就是 bin 資料夾中的 kafka-producer-perf-test.shkafka-consumer-perf-test.sh 指令碼。

  1. 建立基準化主題。

    kafka-topics.sh \
      --create \
      --topic topic-benchmark \
      --partitions 3  \
      --replication-factor 3 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  2. 在 Kafka 叢集上建立負載。

    KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \
        --topic topic-benchmark \
        --num-records 10000000 \
        --throughput -1 \
        --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \
              batch.size=16384 \
              acks=all \
              linger.ms=500 \
              compression.type=none \
        --record-size 100 \
        --print-metrics
    

    供應商會在 topic-benchmark產生 10,000,000 筆記錄。輸出內容會類似以下內容:

    623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency.
    1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency.
    1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency.
    2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
    

    所有記錄都傳送完畢後,輸出內容中應會顯示其他指標,如下所示:

    producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark}     : 173316.233
    producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark}    : 10000000.000
    

    如要退出手錶,請輸入 CTRL + C

  3. 結束 Pod 殼層。

    exit
    

管理升級

Kafka 和 Kubernetes 的版本更新會定期發布。請遵循作業最佳做法,定期升級軟體環境。

規劃 Kafka 二進位檔升級作業

在本節中,您將使用 Helm 更新 Kafka 映像檔,並確認主題仍可使用。

如要從「在叢集上部署 Kafka」一文中使用的 Helm 圖表升級較舊的 Kafka 版本,請按照下列步驟操作:

  1. 使用下列映像檔填入 Artifact Registry:

    ../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2
    ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61
    ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49
    ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
    
  2. 請按照下列步驟,使用升級後的 Kafka 和 Zookeeper 映像檔部署 Helm 資訊套件。如需特定版本的指南,請參閱 Kafka 版本升級說明

    1. 更新 Chart.yaml 依附元件版本:
    ../scripts/chart.sh kafka 20.1.0
    
    
    1. 使用新的 Kafka 和 Zookeeper 映像檔部署 Helm 資訊套件,如下列範例所示:

      rm -rf Chart.lock charts && \
      helm dependency update && \
      helm -n kafka upgrade --install kafka ./ \
            --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
      

    觀看 Kafka Pod 升級:

    kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
    

    如要退出手錶,請輸入 CTRL + C

  3. 使用用戶端 Pod 連線至 Kafka 叢集。

    kubectl run kafka-client -n kafka --rm -ti \
      --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
    
  4. 確認你可以存取 topic1 的訊息。

    kafka-console-consumer.sh \
      --topic topic1 \
      --from-beginning \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    輸出內容應顯示上一個步驟產生的訊息。輸入 CTRL+C 即可結束程序。

  5. 結束用戶端 Pod。

    exit
    

為災難復原做好準備

為確保生產環境工作負載在服務中斷事件發生時仍可使用,您應準備災難復原 (DR) 計畫。如要進一步瞭解 DR 規劃,請參閱「災難復原規劃指南」。

如要備份及還原 GKE 叢集中的工作負載,可以使用 GKE 備份服務

Kafka 備份和還原情境範例

在本節中,您將從 gke-kafka-us-central1 備份叢集,並將備份還原至 gke-kafka-us-west1。您將使用 ProtectedApplication 自訂資源,在應用程式範圍執行備份和還原作業。

下圖說明災難復原解決方案的元件,以及這些元件之間的關聯。

這張圖顯示高可用性 Kafka 叢集的備份與復原解決方案範例。
圖 3:高可用性 Kafka 叢集的備份與復原解決方案範例。

如要準備備份及還原 Kafka 叢集,請按照下列步驟操作:

  1. 設定環境變數。

    export BACKUP_PLAN_NAME=kafka-protected-app
    export BACKUP_NAME=protected-app-backup-1
    export RESTORE_PLAN_NAME=kafka-protected-app
    export RESTORE_NAME=protected-app-restore-1
    export REGION=us-central1
    export DR_REGION=us-west1
    export CLUSTER_NAME=gke-kafka-$REGION
    export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
    
  2. 確認叢集處於 RUNNING 狀態。

    gcloud container clusters describe $CLUSTER_NAME --location us-central1 --format='value(status)'
    
  3. 建立備份方案。

    gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --include-secrets \
        --include-volume-data \
        --cron-schedule="0 3 * * *" \
        --backup-retain-days=7 \
        --backup-delete-lock-days=0
    
  4. 手動建立備份。排定的備份作業通常會依據備份方案中的 cron 排程執行,但以下範例說明如何啟動一次性備份作業。

    gcloud beta container backup-restore backups create $BACKUP_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=$BACKUP_PLAN_NAME \
        --wait-for-completion
    
  5. 建立還原方案。

    gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \
        --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \
        --cluster-resource-conflict-policy=use-existing-version \
        --namespaced-resource-restore-mode=delete-and-restore \
        --volume-data-restore-policy=restore-volume-data-from-backup \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
    
  6. 手動從備份還原。

    gcloud beta container backup-restore restores create $RESTORE_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --restore-plan=$RESTORE_PLAN_NAME \
        --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
    
  7. 在備份叢集中查看還原的應用程式。所有 Pod 可能需要幾分鐘才能執行並準備就緒。

    gcloud container clusters get-credentials gke-kafka-us-west1 \
        --location us-west1
    kubectl get pod -n kafka --watch
    

    當所有 Pod 啟動並執行後,輸入 CTRL+C 即可結束監控。

  8. 驗證消費者是否可擷取先前的主題。

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    輸出結果會與下列內容相似:

    192 :  Message number 192
    193 :  Message number 193
    197 :  Message number 197
    200 :  Message number 200
    Processed a total of 201 messages
    

    輸入 CTRL+C 即可結束程序。

  9. 離開 Pod。

    exit
    

模擬 Kafka 服務中斷

在本節中,您將替換代管代理程式的 Kubernetes 節點,模擬節點故障。本節內容僅適用於標準版。 Autopilot 會為您管理節點,因此無法模擬節點故障。

  1. 建立用戶端 Pod,連線至 Kafka 應用程式。

    kubectl run kafka-client -n kafka --restart='Never' -it \
    --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
  2. 建立主題 topic-failover-test 並產生測試流量。

    kafka-topics.sh \
      --create \
      --topic topic-failover-test \
      --partitions 1  \
      --replication-factor 3  \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. 判斷 topic-failover-test 主題的領導者代理程式。

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    輸出結果會與下列內容相似:

    Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
    

    在上述輸出內容中,Leader: 1 表示 topic-failover-test 的領導者是代理程式 1。這對應於 Pod kafka-1

  4. 開啟新的終端機,並連線至同一個叢集。

    gcloud container clusters get-credentials gke-kafka-us-west1 --location us-west1 --project PROJECT_ID
    
  5. 找出 Pod kafka-1 執行的節點。

    kubectl get pod -n kafka kafka-1 -o wide
    

    輸出結果會與下列內容相似:

    NAME      READY   STATUS    RESTARTS      AGE   IP              NODE                                               NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   1 (35m ago)   36m   192.168.132.4   gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72   <none>           <none>
    

    在上述輸出內容中,您會看到 Pod kafka-1 在節點 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 上執行。

  6. 排空節點以逐出 Pod。

    kubectl drain NODE \
      --delete-emptydir-data \
      --force \
      --ignore-daemonsets
    

    NODE 替換為執行 kafka-1 的節點 Pod。在本例中,節點為 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72

    輸出結果會與下列內容相似:

    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned
    Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578
    evicting pod kafka/kafka-1
    evicting pod kube-system/kube-dns-7d48cb57b-j4d8f
    evicting pod kube-system/calico-typha-56995c8d85-5clph
    pod/calico-typha-56995c8d85-5clph evicted
    pod/kafka-1 evicted
    pod/kube-dns-7d48cb57b-j4d8f evicted
    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
    
  7. 找出 Pod kafka-1 執行的節點。

    kubectl get pod -n kafka kafka-1 -o wide
    

    輸出內容應如下所示:

    NAME      READY   STATUS    RESTARTS   AGE     IP              NODE                                              NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   0          2m49s   192.168.128.8   gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7   <none>           <none>
    

    從上述輸出內容中,您可以看到應用程式是在新節點上執行。

  8. 在連線至 kafka-client Pod 的終端機中,判斷哪個代理程式是 topic-failover-test 的領導者。

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    輸出內容應如下所示:

    Topic: topic-failover-test     TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 0,2,1
    

    在範例輸出內容中,領導者仍為 1。不過,現在是在新節點上執行。

測試 Kafka 領導者故障

  1. 在 Cloud Shell 中連線至 Kafka 用戶端,然後使用 describe 查看 topic1 中每個分區選出的領導者。

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    輸出結果會與下列內容相似:

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: topic1   Partition: 1    Leader: 0       Replicas: 2,1,0 Isr: 0,2,1
        Topic: topic1   Partition: 2    Leader: 0       Replicas: 1,0,2 Isr: 0,2,1
    
  2. 在未連線至 Kafka 用戶端的 Cloud Shell 中,刪除 kafka-0 領導者代理程式,強制進行新的領導者選舉。您應刪除對應至先前輸出內容中其中一位領導者的索引。

    kubectl delete pod -n kafka kafka-0 --force
    

    輸出結果會與下列內容相似:

    pod "kafka-0" force deleted
    
  3. 在連線至 Kafka 用戶端的 Cloud Shell 中,使用 describe 查看選出的領導者。

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    輸出結果會與下列內容相似:

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,0,1
        Topic: topic1   Partition: 1    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: topic1   Partition: 2    Leader: 2       Replicas: 1,2,0 Isr: 2,0,1
    

    在輸出內容中,每個分割區的新領導者都會變更 (如果已指派給中斷的領導者 (kafka-0))。這表示系統在刪除並重新建立 Pod 時,已取代原始領導者。