In diesem Dokument wird beschrieben, wie Sie Daten aus Apache Kafka in Dataflow lesen.
Der Apache Beam Kafka-E/A-Connector (KafkaIO
) ist nativ für Java verfügbar und kann mit dem mehrsprachigen Pipeline-Framework von Apache Beam auch für Python und Go verwendet werden.
Verwenden Sie bei Java-Pipelines den verwalteten E/A-Connector, um Daten aus Kafka zu lesen.
Parallelität
Die Parallelität wird durch zwei Faktoren eingeschränkt: die maximale Anzahl von Workern (max_num_workers
) und die Anzahl der Kafka-Partitionen. In Dataflow ist standardmäßig ein Parallelitäts-Fanout von 4 × max_num_workers
festgelegt. Der Fanout ist jedoch durch die Anzahl der Partitionen begrenzt. Wenn beispielsweise 100 vCPUs verfügbar sind, die Pipeline aber nur aus 10 Kafka-Partitionen liest, beträgt die maximale Parallelität 10.
Um die Parallelität zu maximieren, sollten Sie mindestens 4 × max_num_workers
Kafka-Partitionen verwenden. Wenn für Ihren Job Runner v2 verwendet wird, sollten Sie die Parallelität noch höher ansetzen.
Ein guter Ausgangspunkt ist eine Anzahl von Partitionen, die doppelt so hoch ist wie die Anzahl der Worker-vCPUs.
Wenn Sie die Anzahl der Partitionen nicht erhöhen können, sollten Sie nach dem Kafka-Leseschritt einen Reshuffle
- oder Redistribute
-Schritt einfügen. Durch diesen Schritt können die Daten in Dataflow effizienter neu verteilt und parallelisiert werden. Allerdings erhöht sich der Overhead für die Durchführung des Shuffle-Schritts. Weitere Informationen finden Sie unter Faktoren, die sich auf die Parallelität auswirken.
Achten Sie darauf, dass die Auslastung zwischen den Partitionen relativ gleichmäßig und nicht verzerrt ist. Wenn die Last ungleich verteilt ist, kann dies zu einer schlechten Auslastung der Worker führen. Worker, die aus Partitionen mit geringerer Auslastung lesen, sind möglicherweise relativ inaktiv, während Worker, die aus Partitionen mit hoher Auslastung lesen, ins Hintertreffen geraten. Dataflow bietet Messwerte für den Rückstand pro Partition.
Wenn die Last ungleich verteilt ist, kann dynamisches Work-Balancing helfen, die Arbeit zu verteilen. So kann Dataflow beispielsweise einen Worker für das Lesen aus mehreren Partitionen mit geringem Volumen und einen anderen Worker für das Lesen aus einer einzelnen Partition mit hohem Volumen zuweisen. Zwei Worker können jedoch nicht aus derselben Partition lesen. Eine stark ausgelastete Partition kann daher dazu führen, dass sich die Verarbeitung der Pipeline verzögert.
Best Practices
Dieser Abschnitt enthält Empfehlungen zum Lesen von Kafka in Dataflow.
Themen mit geringem Volumen
Ein häufiges Szenario ist das gleichzeitige Lesen aus vielen Themen mit geringem Volumen, z. B. ein Thema pro Kunde. Das Erstellen separater Dataflow-Jobs für jedes Thema ist kostenineffizient, da für jeden Job mindestens ein vollständiger Worker erforderlich ist. Sie haben stattdessen folgende Möglichkeiten:
Themen zusammenführen. Themen kombinieren, bevor sie in Dataflow aufgenommen werden. Das Aufnehmen weniger Themen mit hohem Volumen ist viel effizienter als das Aufnehmen vieler Themen mit geringem Volumen. Jedes Thema mit hohem Volumen kann von einem einzelnen Dataflow-Job verarbeitet werden, der seine Worker voll auslastet.
Mehrere Themen lesen Wenn Sie Themen nicht kombinieren können, bevor sie in Dataflow aufgenommen werden, können Sie eine Pipeline erstellen, die Daten aus mehreren Themen liest. So kann Dataflow demselben Worker mehrere Themen zuweisen. Es gibt zwei Möglichkeiten, diesen Ansatz zu implementieren:
Ein Schritt zum Lesen. Erstellen Sie eine einzelne Instanz des
KafkaIO
-Connectors und konfigurieren Sie sie so, dass mehrere Themen gelesen werden. Filtern Sie dann nach dem Themennamen, um unterschiedliche Logik pro Thema anzuwenden. Beispielcode finden Sie unter Aus mehreren Themen lesen. Diese Option ist sinnvoll, wenn sich alle Ihre Themen im selben Cluster befinden. Ein Nachteil ist, dass Probleme mit einer einzelnen Datensenke oder einer einzelnen Transformation dazu führen können, dass sich bei allen Themen ein Rückstand ansammelt.Für erweiterte Anwendungsfälle übergeben Sie eine Reihe von
KafkaSourceDescriptor
-Objekten, die die Themen angeben, aus denen gelesen werden soll. MitKafkaSourceDescriptor
können Sie die Themenliste bei Bedarf später aktualisieren. Für diese Funktion ist Java mit Runner v2 erforderlich.Mehrere Leseschritte Wenn Sie Daten aus Themen in verschiedenen Clustern lesen möchten, kann Ihre Pipeline mehrere
KafkaIO
-Instanzen enthalten. Während der Job ausgeführt wird, können Sie einzelne Quellen mithilfe von Transformierungszuordnungen aktualisieren. Das Festlegen eines neuen Themas oder Clusters wird nur bei Verwendung von Runner v2 unterstützt. Bei diesem Ansatz kann die Beobachtbarkeit eine Herausforderung darstellen, da Sie jede einzelne Lesetransformation überwachen müssen, anstatt sich auf Messwerte auf Pipelineebene zu verlassen.
Zurück zu Kafka committen
Standardmäßig verwendet der KafkaIO
-Connector keine Kafka-Offsets, um den Fortschritt zu verfolgen, und speichert keine Daten zurück in Kafka. Wenn Sie commitOffsetsInFinalize
aufrufen, versucht der Connector, die Datensätze nach dem Commit in Dataflow wieder in Kafka zu committen. Gespeicherte Datensätze in Dataflow werden möglicherweise nicht vollständig verarbeitet. Wenn Sie die Pipeline abbrechen, wird möglicherweise ein Offset gespeichert, ohne dass die Datensätze vollständig verarbeitet wurden.
Da mit enable.auto.commit=True
Offsets committet werden, sobald sie aus Kafka gelesen werden, ohne dass sie von Dataflow verarbeitet werden, wird die Verwendung dieser Option nicht empfohlen.
Wir empfehlen, sowohl enable.auto.commit=False
als auch commitOffsetsInFinalize=True
festzulegen. Wenn Sie enable.auto.commit
auf True
festlegen, können Daten verloren gehen, wenn die Pipeline während der Verarbeitung unterbrochen wird. Bereits in Kafka committete Einträge werden möglicherweise gelöscht.
Wasserzeichen
Standardmäßig verwendet der KafkaIO
-Connector die aktuelle Verarbeitungszeit, um das Ausgabewasserzeichen und die Ereigniszeit zuzuweisen. Wenn Sie dieses Verhalten ändern möchten, rufen Sie withTimestampPolicyFactory
auf und weisen Sie eine TimestampPolicy
zu. Beam bietet Implementierungen von TimestampPolicy
, die das Wasserzeichen entweder basierend auf der Zeit des Anhängens an das Kafka-Protokoll oder der Erstellungszeit der Nachricht berechnen.
Hinweise zu Runnern
Der KafkaIO
-Connector hat zwei zugrunde liegende Implementierungen für Kafka-Lesungen: die ältere ReadFromKafkaViaUnbounded
und die neuere ReadFromKafkaViaSDF
. Dataflow wählt automatisch die beste Implementierung für Ihren Job basierend auf der SDK-Sprache und den Jobanforderungen aus. Fordern Sie einen Runner oder eine Kafka-Implementierung nur dann explizit an, wenn Sie bestimmte Funktionen benötigen, die nur in dieser Implementierung verfügbar sind. Weitere Informationen zur Auswahl eines Runners finden Sie unter Dataflow Runner v2 verwenden.
Wenn in Ihrer Pipeline withTopic
oder withTopics
verwendet wird, fragt die ältere Implementierung beim Erstellen der Pipeline Kafka nach den verfügbaren Partitionen ab. Der Computer, auf dem die Pipeline erstellt wird, muss eine Berechtigung zum Herstellen einer Verbindung zu Kafka haben. Wenn Sie einen Berechtigungsfehler erhalten, prüfen Sie, ob Sie Berechtigungen zum Herstellen einer lokalen Verbindung zu Kafka haben. Sie können dieses Problem vermeiden, indem Sie withTopicPartitions
verwenden, das beim Erstellen der Pipeline keine Verbindung zu Kafka herstellt.
Für Produktion bereitstellen
Wenn Sie Ihre Lösung in der Produktionsumgebung bereitstellen, empfehlen wir die Verwendung von Flex-Vorlagen. Wenn Sie eine flexible Vorlage verwenden, wird die Pipeline in einer einheitlichen Umgebung gestartet. Dies kann dazu beitragen, lokale Konfigurationsprobleme zu vermeiden.
Das Logging von KafkaIO
kann sehr ausführlich sein. Sie können die Loggingebene in der Produktion so reduzieren:
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
Weitere Informationen finden Sie unter Logebenen für Pipeline-Worker festlegen.
Netzwerk konfigurieren
Normalerweise startet Dataflow Instanzen in Ihrem VPC-Standardnetzwerk (Virtual Private Cloud). Je nach Kafka-Konfiguration müssen Sie möglicherweise ein anderes Netzwerk und Subnetz für Dataflow konfigurieren. Weitere Informationen finden Sie unter Netzwerk und Subnetzwerk angeben. Erstellen Sie beim Konfigurieren Ihres Netzwerks Firewallregeln, mit denen die Dataflow-Worker-Maschinen die Kafka-Broker erreichen können.
Wenn Sie VPC Service Controls verwenden, platzieren Sie den Kafka-Cluster innerhalb des VPC Service Controls-Perimeters oder weiten Sie die Perimeter auf das autorisierte VPN oder Cloud Interconnect aus.
Wenn Ihr Kafka-Cluster außerhalb von Google Cloud bereitgestellt wird, müssen Sie eine Netzwerkverbindung zwischen Dataflow und dem Kafka-Cluster erstellen. Es gibt mehrere Netzwerkoptionen mit unterschiedlichen Vor- und Nachteilen:
- Verbindung über einen freigegebenen RFC 1918-Adressbereich herstellen. Verwenden Sie dazu eine der folgenden Methoden:
- Sie können Ihren extern gehosteten Kafka-Cluster über öffentliche IP-Adressen erreichen. Verwenden Sie dazu eine der folgenden Methoden:
- Öffentliches Internet
- Direct Peering
- Carrier Peering
Dedicated Interconnect ist die beste Option für eine vorhersagbare Leistung und Zuverlässigkeit. Die Einrichtung kann jedoch länger dauern, da die neuen Verbindungen durch Dritte bereitgestellt werden müssen. Mit einer Topologie auf Basis einer öffentlichen IP-Adresse können Sie schnell starten, da nur wenig Netzwerkarbeit erforderlich ist.
In den nächsten beiden Abschnitten werden diese Optionen ausführlicher beschrieben.
Freigegebener RFC 1918-Adressraum
Sowohl Dedicated Interconnect als auch IPsec-VPN bieten Ihnen direkten Zugriff auf RFC 1918-IP-Adressen in Ihrer Virtual Private Cloud (VPC), was Ihre Kafka-Konfiguration vereinfachen kann. Wenn Sie eine VPN-basierte Topologie verwenden, sollten Sie ein VPN mit hohem Durchsatz erstellen.
Normalerweise startet Dataflow Instanzen in Ihrem standardmäßigen VPC-Netzwerk. In einer privaten Netzwerktopologie mit explizit in Cloud Router definierten Routen, die Subnetzwerke in Google Cloud mit diesem Kafka-Cluster verbinden, müssen Sie die Standorte Ihrer Dataflow-Instanzen besser steuern können. Sie können mit Dataflow die Ausführungsparameter network
und subnetwork
konfigurieren.
Achten Sie darauf, dass das entsprechende Subnetzwerk über genügend IP-Adressen verfügt, damit Dataflow Instanzen beim horizontalen Skalieren starten kann. Wenn Sie zum Starten der Dataflow-Instanzen ein separates Netzwerk erstellen, ist außerdem wichtig, dass Sie eine Firewallregel haben, die TCP-Traffic zwischen allen virtuellen Maschinen im Projekt zulässt. Im Standardnetzwerk ist diese Firewallregel bereits konfiguriert.
Öffentlicher IP-Adressraum
Diese Architektur verwendet zum Schutz des Traffics zwischen externen Clients und Kafka Transport Layer Security (TLS). Die Kommunikation zwischen den Brokern erfolgt in Klartext. Wenn sich der Kafka-Listener an eine Netzwerkschnittstelle bindet, die sowohl für die interne als auch für die externe Kommunikation verwendet wird, ist die Konfiguration des Listeners unkompliziert. In vielen Szenarien unterscheiden sich die extern veröffentlichten Adressen der Kafka-Broker im Cluster jedoch von den internen Netzwerkschnittstellen, die Kafka verwendet. In solchen Szenarien können Sie das Attribut advertised.listeners
verwenden:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
Externe Clients verbinden sich über Port 9093 über einen „SSL“-Kanal her und interne Clients verbinden sich über einen Klartextkanal über Port 9092. Wenn Sie unter advertised.listeners
eine Adresse angeben, verwenden Sie DNS-Namen (in diesem Beispiel kafkabroker-n.mydomain.com
), die für externen und internen Traffic in dieselbe Instanz aufgelöst werden. Öffentliche IP-Adressen funktionieren eventuell nicht, da die Adressen möglicherweise für den internen Traffic nicht richtig aufgelöst werden.
Kafka optimieren
Die Einstellungen für Ihren Kafka-Cluster und Ihren Kafka-Client können einen großen Einfluss auf die Leistung haben. Insbesondere die folgenden Einstellungen sind möglicherweise zu niedrig. In diesem Abschnitt finden Sie einige Vorschläge für Startwerte. Sie sollten jedoch mit diesen Werten für Ihre jeweilige Arbeitslast experimentieren.
unboundedReaderMaxElements
. Die Standardeinstellung ist 10.000. Ein höherer Wert wie 100.000 kann die Größe der Bundles erhöhen. Dies kann die Leistung erheblich verbessern, wenn Ihre Pipeline Aggregationen enthält. Ein höherer Wert kann jedoch auch die Latenz erhöhen. Verwenden SiesetUnboundedReaderMaxElements
, um den Wert festzulegen. Diese Einstellung gilt nicht für Runner v2.unboundedReaderMaxReadTimeMs
. Der Standardwert ist 10.000 ms. Ein höherer Wert wie 20.000 ms kann die Bundle-Größe erhöhen, während ein niedrigerer Wert wie 5.000 ms die Latenz oder den Rückstand reduzieren kann. Verwenden SiesetUnboundedReaderMaxReadTimeMs
, um den Wert festzulegen. Diese Einstellung gilt nicht für Runner v2.max.poll.records
. Die Standardeinstellung ist 500. Ein höherer Wert kann zu einer besseren Leistung führen, da mehr eingehende Datensätze gleichzeitig abgerufen werden, insbesondere bei Verwendung von Runner v2. Rufen SiewithConsumerConfigUpdates
auf, um den Wert festzulegen.fetch.max.bytes
. Der Standardwert ist 1 MB. Ein höherer Wert kann den Durchsatz verbessern, indem die Anzahl der Anfragen reduziert wird, insbesondere bei Verwendung von Runner v2. Wenn Sie den Wert jedoch zu hoch festlegen, kann die Latenz steigen. Die Downstream-Verarbeitung ist jedoch wahrscheinlich der Hauptengpass. Als Anfangswert wird 100 MB empfohlen. Rufen SiewithConsumerConfigUpdates
auf, um den Wert festzulegen.max.partition.fetch.bytes
. Der Standardwert ist 1 MB. Mit diesem Parameter wird die maximale Datenmenge pro Partition festgelegt, die der Server zurückgibt. Wenn Sie den Wert erhöhen, kann der Durchsatz durch eine Verringerung der Anzahl der Anfragen verbessert werden, insbesondere bei Verwendung von Runner v2. Wenn Sie den Wert jedoch zu hoch festlegen, kann die Latenz steigen. Die Downstream-Verarbeitung ist jedoch wahrscheinlich der Hauptengpass. Als Anfangswert wird 100 MB empfohlen. Rufen SiewithConsumerConfigUpdates
auf, um den Wert festzulegen.consumerPollingTimeout
. Die Standardeinstellung ist 2 Sekunden. Wenn die Zeitüberschreitung des Verbraucherclients eintritt, bevor Datensätze gelesen werden können, versuchen Sie, einen höheren Wert festzulegen. Diese Einstellung ist am häufigsten relevant, wenn zwischen Regionen gelesen wird oder Lesevorgänge über ein langsames Netzwerk erfolgen. Rufen SiewithConsumerPollingTimeout
auf, um den Wert festzulegen.
Achten Sie darauf, dass receive.buffer.bytes
groß genug ist, um die Größe der Nachrichten zu verarbeiten. Ist der Wert zu niedrig, wird in den Protokollen möglicherweise angezeigt, dass Verbraucher kontinuierlich neu erstellt und auf einen bestimmten Offset gesetzt werden.
Beispiele
In den folgenden Codebeispielen wird gezeigt, wie Dataflow-Pipelines erstellt werden, die Daten aus Kafka lesen.
Aus einem einzelnen Thema lesen
In diesem Beispiel werden Daten aus einem Kafka-Thema gelesen und die Nachrichten-Nutzlasten in Textdateien geschrieben.
Java
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Python
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Aus mehreren Themen lesen
In diesem Beispiel werden Daten aus mehreren Kafka-Themen gelesen und für jedes Thema wird eine separate Pipelinelogik angewendet.
Bei komplexeren Anwendungsfällen können Sie dynamisch eine Reihe von KafkaSourceDescriptor
-Objekten übergeben, um die Liste der Themen zu aktualisieren, aus denen gelesen werden soll. Für diesen Ansatz ist Java mit Runner v2 erforderlich.
Python
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.