Confluent를 사용하여 GKE에 Apache Kafka 배포


이 가이드에서는 Kubernetes용 Confluent(CFK) 연산자를 사용하여 Google Kubernetes Engine(GKE)에 Apache Kafka 클러스터를 배포하는 방법을 설명합니다.

Kafka는 높은 볼륨, 높은 처리량, 실시간 스트리밍 데이터를 처리하기 위한 오픈소스 분산 게시-구독 메시징 시스템입니다. Kafka를 사용하면 처리 및 분석을 위해 여러 시스템 및 애플리케이션에서 데이터를 안정적으로 이동하는 스트리밍 데이터 파이프라인을 빌드할 수 있습니다.

이 가이드는 Kafka 클러스터를 GKE에 배포하는 데 관심이 있는 플랫폼 관리자, 클라우드 설계자, 운영 전문가를 대상으로 합니다.

또한 CFK 연산자를 사용하여 웹 기반 Confluent 제어 센터, 스키마 레지스트리 또는 KsqlDB와 같은 Confluent 플랫폼의 다른 구성요소를 배포할 수 있습니다. 그러나 이 가이드에서는 Kafka 배포만 설명합니다.

목표

  • Apache Kafka용 GKE 인프라 계획 및 배포
  • CFK 연산자 배포 및 구성
  • 가용성, 보안, 관측 가능성, 성능을 보장하기 위해 CFK 연산자를 사용하여 Apache Kafka 구성

이점

CFK는 다음과 같은 이점을 제공합니다.

  • 구성 변경에 대한 자동 순차적 업데이트
  • Kafka 가용성에 영향을 주지 않고 자동화된 순차적 업그레이드
  • 장애가 발생하면 CFK가 동일한 Kafka 브로커 ID, 구성, 영구 스토리지 볼륨을 사용하여 Kafka 포드 복원
  • 파티션의 복제본을 여러 랙(또는 영역)에 분산하여 랙 인식을 자동화하므로 Kafka 브로커의 가용성을 개선하고 데이터 손실 위험 제한
  • Prometheus로 집계된 측정항목 내보내기 지원

배포 아키텍처

Kafka 클러스터의 각 데이터 파티션은 리더 브로커가 하나 있고 팔로어 브로커를 하나 이상 포함할 수 있습니다. 리더 브로커는 파티션에 대한 모든 읽기 쓰기를 처리합니다. 각 팔로어 브로커는 리더 브로커를 수동적으로 복제합니다.

일반적인 Kafka 설정에서는 Kafka 클러스터 조정을 위해 ZooKeeper라는 오픈소스 서비스도 사용됩니다. 이 서비스는 브로커 중에서 리더를 선별하고 오류 시 장애 조치를 트리거하는 데 도움이 됩니다.

KRaft 모드를 활성화하여 Zookeeper를 사용하지 않고도 Kafka 구성을 배포할 수도 있지만 이 방법은 KafkaTopic 리소스, 사용자 인증 정보 인증 등을 지원하지 않으므로 프로덕션에 즉시 사용할 수 있는 것으로 간주되지 않습니다.

가용성 및 재해 복구

이 튜토리얼에서는 고가용성을 보장하고 재해 복구를 준비하도록 Kafka 클러스터와 ZooKeeper 클러스터에 별도의 노드 풀영역을 사용합니다.

Google Cloud에서 가용성이 높은 Kubernetes 클러스터는 여러 노드와 가용성 영역을 포괄하는 리전 클러스터를 사용합니다. 이 구성은 내결함성, 확장성, 지리적 중복성을 개선합니다. 이 구성을 통해 업타임 및 가용성을 위한 SLA를 제공하면서 순차적 업데이트 및 유지보수를 수행할 수 있습니다. 자세한 내용은 리전 클러스터를 참조하세요.

배포 다이어그램

다음 다이어그램에서는 GKE 클러스터의 여러 노드 및 영역에서 실행되는 Kafka 클러스터를 보여줍니다.

다이어그램에서 Kafka StatefulSet는 서로 다른 3개 영역의 노드 3개에 배포되어 있습니다. 필요한 포드 어피니티Kafka 커스텀 리소스 사양에서 토폴로지 분산 규칙을 설정하여 이 구성을 제어할 수 있습니다.

영역 하나가 실패하는 경우 권장 구성을 사용하면 GKE는 Kafka 및 Zookeeper 모두에 대해 새 노드에서 포드를 다시 예약하고 나머지 복제본에서 데이터를 복제합니다.

다음 다이어그램에서 서로 다른 영역 3개의 노드 3개에 배포된 ZooKeeper StatefulSet을 보여줍니다.

비용

이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

이 문서에 설명된 태스크를 완료했으면 만든 리소스를 삭제하여 청구가 계속되는 것을 방지할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

시작하기 전에

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. Google Cloud CLI를 설치합니다.
  3. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

    gcloud init
  4. Google Cloud 프로젝트를 만들거나 선택합니다.

    • Google Cloud 프로젝트를 만듭니다.

      gcloud projects create PROJECT_ID

      PROJECT_ID를 만들려는 Google Cloud 프로젝트의 이름으로 바꿉니다.

    • 만든 Google Cloud 프로젝트를 선택합니다.

      gcloud config set project PROJECT_ID

      PROJECT_ID를 Google Cloud 프로젝트 이름으로 바꿉니다.

  5. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  6. GKE, Backup for GKE, Compute Engine, Identity and Access Management, and Resource Manager API를 사용 설정합니다.

    gcloud services enable compute.googleapis.com iam.googleapis.com container.googleapis.com gkebackup.googleapis.com cloudresourcemanager.googleapis.com
  7. Google Cloud CLI를 설치합니다.
  8. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

    gcloud init
  9. Google Cloud 프로젝트를 만들거나 선택합니다.

    • Google Cloud 프로젝트를 만듭니다.

      gcloud projects create PROJECT_ID

      PROJECT_ID를 만들려는 Google Cloud 프로젝트의 이름으로 바꿉니다.

    • 만든 Google Cloud 프로젝트를 선택합니다.

      gcloud config set project PROJECT_ID

      PROJECT_ID를 Google Cloud 프로젝트 이름으로 바꿉니다.

  10. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  11. GKE, Backup for GKE, Compute Engine, Identity and Access Management, and Resource Manager API를 사용 설정합니다.

    gcloud services enable compute.googleapis.com iam.googleapis.com container.googleapis.com gkebackup.googleapis.com cloudresourcemanager.googleapis.com
  12. Google 계정에 역할을 부여합니다. 다음 각 IAM 역할에 대해 다음 명령어를 한 번씩 실행합니다. role/storage.objectViewer, role/logging.logWriter, roles/container.clusterAdmin, role/container.serviceAgent, roles/iam.serviceAccountAdmin, roles/serviceusage.serviceUsageAdmin, roles/iam.serviceAccountAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID를 프로젝트 ID로 바꿉니다.
    • EMAIL_ADDRESS를 이메일 주소로 바꿉니다.
    • ROLE을 각 개별 역할로 바꿉니다.

환경 준비

이 튜토리얼에서는 Cloud Shell을 사용하여 Google Cloud에서 호스팅되는 리소스를 관리합니다. Cloud Shell에는 kubectl, gcloud CLI, Helm, Terraform 등 이 튜토리얼에 필요한 소프트웨어가 사전 설치되어 있습니다.

Cloud Shell로 환경을 설정하려면 다음 단계를 따르세요.

  1. Google Cloud 콘솔에서 Cloud Shell 활성화 아이콘Cloud Shell 활성화를 클릭하여 Google Cloud 콘솔에서 Cloud Shell 세션을 시작합니다. 그러면 Google Cloud 콘솔 하단 창에서 세션이 시작됩니다.

  2. 환경 변수를 설정합니다.

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=kafka
    export REGION=us-central1
    

    PROJECT_ID를 Google Cloud의 프로젝트 ID로 바꿉니다.

  3. GitHub 저장소를 클론합니다.

    git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  4. 작업 디렉터리로 변경합니다.

    cd kubernetes-engine-samples/streaming
    

클러스터 인프라 만들기

이 섹션에서는 Terraform 스크립트를 실행하여 가용성이 높은 비공개 리전 GKE 클러스터를 만듭니다. 다음 단계에서는 제어 영역에 대한 공개 액세스를 허용합니다. 액세스를 제한하려면 비공개 클러스터를 만듭니다.

Standard 또는 Autopilot 클러스터를 사용하여 연산자를 설치할 수 있습니다.

Standard

다음 다이어그램에서는 서로 다른 영역 3개에 배포된 비공개 리전 Standard GKE 클러스터를 보여줍니다.

이 인프라를 배포하려면 Cloud Shell에서 다음 명령어를 실행합니다.

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-standard init
terraform -chdir=kafka/terraform/gke-standard apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

메시지가 표시되면 yes를 입력합니다. 이 명령어가 완료되고 클러스터에 준비 상태가 표시되는 데 몇 분 정도 걸릴 수 있습니다.

Terraform에서 다음 리소스를 만듭니다.

  • Kubernetes 노드의 VPC 네트워크 및 비공개 서브넷
  • NAT를 통해 인터넷에 액세스할 수 있는 라우터
  • us-central1 리전의 비공개 GKE 클러스터
  • 자동 확장이 사용 설정된 노드 풀 2개(영역당 노드 1~2개, 영역당 최소 노드 1개)
  • 로깅 및 모니터링 권한이 있는 ServiceAccount
  • 재해 복구를 위한 Backup for GKE
  • 클러스터 모니터링을 위한 Google Cloud Managed Service for Prometheus

출력은 다음과 비슷합니다.

...
Apply complete! Resources: 14 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"

Autopilot

다음 다이어그램에서는 비공개 리전 Autopilot GKE 클러스터를 보여줍니다.

인프라를 배포하려면 Cloud Shell에서 다음 명령어를 실행합니다.

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-autopilot init
terraform -chdir=kafka/terraform/gke-autopilot apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

메시지가 표시되면 yes를 입력합니다. 이 명령어가 완료되고 클러스터에 준비 상태가 표시되는 데 몇 분 정도 걸릴 수 있습니다.

Terraform에서 다음 리소스를 만듭니다.

  • Kubernetes 노드의 VPC 네트워크 및 비공개 서브넷
  • NAT를 통해 인터넷에 액세스할 수 있는 라우터
  • us-central1 리전의 비공개 GKE 클러스터
  • 로깅 및 모니터링 권한이 있는 ServiceAccount
  • 클러스터 모니터링을 위한 Google Cloud Managed Service for Prometheus

출력은 다음과 비슷합니다.

...
Apply complete! Resources: 12 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"

클러스터에 연결

클러스터와 통신하도록 kubectl을 구성합니다.

gcloud container clusters get-credentials ${KUBERNETES_CLUSTER_PREFIX}-cluster --region ${REGION}

클러스터에 CFK 연산자 배포

이 섹션에서는 Helm 차트를 사용하여 Kubernetes용 Confluent(CFK) 연산자를 배포한 후 Kafka 클러스터를 배포합니다.

  1. Confluent Helm Chart 저장소를 추가합니다.

    helm repo add confluentinc https://packages.confluent.io/helm
    
  2. CFK 연산자 및 Kafka 클러스터의 네임스페이스를 추가합니다.

    kubectl create ns kafka
    
  3. Helm을 사용하여 CFK 클러스터 연산자를 배포합니다.

    helm install confluent-operator confluentinc/confluent-for-kubernetes -n kafka
    

    CFK가 모든 네임스페이스에서 리소스를 관리할 수 있게 하려면 Helm 명령어에 --set-namespaced=false 매개변수를 추가합니다.

  4. Confluent 연산자가 Helm을 사용하여 성공적으로 배포되었는지 확인합니다.

    helm ls -n kafka
    

    출력은 다음과 비슷합니다.

    NAME                  NAMESPACE  REVISION UPDATED                                  STATUS      CHART                                APP VERSION
    confluent-operator    kafka      1        2023-07-07 10:57:45.409158 +0200 CEST    deployed    confluent-for-kubernetes-0.771.13    2.6.0
    

Kafka 배포

이 섹션에서는 기본 구성에 Kafka를 배포한 후 가용성, 보안, 관측 가능성 요구사항을 해결하기 위해 다양한 고급 구성 시나리오를 시도합니다.

기본 구성

Kafka 인스턴스의 기본 구성에는 다음 구성요소가 포함됩니다.

  • 클러스터 일관성에 필요한 사용할 수 있는 복제본이 최소 2개 있는 Kafka 브로커 복제본 3개
  • 클러스터를 형성하는 ZooKeeper 노드 복제본 3개
  • Kafka 리스너 두 개: 인증 없이 하나, CFK에서 생성된 인증서로 TLS 인증을 활용하는 하나
  • Kafka의 경우 자바 MaxHeapSizeMinHeapSize가 4GB로 설정
  • CPU 요청 1개 및 CPU 한도 2개의 CPU 리소스 할당, Kafka의 경우 5GB 메모리 요청 및 한도(기본 서비스의 경우 4GB, 측정항목 내보내기 도구의 경우 0.5GB), Zookeeper용 3GB(기본 서비스의 경우 2GB, 측정항목 내보내기 도구의 경우 0.5GB)
  • premium-rwo StorageClass를 사용하여 각 포드에 할당된 스토리지 100GB, Kafka 데이터의 경우 100GB, Zookeeper 데이터/로그의 경우 90/10
  • 노드 간에 올바른 배포를 보장하고 각 노드 풀과 다른 영역을 활용하는 각 워크로드에 구성된 톨러레이션(toleration), nodeAffinities, podAntiAffinities
  • 사용자가 제공한 인증 기관을 사용하여 자체 서명된 인증서로 보호되는 클러스터 내부의 통신

이 구성은 프로덕션에 사용 가능한 Kafka 클러스터를 만드는 데 필요한 최소 설정을 나타냅니다. 다음 섹션에서는 클러스터 보안, 액세스 제어 목록(ACL), 주제 관리, 인증서 관리 등과 같은 측면을 해결하는 커스텀 구성을 보여줍니다.

기본 Kafka 클러스터 만들기

  1. CA 쌍을 생성합니다.

    openssl genrsa -out ca-key.pem 2048
    openssl req -new -key ca-key.pem -x509 \
      -days 1000 \
      -out ca.pem \
      -subj "/C=US/ST=CA/L=Confluent/O=Confluent/OU=Operator/CN=MyCA"
    

    Kubernetes용 Confluent는 TLS 네트워크 암호화에 사용할 Confluent 플랫폼 구성요소의 자동 생성 인증서를 제공합니다. 인증 기관(CA)을 생성하고 제공해야 합니다.

  2. 인증 기관의 Kubernetes 보안 비밀을 만듭니다.

    kubectl create secret tls ca-pair-sslcerts --cert=ca.pem --key=ca-key.pem -n kafka
    

    보안 비밀의 이름은 사전 정의됩니다.

  3. 기본 구성을 사용하여 새 Kafka 클러스터를 만듭니다.

    kubectl apply -n kafka -f kafka-confluent/manifests/01-basic-cluster/my-cluster.yaml
    

    이 명령어는 CPU 및 메모리 요청과 한도, 블록 스토리지 요청, 프로비저닝된 포드를 Kubernetes 노드 간에 배포하기 위한 taint 및 어피니티를 포함하는 CFK 연산자의 Kafka 커스텀 리소스와 Zookeeper 커스텀 리소스를 만듭니다.

  4. Kubernetes에서 필요한 워크로드를 시작하는 동안 몇 분 정도 기다립니다.

    kubectl wait pods -l app=my-cluster --for condition=Ready --timeout=300s -n kafka
    
  5. Kafka 워크로드가 생성되었는지 확인합니다.

    kubectl get pod,svc,statefulset,deploy,pdb -n kafka
    

    출력은 다음과 비슷합니다.

    NAME                                    READY   STATUS  RESTARTS   AGE
    pod/confluent-operator-864c74d4b4-fvpxs   1/1   Running   0        49m
    pod/my-cluster-0                        1/1   Running   0        17m
    pod/my-cluster-1                        1/1   Running   0        17m
    pod/my-cluster-2                        1/1   Running   0        17m
    pod/zookeeper-0                         1/1   Running   0        18m
    pod/zookeeper-1                         1/1   Running   0        18m
    pod/zookeeper-2                         1/1   Running   0        18m
    
    NAME                          TYPE      CLUSTER-IP   EXTERNAL-IP   PORT(S)                                                        AGE
    service/confluent-operator    ClusterIP   10.52.13.164   <none>      7778/TCP                                                       49m
    service/my-cluster            ClusterIP   None         <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-0-internal   ClusterIP   10.52.2.242  <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-1-internal   ClusterIP   10.52.7.98   <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-2-internal   ClusterIP   10.52.4.226  <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/zookeeper             ClusterIP   None         <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-0-internal  ClusterIP   10.52.8.52   <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-1-internal  ClusterIP   10.52.12.44  <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-2-internal  ClusterIP   10.52.12.134   <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    
    NAME                        READY   AGE
    statefulset.apps/my-cluster   3/3   17m
    statefulset.apps/zookeeper  3/3   18m
    
    NAME                               READY   UP-TO-DATE   AVAILABLE   AGE
    deployment.apps/confluent-operator   1/1   1          1         49m
    
    NAME                                  MIN AVAILABLE   MAX UNAVAILABLE   ALLOWED DISRUPTIONS   AGE
    poddisruptionbudget.policy/my-cluster   N/A           1               1                   17m
    poddisruptionbudget.policy/zookeeper  N/A           1               1                   18m
    

연산자는 다음 리소스를 만듭니다.

  • Kafka 및 ZooKeeper를 위한 StatefulSet 2개
  • Kafka 브로커 복제본을 위한 포드 3개
  • ZooKeeper 복제본을 위한 포드 3개
  • 클러스터 일관성을 위해 사용할 수 없는 최대 1개의 복제본을 보장하는 PodDisruptionBudget 리소스 2개
  • Kubernetes 클러스터 내에서 연결하는 Kafka 클라이언트의 부트스트랩 서버 역할을 하는 my-cluster 서비스. 모든 내부 Kafka 리스너가 이 서비스에서 제공됩니다.
  • Kafka 브로커가 클라이언트로 ZooKeeper 노드에 연결할 수 있는 zookeeper 서비스

인증 및 사용자 관리

이 섹션에서는 Kafka 리스너를 보호하고 클라이언트와 사용자 인증 정보를 공유하도록 인증과 승인을 사용 설정하는 방법을 보여줍니다.

Kubernetes용 Confluent는 다음과 같은 다양한 Kafka 인증 방법을 지원합니다.

  • SASL/PLAIN 인증: 클라이언트가 인증에 사용자 이름과 비밀번호를 사용합니다. 사용자 이름과 비밀번호는 서버 측에 Kubernetes 보안 비밀로 저장됩니다.
  • LDAP 인증이 있는 SASL/PLAIN: 클라이언트가 인증에 사용자 이름과 비밀번호를 사용합니다. 사용자 인증 정보는 LDAP 서버에 저장됩니다.
  • mTLS 인증: 클라이언트가 인증에 TLS 인증서를 사용합니다.

제한사항

  • CFK는 사용자 관리를 위한 커스텀 리소스를 제공하지 않습니다. 그러나 보안 비밀에 사용자 인증 정보를 저장하고 보안 비밀을 참조하여 리스너 사양에 포함할 수 있습니다.
  • ACL을 직접 관리할 수 있는 커스텀 리소스는 없지만 공식 Kubernetes용 Confluent는 Kafka CLI를 사용하여 ACL을 구성하는 방법을 안내합니다.

사용자 생성

이 섹션에서는 다음을 포함하여 사용자 관리 기능을 보여주는 CFK 연산자를 배포하는 방법을 보여줍니다.

  • 리스너 중 하나에서 비밀번호 기반 인증(SASL/PLAIN)이 사용 설정된 Kafka 클러스터
  • 복제본 3개가 있는 KafkaTopic
  • 읽기 및 쓰기 권한이 있는 사용자 인증 정보
  1. 사용자 인증 정보로 보안 비밀을 만듭니다.

    export USERNAME=my-user
    export PASSWORD=$(openssl rand -base64 12)
    kubectl create secret generic my-user-credentials -n kafka \
      --from-literal=plain-users.json="{\"$USERNAME\":\"$PASSWORD\"}"
    

    사용자 인증 정보는 다음 형식으로 저장해야 합니다.

    {
    "username1": "password1",
    "username2": "password2",
    ...
    "usernameN": "passwordN"
    }
    
  2. 포트 9094에서 비밀번호 기반 인증 SCRAM-SHA-512 인증으로 리스너를 사용하도록 Kafka 클러스터를 구성합니다.

    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-cluster.yaml
    
  3. 주제 및 클라이언트 포드를 설정하여 Kafka 클러스터와 상호작용하고 Kafka 명령어를 실행합니다.

    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-topic.yaml
    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/kafkacat.yaml
    

    GKE는 보안 비밀 my-user-credentials를 클라이언트 포드에 볼륨으로 마운트합니다.

  4. 클라이언트 포드가 준비되면 연결하고 제공된 사용자 인증 정보를 사용하여 메시지를 생성하고 사용하기 시작합니다.

    kubectl wait pod kafkacat --for=condition=Ready --timeout=300s -n kafka
    kubectl exec -it kafkacat -n kafka -- /bin/sh
    
  5. my-user 사용자 인증 정보를 사용하여 메시지를 생성한 다음 메시지를 사용하여 수신을 확인합니다.

    export USERNAME=$(cat /my-user/plain-users.json|cut -d'"' -f 2)
    export PASSWORD=$(cat /my-user/plain-users.json|cut -d'"' -f 4)
    echo "Message from my-user" |kcat \
      -b my-cluster.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=PLAIN \
      -X sasl.username=$USERNAME \
      -X sasl.password=$PASSWORD  \
      -t my-topic -P
    kcat -b my-cluster.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=PLAIN \
      -X sasl.username=$USERNAME \
      -X sasl.password=$PASSWORD  \
      -t my-topic -C
    

    출력은 다음과 비슷합니다.

    Message from my-user
    % Reached end of topic my-topic [1] at offset 1
    % Reached end of topic my-topic [2] at offset 0
    % Reached end of topic my-topic [0] at offset 0
    

    CTRL+C를 입력하여 소비자 프로세스를 중지합니다. Connect refused 오류가 발생하면 몇 분 정도 기다린 후 다시 시도하세요.

  6. 포드 셸을 종료합니다.

    exit
    

백업 및 재해 복구

Confluent 연산자를 사용하면 특정 패턴을 따라 효율적인 백업 전략을 구현할 수 있습니다.

Backup for GKE를 사용하여 백업할 수 있습니다.

  • Kubernetes 리소스 매니페스트
  • Confluent API 커스텀 리소스와 백업 중인 클러스터의 Kubernetes API 서버에서 추출된 정의
  • 매니페스트에서 찾은 PersistentVolumeClaim 리소스에 해당하는 볼륨

Backup for GKE를 사용하여 Kafka 클러스터를 백업 및 복원하는 방법에 대한 자세한 내용은 재해 복구 준비를 참조하세요.

Kafka 클러스터의 수동 백업을 수행할 수도 있습니다. 다음을 백업해야 합니다.

  • KafkaTopics 또는 Connect와 같은 Confluent API의 모든 커스텀 리소스가 포함된 Kafka 구성
  • Kafka 브로커의 PersistentVolume에 저장된 데이터

Confluent 구성을 포함하여 Kubernetes 리소스 매니페스트를 Git 저장소에 저장하면 필요할 때 리소스를 새 Kubernetes 클러스터에 다시 적용할 수 있으므로 Kafka 구성을 별도로 백업할 필요가 없습니다.

Kafka 서버 인스턴스 또는 Kafka가 배포된 Kubernetes 클러스터가 손실된 시나리오에서 Kafka 데이터 복구를 보호하려면 reclaimPolicy 옵션을 Retain으로 설정하여 Kafka 브로커의 볼륨을 프로비저닝하는 데 사용되는 Kubernetes 스토리지 클래스를 구성하는 것이 좋습니다. 또한 Kafka 브로커 볼륨의 스냅샷을 만드는 것이 좋습니다.

다음 매니페스트에서는 reclaimPolicy 옵션 Retain을 사용하는 StorageClass를 설명합니다.

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: premium-rwo-retain
...
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer

다음 예시에서는 Kafka 클러스터 커스텀 리소스의 spec에 추가된 StorageClass를 보여줍니다.

...
spec:
  ...
  dataVolumeCapacity: 100Gi
  storageClass:
  name: premium-rwo-retain

이 구성을 사용하면 스토리지 클래스를 사용하여 프로비저닝된 PersistentVolume은 해당 PersistentVolumeClaim이 삭제되더라도 삭제되지 않습니다.

기존 구성과 브로커 인스턴스 데이터를 사용하여 새 Kubernetes 클러스터에서 Kafka 인스턴스를 복구하려면 다음 안내를 따르세요.

  1. 기존 Confluent 커스텀 리소스(Kafka, KafkaTopic, Zookeeper 등)를 새 Kubernetes 클러스터에 적용합니다.
  2. PersistentVolumeClaim에서 spec.volumeName 속성을 사용하여 새 Kafka 브로커 인스턴스 이름이 있는 PersistentVolumeClaim을 이전 PersistentVolume으로 업데이트합니다.

삭제

이 튜토리얼에서 사용된 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 프로젝트를 삭제하거나 프로젝트를 유지하고 개별 리소스를 삭제하세요.

프로젝트 삭제

    Google Cloud 프로젝트를 삭제합니다.

    gcloud projects delete PROJECT_ID

개별 리소스 삭제

기존 프로젝트를 사용한 경우 삭제하지 않으려면 개별 리소스를 삭제합니다.

  1. 환경 변수를 설정합니다.

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=kafka
    export REGION=us-central1
    
  2. terraform destroy 명령어를 실행합니다.

    export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
    terraform -chdir=kafka/terraform/FOLDER destroy -var project_id=${PROJECT_ID}   \
      -var region=${REGION}  \
      -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
    

    FOLDERgke-autopilot 또는 gke-standard로 바꿉니다.

    메시지가 표시되면 yes를 입력합니다.

  3. 연결되지 않은 모든 디스크를 찾습니다.

    export disk_list=$(gcloud compute disks list --filter="-users:* AND labels.name=${KUBERNETES_CLUSTER_PREFIX}-cluster" --format "value[separator=|](name,zone)")
    
  4. 디스크를 삭제합니다.

    for i in $disk_list; do
      disk_name=$(echo $i| cut -d'|' -f1)
      disk_zone=$(echo $i| cut -d'|' -f2|sed 's|.*/||')
      echo "Deleting $disk_name"
      gcloud compute disks delete $disk_name --zone $disk_zone --quiet
    done
    

다음 단계

  • Google Cloud에 대한 참조 아키텍처, 다이어그램, 권장사항 살펴보기 Cloud 아키텍처 센터 살펴보기