Aus Apache Kafka in Dataflow lesen

In diesem Dokument wird beschrieben, wie Sie Daten aus Apache Kafka in Dataflow lesen.

Der Apache Beam Kafka-E/A-Connector (KafkaIO) ist nativ für Java verfügbar und kann mit dem mehrsprachigen Pipeline-Framework von Apache Beam auch für Python und Go verwendet werden.

Verwenden Sie bei Java-Pipelines den verwalteten E/A-Connector, um Daten aus Kafka zu lesen.

Parallelität

Die Parallelität wird durch zwei Faktoren eingeschränkt: die maximale Anzahl von Workern (max_num_workers) und die Anzahl der Kafka-Partitionen. In Dataflow ist standardmäßig ein Parallelitäts-Fanout von 4 × max_num_workers festgelegt. Der Fanout ist jedoch durch die Anzahl der Partitionen begrenzt. Wenn beispielsweise 100 vCPUs verfügbar sind, die Pipeline aber nur aus 10 Kafka-Partitionen liest, beträgt die maximale Parallelität 10.

Um die Parallelität zu maximieren, sollten Sie mindestens 4 × max_num_workers Kafka-Partitionen verwenden. Wenn für Ihren Job Runner v2 verwendet wird, sollten Sie die Parallelität noch höher ansetzen. Ein guter Ausgangspunkt ist eine Anzahl von Partitionen, die doppelt so hoch ist wie die Anzahl der Worker-vCPUs.

Wenn Sie die Anzahl der Partitionen nicht erhöhen können, sollten Sie nach dem Kafka-Leseschritt einen Reshuffle- oder Redistribute-Schritt einfügen. Durch diesen Schritt können die Daten in Dataflow effizienter neu verteilt und parallelisiert werden. Allerdings erhöht sich der Overhead für die Durchführung des Shuffle-Schritts. Weitere Informationen finden Sie unter Faktoren, die sich auf die Parallelität auswirken.

Achten Sie darauf, dass die Auslastung zwischen den Partitionen relativ gleichmäßig und nicht verzerrt ist. Wenn die Last ungleich verteilt ist, kann dies zu einer schlechten Auslastung der Worker führen. Worker, die aus Partitionen mit geringerer Auslastung lesen, sind möglicherweise relativ inaktiv, während Worker, die aus Partitionen mit hoher Auslastung lesen, ins Hintertreffen geraten. Dataflow bietet Messwerte für den Rückstand pro Partition.

Wenn die Last ungleich verteilt ist, kann dynamisches Work-Balancing helfen, die Arbeit zu verteilen. So kann Dataflow beispielsweise einen Worker für das Lesen aus mehreren Partitionen mit geringem Volumen und einen anderen Worker für das Lesen aus einer einzelnen Partition mit hohem Volumen zuweisen. Zwei Worker können jedoch nicht aus derselben Partition lesen. Eine stark ausgelastete Partition kann daher dazu führen, dass sich die Verarbeitung der Pipeline verzögert.

Best Practices

Dieser Abschnitt enthält Empfehlungen zum Lesen von Kafka in Dataflow.

Themen mit geringem Volumen

Ein häufiges Szenario ist das gleichzeitige Lesen aus vielen Themen mit geringem Volumen, z. B. ein Thema pro Kunde. Das Erstellen separater Dataflow-Jobs für jedes Thema ist kostenineffizient, da für jeden Job mindestens ein vollständiger Worker erforderlich ist. Sie haben stattdessen folgende Möglichkeiten:

  • Themen zusammenführen. Themen kombinieren, bevor sie in Dataflow aufgenommen werden. Das Aufnehmen weniger Themen mit hohem Volumen ist viel effizienter als das Aufnehmen vieler Themen mit geringem Volumen. Jedes Thema mit hohem Volumen kann von einem einzelnen Dataflow-Job verarbeitet werden, der seine Worker voll auslastet.

  • Mehrere Themen lesen Wenn Sie Themen nicht kombinieren können, bevor sie in Dataflow aufgenommen werden, können Sie eine Pipeline erstellen, die Daten aus mehreren Themen liest. So kann Dataflow demselben Worker mehrere Themen zuweisen. Es gibt zwei Möglichkeiten, diesen Ansatz zu implementieren:

    • Ein Schritt zum Lesen. Erstellen Sie eine einzelne Instanz des KafkaIO-Connectors und konfigurieren Sie sie so, dass mehrere Themen gelesen werden. Filtern Sie dann nach dem Themennamen, um unterschiedliche Logik pro Thema anzuwenden. Beispielcode finden Sie unter Aus mehreren Themen lesen. Diese Option ist sinnvoll, wenn sich alle Ihre Themen im selben Cluster befinden. Ein Nachteil ist, dass Probleme mit einer einzelnen Datensenke oder einer einzelnen Transformation dazu führen können, dass sich bei allen Themen ein Rückstand ansammelt.

      Für erweiterte Anwendungsfälle übergeben Sie eine Reihe von KafkaSourceDescriptor-Objekten, die die Themen angeben, aus denen gelesen werden soll. Mit KafkaSourceDescriptor können Sie die Themenliste bei Bedarf später aktualisieren. Für diese Funktion ist Java mit Runner v2 erforderlich.

    • Mehrere Leseschritte Wenn Sie Daten aus Themen in verschiedenen Clustern lesen möchten, kann Ihre Pipeline mehrere KafkaIO-Instanzen enthalten. Während der Job ausgeführt wird, können Sie einzelne Quellen mithilfe von Transformierungszuordnungen aktualisieren. Das Festlegen eines neuen Themas oder Clusters wird nur bei Verwendung von Runner v2 unterstützt. Bei diesem Ansatz kann die Beobachtbarkeit eine Herausforderung darstellen, da Sie jede einzelne Lesetransformation überwachen müssen, anstatt sich auf Messwerte auf Pipelineebene zu verlassen.

Zurück zu Kafka committen

Standardmäßig verwendet der KafkaIO-Connector keine Kafka-Offsets, um den Fortschritt zu verfolgen, und speichert keine Daten zurück in Kafka. Wenn Sie commitOffsetsInFinalize aufrufen, versucht der Connector, die Datensätze nach dem Commit in Dataflow wieder in Kafka zu committen. Gespeicherte Datensätze in Dataflow werden möglicherweise nicht vollständig verarbeitet. Wenn Sie die Pipeline abbrechen, wird möglicherweise ein Offset gespeichert, ohne dass die Datensätze vollständig verarbeitet wurden.

Da mit enable.auto.commit=True Offsets committet werden, sobald sie aus Kafka gelesen werden, ohne dass sie von Dataflow verarbeitet werden, wird die Verwendung dieser Option nicht empfohlen. Wir empfehlen, sowohl enable.auto.commit=False als auch commitOffsetsInFinalize=True festzulegen. Wenn Sie enable.auto.commit auf True festlegen, können Daten verloren gehen, wenn die Pipeline während der Verarbeitung unterbrochen wird. Bereits in Kafka committete Einträge werden möglicherweise gelöscht.

Wasserzeichen

Standardmäßig verwendet der KafkaIO-Connector die aktuelle Verarbeitungszeit, um das Ausgabewasserzeichen und die Ereigniszeit zuzuweisen. Wenn Sie dieses Verhalten ändern möchten, rufen Sie withTimestampPolicyFactory auf und weisen Sie eine TimestampPolicy zu. Beam bietet Implementierungen von TimestampPolicy, die das Wasserzeichen entweder basierend auf der Zeit des Anhängens an das Kafka-Protokoll oder der Erstellungszeit der Nachricht berechnen.

Hinweise zu Runnern

Der KafkaIO-Connector hat zwei zugrunde liegende Implementierungen für Kafka-Lesungen: die ältere ReadFromKafkaViaUnbounded und die neuere ReadFromKafkaViaSDF. Dataflow wählt automatisch die beste Implementierung für Ihren Job basierend auf der SDK-Sprache und den Jobanforderungen aus. Fordern Sie einen Runner oder eine Kafka-Implementierung nur dann explizit an, wenn Sie bestimmte Funktionen benötigen, die nur in dieser Implementierung verfügbar sind. Weitere Informationen zur Auswahl eines Runners finden Sie unter Dataflow Runner v2 verwenden.

Wenn in Ihrer Pipeline withTopic oder withTopics verwendet wird, fragt die ältere Implementierung beim Erstellen der Pipeline Kafka nach den verfügbaren Partitionen ab. Der Computer, auf dem die Pipeline erstellt wird, muss eine Berechtigung zum Herstellen einer Verbindung zu Kafka haben. Wenn Sie einen Berechtigungsfehler erhalten, prüfen Sie, ob Sie Berechtigungen zum Herstellen einer lokalen Verbindung zu Kafka haben. Sie können dieses Problem vermeiden, indem Sie withTopicPartitions verwenden, das beim Erstellen der Pipeline keine Verbindung zu Kafka herstellt.

Für Produktion bereitstellen

Wenn Sie Ihre Lösung in der Produktionsumgebung bereitstellen, empfehlen wir die Verwendung von Flex-Vorlagen. Wenn Sie eine flexible Vorlage verwenden, wird die Pipeline in einer einheitlichen Umgebung gestartet. Dies kann dazu beitragen, lokale Konfigurationsprobleme zu vermeiden.

Das Logging von KafkaIO kann sehr ausführlich sein. Sie können die Loggingebene in der Produktion so reduzieren:

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

Weitere Informationen finden Sie unter Logebenen für Pipeline-Worker festlegen.

Netzwerk konfigurieren

Normalerweise startet Dataflow Instanzen in Ihrem VPC-Standardnetzwerk (Virtual Private Cloud). Je nach Kafka-Konfiguration müssen Sie möglicherweise ein anderes Netzwerk und Subnetz für Dataflow konfigurieren. Weitere Informationen finden Sie unter Netzwerk und Subnetzwerk angeben. Erstellen Sie beim Konfigurieren Ihres Netzwerks Firewallregeln, mit denen die Dataflow-Worker-Maschinen die Kafka-Broker erreichen können.

Wenn Sie VPC Service Controls verwenden, platzieren Sie den Kafka-Cluster innerhalb des VPC Service Controls-Perimeters oder weiten Sie die Perimeter auf das autorisierte VPN oder Cloud Interconnect aus.

Wenn Ihr Kafka-Cluster außerhalb von Google Cloud bereitgestellt wird, müssen Sie eine Netzwerkverbindung zwischen Dataflow und dem Kafka-Cluster erstellen. Es gibt mehrere Netzwerkoptionen mit unterschiedlichen Vor- und Nachteilen:

Dedicated Interconnect ist die beste Option für eine vorhersagbare Leistung und Zuverlässigkeit. Die Einrichtung kann jedoch länger dauern, da die neuen Verbindungen durch Dritte bereitgestellt werden müssen. Mit einer Topologie auf Basis einer öffentlichen IP-Adresse können Sie schnell starten, da nur wenig Netzwerkarbeit erforderlich ist.

In den nächsten beiden Abschnitten werden diese Optionen ausführlicher beschrieben.

Freigegebener RFC 1918-Adressraum

Sowohl Dedicated Interconnect als auch IPsec-VPN bieten Ihnen direkten Zugriff auf RFC 1918-IP-Adressen in Ihrer Virtual Private Cloud (VPC), was Ihre Kafka-Konfiguration vereinfachen kann. Wenn Sie eine VPN-basierte Topologie verwenden, sollten Sie ein VPN mit hohem Durchsatz erstellen.

Normalerweise startet Dataflow Instanzen in Ihrem standardmäßigen VPC-Netzwerk. In einer privaten Netzwerktopologie mit explizit in Cloud Router definierten Routen, die Subnetzwerke in Google Cloud mit diesem Kafka-Cluster verbinden, müssen Sie die Standorte Ihrer Dataflow-Instanzen besser steuern können. Sie können mit Dataflow die Ausführungsparameter network und subnetwork konfigurieren.

Achten Sie darauf, dass das entsprechende Subnetzwerk über genügend IP-Adressen verfügt, damit Dataflow Instanzen beim horizontalen Skalieren starten kann. Wenn Sie zum Starten der Dataflow-Instanzen ein separates Netzwerk erstellen, ist außerdem wichtig, dass Sie eine Firewallregel haben, die TCP-Traffic zwischen allen virtuellen Maschinen im Projekt zulässt. Im Standardnetzwerk ist diese Firewallregel bereits konfiguriert.

Öffentlicher IP-Adressraum

Diese Architektur verwendet zum Schutz des Traffics zwischen externen Clients und Kafka Transport Layer Security (TLS). Die Kommunikation zwischen den Brokern erfolgt in Klartext. Wenn sich der Kafka-Listener an eine Netzwerkschnittstelle bindet, die sowohl für die interne als auch für die externe Kommunikation verwendet wird, ist die Konfiguration des Listeners unkompliziert. In vielen Szenarien unterscheiden sich die extern veröffentlichten Adressen der Kafka-Broker im Cluster jedoch von den internen Netzwerkschnittstellen, die Kafka verwendet. In solchen Szenarien können Sie das Attribut advertised.listeners verwenden:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

Externe Clients verbinden sich über Port 9093 über einen „SSL“-Kanal her und interne Clients verbinden sich über einen Klartextkanal über Port 9092. Wenn Sie unter advertised.listeners eine Adresse angeben, verwenden Sie DNS-Namen (in diesem Beispiel kafkabroker-n.mydomain.com), die für externen und internen Traffic in dieselbe Instanz aufgelöst werden. Öffentliche IP-Adressen funktionieren eventuell nicht, da die Adressen möglicherweise für den internen Traffic nicht richtig aufgelöst werden.

Kafka optimieren

Die Einstellungen für Ihren Kafka-Cluster und Ihren Kafka-Client können einen großen Einfluss auf die Leistung haben. Insbesondere die folgenden Einstellungen sind möglicherweise zu niedrig. In diesem Abschnitt finden Sie einige Vorschläge für Startwerte. Sie sollten jedoch mit diesen Werten für Ihre jeweilige Arbeitslast experimentieren.

  • unboundedReaderMaxElements. Die Standardeinstellung ist 10.000. Ein höherer Wert wie 100.000 kann die Größe der Bundles erhöhen. Dies kann die Leistung erheblich verbessern, wenn Ihre Pipeline Aggregationen enthält. Ein höherer Wert kann jedoch auch die Latenz erhöhen. Verwenden Sie setUnboundedReaderMaxElements, um den Wert festzulegen. Diese Einstellung gilt nicht für Runner v2.

  • unboundedReaderMaxReadTimeMs. Der Standardwert ist 10.000 ms. Ein höherer Wert wie 20.000 ms kann die Bundle-Größe erhöhen, während ein niedrigerer Wert wie 5.000 ms die Latenz oder den Rückstand reduzieren kann. Verwenden Sie setUnboundedReaderMaxReadTimeMs, um den Wert festzulegen. Diese Einstellung gilt nicht für Runner v2.

  • max.poll.records. Die Standardeinstellung ist 500. Ein höherer Wert kann zu einer besseren Leistung führen, da mehr eingehende Datensätze gleichzeitig abgerufen werden, insbesondere bei Verwendung von Runner v2. Rufen Sie withConsumerConfigUpdates auf, um den Wert festzulegen.

  • fetch.max.bytes. Der Standardwert ist 1 MB. Ein höherer Wert kann den Durchsatz verbessern, indem die Anzahl der Anfragen reduziert wird, insbesondere bei Verwendung von Runner v2. Wenn Sie den Wert jedoch zu hoch festlegen, kann die Latenz steigen. Die Downstream-Verarbeitung ist jedoch wahrscheinlich der Hauptengpass. Als Anfangswert wird 100 MB empfohlen. Rufen Sie withConsumerConfigUpdates auf, um den Wert festzulegen.

  • max.partition.fetch.bytes. Der Standardwert ist 1 MB. Mit diesem Parameter wird die maximale Datenmenge pro Partition festgelegt, die der Server zurückgibt. Wenn Sie den Wert erhöhen, kann der Durchsatz durch eine Verringerung der Anzahl der Anfragen verbessert werden, insbesondere bei Verwendung von Runner v2. Wenn Sie den Wert jedoch zu hoch festlegen, kann die Latenz steigen. Die Downstream-Verarbeitung ist jedoch wahrscheinlich der Hauptengpass. Als Anfangswert wird 100 MB empfohlen. Rufen Sie withConsumerConfigUpdates auf, um den Wert festzulegen.

  • consumerPollingTimeout. Die Standardeinstellung ist 2 Sekunden. Wenn die Zeitüberschreitung des Verbraucherclients eintritt, bevor Datensätze gelesen werden können, versuchen Sie, einen höheren Wert festzulegen. Diese Einstellung ist am häufigsten relevant, wenn zwischen Regionen gelesen wird oder Lesevorgänge über ein langsames Netzwerk erfolgen. Rufen Sie withConsumerPollingTimeout auf, um den Wert festzulegen.

Achten Sie darauf, dass receive.buffer.bytes groß genug ist, um die Größe der Nachrichten zu verarbeiten. Ist der Wert zu niedrig, wird in den Protokollen möglicherweise angezeigt, dass Verbraucher kontinuierlich neu erstellt und auf einen bestimmten Offset gesetzt werden.

Beispiele

In den folgenden Codebeispielen wird gezeigt, wie Dataflow-Pipelines erstellt werden, die Daten aus Kafka lesen.

Aus einem einzelnen Thema lesen

In diesem Beispiel werden Daten aus einem Kafka-Thema gelesen und die Nachrichten-Nutzlasten in Textdateien geschrieben.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.common.collect.ImmutableMap;
import java.io.UnsupportedEncodingException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KafkaRead {

  public static Pipeline createPipeline(Options options) {

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("bootstrap_servers", options.getBootstrapServer())
        .put("topic", options.getTopic())
        .put("data_format", "RAW")
        .put("max_read_time_seconds", 15)
        .put("auto_offset_reset_config", "earliest")
        .build();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    pipeline
        // Read messages from Kafka.
        .apply(Managed.read(Managed.KAFKA).withConfig(config)).getSinglePCollection()
        // Get the payload of each message and convert to a string.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              var bytes = row.getBytes("payload");
              try {
                return new String(bytes, "UTF-8");
              } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
              }
            })))
        // Write the payload to a text file.
        .apply(TextIO
            .write()
            .to(options.getOutputPath())
            .withSuffix(".txt")
            .withNumShards(1));
    return pipeline;
  }
}

Python

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import argparse

import apache_beam as beam

from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
    #     --output=$CLOUD_STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic")
            parser.add_argument("--bootstrap_server")
            parser.add_argument("--output")

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Read messages from an Apache Kafka topic.
            | ReadFromKafka(
                consumer_config={"bootstrap.servers": options.bootstrap_server},
                topics=[options.topic],
                with_metadata=False,
                max_num_records=5,
                start_read_time=0,
            )
            # The previous step creates a key-value collection, keyed by message ID.
            # The values are the message payloads.
            | beam.Values()
            # Subdivide the output into fixed 5-second windows.
            | beam.WindowInto(window.FixedWindows(5))
            | WriteToText(
                file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
            )
        )

Aus mehreren Themen lesen

In diesem Beispiel werden Daten aus mehreren Kafka-Themen gelesen und für jedes Thema wird eine separate Pipelinelogik angewendet.

Bei komplexeren Anwendungsfällen können Sie dynamisch eine Reihe von KafkaSourceDescriptor-Objekten übergeben, um die Liste der Themen zu aktualisieren, aus denen gelesen werden soll. Für diesen Ansatz ist Java mit Runner v2 erforderlich.

Python

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import argparse

import apache_beam as beam

from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #   --bootstrap_server=$BOOTSTRAP_SERVER --output=$STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument('--bootstrap_server')
            parser.add_argument('--output')

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        # Read from two Kafka topics.
        all_topics = pipeline | ReadFromKafka(consumer_config={
                "bootstrap.servers": options.bootstrap_server
            },
            topics=["topic1", "topic2"],
            with_metadata=True,
            max_num_records=10,
            start_read_time=0
        )

        # Filter messages from one topic into one branch of the pipeline.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic1')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic1" >> WriteToText(
                file_path_prefix=options.output + '/topic1/output',
                file_name_suffix='.txt',
                num_shards=1))

        # Filter messages from the other topic.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic2')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic2" >> WriteToText(
                file_path_prefix=options.output + '/topic2/output',
                file_name_suffix='.txt',
                num_shards=1))