このチュートリアルでは、Kafka オートスケーラーを Cloud Run サービスとして構成し、デプロイする方法について説明します。このオートスケーラーは、Cloud Run ワーカープールのデプロイなど、Kafka コンシューマー ワークロードのスケーリング ロジックを実行します。Kafka オートスケーラーは Kafka クラスタから指標を読み取り、Cloud Run ワーカープールまたはサービスの手動スケーリングを使用して、Kafka コンシューマーのラグ指標に基づいて Kafka コンシューマー ワークロードをスケーリングします。
次の図は、Kafka オートスケーラー サービスが Kafka クラスタから指標を読み取り、Kafka コンシューマー ワーカープールを自動スケーリングする方法を示しています。
必要なロール
このサービスのデプロイと実行に必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。
- Cloud Run デベロッパー(
roles/run.developer
) - サービス アカウント ユーザー(
roles/iam.serviceAccountUser
) - Artifact Registry 読み取り(
roles/artifactregistry.reader
) - Cloud Scheduler 管理者(
roles/cloudscheduler.admin
): 自動スケーリング チェックをトリガーする Cloud Scheduler ジョブの作成に必要です - Cloud Tasks キュー管理者(
roles/cloudtasks.queueAdmin
): 自動スケーリング チェック用の Cloud Tasks キューの作成に必要です - セキュリティ管理者(
roles/iam.securityAdmin
): サービス アカウントに権限を付与するために必要です
始める前に
Kafka オートスケーラーを構成して使用するには、次のリソースが必要です。
- Kafka クラスタ
- デプロイされたコンシューマー
Kafka クラスタ
- Kafka クラスタは、Compute Engine、Google Kubernetes Engine、または Managed Service for Apache Kafka のいずれかで実行されている必要があります。
- 構成済みの Kafka トピックと、そのトピックにパブリッシュされるイベント。
デプロイされた Cloud Run コンシューマー
- Kafka コンシューマー ワークロードは、サービスまたはワーカープールとして Cloud Run にデプロイする必要があります。また、Kafka クラスタ、トピック、コンシューマー グループに接続するように構成する必要があります。Kafka コンシューマーの例については、Cloud Run Kafka オートスケーラーのサンプル コンシューマーをご覧ください。
- コンシューマー ワークロードは、Kafka クラスタと同じ Google Cloud プロジェクトに存在している必要があります。
ベスト プラクティス
- ダイレクト VPC を使用して、Kafka コンシューマーを VPC ネットワークに接続します。ダイレクト VPC を使用すると、プライベート IP アドレスを使用して Kafka クラスタに接続し、VPC ネットワークでトラフィックを維持できます。
- コンシューマーがイベントを pull しているかどうかを確認する Kafka コンシューマーの liveness ヘルスチェックを構成します。このヘルスチェックにより、コンテナがクラッシュしていなくても、正常でないインスタンスがイベントの処理を停止した場合は、そのインスタンスが自動的に再起動されるようになります。
Kafka オートスケーラーをビルドする
Cloud Build を使用して、Kafka オートスケーラーのコンテナ イメージをソースコードからビルドできます。
リポジトリのクローンを作成します。
git clone https://github.com/GoogleCloudPlatform/cloud-run-kafka-scaler.git
リポジトリ フォルダに移動します。
cd cloud-run-kafka-scaler
出力イメージ名を指定するには、インクルードされている cloudbuild.yaml
ファイルの %ARTIFACT_REGISTRY_IMAGE%
を更新します(例: us-central1-docker.pkg.dev/my-project/my-repo/my_kafka_autoscaler
)。
gcloud builds submit --tag us-central1-docker.pkg.dev/my-project/my-repo/my_kafka_autoscaler
このコマンドは、コンテナ イメージをビルドして Artifact Registry に push します。後で必要になるため、イメージのフルパス(SCALER_IMAGE_PATH
)を記録しておきます。
生成されたイメージはローカルでは実行できない点に注意してください。Java ベースイメージの上にレイヤ化することを目的としています。コンテナ イメージをローカルで実行できるように再構成する方法など、詳細については、ベースイメージの自動更新を構成するをご覧ください。
Kafka オートスケーラーの構成を定義する
Kafka オートスケーラーは、シークレットを使用して構成できます。オートスケーラーは構成を定期的に更新します。つまり、オートスケーラーを再デプロイしなくても、新しいシークレット バージョンを push して構成を変更できます。
Kafka クライアントのプロパティを構成する
Kafka オートスケーラーをデプロイするときに、シークレットをボリュームとしてマウントすることで、Kafka Admin API への接続を構成できます。
kafka_client_config.txt
という名前のファイルを作成し、追加する Kafka 管理クライアント構成のプロパティを記述します。bootstrap.servers
プロパティは必須です。
bootstrap.servers=BOOTSTRAP_SERVER_LIST
BOOTSTRAP_SERVER_LIST は、Kafka クラスタの HOST:PORT
リストに置き換えます。
Kafka 認証を構成する
Kafka サーバーで認証が必要な場合は、必要な構成プロパティを kafka_client_config.txt
ファイルに記述します。たとえば、Google OAuth でアプリケーションのデフォルト認証情報を使用して Managed Service for Apache Kafka クラスタに接続する場合、このシークレットには次のプロパティが含まれている必要があります。
bootstrap.servers=BOOTSTRAP_SERVER_LIST
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
BOOTSTRAP_SERVER_LIST は、Kafka クラスタの HOST:PORT
リストに置き換えます。
Managed Service for Apache Kafka クラスタでアプリケーションのデフォルト認証情報を使用するには、Kafka オートスケーラー サービス アカウントに Managed Kafka クライアント(roles/managedkafka.client
)ロールを付与する必要があります。
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/managedkafka.client"
次のように置き換えます。
- SCALER_SERVICE_ACCOUNT: Kafka オートスケーラー サービス アカウントの名前。
- PROJECT_ID: Kafka オートスケーラー サービスのプロジェクト ID。
デプロイ時にボリュームとしてマウントされるシークレットを作成するには、kafka_client_config.txt
ファイルを使用します。
gcloud secrets create ADMIN_CLIENT_SECRET_NAME --data-file=kafka_client_config.txt
ADMIN_CLIENT_SECRET_NAME は、Kafka 認証シークレットの名前に置き換えます。
スケーリングを構成する
Kafka オートスケーラーは、/scaler-config/scaling
ボリュームからスケーリング構成を読み取ります。このボリュームの内容は YAML 形式にする必要があります。この構成では、シークレット ボリュームをマウントすることをおすすめします。
次の構成を持つ scaling_config.yaml
という名前のファイルを作成します。
spec: scaleTargetRef: name: projects/PROJECT_ID/locations/REGION/workerpools/CONSUMER_SERVICE_NAME metrics: - type: Resource resource: name: cpu target: type: Utilization averageUtilization: TARGET_CPU_UTILIZATION activationThreshold: CPU_ACTIVATION_THRESHOLD tolerance: CPU_TOLERANCE windowSeconds: CPU_METRIC_WINDOW - type: External external: metric: name: consumer_lag target: type: AverageValue averageValue: LAG_THRESHOLD activationThreshold: LAG_ACTIVATION_THRESHOLD tolerance: LAG_TOLERANCE
次のように置き換えます。
- PROJECT_ID: 自動スケーリングする Kafka コンシューマー ワークロードのプロジェクト ID。
- REGION: 自動スケーリングする Kafka コンシューマー ワークロードのリージョン。
- CONSUMER_SERVICE_NAME: 自動スケーリングする Kafka コンシューマー ワークロードの名前。
- TARGET_CPU_UTILIZATION: 自動スケーリングの計算に使用する CPU 使用率の目標値(例:
60
)。 - LAG_THRESHOLD: 自動スケーリングをトリガーする
consumer_lag
指標のしきい値(例:1000
)。 - (省略可)CPU_ACTIVATION_THRESHOLD: CPU のアクティベーションのしきい値。すべての指標が非アクティブの場合、ターゲット コンシューマーはゼロにスケーリングされます。デフォルトは
0
です。 - (省略可)CPU_TOLERANCE: 指定された範囲内の場合は、スケーリングの変更を防ぐしきい値。ターゲット CPU 使用率に対する割合(%)で表します。デフォルトは
0.1
です。 - (省略可)CPU_METRIC_WINDOW: 平均 CPU 使用率を計算する期間(秒単位)。デフォルトは
120
です。 - (省略可)LAG_ACTIVATION_THRESHOLD:
consumer_lag
指標のアクティベーションのしきい値。すべての指標が非アクティブの場合、ターゲット コンシューマーはゼロにスケーリングされます。デフォルトは0
です。 - (省略可)LAG_TOLERANCE: 指定された範囲内の場合は、スケーリングの変更を防ぐしきい値。ターゲットのコンシューマー ラグに対する割合(%)で表します。デフォルトは
0.1
です。
必要に応じて、behavior:
ブロックを使用して高度なスケーリング プロパティを構成することもできます。このブロックは、Kubernetes HPA スケーリング ポリシーと同じプロパティを数多くサポートしています。
behavior
ブロックを指定しない場合は、次のデフォルト構成が使用されます。
behavior: scaleDown: stabilizationWindowSeconds: 300 policies: - type: Percent value: 50 periodSeconds: 30 selectPolicy: Min scaleUp: stabilizationWindowSeconds: 0 policies: - type: Percent value: 100 periodSeconds: 15 - type: Instances value: 4 periodSeconds: 15 selectPolicy: Max
デプロイ時にマウントされるシークレット ボリュームを作成するには、構成を scaling_config.yaml
という名前のファイルにコピーし、次のコマンドを実行します。
gcloud secrets create SCALING_CONFIG_SECRET_NAME --data-file=scaling_config.yaml
SCALING_CONFIG_SECRET_NAME は、スケーリング シークレットの名前に置き換えます。
Kafka オートスケーラーをデプロイする
前提条件を満たしたら、Kafka オートスケーラー サービスとそのサポート インフラストラクチャをデプロイできます。このプロセスを簡素化するために、Terraform モジュールとシェル スクリプトが用意されています。
gcloud
このセクションでは、オートスケーラーを手動でデプロイするために必要な各 gcloud コマンドについて説明します。ほとんどの場合、シェル スクリプトまたは Terraform モジュールの使用をおすすめします。
サービス アカウントを作成する
サービス アカウントの要件は、構成した自動スケーリングのチェック間隔によって異なります。Kafka オートスケーラーを構成すれば、柔軟な間隔で自動スケーリングのチェックを実行できます。
- 1 分以上: Cloud Scheduler は、選択した間隔で POST リクエストを使用して自動スケーリングのチェックをトリガーします。
1 分未満: Cloud Scheduler は、構成した頻度に基づいて、毎分複数の Cloud Tasks の作成をトリガーします。
1 分以上
Kafka オートスケーラーのサービス アカウント
Kafka オートスケーラーのサービス アカウントを作成します。
gcloud iam service-accounts create SCALER_SERVICE_ACCOUNT
SCALER_SERVICE_ACCOUNT は、Kafka オートスケーラーのサービス アカウントの名前に置き換えます。
Kafka オートスケーラーは、Kafka コンシューマー インスタンスの数を更新するために、次の権限が必要です。
- Kafka コンシューマーのサービス アカウントに対する
iam.serviceaccounts.actAs
。 - Kafka コンシューマーのイメージを含むリポジトリに対する
roles/artifactregistry.reader
。 run.workerpools.get
とrun.workerpools.update
。これらの権限は、Cloud Run 管理者ロール(roles/run.admin
)に含まれています。- スケーリングと Kafka 認証の両方のシークレットに対する
roles/secretmanager.secretAccessor
。 - Kafka コンシューマー プロジェクトに対する
roles/monitoring.viewer
。CPU 使用率の指標を読み取るには、このロールが必要です。 - Kafka コンシューマー プロジェクトに対する
roles/monitoring.metricWriter
。このロールは省略可能ですが、オートスケーラーがカスタム指標を出力できるようになるため、オブザーバビリティが向上します。
gcloud iam service-accounts add-iam-policy-binding CONSUMER_SERVICE_ACCOUNT_EMAIL \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/iam.serviceAccountUser"
gcloud iam service-accounts add-iam-policy-binding CONSUMER_IMAGE_REPO \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/artifactregistry.reader" \
--location=REPO_REGION
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/run.admin"
gcloud secrets add-iam-policy-binding ADMIN_CLIENT_SECRET_NAME \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/secretmanager.secretAccessor"
gcloud secrets add-iam-policy-binding SCALING_CONFIG_SECRET_NAME \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/secretmanager.secretAccessor"
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/monitoring.viewer" \
--condition=None
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/monitoring.metricWriter" \
--condition=None
次のように置き換えます。
- PROJECT_ID: Kafka オートスケーラー サービスが配置されているプロジェクト ID。
- CONSUMER_SERVICE_ACCOUNT_EMAIL: Kafka コンシューマーのサービス アカウントのメールアドレス(例:
example@PROJECT-ID.iam.gserviceaccount.com
)。 - SCALER_SERVICE_ACCOUNT: Kafka オートスケーラーのサービス アカウント。
- ADMIN_CLIENT_SECRET_NAME: Kafka 認証シークレットの名前。
- SCALING_CONFIG_SECRET_NAME: スケーリング シークレットの名前。
- CONSUMER_IMAGE_REPO: Kafka コンシューマーのコンテナ イメージを含むリポジトリの ID または完全修飾識別子。
- REPO_REGION: コンシューマーのイメージ リポジトリのロケーション。
1 分未満
Cloud Tasks を設定する
Cloud Scheduler は、1 分以上の間隔でのみトリガーできます。1 分未満の間隔の場合は、Cloud Tasks を使用して Kafka オートスケーラーをトリガーします。Cloud Tasks を設定するには、次の操作が必要です。
- 自動スケーリングのチェックタスク用の Cloud Tasks キューを作成する。
- Cloud Run 起動元ロールを使用して Kafka オートスケーラーを呼び出すために Cloud Tasks が使用するサービス アカウントを作成する。
gcloud tasks queues create CLOUD_TASKS_QUEUE_NAME \
--location=REGION
gcloud iam service-accounts create TASKS_SERVICE_ACCOUNT
gcloud run services add-iam-policy-binding SCALER_SERVICE_NAME \
--member="serviceAccount:TASKS_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/run.invoker"
次のように置き換えます。
- CLOUD_TASKS_QUEUE_NAME: 自動スケーリングのチェックをトリガーするために構成された Cloud Tasks キュー。
- TASKS_SERVICE_ACCOUNT: Cloud Tasks が自動スケーリングのチェックをトリガーするために使用するサービス アカウント。
- SCALER_SERVICE_NAME: Kafka オートスケーラー サービスの名前。
- PROJECT_ID: Kafka オートスケーラー サービスのプロジェクト ID。
- REGION: Kafka オートスケーラー サービスのロケーション。
Kafka オートスケーラーのサービス アカウントを設定する
Kafka オートスケーラーのサービス アカウントを作成します。
gcloud iam service-accounts create SCALER_SERVICE_ACCOUNT
SCALER_SERVICE_ACCOUNT は、Kafka オートスケーラーのサービス アカウントの名前に置き換えます。
Kafka オートスケーラーは、Kafka コンシューマー インスタンスの数を更新し、自動スケーリングをチェックするタスクを作成するために、次の権限が必要です。
- Kafka コンシューマーのサービス アカウントに対する
iam.serviceaccounts.actAs
。 - Kafka コンシューマーのイメージを含むリポジトリに対する
roles/artifactregistry.reader
。 run.workerpools.get
とrun.workerpools.update
。これらの権限は、Cloud Run 管理者ロール(roles/run.admin
)に含まれています。- スケーリングと Kafka 認証の両方のシークレットに対する
roles/secretmanager.secretAccessor
。 - Kafka コンシューマー プロジェクトに対する
roles/monitoring.viewer
。CPU 使用率の指標を読み取るには、このロールが必要です。 - Kafka コンシューマー プロジェクトに対する
roles/monitoring.metricWriter
。このロールは省略可能ですが、オートスケーラーがカスタム指標を出力できるようになるため、オブザーバビリティが向上します。 - Cloud Tasks へのデータ追加ロール(
roles/cloudtasks.enqueuer
)。
gcloud iam service-accounts add-iam-policy-binding CONSUMER_SERVICE_ACCOUNT_EMAIL \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/iam.serviceAccountUser"
gcloud iam service-accounts add-iam-policy-binding CONSUMER_IMAGE_REPO \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/artifactregistry.reader" \
--location=REPO_REGION
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/run.admin"
gcloud secrets add-iam-policy-binding ADMIN_CLIENT_SECRET_NAME \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/secretmanager.secretAccessor"
gcloud secrets add-iam-policy-binding SCALING_CONFIG_SECRET_NAME \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/secretmanager.secretAccessor"
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/monitoring.viewer" \
--condition=None
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/monitoring.metricWriter" \
--condition=None
gcloud tasks queues add-iam-policy-binding CLOUD_TASKS_QUEUE_NAME \
--member="serviceAccount:SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/cloudtasks.enqueuer" \
--location=REGION
次のように置き換えます。
- PROJECT_ID: Kafka オートスケーラー サービスが配置されているプロジェクト ID。
- CONSUMER_SERVICE_ACCOUNT_EMAIL: Kafka コンシューマーのサービス アカウントのメールアドレス(例:
example@PROJECT_ID.iam.gserviceaccount.com
)。 - SCALER_SERVICE_ACCOUNT: Kafka オートスケーラーのサービス アカウント。
- CONSUMER_IMAGE_REPO: Kafka コンシューマーのコンテナ イメージを含むリポジトリの ID または完全修飾識別子。
- ADMIN_CLIENT_SECRET_NAME: Kafka 認証シークレットの名前。
- SCALING_CONFIG_SECRET_NAME: スケーリング シークレットの名前。
- REPO_REGION: コンシューマーのイメージ リポジトリのロケーション。
- CLOUD_TASKS_QUEUE_NAME: 自動スケーリングのチェックをトリガーするために構成された Cloud Tasks キュー。
- REGION: Kafka オートスケーラー サービスのロケーション。
環境変数を構成する
1 分以上
Kafka オートスケーラーは、環境変数を使用して、Kafka コンシューマーとターゲット ワークロードのその他の側面を指定します。セキュリティ確保のため、機密情報はシークレットとして構成することをおすすめします。
次の変数を使用して、scaler_env_vars.yaml
という名前の YAML ファイルを作成します。
KAFKA_TOPIC_ID: KAFKA_TOPIC_ID CONSUMER_GROUP_ID: CONSUMER_GROUP_ID CYCLE_SECONDS: CYCLE_SECONDS OUTPUT_SCALER_METRICS: OUTPUT_SCALER_METRICS
次のように置き換えます。
- KAFKA_TOPIC_ID: Kafka コンシューマーがサブスクライブするトピック ID。
- CONSUMER_GROUP_ID: ターゲット Kafka コンシューマーが使用するコンシューマー グループ ID。これらの値は一致している必要があります。一致していない場合、自動スケーリングは失敗します。
- CYCLE_SECONDS: オートスケーラーのサイクル期間(秒単位)。
- OUTPUT_SCALER_METRICS: 指標を有効にする設定。カスタム指標の出力を有効にするには、値を
true
に設定します。有効にしない場合はfalse
に設定します。
1 分未満
Kafka オートスケーラーは、環境変数を使用して、Kafka コンシューマーとターゲット ワークロードのその他の側面を指定します。セキュリティ確保のため、機密情報はシークレットとして構成することをおすすめします。
次の変数を使用して、scaler_env_vars.yaml
という名前の YAML ファイルを作成します。
KAFKA_TOPIC_ID: KAFKA_TOPIC_ID CONSUMER_GROUP_ID: CONSUMER_GROUP_ID CYCLE_SECONDS: CYCLE_SECONDS OUTPUT_SCALER_METRICS: OUTPUT_SCALER_METRICS FULLY_QUALIFIED_CLOUD_TASKS_QUEUE_NAME: CLOUD_TASKS_QUEUE_NAME INVOKER_SERVICE_ACCOUNT_EMAIL: TASKS_SERVICE_ACCOUNT_EMAIL
次のように置き換えます。
- KAFKA_TOPIC_ID: Kafka コンシューマーがサブスクライブするトピック ID。
- CONSUMER_GROUP_ID: ターゲット Kafka コンシューマーが使用するコンシューマー グループ ID。これらの値は一致している必要があります。一致していない場合、自動スケーリングは失敗します。
- CYCLE_SECONDS: オートスケーラーのサイクル期間(秒単位)。
- OUTPUT_SCALER_METRICS: 指標を有効にする設定。カスタム指標の出力を有効にするには、値を
true
に設定します。有効にしない場合はfalse
に設定します。 - CLOUD_TASKS_QUEUE_NAME: 自動スケーリングのチェックをトリガーするための Cloud Tasks キューの完全修飾名。次の形式になっています。
projects/$PROJECT_ID/locations/$REGION/queues/$CLOUD_TASKS_QUEUE_NAME
- TASKS_SERVICE_ACCOUNT_EMAIL: Cloud Tasks が自動スケーリングのチェックをトリガーするために使用するサービス アカウント(例:
example@PROJECT_ID.iam.gserviceaccount.com
)。
提供されたイメージを使用して Kafka オートスケーラーをデプロイし、scaler_env_vars.yaml
ファイルとシークレット ボリュームのマウントを使用して Kafka VPC に接続します。
gcloud run deploy SCALER_SERVICE_NAME \
--image=SCALER_IMAGE_URI \
--env-vars-file=scaler_env_vars.yaml \
--service-account=SCALER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com \
--no-allow-unauthenticated \
--network=KAFKA_VPC_NETWORK \
--subnet=KAFKA_VPC_SUBNET \
--update-secrets=/kafka-config/kafka-client-properties=ADMIN_CLIENT_SECRET_NAME:latest \
--update-secrets=/scaler-config/scaling=SCALING_CONFIG_SECRET_NAME:latest
--labels=created-by=kafka-autoscaler
次のように置き換えます。
- SCALER_IMAGE_URI: Kafka オートスケーラー イメージの URI。
- SCALER_SERVICE_NAME: Kafka オートスケーラー サービスの名前。
- SCALER_SERVICE_ACCOUNT: Kafka オートスケーラー サービス アカウントの名前。
- PROJECT_ID: Kafka オートスケーラー サービスのプロジェクト ID。
- KAFKA_VPC_NETWORK: Kafka クラスタに接続されている VPC ネットワーク。
- KAFKA_VPC_SUBNET: Kafka クラスタに接続されている VPC サブネット。
- ADMIN_CLIENT_SECRET_NAME: Kafka 認証シークレットの名前。
- SCALING_CONFIG_SECRET_NAME: スケーリング シークレットの名前。
定期的な自動スケーリング チェックを設定する
このセクションでは、Cloud Scheduler を使用して定期的な自動スケーリング チェックをトリガーする方法について説明します。
- 1 分以上: 選択した間隔でトリガーするように Cloud Scheduler を構成します。
- 1 分未満: 毎分トリガーするように Cloud Scheduler を構成します。
起動元サービス アカウントを作成する
Cloud Scheduler が Kafka オートスケーラーを呼び出せるようにするには、Kafka オートスケーラー サービスに起動元ロール(roles/run.invoker
)を持つサービス アカウントを作成する必要があります。
gcloud iam service-accounts create SCALER_INVOKER_SERVICE_ACCOUNT
gcloud run services add-iam-policy-binding SCALER_SERVICE_NAME \
--member="serviceAccount:SCALER_INVOKER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/run.invoker"
次のように置き換えます。
- SCALER_SERVICE_NAME: Kafka オートスケーラー サービスの名前。
- SCALER_INVOKER_SERVICE_ACCOUNT: 起動元サービス アカウントの名前。
- PROJECT_ID: Kafka オートスケーラー サービスのプロジェクト ID。
Cloud Scheduler ジョブを作成する
1 分以上
選択した自動スケーリングのチェック間隔で Cloud Scheduler ジョブを作成します。
gcloud scheduler jobs create http kafka-scaling-check \
--location=REGION \
--schedule="CRON_SCHEDULE" \
--time-zone="TIMEZONE" \
--uri=https://SCALER_SERVICE_NAME-PROJECT_NUMBER.REGION.run.app \
--oidc-service-account-email=SCALER_INVOKER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com \
--http-method=POST
次のように置き換えます。
- SCALER_SERVICE_NAME: Kafka オートスケーラー サービスの名前。
- SCALER_INVOKER_SERVICE_ACCOUNT: 起動元サービス アカウントの名前。
- PROJECT_ID: Kafka オートスケーラー サービスのプロジェクト ID。
- PROJECT_NUMBER: Kafka オートスケーラー サービスのプロジェクト番号。
- REGION: Kafka オートスケーラー サービスのロケーション。
- TIMEZONE: タイムゾーン(例:
America/Los_Angeles
)。 - CRON_SCHEDULE: 選択したスケジュール(Crontab 形式)。たとえば、1 分ごとの場合は
"* * * * *"
とします。
1 分未満
毎分実行される Cloud Scheduler ジョブを作成します。
gcloud scheduler jobs create http kafka-scaling-check \
--location=REGION \
--schedule="* * * * *" \
--time-zone="TIMEZONE" \
--uri=https://SCALER_SERVICE_NAME-PROJECT_NUMBER.REGION.run.app \
--oidc-service-account-email=SCALER_INVOKER_SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com \
--http-method=POST
次のように置き換えます。
- SCALER_SERVICE_NAME: Kafka オートスケーラー サービスの名前。
- SCALER_INVOKER_SERVICE_ACCOUNT: 起動元サービス アカウントの名前。
- PROJECT_ID: Kafka オートスケーラー サービスのプロジェクト ID。
- PROJECT_NUMBER: Kafka オートスケーラー サービスのプロジェクト番号。
- REGION: Kafka オートスケーラー サービスのロケーション。
- TIMEZONE: タイムゾーン(例:
America/Los_Angeles
)。
Terraform
terraform/
ディレクトリには、Kafka オートスケーラーとそれに関連するリソースのプロビジョニングに使用できる再利用可能な Terraform モジュールが格納されています。
このモジュールにより、次のものを作成する処理が自動化されます。
- Kafka オートスケーラー Cloud Run サービス
- サービス アカウントと IAM バインディングのサポート
- Cloud Tasks キュー
- Cloud Scheduler のジョブ
詳細な手順、使用例、すべての入出力変数の説明については、terraform readme
をご覧ください。
Terraform モジュールに対して、必要な変数(前提条件の詳細を含む)を提供する必要があります。たとえば、プロジェクト ID、リージョン、コンシューマー SA のメールアドレス、シークレット名、スケーラー イメージのパス、トピック ID などです。
シェル
オートスケーラーには、必要なすべてのリソースを自動的に作成して構成する setup_kafka_scaler.sh
スクリプトが用意されています。
環境変数を設定する
スクリプトを実行する前に、必要な環境変数をすべて設定していることを確認してください。
# Details for already-deployed Kafka consumer
export PROJECT_ID=PROJECT_ID
export REGION=REGION
export CONSUMER_SERVICE_NAME=DEPLOYED_KAFKA_CONSUMER
export CONSUMER_SA_EMAIL=KAFKA_CONSUMER_ACCOUNT_EMAIL # For example, NAME@PROJECT_ID.iam.gserviceaccount.com
export TOPIC_ID=KAFKA_TOPIC_ID
export CONSUMER_GROUP_ID=KAFKA_CONSUMER_GROUP_ID
export NETWORK=VPC_NETWORK
export SUBNET=VPC_SUBNET
# Details for new items to be created during this setup
export CLOUD_TASKS_QUEUE_NAME=CLOUD_TASKS_QUEUE_FOR_SCALING_CHECKS
export TASKS_SERVICE_ACCOUNT=TASKS_SERVICE_ACCOUNT_NAME
export SCALER_SERVICE_NAME=KAFKA_AUTOSCALER_SERVICE_NAME
export SCALER_IMAGE_PATH=KAFKA_AUTOSCALER_IMAGE_URI
export SCALER_CONFIG_SECRET=KAFKA_AUTOSCALER_CONFIG_SECRET_NAME
export CYCLE_SECONDS=SCALER_CHECK_FREQUENCY # For example, 15; this value should be at least 5 seconds.
export OUTPUT_SCALER_METRICS=false # If you want scaling metrics to outputted to Cloud Monitoring set this to true and ensure your scaler service account has permission to write metrics (for example, via roles/monitoring.metricWriter).
次のように置き換えます。
- PROJECT_ID: Kafka オートスケーラー サービスが配置されているプロジェクト ID。
- REGION: Kafka オートスケーラー サービスのロケーション。
- DEPLOYED_KAFKA_CONSUMER: Kafka コンシューマーの名前。
- KAFKA_CONSUMER_ACCOUNT_EMAIL: Kafka コンシューマーのサービス アカウントのメールアドレス。
- KAFKA_TOPIC_ID: Kafka コンシューマーがサブスクライブするトピック ID。
- KAFKA_CONSUMER_GROUP_ID: ターゲット Kafka コンシューマーが使用するコンシューマー グループ ID。これらの値は一致している必要があります。一致していない場合、自動スケーリングは失敗します。
- VPC_NETWORK: Kafka クラスタに接続されている VPC ネットワーク。
- VPC_SUBNET: Kafka クラスタに接続されている VPC サブネット。
- CLOUD_TASKS_QUEUE_FOR_SCALING_CHECKS: 自動スケーリングのチェックをトリガーするために構成された Cloud Tasks キュー。
- TASKS_SERVICE_ACCOUNT_NAME: Cloud Tasks が自動スケーリングのチェックをトリガーするために使用するサービス アカウント。
- KAFKA_AUTOSCALER_SERVICE_NAME: Kafka オートスケーラー サービスの名前。
- KAFKA_AUTOSCALER_IMAGE_URI: Kafka オートスケーラー イメージの URI。
- KAFKA_AUTOSCALER_CONFIG_SECRET_NAME: スケーリング シークレットの名前。
- SCALER_CHECK_FREQUENCY: オートスケーラーのサイクル期間(秒単位)。
設定スクリプトを実行する
提供された setup_kafka_scaler.sh
スクリプトを実行します。
./setup_kafka_scaler.sh
このスクリプトにより、次のアクションが実行されます。
- 自動スケーリングのチェックをトリガーするために使用される Cloud Tasks キューを作成する。
- Kafka オートスケーラーのサービス アカウントを作成し、必要な権限を付与する。
- Kafka オートスケーラーを構成してデプロイする。
- 自動スケーリングのチェックを定期的にトリガーする Cloud Scheduler ジョブを作成する。
setup_kafka_scaler.sh
スクリプトを実行すると、構成された環境変数が出力されます。続行する前に、環境変数が正しいことを確認してください。
追加の権限を付与する
Kafka コンシューマーのインスタンス数を変更するには、デプロイされたコンテナ イメージに対する閲覧権限が、Kafka オートスケーラーのサービス アカウントに付与されている必要があります。たとえば、コンシューマー イメージが Artifact Registry からデプロイされた場合は、次のコマンドを実行します。
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SCALER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/artifactregistry.reader" # Or appropriate role for your registry
Kafka の自動スケーリングが機能していることを確認する
Kafka オートスケーラー サービスのスケーリングは、サービス URL(SCALER_SERVICE_NAME-
PROJECT_NUMBER.
REGION.run.app
)へのリクエストによってトリガーされます。
Kafka オートスケーラー サービスに POST
リクエストを送信することで、自動スケーリングの計算をトリガーできます。
curl -X POST -H "Authorization: Bearer $(gcloud auth print-identity-token)" https://SCALER_SERVICE_NAME-PROJECT_NUMBER.REGION.run.app
次のように置き換えます。
- SCALER_SERVICE_NAME: Kafka オートスケーラー サービスの名前。
- PROJECT_NUMBER: Kafka オートスケーラー サービスのプロジェクト番号。
- REGION: Kafka オートスケーラー サービスのロケーション。
POST
リクエストにより、自動スケーリングの計算がトリガーされ、ログへの出力が行われ、推奨事項に基づいてインスタンス数が変更されます。
Kafka オートスケーラー サービスのログには、[SCALING] Recommended instances X
などのメッセージが記述されます。
OUTPUT_SCALER_METRICS
フラグが有効になっている場合は、custom.googleapis.com/cloud-run-kafkascaler
で、スケーラーの Cloud Monitoring 指標も確認できます。
高度なスケーリング構成
spec: metrics: behavior: scaleDown: stabilizationWindowSeconds: [INT] policies: - type: [Percent, Instances] value: [INT] periodSeconds: [INT] selectPolicy: [Min, Max] scaleUp: stabilizationWindowSeconds: [INT] policies: - type: [Percent, Instances] value: [INT] periodSeconds: [INT] selectPolicy: [Min, Max]
次のリストは、上記の要素の一部を示したものです。
scaleDown
: インスタンス数を減らす場合(スケールダウン)の動作。scaleUp
: インスタンス数を増やす場合(スケールアップ)の動作。stabilizationWindowSeconds
: 周期的な期間について計算されたインスタンス数の最大値(scaleDown
)または最小値(scaleUp
)。値を0
に設定すると、最新の計算値が使用されます。selectPolicy
: 複数のポリシーが構成されている場合に適用する結果。Min
: 最小の変化Max
: 最大の変化Percent
: 期間ごとの変化は、構成された合計インスタンスの割合(%)に制限されます。Instances
: 期間ごとの変化は、構成されたインスタンス数に制限されます。periodSeconds
: ポリシーが適用される期間。
たとえば、デフォルトの構成を使用した完全な仕様は次のようになります。
spec: scaleTargetRef: name: projects/PROJECT-ID/locations/us-central1/workerpools/kafka-consumer-worker metrics: - type: Resource resource: name: cpu target: type: Utilization averageUtilization: 60 activationThreshold: 0 tolerance: 0.1 windowSeconds: 120 - type: External external: metric: name: consumer_lag target: type: AverageValue averageValue: 1000 activationThreshold: 0 tolerance: 0.1 behavior: scaleDown: stabilizationWindowSeconds: 300 policies: - type: Percent value: 50 periodSeconds: 30 selectPolicy: Min scaleUp: stabilizationWindowSeconds: 0 policies: - type: Percent value: 100 periodSeconds: 15 - type: Instances value: 4 periodSeconds: 15 selectPolicy: Max