Kafka는 높은 볼륨, 높은 처리량, 실시간 스트리밍 데이터를 처리하기 위한 오픈소스 분산 게시-구독 메시징 시스템입니다. Kafka를 사용하면 처리 및 분석을 위해 여러 시스템 및 애플리케이션에서 데이터를 안정적으로 이동하는 스트리밍 데이터 파이프라인을 빌드할 수 있습니다.
이 튜토리얼은 Google Kubernetes Engine(GKE)에 고가용성 Kafka 클러스터 배포에 관심이 있는 플랫폼 관리자, 클라우드 설계자, 운영 전문가를 대상으로 합니다.
목표
이 튜토리얼에서는 다음과 같은 방법을 알아봅니다.- Terraform을 사용하여 리전별 GKE 클러스터를 만듭니다.
- 가용성이 높은 Kafka 클러스터를 배포합니다.
- Kafka 바이너리를 업그레이드합니다.
- Kafka 클러스터를 백업 및 복원합니다.
- GKE 노드 중단 및 Kafka 브로커 장애 조치를 시뮬레이션합니다.
아키텍처
이 섹션에서는 이 튜토리얼에서 빌드할 솔루션의 아키텍처를 설명합니다.
Kafka 클러스터는 Kafka 클라이언트(소비자)에 대해 들어오는 데이터 스트림 및 게시-구독 메시징을 처리하기 위해 함께 작동하는 하나 이상의 서버(브로커) 그룹입니다.
Kafka 클러스터의 각 데이터 파티션은 리더 브로커가 하나 있고 팔로어 브로커를 하나 이상 포함할 수 있습니다. 리더 브로커는 파티션에 대한 모든 읽기 쓰기를 처리합니다. 각 팔로어 브로커는 리더 브로커를 수동적으로 복제합니다.
일반적인 Kafka 설정에서는 Kafka 클러스터 조정을 위해 ZooKeeper라는 오픈소스 서비스도 사용됩니다. 이 서비스는 브로커 중에서 리더를 선별하고 오류 시 장애 조치를 트리거하는 데 도움이 됩니다.
이 튜토리얼에서는 Kafka 브로커 및 Zookeeper 서비스를 개별 StatefulSet로 구성하여 GKE에 Kafka 클러스터를 배포합니다. 고가용성 Kafka 클러스터를 프로비저닝하고 재해 복구를 준비하려면 개별 노드 풀 및 영역을 사용하도록 Kafka 및 Zookeeper StatefulSet를 구성합니다.
다음 다이어그램은 Kafka StatefulSet가 GKE 클러스터의 여러 노드 및 영역에서 실행되는 방법을 보여줍니다.
다음 다이어그램은 Zookeeper StatefulSet가 GKE 클러스터의 여러 노드 및 영역에서 실행되는 방법을 보여줍니다.
노드 프로비저닝 및 포드 예약
Autopilot 클러스터를 사용하는 경우 Autopilot은 워크로드에 대한 노드 프로비저닝 및 포드 예약을 처리합니다. 포드 안티어피니티를 사용하여 동일한 StatefulSet의 두 포드가 동일 노드 및 동일 영역에 예약되지 않도록 보장합니다.
Standard 클러스터를 사용하는 경우 포드 톨러레이션(toleration) 및 노드 어피니티를 구성해야 합니다. 자세한 내용은 전용 노드 풀에서 워크로드 격리를 참조하세요.
비용
이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.
프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요.
이 문서에 설명된 태스크를 완료했으면 만든 리소스를 삭제하여 청구가 계속되는 것을 방지할 수 있습니다. 자세한 내용은 삭제를 참조하세요.
시작하기 전에
프로젝트 설정
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, click Create project to begin creating a new Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM APIs.
-
In the Google Cloud console, on the project selector page, click Create project to begin creating a new Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM APIs.
역할 설정
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
role/storage.objectViewer, role/logging.logWriter, role/artifactregistry.Admin, roles/container.clusterAdmin, role/container.serviceAgent, roles/iam.serviceAccountAdmin, roles/serviceusage.serviceUsageAdmin, roles/iam.serviceAccountAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
환경 설정하기
이 튜토리얼에서는 Cloud Shell을 사용하여 Google Cloud에서 호스팅되는 리소스를 관리합니다. Cloud Shell에는 Docker, kubectl
, gcloud CLI, Helm, Terraform을 포함하여 이 튜토리얼에 필요한 소프트웨어가 사전 설치되어 있습니다.
Cloud Shell로 환경을 설정하려면 다음 단계를 따르세요.
Google Cloud 콘솔에서 Cloud Shell 활성화를 클릭하여 Google Cloud 콘솔에서 Cloud Shell 세션을 시작합니다. 그러면 Google Cloud 콘솔 하단 창에서 세션이 시작됩니다.
환경 변수를 설정합니다.
export PROJECT_ID=PROJECT_ID export REGION=us-central1
다음 값을 바꿉니다.
- PROJECT_ID: Google Cloud 프로젝트 ID입니다.
기본 환경 변수를 설정합니다.
gcloud config set project PROJECT_ID
코드 저장소를 클론합니다.
git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
작업 디렉터리로 변경합니다.
cd kubernetes-engine-samples/streaming/gke-stateful-kafka
클러스터 인프라 만들기
이 섹션에서는 Terraform 스크립트를 실행하여 2개의 리전 GKE 클러스터를 만듭니다.
기본 클러스터는 us-central1
에 배포됩니다.
클러스터를 만들려면 다음 단계를 따르세요.
Autopilot
Cloud Shell에서 다음 명령어를 실행합니다.
terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID
메시지가 표시되면 yes
를 입력합니다.
Standard
Cloud Shell에서 다음 명령어를 실행합니다.
terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID
메시지가 표시되면 yes
를 입력합니다.
Terraform 구성 파일은 인프라를 배포하기 위해 다음 리소스를 만듭니다.
- Docker 이미지를 저장할 Artifact Registry 저장소를 만듭니다.
- VM의 네트워크 인터페이스에 대한 VPC 네트워크 및 서브넷을 만듭니다.
- 두 개의 GKE 클러스터를 만듭니다.
Terraform은 두 리전에 비공개 클러스터를 만들고 재해 복구를 위해 Backup for GKE를 사용 설정합니다.
클러스터에 Kafka 배포
이 섹션에서는 Helm 차트를 사용하여 GKE에 Kafka를 배포합니다. 이 작업은 다음 리소스를 생성합니다.
- Kafka 및 Zookeeper StatefulSet
- Kafka 내보내기 도구 배포. 내보내기 도구는 Prometheus 소비에 대한 Kafka 측정항목을 수집합니다.
- 자발적 중단 중 오프라인 포드 수를 제한하는 포드 중단 예산(PDB)
Helm 차트를 사용하여 Kafka를 배포하려면 다음 단계를 수행합니다.
Docker 액세스를 구성합니다.
gcloud auth configure-docker us-docker.pkg.dev
Artifact Registry에 Kafka 및 Zookeeper 이미지를 채웁니다.
./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0 ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52 ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41 ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
기본 클러스터에 대한
kubectl
명령줄 액세스를 구성합니다.gcloud container clusters get-credentials gke-kafka-us-central1 \ --region=${REGION} \ --project=${PROJECT_ID}
네임스페이스를 만듭니다.
export NAMESPACE=kafka kubectl create namespace $NAMESPACE
Helm 차트 버전 20.0.6을 사용하여 Kafka를 설치합니다.
cd helm ../scripts/chart.sh kafka 20.0.6 && \ rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka . \ --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
출력은 다음과 비슷합니다.
NAME: kafka LAST DEPLOYED: Thu Feb 16 03:29:39 2023 NAMESPACE: kafka STATUS: deployed REVISION: 1 TEST SUITE: None
Kafka 복제본이 실행 중인지 확인합니다(몇 분 정도 걸릴 수 있음).
kubectl get all -n kafka
출력은 다음과 비슷합니다.
--- NAME READY STATUS RESTARTS AGE pod/kafka-0 1/1 Running 2 (3m51s ago) 4m28s pod/kafka-1 1/1 Running 3 (3m41s ago) 4m28s pod/kafka-2 1/1 Running 2 (3m57s ago) 4m28s pod/kafka-zookeeper-0 1/1 Running 0 4m28s pod/kafka-zookeeper-1 1/1 Running 0 4m28s pod/kafka-zookeeper-2 1/1 Running 0 4m28s NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/kafka ClusterIP 192.168.112.124 <none> 9092/TCP 4m29s service/kafka-app ClusterIP 192.168.75.57 <none> 9092/TCP 35m service/kafka-app-headless ClusterIP None <none> 9092/TCP,9093/TCP 35m service/kafka-app-zookeeper ClusterIP 192.168.117.102 <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-app-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-headless ClusterIP None <none> 9092/TCP,9093/TCP 4m29s service/kafka-zookeeper ClusterIP 192.168.89.249 <none> 2181/TCP,2888/TCP,3888/TCP 4m29s service/kafka-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 4m29s NAME READY AGE statefulset.apps/kafka 3/3 4m29s statefulset.apps/kafka-zookeeper 3/3 4m29s
테스트 데이터 만들기
이 섹션에서는 Kafka 애플리케이션을 테스트하고 메시지를 생성합니다.
Kafka 애플리케이션과 상호작용하기 위해 소비자 클라이언트 포드를 만듭니다.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
파티션이 3개 있고 복제 계수가 3인
topic1
이라는 주제를 만듭니다.kafka-topics.sh \ --create \ --topic topic1 \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
주제 파티션이 3개 브로커에 모두 복제되었는지 확인합니다.
kafka-topics.sh \ --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
출력은 다음과 비슷합니다.
Topic: topic1 TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: topic1 Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
출력 예시에서
topic1
에는 3개 파티션이 있고, 각 파티션마다 서로 다른 리더 및 복제본 집합이 포함되어 있습니다. 이것은 확장성 및 내결함성 향상을 위해 Kafka가 파티션 나누기를 사용하여 여러 브로커에 데이터를 분산하기 때문입니다. 복제 계수 3은 각 파티션에 복제본을 3개 포함하여 한 두 개의 브로커가 실패하더라도 데이터를 계속 사용할 수 있게 보장합니다.다음 명령어를 실행하여 메시지 번호를
topic1
에 대량으로 생성합니다.ALLOW_PLAINTEXT_LISTENER=yes for x in $(seq 0 200); do echo "$x: Message number $x" done | kafka-console-producer.sh \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \ --property parse.key=true \ --property key.separator=":"
다음 명령어를 실행하여 모든 파티션에서
topic1
을 소비합니다.kafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;
CTRL+C
를 입력하여 소비자 프로세스를 중지합니다.
Kafka 벤치마크
사용 사례를 정확하게 모델링하기 위해서는 클러스터에서 예상 부하에 대한 시뮬레이션을 실행할 수 있습니다. 성능을 테스트하려면 bin
폴더에 있는 kafka-producer-perf-test.sh
및 kafka-consumer-perf-test.sh
스크립트와 같이 Kafka 패키지에 포함된 도구를 사용합니다.
벤치마킹 주제를 만듭니다.
kafka-topics.sh \ --create \ --topic topic-benchmark \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Kafka 클러스터에 부하를 만듭니다.
KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \ --topic topic-benchmark \ --num-records 10000000 \ --throughput -1 \ --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \ batch.size=16384 \ acks=all \ linger.ms=500 \ compression.type=none \ --record-size 100 \ --print-metrics
제작자가
topic-benchmark
에서 10,000,000개 레코드를 생성합니다. 출력은 다음과 비슷합니다.623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency. 1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency. 1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency. 2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
모든 레코드가 전송되었으면 다음과 비슷하게 출력에 추가 측정항목이 표시됩니다.
producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark} : 173316.233 producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark} : 10000000.000
확인을 종료하려면
CTRL + C
를 입력합니다.포드 셸을 종료합니다.
exit
업그레이드 관리
Kafka 및 Kubernetes의 버전 업데이트는 정기적으로 출시됩니다. 작업 권장사항에 따라 소프트웨어 환경을 정기적으로 업데이트하세요.
Kafka 바이너리 업그레이드 계획
이 섹션에서는 Helm을 사용하여 Kafka 이미지를 업데이트하고 주제를 계속 사용할 수 있는지 확인합니다.
클러스터에 Kafka 배포에서 사용한 Helm 차트로부터 이전 Kafka 버전에서 업그레이드하려면 다음 단계를 수행합니다.
Artifact Registry에 다음 이미지를 채웁니다.
../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2 ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61 ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49 ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
업그레이드된 Kafka 및 Zookeeper 이미지로 Helm 차트를 배포하려면 다음 단계를 수행합니다. 버전 관련 안내는 Kafka 버전 업그레이드 안내를 참조하세요.
Chart.yaml
종속 항목 버전을 업데이트합니다.
../scripts/chart.sh kafka 20.1.0
다음 예시에 표시된 것처럼 새 Kafka 및 Zookeeper 이미지를 사용하여 Helm 차트를 배포합니다.
rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka ./ \ --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
Kafka 포드가 업그레이드되는 것을 확인합니다.
kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
확인을 종료하려면
CTRL + C
를 입력합니다.클라이언트 포드를 사용하여 Kafka 클러스터에 연결합니다.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
topic1
에서 메시지에 액세스할 수 있는지 확인합니다.kafka-console-consumer.sh \ --topic topic1 \ --from-beginning \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
이전 단계에서 생성된 메시지가 출력에 표시됩니다. 프로세스를 종료하려면
CTRL+C
를 입력합니다.클라이언트 포드를 종료합니다.
exit
재해 복구 대비
서비스 중단 이벤트 발생 시 프로덕션 워크로드를 계속 사용할 수 있도록 하려면 재해 복구(DR) 계획을 준비해야 합니다. DR 계획에 대한 자세한 내용은 재해 복구 계획 가이드를 참조하세요.
GKE 클러스터에서 워크로드를 백업 및 복원하려면 Backup for GKE를 사용하면 됩니다.
Kafka 백업 및 복원 시나리오 예시
이 섹션에서는 gke-kafka-us-central1
에서 클러스터 백업을 가져오고 백업을 gke-kafka-us-west1
에 복원합니다. ProtectedApplication
커스텀 리소스를 사용하여 애플리케이션 범위에서 백업 및 복원 작업을 수행합니다.
다음 다이어그램은 재해 복구 솔루션의 구성요소와 상호 관계를 보여줍니다.
Kafka 클러스터 백업 및 복원을 준비하려면 다음 단계를 수행합니다.
환경 변수 설정
export BACKUP_PLAN_NAME=kafka-protected-app export BACKUP_NAME=protected-app-backup-1 export RESTORE_PLAN_NAME=kafka-protected-app export RESTORE_NAME=protected-app-restore-1 export REGION=us-central1 export DR_REGION=us-west1 export CLUSTER_NAME=gke-kafka-$REGION export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
클러스터가
RUNNING
상태인지 확인합니다.gcloud container clusters describe $CLUSTER_NAME --region us-central1 --format='value(status)'
백업 계획을 만듭니다.
gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \ --selected-applications=kafka/kafka,kafka/zookeeper \ --include-secrets \ --include-volume-data \ --cron-schedule="0 3 * * *" \ --backup-retain-days=7 \ --backup-delete-lock-days=0
수동으로 백업을 만듭니다. 예약된 백업은 일반적으로 백업 계획에서 크론 일정으로 제어되지만 다음 예시는 일회성 백업 작업을 시작하는 방법을 보여줍니다.
gcloud beta container backup-restore backups create $BACKUP_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=$BACKUP_PLAN_NAME \ --wait-for-completion
복원 계획을 만듭니다.
gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \ --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \ --cluster-resource-conflict-policy=use-existing-version \ --namespaced-resource-restore-mode=delete-and-restore \ --volume-data-restore-policy=restore-volume-data-from-backup \ --selected-applications=kafka/kafka,kafka/zookeeper \ --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
백업에서 수동으로 복원합니다.
gcloud beta container backup-restore restores create $RESTORE_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --restore-plan=$RESTORE_PLAN_NAME \ --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
복원된 애플리케이션이 백업 클러스터에 표시되는 것을 확인합니다. 모든 포드가 실행되고 준비될 때까지 몇 분 정도 걸릴 수 있습니다.
gcloud container clusters get-credentials gke-kafka-us-west1 \ --region us-west1 kubectl get pod -n kafka --watch
모든 포드가 작동되어 실행되면
CTRL+C
를 입력하여 확인을 종료합니다.소비자가 이전 주제를 가져올 수 있는지 확인합니다.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
kafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;
출력은 다음과 비슷합니다.
192 : Message number 192 193 : Message number 193 197 : Message number 197 200 : Message number 200 Processed a total of 201 messages
프로세스를 종료하려면
CTRL+C
를 입력합니다.포드를 종료합니다.
exit
Kafka 서비스 중단 시뮬레이션
이 섹션에서는 브로커를 호스팅하는 Kubernetes 노드를 교체하여 노드 오류를 시뮬레이션합니다. 이 섹션은 Standard 버전에만 적용됩니다. Autopilot에서는 노드가 자동으로 관리되므로 노드 오류를 시뮬레이션할 수 없습니다.
Kafka 애플리케이션에 연결하기 위해 클라이언트 포드를 만듭니다.
kubectl run kafka-client -n kafka --restart='Never' -it \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
topic-failover-test
주제를 만들고 테스트 트래픽을 생성합니다.kafka-topics.sh \ --create \ --topic topic-failover-test \ --partitions 1 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
topic-failover-test
주제 리더로 브로커를 결정합니다.kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
출력은 다음과 비슷합니다.
Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
위 출력에서
Leader: 1
은topic-failover-test
의 리더가 브로커 1입니다. 이것은 포드kafka-1
에 해당합니다.새 터미널을 열고 동일한 클러스터에 연결합니다.
gcloud container clusters get-credentials gke-kafka-us-west1 --region us-west1 --project PROJECT_ID
kafka-1
포드가 실행되는 노드를 찾습니다.kubectl get pod -n kafka kafka-1 -o wide
출력은 다음과 비슷합니다.
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 1 (35m ago) 36m 192.168.132.4 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 <none> <none>
위 출력에서
kafka-1
포드는gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72
노드에서 실행됩니다.노드를 드레이닝하여 포드를 제거합니다.
kubectl drain NODE \ --delete-emptydir-data \ --force \ --ignore-daemonsets
NODE를 kafka-1 포드가 실행되는 노드로 바꿉니다. 이 예시에서는 노드가
gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72
입니다.출력은 다음과 비슷합니다.
node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578 evicting pod kafka/kafka-1 evicting pod kube-system/kube-dns-7d48cb57b-j4d8f evicting pod kube-system/calico-typha-56995c8d85-5clph pod/calico-typha-56995c8d85-5clph evicted pod/kafka-1 evicted pod/kube-dns-7d48cb57b-j4d8f evicted node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
kafka-1
포드가 실행되는 노드를 찾습니다.kubectl get pod -n kafka kafka-1 -o wide
출력은 다음과 비슷하게 표시됩니다.
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 0 2m49s 192.168.128.8 gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7 <none> <none>
위 출력에서는 애플리케이션이 새 노드에서 실행됩니다.
kafka-client
포드에 연결된 터미널에서topic-failover-test
의 리더인 브로커를 결정합니다.kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
출력은 다음과 비슷하게 표시됩니다.
Topic: topic-failover-test TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 0,2,1
예시 출력에서 리더는 계속 1입니다. 하지만 이제 새 노드에서 실행됩니다.
Kafka 리더 오류 테스트
Cloud Shell에서 Kafka 클라이언트에 연결하고
describe
를 사용하여topic1
의 각 파티션에 대해 선택된 리더를 확인합니다.kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
출력은 다음과 비슷합니다.
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: topic1 Partition: 1 Leader: 0 Replicas: 2,1,0 Isr: 0,2,1 Topic: topic1 Partition: 2 Leader: 0 Replicas: 1,0,2 Isr: 0,2,1
Kafka 클라이언트에 연결되지 않은 Cloud Shell에서 새 리더 선택을 강제하기 위해
kafka-0
리더 브로커를 삭제합니다. 이전 출력의 리더 중 하나에 매핑되는 색인을 삭제해야 합니다.kubectl delete pod -n kafka kafka-0 --force
출력은 다음과 비슷합니다.
pod "kafka-0" force deleted
Kafka 클라이언트에 연결된 Cloud Shell에서
describe
를 사용하여 선택된 리더를 확인합니다.kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
출력은 다음과 비슷합니다.
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 2 Leader: 2 Replicas: 1,2,0 Isr: 2,0,1
출력에서 중단된 리더(
kafka-0
)에 할당된 경우 각 파티션의 새 리더가 변경됩니다. 이것은 포드가 삭제되고 다시 생성될 때 원래 리더가 교체되었음을 나타냅니다.
삭제
이 튜토리얼에서 사용된 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 프로젝트를 삭제하거나 프로젝트를 유지하고 개별 리소스를 삭제하세요.
프로젝트 삭제
청구되지 않도록 하는 가장 쉬운 방법은 튜토리얼에서 만든 프로젝트를 삭제하는 것입니다.
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
다음 단계
- 완전 관리형 및 확장 가능한 메시징 서비스에 대해서는 Kafka에서 Pub/Sub로 마이그레이션을 참조하세요.