高可用性 Kafka クラスタを GKE にデプロイする


Kafka は、オープンソースとして提供されているパブリッシュ / サブスクライブ型の分散メッセージング システムで、大規模なリアルタイム ストリーミング データを高スループットで処理します。Kafka を使用すると、システムやアプリケーション間でデータを確実に移動し、処理と分析を行うストリーミング データ パイプラインを構築できます。

このチュートリアルは、Google Kubernetes Engine(GKE)に高可用性 Kafka クラスタをデプロイすることに関心があるプラットフォーム管理者、クラウド アーキテクト、運用担当者を対象としています。

目標

このチュートリアルの学習内容は次のとおりです。

  • Terraform を使用して GKE のリージョン クラスタを作成する。
  • 高可用性 Kafka クラスタをデプロイする。
  • Kafka バイナリをアップグレードする。
  • Kafka クラスタのバックアップと復元を行う。
  • GKE ノードの中断と Kafka ブローカーのフェイルオーバーをシミュレートする。

アーキテクチャ

このセクションでは、このチュートリアルで構築するソリューションのアーキテクチャについて説明します。

Kafka クラスタは、受信データ ストリームを処理し、Kafka クライアント(コンシューマー)に対してパブリッシュ / サブスクライブ型のメッセージングを提供するために連携して機能するサーバー(ブローカー)のグループです。

Kafka クラスタの各データ パーティションには 1 つのリーダー ブローカーが存在します。また、1 つ以上のフォロワー ブローカーが存在する場合もあります。リーダー ブローカーは、パーティションに対するすべての読み取りと書き込みを処理します。フォロワー ブローカーはリーダー ブローカーの複製です。

一般的な Kafka の設定では、ZooKeeper というオープンソース サービスも使用して Kafka クラスタを調整します。このサービスは、ブローカーの中からリーダーを選出し、障害発生時にフェイルオーバーをトリガーする際に役立ちます。

このチュートリアルでは、Kafka ブローカーと Zookeeper サービスを個別の StatefulSet として構成することで、Kafka クラスタを GKE にデプロイします。高可用性 Kafka クラスタをプロビジョニングして障害復旧の準備を行うため、別々のノードプールゾーンを使用するように Kafka と Zookeeper StatefulSet を構成します。

次の図は、Kafka StatefulSet が GKE クラスタ内の複数のノードとゾーンで実行される方法を示しています。

この図は、複数のゾーンにデプロイされた GKE 上の Kafka StatefulSet のアーキテクチャを示しています。
図 1: 3 つの異なるゾーンにまたがる GKE ノードへの Kafka StatefulSet のデプロイ。

次の図は、Zookeeper StatefulSet が GKE クラスタ内の複数のノードとゾーンで実行される方法を示しています。

この図は、複数のゾーンにデプロイされた GKE 上の Zookeeper StatefulSet のアーキテクチャを示しています。
図 2: 3 つの異なるゾーンにまたがる GKE ノードへの Kafka Zookeeper のデプロイ。

ノードのプロビジョニングと Pod のスケジューリング

Autopilot クラスタを使用している場合は、Autopilot がノードのプロビジョニングを処理し、ワークロードの Pod をスケジューリングします。Pod の反アフィニティを使用して、同じ StatefulSet の 2 つの Pod が同じノードと同じゾーンにスケジューリングされないようにします。

Standard クラスタを使用している場合は、Pod の tolerationノード アフィニティを構成する必要があります。詳細については、専用ノードプールにワークロードを分離するをご覧ください。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

プロジェクトを設定する

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud Console のプロジェクト セレクタのページで、[プロジェクトを作成] をクリックして新しい Google Cloud プロジェクトの作成を開始します。

    プロジェクト セレクタに移動

  3. Google Cloud プロジェクトで課金が有効になっていることを確認します

  4. Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM API を有効にします。

    API を有効にする

  5. Google Cloud Console のプロジェクト セレクタのページで、[プロジェクトを作成] をクリックして新しい Google Cloud プロジェクトの作成を開始します。

    プロジェクト セレクタに移動

  6. Google Cloud プロジェクトで課金が有効になっていることを確認します

  7. Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM API を有効にします。

    API を有効にする

ロールを設定する

  1. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 role/storage.objectViewer, role/logging.logWriter, role/artifactregistry.Admin, roles/container.clusterAdmin, role/container.serviceAgent, roles/iam.serviceAccountAdmin, roles/serviceusage.serviceUsageAdmin, roles/iam.serviceAccountAdmin

    $ gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。

環境を設定する

このチュートリアルでは、Cloud Shell を使用して Google Cloud でホストされるリソースを管理します。Cloud Shell には、Dockerkubectlgcloud CLIHelmTerraform など、このチュートリアルに必要なソフトウェアがプリインストールされています。

Cloud Shell を使用して環境を設定するには、次の操作を行います。

  1. Google Cloud コンソールCloud Shell 有効化アイコンCloud Shell をアクティブにする」をクリックして、Google Cloud コンソールから Cloud Shell セッションを起動します。これにより、Google Cloud コンソールの下部ペインでセッションが起動します。

  2. 環境変数を設定します。

    export PROJECT_ID=PROJECT_ID
    export REGION=us-central1
    

    次の値を置き換えます。

  3. デフォルトの環境変数を設定します。

    gcloud config set project PROJECT_ID
    
  4. コード リポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  5. 作業ディレクトリを変更します。

    cd kubernetes-engine-samples/streaming/gke-stateful-kafka
    

クラスタ インフラストラクチャを作成する

このセクションでは、Terraform スクリプトを実行して、2 つのリージョン GKE クラスタを作成します。プライマリ クラスタは us-central1 にデプロイされます。

クラスタの作成手順は次のとおりです。

Autopilot

Cloud Shell で、次のコマンドを実行します。

terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID

プロンプトが表示されたら、「yes」と入力します。

Standard

Cloud Shell で、次のコマンドを実行します。

terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID

プロンプトが表示されたら、「yes」と入力します。

Terraform 構成ファイルにより、インフラストラクチャのデプロイに必要な次のリソースが作成されます。

  • Docker イメージを保存するための Artifact Registry リポジトリを作成します。
  • VM のネットワーク インターフェース用に VPC ネットワークとサブネットを作成します。
  • 2 つの GKE クラスタを作成します。

Terraform は、2 つのリージョンに限定公開クラスタを作成し、障害復旧用に Backup for GKE を有効にします。

Kafka をクラスタにデプロイする

このセクションでは、Helm チャートを使用して GKE に Kafka をデプロイします。このオペレーションで次のリソースが作成されます。

Helm チャートを使用して Kafka をデプロイするには、次の操作を行います。

  1. Docker アクセスを構成します。

    gcloud auth configure-docker us-docker.pkg.dev
    
  2. Artifact Registry に Kafka と Zookeeper のイメージを登録します。

    ./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0
    ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52
    ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41
    ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
    
  3. プライマリ クラスタに対する kubectl コマンドライン アクセスを構成します。

    gcloud container clusters get-credentials gke-kafka-us-central1 \
        --region=${REGION} \
        --project=${PROJECT_ID}
    
  4. Namespace を作成します。

    export NAMESPACE=kafka
    kubectl create namespace $NAMESPACE
    
  5. Helm チャート バージョン 20.0.6 を使用して Kafka をインストールします。

    cd helm
    ../scripts/chart.sh kafka 20.0.6 && \
    rm -rf Chart.lock charts && \
    helm dependency update && \
    helm -n kafka upgrade --install kafka . \
    --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
    
    

    出力は次のようになります。

    NAME: kafka
    LAST DEPLOYED: Thu Feb 16 03:29:39 2023
    NAMESPACE: kafka
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    
  6. Kafka レプリカが実行されていることを確認します(数分かかる場合があります)。

    kubectl get all -n kafka
    

    出力は次のようになります。

    ---
    NAME                    READY   STATUS    RESTARTS        AGE
    pod/kafka-0             1/1     Running   2 (3m51s ago)   4m28s
    pod/kafka-1             1/1     Running   3 (3m41s ago)   4m28s
    pod/kafka-2             1/1     Running   2 (3m57s ago)   4m28s
    pod/kafka-zookeeper-0   1/1     Running   0               4m28s
    pod/kafka-zookeeper-1   1/1     Running   0               4m28s
    pod/kafka-zookeeper-2   1/1     Running   0               4m28s
    
    NAME                                   TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)                      AGE
    service/kafka                          ClusterIP   192.168.112.124   <none>        9092/TCP                     4m29s
    service/kafka-app                      ClusterIP   192.168.75.57     <none>        9092/TCP                     35m
    service/kafka-app-headless             ClusterIP   None              <none>        9092/TCP,9093/TCP            35m
    service/kafka-app-zookeeper            ClusterIP   192.168.117.102   <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-app-zookeeper-headless   ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-headless                 ClusterIP   None              <none>        9092/TCP,9093/TCP            4m29s
    service/kafka-zookeeper                ClusterIP   192.168.89.249    <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    service/kafka-zookeeper-headless       ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    
    NAME                               READY   AGE
    statefulset.apps/kafka             3/3     4m29s
    statefulset.apps/kafka-zookeeper   3/3     4m29s
    

テストデータを作成する

このセクションでは、Kafka アプリケーションをテストし、メッセージを生成します。

  1. Kafka アプリケーションを操作するコンシューマー クライアント Pod を作成します。

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
    
  2. 3 つのパーティションがあり、レプリケーション係数が 3 のトピックを topic1 という名前で作成します。

    kafka-topics.sh \
        --create \
        --topic topic1 \
        --partitions 3  \
        --replication-factor 3 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. トピック パーティションが 3 つのブローカーすべてに複製されていることを確認します。

    kafka-topics.sh \
        --describe \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    出力は次のようになります。

    Topic: topic1     TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
           Topic: topic1    Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
           Topic: topic1    Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
           Topic: topic1    Partition: 2    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
    

    出力例では、topic1 に 3 つのパーティションがあり、それぞれに異なるリーダーとレプリカのセットがあることに注意してください。これは、Kafka がパーティショニングを使用して複数のブローカー間でデータを分散するためで、より大規模なスケーラビリティやより強固なフォールト トレランスを実現できます。レプリケーション係数が 3 の場合、各パーティションには 3 つのレプリカが必ず存在します。この場合、1 つまたは 2 つのブローカーに障害が発生しても引き続きデータを使用できます。

  4. 次のコマンドを実行して、メッセージ番号を topic1 に一括で生成します。

    ALLOW_PLAINTEXT_LISTENER=yes
    for x in $(seq 0 200); do
      echo "$x: Message number $x"
    done | kafka-console-producer.sh \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \
        --property parse.key=true \
        --property key.separator=":"
    
  5. すべてのパーティションから topic1 を使用するには、次のコマンドを実行します。

    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    CTRL+C」と入力して、コンシューマー プロセスを停止します。

Kafka のベンチマーク

ユースケースを正確にモデル化するために、クラスタで予想される負荷のシミュレーションを実行できす。パフォーマンスをテストするには、Kafka パッケージに含まれるツール(bin フォルダの kafka-producer-perf-test.sh スクリプトと kafka-consumer-perf-test.sh スクリプト)を使用します。

  1. ベンチマーク用にトピックを作成します。

    kafka-topics.sh \
      --create \
      --topic topic-benchmark \
      --partitions 3  \
      --replication-factor 3 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  2. Kafka クラスタで負荷を発生させます。

    KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \
        --topic topic-benchmark \
        --num-records 10000000 \
        --throughput -1 \
        --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \
              batch.size=16384 \
              acks=all \
              linger.ms=500 \
              compression.type=none \
        --record-size 100 \
        --print-metrics
    

    プロデューサーが topic-benchmark で 10,000,000 件のレコードを生成します。出力は次のようになります。

    623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency.
    1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency.
    1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency.
    2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
    

    すべてのレコードが送信されると、次のような追加指標が出力に表示されます。

    producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark}     : 173316.233
    producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark}    : 10000000.000
    

    監視を終了するには、「CTRL + C」と入力します。

  3. Pod のシェルを終了します。

    exit
    

アップグレードを管理する

Kafka と Kubernetes のバージョンは定期的に更新されています。運用のベスト プラクティスに従って、ソフトウェア環境を定期的にアップグレードしてください。

Kafka バイナリ アップグレードを計画する

このセクションでは、Helm を使用して Kafka イメージを更新し、トピックがまだ利用できることを確認します。

クラスタに Kafka をデプロイするで使用した Helm チャートで以前の Kafka バージョンからアップグレードするには、次の操作を行います。

  1. Artifact Registry に次のイメージを登録します。

    ../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2
    ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61
    ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49
    ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
    
  2. 次の手順で、アップグレードされた Kafka と Zookeeper のイメージを使用して Helm チャートをデプロイします。バージョン固有のガイダンスについては、バージョン アップグレード用の Kafka の手順をご覧ください。

    1. Chart.yaml 依存関係のバージョンを更新します。
    ../scripts/chart.sh kafka 20.1.0
    
    
    1. 次の例のように、新しい Kafka と Zookeeper のイメージを使用して Helm チャートをデプロイします。

      rm -rf Chart.lock charts && \
      helm dependency update && \
      helm -n kafka upgrade --install kafka ./ \
            --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
      

    Kafka Pod のアップグレードを監視します。

    kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
    

    監視を終了するには、「CTRL + C」と入力します。

  3. クライアント Pod を使用して Kafka クラスタに接続します。

    kubectl run kafka-client -n kafka --rm -ti \
      --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
    
  4. topic1 からのメッセージにアクセスできることを確認します。

    kafka-console-consumer.sh \
      --topic topic1 \
      --from-beginning \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    出力には、前の手順で生成されたメッセージが表示されます。CTRL+C と入力してプロセスを終了します。

  5. クライアント Pod を終了します。

    exit
    

障害復旧の準備を行う

サービス中断イベントの発生時に本番環境のワークロードの可用性を確保するには、障害復旧(DR)計画を準備する必要があります。DR 計画の詳細については、障害復旧計画ガイドをご覧ください。

GKE クラスタでワークロードをバックアップして復元するには、Backup for GKE を使用します。

Kafka のバックアップと復元の例

このセクションでは、クラスタのバックアップを gke-kafka-us-central1 から入手して、gke-kafka-us-west1 に復元します。ProtectedApplication カスタム リソースを使用して、アプリケーション スコープでバックアップと復元のオペレーションを行います。

次の図は、障害復旧ソリューションのコンポーネントと、それらのコンポーネントがどのように関係しているのかを示しています。

この図は、高可用性 Kafka クラスタ用のバックアップと復元のソリューションの例を示しています。
図 3: 高可用性 Kafka クラスタ用のバックアップと復元のソリューション。

Kafka クラスタのバックアップと復元を準備するには、次の操作を行います。

  1. 環境変数を設定します。

    export BACKUP_PLAN_NAME=kafka-protected-app
    export BACKUP_NAME=protected-app-backup-1
    export RESTORE_PLAN_NAME=kafka-protected-app
    export RESTORE_NAME=protected-app-restore-1
    export REGION=us-central1
    export DR_REGION=us-west1
    export CLUSTER_NAME=gke-kafka-$REGION
    export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
    
  2. クラスタが RUNNING 状態であることを確認します。

    gcloud container clusters describe $CLUSTER_NAME --region us-central1 --format='value(status)'
    
  3. バックアップ プランを作成します。

    gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --include-secrets \
        --include-volume-data \
        --cron-schedule="0 3 * * *" \
        --backup-retain-days=7 \
        --backup-delete-lock-days=0
    
  4. バックアップを手動で作成します。スケジュール バックアップは通常、バックアップ プランの cron スケジュールによって管理されますが、次の例は、1 回限りのバックアップ オペレーションを開始する方法を示しています。

    gcloud beta container backup-restore backups create $BACKUP_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=$BACKUP_PLAN_NAME \
        --wait-for-completion
    
  5. 復元プランを作成します。

    gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \
        --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \
        --cluster-resource-conflict-policy=use-existing-version \
        --namespaced-resource-restore-mode=delete-and-restore \
        --volume-data-restore-policy=restore-volume-data-from-backup \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
    
  6. バックアップから手動で復元します。

    gcloud beta container backup-restore restores create $RESTORE_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --restore-plan=$RESTORE_PLAN_NAME \
        --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
    
  7. バックアップ クラスタに復元されたアプリケーションを監視します。すべての Pod が実行されて準備が整うまでに数分かかることがあります。

    gcloud container clusters get-credentials gke-kafka-us-west1 \
        --region us-west1
    kubectl get pod -n kafka --watch
    

    CTRL+C」と入力して、すべての Pod が稼働状態になったら監視を終了します。

  8. コンシューマーが以前のトピックを取得できるかどうか確認します。

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    出力は次のようになります。

    192 :  Message number 192
    193 :  Message number 193
    197 :  Message number 197
    200 :  Message number 200
    Processed a total of 201 messages
    

    CTRL+C と入力してプロセスを終了します。

  9. Pod を終了します。

    exit
    

Kafka サービスの停止をシミュレートする

このセクションでは、ブローカーをホストしている Kubernetes ノードを置き換えて、ノード障害をシミュレートします。このセクションの内容は、Standard にのみ適用されます。Autopilot の場合、ノードは自動的に管理されるため、ノードの障害をシミュレートすることはできません。

  1. クライアント Pod を作成して Kafka アプリケーションに接続します。

    kubectl run kafka-client -n kafka --restart='Never' -it \
    --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
  2. トピック topic-failover-test を作成し、テスト トラフィックを生成します。

    kafka-topics.sh \
      --create \
      --topic topic-failover-test \
      --partitions 1  \
      --replication-factor 3  \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. topic-failover-test トピックのリーダー ブローカーを決めます。

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    出力は次のようになります。

    Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
    

    上記の出力で、Leader: 1topic-failover-test のリーダーがブローカー 1 であることを意味します。これは Pod kafka-1 に対応します。

  4. 新しいターミナルを開き、同じクラスタに接続します。

    gcloud container clusters get-credentials gke-kafka-us-west1 --region us-west1 --project PROJECT_ID
    
  5. Pod kafka-1 が実行されているノードを確認します。

    kubectl get pod -n kafka kafka-1 -o wide
    

    出力は次のようになります。

    NAME      READY   STATUS    RESTARTS      AGE   IP              NODE                                               NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   1 (35m ago)   36m   192.168.132.4   gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72   <none>           <none>
    

    上記の出力から、Pod kafka-1 がノード gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 で実行されていることがわかります。

  6. ノードをドレインして Pod を強制排除します。

    kubectl drain NODE \
      --delete-emptydir-data \
      --force \
      --ignore-daemonsets
    

    NODE は、Pod kafka-1 が実行されているノードに置き換えます。この例の場合、ノードは gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 です。

    出力は次のようになります。

    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned
    Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578
    evicting pod kafka/kafka-1
    evicting pod kube-system/kube-dns-7d48cb57b-j4d8f
    evicting pod kube-system/calico-typha-56995c8d85-5clph
    pod/calico-typha-56995c8d85-5clph evicted
    pod/kafka-1 evicted
    pod/kube-dns-7d48cb57b-j4d8f evicted
    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
    
  7. Pod kafka-1 が実行されているノードを確認します。

    kubectl get pod -n kafka kafka-1 -o wide
    

    出力は次のようになります。

    NAME      READY   STATUS    RESTARTS   AGE     IP              NODE                                              NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   0          2m49s   192.168.128.8   gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7   <none>           <none>
    

    上記の出力から、アプリケーションが新しいノードで実行されていることがわかります。

  8. kafka-client Pod に接続されているターミナルで、topic-failover-test のリーダー ブローカーを特定します。

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    出力は次のようになります。

    Topic: topic-failover-test     TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 0,2,1
    

    この出力例では、リーダーは 1 のままです。しかし、現在は新しいノードで実行されています。

Kafka リーダーの障害をテストする

  1. Cloud Shell で Kafka クライアントに接続し、describe を使用して topic1 の各パーティションに選出されているリーダーを確認します。

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    出力は次のようになります。

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: topic1   Partition: 1    Leader: 0       Replicas: 2,1,0 Isr: 0,2,1
        Topic: topic1   Partition: 2    Leader: 0       Replicas: 1,0,2 Isr: 0,2,1
    
  2. Kafka クライアントに接続していない Cloud Shell で、kafka-0 リーダー ブローカーを削除して、新しいリーダーを強制的に選択します。前の出力のリーダーの一つにマッピングされているインデックスを削除する必要があります。

    kubectl delete pod -n kafka kafka-0 --force
    

    出力は次のようになります。

    pod "kafka-0" force deleted
    
  3. Kafka クライアントに接続されている Cloud Shell で describe を使用して、選択されたリーダーを確認します。

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    出力は次のようになります。

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,0,1
        Topic: topic1   Partition: 1    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: topic1   Partition: 2    Leader: 2       Replicas: 1,2,0 Isr: 2,0,1
    

    出力で、中断されたリーダー(kafka-0)が割り当てられていた場合、各パーティションで新しいリーダーに変更されます。これは、Pod が削除されて再作成されたときに、元のリーダーが置き換えられたことを示します。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

プロジェクトを削除する

課金を停止する最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

Delete a Google Cloud project:

gcloud projects delete PROJECT_ID

次のステップ