GKE에 고가용성 Kafka 클러스터 배포


Kafka는 높은 볼륨, 높은 처리량, 실시간 스트리밍 데이터를 처리하기 위한 오픈소스 분산 게시-구독 메시징 시스템입니다. Kafka를 사용하면 처리 및 분석을 위해 여러 시스템 및 애플리케이션에서 데이터를 안정적으로 이동하는 스트리밍 데이터 파이프라인을 빌드할 수 있습니다.

이 튜토리얼은 Google Kubernetes Engine(GKE)에 고가용성 Kafka 클러스터 배포에 관심이 있는 플랫폼 관리자, 클라우드 설계자, 운영 전문가를 대상으로 합니다.

목표

이 튜토리얼에서는 다음과 같은 방법을 알아봅니다.

  • Terraform을 사용하여 리전별 GKE 클러스터를 만듭니다.
  • 가용성이 높은 Kafka 클러스터를 배포합니다.
  • Kafka 바이너리를 업그레이드합니다.
  • Kafka 클러스터를 백업 및 복원합니다.
  • GKE 노드 중단 및 Kafka 브로커 장애 조치를 시뮬레이션합니다.

아키텍처

이 섹션에서는 이 튜토리얼에서 빌드할 솔루션의 아키텍처를 설명합니다.

Kafka 클러스터는 Kafka 클라이언트(소비자)에 대해 들어오는 데이터 스트림 및 게시-구독 메시징을 처리하기 위해 함께 작동하는 하나 이상의 서버(브로커) 그룹입니다.

Kafka 클러스터의 각 데이터 파티션은 리더 브로커가 하나 있고 팔로어 브로커를 하나 이상 포함할 수 있습니다. 리더 브로커는 파티션에 대한 모든 읽기 쓰기를 처리합니다. 각 팔로어 브로커는 리더 브로커를 수동적으로 복제합니다.

일반적인 Kafka 설정에서는 Kafka 클러스터 조정을 위해 ZooKeeper라는 오픈소스 서비스도 사용됩니다. 이 서비스는 브로커 중에서 리더를 선별하고 오류 시 장애 조치를 트리거하는 데 도움이 됩니다.

이 튜토리얼에서는 Kafka 브로커 및 Zookeeper 서비스를 개별 StatefulSet로 구성하여 GKE에 Kafka 클러스터를 배포합니다. 고가용성 Kafka 클러스터를 프로비저닝하고 재해 복구를 준비하려면 개별 노드 풀영역을 사용하도록 Kafka 및 Zookeeper StatefulSet를 구성합니다.

다음 다이어그램은 Kafka StatefulSet가 GKE 클러스터의 여러 노드 및 영역에서 실행되는 방법을 보여줍니다.

여러 영역에 배포된 GKE에서 Kafka StatefulSet 아키텍처 예시를 보여주는 다이어그램입니다.
그림 1: 세 가지 영역에서 GKE 노드에 Kafka StatefulSet 배포.

다음 다이어그램은 Zookeeper StatefulSet가 GKE 클러스터의 여러 노드 및 영역에서 실행되는 방법을 보여줍니다.

여러 영역에 배포된 GKE에서 Zookeeper StatefulSet 아키텍처 예시를 보여주는 다이어그램입니다.
그림 2: 세 가지 영역에서 GKE 노드에 Kafka Zookeeper 배포.

노드 프로비저닝 및 포드 예약

Autopilot 클러스터를 사용하는 경우 Autopilot은 워크로드에 대한 노드 프로비저닝 및 포드 예약을 처리합니다. 포드 안티어피니티를 사용하여 동일한 StatefulSet의 두 포드가 동일 노드 및 동일 영역에 예약되지 않도록 보장합니다.

Standard 클러스터를 사용하는 경우 포드 톨러레이션(toleration)노드 어피니티를 구성해야 합니다. 자세한 내용은 전용 노드 풀에서 워크로드 격리를 참조하세요.

비용

이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

이 문서에 설명된 태스크를 완료했으면 만든 리소스를 삭제하여 청구가 계속되는 것을 방지할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

시작하기 전에

프로젝트 설정

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, click Create project to begin creating a new Google Cloud project.

    Go to project selector

  3. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  4. Enable the Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, click Create project to begin creating a new Google Cloud project.

    Go to project selector

  6. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  7. Enable the Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM APIs.

    Enable the APIs

역할 설정

  1. Google 계정에 역할을 부여합니다. 다음 각 IAM 역할에 대해 다음 명령어를 한 번씩 실행합니다. role/storage.objectViewer, role/logging.logWriter, role/artifactregistry.Admin, roles/container.clusterAdmin, role/container.serviceAgent, roles/iam.serviceAccountAdmin, roles/serviceusage.serviceUsageAdmin, roles/iam.serviceAccountAdmin

    $ gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID를 프로젝트 ID로 바꿉니다.
    • EMAIL_ADDRESS를 이메일 주소로 바꿉니다.
    • ROLE을 각 개별 역할로 바꿉니다.

환경 설정하기

이 튜토리얼에서는 Cloud Shell을 사용하여 Google Cloud에서 호스팅되는 리소스를 관리합니다. Cloud Shell에는 Docker, kubectl, gcloud CLI, Helm, Terraform을 포함하여 이 튜토리얼에 필요한 소프트웨어가 사전 설치되어 있습니다.

Cloud Shell로 환경을 설정하려면 다음 단계를 따르세요.

  1. Google Cloud 콘솔에서 Cloud Shell 활성화 아이콘Cloud Shell 활성화를 클릭하여 Google Cloud 콘솔에서 Cloud Shell 세션을 시작합니다. 그러면 Google Cloud 콘솔 하단 창에서 세션이 시작됩니다.

  2. 환경 변수를 설정합니다.

    export PROJECT_ID=PROJECT_ID
    export REGION=us-central1
    

    다음 값을 바꿉니다.

  3. 기본 환경 변수를 설정합니다.

    gcloud config set project PROJECT_ID
    
  4. 코드 저장소를 클론합니다.

    git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  5. 작업 디렉터리로 변경합니다.

    cd kubernetes-engine-samples/streaming/gke-stateful-kafka
    

클러스터 인프라 만들기

이 섹션에서는 Terraform 스크립트를 실행하여 2개의 리전 GKE 클러스터를 만듭니다. 기본 클러스터는 us-central1에 배포됩니다.

클러스터를 만들려면 다음 단계를 따르세요.

Autopilot

Cloud Shell에서 다음 명령어를 실행합니다.

terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID

메시지가 표시되면 yes를 입력합니다.

Standard

Cloud Shell에서 다음 명령어를 실행합니다.

terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID

메시지가 표시되면 yes를 입력합니다.

Terraform 구성 파일은 인프라를 배포하기 위해 다음 리소스를 만듭니다.

  • Docker 이미지를 저장할 Artifact Registry 저장소를 만듭니다.
  • VM의 네트워크 인터페이스에 대한 VPC 네트워크 및 서브넷을 만듭니다.
  • 두 개의 GKE 클러스터를 만듭니다.

Terraform은 두 리전에 비공개 클러스터를 만들고 재해 복구를 위해 Backup for GKE를 사용 설정합니다.

클러스터에 Kafka 배포

이 섹션에서는 Helm 차트를 사용하여 GKE에 Kafka를 배포합니다. 이 작업은 다음 리소스를 생성합니다.

Helm 차트를 사용하여 Kafka를 배포하려면 다음 단계를 수행합니다.

  1. Docker 액세스를 구성합니다.

    gcloud auth configure-docker us-docker.pkg.dev
    
  2. Artifact Registry에 Kafka 및 Zookeeper 이미지를 채웁니다.

    ./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0
    ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52
    ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41
    ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
    
  3. 기본 클러스터에 대한 kubectl 명령줄 액세스를 구성합니다.

    gcloud container clusters get-credentials gke-kafka-us-central1 \
        --region=${REGION} \
        --project=${PROJECT_ID}
    
  4. 네임스페이스를 만듭니다.

    export NAMESPACE=kafka
    kubectl create namespace $NAMESPACE
    
  5. Helm 차트 버전 20.0.6을 사용하여 Kafka를 설치합니다.

    cd helm
    ../scripts/chart.sh kafka 20.0.6 && \
    rm -rf Chart.lock charts && \
    helm dependency update && \
    helm -n kafka upgrade --install kafka . \
    --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
    
    

    출력은 다음과 비슷합니다.

    NAME: kafka
    LAST DEPLOYED: Thu Feb 16 03:29:39 2023
    NAMESPACE: kafka
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    
  6. Kafka 복제본이 실행 중인지 확인합니다(몇 분 정도 걸릴 수 있음).

    kubectl get all -n kafka
    

    출력은 다음과 비슷합니다.

    ---
    NAME                    READY   STATUS    RESTARTS        AGE
    pod/kafka-0             1/1     Running   2 (3m51s ago)   4m28s
    pod/kafka-1             1/1     Running   3 (3m41s ago)   4m28s
    pod/kafka-2             1/1     Running   2 (3m57s ago)   4m28s
    pod/kafka-zookeeper-0   1/1     Running   0               4m28s
    pod/kafka-zookeeper-1   1/1     Running   0               4m28s
    pod/kafka-zookeeper-2   1/1     Running   0               4m28s
    
    NAME                                   TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)                      AGE
    service/kafka                          ClusterIP   192.168.112.124   <none>        9092/TCP                     4m29s
    service/kafka-app                      ClusterIP   192.168.75.57     <none>        9092/TCP                     35m
    service/kafka-app-headless             ClusterIP   None              <none>        9092/TCP,9093/TCP            35m
    service/kafka-app-zookeeper            ClusterIP   192.168.117.102   <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-app-zookeeper-headless   ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-headless                 ClusterIP   None              <none>        9092/TCP,9093/TCP            4m29s
    service/kafka-zookeeper                ClusterIP   192.168.89.249    <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    service/kafka-zookeeper-headless       ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    
    NAME                               READY   AGE
    statefulset.apps/kafka             3/3     4m29s
    statefulset.apps/kafka-zookeeper   3/3     4m29s
    

테스트 데이터 만들기

이 섹션에서는 Kafka 애플리케이션을 테스트하고 메시지를 생성합니다.

  1. Kafka 애플리케이션과 상호작용하기 위해 소비자 클라이언트 포드를 만듭니다.

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
    
  2. 파티션이 3개 있고 복제 계수가 3인 topic1이라는 주제를 만듭니다.

    kafka-topics.sh \
        --create \
        --topic topic1 \
        --partitions 3  \
        --replication-factor 3 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. 주제 파티션이 3개 브로커에 모두 복제되었는지 확인합니다.

    kafka-topics.sh \
        --describe \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    출력은 다음과 비슷합니다.

    Topic: topic1     TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
           Topic: topic1    Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
           Topic: topic1    Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
           Topic: topic1    Partition: 2    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
    

    출력 예시에서 topic1에는 3개 파티션이 있고, 각 파티션마다 서로 다른 리더 및 복제본 집합이 포함되어 있습니다. 이것은 확장성 및 내결함성 향상을 위해 Kafka가 파티션 나누기를 사용하여 여러 브로커에 데이터를 분산하기 때문입니다. 복제 계수 3은 각 파티션에 복제본을 3개 포함하여 한 두 개의 브로커가 실패하더라도 데이터를 계속 사용할 수 있게 보장합니다.

  4. 다음 명령어를 실행하여 메시지 번호를 topic1에 대량으로 생성합니다.

    ALLOW_PLAINTEXT_LISTENER=yes
    for x in $(seq 0 200); do
      echo "$x: Message number $x"
    done | kafka-console-producer.sh \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \
        --property parse.key=true \
        --property key.separator=":"
    
  5. 다음 명령어를 실행하여 모든 파티션에서 topic1을 소비합니다.

    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    CTRL+C를 입력하여 소비자 프로세스를 중지합니다.

Kafka 벤치마크

사용 사례를 정확하게 모델링하기 위해서는 클러스터에서 예상 부하에 대한 시뮬레이션을 실행할 수 있습니다. 성능을 테스트하려면 bin 폴더에 있는 kafka-producer-perf-test.shkafka-consumer-perf-test.sh 스크립트와 같이 Kafka 패키지에 포함된 도구를 사용합니다.

  1. 벤치마킹 주제를 만듭니다.

    kafka-topics.sh \
      --create \
      --topic topic-benchmark \
      --partitions 3  \
      --replication-factor 3 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  2. Kafka 클러스터에 부하를 만듭니다.

    KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \
        --topic topic-benchmark \
        --num-records 10000000 \
        --throughput -1 \
        --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \
              batch.size=16384 \
              acks=all \
              linger.ms=500 \
              compression.type=none \
        --record-size 100 \
        --print-metrics
    

    제작자가 topic-benchmark에서 10,000,000개 레코드를 생성합니다. 출력은 다음과 비슷합니다.

    623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency.
    1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency.
    1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency.
    2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
    

    모든 레코드가 전송되었으면 다음과 비슷하게 출력에 추가 측정항목이 표시됩니다.

    producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark}     : 173316.233
    producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark}    : 10000000.000
    

    확인을 종료하려면 CTRL + C를 입력합니다.

  3. 포드 셸을 종료합니다.

    exit
    

업그레이드 관리

Kafka 및 Kubernetes의 버전 업데이트는 정기적으로 출시됩니다. 작업 권장사항에 따라 소프트웨어 환경을 정기적으로 업데이트하세요.

Kafka 바이너리 업그레이드 계획

이 섹션에서는 Helm을 사용하여 Kafka 이미지를 업데이트하고 주제를 계속 사용할 수 있는지 확인합니다.

클러스터에 Kafka 배포에서 사용한 Helm 차트로부터 이전 Kafka 버전에서 업그레이드하려면 다음 단계를 수행합니다.

  1. Artifact Registry에 다음 이미지를 채웁니다.

    ../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2
    ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61
    ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49
    ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
    
  2. 업그레이드된 Kafka 및 Zookeeper 이미지로 Helm 차트를 배포하려면 다음 단계를 수행합니다. 버전 관련 안내는 Kafka 버전 업그레이드 안내를 참조하세요.

    1. Chart.yaml 종속 항목 버전을 업데이트합니다.
    ../scripts/chart.sh kafka 20.1.0
    
    
    1. 다음 예시에 표시된 것처럼 새 Kafka 및 Zookeeper 이미지를 사용하여 Helm 차트를 배포합니다.

      rm -rf Chart.lock charts && \
      helm dependency update && \
      helm -n kafka upgrade --install kafka ./ \
            --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
      

    Kafka 포드가 업그레이드되는 것을 확인합니다.

    kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
    

    확인을 종료하려면 CTRL + C를 입력합니다.

  3. 클라이언트 포드를 사용하여 Kafka 클러스터에 연결합니다.

    kubectl run kafka-client -n kafka --rm -ti \
      --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
    
  4. topic1에서 메시지에 액세스할 수 있는지 확인합니다.

    kafka-console-consumer.sh \
      --topic topic1 \
      --from-beginning \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    이전 단계에서 생성된 메시지가 출력에 표시됩니다. 프로세스를 종료하려면 CTRL+C를 입력합니다.

  5. 클라이언트 포드를 종료합니다.

    exit
    

재해 복구 대비

서비스 중단 이벤트 발생 시 프로덕션 워크로드를 계속 사용할 수 있도록 하려면 재해 복구(DR) 계획을 준비해야 합니다. DR 계획에 대한 자세한 내용은 재해 복구 계획 가이드를 참조하세요.

GKE 클러스터에서 워크로드를 백업 및 복원하려면 Backup for GKE를 사용하면 됩니다.

Kafka 백업 및 복원 시나리오 예시

이 섹션에서는 gke-kafka-us-central1에서 클러스터 백업을 가져오고 백업을 gke-kafka-us-west1에 복원합니다. ProtectedApplication 커스텀 리소스를 사용하여 애플리케이션 범위에서 백업 및 복원 작업을 수행합니다.

다음 다이어그램은 재해 복구 솔루션의 구성요소와 상호 관계를 보여줍니다.

가용성이 높은 Kafka 클러스터의 백업 및 복구 솔루션 예시를 보여주는 다이어그램
그림 3: 고가용성 Kafka 클러스터의 백업 및 복구 솔루션 예시

Kafka 클러스터 백업 및 복원을 준비하려면 다음 단계를 수행합니다.

  1. 환경 변수 설정

    export BACKUP_PLAN_NAME=kafka-protected-app
    export BACKUP_NAME=protected-app-backup-1
    export RESTORE_PLAN_NAME=kafka-protected-app
    export RESTORE_NAME=protected-app-restore-1
    export REGION=us-central1
    export DR_REGION=us-west1
    export CLUSTER_NAME=gke-kafka-$REGION
    export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
    
  2. 클러스터가 RUNNING 상태인지 확인합니다.

    gcloud container clusters describe $CLUSTER_NAME --region us-central1 --format='value(status)'
    
  3. 백업 계획을 만듭니다.

    gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --include-secrets \
        --include-volume-data \
        --cron-schedule="0 3 * * *" \
        --backup-retain-days=7 \
        --backup-delete-lock-days=0
    
  4. 수동으로 백업을 만듭니다. 예약된 백업은 일반적으로 백업 계획에서 크론 일정으로 제어되지만 다음 예시는 일회성 백업 작업을 시작하는 방법을 보여줍니다.

    gcloud beta container backup-restore backups create $BACKUP_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=$BACKUP_PLAN_NAME \
        --wait-for-completion
    
  5. 복원 계획을 만듭니다.

    gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \
        --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \
        --cluster-resource-conflict-policy=use-existing-version \
        --namespaced-resource-restore-mode=delete-and-restore \
        --volume-data-restore-policy=restore-volume-data-from-backup \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
    
  6. 백업에서 수동으로 복원합니다.

    gcloud beta container backup-restore restores create $RESTORE_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --restore-plan=$RESTORE_PLAN_NAME \
        --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
    
  7. 복원된 애플리케이션이 백업 클러스터에 표시되는 것을 확인합니다. 모든 포드가 실행되고 준비될 때까지 몇 분 정도 걸릴 수 있습니다.

    gcloud container clusters get-credentials gke-kafka-us-west1 \
        --region us-west1
    kubectl get pod -n kafka --watch
    

    모든 포드가 작동되어 실행되면 CTRL+C를 입력하여 확인을 종료합니다.

  8. 소비자가 이전 주제를 가져올 수 있는지 확인합니다.

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    출력은 다음과 비슷합니다.

    192 :  Message number 192
    193 :  Message number 193
    197 :  Message number 197
    200 :  Message number 200
    Processed a total of 201 messages
    

    프로세스를 종료하려면 CTRL+C를 입력합니다.

  9. 포드를 종료합니다.

    exit
    

Kafka 서비스 중단 시뮬레이션

이 섹션에서는 브로커를 호스팅하는 Kubernetes 노드를 교체하여 노드 오류를 시뮬레이션합니다. 이 섹션은 Standard 버전에만 적용됩니다. Autopilot에서는 노드가 자동으로 관리되므로 노드 오류를 시뮬레이션할 수 없습니다.

  1. Kafka 애플리케이션에 연결하기 위해 클라이언트 포드를 만듭니다.

    kubectl run kafka-client -n kafka --restart='Never' -it \
    --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
  2. topic-failover-test 주제를 만들고 테스트 트래픽을 생성합니다.

    kafka-topics.sh \
      --create \
      --topic topic-failover-test \
      --partitions 1  \
      --replication-factor 3  \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. topic-failover-test 주제 리더로 브로커를 결정합니다.

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    출력은 다음과 비슷합니다.

    Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
    

    위 출력에서 Leader: 1topic-failover-test의 리더가 브로커 1입니다. 이것은 포드 kafka-1에 해당합니다.

  4. 새 터미널을 열고 동일한 클러스터에 연결합니다.

    gcloud container clusters get-credentials gke-kafka-us-west1 --region us-west1 --project PROJECT_ID
    
  5. kafka-1 포드가 실행되는 노드를 찾습니다.

    kubectl get pod -n kafka kafka-1 -o wide
    

    출력은 다음과 비슷합니다.

    NAME      READY   STATUS    RESTARTS      AGE   IP              NODE                                               NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   1 (35m ago)   36m   192.168.132.4   gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72   <none>           <none>
    

    위 출력에서 kafka-1 포드는 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 노드에서 실행됩니다.

  6. 노드를 드레이닝하여 포드를 제거합니다.

    kubectl drain NODE \
      --delete-emptydir-data \
      --force \
      --ignore-daemonsets
    

    NODE를 kafka-1 포드가 실행되는 노드로 바꿉니다. 이 예시에서는 노드가 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72입니다.

    출력은 다음과 비슷합니다.

    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned
    Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578
    evicting pod kafka/kafka-1
    evicting pod kube-system/kube-dns-7d48cb57b-j4d8f
    evicting pod kube-system/calico-typha-56995c8d85-5clph
    pod/calico-typha-56995c8d85-5clph evicted
    pod/kafka-1 evicted
    pod/kube-dns-7d48cb57b-j4d8f evicted
    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
    
  7. kafka-1 포드가 실행되는 노드를 찾습니다.

    kubectl get pod -n kafka kafka-1 -o wide
    

    출력은 다음과 비슷하게 표시됩니다.

    NAME      READY   STATUS    RESTARTS   AGE     IP              NODE                                              NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   0          2m49s   192.168.128.8   gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7   <none>           <none>
    

    위 출력에서는 애플리케이션이 새 노드에서 실행됩니다.

  8. kafka-client 포드에 연결된 터미널에서 topic-failover-test의 리더인 브로커를 결정합니다.

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    출력은 다음과 비슷하게 표시됩니다.

    Topic: topic-failover-test     TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 0,2,1
    

    예시 출력에서 리더는 계속 1입니다. 하지만 이제 새 노드에서 실행됩니다.

Kafka 리더 오류 테스트

  1. Cloud Shell에서 Kafka 클라이언트에 연결하고 describe를 사용하여 topic1의 각 파티션에 대해 선택된 리더를 확인합니다.

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    출력은 다음과 비슷합니다.

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: topic1   Partition: 1    Leader: 0       Replicas: 2,1,0 Isr: 0,2,1
        Topic: topic1   Partition: 2    Leader: 0       Replicas: 1,0,2 Isr: 0,2,1
    
  2. Kafka 클라이언트에 연결되지 않은 Cloud Shell에서 새 리더 선택을 강제하기 위해 kafka-0 리더 브로커를 삭제합니다. 이전 출력의 리더 중 하나에 매핑되는 색인을 삭제해야 합니다.

    kubectl delete pod -n kafka kafka-0 --force
    

    출력은 다음과 비슷합니다.

    pod "kafka-0" force deleted
    
  3. Kafka 클라이언트에 연결된 Cloud Shell에서 describe를 사용하여 선택된 리더를 확인합니다.

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    출력은 다음과 비슷합니다.

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,0,1
        Topic: topic1   Partition: 1    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: topic1   Partition: 2    Leader: 2       Replicas: 1,2,0 Isr: 2,0,1
    

    출력에서 중단된 리더(kafka-0)에 할당된 경우 각 파티션의 새 리더가 변경됩니다. 이것은 포드가 삭제되고 다시 생성될 때 원래 리더가 교체되었음을 나타냅니다.

삭제

이 튜토리얼에서 사용된 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 프로젝트를 삭제하거나 프로젝트를 유지하고 개별 리소스를 삭제하세요.

프로젝트 삭제

청구되지 않도록 하는 가장 쉬운 방법은 튜토리얼에서 만든 프로젝트를 삭제하는 것입니다.

Delete a Google Cloud project:

gcloud projects delete PROJECT_ID

다음 단계