このドキュメントでは、Pub/Sub グループ Kafka コネクタを使用して Apache Kafka と Pub/Sub を統合する方法について説明します。
Pub/Sub Kafka コネクタの概要
Apache Kafka は、イベント ストリーミングのオープンソース プラットフォームです。分散型アーキテクチャでよく使用され、疎結合のコンポーネント間の通信を可能にします。Pub/Sub は、メッセージを非同期で送受信するためのマネージド サービスです。Kafka と同様に、Pub/Sub を使用してクラウド アーキテクチャ内のコンポーネント間で通信できます。
Pub/Sub グループの Kafka コネクタを使用すると、これら 2 つのシステムを統合できます。コネクタ JAR には次のコネクタがパッケージ化されています。
- シンクコネクタは、1 つ以上の Kafka トピックからレコードを読み取り、Pub/Sub に公開します。
- ソースコネクタは、Pub/Sub トピックからメッセージを読み取り、Kafka にパブリッシュします。
Pub/Sub グループの Kafka コネクタを使用する可能性のあるいくつかのシナリオを次に示します。
- Kafka ベースのアーキテクチャを Google Cloud に移行しようとしている。
- Google Cloud の外部に Kafka にイベントを保存するフロントエンド システムがあるが、Kafka イベントを受信する必要がある一部のバックエンド サービスを実行するために Google Cloud も使用している。
- オンプレミスの Kafka ソリューションからログを収集して、Google Cloud に送信してデータ分析を行う。
- Google Cloud を使用するフロントエンド システムがあるが、Kafka を使用してオンプレミスにもデータを保存している。
このコネクタには、Kafka と他のシステム間でデータをストリーミングするためのフレームワークである Kafka コネクト が必要です。このコネクタを使用するには、Kafka クラスタとともに Kafka Connect を実行する必要があります。
このドキュメントでは、Kafka と Pub/Sub の両方に精通していることを前提としています。この文書を読む前に、Pub/Sub クイックスタートのいずれかを完了することをお勧めします。
Pub/Sub コネクタは、Google Cloud IAM と Kafka Connect ACL の統合をサポートしていません。
コネクタを使ってみる
このセクションでは、次のタスクについて説明します。- Pub/Sub グループの Kafka コネクタを構成する
- Kafka から Pub/Sub にイベントを送信する。
- Pub/Sub から Kafka にメッセージを送信する。
前提条件
Kafka をインストールする
Apache Kafka クイックスタートに沿って、ローカルマシンに単一ノードの Kafka をインストールします。クイックスタートで、次の手順を行います。
- 最新の Kafka リリースをダウンロードして展開します。
- Kafka 環境を開始します。
- Kafka トピックを作成します。
認証
Pub/Sub メッセージを送受信するために、Pub/Sub グループの Kafka コネクタが Pub/Sub による認証を行う必要があります。認証を設定するには、次の手順を実行します。
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
コネクタ JAR をダウンロードする
ローカルマシンにコネクタの JAR ファイルをダウンロードします。詳細については、GitHub ReadMe のコネクタの取得をご覧ください。
コネクタ構成ファイルをコピーする
コネクタの GitHub リポジトリのクローンを作成するか、ダウンロードします。
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
config
ディレクトリの内容を Kafka インストールのconfig
サブディレクトリにコピーします。cp config/* [path to Kafka installation]/config/
これらのファイルには、コネクタの構成設定が含まれています。
Kafka Connect 構成を更新する
- ダウンロードした Kafka コネクト バイナリが含まれているディレクトリに移動します。
- Kafka コネクト バイナリ ディレクトリにある
config/connect-standalone.properties
という名前のファイルをテキスト エディタで開きます。 plugin.path property
がコメントアウトされている場合は、コメント化解除します。plugin.path property
を更新して、コネクタ JAR へのパスを追加します。例:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
offset.storage.file.filename
プロパティをローカル ファイル名に設定します。スタンドアロン モードでは、Kafka はこのファイルを使用してオフセット データを保存します。例:
offset.storage.file.filename=/tmp/connect.offsets
Kafka から Pub/Sub にイベントを転送する。
このセクションでは、シンクコネクタを起動し、Kafka にイベントをパブリッシュして、Pub/Sub から転送されたメッセージを読み取る方法について説明します。
Google Cloud CLI を使用して、サブスクリプションを含む Pub/Sub トピックを作成します。
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
以下を置き換えます。
- PUBSUB_TOPIC: Kafka からメッセージを受信する Pub/Sub トピックの名前。
- PUBSUB_SUBSCRIPTION: トピックの Pub/Sub サブスクリプションの名前。
テキスト エディタで
/config/cps-sink-connector.properties
というファイルを開きます。コメントで"TODO"
とマークされている次のプロパティの値を追加します。topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
以下を置き換えます。
- KAFKA_TOPICS: 読み取る Kafka トピックのカンマ区切りのリスト。
- PROJECT_ID: Pub/Sub トピックを含む Google Cloud プロジェクト。
- PUBSUB_TOPIC: Kafka からメッセージを受信する Pub/Sub トピック。
Kafka ディレクトリから次のコマンドを実行します。
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
Apache Kafka クイックスタートの手順に沿って、Kafka トピックにイベントを書き込みます。
gcloud CLI を使用して Pub/Sub からイベントを読み取ります。
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Pub/Sub から Kafka にメッセージを転送する。
このセクションでは、ソースコネクタの起動、Pub/Sub へのメッセージのパブリッシュ、Kafka から転送されたメッセージの読み取りを行う方法について説明します。
gcloud CLI を使用して、サブスクリプションで Pub/Sub トピックを作成します。
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
以下を置き換えます。
- PUBSUB_TOPIC: Pub/Sub トピックの名前。
- PUBSUB_SUBSCRIPTION: Pub/Sub サブスクリプションの名前。
テキスト エディタで
/config/cps-source-connector.properties
という名前のファイルを開きます。コメントで"TODO"
とマークされている次のプロパティの値を追加します。kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
以下を置き換えます。
- KAFKA_TOPIC: Pub/Sub メッセージを受信する Kafka トピック。
- PROJECT_ID: Pub/Sub トピックを含む Google Cloud プロジェクト。
- PUBSUB_TOPIC: Pub/Sub トピック。
Kafka ディレクトリから次のコマンドを実行します。
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
メッセージを gcloud CLI を使用して Pub/Sub にパブリッシュします。
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Kafka からのメッセージを読み取ります。Apache Kafka クイックスタートの手順に沿って、Kafka トピックからメッセージを読み取ります。
メッセージ コンバージョン
Kafka レコードにはキーと値が含まれています。これらは可変長のバイト配列です。Kafka レコードには、必要に応じてヘッダー(Key-Value ペア)を含めることもできます。Pub/Sub メッセージは、メッセージ本文と 0 個以上の Key-Value 属性という 2 つの主要な部分で構成されています。
Kafka コネクトはコンバータを使用して、Kafka との間でキーと値をシリアル化します。 シリアル化を制御するには、コネクタ構成ファイルで次のプロパティを設定します。
key.converter
: レコードキーをシリアル化するために使用されるコンバータ。value.converter
: レコード値をシリアル化するために使用されるコンバータ。
Pub/Sub メッセージの本文は ByteString
オブジェクトであるため、最も効率的な変換はペイロードを直接コピーすることです。そのため、可能であれば、同じメッセージ本文のシリアル化解除と再シリアル化を防ぐため、プリミティブ データ型(整数、浮動小数点、文字列、バイトスキーマ)を生成するコンバータの使用をおすすめします。
Kafka から Pub/Sub への変換
シンクコネクタは、次のように Kafka レコードを Pub/Sub メッセージに変換します。
- Kafka レコードキーは、Pub/Sub メッセージに
"key"
という名前の属性として保存されます。 - デフォルトでは、コネクタは Kafka レコードのヘッダーをすべてドロップします。ただし、
headers.publish
構成オプションをtrue
に設定すると、コネクタはヘッダーを Pub/Sub 属性として書き込みます。コネクタは、Pub/Sub のメッセージ属性の制限を超えるヘッダーをスキップします。 - 整数、浮動小数点数、文字列、バイト スキーマの場合、コネクタは Kafka レコード値のバイトを Pub/Sub メッセージ本文に直接渡します。
- 構造体スキーマの場合、コネクタは各フィールドを Pub/Sub メッセージの属性として書き込みます。たとえば、フィールドが
{ "id"=123 }
の場合、生成される Pub/Sub メッセージには"id"="123"
という属性が与えられます。フィールドの値は常に文字列に変換されます。マップ型と構造体型は、構造体内のフィールド型としてサポートされていません。 - マップスキーマの場合、コネクタは各 Key-Value ペアを Pub/Sub メッセージの属性として書き込みます。たとえば、マップが
{"alice"=1,"bob"=2}
の場合、結果の Pub/Sub メッセージには"alice"="1"
と"bob"="2"
の 2 つの属性を持ちます。キーと値は文字列に変換されます。
構造体とマップのスキーマには、いくつかの追加の動作があります。
必要に応じて、
messageBodyName
構成プロパティを設定することで、特定の構造体フィールドまたはマップキーをメッセージ本文に指定できます。フィールドやキーの値はメッセージ本文にByteString
として格納されます。messageBodyName
を設定しない場合、構造体とマップのスキーマのメッセージ本文は空になります。配列値の場合、コネクタはプリミティブ配列タイプのみをサポートします。配列内の値の順序は、1 つの
ByteString
オブジェクトに連結されます。
Pub/Sub から Kafka への変換
ソースコネクタは、次のように Pub/Sub メッセージを Kafka レコードに変換します。
Kafka レコードキー: デフォルトでは、キーは
null
に設定されています。必要に応じて、kafka.key.attribute
構成オプションを設定することで、キーとして使用する Pub/Sub メッセージ属性を指定できます。その場合、コネクタはその名前の属性を検索し、レコードキーを属性値に設定します。指定された属性が存在しない場合、レコードキーはnull
に設定されます。Kafka レコード値. コネクタはレコード値を次のように書き込みます。
Pub/Sub メッセージにカスタム属性がない場合、コネクタは
value.converter
によって指定されたコンバータを使用して、Pub/Sub メッセージ本文をbyte[]
型として Kafka レコード値に直接書き込みます。Pub/Sub メッセージにカスタム属性があり、
kafka.record.headers
がfalse
の場合、コネクタはレコード値に構造体を書き込みます。この構造体は、属性ごとに 1 つのフィールドと、Pub/Sub メッセージ本文(バイトとして保存)を持つ"message"
という名前のフィールドを含んでいます。{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
この場合、
struct
スキーマと互換性のあるvalue.converter
(org.apache.kafka.connect.json.JsonConverter
など)を使用する必要があります。Pub/Sub メッセージにカスタム属性があり、
kafka.record.headers
がtrue
の場合、コネクタは属性を Kafka レコード ヘッダーとして書き込みます。value.converter
で指定されたコンバータを使用して、Pub/Sub メッセージ本文をbyte[]
タイプとして Kafka レコード値に直接書き込みます。
Kafka レコードのヘッダー。デフォルトでは、
kafka.record.headers
をtrue
に設定しない限りヘッダーは空です。
構成オプション
Kafka Connect API が提供する構成に加えて、Pub/Sub グループの Kafka コネクタは次の構成をサポートしています。
シンクコネクタ構成オプション
シンク コネクタは、次の構成オプションをサポートしています。
設定 | データ型 | 説明 |
---|---|---|
connector.class |
String |
必須。コネクタの Java クラス。Pub/Sub シンクコネクタの場合、値は com.google.pubsub.kafka.sink.CloudPubSubSinkConnector にする必要があります。 |
cps.endpoint |
String |
使用する Pub/Sub エンドポイント。 デフォルト: |
cps.project |
String |
必須。Pub/Sub トピックを含む Google Cloud。 |
cps.topic |
String |
必須。Kafka レコードをパブリッシュする Pub/Sub トピック。 |
gcp.credentials.file.path |
String |
省略可。Pub/Sub Lite を認証するための Google Cloud 認証情報を保存するファイルへのパス。 |
gcp.credentials.json |
String |
省略可。Pub/Sub Lite を認証するための Google Cloud を含む JSON blob。 |
headers.publish |
Boolean |
デフォルト: |
maxBufferBytes |
Long |
Pub/Sub にパブリッシュする前に、トピック Kafka パーティションで受信する最大バイト数。 デフォルト: 10000000。 |
maxBufferSize |
Integer |
Pub/Sub にパブリッシュする前に、Kafka トピック パーティションで受信するレコードの最大数。 デフォルト: 100。 |
maxDelayThresholdMs |
Integer |
Pub/Sub に未処理のレコードをパブリッシュする前に、 デフォルト: 100。 |
maxOutstandingMessages |
Long |
パブリッシャーがそれ以上のパブリッシュをブロックする前に、未完了のバッチや保留中のバッチを含め、未処理のままにしておくことができるレコードの最大数。 デフォルト: |
maxOutstandingRequestBytes |
Long |
パブリッシャーがそれ以上のパブリッシュをブロックする前に、未完了のバッチや保留中のバッチを含め、未処理のままにしておくことができる合計バイトの最大数。 デフォルト: |
maxRequestTimeoutMs |
Integer |
Pub/Sub への個々のパブリッシュ リクエストのタイムアウト(ミリ秒単位)。 デフォルト: 10000。 |
maxTotalTimeoutMs |
Integer |
Pub/Sub にパブリッシュするための呼び出しの、再試行を含めた合計タイムアウト(ミリ秒単位)。 デフォルト: 60000。 |
metadata.publish |
Boolean |
デフォルト: |
messageBodyName |
String |
構造体またはマップ値スキーマを使用する場合は、Pub/Sub メッセージ本文として使用するフィールドまたはキーの名前を指定します。Kafka から Pub/Sub への変換をご覧ください。 デフォルト: |
orderingKeySource |
String |
Pub/Sub メッセージで順序指定キーを設定する方法を指定します。次のいずれかの値です。
デフォルト: |
topics |
String |
必須。読み取る Kafka トピックのカンマ区切りのリスト。 |
ソース コネクタの構成オプション
ソースコネクタは、次の構成オプションをサポートしています。
設定 | データ型 | 説明 |
---|---|---|
connector.class |
String |
必須。コネクタの Java クラス。Pub/Sub ソースコネクタの場合、値は com.google.pubsub.kafka.source.CloudPubSubSourceConnector にする必要があります。 |
cps.endpoint |
String |
使用する Pub/Sub エンドポイント。 デフォルト: |
cps.makeOrderingKeyAttribute |
Boolean |
デフォルト: |
cps.maxBatchSize |
Integer |
Pub/Sub への pull リクエストごとにバッチ処理するメッセージの最大数。 デフォルト: 100。 |
cps.project |
String |
必須。Pub/Sub トピックを含む Google Cloud プロジェクト。 |
cps.subscription |
String |
必須。メッセージを pull する Pub/Sub サブスクリプションの名前。 |
gcp.credentials.file.path |
String |
省略可。Pub/Sub Lite を認証するための Google Cloud 認証情報を保存するファイルへのパス。 |
gcp.credentials.json |
String |
省略可。Pub/Sub Lite を認証するための Google Cloud を含む JSON blob。 |
kafka.key.attribute |
String |
Kafka にパブリッシュされたメッセージのキーとして使用する Pub/Sub メッセージ属性。 デフォルト: |
kafka.partition.count |
Integer |
メッセージが公開される Kafka トピックの Kafka パーティションの数。パーティション スキームが デフォルト: 1。 |
kafka.partition.scheme |
String |
Kafka でメッセージをパーティションに割り当てるスキーム。次のいずれかの値です。
デフォルト: |
kafka.record.headers |
Boolean |
|
kafka.topic |
String |
必須。Pub/Sub からメッセージを受信する Kafka トピック。 |
サポートの利用
サポートが必要な場合は、サポート チケットを作成してください。一般的な質問やディスカッションについては、GitHub リポジトリで問題を作成してください。
次のステップ
- Kafka と Pub/Sub の違いについて。
- Pub/Sub グループの Kafka コネクタの詳細
- Pub/Sub グループの Kafka コネクタの GitHub リポジトリを確認する。