Apache Kafka から Dataflow に読み込む

このドキュメントでは、Apache Kafka から Dataflow にデータを読み取る方法について説明します。

Apache Beam Kafka I/O コネクタ(KafkaIO)は Java でネイティブに使用できます。また、Apache Beam のマルチランゲージ パイプライン フレームワークを使用して PythonGo でも使用できます。

Java パイプラインの場合は、マネージド I/O コネクタを使用して Kafka から読み取ることを検討してください。

並列処理

並列処理は、ワーカーの最大数max_num_workers)と Kafka パーティションの数の 2 つの要素によって制限されます。Dataflow のデフォルトは、4 x max_num_workers の並列処理ファンアウトです。ただし、ファンアウトはパーティション数によって制限されます。たとえば、100 個の vCPU が使用可能で、パイプラインが 10 個の Kafka パーティションからのみ読み取る場合、最大並列処理は 10 になります。

並列処理を最大化するには、少なくとも 4 つの max_num_workers Kafka パーティションを用意することをおすすめします。ジョブで Runner v2 を使用している場合は、並列処理をさらに高く設定することを検討してください。ワーカー vCPU 数の 2 倍のパーティションを作成することをおすすめします。

パーティション数を増やせない場合は、Kafka 読み取りステップの後に Reshuffle ステップまたは Redistribute ステップを挿入することを検討してください。このステップにより、Dataflow はデータをより効率的に再分散して並列化できますが、シャッフル ステップの実行に追加のオーバーヘッドが発生します。詳細については、並列処理に影響する要因をご覧ください。

パーティション間の負荷が比較的均等で、偏らないようにします。負荷が偏っていると、ワーカーの使用率が低下する可能性があります。負荷の軽いパーティションから読み取るワーカーは比較的アイドル状態になる可能性がありますが、負荷の高いパーティションから読み取るワーカーは遅れる可能性があります。Dataflow は、パーティションごとのバックログの指標を提供します。

負荷が偏っている場合は、動的作業分散を使用して作業を分散できます。たとえば、Dataflow は、複数の小容量パーティションから読み取るワーカーを 1 つ割り当て、単一の大容量パーティションから読み取るワーカーを別のワーカーに割り当てることができます。ただし、2 つのワーカーが同じパーティションから読み取ることはできないため、負荷の高いパーティションによってパイプラインが遅れる可能性があります。

ベスト プラクティス

このセクションでは、Kafka から Dataflow に読み込む際の推奨事項について説明します。

件数の少ないトピック

一般的なシナリオは、少量のトピック(お客様ごとに 1 つのトピックなど)を同時に読み取ることです。トピックごとに個別の Dataflow ジョブを作成することは、各ジョブに少なくとも 1 つの完全なワーカーが必要なため、費用対効果が低くなります。代わりに、次のオプションを検討してください。

  • トピックを統合する。トピックを Dataflow に取り込む前に結合します。少数の大容量トピックをインジェストする方が、多数の小容量トピックをインジェストするよりもはるかに効率的です。各大容量トピックは、ワーカーを最大限に活用する単一の Dataflow ジョブで処理できます。

  • 複数のトピックを読む。トピックを Dataflow に取り込む前に結合できない場合は、複数のトピックから読み取るパイプラインの作成を検討してください。このアプローチでは、Dataflow が複数のトピックを同じワーカーに割り当てることができます。このアプローチを実装するには、次の 2 つの方法があります。

    • 単一の読み取りステップKafkaIO コネクタの単一インスタンスを作成し、複数のトピックを読み取るように構成します。次に、トピック名でフィルタして、トピックごとに異なるロジックを適用します。コード例については、複数のトピックから読み取るをご覧ください。すべてのトピックが同じクラスタに配置されている場合は、このオプションを検討してください。1 つのシンクまたは変換で問題が発生すると、すべてのトピックでバックログが蓄積される可能性があるという欠点があります。

      高度なユースケースの場合は、読み取るトピックを指定する KafkaSourceDescriptor オブジェクトのセットを渡します。KafkaSourceDescriptor を使用すると、必要に応じてトピックリストを後で更新できます。この機能を使用するには、Runner v2 を搭載した Java が必要です。

    • 複数の読み取りステップ。異なるクラスタにあるトピックから読み取るには、パイプラインに複数の KafkaIO インスタンスを含めることができます。ジョブの実行中に、変換マッピングを使用して個々のソースを更新できます。新しいトピックまたはクラスタの設定は、Runner v2 を使用している場合にのみサポートされます。このアプローチでは、パイプライン レベルの指標に依存するのではなく、個々の読み取り変換を個別にモニタリングする必要があるため、オブザーバビリティが課題になる可能性があります。

Kafka に commit する

デフォルトでは、KafkaIO コネクタは Kafka オフセットを使用して進行状況を追跡せず、Kafka に commit しません。commitOffsetsInFinalize を呼び出すと、コネクタは Dataflow でレコードが commit された後、Kafka に commit バックするために最善を尽くします。Dataflow で commit されたレコードが完全に処理されていない可能性があります。そのため、パイプラインをキャンセルすると、レコードが完全に処理されずにオフセットが commit される可能性があります。

enable.auto.commit=True の設定は、Dataflow による処理なしで Kafka から読み取られた直後にオフセットを commit するため、このオプションの使用はおすすめしません。enable.auto.commit=FalsecommitOffsetsInFinalize=True の両方を設定することをおすすめします。enable.auto.commitTrue に設定すると、処理中にパイプラインが中断された場合にデータが失われる可能性があります。Kafka にすでに commit されたレコードが破棄される可能性があります。

ウォーターマーク

デフォルトでは、KafkaIO コネクタは現在の処理時間を使用して、出力ウォーターマークとイベント時刻を割り当てます。この動作を変更するには、withTimestampPolicyFactory を呼び出して TimestampPolicy を割り当てます。Beam には、Kafka のログ追加時間またはメッセージの作成時間に基づいてウォーターマークを計算する TimestampPolicy の実装が用意されています。

ランナーに関する考慮事項

KafkaIO コネクタには、Kafka 読み取り用の基盤となる実装が 2 つあります。古い ReadFromKafkaViaUnbounded と新しい ReadFromKafkaViaSDF です。Dataflow は、SDK 言語とジョブの要件に基づいて、ジョブに最適な実装を自動的に選択します。ランナーまたは Kafka の実装でのみ利用可能な特定の機能が必要な場合を除き、ランナーまたは Kafka の実装を明示的にリクエストしないでください。ランナーの選択の詳細については、Dataflow Runner v2 を使用するをご覧ください。

パイプラインで withTopic または withTopics を使用している場合、古い実装では、パイプラインの構築時に使用可能なパーティションについて Kafka にクエリを実行します。パイプラインを作成するマシンには、Kafka に接続する権限が必要です。権限エラーが表示された場合は、ローカルで Kafka に接続する権限があることを確認します。この問題を回避するには、withTopicPartitions を使用します。これは、パイプラインの構築時に Kafka に接続しません。

本番環境にデプロイ

ソリューションを本番環境にデプロイする場合は、Flex テンプレートを使用することをおすすめします。Flex テンプレートを使用すると、パイプラインは一貫した環境から起動されるため、ローカル構成の問題を軽減できます。

KafkaIO からのロギングは非常に冗長になる可能性があります。本番環境では、次のようにロギング レベルを下げることを検討してください。

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

詳細については、パイプライン ワーカーのログレベルを設定するをご覧ください。

ネットワークを構成する

デフォルトでは、Dataflow はデフォルトの Virtual Private Cloud(VPC)ネットワーク内でインスタンスを起動します。Kafka の構成によっては、Dataflow に異なるネットワークとサブネットを構成しなければならない場合があります。詳細については、ネットワークとサブネットワークの指定をご覧ください。ネットワークを構成するときは、Dataflow ワーカーマシンが Kafka ブローカーにアクセスできるようにファイアウォール ルールを作成します。

VPC Service Controls を使用している場合は、Kafka クラスタを VPC Service Controls の境界内に配置するか、認可済み VPN または Cloud Interconnect に境界を拡張します。

Kafka クラスタが Google Cloud の外部にデプロイされている場合は、Dataflow と Kafka クラスタの間にネットワーク接続を作成する必要があります。複数のネットワーク オプションがあり、それぞれトレードオフがあります。

予測可能なパフォーマンスと信頼性という点で Dedicated Interconnect が最良のオプションですが、サードパーティが新しい回線をプロビジョニングする必要があるためセットアップに時間がかかることがあります。パブリック IP ベースのトポロジでは、必要となるネットワーキング作業がほとんどないため、すぐに使い始めることができます。

以降のセクションでは、これらのオプションについて詳しく説明します。

共有 RFC 1918 アドレス空間

Dedicated Interconnect と IPsec VPN の両方式は、Virtual Private Cloud(VPC)の RFC 1918 IP アドレスに直接アクセスできるため、Kafka の構成を簡素化できます。VPN ベースのトポロジを使用している場合は、ハイスループット VPN のセットアップを検討してください。

デフォルトでは、Dataflow はデフォルトの VPC ネットワークでインスタンスを起動します。プライベート ネットワーク トポロジで、Cloud Router で明示的に定義されたルートによって Google Cloud 内のサブネットワークが Kafka クラスタに接続されている場合は、Dataflow インスタンスをどこに配置するかを自分で制御できることが必要になります。Dataflow を使用して、networksubnetwork実行パラメータを構成できます。

対応するサブネットワークで、Dataflow がスケールアウトを目的にインスタンスを起動する際に十分な数の IP アドレスを使用できることを確認してください。また、Dataflow インスタンスを起動するために別のネットワークを作成する場合は、プロジェクト内のすべての仮想マシン間の TCP トラフィックを有効にするファイアウォール ルールを設定するようにしてください。このファイアウォール ルールは、デフォルトのネットワークにはすでに構成されています。

パブリック IP アドレス空間

このアーキテクチャでは、Transport Layer Security(TLS)を使用して外部クライアントと Kafka 間のトラフィックを保護し、ブローカー間の通信に暗号化されていないトラフィックを使用します。Kafka リスナーが、内部通信と外部通信の両方に使用されるネットワーク インターフェースにバインドする場合、リスナーを構成するのは簡単です。ただし、多くのシナリオでは、外部にアドバタイズされる、クラスタ内の Kafka ブローカーのアドレスは、Kafka が使用する内部ネットワーク インターフェースのものとは異なります。このような場合は、advertised.listeners プロパティを使用できます。

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

外部クライアントはポート 9093 を使用して「SSL」チャネル経由で接続し、内部クライアントはポート 9092 を使用して平文チャネル経由で接続します。advertised.listeners でアドレスを指定する際は、外部トラフィックと内部トラフィックの両方で同じインスタンスに解決される DNS 名(このサンプルの場合は、kafkabroker-n.mydomain.com)を使用してください。パブリック IP アドレスは内部トラフィックでは解決できない可能性があるため、パブリック IP アドレスを使用すると機能しない場合があります。

Kafka をチューニングする

Kafka クラスタと Kafka クライアントの設定は、パフォーマンスに大きな影響を与える可能性があります。特に、次の設定が低すぎる可能性があります。このセクションでは、開始点として推奨される値をいくつか示しますが、特定のワークロードに合わせてこれらの値をテストする必要があります。

  • unboundedReaderMaxElements。デフォルトは 10,000 です。100,000 などの大きな値にすると、バンドルのサイズが大きくなる可能性があります。パイプラインに集計が含まれている場合、パフォーマンスが大幅に向上する可能性があります。ただし、値を大きくするとレイテンシも増加する可能性があります。値を設定するには、setUnboundedReaderMaxElements を使用します。この設定は Runner v2 には適用されません。

  • unboundedReaderMaxReadTimeMs。デフォルトは 10,000 ミリ秒です。20,000 ミリ秒などの大きな値にするとバンドルサイズが大きくなりますが、5, 000 ミリ秒などの小さな値にするとレイテンシやバックログを減らすことができます。値を設定するには、setUnboundedReaderMaxReadTimeMs を使用します。この設定は Runner v2 には適用されません。

  • max.poll.records。デフォルトは 500 です。値を大きくすると、特に Runner v2 を使用している場合は、より多くの受信レコードをまとめて取得できるため、パフォーマンスが向上する可能性があります。値を設定するには、withConsumerConfigUpdates を呼び出します。

  • fetch.max.bytes。デフォルトは 1 MB です。値を大きくすると、特に Runner v2 を使用する場合は、リクエスト数を減らしてスループットが向上する可能性があります。ただし、値を高く設定しすぎるとレイテンシが増加する可能性がありますが、ダウンストリーム処理が主なボトルネックになる可能性が高いです。推奨される開始値は 100 MB です。値を設定するには、withConsumerConfigUpdates を呼び出します。

  • max.partition.fetch.bytes。デフォルトは 1 MB です。このパラメータは、サーバーが返すパーティションあたりのデータの最大量を設定します。値を増やすと、特に Runner v2 を使用する場合は、リクエスト数を減らしてスループットを改善できます。ただし、設定値を高く設定しすぎるとレイテンシが増加する可能性があります。ただし、ダウンストリーム処理が主なボトルネックになる可能性が高いです。推奨される開始値は 100 MB です。値を設定するには、withConsumerConfigUpdates を呼び出します。

  • consumerPollingTimeout。デフォルトは 2 秒です。レコードを読み取る前にコンシューマ クライアントがタイムアウトする場合は、値を大きくしてみてください。この設定は、クロスリージョン読み取りやネットワーク速度が遅い読み取りを行う場合に最も関連性があります。値を設定するには、withConsumerPollingTimeout を呼び出します。

receive.buffer.bytes がメッセージのサイズを処理するのに十分な大きさであることを確認します。値が小さすぎると、コンシューマが継続的に再作成され、特定のオフセットにシークされていることがログに表示されることがあります。

次のコードサンプルは、Kafka から読み取る Dataflow パイプラインの作成方法を示しています。アプリケーションのデフォルト認証情報を Google Cloud Managed Service for Apache Kafka 提供のコールバック ハンドラと組み合わせて使用する場合は、kafka-clients バージョン 3.7.0 以降が必要です。

単一トピックから読み取る

この例では、Kafka トピックから読み取り、メッセージ ペイロードをテキスト ファイルに書き込みます。

Java

Dataflow で認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import com.google.common.collect.ImmutableMap;
import java.io.UnsupportedEncodingException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KafkaRead {

  public static Pipeline createPipeline(Options options) {

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("bootstrap_servers", options.getBootstrapServer())
        .put("topic", options.getTopic())
        .put("data_format", "RAW")
        .put("max_read_time_seconds", 15)
        .put("auto_offset_reset_config", "earliest")
        .build();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    pipeline
        // Read messages from Kafka.
        .apply(Managed.read(Managed.KAFKA).withConfig(config)).getSinglePCollection()
        // Get the payload of each message and convert to a string.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              var bytes = row.getBytes("payload");
              try {
                return new String(bytes, "UTF-8");
              } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
              }
            })))
        // Write the payload to a text file.
        .apply(TextIO
            .write()
            .to(options.getOutputPath())
            .withSuffix(".txt")
            .withNumShards(1));
    return pipeline;
  }
}

Python

Dataflow で認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import argparse

import apache_beam as beam

from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
    #     --output=$CLOUD_STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic")
            parser.add_argument("--bootstrap_server")
            parser.add_argument("--output")

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Read messages from an Apache Kafka topic.
            | ReadFromKafka(
                consumer_config={"bootstrap.servers": options.bootstrap_server},
                topics=[options.topic],
                with_metadata=False,
                max_num_records=5,
                start_read_time=0,
            )
            # The previous step creates a key-value collection, keyed by message ID.
            # The values are the message payloads.
            | beam.Values()
            # Subdivide the output into fixed 5-second windows.
            | beam.WindowInto(window.FixedWindows(5))
            | WriteToText(
                file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
            )
        )

複数のトピックから読み取る

この例では、複数の Kafka トピックから読み取り、トピックごとに個別のパイプライン ロジックを適用します。

高度なユースケースでは、読み取るトピックのリストを更新できるように、一連の KafkaSourceDescriptor オブジェクトを動的に渡します。この方法では、Runner v2 で Java が必要です。

Java

Dataflow で認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class KafkaReadTopics {

  public static Pipeline createPipeline(Options options) {
    String topic1 = options.getTopic1();
    String topic2 = options.getTopic2();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    var allTopics = pipeline
        .apply(KafkaIO.<Long, String>read()
            .withTopics(List.of(topic1, topic2))
            .withBootstrapServers(options.getBootstrapServer())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withMaxReadTime(Duration.standardSeconds(10))
            .withStartReadTime(Instant.EPOCH)
        );

    // Create separate pipeline branches for each topic.
    // The first branch filters on topic1.
    allTopics
        .apply(Filter.by(record -> record.getTopic().equals(topic1)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(record -> record.getKV().getValue()))
        .apply(TextIO.write()
            .to(topic1)
            .withSuffix(".txt")
            .withNumShards(1)
        );

    // The second branch filters on topic2.
    allTopics
        .apply(Filter.by(record -> record.getTopic().equals(topic2)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(record -> record.getKV().getValue()))
        .apply(TextIO.write()
            .to(topic2)
            .withSuffix(".txt")
            .withNumShards(1)
        );
    return pipeline;
  }
}

Python

Dataflow で認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import argparse

import apache_beam as beam

from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #   --bootstrap_server=$BOOTSTRAP_SERVER --output=$STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument('--bootstrap_server')
            parser.add_argument('--output')

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        # Read from two Kafka topics.
        all_topics = pipeline | ReadFromKafka(consumer_config={
                "bootstrap.servers": options.bootstrap_server
            },
            topics=["topic1", "topic2"],
            with_metadata=True,
            max_num_records=10,
            start_read_time=0
        )

        # Filter messages from one topic into one branch of the pipeline.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic1')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic1" >> WriteToText(
                file_path_prefix=options.output + '/topic1/output',
                file_name_suffix='.txt',
                num_shards=1))

        # Filter messages from the other topic.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic2')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic2" >> WriteToText(
                file_path_prefix=options.output + '/topic2/output',
                file_name_suffix='.txt',
                num_shards=1))