このドキュメントでは、Dataflow から Apache Kafka にデータを書き込む方法について説明します。
Apache Beam Kafka I/O コネクタ(KafkaIO
)は Java でネイティブに使用できます。また、Apache Beam の多言語パイプライン フレームワークを使用して、Python と Go でも使用できます。
Java パイプラインの場合は、マネージド I/O コネクタを使用して Kafka から読み取ることを検討してください。
1 回限りの処理
デフォルトでは、KafkaIO
コネクタは書き込み用に「1 回限り」セマンティクスを提供しません。つまり、データが Kafka トピックに複数回書き込まれる可能性があります。1 回限りの書き込みを有効にするには、withEOS
メソッドを呼び出します。1 回限りの書き込みでは、データが宛先 Kafka トピックに 1 回だけ書き込まれることが保証されます。ただし、この方法ではパイプラインのコストが増加し、スループットが低下します。
「1 回限り」セマンティクスに厳格な要件がなく、パイプラインのロジックで重複レコードを処理できる場合は、パイプライン全体で「1 回以上」モードを有効にして、コストを削減することを検討してください。詳細については、パイプライン ストリーミング モードを設定するをご覧ください。
パイプラインのドレイン
パイプラインをドレインする場合、「1 回限り」セマンティクスは保証されません。保証されるのは、確認済みのデータが失われることはないということだけです。その結果、パイプラインのドレイン中に、読み取りオフセットが Kafka に commit されずに一部のデータが処理される可能性があります。パイプラインを変更するときに Kafka の「1 回以上」セマンティクスを実現するには、ジョブをキャンセルして新しいジョブを開始するのではなく、パイプラインを更新します。
「1 回限り」セマンティクス用に Kafka をチューニングする
transaction.max.timeout.ms
と transactional.id.expiration.ms
を調整すると、全体的なフォールト トレランスと 1 回限りの配信戦略を補完できます。ただし、その影響は、サービス停止の性質と固有の設定によって異なります。Kafka ブローカーの停止によるデータの重複を防ぐため、transaction.max.timeout.ms
を Kafka トピックの保持時間に近づけて設定します。
Kafka ブローカーが一時的に使用できなくなった場合(ネットワーク パーティションやノードの障害など)、プロデューサーに進行中のトランザクションがあると、これらのトランザクションがタイムアウトする可能性があります。transaction.max.timeout.ms
の値を増やすと、ブローカーの復元後にトランザクションを完了するまでの時間が長くなるため、トランザクションの再起動やメッセージの再送信が不要になる可能性があります。この緩和策は、トランザクションの再起動による重複メッセージの発生を減らすことで、「1 回限り」セマンティクスを間接的に維持するのに役立ちます。一方、有効期限を短くすると、無効なトランザクション ID をより迅速にクリーンアップし、リソース使用量を削減できます。
ネットワークを構成する
デフォルトでは、Dataflow はデフォルトの Virtual Private Cloud(VPC)ネットワーク内でインスタンスを起動します。Kafka の構成によっては、Dataflow に異なるネットワークとサブネットを構成しなければならない場合があります。詳細については、ネットワークとサブネットワークの指定をご覧ください。 ネットワークを構成するときは、Dataflow ワーカーマシンが Kafka ブローカーにアクセスできるようにファイアウォール ルールを作成します。
VPC Service Controls を使用している場合は、Kafka クラスタを VPC Service Controls の境界内に配置するか、認可済み VPN または Cloud Interconnect に境界を拡張します。
Kafka クラスタが Google Cloud の外部にデプロイされている場合は、Dataflow と Kafka クラスタの間にネットワーク接続を作成する必要があります。複数のネットワーク オプションがあり、それぞれトレードオフがあります。
- 次のいずれかを使用して、共有 RFC 1918 アドレス空間を使用して接続します。
- 次のいずれかを使用して、外部でホストされている Kafka クラスタにパブリック IP アドレスを介してアクセスします。
- 公共のインターネット
- ダイレクト ピアリング
- キャリア ピアリング
予測可能なパフォーマンスと信頼性という点で Dedicated Interconnect が最良のオプションですが、サードパーティが新しい回線をプロビジョニングする必要があるためセットアップに時間がかかることがあります。パブリック IP ベースのトポロジでは、必要となるネットワーキング作業がほとんどないため、すぐに使い始めることができます。
以降のセクションでは、これらのオプションについて詳しく説明します。
共有 RFC 1918 アドレス空間
Dedicated Interconnect と IPsec VPN の両方式は、Virtual Private Cloud(VPC)の RFC 1918 IP アドレスに直接アクセスできるため、Kafka の構成を簡素化できます。VPN ベースのトポロジを使用している場合は、ハイスループット VPN のセットアップを検討してください。
デフォルトでは、Dataflow はデフォルトの VPC ネットワークでインスタンスを起動します。プライベート ネットワーク トポロジで、Cloud Router で明示的に定義されたルートによって Google Cloud 内のサブネットワークが Kafka クラスタに接続されている場合は、Dataflow インスタンスをどこに配置するかを自分で制御できることが必要になります。Dataflow を使用して、network
と subnetwork
の実行パラメータを構成できます。
対応するサブネットワークで、Dataflow がスケールアウトを目的にインスタンスを起動する際に、十分な数の IP アドレスを使用できることを確認してください。また、Dataflow インスタンスを起動するために別のネットワークを作成する場合は、プロジェクト内のすべての仮想マシン間の TCP トラフィックを有効にするファイアウォール ルールを設定するようにしてください。このファイアウォール ルールは、デフォルトのネットワークにはすでに構成されています。
パブリック IP アドレス空間
このアーキテクチャでは、Transport Layer Security(TLS)を使用して外部クライアントと Kafka 間のトラフィックを保護し、ブローカー間の通信に暗号化されていないトラフィックを使用します。Kafka リスナーが、内部通信と外部通信の両方に使用されるネットワーク インターフェースにバインドする場合、リスナーを構成するのは簡単です。ただし、多くのシナリオでは、外部にアドバタイズされる、クラスタ内の Kafka ブローカーのアドレスは、Kafka が使用する内部ネットワーク インターフェースのものとは異なります。このような場合は、advertised.listeners
プロパティを使用できます。
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
外部クライアントはポート 9093 を使用して SSL チャネル経由で接続し、内部クライアントはポート 9092 を使用して平文チャネル経由で接続します。advertised.listeners
でアドレスを指定する際は、外部トラフィックと内部トラフィックの両方で同じインスタンスに解決される DNS 名を使用してください(このサンプルの場合は kafkabroker-n.mydomain.com
)。パブリック IP アドレスは内部トラフィックでは解決できない可能性があるため、パブリック IP アドレスを使用すると機能しない場合があります。
ロギング
KafkaIO
からのロギングは非常に冗長になる可能性があります。本番環境では、次のようにロギング レベルを下げることを検討してください。
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
詳細については、パイプラインのワーカーログ レベルを設定するをご覧ください。