Pub/Sub を Apache Kafka に接続する

このドキュメントでは、Pub/Sub グループ Kafka コネクタを使用して Apache Kafka と Pub/Sub を統合する方法について説明します。

Pub/Sub Kafka コネクタの概要

Apache Kafka は、イベント ストリーミングのオープンソース プラットフォームです。分散型アーキテクチャでよく使用され、疎結合のコンポーネント間の通信を可能にします。Pub/Sub は、メッセージを非同期で送受信するためのマネージド サービスです。Kafka と同様に、Pub/Sub を使用してクラウド アーキテクチャ内のコンポーネント間で通信できます。

Pub/Sub グループの Kafka コネクタを使用すると、これら 2 つのシステムを統合できます。コネクタ JAR には次のコネクタがパッケージ化されています。

  • シンクコネクタは、1 つ以上の Kafka トピックからレコードを読み取り、Pub/Sub に公開します。
  • ソースコネクタは、Pub/Sub トピックからメッセージを読み取り、Kafka にパブリッシュします。

Pub/Sub グループの Kafka コネクタを使用する可能性のあるいくつかのシナリオを次に示します。

  • Kafka ベースのアーキテクチャを Google Cloud に移行しようとしている。
  • Google Cloud の外部に Kafka にイベントを保存するフロントエンド システムがあるが、Kafka イベントを受信する必要がある一部のバックエンド サービスを実行するために Google Cloud も使用している。
  • オンプレミスの Kafka ソリューションからログを収集して、Google Cloud に送信してデータ分析を行う。
  • Google Cloud を使用するフロントエンド システムがあるが、Kafka を使用してオンプレミスにもデータを保存している。

このコネクタには、Kafka と他のシステム間でデータをストリーミングするためのフレームワークである Kafka コネクト が必要です。このコネクタを使用するには、Kafka クラスタとともに Kafka Connect を実行する必要があります。

このドキュメントでは、Kafka と Pub/Sub の両方に精通していることを前提としています。この文書を読む前に、Pub/Sub クイックスタートのいずれかを完了することをお勧めします。

Pub/Sub コネクタは、Google Cloud IAM と Kafka Connect ACL の統合をサポートしていません。

コネクタを使ってみる

このセクションでは、次のタスクについて説明します。

  1. Pub/Sub グループの Kafka コネクタを構成する
  2. Kafka から Pub/Sub にイベントを送信する。
  3. Pub/Sub から Kafka にメッセージを送信する。

前提条件

Kafka をインストールする

Apache Kafka クイックスタートに沿って、ローカルマシンに単一ノードの Kafka をインストールします。クイックスタートで、次の手順を行います。

  1. 最新の Kafka リリースをダウンロードして展開します。
  2. Kafka 環境を開始します。
  3. Kafka トピックを作成します。

認証

Pub/Sub メッセージを送受信するために、Pub/Sub グループの Kafka コネクタが Pub/Sub による認証を行う必要があります。認証を設定するには、次の手順を実行します。

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Create local authentication credentials for your user account:

    gcloud auth application-default login
  6. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Create local authentication credentials for your user account:

    gcloud auth application-default login
  11. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.

コネクタ JAR をダウンロードする

ローカルマシンにコネクタの JAR ファイルをダウンロードします。詳細については、GitHub ReadMe のコネクタの取得をご覧ください。

コネクタ構成ファイルをコピーする

  1. コネクタの GitHub リポジトリのクローンを作成するか、ダウンロードします。

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. config ディレクトリの内容を Kafka インストールの config サブディレクトリにコピーします。

    cp config/* [path to Kafka installation]/config/
    

これらのファイルには、コネクタの構成設定が含まれています。

Kafka Connect 構成を更新する

  1. ダウンロードした Kafka コネクト バイナリが含まれているディレクトリに移動します。
  2. Kafka コネクト バイナリ ディレクトリにある config/connect-standalone.properties という名前のファイルをテキスト エディタで開きます。
  3. plugin.path property がコメントアウトされている場合は、コメント化解除します。
  4. plugin.path property を更新して、コネクタ JAR へのパスを追加します。

    例:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. offset.storage.file.filename プロパティをローカル ファイル名に設定します。スタンドアロン モードでは、Kafka はこのファイルを使用してオフセット データを保存します。

    例:

    offset.storage.file.filename=/tmp/connect.offsets
    

Kafka から Pub/Sub にイベントを転送する。

このセクションでは、シンクコネクタを起動し、Kafka にイベントをパブリッシュして、Pub/Sub から転送されたメッセージを読み取る方法について説明します。

  1. Google Cloud CLI を使用して、サブスクリプションを含む Pub/Sub トピックを作成します。

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    以下を置き換えます。

    • PUBSUB_TOPIC: Kafka からメッセージを受信する Pub/Sub トピックの名前。
    • PUBSUB_SUBSCRIPTION: トピックの Pub/Sub サブスクリプションの名前。
  2. テキスト エディタで /config/cps-sink-connector.properties というファイルを開きます。コメントで "TODO" とマークされている次のプロパティの値を追加します。

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC
    

    以下を置き換えます。

    • KAFKA_TOPICS: 読み取る Kafka トピックのカンマ区切りのリスト。
    • PROJECT_ID: Pub/Sub トピックを含む Google Cloud プロジェクト。
    • PUBSUB_TOPIC: Kafka からメッセージを受信する Pub/Sub トピック。
  3. Kafka ディレクトリから次のコマンドを実行します。

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. Apache Kafka クイックスタートの手順に沿って、Kafka トピックにイベントを書き込みます。

  5. gcloud CLI を使用して Pub/Sub からイベントを読み取ります。

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
    

Pub/Sub から Kafka にメッセージを転送する。

このセクションでは、ソースコネクタの起動、Pub/Sub へのメッセージのパブリッシュ、Kafka から転送されたメッセージの読み取りを行う方法について説明します。

  1. gcloud CLI を使用して、サブスクリプションで Pub/Sub トピックを作成します。

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    以下を置き換えます。

    • PUBSUB_TOPIC: Pub/Sub トピックの名前。
    • PUBSUB_SUBSCRIPTION: Pub/Sub サブスクリプションの名前。
  2. テキスト エディタで /config/cps-source-connector.properties という名前のファイルを開きます。コメントで "TODO" とマークされている次のプロパティの値を追加します。

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION
    

    以下を置き換えます。

    • KAFKA_TOPIC: Pub/Sub メッセージを受信する Kafka トピック。
    • PROJECT_ID: Pub/Sub トピックを含む Google Cloud プロジェクト。
    • PUBSUB_TOPIC: Pub/Sub トピック。
  3. Kafka ディレクトリから次のコマンドを実行します。

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. メッセージを gcloud CLI を使用して Pub/Sub にパブリッシュします。

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    
  5. Kafka からのメッセージを読み取ります。Apache Kafka クイックスタートの手順に沿って、Kafka トピックからメッセージを読み取ります。

メッセージ コンバージョン

Kafka レコードにはキーと値が含まれています。これらは可変長のバイト配列です。Kafka レコードには、必要に応じてヘッダー(Key-Value ペア)を含めることもできます。Pub/Sub メッセージは、メッセージ本文と 0 個以上の Key-Value 属性という 2 つの主要な部分で構成されています。

Kafka コネクトはコンバータを使用して、Kafka との間でキーと値をシリアル化します。 シリアル化を制御するには、コネクタ構成ファイルで次のプロパティを設定します。

  • key.converter: レコードキーをシリアル化するために使用されるコンバータ。
  • value.converter: レコード値をシリアル化するために使用されるコンバータ。

Pub/Sub メッセージの本文は ByteString オブジェクトであるため、最も効率的な変換はペイロードを直接コピーすることです。そのため、可能であれば、同じメッセージ本文のシリアル化解除と再シリアル化を防ぐため、プリミティブ データ型(整数、浮動小数点、文字列、バイトスキーマ)を生成するコンバータの使用をおすすめします。

Kafka から Pub/Sub への変換

シンクコネクタは、次のように Kafka レコードを Pub/Sub メッセージに変換します。

  • Kafka レコードキーは、Pub/Sub メッセージに "key" という名前の属性として保存されます。
  • デフォルトでは、コネクタは Kafka レコードのヘッダーをすべてドロップします。ただし、headers.publish 構成オプションを true に設定すると、コネクタはヘッダーを Pub/Sub 属性として書き込みます。コネクタは、Pub/Sub のメッセージ属性の制限を超えるヘッダーをスキップします。
  • 整数、浮動小数点数、文字列、バイト スキーマの場合、コネクタは Kafka レコード値のバイトを Pub/Sub メッセージ本文に直接渡します。
  • 構造体スキーマの場合、コネクタは各フィールドを Pub/Sub メッセージの属性として書き込みます。たとえば、フィールドが { "id"=123 } の場合、生成される Pub/Sub メッセージには "id"="123" という属性が与えられます。フィールドの値は常に文字列に変換されます。マップ型と構造体型は、構造体内のフィールド型としてサポートされていません。
  • マップスキーマの場合、コネクタは各 Key-Value ペアを Pub/Sub メッセージの属性として書き込みます。たとえば、マップが {"alice"=1,"bob"=2} の場合、結果の Pub/Sub メッセージには "alice"="1""bob"="2" の 2 つの属性を持ちます。キーと値は文字列に変換されます。

構造体とマップのスキーマには、いくつかの追加の動作があります。

  • 必要に応じて、messageBodyName 構成プロパティを設定することで、特定の構造体フィールドまたはマップキーをメッセージ本文に指定できます。フィールドやキーの値はメッセージ本文に ByteString として格納されます。messageBodyName を設定しない場合、構造体とマップのスキーマのメッセージ本文は空になります。

  • 配列値の場合、コネクタはプリミティブ配列タイプのみをサポートします。配列内の値の順序は、1 つの ByteString オブジェクトに連結されます。

Pub/Sub から Kafka への変換

ソースコネクタは、次のように Pub/Sub メッセージを Kafka レコードに変換します。

  • Kafka レコードキー: デフォルトでは、キーは null に設定されています。必要に応じて、kafka.key.attribute 構成オプションを設定することで、キーとして使用する Pub/Sub メッセージ属性を指定できます。その場合、コネクタはその名前の属性を検索し、レコードキーを属性値に設定します。指定された属性が存在しない場合、レコードキーは null に設定されます。

  • Kafka レコード値. コネクタはレコード値を次のように書き込みます。

    • Pub/Sub メッセージにカスタム属性がない場合、コネクタは value.converter によって指定されたコンバータを使用して、Pub/Sub メッセージ本文を byte[] 型として Kafka レコード値に直接書き込みます。

    • Pub/Sub メッセージにカスタム属性があり、kafka.record.headersfalse の場合、コネクタはレコード値に構造体を書き込みます。この構造体は、属性ごとに 1 つのフィールドと、Pub/Sub メッセージ本文(バイトとして保存)を持つ "message" という名前のフィールドを含んでいます。

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      この場合、struct スキーマと互換性のある value.converterorg.apache.kafka.connect.json.JsonConverter など)を使用する必要があります。

    • Pub/Sub メッセージにカスタム属性があり、kafka.record.headerstrue の場合、コネクタは属性を Kafka レコード ヘッダーとして書き込みます。value.converter で指定されたコンバータを使用して、Pub/Sub メッセージ本文を byte[] タイプとして Kafka レコード値に直接書き込みます。

  • Kafka レコードのヘッダー。デフォルトでは、kafka.record.headerstrue に設定しない限りヘッダーは空です。

構成オプション

Kafka Connect API が提供する構成に加えて、Pub/Sub グループの Kafka コネクタは次の構成をサポートしています。

シンクコネクタ構成オプション

シンク コネクタは、次の構成オプションをサポートしています。

設定 データ型 説明
connector.class String 必須。コネクタの Java クラス。Pub/Sub シンクコネクタの場合、値は com.google.pubsub.kafka.sink.CloudPubSubSinkConnector にする必要があります。
cps.endpoint String

使用する Pub/Sub エンドポイント。

デフォルト: "pubsub.googleapis.com:443"

cps.project String 必須。Pub/Sub トピックを含む Google Cloud。
cps.topic String 必須。Kafka レコードをパブリッシュする Pub/Sub トピック。
gcp.credentials.file.path String 省略可。Pub/Sub Lite を認証するための Google Cloud 認証情報を保存するファイルへのパス。
gcp.credentials.json String 省略可。Pub/Sub Lite を認証するための Google Cloud を含む JSON blob。
headers.publish Boolean

true の場合、Kafka レコード ヘッダーを Pub/Sub メッセージ属性として含めます。

デフォルト: false

maxBufferBytes Long

Pub/Sub にパブリッシュする前に、トピック Kafka パーティションで受信する最大バイト数。

デフォルト: 10000000。

maxBufferSize Integer

Pub/Sub にパブリッシュする前に、Kafka トピック パーティションで受信するレコードの最大数。

デフォルト: 100。

maxDelayThresholdMs Integer

Pub/Sub に未処理のレコードをパブリッシュする前に、maxBufferSize または maxBufferBytes への到達まで待機する最大時間(ミリ秒単位)。

デフォルト: 100。

maxOutstandingMessages Long

パブリッシャーがそれ以上のパブリッシュをブロックする前に、未完了のバッチや保留中のバッチを含め、未処理のままにしておくことができるレコードの最大数。

デフォルト: Long.MAX_VALUE

maxOutstandingRequestBytes Long

パブリッシャーがそれ以上のパブリッシュをブロックする前に、未完了のバッチや保留中のバッチを含め、未処理のままにしておくことができる合計バイトの最大数。

デフォルト: Long.MAX_VALUE

maxRequestTimeoutMs Integer

Pub/Sub への個々のパブリッシュ リクエストのタイムアウト(ミリ秒単位)。

デフォルト: 10000。

maxTotalTimeoutMs Integer

Pub/Sub にパブリッシュするための呼び出しの、再試行を含めた合計タイムアウト(ミリ秒単位)。

デフォルト: 60000。

metadata.publish Boolean

true の場合、Kafka トピック、パーティション、オフセット、タイムスタンプを Pub/Sub メッセージ属性として含めます。

デフォルト: false

messageBodyName String

構造体またはマップ値スキーマを使用する場合は、Pub/Sub メッセージ本文として使用するフィールドまたはキーの名前を指定します。Kafka から Pub/Sub への変換をご覧ください。

デフォルト: "cps_message_body"

orderingKeySource String

Pub/Sub メッセージで順序指定キーを設定する方法を指定します。次のいずれかの値です。

  • none: 順序指定キーを設定しません。
  • key: Kafka レコードキーを順序指定キーとして使用します。
  • partition: 文字列に変換されたパーティション番号を順序指定キーとして使用します。この設定は、スループットの低いトピック、または数千のパーティションがあるトピックにのみ使用します。

デフォルト: none

topics String 必須。読み取る Kafka トピックのカンマ区切りのリスト。

ソース コネクタの構成オプション

ソースコネクタは、次の構成オプションをサポートしています。

設定 データ型 説明
connector.class String 必須。コネクタの Java クラス。Pub/Sub ソースコネクタの場合、値は com.google.pubsub.kafka.source.CloudPubSubSourceConnector にする必要があります。
cps.endpoint String

使用する Pub/Sub エンドポイント。

デフォルト: "pubsub.googleapis.com:443"

cps.makeOrderingKeyAttribute Boolean

true の場合、Pub/Sub メッセージ属性と同じ形式を使用して、Kafka レコードに順序指定キーを書き込みます。Pub/Sub から Kafka レコードへの変換をご覧ください。

デフォルト: false

cps.maxBatchSize Integer

Pub/Sub への pull リクエストごとにバッチ処理するメッセージの最大数。

デフォルト: 100。

cps.project String 必須。Pub/Sub トピックを含む Google Cloud プロジェクト。
cps.subscription String 必須。メッセージを pull する Pub/Sub サブスクリプションの名前。
gcp.credentials.file.path String 省略可。Pub/Sub Lite を認証するための Google Cloud 認証情報を保存するファイルへのパス。
gcp.credentials.json String 省略可。Pub/Sub Lite を認証するための Google Cloud を含む JSON blob。
kafka.key.attribute String

Kafka にパブリッシュされたメッセージのキーとして使用する Pub/Sub メッセージ属性。"orderingKey" に設定した場合は、メッセージの順序指定キーを使用します。null の場合、Kafka レコードに鍵はありません。

デフォルト: null

kafka.partition.count Integer

メッセージが公開される Kafka トピックの Kafka パーティションの数。パーティション スキームが "kafka_partitioner" の場合、このパラメータは無視されます。

デフォルト: 1。

kafka.partition.scheme String

Kafka でメッセージをパーティションに割り当てるスキーム。次のいずれかの値です。

  • round_robin: パーティションをラウンドロビン方式で割り当てます。
  • hash_key: レコードキーをハッシュしてパーティションを見つけます。
  • hash_value: レコード値をハッシュ化してパーティションを見つけます。
  • kafka_partitioner: パーティショニング ロジックを Kafka プロデューサーに委任します。デフォルトでは、Kafka プロデューサーはパーティションの数を自動的に検出し、レコードキーが指定されたかどうかに応じて、MurmurHash ベースのパーティション マッピングまたはラウンドロビンを実行します。
  • ordering_key: メッセージの順序指定キーのハッシュコードを使用します。順序指定キーが存在しない場合は、round_robin を使用します。

デフォルト: round_robin

kafka.record.headers Boolean

true の場合、Pub/Sub メッセージ属性を Kafka ヘッダーとして書き込みます。

kafka.topic String 必須。Pub/Sub からメッセージを受信する Kafka トピック。

サポートの利用

サポートが必要な場合は、サポート チケットを作成してください。一般的な質問やディスカッションについては、GitHub リポジトリで問題を作成してください。

次のステップ