Strimzi を使用して Apache Kafka を GKE にデプロイする


このガイドでは、Strimzi オペレーターを使用して Apache Kafka クラスタをデプロイする方法について説明します。

Kafka は、大規模なリアルタイム ストリーミング データを高スループットで処理するように設計されたオープンソースの分散メッセージング システムです。さまざまなシステムやアプリケーション間で信頼性の高いデータ転送を実現するストリーミング データ パイプラインを構築し、処理タスクと分析タスクをサポートできます。

オペレーターは、カスタム リソースを使用してアプリケーションとそのコンポーネントを管理するソフトウェア拡張機能です。オペレーターを使用する理由について詳しくは、オープンソースの Kubernetes ドキュメントのオペレーター パターンをご覧ください。Strimzi オペレーターを使用すると、デプロイ方法を柔軟に選択できます。Kubernetes taint と toleration を使用して、専用ノードで Kafka を実行できます。

このガイドは、GKE に Kafka クラスタをデプロイすることを検討しているプラットフォーム管理者、クラウド アーキテクト、運用担当者を対象としています。

このソリューションは、サードパーティのオペレーターを使用して Kafka クラスタをデプロイし、管理を自動化してエラーを減らす方法を学習する場合に適しています。よりきめ細かい運用管理を行う必要がある場合は、GKE に高可用性 Kafka クラスタをデプロイするをご覧ください。

目標

  • Apache Kafka 用の GKE インフラストラクチャを計画してデプロイする
  • Strimzi オペレーターをデプロイして構成する
  • Strimzi オペレーターを使用して Apache Kafka を構成する

利点

Strimzi には次のようなメリットがあります。

  • Strimzi オペレーターを使用すると、簡素化された Kubernetes ネイティブの方法で Kafka クラスタを管理できます。Strimzi は、Kafka トピックとユーザーを表すカスタム リソースを利用します。これにより、クラスタ管理が大幅に簡素化され、Kubernetes のベスト プラクティスに準拠したクラスタ管理が可能になります。
  • Strimzi は、デフォルトでセキュリティを優先して処理を行います。リスナーの証明書を生成し、TLS、SCRAM-SHA、OAuth などの安全な認証方法をサポートします。また、Strimzi はすべての Kafka リスナーの NetworkPolicy も処理します。
  • Strimzi は外部依存関係に依存しません。これには、指標エクスポータが組み込まれた Kafka クラスタと ZooKeeper クラスタが含まれているため、追加のツールを使う必要はありません。特定の要件を満たすようにブローカー構成を微調整することもできます。

デプロイ アーキテクチャ

Kafka クラスタは 1 つ以上のサーバー(ブローカー)で構成されています。これらのサーバーが連携して受信データ ストリームを管理し、Kafka クライアント(コンシューマー)のパブリッシュ / サブスクライブ メッセージングを容易にします。

Kafka クラスタ内のすべてのデータ パーティションにはリーダー ブローカーが割り当てられます。このブローカーは、パーティションに対するすべての読み取りと書き込みのオペレーションを管理します。パーティションには、リーダー ブローカーのアクションをパッシブに複製する 1 つ以上のフォロワー ブローカーを指定することもできます。

一般的な設定では、ZooKeeper がブローカーの中からリーダーを選択できるように Kafka クラスタを調整し、問題が発生した場合にスムーズなフェイルオーバーを実現します。

KRaft モードを有効にして、ZooKeeper を使用せずに Kafka 構成をデプロイすることもできますが、この方法では KafkaTopic リソースや認証情報による認証などの機能がサポートされていないため、Strimzi コミュニティでは本番環境に対応していないとみなされます。

可用性と障害復旧

このチュートリアルでは、Kafka クラスタと ZooKeeper クラスタに個別のノードプールゾーンを使用して、高可用性を確保して障害から復旧できるようにします。

Google Cloud で高可用性 Kubernetes クラスタを実現するには、次の理由から複数のノードとゾーンを使用することが重要になります。

  • フォールト トレランス: 複数のノードを使用してクラスタ全体にワークロードを分散します。これにより、1 つのノードで障害が発生しても、他のノードがタスクを引き継ぐことができるため、ダウンタイムとサービス中断を防ぐことができます。
  • スケーラビリティ: 複数のノードを使用すると、水平方向のスケーリングで必要に応じてノードの追加や削除を行うことができます。これにより、最適なリソース割り当てが確保され、増加したトラフィックやワークロードの需要に対応できます。
  • 高可用性: 1 つのリージョン内で複数のゾーンを使用することで、冗長性を確保し、単一障害点のリスクを最小限に抑えることができます。アベイラビリティ ゾーン全体が停止した場合でも、クラスタは他のゾーンで稼働を継続し、サービスの可用性を維持できます。
  • 地理的冗長性: 複数のリージョンにまたがってノードを配置することで、クラスタのデータとサービスが地理的に分散されます。これにより、単一ゾーンに影響を及ぼす可能性のある自然災害、停電、その他のローカルな障害に対する復元力が確保されます。
  • ローリング アップデートとメンテナンス: 複数のゾーンを使用することで、クラスタ全体の可用性に影響を与えることなく、個々のノードでローリング アップデートとメンテナンスを実行できます。これにより、サービスを継続しながら、必要なアップデートとパッチをシームレスに適用できます。
  • サービスレベル契約(SLA): Google Cloud は、マルチゾーン デプロイの SLA を提供し、最低レベルの稼働時間と可用性を保証します。

デプロイ図

次の図は、GKE クラスタ内の複数のノードとゾーンで実行されている Kafka クラスタを示しています。

この図では、Kafka StrimziPodSet が 3 つの異なるゾーンの 3 つのノードにデプロイされています。この構成を制御するには、必要な Pod アフィニティ ルールとトポロジ分散ルールを StrimziPodSet カスタム リソース仕様に設定します。

1 つのゾーンに障害が発生した場合、GKE は推奨構成を使用して新しいノードで Pod を再スケジュールし、残りのレプリカからデータを複製します。この処理は Kafka と Zookeeper の両方に対して行われます。

次の図は、3 つの異なるゾーンの 3 つのノードにデプロイされた ZooKeeper StrimziPodSet を示しています。

StrimziPodSet カスタム リソース

このチュートリアルでは、StatefulSets ではなく、Strimzi のバージョン 0.29 で導入された StrimziPodSet カスタム リソースを使用します。

StrimziPodSet リソースを使用すると、クラスタのスケーラビリティが強化されます。構成オプションを渡して、Pod に対してきめ細かい変更を行うことができます。Strimzi バージョン 0.35 以降では、StrimziPodSet リソースがデフォルトで有効になっています。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud CLI をインストールします。
  3. gcloud CLI を初期化するには:

    gcloud init
  4. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. Compute Engine, IAM, GKE, Backup for GKE, and Resource Manager API を有効にします。

    gcloud services enable compute.googleapis.comiam.googleapis.comcontainer.googleapis.comgkebackup.googleapis.comcloudresourcemanager.googleapis.com
  7. Google Cloud CLI をインストールします。
  8. gcloud CLI を初期化するには:

    gcloud init
  9. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  10. Google Cloud プロジェクトで課金が有効になっていることを確認します

  11. Compute Engine, IAM, GKE, Backup for GKE, and Resource Manager API を有効にします。

    gcloud services enable compute.googleapis.comiam.googleapis.comcontainer.googleapis.comgkebackup.googleapis.comcloudresourcemanager.googleapis.com
  12. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/storage.objectViewer, roles/logging.logWriter, roles/container.clusterAdmin, roles/container.serviceAgent, roles/iam.serviceAccountAdmin, roles/serviceusage.serviceUsageAdmin, roles/iam.serviceAccountAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。

環境を準備する

このチュートリアルでは、Cloud Shell を使用して Google Cloud でホストされるリソースを管理します。Cloud Shell には、このチュートリアルに必要なソフトウェア(kubectlgcloud CLIHelmTerraform など)がプリインストールされています

Cloud Shell を使用して環境を設定するには、次の操作を行います。

  1. Google Cloud コンソールCloud Shell 有効化アイコンCloud Shell をアクティブにする」をクリックして、Google Cloud コンソールから Cloud Shell セッションを起動します。Google Cloud コンソールの下部ペインでセッションが起動します。

  2. 環境変数を設定します。

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=kafka
    export REGION=us-central1
    

    PROJECT_ID の置き換え: Google Cloud のプロジェクト ID に置き換えます。

  3. GitHub リポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  4. 作業ディレクトリを変更します。

    cd kubernetes-engine-samples/streaming/
    

クラスタ インフラストラクチャを作成する

このセクションでは、Terraform スクリプトを実行して、限定公開の高可用性リージョン GKE クラスタを作成します。次の手順では、コントロール プレーンへの公開アクセスを許可します。アクセスを制限するため、限定公開クラスタを作成します。

オペレーターは、Standard または Autopilot クラスタを使用してインストールできます。

Standard

次の図は、3 つの異なるゾーンにデプロイされた限定公開のリージョン GKE Standard クラスタを示しています。

このインフラストラクチャをデプロイするには、Cloud Shell から次のコマンドを実行します。

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-standard init
terraform -chdir=kafka/terraform/gke-standard apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

プロンプトが表示されたら、「yes」と入力します。このコマンドが完了し、クラスタが準備完了ステータスになるまでに数分かかることがあります。

Terraform が次のリソースを作成します。

  • Kubernetes ノードの VPC ネットワークとプライベート サブネット。
  • NAT 経由でインターネットにアクセスするためのルーター。
  • us-central1 リージョンの限定公開 GKE クラスタ。
  • 自動スケーリングが有効な 2 つのノードプール(ゾーンあたり 1~2 ノード、ゾーンあたり 1 ノード以上)
  • ロギングとモニタリングの権限を持つ ServiceAccount
  • Backup for GKE(障害復旧用)。
  • Google Cloud Managed Service for Prometheus(クラスタ モニタリング用)。

出力は次のようになります。

...
Apply complete! Resources: 14 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials strimzi-cluster --region us-central1"

Autopilot

次の図は、限定公開のリージョン GKE Autopilot クラスタを示しています。

このインフラストラクチャをデプロイするには、Cloud Shell から次のコマンドを実行します。

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-autopilot init
terraform -chdir=kafka/terraform/gke-autopilot apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

プロンプトが表示されたら、「yes」と入力します。このコマンドが完了し、クラスタが準備完了ステータスになるまでに数分かかることがあります。

Terraform が次のリソースを作成します。

  • Kubernetes ノードの VPC ネットワークとプライベート サブネット
  • NAT 経由でインターネットにアクセスするためのルーター。
  • us-central1 リージョンの限定公開 GKE クラスタ。
  • ロギングとモニタリングの権限を持つ ServiceAccount
  • Google Cloud Managed Service for Prometheus(クラスタ モニタリング用)。

出力は次のようになります。

...
Apply complete! Resources: 12 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials strimzi-cluster --region us-central1"

クラスタへの接続

クラスタと通信を行うように kubectl を構成します。

gcloud container clusters get-credentials ${KUBERNETES_CLUSTER_PREFIX}-cluster --region ${REGION}

Strimzi オペレーターをクラスタにデプロイする

このセクションでは、Helm チャートを使用して Strimzi オペレーターをデプロイします。Strimzi をデプロイする方法は他にもいくつかあります。

  1. Strimzi Helm チャート リポジトリを追加します。

    helm repo add strimzi https://strimzi.io/charts/
    
  2. Strimzi オペレーターと Kafka クラスタの Namespace を追加します。

    kubectl create ns kafka
    
  3. Helm を使用して Strimzi クラスタ オペレーターをデプロイします。

    helm install strimzi-operator strimzi/strimzi-kafka-operator -n kafka
    

    Strimzi クラスタ オペレーターと Kafka クラスタを異なる Namespace にデプロイする場合は、Helm コマンドに --set watchNamespaces="{kafka-namespace,kafka-namespace-2,...}" パラメータを追加します。

  4. Helm を使用して、Strimzi クラスタ オペレーターが正常にデプロイされたことを確認します。

    helm ls -n kafka
    

    出力は次のようになります。

    NAME            NAMESPACE    REVISION    UPDATED                              STATUS    CHART                        APP VERSION
    strimzi-operator    kafka      1       2023-06-27 11:22:15.850545 +0200 CEST    deployed    strimzi-kafka-operator-0.35.0    0.35.0
    

Kafka をデプロイする

オペレーターがクラスタにデプロイされると、Kafka クラスタ インスタンスをデプロイする準備が整います。

このセクションでは、Kafka を基本構成にデプロイしてから、可用性、セキュリティ、オブザーバビリティの要件を満たすため、さまざまな高度な構成シナリオを試してみます。

基本構成

Kafka インスタンスの基本構成には、次のコンポーネントが含まれています。

  • Kafka ブローカーの 3 つのレプリカ。クラスタの整合性を確保するため、利用可能なレプリカが少なくとも 2 つ必要です。
  • クラスタを形成する ZooKeeper ノードの 3 つのレプリカ。
  • 2 つの Kafka リスナー: 1 つは認証を利用しません。もう 1 つは Strimzi によって生成された証明書を使用して TLS 認証を利用します。
  • Kafka では Java の MaxHeapSize と MinHeapSize が 4 GB に設定され、ZooKeeper では 2 GB に設定されています。
  • 1 CPU リクエストの CPU リソースの割り当ては、Kafka、ZooKeeper ともに 2 CPU に制限されています。メモリ リクエストの割り当ては、Kafka の場合 5 GB で、メインサービスに 4 GB、指標エクスポータに 0.5 GB という制限があります。ZooKeeper の場合は 2.5 GB で、メインサービスに 2 GB、指標エクスポータに 0.5 GB という制限があります。
  • 次のリクエストと上限を持つ entity-operator。
    • tlsSidecar: 100 m / 500 m CPU と 128 Mi メモリ。
    • topicOperator: 100 m / 500 m CPU と 512 Mi メモリ。
    • userOperator: 500 m CPU と 2 Gi メモリ。
  • premium-rwo storageClass を使用して各 Pod に 100 GB のストレージが割り当てられます。
  • 各ワークロードに構成された toleration、nodeAffinities、podAntiAffinities。それぞれのノードプールと異なるゾーンを使用して、ノード間で適切に分散されます。
  • 自己署名証明書で保護されているクラスタ内の通信: クラスタとクライアント(mTLS)で個別の認証局(CA)。別の認証局を使用するように構成することもできます。

この構成は、本番環境に対応した Kafka クラスタの作成に必要な最小限の設定を表しています。以降のセクションでは、クラスタ セキュリティ、アクセス制御リスト(ACL)、トピック管理、証明書管理などに対処するためのカスタム構成を示します。

基本的な Kafka クラスタを作成する

  1. 基本構成を使用して新しい Kafka クラスタを作成します。

    kubectl apply -n kafka -f kafka-strimzi/manifests/01-basic-cluster/my-cluster.yaml
    

    このコマンドは Strimzi オペレーターの Kafka カスタム リソースを作成します。これにより、CPU とメモリのリクエストと上限、ブロック ストレージ リクエスト、プロビジョニングされた Pod を Kubernetes ノード間で分散するための taint とアフィニティの組み合わせが含まれます。

  2. Kubernetes が必要なワークロードを開始するまで数分待ちます。

    kubectl wait kafka/my-cluster --for=condition=Ready --timeout=600s -n kafka
    
  3. Kafka ワークロードが作成されたことを確認します。

    kubectl get pod,service,deploy,pdb -l=strimzi.io/cluster=my-cluster -n kafka
    

    出力は次のようになります。

    NAME                                            READY   STATUS  RESTARTS   AGE
    pod/my-cluster-entity-operator-848698874f-j5m7f   3/3   Running   0        44m
    pod/my-cluster-kafka-0                          1/1   Running   0        5m
    pod/my-cluster-kafka-1                          1/1   Running   0        5m
    pod/my-cluster-kafka-2                          1/1   Running   0        5m
    pod/my-cluster-zookeeper-0                      1/1   Running   0        6m
    pod/my-cluster-zookeeper-1                      1/1   Running   0        6m
    pod/my-cluster-zookeeper-2                      1/1   Running   0        6m
    
    NAME                                TYPE      CLUSTER-IP   EXTERNAL-IP   PORT(S)                             AGE
    service/my-cluster-kafka-bootstrap  ClusterIP   10.52.8.80   <none>      9091/TCP,9092/TCP,9093/TCP          5m
    service/my-cluster-kafka-brokers    ClusterIP   None         <none>      9090/TCP,9091/TCP,9092/TCP,9093/TCP   5m
    service/my-cluster-zookeeper-client   ClusterIP   10.52.11.144   <none>      2181/TCP                            6m
    service/my-cluster-zookeeper-nodes  ClusterIP   None         <none>      2181/TCP,2888/TCP,3888/TCP          6m
    
    NAME                                       READY   UP-TO-DATE   AVAILABLE   AGE
    deployment.apps/my-cluster-entity-operator   1/1   1          1         44m
    
    NAME                                            MIN AVAILABLE   MAX UNAVAILABLE   ALLOWED DISRUPTIONS   AGE
    poddisruptionbudget.policy/my-cluster-kafka     2             N/A             1                   5m
    poddisruptionbudget.policy/my-cluster-zookeeper   2             N/A             1                   6m
    

オペレーターが次のリソースを作成します。

  • Kafka と ZooKeeper に 2 つの StrimziPodSets
  • Kafka ブローカー レプリカ用に 3 つの Pod。
  • ZooKeeper レプリカ用に 3 つの Pod。
  • 2 つの PodDisruptionBudgets。クラスタの整合性のため、少なくとも 2 つのレプリカの可用性を確保します。
  • my-cluster-kafka-bootstrap という名前の Service。Kubernetes クラスタ内から接続する Kafka クライアントのブートストラップ サーバーとして機能します。この Service では、すべての内部 Kafka リスナーを使用できます。
  • my-cluster-kafka-brokers という名前のヘッドレス Service。Kafka ブローカー Pod IP アドレスの DNS 解決を直接有効にします。この Service はブローカー間の通信に使用されます。
  • my-cluster-zookeeper-client という名前の Service。Kafka ブローカーがクライアントとして ZooKeeper ノードに接続できるようにします。
  • ZooKeeper Pod IP アドレスの DNS 解決を直接行う my-cluster-zookeeper-nodes という名前のヘッドレス Service。この Service は ZooKeeper レプリカ間の接続に使用されます。
  • my-cluster-entity-operator という名前の Deployment。topic-operator と user-operator を含み、カスタム リソースである KafkaTopicsKafkaUsers の簡単を容易にします。

2 つの NetworkPolicies を構成して、任意の Pod と Namespace から Kafka リスナーへの接続を容易にすることもできます。また、これらのポリシーは、ZooKeeper への接続をブローカーに制限し、クラスタ Pod とクラスタ通信専用の内部 Service ポート間の通信を有効にします。

認証とユーザー管理

このセクションでは、Kafka リスナーを保護し、クライアントと認証情報を共有するために認証と認可を有効にする方法について説明します。

Strimzi では、Kubernetes ネイティブの方法でユーザー管理を行います。個別の User Operator とそれに対応し、ユーザー構成が定義されている Kubernetes カスタム リソース(KafkaUser)を使用します。ユーザー構成には認証と認可の設定が含まれており、対応するユーザーを Kafka にプロビジョニングします。

Strimzi では、ユーザー名とパスワードに基づく認証(SCRAM-SHA-512)や TLS など、複数の認証メカニズムをサポートする Kafka リスナーとユーザーを作成できます。また、OAuth 2.0 認証も使用できます。この認証方法は、セキュリティと外部で認証情報を管理するという点で、パスワードや証明書を使用する方法よりも優れていると考えられています。

Kafka クラスタをデプロイする

このセクションでは、ユーザー管理機能を実行する Strimzi オペレーターのデプロイ方法を示します。ここでは、次のものをデプロイします。

  • いずれかのリスナーでパスワード ベースの認証(SCRAM-SHA-512)が有効になっている Kafka クラスタ。
  • 3 つのレプリカを持つ KafkaTopic
  • トピックに対するユーザーの読み取り権限と書き込み権限を指定する ACL を持つ KafkaUser
  1. パスワード ベースの SCRAM-SHA-512 認証と単純な認証を行うリスナーをポート 9094 で使用するように、Kafka クラスタを構成します。

    kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/my-cluster.yaml
    
  2. TopicUser、Kafka クラスタにコマンドを実行するクライアント Pod を作成します。

    kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/topic.yaml
    kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/my-user.yaml
    

    ユーザー認証情報を含む Secret my-user は、Volume としてクライアント Pod にマウントされます。

    これらの認証情報により、パスワード ベースの認証(SCRAM-SHA-512)を有効にしたリスナーを使用してトピックにメッセージをパブリッシュする権限がユーザーにあることが確認されます。

  3. クライアント Pod を作成します。

    kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/kafkacat.yaml
    
  4. クライアント Pod が Ready になるまで数分待ってから接続します。

    kubectl wait --for=condition=Ready pod --all -n kafka --timeout=600s
    kubectl exec -it kafkacat -n kafka -- /bin/sh
    
  5. my-user 認証情報を使用して新しいメッセージを生成し、使用します。

    echo "Message from my-user" |kcat \
      -b my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=SCRAM-SHA-512 \
      -X sasl.username=my-user \
      -X sasl.password=$(cat /my-user/password) \
      -t my-topic -P
    kcat -b my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=SCRAM-SHA-512 \
      -X sasl.username=my-user \
      -X sasl.password=$(cat /my-user/password) \
      -t my-topic -C
    

    出力は次のようになります。

    Message from my-user
    % Reached end of topic my-topic [0] at offset 0
    % Reached end of topic my-topic [2] at offset 1
    % Reached end of topic my-topic [1] at offset 0
    

    CTRL+C」と入力して、コンシューマー プロセスを停止します。

  6. Pod のシェルを終了します。

    exit
    

バックアップと障害復旧

Strimzi オペレーターにはバックアップ機能が組み込まれていませんが、特定のパターンに従うことで効率的なバックアップ戦略を実装できます。

Backup for GKE を使用して、次のものをバックアップできます。

  • Kubernetes リソース マニフェスト。
  • バックアップを行うクラスタの Kubernetes API サーバーから抽出された Strimzi API カスタム リソースとその定義。
  • マニフェスト内の PersistentVolumeClaim リソースに対応する Volume。

Backup for GKE を使用して Kafka クラスタのバックアップと復元を行う方法については、障害復旧の準備を行うをご覧ください。

Strimzi オペレーターを使用して、デプロイされた Kafka クラスタのバックアップを実行することもできます。次のデータをバックアップする必要があります。

  • Kafka 構成。KafkaTopicsKafkaUsers など、Strimzi API のすべてのカスタム リソースが含まれます。
  • Kafka ブローカーの PersistentVolume に保存されているデータ。

Strimzi の構成を含む Kubernetes リソース マニフェストを Git リポジトリに保存すると、必要に応じてリソースを新しい Kubernetes クラスタに再適用できるため、Kafka 構成のバックアップを別途行う必要がなくなります。

Kafka サーバー インスタンスまたは Kafka がデプロイされた Kubernetes クラスタが失われた場合に Kafka のデータを復旧できるようにするため、reclaimPolicy オプションを Retain に設定して、Kafka ブローカーのボリュームのプロビジョニングに使用される Kubernetes ストレージ クラスを構成することをおすすめします。また、Kafka ブローカー ボリュームのスナップショットを取得することもおすすめします。

次のマニフェストでは、reclaimPolicy オプション Retain を使用する StorageClass を記述しています。

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: premium-rwo-retain
...
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer

次の例は、Kafka クラスタのカスタム リソースの spec に追加された StorageClass を示しています。

# ...
spec:
  kafka:
    # ...
    storage:
      type: persistent-claim
      size: 100Gi
      class: premium-rwo-retain

この構成では、対応する PersistentVolumeClaim が削除されても、ストレージ クラスを使用してプロビジョニングされた PersistentVolume は削除されません。

既存の構成とブローカー インスタンス データを使用して、新しい Kubernetes クラスタに Kafka インスタンスを復元するには:

  1. 既存の Strimzi Kafka カスタム リソース(KakfaKafkaTopicKafkaUser など)を新しい Kubernetes クラスタに適用します。
  2. PersistentVolumeClaim の spec.volumeName プロパティを使用して、新しい Kafka ブローカー インスタンスの名前を持つ PersistentVolumeClaim を古い PersistentVolume に戻します。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

プロジェクトの削除

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

個々のリソースを削除する

既存のプロジェクトを削除しない場合は、リソースを個別に削除します。

  1. 環境変数を設定します。

    export PROJECT_ID=${PROJECT_ID}
    export KUBERNETES_CLUSTER_PREFIX=kafka
    export REGION=us-central1
    
  2. terraform destroy コマンドを実行します。

    export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
    terraform -chdir=kafka/terraform/FOLDER destroy -var project_id=${PROJECT_ID}   \
      -var region=${REGION}  \
      -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
    

    FOLDER は、gke-autopilot または gke-standard に置き換えます。

    プロンプトが表示されたら、「yes」と入力します。

  3. 接続されていないすべてのディスクを検索します。

    export disk_list=$(gcloud compute disks list --filter="-users:* AND labels.name=${KUBERNETES_CLUSTER_PREFIX}-cluster" --format "value[separator=|](name,zone)")
    

    Strimzi のデフォルトではストレージに deleteClaim: false パラメータが使用されているため、この手順が必要になります。クラスタを削除しても、すべてのディスクを引き続き使用できます。

  4. ディスクを削除します。

    for i in $disk_list; do
      disk_name=$(echo $i| cut -d'|' -f1)
      disk_zone=$(echo $i| cut -d'|' -f2|sed 's|.*/||')
      echo "Deleting $disk_name"
      gcloud compute disks delete $disk_name --zone $disk_zone --quiet
    done
    

次のステップ