Confluent を使用して Apache Kafka を GKE にデプロイする


このガイドでは、Confluent for Kubernetes(CFK)オペレーターを使用して、Apache Kafka クラスタを Google Kubernetes Engine(GKE)にデプロイする方法を説明します。

Kafka は、オープンソースとして提供されているパブリッシュ / サブスクライブ型の分散メッセージング システムで、大規模なリアルタイム ストリーミング データを高スループットで処理します。Kafka を使用すると、システムやアプリケーション間でデータを確実に移動し、処理と分析を行うストリーミング データ パイプラインを構築できます。

このガイドは、GKE に Kafka クラスタをデプロイすることを検討しているプラットフォーム管理者、クラウド アーキテクト、運用担当者を対象としています。

CFK オペレーターを使用して、ウェブベースの Confluent Control Center、Schema Registry、KsqlDB など、Confluent Platform のその他のコンポーネントをデプロイすることもできます。ただし、このガイドでは Kafka のデプロイについてのみ説明します。

目標

  • Apache Kafka 用の GKE インフラストラクチャを計画してデプロイする
  • CFK オペレーターをデプロイして構成する
  • CFK オペレーターを使用して Apache Kafka を構成し、可用性、セキュリティ、オブザーバビリティ、パフォーマンスを確保する

利点

CFK には次の利点があります。

  • 構成変更に対する自動ローリング アップデート。
  • Kafka の可用性に影響を与えない自動ローリング アップグレード。
  • 障害が発生すると、CFK は、同じ Kafka ブローカー ID、構成、永続ストレージ ボリュームを持つ Kafka Pod を復元します。
  • ラックの自動認識。パーティションのレプリカを異なるラック(またはゾーン)に分散し、Kafka ブローカーの可用性を高め、データ損失のリスクを抑えます。
  • Prometheus への集計指標のエクスポートをサポート。

デプロイ アーキテクチャ

Kafka クラスタの各データ パーティションには 1 つのリーダー ブローカーが存在します。また、1 つ以上のフォロワー ブローカーが存在する場合もあります。リーダー ブローカーは、パーティションに対するすべての読み取りと書き込みを処理します。フォロワー ブローカーはリーダー ブローカーの複製です。

一般的な Kafka の設定では、ZooKeeper というオープンソース サービスも使用して Kafka クラスタを調整します。このサービスは、ブローカーの中からリーダーを選出し、障害発生時にフェイルオーバーをトリガーする際に役立ちます。

KRaft モードを有効にして、ZooKeeper を使用せずに Kafka 構成をデプロイすることもできますが、この方法では KafkaTopic リソースや認証情報による認証などの機能がサポートされていないため、本番環境に対応していないとみなされます。

可用性と障害復旧

このチュートリアルでは、Kafka クラスタと ZooKeeper クラスタに個別のノードプールゾーンを使用して、高可用性を確保して障害から復旧できるようにします。

Google Cloud の高可用性 Kubernetes クラスタは、複数のノードとアベイラビリティ ゾーンにまたがるリージョン クラスタに依存しています。この構成により、フォールト トレランス、スケーラビリティ、地理的冗長性が向上します。この構成では、稼働時間と可用性の SLA を提供しながら、ローリング アップデートとメンテナンスを行うことも可能です。詳細については、リージョン クラスタをご覧ください。

デプロイ図

次の図は、GKE クラスタ内の複数のノードとゾーンで実行されている Kafka クラスタを示しています。

この図では、Kafka StatefulSet が 3 つの異なるゾーンの 3 つのノードにデプロイされています。この構成を制御するには、必要な Pod アフィニティ ルールとトポロジ分散ルールを Kafka カスタム リソース仕様に設定します。

1 つのゾーンに障害が発生した場合、GKE は推奨構成を使用して新しいノードで Pod を再スケジュールし、残りのレプリカからデータを複製します。この処理は Kafka と Zookeeper の両方に対して行われます。

次の図は、3 つの異なるゾーンの 3 つのノードにデプロイされた ZooKeeper StatefulSet を示しています。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud CLI をインストールします。
  3. gcloud CLI を初期化するには:

    gcloud init
  4. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. GKE, Backup for GKE, Compute Engine, Identity and Access Management, and Resource Manager API を有効にします。

    gcloud services enable compute.googleapis.comiam.googleapis.comcontainer.googleapis.comgkebackup.googleapis.comcloudresourcemanager.googleapis.com
  7. Google Cloud CLI をインストールします。
  8. gcloud CLI を初期化するには:

    gcloud init
  9. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  10. Google Cloud プロジェクトで課金が有効になっていることを確認します

  11. GKE, Backup for GKE, Compute Engine, Identity and Access Management, and Resource Manager API を有効にします。

    gcloud services enable compute.googleapis.comiam.googleapis.comcontainer.googleapis.comgkebackup.googleapis.comcloudresourcemanager.googleapis.com
  12. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 role/storage.objectViewer, role/logging.logWriter, roles/container.clusterAdmin, role/container.serviceAgent, roles/iam.serviceAccountAdmin, roles/serviceusage.serviceUsageAdmin, roles/iam.serviceAccountAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。

環境を準備する

このチュートリアルでは、Cloud Shell を使用して Google Cloud でホストされるリソースを管理します。Cloud Shell には、このチュートリアルに必要なソフトウェア(kubectlgcloud CLIHelmTerraform など)がプリインストールされています

Cloud Shell を使用して環境を設定するには、次の操作を行います。

  1. Google Cloud コンソールCloud Shell 有効化アイコンCloud Shell をアクティブにする」をクリックして、Google Cloud コンソールから Cloud Shell セッションを起動します。Google Cloud コンソールの下部ペインでセッションが起動します。

  2. 環境変数を設定します。

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=kafka
    export REGION=us-central1
    

    PROJECT_ID の置き換え: Google Cloud のプロジェクト ID に置き換えます。

  3. GitHub リポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  4. 作業ディレクトリを変更します。

    cd kubernetes-engine-samples/streaming
    

クラスタ インフラストラクチャを作成する

このセクションでは、Terraform スクリプトを実行して、限定公開の高可用性リージョン GKE クラスタを作成します。次の手順では、コントロール プレーンへの公開アクセスを許可します。アクセスを制限するため、限定公開クラスタを作成します。

オペレーターは、Standard または Autopilot クラスタを使用してインストールできます。

Standard

次の図は、3 つの異なるゾーンにデプロイされた限定公開のリージョン GKE Standard クラスタを示しています。

このインフラストラクチャをデプロイするには、Cloud Shell から次のコマンドを実行します。

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-standard init
terraform -chdir=kafka/terraform/gke-standard apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

プロンプトが表示されたら、「yes」と入力します。このコマンドが完了し、クラスタが準備完了ステータスになるまでに数分かかることがあります。

Terraform が次のリソースを作成します。

  • Kubernetes ノードの VPC ネットワークとプライベート サブネット。
  • NAT 経由でインターネットにアクセスするためのルーター。
  • us-central1 リージョンの限定公開 GKE クラスタ。
  • 自動スケーリングが有効な 2 つのノードプール(ゾーンあたり 1~2 ノード、ゾーンあたり 1 ノード以上)
  • ロギングとモニタリングの権限を持つ ServiceAccount
  • Backup for GKE(障害復旧用)。
  • Google Cloud Managed Service for Prometheus(クラスタ モニタリング用)。

出力は次のようになります。

...
Apply complete! Resources: 14 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"

Autopilot

次の図は、限定公開のリージョン GKE Autopilot クラスタを示しています。

このインフラストラクチャをデプロイするには、Cloud Shell から次のコマンドを実行します。

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-autopilot init
terraform -chdir=kafka/terraform/gke-autopilot apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

プロンプトが表示されたら、「yes」と入力します。このコマンドが完了し、クラスタが準備完了ステータスになるまでに数分かかることがあります。

Terraform が次のリソースを作成します。

  • Kubernetes ノードの VPC ネットワークとプライベート サブネット
  • NAT 経由でインターネットにアクセスするためのルーター。
  • us-central1 リージョンの限定公開 GKE クラスタ。
  • ロギングとモニタリングの権限を持つ ServiceAccount
  • Google Cloud Managed Service for Prometheus(クラスタ モニタリング用)。

出力は次のようになります。

...
Apply complete! Resources: 12 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"

クラスタに接続する

クラスタと通信を行うように kubectl を構成します。

gcloud container clusters get-credentials ${KUBERNETES_CLUSTER_PREFIX}-cluster --region ${REGION}

CFK オペレーターをクラスタにデプロイする

このセクションでは、Helm チャートを使用して Confluent for Kubernetes(CFK)オペレーターをデプロイしてから、Kafka クラスタをデプロイします。

  1. Confluent Helm チャート リポジトリを追加します。

    helm repo add confluentinc https://packages.confluent.io/helm
    
  2. CFK オペレーターと Kafka クラスタの Namespace を追加します。

    kubectl create ns kafka
    
  3. Helm を使用して CFK クラスタ オペレーターをデプロイします。

    helm install confluent-operator confluentinc/confluent-for-kubernetes -n kafka
    

    CFK がすべての Namespace のリソースを管理できるようにするため、Helm コマンドにパラメータ --set-namespaced=false を追加します。

  4. Helm を使用して、Confluent オペレーターが正常にデプロイされたことを確認します。

    helm ls -n kafka
    

    出力は次のようになります。

    NAME                  NAMESPACE  REVISION UPDATED                                  STATUS      CHART                                APP VERSION
    confluent-operator    kafka      1        2023-07-07 10:57:45.409158 +0200 CEST    deployed    confluent-for-kubernetes-0.771.13    2.6.0
    

Kafka をデプロイする

このセクションでは、Kafka を基本構成にデプロイしてから、可用性、セキュリティ、オブザーバビリティの要件を満たすため、さまざまな高度な構成シナリオを試してみます。

基本構成

Kafka インスタンスの基本構成には、次のコンポーネントが含まれています。

  • Kafka ブローカーの 3 つのレプリカ。クラスタの整合性を確保するため、利用可能なレプリカが少なくとも 2 つ必要です。
  • クラスタを形成する ZooKeeper ノードの 3 つのレプリカ。
  • 2 つの Kafka リスナー: 1 つは認証を利用しません。もう 1 つは CFK によって生成された証明書を使用して TLS 認証を利用します。
  • Kafka では Java の MaxHeapSize と MinHeapSize が 4 GB に設定されています。
  • 1 CPU リクエストの CPU リソースの割り当ては 2 CPU に制限されています。メモリ リクエストの割り当ては、Kafka の場合 5 GB で、メインサービスに 4 GB、指標エクスポータに 0.5 GB という制限があります。ZooKeeper の場合は 3 GB で、メインサービスに 2 GB、指標エクスポータに 0.5 GB という制限があります。
  • premium-rwo storageClass を使用して各 Pod に 100 GB のストレージが割り当てられます。Kafka データには 100 GB、Zookeeper データとログにはそれぞれ 90 GB、10 GB が割り当てられます。
  • 各ワークロードに構成された toleration、nodeAffinities、podAntiAffinities。それぞれのノードプールと異なるゾーンを使用して、ノード間で適切に分散されます。
  • 指定した認証局を使用して自己署名証明書で保護されているクラスタ内の通信。

この構成は、本番環境に対応した Kafka クラスタの作成に必要な最小限の設定を表しています。以降のセクションでは、クラスタ セキュリティ、アクセス制御リスト(ACL)、トピック管理、証明書管理などに対処するためのカスタム構成を示します。

基本的な Kafka クラスタを作成する

  1. CA ペアを生成します。

    openssl genrsa -out ca-key.pem 2048
    openssl req -new -key ca-key.pem -x509 \
      -days 1000 \
      -out ca.pem \
      -subj "/C=US/ST=CA/L=Confluent/O=Confluent/OU=Operator/CN=MyCA"
    

    Confluent for Kubernetes は、Confluent Platform コンポーネントが TLS ネットワーク暗号化に使用する証明書を自動生成します。ユーザーは認証局(CA)を生成して提供する必要があります。

  2. 認証局の Kubernetes Secret を作成します。

    kubectl create secret tls ca-pair-sslcerts --cert=ca.pem --key=ca-key.pem -n kafka
    

    Secret の名前は事前に定義されています。

  3. 基本構成を使用して新しい Kafka クラスタを作成します。

    kubectl apply -n kafka -f kafka-confluent/manifests/01-basic-cluster/my-cluster.yaml
    

    このコマンドは、CFK オペレーターの Kafka カスタム リソースと Zookeeper カスタム リソースを作成します。これには、CPU とメモリ リクエストと上限、ブロック ストレージ リクエスト、Kubernetes ノード間でプロビジョニングされた Pod を分散させる taint とアフィニティが含まれています。

  4. Kubernetes が必要なワークロードを開始するまで数分待ちます。

    kubectl wait pods -l app=my-cluster --for condition=Ready --timeout=300s -n kafka
    
  5. Kafka ワークロードが作成されたことを確認します。

    kubectl get pod,svc,statefulset,deploy,pdb -n kafka
    

    出力は次のようになります。

    NAME                                    READY   STATUS  RESTARTS   AGE
    pod/confluent-operator-864c74d4b4-fvpxs   1/1   Running   0        49m
    pod/my-cluster-0                        1/1   Running   0        17m
    pod/my-cluster-1                        1/1   Running   0        17m
    pod/my-cluster-2                        1/1   Running   0        17m
    pod/zookeeper-0                         1/1   Running   0        18m
    pod/zookeeper-1                         1/1   Running   0        18m
    pod/zookeeper-2                         1/1   Running   0        18m
    
    NAME                          TYPE      CLUSTER-IP   EXTERNAL-IP   PORT(S)                                                        AGE
    service/confluent-operator    ClusterIP   10.52.13.164   <none>      7778/TCP                                                       49m
    service/my-cluster            ClusterIP   None         <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-0-internal   ClusterIP   10.52.2.242  <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-1-internal   ClusterIP   10.52.7.98   <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-2-internal   ClusterIP   10.52.4.226  <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/zookeeper             ClusterIP   None         <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-0-internal  ClusterIP   10.52.8.52   <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-1-internal  ClusterIP   10.52.12.44  <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-2-internal  ClusterIP   10.52.12.134   <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    
    NAME                        READY   AGE
    statefulset.apps/my-cluster   3/3   17m
    statefulset.apps/zookeeper  3/3   18m
    
    NAME                               READY   UP-TO-DATE   AVAILABLE   AGE
    deployment.apps/confluent-operator   1/1   1          1         49m
    
    NAME                                  MIN AVAILABLE   MAX UNAVAILABLE   ALLOWED DISRUPTIONS   AGE
    poddisruptionbudget.policy/my-cluster   N/A           1               1                   17m
    poddisruptionbudget.policy/zookeeper  N/A           1               1                   18m
    

オペレーターが次のリソースを作成します。

  • Kafka と ZooKeeper 用の 2 つの StatefulSet。
  • Kafka ブローカー レプリカ用に 3 つの Pod。
  • ZooKeeper レプリカ用に 3 つの Pod。
  • 2 つの PodDisruptionBudget リソース。クラスタの整合性を確保するため、使用できないレプリカを最大 1 つ確保します。
  • my-cluster という名前の Service。Kubernetes クラスタ内から接続する Kafka クライアントのブートストラップ サーバーとして機能します。この Service では、すべての内部 Kafka リスナーを使用できます。
  • zookeeper という名前の Service。これにより、Kafka ブローカーはクライアントとして ZooKeeper ノードに接続できます。

認証とユーザー管理

このセクションでは、Kafka リスナーを保護し、クライアントと認証情報を共有するために認証と認可を有効にする方法について説明します。

Confluent for Kubernetes は、次のような Kafka 向けのさまざまな認証方法をサポートしています。

  • SASL / PLAIN 認証: クライアントは認証にユーザー名とパスワードを使用します。ユーザー名とパスワードは Kubernetes Secret のサーバー側に保存されます。
  • LDAP による SASL / PLAIN 認証: クライアントは認証にユーザー名とパスワードを使用します。認証情報は LDAP サーバーに保存されます。
  • mTLS 認証: クライアントは認証に TLS 証明書を使用します。

制限事項

  • CFK では、ユーザー管理用のカスタム リソースは提供されません。ただし、認証情報を Secret に格納し、リスナー仕様で Secret を参照することはできます。
  • ACL を直接管理するためのカスタム リソースはありませんが、Kafka CLI を使用して ACL を構成する方法については、公式の Confluent for Kubernetes がガイダンスを提供しています。

ユーザーを作成する

このセクションでは、次のようなユーザー管理機能を実行する CFK オペレーターのデプロイ方法を示します。

  • いずれかのリスナーでパスワード ベースの認証(SASL / PLAIN)が有効になっている Kafka クラスタ
  • 3 つのレプリカを持つ KafkaTopic
  • 読み取り / 書き込み権限を含むユーザー認証情報
  1. ユーザー認証情報で Secret を作成します。

    export USERNAME=my-user
    export PASSWORD=$(openssl rand -base64 12)
    kubectl create secret generic my-user-credentials -n kafka \
      --from-literal=plain-users.json="{\"$USERNAME\":\"$PASSWORD\"}"
    

    認証情報は次の形式で保存する必要があります。

    {
    "username1": "password1",
    "username2": "password2",
    ...
    "usernameN": "passwordN"
    }
    
  2. ポート 9094 でパスワード ベースの認証である SCRAM-SHA-512 認証でリスナーを使用するように、Kafka クラスタを構成します。

    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-cluster.yaml
    
  3. Kafka クラスタとやり取りを行うトピックとクライアント Pod を設定し、Kafka コマンドを実行します。

    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-topic.yaml
    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/kafkacat.yaml
    

    GKE は、Secret my-user-credentialsVolume としてクライアント Pod にマウントします。

  4. クライアント Pod の準備ができたら接続して、提供された認証情報を使用してメッセージの生成と使用を開始します。

    kubectl wait pod kafkacat --for=condition=Ready --timeout=300s -n kafka
    kubectl exec -it kafkacat -n kafka -- /bin/sh
    
  5. my-user 認証情報を使用してメッセージを生成し、メッセージで受信を確認します。

    export USERNAME=$(cat /my-user/plain-users.json|cut -d'"' -f 2)
    export PASSWORD=$(cat /my-user/plain-users.json|cut -d'"' -f 4)
    echo "Message from my-user" |kcat \
      -b my-cluster.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=PLAIN \
      -X sasl.username=$USERNAME \
      -X sasl.password=$PASSWORD  \
      -t my-topic -P
    kcat -b my-cluster.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=PLAIN \
      -X sasl.username=$USERNAME \
      -X sasl.password=$PASSWORD  \
      -t my-topic -C
    

    出力は次のようになります。

    Message from my-user
    % Reached end of topic my-topic [1] at offset 1
    % Reached end of topic my-topic [2] at offset 0
    % Reached end of topic my-topic [0] at offset 0
    

    CTRL+C」と入力して、コンシューマー プロセスを停止します。Connect refused エラーが発生した場合は、数分待ってからもう一度試してください。

  6. Pod のシェルを終了します。

    exit
    

バックアップと障害復旧

Confluent オペレーターを使用すると、特定のパターンに従って効率的なバックアップ戦略を実装できます。

Backup for GKE を使用して、次のものをバックアップできます。

  • Kubernetes リソース マニフェスト。
  • バックアップを行うクラスタの Kubernetes API サーバーから抽出された Confluent API カスタム リソースとその定義。
  • マニフェスト内の PersistentVolumeClaim リソースに対応する Volume。

Backup for GKE を使用して Kafka クラスタのバックアップと復元を行う方法については、障害復旧の準備を行うをご覧ください。

Kafka クラスタの手動バックアップを実行することもできます。次のデータをバックアップする必要があります。

  • Kafka 構成。KafkaTopicsConnect など、Confluent API のすべてのカスタム リソースが含まれます。
  • Kafka ブローカーの PersistentVolume に保存されているデータ。

Confluent の構成を含む Kubernetes リソース マニフェストを Git リポジトリに保存すると、必要に応じてリソースを新しい Kubernetes クラスタに再適用できるため、Kafka 構成のバックアップを別途行う必要がなくなります。

Kafka サーバー インスタンスまたは Kafka がデプロイされた Kubernetes クラスタが失われた場合に Kafka のデータを復旧できるようにするため、reclaimPolicy オプションを Retain に設定して、Kafka ブローカーのボリュームのプロビジョニングに使用される Kubernetes ストレージ クラスを構成することをおすすめします。また、Kafka ブローカー ボリュームのスナップショットを取得することもおすすめします。

次のマニフェストでは、reclaimPolicy オプション Retain を使用する StorageClass を記述しています。

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: premium-rwo-retain
...
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer

次の例は、Kafka クラスタのカスタム リソースの spec に追加された StorageClass を示しています。

...
spec:
  ...
  dataVolumeCapacity: 100Gi
  storageClass:
  name: premium-rwo-retain

この構成では、対応する PersistentVolumeClaim が削除されても、ストレージ クラスを使用してプロビジョニングされた PersistentVolume は削除されません。

既存の構成とブローカー インスタンス データを使用して、新しい Kubernetes クラスタに Kafka インスタンスを復元するには:

  1. 既存の Confluent カスタム リソース(KafkaKafkaTopicZookeeper など)を新しい Kubernetes クラスタに適用します。
  2. PersistentVolumeClaim の spec.volumeName プロパティを使用して、新しい Kafka ブローカー インスタンスの名前を持つ PersistentVolumeClaim を古い PersistentVolume に戻します。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

プロジェクトの削除

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

個々のリソースを削除する

既存のプロジェクトを削除しない場合は、リソースを個別に削除します。

  1. 環境変数を設定します。

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=kafka
    export REGION=us-central1
    
  2. terraform destroy コマンドを実行します。

    export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
    terraform -chdir=kafka/terraform/FOLDER destroy -var project_id=${PROJECT_ID}   \
      -var region=${REGION}  \
      -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
    

    FOLDER は、gke-autopilot または gke-standard に置き換えます。

    プロンプトが表示されたら、「yes」と入力します。

  3. アタッチされていないすべてのディスクを検索します。

    export disk_list=$(gcloud compute disks list --filter="-users:* AND labels.name=${KUBERNETES_CLUSTER_PREFIX}-cluster" --format "value[separator=|](name,zone)")
    
  4. ディスクを削除します。

    for i in $disk_list; do
      disk_name=$(echo $i| cut -d'|' -f1)
      disk_zone=$(echo $i| cut -d'|' -f2|sed 's|.*/||')
      echo "Deleting $disk_name"
      gcloud compute disks delete $disk_name --zone $disk_zone --quiet
    done
    

次のステップ