Pub/Sub を Apache Kafka に接続する

このドキュメントでは、Pub/Sub グループの Kafka コネクタを使用して Apache Kafka と Pub/Sub を統合する方法について説明します。

Pub/Sub Kafka コネクタの概要

Apache Kafka は、イベント ストリーミングのオープンソース プラットフォームです。分散型アーキテクチャでよく使用され、疎結合のコンポーネント間の通信を可能にします。Pub/Sub は、メッセージを非同期で送受信するためのマネージド サービスです。Kafka と同様に、Pub/Sub を使用してクラウド アーキテクチャ内のコンポーネント間で通信できます。

Pub/Sub グループの Kafka コネクタを使用すると、これら 2 つのシステムを統合できます。次のコネクタがコネクタ JAR にパッケージ化されています。

  • シンクコネクタは、1 つ以上の Kafka トピックからレコードを読み取り、Pub/Sub にパブリッシュします。
  • ソースコネクタは、Pub/Sub トピックからメッセージを読み取り、Kafka にパブリッシュします。

Pub/Sub グループの Kafka コネクタを使用する可能性のあるいくつかのシナリオを次に示します。

  • Kafka ベースのアーキテクチャを Google Cloud に移行しようとしている。
  • Google Cloud の外部に Kafka にイベントを保存するフロントエンド システムがあるが、Kafka イベントを受信する必要がある一部のバックエンド サービスを実行するために Google Cloud も使用している。
  • オンプレミスの Kafka ソリューションからログを収集して、Google Cloud に送信してデータ分析を行う。
  • Google Cloud を使用するフロントエンド システムがあるが、Kafka を使用してオンプレミスにもデータを保存している。

コネクタには、Kafka と他のシステムの間でデータをストリーミングするためのフレームワークである Kafka Connect が必要です。このコネクタを使用するには、Kafka クラスタとともに Kafka Connect を実行する必要があります。

このドキュメントは、Kafka と Pub/Sub の両方に精通していることを前提としています。このドキュメントを読む前に、Pub/Sub クイックスタートのいずれかを完了することをおすすめします。

Pub/Sub コネクタは、Google Cloud IAM と Kafka Connect ACL の間の統合をサポートしていません。

コネクタを使ってみる

このセクションでは、次のタスクについて説明します。

  1. Pub/Sub グループの Kafka コネクタを構成する
  2. Kafka から Pub/Sub にイベントを送信する。
  3. Pub/Sub から Kafka にメッセージを送信する。

前提条件

Kafka をインストールする

Apache Kafka クイックスタートに従って、ローカルマシンに単一ノードの Kafka をインストールします。クイックスタートで、次の手順を行います。

  1. 最新の Kafka リリースをダウンロードして展開します。
  2. Kafka 環境を起動します。
  3. Kafka トピックを作成します。

認証

Pub/Sub メッセージを送受信するために、Pub/Sub グループの Kafka コネクタが Pub/Sub による認証を行う必要があります。認証を設定するには、次の手順を実行します。

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud CLI をインストールします。
  3. gcloud CLI を初期化するには:

    gcloud init
  4. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  5. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login
  6. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。
  7. Google Cloud CLI をインストールします。
  8. gcloud CLI を初期化するには:

    gcloud init
  9. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  10. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login
  11. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。

コネクタ JAR をダウンロードする

ローカルマシンにコネクタの JAR ファイルをダウンロードします。詳細については、GitHub ReadMe のコネクタの取得をご覧ください。

コネクタ構成ファイルをコピーする

  1. コネクタの GitHub リポジトリのクローンを作成するか、ダウンロードします。

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. config ディレクトリの内容を Kafka インストールの config サブディレクトリにコピーします。

    cp config/* [path to Kafka installation]/config/
    

これらのファイルには、コネクタの構成設定が含まれています。

Kafka Connect 構成を更新する

  1. ダウンロードした Kafka Connect バイナリが含まれているディレクトリに移動します。
  2. Kafka Connect バイナリ ディレクトリにある config/connect-standalone.properties という名前のファイルをテキスト エディタで開きます。
  3. plugin.path property がコメントアウトされている場合は、コメント化解除します。
  4. plugin.path property を更新して、コネクタ JAR へのパスを追加します。

    例:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. offset.storage.file.filename プロパティにローカル ファイル名を設定します。スタンドアロン モードでは、Kafka はこのファイルを使用してオフセット データを保存します。

    例:

    offset.storage.file.filename=/tmp/connect.offsets
    

Kafka から Pub/Sub にイベントを転送する

このセクションでは、シンクコネクタを起動し、Kafka にイベントをパブリッシュして、Pub/Sub から転送されたメッセージを読み取る方法について説明します。

  1. Google Cloud CLI を使用して、サブスクリプションを含む Pub/Sub トピックを作成します。

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    以下を置き換えます。

    • PUBSUB_TOPIC: Kafka からメッセージを受信する Pub/Sub トピックの名前。
    • PUBSUB_SUBSCRIPTION: トピックの Pub/Sub サブスクリプションの名前。
  2. テキスト エディタで /config/cps-sink-connector.properties というファイルを開きます。コメントで "TODO" とマークされている次のプロパティの値を追加します。

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC
    

    以下を置き換えます。

    • KAFKA_TOPICS: 読み取る Kafka トピックのカンマ区切りのリスト。
    • PROJECT_ID: Pub/Sub トピックを含む Google Cloud プロジェクト。
    • PUBSUB_TOPIC: Kafka からメッセージを受信する Pub/Sub トピック。
  3. Kafka ディレクトリから、次のコマンドを実行します。

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. Apache Kafka クイックスタートの手順に沿って、Kafka トピックにいくつかのイベントを書き込みます。

  5. gcloud CLI を使用して Pub/Sub からイベントを読み取ります。

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
    

Pub/Sub から Kafka にメッセージを転送する。

このセクションでは、ソースコネクタの起動、Pub/Sub へのメッセージのパブリッシュ、Kafka から転送されたメッセージの読み取りを行う方法について説明します。

  1. gcloud CLI を使用して、サブスクリプションで Pub/Sub トピックを作成します。

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    以下を置き換えます。

    • PUBSUB_TOPIC: Pub/Sub トピックの名前。
    • PUBSUB_SUBSCRIPTION: Pub/Sub サブスクリプションの名前。
  2. テキスト エディタで /config/cps-source-connector.properties という名前のファイルを開きます。コメントで "TODO" とマークされている次のプロパティの値を追加します。

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION
    

    以下を置き換えます。

    • KAFKA_TOPIC: Pub/Sub メッセージを受信する Kafka トピック。
    • PROJECT_ID: Pub/Sub トピックを含む Google Cloud プロジェクト。
    • PUBSUB_TOPIC: Pub/Sub トピック。
  3. Kafka ディレクトリから、次のコマンドを実行します。

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. メッセージを gcloud CLI を使用して Pub/Sub にパブリッシュします。

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    
  5. Kafka からのメッセージを読み取ります。Apache Kafka クイックスタートの手順に沿って、Kafka トピックからメッセージを読み取ります。

メッセージ コンバージョン

Kafka レコードにはキーと値が含まれています。これらは可変長のバイト配列です。必要に応じて、Kafka レコードに Key-Value ペアのヘッダーを含めることもできます。Pub/Sub メッセージには、メッセージ本文と 0 個以上の Key-Value 属性という 2 つの主要な部分があります。

Kafka Connect は、コンバータを使用して、Kafka との間でキーと値をシリアル化します。シリアル化を制御するには、コネクタ構成ファイルで次のプロパティを設定します。

  • key.converter: レコードキーをシリアル化するために使用されるコンバータ。
  • value.converter: レコード値をシリアル化するために使用されるコンバータ。

Pub/Sub メッセージの本文は ByteString オブジェクトであるため、ペイロードを直接コピーするのが最も効率的な変換です。そのため、可能であれば、同じメッセージ本文のシリアル化解除と再シリアル化を防ぐため、プリミティブ データ型(整数、浮動小数点、文字列、バイトスキーマ)を生成するコンバータの使用をおすすめします。

Kafka から Pub/Sub への変換

シンクコネクタは、次のように Kafka レコードを Pub/Sub メッセージに変換します。

  • Kafka レコードキーは、Pub/Sub メッセージに "key" という名前の属性として保存されます。
  • デフォルトでは、コネクタは Kafka レコードのヘッダーをすべてドロップします。ただし、headers.publish 構成オプションを true に設定すると、コネクタはヘッダーを Pub/Sub 属性として書き込みます。コネクタは、Pub/Sub のメッセージ属性の上限を超えるヘッダーをスキップします。
  • 整数、浮動小数点数、文字列、バイトスキーマの場合、コネクタは Kafka レコード値のバイトを Pub/Sub メッセージ本文に直接渡します。
  • 構造体スキーマの場合、コネクタは各フィールドを Pub/Sub メッセージの属性として書き込みます。たとえば、フィールドが { "id"=123 } の場合、生成される Pub/Sub メッセージには "id"="123" という属性が与えられます。フィールドの値は常に文字列に変換されます。マップと構造体の型は、構造体内のフィールド型としてはサポートされていません。
  • マップスキーマの場合、コネクタは各 Key-Value ペアを Pub/Sub メッセージの属性として書き込みます。たとえば、マップが {"alice"=1,"bob"=2} の場合、結果の Pub/Sub メッセージには "alice"="1""bob"="2" の 2 つの属性を持ちます。キーと値は文字列に変換されます。

構造体スキーマとマップスキーマには追加の動作があります。

  • 必要に応じて、messageBodyName 構成プロパティを設定することで、特定の構造体フィールドまたはマップキーをメッセージ本文に指定できます。フィールドまたはキーの値は、メッセージ本文に ByteString として保存されます。messageBodyName を設定しない場合、構造体とマップのスキーマのメッセージ本文は空です。

  • 配列値の場合、コネクタはプリミティブ配列タイプのみをサポートします。配列内の値の順序は、1 つの ByteString オブジェクトに連結されます。

Pub/Sub から Kafka への変換

ソースコネクタは、次のように Pub/Sub メッセージを Kafka レコードに変換します。

  • Kafka レコードキー: デフォルトでは、キーは null に設定されています。必要に応じて、kafka.key.attribute 構成オプションを設定して、鍵として使用する Pub/Sub メッセージ属性を指定できます。その場合、コネクタはその名前の属性を検索し、レコードキーを属性値に設定します。指定された属性が存在しない場合、レコードキーは null に設定されます。

  • Kafka のレコード値。コネクタは次のようにレコード値を書き込みます。

    • Pub/Sub メッセージにカスタム属性がない場合、コネクタは value.converter によって指定されたコンバータを使用して、Pub/Sub メッセージ本文を byte[] 型として Kafka レコード値に直接書き込みます。

    • Pub/Sub メッセージにカスタム属性があり、kafka.record.headersfalse の場合、コネクタはレコード値に構造体を書き込みます。この構造体は、属性ごとに 1 つのフィールドと、Pub/Sub メッセージ本文(バイトとして保存)を持つ "message" という名前のフィールドを含んでいます。

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      この場合、struct スキーマと互換性のある value.converterorg.apache.kafka.connect.json.JsonConverter など)を使用する必要があります。

    • Pub/Sub メッセージにカスタム属性があり、kafka.record.headerstrue の場合、コネクタは属性を Kafka レコード ヘッダーとして書き込みます。value.converter で指定されたコンバータを使用して、Pub/Sub メッセージ本文を byte[] タイプとして Kafka レコード値に直接書き込みます。

  • Kafka レコード ヘッダー デフォルトでは、kafka.record.headerstrue に設定しない限りヘッダーは空です。

構成オプション

Kafka Connect API が提供する構成に加えて、Pub/Sub グループの Kafka コネクタは次の構成をサポートしています。

シンクコネクタ構成オプション

シンクコネクタは、次の構成オプションをサポートしています。

設定 データ型 説明
connector.class String 必須。コネクタの Java クラス。Pub/Sub シンクコネクタの場合、値は com.google.pubsub.kafka.sink.CloudPubSubSinkConnector にする必要があります。
cps.endpoint String

使用する Pub/Sub エンドポイント。

デフォルト: "pubsub.googleapis.com:443"

cps.project String 必須。Pub/Sub トピックを含む Google Cloud。
cps.topic String 必須。Kafka レコードをパブリッシュする Pub/Sub トピック。
gcp.credentials.file.path String 省略できます。Pub/Sub Lite を認証するための Google Cloud 認証情報を保存するファイルへのパス。
gcp.credentials.json String 省略できます。Pub/Sub Lite を認証するための Google Cloud を含む JSON blob。
headers.publish Boolean

true の場合、Kafka レコード ヘッダーを Pub/Sub メッセージ属性として含めます。

デフォルト: false

maxBufferBytes Long

Pub/Sub にパブリッシュする前に、トピック Kafka パーティションで受信する最大バイト数。

デフォルト: 10000000。

maxBufferSize Integer

Pub/Sub にパブリッシュする前に、Kafka トピック パーティションで受信するレコードの最大数。

デフォルト: 100。

maxDelayThresholdMs Integer

Pub/Sub に未処理のレコードをパブリッシュする前に、maxBufferSize または maxBufferBytes への到達まで待機する最大時間(ミリ秒単位)。

デフォルト: 100。

maxOutstandingMessages Long

パブリッシャーがそれ以上のパブリッシュをブロックする前に、未完了のバッチや保留中のバッチを含め、未処理のままにしておくことができるレコードの最大数。

デフォルト: Long.MAX_VALUE

maxOutstandingRequestBytes Long

パブリッシャーがそれ以上のパブリッシュをブロックする前に、未完了のバッチや保留中のバッチを含め、未処理のままにしておくことができる合計バイトの最大数。

デフォルト: Long.MAX_VALUE

maxRequestTimeoutMs Integer

Pub/Sub への個々のパブリッシュ リクエストのタイムアウト(ミリ秒単位)。

デフォルト: 10000。

maxTotalTimeoutMs Integer

Pub/Sub にパブリッシュするための呼び出しの、再試行を含めた合計タイムアウト(ミリ秒単位)。

デフォルト: 60000。

metadata.publish Boolean

true の場合、Kafka トピック、パーティション、オフセット、タイムスタンプを Pub/Sub メッセージ属性に含めます。

デフォルト: false

messageBodyName String

構造体またはマップの値のスキーマを使用する場合は、Pub/Sub メッセージ本文として使用するフィールドまたはキーの名前を指定します。Kafka から Pub/Sub への変換をご覧ください。

デフォルト: "cps_message_body"

orderingKeySource String

Pub/Sub メッセージに順序指定キーを設定する方法を指定します。次のいずれかの値です。

  • none: 順序指定キーを設定しないでください。
  • key: Kafka レコードキーを順序指定キーとして使用します。
  • partition: 文字列に変換されたパーティション番号を順序指定キーとして使用します。この設定は、スループットの低いトピック、または数千のパーティションがあるトピックにのみ使用します。

デフォルト: none

topics String 必須。読み取る Kafka トピックのカンマ区切りのリスト。

ソースコネクタの構成オプション

ソースコネクタは、次の構成オプションをサポートしています。

設定 データ型 説明
connector.class String 必須。コネクタの Java クラス。Pub/Sub ソースコネクタの場合、値は com.google.pubsub.kafka.source.CloudPubSubSourceConnector にする必要があります。
cps.endpoint String

使用する Pub/Sub エンドポイント。

デフォルト: "pubsub.googleapis.com:443"

cps.makeOrderingKeyAttribute Boolean

true の場合、Pub/Sub メッセージ属性と同じ形式を使用して、Kafka レコードに順序指定キーを書き込みます。Pub/Sub から Kafka レコードへの変換をご覧ください。

デフォルト: false

cps.maxBatchSize Integer

Pub/Sub への pull リクエストごとに一括で処理するメッセージの最大数。

デフォルト: 100。

cps.project String 必須。Pub/Sub トピックを含む Google Cloud プロジェクト。
cps.subscription String 必須。メッセージを pull する Pub/Sub サブスクリプションの名前。
gcp.credentials.file.path String 省略できます。Pub/Sub Lite を認証するための Google Cloud 認証情報を保存するファイルへのパス。
gcp.credentials.json String 省略できます。Pub/Sub Lite を認証するための Google Cloud を含む JSON blob。
kafka.key.attribute String

Kafka にパブリッシュされたメッセージのキーとして使用する Pub/Sub メッセージ属性。"orderingKey" に設定した場合は、メッセージの順序指定キーを使用します。null の場合、Kafka レコードに鍵はありません。

デフォルト: null

kafka.partition.count Integer

メッセージがパブリッシュされる Kafka トピックの Kafka パーティションの数。パーティション スキームが "kafka_partitioner" の場合、このパラメータは無視されます。

デフォルト: 1。

kafka.partition.scheme String

Kafka 内のパーティションにメッセージを割り当てるためのスキーム。次のいずれかの値です。

  • round_robin: ラウンドロビン方式でパーティションを割り当てます。
  • hash_key: レコードキーをハッシュしてパーティションを見つけます。
  • hash_value: レコード値をハッシュしてパーティションを見つけます。
  • kafka_partitioner: パーティショニング ロジックを Kafka プロデューサーに委任します。デフォルトでは、Kafka プロデューサーはパーティションの数を自動的に検出し、レコードキーが指定されたかどうかに応じて、MurmurHash ベースのパーティション マッピングまたはラウンドロビンを実行します。
  • ordering_key: メッセージの順序指定キーのハッシュコードを使用します。順序指定キーが存在しない場合は、round_robin を使用します。

デフォルト: round_robin

kafka.record.headers Boolean

true の場合、Pub/Sub メッセージ属性を Kafka ヘッダーとして書き込みます。

kafka.topic String 必須。Pub/Sub からメッセージを受信する Kafka トピック。

サポートの利用

サポートが必要な場合は、サポート チケットを作成してください。一般的な質問やディスカッションについては、GitHub リポジトリで問題を作成してください。

次のステップ