Kafka から Pub/Sub Lite への移行

このドキュメントは、セルフマネージド Apache Kafka から Pub/Sub Lite への移行を検討している場合に役立ちます。

Pub/Sub Lite の概要

Pub/Sub Lite は、低コストで運用できる大容量のメッセージング サービスです。Pub/Sub Lite には、事前にプロビジョニングされた容量とともにゾーンストレージとリージョン ストレージが用意されています。Pub/Sub Lite では、ゾーンまたはリージョンの Lite トピックを選択できます。リージョン Lite トピックでは、Pub/Sub トピックと同じ可用性 SLA が提供されます。ただし、メッセージ レプリケーションの点では、Pub/Sub と Pub/Sub Lite の間に信頼性の違いがあります。

Pub/Sub と Pub/Sub Lite の詳細については、Pub/Sub とはをご覧ください。

Lite でサポートされているリージョンとゾーンの詳細については、Pub/Sub Lite のロケーションをご覧ください。

Pub/Sub Lite の用語

Pub/Sub Lite の主な用語は次のとおりです。

  • メッセージ。 Pub/Sub Lite サービスを介して移動するデータ。

  • トピック。メッセージのフィードを表す名前付きリソース。Pub/Sub Lite では、ゾーンまたはリージョンの Lite トピックを作成できます。Pub/Sub Lite リージョン トピックでは、単一リージョンの 2 つのゾーンにデータが保存されます。Pub/Sub Lite ゾーン トピックでは、データは 1 つのゾーン内でのみ複製されます。

  • 予約。リージョン内の複数の Lite トピックによって共有されるスループット容量の名前付きプール。

  • サブスクリプション。特定の Lite トピックからメッセージを受信することへの関心を表す、名前付きのリソース。サブスクリプションは、単一のトピックにのみ接続する Kafka のコンシューマー グループに似ています。

  • サブスクライバー。Lite トピックと指定されたサブスクリプションからメッセージを受信する Pub/Sub Lite のクライアント。サブスクリプションには複数のサブスクライバー クライアントを含めることができます。このような場合、メッセージはサブスクライバー クライアント間で負荷分散されます。Kafka では、サブスクライバーはコンシューマーと呼ばれます。

  • パブリッシャー。メッセージを作成して特定の Lite トピックに送信(パブリッシュ)するアプリケーション。トピックは複数のパブリッシャーを持つことができます。 Kafka では、パブリッシャーはプロデューサーと呼ばれます。

Kafka と Pub/Sub Lite の違い

Pub/Sub Lite は概念的に Kafka に似ていますが、データの取り込みに重点を置いた、より幅の狭い API を持つ別のシステムです。これらの違いはストリームの取り込みと処理には重要ではありませんが、その違いが重要となるユースケースがいくつかあります。

データベースとしての Kafka

現在 Pub/Sub Lite では、Kafka とは異なり、トランザクションのパブリッシュやログの圧縮はサポートされていませんが、べき等性はサポートされています。これらの Kafka 機能は、Kafka をメッセージ システムよりもデータベースとして使用する場合に役立ちます。Kafka をデータベースとして使用する場合は、独自の Kafka クラスタを実行するか、Confluent Cloud などのマネージド Kafka ソリューションを使用することを検討してください。どちらのソリューションも使用できない場合は、Cloud Spanner のような水平スケーリング可能なデータベースの使用を検討してください。

Kafka ストリーム

Kafka ストリームは Kafka の上に構築されたデータ処理システムです。コンシューマー クライアントを挿入できますが、すべての管理者オペレーションにアクセスする必要があります。Kafka ストリームはまた、内部メタデータの保存に Kafka のトランザクション データベース プロパティを使用します。そのため、現時点では Pub/Sub Lite を Kafka ストリーム アプリケーションに使用することはできません。

Apache Beam は、Kafka、Pub/Sub、Pub/Sub Lite と統合されている同様のストリーミング データ処理システムです。Beam パイプラインは Dataflow を使用したフルマネージドの方法、または既存の Apache Flink および Apache Spark クラスタで実行できます。

モニタリング

Kafka クライアントは、サーバー側の指標を読み取ることができます。Pub/Sub Lite では、パブリッシャーとサブスクライバーの動作に関連する指標は、追加の構成なしで Cloud Monitoring によって管理されます。

容量管理

Kafka トピックの容量は、クラスタの容量によって決まります。 レプリケーション、鍵の圧縮、バッチの設定により、Kafka クラスタで特定のトピックを処理するために必要な容量が決まります。Kafka トピックのスループットは、ブローカーが実行されているマシンの容量によって制限されます。一方、Pub/Sub Lite トピックにはストレージ容量とスループット容量の両方を定義する必要があります。 Pub/Sub Lite のストレージ容量は、トピックの構成可能なプロパティです。スループット容量は、構成された予約の容量と、パーティションごとの固有の上限または構成済み上限に基づいています。

認証とセキュリティ

Apache Kafka は、いくつかのオープン認証と暗号化メカニズムをサポートしています。Pub/Sub Lite では、認証は IAM システムに基づきます。セキュリティは、保存時と転送時の暗号化によって保証されます。Pub/Sub Lite 認証の詳細については、このドキュメントの後半の移行ワークフロー セクションをご覧ください。

Kafka プロパティを Pub/Sub Lite プロパティにマッピングする

Kafka には、トピックの構造、制限、ブローカーのプロパティを制御する構成オプションが多数あります。このセクションでは、データの取り込みに役立つ一般的なものについて説明し、Pub/Sub Lite での同等のものについても説明します。Pub/Sub Lite はマネージド システムであるため、多くのブローカー プロパティを考慮する必要はありません。

Topic の構成プロパティ

Kafka プロパティ Pub/Sub Lite プロパティ 説明
<aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="dV7DQA4y7L7qKMxOHGc8kmZb6gHWkZLxLSo7Ja3t8sQHgM1RbUvT8FDCmfNOUyfqProEiKpRjlB87eKAtLChinJJVZe4MddwFRESYOiHj80=">retention.bytes </aclass="external"> パーティションあたりのストレージ Lite トピック内のすべてのパーティションには、同じストレージ容量が設定されています。Lite トピックのストレージ容量は、トピック内のすべてのパーティションのストレージ容量の合計です。
<aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="QnWVoAipx5CRcevnGcXwAGZb6gHWkZLxLSo7Ja3t8sQHgM1RbUvT8FDCmfNOUyfqProEiKpRjlB87eKAtLChis7tgVJs7v0+f6/z72IBgl0=">retention.ms </aclass="external"> メッセージ保持期間 Lite トピックでメッセージが保存される最長時間。メッセージ保持期間を指定しなかった場合、Lite トピックはストレージ容量を超過するまでメッセージを保存します。
<aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="1EnWqBFYsbXCkJvluoFZHWZb6gHWkZLxLSo7Ja3t8sQHgM1RbUvT8FDCmfNOUyfqabn4RbnazUIXATmdGtrPyqrz+M3RfOj6o9TWwqB6UaE=">flush.ms, <aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="D8tZ+YryF011zrRlSlCouGZb6gHWkZLxLSo7Ja3t8sRqk6OxyqMTCHV662ootwKEN/weLnW8ody8JZUs0+mzHw==">acks </aclass="external"></aclass="external"> Pub/Sub Lite では構成不可 パブリッシュは、複製されたストレージに保持されることが保証されるまで確認応答されません。
<aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="w2C6ltfFLx0WhPQQ+xq7GmZb6gHWkZLxLSo7Ja3t8sQHgM1RbUvT8FDCmfNOUyfq+EJHRL9vqmxrEWQgG1UleYQ3+ziPJbJIEqItRIDSEGQ=">max.message.bytes </aclass="external"> Pub/Sub Lite では構成不可 3.5 MiB は、Pub/Sub Lite に送信できる最大メッセージ サイズです。メッセージ サイズは繰り返し可能な方法で計算されます。
<aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="uByJEasoVxsHiHGi2vZFpWZb6gHWkZLxLSo7Ja3t8sQHgM1RbUvT8FDCmfNOUyfqQgNEpY+vqWEChVRJLLgf+Nbws6STZxXZA3AjaBgIpr4=">message.timestamp.type </aclass="external"> Pub/Sub Lite では構成不可 コンシューマー実装を使用する場合、イベント タイムスタンプが存在する場合はそれが選択されるか、代わりにパブリッシュ タイムスタンプが使用されます。Beam を使用する場合、パブリッシュ タイムスタンプとイベント タイムスタンプの両方を使用できます。

Lite トピックのプロパティの詳細については、Lite トピックのプロパティをご覧ください。

Producer の構成プロパティ

Pub/Sub Lite では Producer ワイヤ プロトコルがサポートされています。一部のプロパティでは、プロデューサー Cloud クライアント ライブラリの動作が変更されます。次の表で、一般的なプロパティについて説明します。

Kafka プロパティ Pub/Sub Lite プロパティ 説明
<aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="ka6ek6z5j1AUV6h0GyHmOWZb6gHWkZLxLSo7Ja3t8sRw5UOg9XUG3SbKp/CA/UGEa9PU2/IZ1QB5PzPEKwJwii23M7pePF8367yuVtiwSCgqumajQ5CIdQL0uKISUd+2">auto.create.topics.enable </aclass="external"> Pub/Sub Lite では構成不可 Pub/Sub Lite で、1 つのトピックについて、消費者グループとほぼ同じトピックとサブスクリプションを作成します。コンソール、gcloud CLI、API、または Cloud クライアント ライブラリを使用できます。
<aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="w2C6ltfFLx0WhPQQ+xq7GmZb6gHWkZLxLSo7Ja3t8sRqk6OxyqMTCHV662ootwKEX+jyP6Bz5/wh8WHRaM+1VDjc6rJHP+OLVS8WtNmcQtc=">key.serializer, <aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="pSX6SpdSWGS3KbkuJBUem2Zb6gHWkZLxLSo7Ja3t8sRqk6OxyqMTCHV662ootwKESqwlEsmDjQNe631xyd/OBEmINreu72SEhtSVBm5Rw0w=">value.serializer </aclass="external"></aclass="external"> Pub/Sub Lite では構成不可

ワイヤ プロトコルを使用して通信する Kafka Producer または同等のライブラリを使用する場合は必須です。

<aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="/nUcmkQCz7NdVnO/fs/U8WZb6gHWkZLxLSo7Ja3t8sRqk6OxyqMTCHV662ootwKEp3gb5Htxs2OZOSLAVFokTJHSiK/dsgG9NhPtFx1kAW8=">batch.size </aclass="external"> Pub/Sub Lite でのサポート バッチ処理がサポートされています。最適なパフォーマンスを得るには、この値の推奨値は 10 MiB です。
<aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="QnWVoAipx5CRcevnGcXwAGZb6gHWkZLxLSo7Ja3t8sRqk6OxyqMTCHV662ootwKE5j8zTzynZ+3TyWdN4ior8uAuE8JMvyLpVzWxDJ8mdak=">linger.ms </aclass="external"> Pub/Sub Lite でのサポート バッチ処理がサポートされています。最高のパフォーマンスを得るには、この値の推奨値は 50 ミリ秒です。
<aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="pSX6SpdSWGS3KbkuJBUem2Zb6gHWkZLxLSo7Ja3t8sRqk6OxyqMTCHV662ootwKEj7ilyCdDp4ecsK36QQ4uhQDXcklDhZxjwV3LA3JE6Ds=">max.request.size </aclass="external"> Pub/Sub Lite でのサポート サーバーはバッチあたり 20 MiB という上限を設定しています。この値は、Kafka クライアントで 20 MiB 未満に設定します。
<aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="9mE3X9aIpBVps4x1lRMwdGZb6gHWkZLxLSo7Ja3t8sRqk6OxyqMTCHV662ootwKE8lAxDO6sfSpMmM8Pg2RTBboAuzIbyLjMx0U1d0C7yvo=">enable.idempotence </aclass="external"> Pub/Sub Lite でのサポート
<aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="pSX6SpdSWGS3KbkuJBUem2Zb6gHWkZLxLSo7Ja3t8sRqk6OxyqMTCHV662ootwKEufyeVlB0ORvuIlCupIX+AqOrJcy9AO52Q6rW+KM5+Uw=">compression.type </aclass="external"> Pub/Sub Lite ではサポートされていない この値は、明示的に none に設定する必要があります。

Consumer の構成プロパティ

Pub/Sub Lite では Consumer ワイヤ プロトコルがサポートされています。一部のプロパティは、コンシューマの Cloud クライアント ライブラリの動作を変更します。次の表で、一般的なプロパティについて説明します。

Kafka プロパティ 説明
<aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="pSX6SpdSWGS3KbkuJBUem2Zb6gHWkZLxLSo7Ja3t8sQyhicHhp9oG2sYdjqrOOFMesHcoabWpmRWc4QGiNw/Nt3N4FAWfkLBFxu1XIPZ/mc=">key.deserializer, <aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="9mE3X9aIpBVps4x1lRMwdGZb6gHWkZLxLSo7Ja3t8sQyhicHhp9oG2sYdjqrOOFMOo1Ck57YWd7phlgbFg6bnNPnjld49QCIfr57dmq9W6Y=">value.deserializer </aclass="external"></aclass="external">

ワイヤ プロトコルを使用して通信する Kafka Consumer または同等のライブラリを使用する場合は必須です。

<aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="6Yq99Si1ofpPBF++wTnR6WZb6gHWkZLxLSo7Ja3t8sQyhicHhp9oG2sYdjqrOOFMukQRfA+0/fiRFxWe1F7rREaszCrdgu3LDJfajV9VYJw=">auto.offset.reset </aclass="external"> この構成はサポートされていないか、不要です。サブスクリプションが作成されると、オフセットのロケーションが必ず定義されます。
<aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="uByJEasoVxsHiHGi2vZFpWZb6gHWkZLxLSo7Ja3t8sQHgM1RbUvT8FDCmfNOUyfqQgNEpY+vqWEChVRJLLgf+Nbws6STZxXZA3AjaBgIpr4=">message.timestamp.type </aclass="external"> パブリッシュ タイムスタンプは、常に Pub/Sub Lite から使用でき、パーティションごとに減少しないことが保証されています。イベント タイムスタンプは、パブリッシュ時にメッセージに添付されたかどうかによって、存在する場合と存在しない場合があります。Dataflow を使用する場合、パブリッシュ タイムスタンプとイベント タイムスタンプの両方を同時に使用できます。
<aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="C1w6rZRsJ3qdhnqUpePxxmZb6gHWkZLxLSo7Ja3t8sQyhicHhp9oG2sYdjqrOOFMUP9pWM0gsH8dBz8z/Znyl06tJetNspNa7O8sf20qNfcwoG4LrkKWpCzFW/9ML1dc">max.partition.fetch.bytes, <aclass="external" l10n-attrs-original-order="href,class" l10n-encrypted-href="pSX6SpdSWGS3KbkuJBUem2Zb6gHWkZLxLSo7Ja3t8sQyhicHhp9oG2sYdjqrOOFMUP9pWM0gsH8dBz8z/Znyl4gSgXAmfEcOnKjDPqz24u4=">max.poll.records </aclass="external"></aclass="external"> poll() 呼び出しから返されるレコードとバイトの数、および内部取得リクエストから返されるバイト数にソフトリミットを設定します。デフォルトの 1 MiB の「max.partition.fetch.bytes」では、クライアントのスループットが制限される場合があるため、この値を増やすことを検討してください。

Kafka と Pub/Sub Lite の機能の比較

次の表は、Apache Kafka の機能と Pub/Sub Lite の機能を比較したものです。

機能 Kafka Pub/Sub Lite
メッセージの順序指定
メッセージの重複除去 ○(Dataflow を使用)
プッシュ サブスクリプション × ○(Pub/Sub エクスポートを使用)
トランザクション ×
メッセージ ストレージ 利用可能なマシン ストレージによる制限 無制限
メッセージの再生
ロギングとモニタリング セルフマネージド Cloud Monitoring により自動化
ストリーム処理 ○(Kafka StreamsApache BeamDataproc を使用)。 ○(Beam または Dataproc を使用)。

次の表では、Kafka を使用した自己ホスト型の機能と、Pub/Sub Lite を使用した Google が管理する機能を比較します。

機能 Kafka Pub/Sub Lite
可用性 Kafka を他のロケーションに手動でデプロイする。 世界中にデプロイ。場所をご覧ください。
障害復旧 独自のバックアップとレプリケーションを設計して維持する。 Google で管理。
インフラストラクチャ管理 仮想マシン(VM)またはマシンを手動でデプロイして運用する。一貫したバージョニングとパッチを維持する。 Google で管理。
キャパシティ プランニング ストレージとコンピューティングのニーズを事前に計画する。 Google で管理。コンピューティングとストレージはいつでも増やすことができます。
サポート なし 24 時間対応の待機スタッフとサポート。

Kafka と Pub/Sub Lite の費用の比較

Pub/Sub Lite での費用の見積もりと管理の方法は、Kafka とは異なります。オンプレミスまたはクラウド上の Kafka クラスタの費用には、マシン、ディスク、ネットワーク、受信メッセージ、送信メッセージの費用が含まれます。また、システムや関連インフラストラクチャの管理と維持のためのオーバーヘッド コストも発生します。Kafka クラスタを管理する場合は、マシンを手動でアップグレードし、クラスタの容量を計画して、広範な計画とテストを含む障害復旧を実装する必要があります。さまざまな費用をすべて集計して、実際の総所有コスト(TCO)を決定する必要があります。

Pub/Sub Lite の料金には、予約費用(パブリッシュされたバイト数、サブスクライブされたバイト数、Kafka プロキシによって処理されるバイト数)、およびプロビジョニングされたストレージの費用が含まれます。送信メッセージの料金に加えて、予約したリソースに対して正確に料金を支払います。料金計算ツールを使用して、料金を概算できます。

移行ワークフロー

Kafka クラスタから Pub/Sub Lite にトピックを移行するには、次の手順に従います。

Pub/Sub Lite リソースを構成する

  1. 移行するすべてのトピックで予想されるスループットの Pub/Sub Lite 予約を作成します。

    Pub/Sub Lite の料金計算ツールを使用して、既存の Kafka トピックの集計スループット指標を計算します。予約を作成する方法の詳細については、Lite 予約の作成と管理をご覧ください。

  2. Kafka 内の対応するトピックごとに 1 つの Pub/Sub Lite トピックを作成します。

    Lite トピックを作成する方法の詳細については、Lite トピックの作成と管理をご覧ください。

  3. Kafka クラスタ内の対応するコンシューマー グループとトピックのペアごとに 1 つの Pub/Sub Lite サブスクリプションを作成します。

    たとえば、topic-atopic-b から使用する consumers という名前のコンシューマー グループの場合、topic-a に接続されたサブスクリプション consumers-a と、topic-b に接続されたサブスクリプション consumers-b を作成します。サブスクリプションの作成方法の詳細については、Lite サブスクリプションの作成と管理をご覧ください。

Pub/Sub Lite に対する認証

Kafka クライアントのタイプに応じて、次のいずれかの方法を選択します。

再ビルドしてバージョン 3.1.0 以降を実行している Java ベースの Kafka クライアント

Kafka クライアントを実行しているインスタンス上で再ビルドできるバージョン 3.1.0 以降の Java ベースの Kafka クライアントの場合。

  1. 次の方法で、com.google.cloud:pubsublite-kafka-auth パッケージをインストールします。

  2. com.google.cloud.pubsublite.kafka.ClientParameters.getParams を使用して、Pub/Sub Lite への認証に必要なパラメータを取得します。

    getParams() メソッド(コードサンプルを参照)によって、次の JAASSASL 構成が、Pub/Sub Lite への認証に使用するパラメータとして初期化されます。

    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.oauthbearer.token.endpoint.url=http://localhost:14293
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
    

再ビルドせずにバージョン 3.1.0 以降を実行している Java ベースの Kafka クライアント

KIP-768 をサポートする Kafka クライアントの場合、Python サイドカー スクリプトを使用する構成専用の OAUTHBEARER 認証がサポートされます。 これらのバージョンには、2022 年 1 月 Java バージョン 3.1.0 以降が含まれています。

Kafka クライアントを実行しているインスタンスで、次の手順を行います。

  1. Python 3.6 以降をインストールします。

    Python のインストールをご覧ください。

  2. Google 認証パッケージ pip install google-auth をインストールします。

    このライブラリは、Google API にアクセスするためのさまざまなサーバー間認証メカニズムを簡素化します。google-auth ページをご覧ください。

  3. kafka_gcp_credentials.py スクリプトを実行します。

    このスクリプトは、ローカル HTTP サーバーを起動し、google.auth.default() を使用して環境内のデフォルトの Google Cloud 認証情報を取得します。

    取得された認証情報のプリンシパルには、使用している Google Cloud プロジェクトおよび接続先のロケーションに対する pubsublite.locations.openKafkaStream 権限が必要です。Pub/Sub Lite パブリッシャー(roles/pubsublite.publisher)と Pub/Sub Lite サブスクライバー(roles/pubsublite.subscriber)のロールには、この必要な権限が含まれています。これらのロールをプリンシパルに追加します。

    認証情報は、Kafka クライアントの SASL/OAUTHBEARER 認証で使用されます。

    Kafka クライアントから Pub/Sub Lite への認証を行うには、次のパラメータがプロパティに必要です。

    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.oauthbearer.token.endpoint.url=localhost:14293
    sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
      required clientId="unused" clientSecret="unused" \
      extension_pubsubProject="PROJECT_ID";
    

    PROJECT_ID は、Pub/Sub Lite を実行しているプロジェクトの ID に置き換えます。

再ビルドしない他のすべてのクライアント

他のすべてのクライアントの場合は、次の手順を行います。

  1. クライアントに使用するサービス アカウントのサービス アカウント キーの JSON ファイルをダウンロードします。

  2. 認証文字列として使用する base64 エンコードを使用して、サービス アカウント ファイルをエンコードします。

    Linux または macOS システムでは、次のように base64 コマンド(多くの場合デフォルトでインストールされています)を使用できます。

    base64 < my_service_account.json > password.txt
    

    次のパラメータで、認証用のパスワード ファイルの内容を使用できます。

    Java

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
     username="PROJECT_ID" \
     password="contents of base64 encoded password file";
    

    PROJECT_ID は、Pub/Sub を実行するプロジェクトの ID に置き換えます。

    librdkafka

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.username=PROJECT_ID
    sasl.password=contents of base64 encoded password file
    

    PROJECT_ID は、Pub/Sub を実行するプロジェクトの ID に置き換えます。

Kafka Connect を使用してデータのクローンを作成する

Pub/Sub Lite チームは、Kafka Connect シンクの実装を維持しています。この実装は、Kafka Connect クラスタを使用して Kafka トピックから Pub/Sub Lite トピックにデータをコピーするように構成できます。

データコピーを実行するようにコネクタを構成するには、Pub/Sub グループの Kafka コネクタをご覧ください。

パーティション アフィニティが移行プロセスの影響を受けないようにするには、kafka トピックと Pub/Sub Lite トピックのパーティション数が同じで、pubsublite.ordering.mode プロパティが KAFKA に設定されていることを確認します。これにより、コネクタは、最初にパブリッシュされた kafka パーティションと同じインデックスを持つ Pub/Sub Lite パーティションにメッセージをルーティングします。

コンシューマーを移行する

Pub/Sub Lite のリソースモデルは Kafka のリソースモデルとは異なります。最も重要なのは、コンシューマ グループとは異なり、サブスクリプションは明示的なリソースであり、1 つのトピックにしか関連付けられていないことです。この違いのため、topic を渡す必要がある Kafka Consumer API 内の場所には、代わりに完全なサブスクリプション パスを渡す必要があります。

Kafka クライアントの SASL 構成に加えて、Kafka Consumer API を使用して Pub/Sub Lite を操作する場合は、次の設定も必要です。

bootstrap.servers=REGION-kafka-pubsub.googleapis.com:443
group.id=unused

REGION は、Pub/Sub Lite サブスクリプションが存在するリージョンに置き換えます。

特定のサブスクリプションの最初の Pub/Sub Lite コンシューマ ジョブを開始する前に、管理者のシーク オペレーションを待機することなく開始して、コンシューマの最初のロケーションを設定できます。

コンシューマを起動すると、コンシューマはメッセージ バックログにある現在のオフセットに再接続します。古いクライアントと新しいクライアントの両方を動作の確認に要する期間だけ並列に実行し、古いコンシューマー クライアントを停止します。

プロデューサーを移行する

Kafka クライアントの SASL 構成に加えて、Kafka Producer API を使用して Pub/Sub Lite とやり取りする場合は、プロデューサー パラメータとして以下も必要です。

bootstrap.servers=REGION-kafka-pubsub.googleapis.com:443

REGION は、Pub/Sub Lite トピックが存在するリージョンに置き換えます。

トピックのすべてのコンシューマーを Pub/Sub Lite から読み取るように移行した後、プロデューサーのトラフィックを Pub/Sub Lite に直接書き込むように移行します。

Kafka トピックではなく Pub/Sub Lite トピックに書き込むように、プロデューサー クライアントを段階的に移行します。

プロデューサー クライアントを再起動して、新しい構成を読み込みます。

Kafka Connect を無効にする

Pub/Sub Lite に直接書き込むすべてのプロデューサーを移行した後は、コネクタがデータをコピーすることはありません。

Kafka Connect インスタンスは停止できます。

Kafka 接続のトラブルシューティング

Kafka クライアントはカスタム ワイヤ プロトコルを介して通信するため、すべてのリクエストで障害が発生した場合にエラー メッセージを提供することはできません。メッセージの一部として送信されるエラーコードを使用します。

クライアントで発生したエラーの詳細を表示するには、org.apache.kafka 接頭辞のロギングレベルを FINEST に設定します。

低スループットとバックログの増加

スループットが低く、バックログが増加する理由はいくつかあります。1 つの理由は、容量不足の場合です。

トピックレベルで、または予約を使用して、スループット容量を構成できます。サブスクライブとパブリッシュのスループット容量が構成されていない場合、サブスクライブとパブリッシュに対応するスループットが抑制されます。

このスループット エラーは、パブリッシャーの topic/flow_control_status 指標とサブスクライバーの subscription/flow_control_status 指標によって通知されます。この指標の状態は次のとおりです。

  • NO_PARTITION_CAPACITY: このメッセージは、パーティションごとのスループット上限に達したことを示します。

  • NO_RESERVATION_CAPACITY: このメッセージは、予約ごとのスループットの上限に到達したことを示します。

トピックまたは予約のパブリッシュとサブスクライブの割り当てに関する使用率グラフを表示して、使用率が 100% か、それに近くになっているかを確認できます。

この問題を解決するには、トピックまたは予約のスループット容量を増やします。

トピック承認失敗のエラー メッセージ

Kafka API を使用してパブリッシュするには、Lite サービス エージェントに Pub/Sub Lite トピックにパブリッシュする適切な権限が必要です。

Pub/Sub Lite トピックにパブリッシュする適切な権限がない場合、クライアントにエラー TOPIC_AUTHORIZATION_FAILED が表示されます。

この問題を解決するには、プロジェクトの Lite サービス エージェントが認証構成を渡したかどうかを確認します。

無効なトピックのエラー メッセージ

Kafka API を使用してサブスクライブするには、Kafka Consumer API で topic を想定しているすべての場所に完全なサブスクリプション パスを渡す必要があります。

正しい形式のサブスクリプション パスを渡さないと、コンシューマ クライアントでエラー INVALID_TOPIC_EXCEPTION が発生します。

予約を使用していない場合の無効なリクエスト

kafka ワイヤ プロトコルのサポートを使用するには、すべてのトピックに対して、関連する課金を予約しておく必要があります。