Pub/Sub を Apache Kafka に接続する

このドキュメントでは、Pub/Sub グループの Kafka コネクタを使用して Apache Kafka と Pub/Sub Lite を統合する方法について説明します。

Pub/Sub Kafka コネクタの概要

Apache Kafka は、イベント ストリーミングのオープンソース プラットフォームです。分散型アーキテクチャでよく使用され、疎結合のコンポーネント間の通信を可能にします。Pub/Sub Lite は、メッセージを非同期で送受信するためのマネージド サービスです。Kafka と同様に、Pub/Sub Lite を使用してクラウド アーキテクチャ内のコンポーネント間で通信できます。

Pub/Sub グループの Kafka コネクタを使用すると、これら 2 つのシステムを統合できます。次のコネクタは、Connector JAR にパッケージ化されています。

  • シンクコネクタは、1 つ以上の Kafka トピックからレコードを読み取り、Pub/Sub Lite に公開します。
  • ソースコネクタが Pub/Sub Lite トピックからメッセージを読み取り、Kafka にパブリッシュします。

Pub/Sub グループの Kafka コネクタを使用する可能性のあるいくつかのシナリオを次に示します。

  • Kafka ベースのアーキテクチャを Google Cloud に移行しようとしている。
  • Google Cloud の外部に Kafka にイベントを保存するフロントエンド システムがあるが、Kafka イベントを受信する必要がある一部のバックエンド サービスを実行するために Google Cloud も使用している。
  • オンプレミスの Kafka ソリューションからログを収集して、Google Cloud に送信してデータ分析を行う。
  • Google Cloud を使用するフロントエンド システムがあるが、Kafka を使用してオンプレミスにもデータを保存している。

コネクタには Kafka Connect が必要です。これは、Kafka と他のシステムの間でデータをストリーミングするためのフレームワークです。このコネクタを使用するには、Kafka クラスタとともに Kafka Connect を実行する必要があります。

このドキュメントは、Kafka と Pub/Sub Lite の両方に精通していることを前提としています。Pub/Sub Lite の使用を開始するには、Google Cloud コンソールを使用して Pub/Sub Lite にメッセージをパブリッシュし、受信するをご覧ください。

Pub/Sub グループの Kafka コネクタを使ってみる

このセクションでは、次のタスクについて説明します。

  1. Pub/Sub グループの Kafka コネクタを構成する
  2. Kafka から Pub/Sub Lite にイベントを送信する。
  3. Pub/Sub Lite から Kafka にメッセージを送信する。

前提条件

Kafka をインストールする

Apache Kafka クイックスタートに従って、ローカルマシンに単一ノードの Kafka をインストールします。クイックスタートで、次の手順を行います。

  1. 最新の Kafka リリースをダウンロードして展開します。
  2. Kafka 環境を起動します。
  3. Kafka トピックを作成します。

認証

Pub/Sub メッセージを送受信するために、Pub/Sub グループの Kafka コネクタが Pub/Sub による認証を行う必要があります。認証を設定するには、次の手順を実行します。

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud CLI をインストールします。
  3. gcloud CLI を初期化するには:

    gcloud init
  4. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  5. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login
  6. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。
  7. Google Cloud CLI をインストールします。
  8. gcloud CLI を初期化するには:

    gcloud init
  9. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  10. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login
  11. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。

コネクタ JAR をダウンロードする

ローカルマシンにコネクタの JAR ファイルをダウンロードします。詳細については、GitHub ReadMe のコネクタの取得をご覧ください。

コネクタ構成ファイルをコピーする

  1. コネクタの GitHub リポジトリのクローンを作成するか、ダウンロードします。

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. config ディレクトリの内容を Kafka インストールの config サブディレクトリにコピーします。

    cp config/* [path to Kafka installation]/config/
    

これらのファイルには、コネクタの構成設定が含まれています。

Kafka Connect 構成を更新する

  1. ダウンロードした Kafka Connect バイナリが含まれているディレクトリに移動します。
  2. Kafka Connect バイナリ ディレクトリにある config/connect-standalone.properties という名前のファイルをテキスト エディタで開きます。
  3. plugin.path property がコメントアウトされている場合は、コメント化解除します。
  4. plugin.path property を更新して、コネクタ JAR へのパスを追加します。

    例:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. offset.storage.file.filename プロパティにローカル ファイル名を設定します。スタンドアロン モードでは、Kafka はこのファイルを使用してオフセット データを保存します。

    例:

    offset.storage.file.filename=/tmp/connect.offsets
    

Kafka から Pub/Sub Lite にイベントを転送する

このセクションでは、シンクコネクタを起動し、Kafka にイベントをパブリッシュして、Pub/Sub Lite から転送されたメッセージを読み取る方法について説明します。

  1. Google Cloud CLI を使用して Pub/Sub Lite 予約を作成します。

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4
    

    以下を置き換えます。

  2. Google Cloud CLI を使用して、サブスクリプションを含む Pub/Sub Lite トピックを作成します。

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC
    

    以下を置き換えます。

    • LITE_TOPIC: Kafka からメッセージを受信する Pub/Sub Lite トピックの名前。
    • LOCATION: トピックの場所。値は予約した場所と一致する必要があります。
    • RESERVATION_NAME: Pub/Sub Lite 予約の名前。
    • LITE_SUBSCRIPTION: トピックの Pub/Sub Lite サブスクリプションの名前。
  3. テキスト エディタで /config/pubsub-lite-sink-connector.properties というファイルを開きます。コメントで "TODO" とマークされている次のプロパティの値を追加します。

    topics=KAFKA_TOPICS
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.topic=LITE_TOPIC
    

    以下を置き換えます。

    • KAFKA_TOPICS: 読み取る Kafka トピックのカンマ区切りのリスト。
    • PROJECT_ID: Pub/Sub Lite トピックを含む Google Cloud プロジェクト。
    • LOCATION: Pub/Sub Lite トピックの場所。
    • LITE_TOPIC: Kafka からメッセージを受信する Pub/Sub Lite トピック。
  4. Kafka ディレクトリから、次のコマンドを実行します。

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-sink-connector.properties
    
  5. Apache Kafka クイックスタートの手順に沿って、Kafka トピックにいくつかのイベントを書き込みます。

  6. Lite サブスクリプションからメッセージを受信するのいずれかの方法で、Pub/Sub Lite サブスクリプションに登録します。

Pub/Sub Lite から Kafka にメッセージを転送する。

このセクションでは、ソースコネクタの起動、Pub/Sub Lite へのメッセージのパブリッシュ、Kafka から転送されたメッセージの読み取りを行う方法について説明します。

  1. Google Cloud CLI を使用して Pub/Sub Lite 予約を作成します。

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4
    

    以下を置き換えます。

  2. Google Cloud CLI を使用して、サブスクリプションを含む Pub/Sub Lite トピックを作成します。

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC
    

    以下を置き換えます。

    • LITE_TOPIC: Pub/Sub Lite トピックの名前。
    • LOCATION: トピックの場所。値は予約した場所と一致する必要があります。
    • RESERVATION_NAME: Pub/Sub Lite 予約の名前。
    • LITE_SUBSCRIPTION: トピックの Pub/Sub Lite サブスクリプションの名前。
  3. テキスト エディタで /config/pubsub-lite-source-connector.properties という名前のファイルを開きます。コメントで "TODO" とマークされている次のプロパティの値を追加します。

    topic=KAFKA_TOPIC
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.subscription=LITE_SUBSCRIPTION
    

    以下を置き換えます。

    • KAFKA_TOPIC: Pub/Sub メッセージを受信する Kafka トピック。
    • PROJECT_ID: Pub/Sub トピックを含む Google Cloud プロジェクト。
    • LOCATION: Pub/Sub Lite トピックの場所。
    • LITE_SUBSCRIPTION: Pub/Sub Lite トピック。
  4. Kafka ディレクトリから、次のコマンドを実行します。

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-source-connector.properties
    
  5. Lite トピックにメッセージをパブリッシュするに示されている方法のいずれかを使用して、Pub/Sub Lite トピックにメッセージをパブリッシュします。

  6. Kafka からのメッセージを読み取ります。Apache Kafka クイックスタートの手順に沿って、Kafka トピックからメッセージを読み取ります。

メッセージ コンバージョン

Kafka レコードにはキーと値が含まれています。これらは可変長のバイト配列です。必要に応じて、Kafka レコードに Key-Value ペアのヘッダーを含めることもできます。Pub/Sub Lite メッセージには、次のフィールドがあります。

  • key: メッセージキー(bytes
  • data: メッセージ データ (bytes)
  • attributes: 0 個以上の属性。各属性は (key,values[]) マップです。1 つの属性には複数の値を指定できます。
  • event_time: ユーザー指定のイベントのタイムスタンプ(省略可)。

Kafka Connect は、コンバータを使用して、Kafka との間でキーと値をシリアル化します。シリアル化を制御するには、コネクタ構成ファイルで次のプロパティを設定します。

  • key.converter: レコードキーをシリアル化するために使用されるコンバータ。
  • value.converter: レコード値をシリアル化するために使用されるコンバータ。

Kafka から Pub/Sub Lite への変換

シンクコネクタは、次のように Kafka レコードを Pub/Sub Lite メッセージに変換します。

Kafka レコード(SinkRecord Pub/Sub Lite のメッセージ
キー key
data
ヘッダー attributes
タイムスタンプ eventTime
タイムスタンプ型 attributes["x-goog-pubsublite-source-kafka-event-time-type"]
トピック attributes["x-goog-pubsublite-source-kafka-topic"]
Partition attributes["x-goog-pubsublite-source-kafka-offset"]
オフセット attributes["x-goog-pubsublite-source-kafka-partition"]

キー、値、ヘッダーは次のようにエンコードされます。

  • Null スキーマは、文字列スキーマとして扱われます。
  • バイト ペイロードは変換されずに直接書き込まれます。
  • 文字列、整数、浮動小数点のペイロードは、UTF-8 バイトのシーケンスにエンコードされます。
  • 他のすべてのペイロードは、プロトコル バッファの Value 型にエンコードされ、バイト文字列に変換されます。
    • ネストされた文字列フィールドは、protobuf Value にエンコードされます。
    • ネストされたバイト フィールドは、base64 でエンコードされたバイトを保持する protobuf Value にエンコードされます。
    • ネストされた数値フィールドは、double として Value の protobuf にエンコードされます。
    • 配列、マップ、構造体キーを含むマップはサポートされていません。

Pub/Sub Lite から Kafka への変換

ソースコネクタは、次のように Pub/Sub Lite メッセージを Kafka レコードに変換します。

Pub/Sub Lite のメッセージ Kafka レコード(SourceRecord
key キー
data
attributes ヘッダー
event_time タイムスタンプ event_time が存在しない場合は、パブリッシュ時間が使用されます。

構成オプション

コネクタは、Kafka Connect API で提供される構成に加えて、次の Pub/Sub Lite 構成をサポートしています。

シンクコネクタの構成オプション

シンクコネクタは、次の構成オプションをサポートしています。

設定 データ型 説明
connector.class String 必須。コネクタの Java クラス。Pub/Sub Lite シンクコネクタの場合、値は com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector にする必要があります。
gcp.credentials.file.path String 省略できます。Pub/Sub Lite を認証するための Google Cloud 認証情報を保存するファイルへのパス。
gcp.credentials.json String 省略できます。Pub/Sub Lite を認証するための Google Cloud を含む JSON blob。
pubsublite.location String 必須。Pub/Sub Lite トピックの場所。
pubsublite.project String 必須。Pub/Sub Lite トピックを含む Google Cloud。
pubsublite.topic String 必須。Kafka レコードをパブリッシュする Pub/Sub Lite トピック。
topics String 必須。読み取る Kafka トピックのカンマ区切りのリスト。

ソースコネクタの構成オプション

ソースコネクタは、次の構成オプションをサポートしています。

設定 データ型 説明
connector.class String 必須。コネクタの Java クラス。Pub/Sub Lite ソースコネクタの場合、値は com.google.pubsublite.kafka.source.PubSubLiteSourceConnector にする必要があります。
gcp.credentials.file.path String 省略できます。Pub/Sub Lite を認証するための Google Cloud 認証情報を保存するファイルへのパス。
gcp.credentials.json String 省略できます。Pub/Sub Lite を認証するための Google Cloud を含む JSON blob。
kafka.topic String 必須。Pub/Sub Lite からメッセージを受信する Kafka トピック。
pubsublite.location String 必須。Pub/Sub Lite トピックの場所。
pubsublite.partition_flow_control.bytes Long

Pub/Sub Lite パーティションあたりの未処理のバイト数の最大数。

デフォルト: 20,000,000

pubsublite.partition_flow_control.messages Long

Pub/Sub Lite パーティションあたりの未処理のメッセージの最大数。

デフォルト: Long.MAX_VALUE

pubsublite.project String 必須。Pub/Sub Lite トピックを含む Google Cloud プロジェクト。
pubsublite.subscription String 必須。メッセージを pull する Pub/Sub Lite サブスクリプションの名前。

次のステップ