Kafka 소비자 워크로드 자동 확장

이 튜토리얼에서는 Kafka 자동 확장 처리를 Cloud Run 서비스로 구성하고 배포하는 방법을 보여줍니다. 이 자동 확장 처리는 Cloud Run 작업자 풀 배포와 같은 Kafka 소비자 워크로드의 확장 로직을 수행합니다. Kafka 자동 확장 처리는 Kafka 클러스터에서 측정항목을 읽고 Cloud Run 작업자 풀 또는 서비스의 수동 확장을 사용하여 Kafka 소비자 지연 측정항목에 따라 Kafka 소비자 워크로드를 확장합니다.

다음 다이어그램은 Kafka 자동 확장 처리 서비스가 Kafka 클러스터에서 측정항목을 읽어 Kafka 소비자 작업자 풀을 자동 확장하는 방법을 보여줍니다.

Kafka 자동 확장 처리 서비스가 Kafka에서 측정항목을 가져와 Kafka 소비자를 자동 확장합니다.

필요한 역할

이 서비스를 배포하고 실행하는 데 필요한 권한을 얻으려면 관리자에게 다음 IAM 역할을 부여해 달라고 요청하세요.

시작하기 전에

Kafka 자동 확장 처리를 구성하고 사용하려면 다음 리소스가 있어야 합니다.

  • Kafka 클러스터
  • 배포된 소비자

Kafka 클러스터

배포된 Cloud Run 소비자

  • Kafka 소비자 워크로드는 Cloud Run에 서비스 또는 작업자 풀로 배포해야 합니다. Kafka 클러스터, 주제, 소비자 그룹에 연결하도록 구성해야 합니다. Kafka 소비자의 예는 Cloud Run Kafka 자동 확장 처리 예시 소비자를 참조하세요.
  • 소비자 워크로드는 Kafka 클러스터와 동일한 Google Cloud 프로젝트에 있어야 합니다.

권장사항

  • 직접 VPC를 사용하여 Kafka 소비자를 VPC 네트워크에 연결합니다. 직접 VPC를 사용하면 비공개 IP 주소를 사용하여 Kafka 클러스터에 연결할 수 있으며 VPC 네트워크에서 트래픽을 유지할 수도 있습니다.
  • 소비자가 이벤트를 가져오는지 확인하는 Kafka 소비자의 활성 상태 점검을 구성합니다. 상태 점검은 비정상 인스턴스가 이벤트 처리를 중지하는 경우 컨테이너가 비정상 종료되지 않더라도 자동으로 다시 시작되도록 하는 데 도움이 됩니다.

Kafka 자동 확장 처리 빌드

Cloud Build를 사용하여 소스 코드에서 Kafka 자동 확장 처리의 컨테이너 이미지를 빌드할 수 있습니다.

  1. 저장소를 복제합니다.

    git clone https://github.com/GoogleCloudPlatform/cloud-run-kafka-scaler.git
    
  2. 저장소 폴더로 이동합니다.

    cd cloud-run-kafka-scaler
    

출력 이미지 이름을 지정하려면 포함된 cloudbuild.yaml 파일에서 %ARTIFACT_REGISTRY_IMAGE%를 업데이트합니다(예: us-central1-docker.pkg.dev/my-project/my-repo/my_kafka_autoscaler).

gcloud builds submit --tag us-central1-docker.pkg.dev/my-project/my-repo/my_kafka_autoscaler

이 명령어는 컨테이너 이미지를 빌드하고 이를 Artifact Registry에 푸시합니다. 나중에 필요하므로 전체 이미지 경로(SCALER_IMAGE_PATH)를 기록합니다.

결과 이미지는 로컬에서 실행되지 않습니다. Java 기본 이미지 위에 레이어로 지정됩니다. 로컬에서 실행하기 위해 컨테이너 이미지를 재조합하는 방법을 비롯한 자세한 내용은 기본 이미지 자동 업데이트 구성을 참조하세요.

Kafka 자동 확장 처리 구성 정의

보안 비밀을 사용하여 Kafka 자동 확장 처리를 구성할 수 있습니다. 자동 확장 처리는 구성을 주기적으로 새로고침하므로 자동 확장 처리를 재배포하지 않고도 새 보안 비밀 버전을 푸시하여 구성을 변경할 수 있습니다.

Kafka 클라이언트 속성 구성

Kafka 자동 확장 처리를 배포할 때 보안 비밀을 볼륨으로 마운트하여 Kafka Admin API에 대한 연결을 구성할 수 있습니다.

kafka_client_config.txt라는 파일을 만들고 추가하려는 Kafka 관리 클라이언트 구성 속성을 포함합니다. bootstrap.servers 속성은 필수입니다.

bootstrap.servers=BOOTSTRAP_SERVER_LIST

BOOTSTRAP_SERVER_LIST를 Kafka 클러스터의 HOST:PORT 목록으로 바꿉니다.

Kafka 인증 구성

Kafka 서버에 인증이 필요한 경우 kafka_client_config.txt 파일에 필요한 구성 속성을 포함합니다. 예를 들어 Google OAuth를 사용하여 애플리케이션 기본 사용자 인증 정보Apache Kafka용 관리형 서비스 클러스터에 연결하려면 이 보안 비밀에 다음 속성이 포함되어야 합니다.

bootstrap.servers=BOOTSTRAP_SERVER_LIST
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

BOOTSTRAP_SERVER_LIST를 Kafka 클러스터의 HOST:PORT 목록으로 바꿉니다.

Apache Kafka용 관리형 서비스 클러스터에서 애플리케이션 기본 사용자 인증 정보를 사용하려면 Kafka 자동 확장 처리 서비스 계정에 관리형 Kafka 클라이언트(roles/managedkafka.client) 역할도 부여해야 합니다.

gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/managedkafka.client"

다음을 바꿉니다.

  • SCALER_SERVICE_ACCOUNT: Kafka 자동 확장 처리 서비스 계정의 이름입니다.
  • PROJECT_ID: Kafka 자동 확장 처리 서비스의 프로젝트 ID입니다.

배포 시 볼륨으로 마운트될 보안 비밀을 만들려면 kafka_client_config.txt 파일을 사용합니다.

gcloud secrets create ADMIN_CLIENT_SECRET_NAME --data-file=kafka_client_config.txt

ADMIN_CLIENT_SECRET_NAMEKafka 인증 보안 비밀의 이름으로 바꿉니다.

확장 구성

Kafka 자동 확장 처리는 /scaler-config/scaling 볼륨에서 확장 구성을 읽습니다. 이 볼륨의 콘텐츠는 YAML 형식이어야 합니다. 이 구성을 위해 보안 비밀 볼륨을 마운트하는 것이 좋습니다.

다음 구성으로 scaling_config.yaml이라는 파일을 만듭니다.

spec:
  scaleTargetRef:
    name: projects/PROJECT_ID/locations/REGION/workerpools/CONSUMER_SERVICE_NAME
 metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: TARGET_CPU_UTILIZATION
        activationThreshold: CPU_ACTIVATION_THRESHOLD
        tolerance: CPU_TOLERANCE
        windowSeconds: CPU_METRIC_WINDOW
  - type: External
    external:
      metric:
        name: consumer_lag
      target:
        type: AverageValue
        averageValue: LAG_THRESHOLD
        activationThreshold: LAG_ACTIVATION_THRESHOLD
        tolerance: LAG_TOLERANCE

다음을 바꿉니다.

  • PROJECT_ID: 자동 확장할 Kafka 소비자 워크로드의 프로젝트 ID입니다.
  • REGION: 자동 확장할 Kafka 소비자 워크로드의 리전입니다.
  • CONSUMER_SERVICE_NAME: 자동 확장할 Kafka 소비자 워크로드의 이름입니다.
  • TARGET_CPU_UTILIZATION: 자동 확장 계산의 대상 CPU 사용률입니다(예: 60).
  • LAG_THRESHOLD: 자동 확장을 트리거하는 consumer_lag 측정항목의 기준입니다(예: 1000).
  • (선택사항) CPU_ACTIVATION_THRESHOLD: CPU의 활성화 기준점입니다. 모든 측정항목이 비활성 상태이면 대상 소비자가 0으로 축소됩니다. 기본값은 0입니다.
  • (선택사항) CPU_TOLERANCE: 지정된 범위 내에 있는 경우 확장 변경을 방지하는 기준점입니다. 대상 CPU 사용률의 백분율로 표시됩니다. 기본값은 0.1입니다.
  • (선택사항) CPU_METRIC_WINDOW: 평균 CPU 사용량이 계산되는 기간(초)입니다. 기본값은 120입니다.
  • (선택사항) LAG_ACTIVATION_THRESHOLD: consumer_lag 측정항목의 활성화 기준점입니다. 모든 측정항목이 비활성 상태이면 대상 소비자가 0으로 축소됩니다. 기본값은 0입니다.
  • (선택사항) LAG_TOLERANCE: 지정된 범위 내에 있는 경우 확장 변경을 방지하는 기준점입니다. 대상 소비자 지연의 백분율로 표현됩니다. 기본값은 0.1입니다.

원하는 경우 behavior: 블록을 사용하여 고급 확장 속성을 구성할 수 있습니다. 이 블록은 Kubernetes HPA 확장 정책과 동일한 여러 속성을 지원합니다.

behavior 블록을 지정하지 않으면 다음 기본 구성이 사용됩니다.

behavior:
  scaleDown:
    stabilizationWindowSeconds: 300
    policies:
    - type: Percent
      value: 50
      periodSeconds: 30
    selectPolicy: Min
  scaleUp:
    stabilizationWindowSeconds: 0
    policies:
    - type: Percent
      value: 100
      periodSeconds: 15
    - type: Instances
      value: 4
      periodSeconds: 15
    selectPolicy: Max

배포에 마운트될 보안 비밀 볼륨을 만들려면 구성을 scaling_config.yaml이라는 파일에 복사한 후 다음을 실행합니다.

gcloud secrets create SCALING_CONFIG_SECRET_NAME --data-file=scaling_config.yaml

SCALING_CONFIG_SECRET_NAME확장 보안 비밀의 이름으로 바꿉니다.

Kafka 자동 확장 처리 배포

기본 요건을 완료한 후 Kafka 자동 확장 처리 서비스와 지원 인프라를 배포할 수 있습니다. 이 프로세스를 간소화하기 위해 Terraform 모듈과 셸 스크립트가 제공됩니다.

gcloud

이 섹션에서는 자동 확장 처리를 수동으로 배포하는 데 필요한 각 gcloud 명령어를 설명합니다. 대부분의 경우 셸 스크립트 또는 Terraform 모듈을 대신 사용하는 것이 좋습니다.

서비스 계정 만들기

서비스 계정 요구사항은 사용자가 구성한 자동 확장 검사 간격에 따라 다릅니다. 유연한 간격으로 자동 확장 검사를 실행하도록 Kafka 자동 확장 처리를 구성할 수 있습니다.

  • 1분 이상: Cloud Scheduler가 선택된 간격으로 POST 요청을 사용하여 자동 확장 검사를 트리거합니다.
  • 1분 미만: Cloud Scheduler가 구성된 빈도에 따라 1분마다 여러 Cloud Tasks 생성을 트리거합니다.

1분 이상

Kafka 자동 확장 처리 서비스 계정

Kafka 자동 확장 처리의 서비스 계정을 만듭니다.

gcloud iam service-accounts create SCALER_SERVICE_ACCOUNT

SCALER_SERVICE_ACCOUNT를 Kafka 자동 확장 처리 서비스 계정의 이름으로 바꿉니다.

Kafka 자동 확장 처리가 Kafka 소비자 인스턴스 수를 업데이트하려면 다음 권한이 필요합니다.

  • Kafka 소비자 서비스 계정에 대한 iam.serviceaccounts.actAs
  • Kafka 소비자 이미지가 포함된 저장소에 대한 roles/artifactregistry.reader
  • run.workerpools.getrun.workerpools.update. 이러한 권한은 Cloud Run 관리자 역할(roles/run.admin)에 포함되어 있습니다.
  • 확장 및 Kafka 인증 보안 비밀 모두에 대한 roles/secretmanager.secretAccessor
  • Kafka 소비자 프로젝트에 대한 roles/monitoring.viewer. 이 역할은 CPU 사용률 측정항목을 읽는 데 필요합니다.
  • Kafka 소비자 프로젝트에 대한 roles/monitoring.metricWriter. 이 역할은 선택사항이지만 이를 통해 자동 확장 처리에서 더 나은 모니터링 가능성을 위해 커스텀 측정항목을 내보낼 수 있습니다.
gcloud iam service-accounts add-iam-policy-binding CONSUMER_SERVICE_ACCOUNT_EMAIL \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/iam.serviceAccountUser"

gcloud iam service-accounts add-iam-policy-binding CONSUMER_IMAGE_REPO \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/artifactregistry.reader" \
    --location=REPO_REGION

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/run.admin"

gcloud secrets add-iam-policy-binding ADMIN_CLIENT_SECRET_NAME \
  --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

gcloud secrets add-iam-policy-binding SCALING_CONFIG_SECRET_NAME \
  --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/monitoring.viewer" \
    --condition=None

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/monitoring.metricWriter" \
    --condition=None

다음을 바꿉니다.

  • PROJECT_ID: Kafka 자동 확장 처리 서비스가 있는 프로젝트 ID입니다.
  • CONSUMER_SERVICE_ACCOUNT_EMAIL: Kafka 소비자의 서비스 계정 이메일입니다. 예: example@PROJECT-ID.iam.gserviceaccount.com
  • SCALER_SERVICE_ACCOUNT: Kafka 자동 확장 처리의 서비스 계정입니다.
  • ADMIN_CLIENT_SECRET_NAME: Kafka 인증 보안 비밀의 이름입니다.
  • SCALING_CONFIG_SECRET_NAME: 확장 보안 비밀의 이름입니다.
  • CONSUMER_IMAGE_REPO: Kafka 소비자용 컨테이너 이미지가 있는 저장소의 ID 또는 정규화된 식별자입니다.
  • REPO_REGION: 소비자 이미지 저장소의 위치입니다.

1분 미만

Cloud Tasks 설정

Cloud Scheduler는 1분 이상의 간격으로만 트리거할 수 있습니다. 간격이 1분 미만인 경우 Cloud Tasks를 사용하여 Kafka 자동 확장 처리를 트리거합니다. Cloud Tasks를 설정하려면 다음이 필요합니다.

  • 자동 확장 검사 태스크를 위한 Cloud Tasks 큐를 만듭니다.
  • Cloud Tasks가 Cloud Run 호출자 역할로 Kafka 자동 확장 처리를 호출하는 데 사용하는 서비스 계정을 만듭니다.
gcloud tasks queues create CLOUD_TASKS_QUEUE_NAME \
--location=REGION
gcloud iam service-accounts create TASKS_SERVICE_ACCOUNT
gcloud run services add-iam-policy-binding SCALER_SERVICE_NAME \
    --member="serviceAccount:TASKS_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/run.invoker"

다음을 바꿉니다.

  • CLOUD_TASKS_QUEUE_NAME: 자동 확장 검사를 트리거하기 위해 구성된 Cloud Tasks 큐입니다.
  • TASKS_SERVICE_ACCOUNT: Cloud Tasks가 자동 확장 검사를 트리거하는 데 사용해야 하는 서비스 계정입니다.
  • SCALER_SERVICE_NAME: Kafka 자동 확장 처리 서비스의 이름입니다.
  • PROJECT_ID: Kafka 자동 확장 처리 서비스의 프로젝트 ID입니다.
  • REGION: Kafka 자동 확장 처리 서비스의 위치입니다.

Kafka 자동 확장 처리 서비스 계정 설정

Kafka 자동 확장 처리의 서비스 계정을 만듭니다.

gcloud iam service-accounts create SCALER_SERVICE_ACCOUNT

SCALER_SERVICE_ACCOUNT를 Kafka 자동 확장 처리 서비스 계정의 이름으로 바꿉니다.

Kafka 소비자 인스턴스 수를 업데이트하고 자동 확장 검사 태스크를 만들려면 Kafka 자동 확장 처리에 다음 권한이 필요합니다.

  • Kafka 소비자 서비스 계정에 대한 iam.serviceaccounts.actAs
  • Kafka 소비자 이미지가 포함된 저장소에 대한 roles/artifactregistry.reader
  • run.workerpools.getrun.workerpools.update. 이러한 권한은 Cloud Run 관리자 역할(roles/run.admin)에 포함되어 있습니다.
  • 확장 및 Kafka 인증을 위한 보안 비밀 모두에 대한 roles/secretmanager.secretAccessor
  • Kafka 소비자 프로젝트에 대한 roles/monitoring.viewer. 이 역할은 CPU 사용률 측정항목을 읽는 데 필요합니다.
  • Kafka 소비자 프로젝트에 대한 roles/monitoring.metricWriter. 이 역할은 선택사항이지만 이를 통해 자동 확장 처리에서 더 나은 모니터링 가능성을 위해 커스텀 측정항목을 내보낼 수 있습니다.
  • Cloud Tasks 큐 추가자 역할(roles/cloudtasks.enqueuer)
gcloud iam service-accounts add-iam-policy-binding CONSUMER_SERVICE_ACCOUNT_EMAIL \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/iam.serviceAccountUser"

gcloud iam service-accounts add-iam-policy-binding CONSUMER_IMAGE_REPO \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/artifactregistry.reader" \
    --location=REPO_REGION

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/run.admin"

gcloud secrets add-iam-policy-binding ADMIN_CLIENT_SECRET_NAME \
  --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

gcloud secrets add-iam-policy-binding SCALING_CONFIG_SECRET_NAME \
  --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/monitoring.viewer" \
    --condition=None

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/monitoring.metricWriter" \
    --condition=None

gcloud tasks queues add-iam-policy-binding CLOUD_TASKS_QUEUE_NAME \
    --member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/cloudtasks.enqueuer" \
    --location=REGION

다음을 바꿉니다.

  • PROJECT_ID: Kafka 자동 확장 처리 서비스가 있는 프로젝트 ID입니다.
  • CONSUMER_SERVICE_ACCOUNT_EMAIL: Kafka 소비자의 서비스 계정 이메일입니다. 예: example@PROJECT_ID.iam.gserviceaccount.com
  • SCALER_SERVICE_ACCOUNT: Kafka 자동 확장 처리의 서비스 계정입니다.
  • CONSUMER_IMAGE_REPO: Kafka 소비자용 컨테이너 이미지가 있는 저장소의 ID 또는 정규화된 식별자입니다.
  • ADMIN_CLIENT_SECRET_NAME: Kafka 인증 보안 비밀의 이름입니다.
  • SCALING_CONFIG_SECRET_NAME: 확장 보안 비밀의 이름입니다.
  • REPO_REGION: 소비자 이미지 저장소의 위치입니다.
  • CLOUD_TASKS_QUEUE_NAME: 자동 확장 검사를 트리거하기 위해 구성된 Cloud Tasks 큐입니다.
  • REGION: Kafka 자동 확장 처리 서비스의 위치입니다.

환경 변수 구성

1분 이상

Kafka 자동 확장 처리는 환경 변수를 사용하여 Kafka 소비자 및 대상 워크로드의 기타 측면을 지정합니다. 보안을 위해 민감한 정보를 보안 비밀로 구성하는 것이 좋습니다.

다음 변수를 사용하여 scaler_env_vars.yaml이라는 YAML 파일을 만듭니다.

KAFKA_TOPIC_ID: KAFKA_TOPIC_ID
CONSUMER_GROUP_ID: CONSUMER_GROUP_ID
CYCLE_SECONDS: CYCLE_SECONDS
OUTPUT_SCALER_METRICS: OUTPUT_SCALER_METRICS

다음을 바꿉니다.

  • KAFKA_TOPIC_ID: Kafka 소비자가 구독하는 주제 ID입니다.
  • CONSUMER_GROUP_ID: 대상 Kafka 소비자가 사용하는 소비자 그룹 ID입니다. 이러한 값은 일치해야 하며, 그렇지 않으면 자동 확장이 실패합니다.
  • CYCLE_SECONDS: 자동 확장 처리 주기(초)입니다.
  • OUTPUT_SCALER_METRICS: 측정항목을 사용 설정하는 설정입니다. 커스텀 측정항목 출력을 사용 설정하려면 값을 true로 설정하고, 그렇지 않으면 false로 설정합니다.

1분 미만

Kafka 자동 확장 처리는 환경 변수를 사용하여 Kafka 소비자 및 대상 워크로드의 기타 측면을 지정합니다. 보안을 위해 민감한 정보를 보안 비밀로 구성하는 것이 좋습니다.

다음 변수를 사용하여 scaler_env_vars.yaml이라는 YAML 파일을 만듭니다.

KAFKA_TOPIC_ID: KAFKA_TOPIC_ID
CONSUMER_GROUP_ID: CONSUMER_GROUP_ID
CYCLE_SECONDS: CYCLE_SECONDS
OUTPUT_SCALER_METRICS: OUTPUT_SCALER_METRICS
FULLY_QUALIFIED_CLOUD_TASKS_QUEUE_NAME: CLOUD_TASKS_QUEUE_NAME
INVOKER_SERVICE_ACCOUNT_EMAIL: TASKS_SERVICE_ACCOUNT_EMAIL

다음을 바꿉니다.

  • KAFKA_TOPIC_ID: Kafka 소비자가 구독하는 주제 ID입니다.
  • CONSUMER_GROUP_ID: 대상 Kafka 소비자가 사용하는 소비자 그룹 ID입니다. 이러한 값은 일치해야 하며, 그렇지 않으면 자동 확장이 실패합니다.
  • CYCLE_SECONDS: 자동 확장 처리 주기(초)입니다.
  • OUTPUT_SCALER_METRICS: 측정항목을 사용 설정하는 설정입니다. 커스텀 측정항목 출력을 사용 설정하려면 값을 true로 설정하고, 그렇지 않으면 false로 설정합니다.
  • CLOUD_TASKS_QUEUE_NAME: 자동 확장 검사를 트리거하는 Cloud Tasks 큐의 정규화된 이름입니다. projects/$PROJECT_ID/locations/$REGION/queues/$CLOUD_TASKS_QUEUE_NAME 형식을 사용합니다.
  • TASKS_SERVICE_ACCOUNT_EMAIL: Cloud Tasks가 자동 확장 검사를 트리거하는 데 사용해야 하는 서비스 계정입니다. 예: example@PROJECT_ID.iam.gserviceaccount.com

제공된 이미지를 사용하여 Kafka 자동 확장 처리를 배포하고 scaler_env_vars.yaml 파일 및 보안 비밀 볼륨 마운트를 사용하여 Kafka VPC에 연결합니다.

gcloud run deploy SCALER_SERVICE_NAME \
    --image=SCALER_IMAGE_URI \
    --env-vars-file=scaler_env_vars.yaml \
    --service-account=SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com \
    --no-allow-unauthenticated \
    --network=KAFKA_VPC_NETWORK \
    --subnet=KAFKA_VPC_SUBNET \
    --update-secrets=/kafka-config/kafka-client-properties=ADMIN_CLIENT_SECRET_NAME:latest \
    --update-secrets=/scaler-config/scaling=SCALING_CONFIG_SECRET_NAME:latest
    --labels=created-by=kafka-autoscaler

다음을 바꿉니다.

  • SCALER_IMAGE_URI: Kafka 자동 확장 처리 이미지의 URI입니다.
  • SCALER_SERVICE_NAME: Kafka 자동 확장 처리 서비스의 이름입니다.
  • SCALER_SERVICE_ACCOUNT: Kafka 자동 확장 처리 서비스 계정의 이름입니다.
  • PROJECT_ID: Kafka 자동 확장 처리 서비스의 프로젝트 ID입니다.
  • KAFKA_VPC_NETWORK: Kafka 클러스터에 연결된 VPC 네트워크입니다.
  • KAFKA_VPC_SUBNET: Kafka 클러스터에 연결된 VPC 서브넷입니다.
  • ADMIN_CLIENT_SECRET_NAME: Kafka 인증 보안 비밀의 이름입니다.
  • SCALING_CONFIG_SECRET_NAME: 확장 보안 비밀의 이름입니다.

주기적인 자동 확장 검사 설정

이 섹션에서는 Cloud Scheduler를 사용하여 주기적인 자동 확장 검사를 트리거합니다.

  • 1분 이상: 선택한 간격으로 트리거되도록 Cloud Scheduler를 구성합니다.
  • 1분 미만: 매분 트리거되도록 Cloud Scheduler를 구성합니다.
호출자 서비스 계정 만들기

Cloud Scheduler가 Kafka 자동 확장 처리를 호출할 수 있도록 하려면 Kafka 자동 확장 처리 서비스에 대한 호출자 역할(roles/run.invoker)이 있는 서비스 계정을 만들어야 합니다.

gcloud iam service-accounts create SCALER_INVOKER_SERVICE_ACCOUNT
gcloud run services add-iam-policy-binding SCALER_SERVICE_NAME \
  --member="serviceAccount:SCALER_INVOKER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/run.invoker"

다음을 바꿉니다.

  • SCALER_SERVICE_NAME: Kafka 자동 확장 처리 서비스의 이름입니다.
  • SCALER_INVOKER_SERVICE_ACCOUNT: 호출자 서비스 계정의 이름입니다.
  • PROJECT_ID: Kafka 자동 확장 처리 서비스의 프로젝트 ID입니다.
Cloud Scheduler 작업 만들기

1분 이상

선택한 자동 확장 검사 간격으로 Cloud Scheduler 작업을 만듭니다.

gcloud scheduler jobs create http kafka-scaling-check \
    --location=REGION \
    --schedule="CRON_SCHEDULE" \
    --time-zone="TIMEZONE" \
    --uri=https://SCALER_SERVICE_NAME-PROJECT_NUMBER.REGION.run.app \
    --oidc-service-account-email=SCALER_INVOKER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com \
    --http-method=POST

다음을 바꿉니다.

  • SCALER_SERVICE_NAME: Kafka 자동 확장 처리 서비스의 이름입니다.
  • SCALER_INVOKER_SERVICE_ACCOUNT: 호출자 서비스 계정의 이름입니다.
  • PROJECT_ID: 프로젝트 ID 또는 Kafka 자동 확장 처리 서비스입니다.
  • PROJECT_NUMBER: Kafka 자동 확장 처리 서비스의 프로젝트 번호입니다.
  • REGION: Kafka 자동 확장 처리 서비스의 위치입니다.
  • TIMEZONE: 시간대입니다(예: America/Los_Angeles).
  • CRON_SCHEDULE: Crontab 형식의 선택된 일정입니다. 예를 들어 매분의 경우 "* * * * *"입니다.

1분 미만

매분 실행되는 Cloud Scheduler 작업을 만듭니다.

gcloud scheduler jobs create http kafka-scaling-check \
    --location=REGION \
    --schedule="* * * * *" \
    --time-zone="TIMEZONE" \
    --uri=https://SCALER_SERVICE_NAME-PROJECT_NUMBER.REGION.run.app \
    --oidc-service-account-email=SCALER_INVOKER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com \
    --http-method=POST

다음을 바꿉니다.

  • SCALER_SERVICE_NAME: Kafka 자동 확장 처리 서비스의 이름입니다.
  • SCALER_INVOKER_SERVICE_ACCOUNT: 호출자 서비스 계정의 이름입니다.
  • PROJECT_ID: Kafka 자동 확장 처리 서비스의 프로젝트 ID입니다.
  • PROJECT_NUMBER: Kafka 자동 확장 처리 서비스의 프로젝트 번호입니다.
  • REGION: Kafka 자동 확장 처리 서비스의 위치입니다.
  • TIMEZONE: 시간대입니다(예: America/Los_Angeles).

terraform

terraform/ 디렉터리에는 Kafka 자동 확장 처리 및 관련 리소스를 프로비저닝하는 데 사용할 수 있는 재사용 가능한 Terraform 모듈이 포함되어 있습니다.

이 모듈은 다음 항목의 생성을 자동화합니다.

  • Kafka 자동 확장 처리 Cloud Run 서비스
  • 지원 서비스 계정 및 IAM 바인딩
  • Cloud Tasks 큐
  • Cloud Scheduler 작업

자세한 안내, 사용 예시, 모든 입력/출력 변수에 대한 설명은 terraform readme를 참조하세요.

프로젝트 ID, 리전, 소비자 SA 이메일, 보안 비밀 이름, 확장 처리 이미지 경로, 주제 ID와 같은 기본 요건의 세부정보를 포함하여 필요한 변수를 Terraform 모듈에 제공해야 합니다.

자동 확장 처리와 함께 setup_kafka_scaler.sh 스크립트가 제공되어 필요한 모든 리소스를 자동으로 만들고 구성합니다.

환경 변수 설정하기

스크립트를 실행하기 전에 필요한 모든 환경 변수를 설정했는지 확인하세요.

# Details for already-deployed Kafka consumer
export PROJECT_ID=PROJECT_ID
export REGION=REGION
export CONSUMER_SERVICE_NAME=DEPLOYED_KAFKA_CONSUMER
export CONSUMER_SA_EMAIL=KAFKA_CONSUMER_ACCOUNT_EMAIL # For example, NAME@PROJECT_ID.iam.gserviceaccount.com
export TOPIC_ID=KAFKA_TOPIC_ID
export CONSUMER_GROUP_ID=KAFKA_CONSUMER_GROUP_ID
export NETWORK=VPC_NETWORK
export SUBNET=VPC_SUBNET

# Details for new items to be created during this setup
export CLOUD_TASKS_QUEUE_NAME=CLOUD_TASKS_QUEUE_FOR_SCALING_CHECKS
export TASKS_SERVICE_ACCOUNT=TASKS_SERVICE_ACCOUNT_NAME

export SCALER_SERVICE_NAME=KAFKA_AUTOSCALER_SERVICE_NAME
export SCALER_IMAGE_PATH=KAFKA_AUTOSCALER_IMAGE_URI
export SCALER_CONFIG_SECRET=KAFKA_AUTOSCALER_CONFIG_SECRET_NAME

export CYCLE_SECONDS=SCALER_CHECK_FREQUENCY # For example, 15; this value should be at least 5 seconds.

export OUTPUT_SCALER_METRICS=false # If you want scaling metrics to outputted to Cloud Monitoring set this to true and ensure your scaler service account has permission to write metrics (for example, via roles/monitoring.metricWriter).

다음을 바꿉니다.

  • PROJECT_ID: Kafka 자동 확장 처리 서비스가 있는 프로젝트 ID입니다.
  • REGION: Kafka 자동 확장 처리 서비스의 위치입니다.
  • DEPLOYED_KAFKA_CONSUMER: Kafka 소비자 이름입니다.
  • KAFKA_CONSUMER_ACCOUNT_EMAIL: Kafka 소비자의 서비스 계정 이메일입니다.
  • KAFKA_TOPIC_ID: Kafka 소비자가 구독하는 주제 ID입니다.
  • KAFKA_CONSUMER_GROUP_ID: 대상 Kafka 소비자가 사용하는 소비자 그룹 ID입니다. 이러한 값은 일치해야 하며, 그렇지 않으면 자동 확장이 실패합니다.
  • VPC_NETWORK: Kafka 클러스터에 연결된 VPC 네트워크입니다.
  • VPC_SUBNET: Kafka 클러스터에 연결된 VPC 서브넷입니다.
  • CLOUD_TASKS_QUEUE_FOR_SCALING_CHECKS: 자동 확장 검사를 트리거하기 위해 구성된 Cloud Tasks 큐입니다.
  • TASKS_SERVICE_ACCOUNT_NAME: Cloud Tasks가 자동 확장 검사를 트리거하는 데 사용해야 하는 서비스 계정입니다.
  • KAFKA_AUTOSCALER_SERVICE_NAME: Kafka 자동 확장 처리 서비스의 이름입니다.
  • KAFKA_AUTOSCALER_IMAGE_URI: Kafka 자동 확장 처리 이미지의 URI입니다.
  • KAFKA_AUTOSCALER_CONFIG_SECRET_NAME: 확장 보안 비밀의 이름입니다.
  • SCALER_CHECK_FREQUENCY: 자동 확장 처리 주기(초)입니다.

설정 스크립트 실행

제공된 setup_kafka_scaler.sh 스크립트를 실행합니다.

./setup_kafka_scaler.sh

스크립트는 다음 작업을 실행합니다.

  • 자동 확장 검사를 트리거하는 데 사용되는 Cloud Tasks 큐를 만듭니다.
  • Kafka 자동 확장 처리 서비스 계정을 만들고 필요한 권한을 부여합니다.
  • Kafka 자동 확장 처리를 구성하고 배포합니다.
  • 자동 확장 검사를 주기적으로 트리거하는 Cloud Scheduler 작업을 만듭니다.

setup_kafka_scaler.sh 스크립트가 실행되면 구성된 환경 변수가 출력됩니다. 계속하기 전에 환경 변수가 올바른지 확인하세요.

추가 권한 부여

Kafka 소비자의 인스턴스 수를 변경하려면 Kafka 자동 확장 처리 서비스 계정에 배포된 컨테이너 이미지에 대한 보기 권한이 있어야 합니다. 예를 들어 Artifact Registry에서 소비자 이미지를 배포한 경우 다음 명령어를 실행합니다.

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:$SCALER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/artifactregistry.reader" # Or appropriate role for your registry

Kafka 자동 확장 작동 여부 확인

Kafka 자동 확장 처리 서비스의 확장은 서비스 URL에 대한 요청(SCALER_SERVICE_NAME-PROJECT_NUMBER.REGION.run.app)으로 트리거됩니다.

Kafka 자동 확장 처리 서비스에 POST 요청을 보내 자동 확장 계산을 트리거할 수 있습니다.

curl -X POST -H "Authorization: Bearer $(gcloud auth print-identity-token)" https://SCALER_SERVICE_NAME-PROJECT_NUMBER.REGION.run.app

다음을 바꿉니다.

  • SCALER_SERVICE_NAME: Kafka 자동 확장 처리 서비스의 이름입니다.
  • PROJECT_NUMBER: Kafka 자동 확장 처리 서비스의 프로젝트 번호입니다.
  • REGION: Kafka 자동 확장 처리 서비스의 위치입니다.

POST 요청은 자동 확장 계산을 트리거하고 로깅에 출력하는 것은 물론 추천에 따라 인스턴스 수를 변경합니다.

Kafka 자동 확장 처리 서비스의 로그에는 [SCALING] Recommended instances X와 같은 메시지가 포함되어야 합니다.

OUTPUT_SCALER_METRICS 플래그가 사용 설정된 경우 custom.googleapis.com/cloud-run-kafkascaler 아래에서 확장 처리 Cloud Monitoring 측정항목을 찾을 수도 있습니다.

고급 확장 구성

spec:
  metrics:
  behavior:
    scaleDown:
      stabilizationWindowSeconds: [INT]
      policies:
      - type: [Percent, Instances]
        value: [INT]
        periodSeconds: [INT]
      selectPolicy: [Min, Max]
    scaleUp:
      stabilizationWindowSeconds: [INT]
      policies:
      - type: [Percent, Instances]
        value: [INT]
        periodSeconds: [INT]
      selectPolicy: [Min, Max]

다음 목록에서는 이전 요소 중 일부를 설명합니다.

  • scaleDown: 인스턴스 수를 줄일 때(축소)의 동작입니다.
  • scaleUp: 인스턴스 수를 늘릴 때(수직 확장)의 동작입니다.
  • stabilizationWindowSeconds: 가장 높은 값(scaleDown) 또는 가장 낮은 값(scaleUp)입니다. 연속 기간 동안 인스턴스 수를 계산합니다. 값을 0으로 설정하면 가장 최근에 계산된 값이 사용됩니다.
  • selectPolicy: 여러 정책이 구성된 경우 시행할 결과입니다.
  • Min: 가장 작은 변경사항입니다.
  • Max: 가장 큰 변경사항입니다.
  • Percent: 기간별 변경사항이 구성된 총 인스턴스의 비율로 제한됩니다.
  • Instances: 기간별 변경사항이 구성된 인스턴스 수로 제한됩니다.
  • periodSeconds: 정책이 시행되는 기간입니다.

예를 들어 기본 구성을 사용하는 전체 사양은 다음과 같습니다.

spec:
  scaleTargetRef:
    name: projects/PROJECT-ID/locations/us-central1/workerpools/kafka-consumer-worker
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 60
          activationThreshold: 0
          tolerance: 0.1
          windowSeconds: 120
    - type: External
      external:
        metric:
          name: consumer_lag
        target:
          type: AverageValue
          averageValue: 1000
          activationThreshold: 0
          tolerance: 0.1
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 50
          periodSeconds: 30
      selectPolicy: Min
    scaleUp:
      stabilizationWindowSeconds: 0
      policies:
        - type: Percent
          value: 100
          periodSeconds: 15
        - type: Instances
          value: 4
          periodSeconds: 15
      selectPolicy: Max