Dataflow-Daten in Apache Kafka schreiben

In diesem Dokument wird beschrieben, wie Sie Daten aus Dataflow in Apache Kafka schreiben.

Der Apache Beam Kafka-E/A-Connector (KafkaIO) ist nativ für Java verfügbar und kann mit dem mehrsprachigen Pipeline-Framework von Apache Beam auch für Python und Go verwendet werden.

Verwenden Sie bei Java-Pipelines den verwalteten E/A-Connector, um Daten aus Kafka zu lesen.

Genau einmalige Verarbeitung

Standardmäßig bietet der KafkaIO-Connector keine „Genau einmal“-Semantik für Schreibvorgänge. Das bedeutet, dass Daten möglicherweise mehrmals in Ihr Kafka-Thema geschrieben werden. Rufen Sie die Methode withEOS auf, um „Genau einmal“-Schreibvorgänge zu ermöglichen. Bei Schreibvorgängen vom Typ „Genau einmal“ wird sichergestellt, dass Daten genau einmal in das Ziel-Kafka-Thema geschrieben werden. Dies erhöht jedoch auch die Pipelinekosten und verringert den Durchsatz.

Wenn Sie keine strengen Anforderungen an die „Genau einmal“-Semantik haben und die Logik in Ihrer Pipeline doppelte Einträge verarbeiten kann, sollten Sie den Modus „Mindestens einmal“ für die gesamte Pipeline aktivieren, um die Kosten zu senken. Weitere Informationen finden Sie unter Pipeline-Streamingmodus festlegen.

Pipeline leeren

Wenn Sie die Pipeline leeren, kann die „Genau einmal“-Semantik nicht garantiert werden. Es kann nur garantiert werden, dass keine bestätigten Daten verloren gehen. Daher werden einige Daten möglicherweise verarbeitet, während die Pipeline geleert wird, ohne dass Leseoffsets an Kafka zurückgegeben werden. Wenn Sie beim Ändern einer Pipeline die „Mindestens einmal“-Semantik für Kafka erreichen möchten, aktualisieren Sie die Pipeline, anstatt den Job abzubrechen und einen neuen zu starten.

Kafka für die „Genau einmal“-Semantik optimieren

Durch die Anpassung von transaction.max.timeout.ms und transactional.id.expiration.ms können Sie Ihre allgemeine Fehlertoleranz und Ihre Strategie für die Zustellung genau einmal ergänzen. Die Auswirkungen hängen jedoch von der Art des Ausfalls und Ihrer spezifischen Konfiguration ab. Legen Sie transaction.max.timeout.ms nahe an der Aufbewahrungszeit Ihrer Kafka-Themen fest, um Datenduplikate aufgrund von Ausfällen von Kafka-Brokern zu vermeiden.

Wenn ein Kafka-Broker vorübergehend nicht verfügbar ist (z. B. aufgrund einer Netzwerkpartition oder eines Knotenausfalls) und ein Producer laufende Transaktionen hat, kann es zu einem Zeitüberschreitungsfehler bei diesen Transaktionen kommen. Wenn Sie den Wert von transaction.max.timeout.ms erhöhen, haben Transaktionen nach der Wiederherstellung eines Brokers mehr Zeit zum Abschließen. Dadurch müssen Transaktionen möglicherweise nicht neu gestartet und Nachrichten nicht noch einmal gesendet werden. Diese Maßnahme trägt indirekt dazu bei, die „Genau einmal“-Semantik beizubehalten, da die Wahrscheinlichkeit von doppelten Nachrichten durch Transaktionsneustarts verringert wird. Eine kürzere Gültigkeitsdauer kann jedoch dazu beitragen, inaktive Transaktions-IDs schneller zu bereinigen und so die potenzielle Ressourcennutzung zu reduzieren.

Netzwerk konfigurieren

Normalerweise startet Dataflow Instanzen in Ihrem VPC-Standardnetzwerk (Virtual Private Cloud). Je nach Kafka-Konfiguration müssen Sie möglicherweise ein anderes Netzwerk und Subnetz für Dataflow konfigurieren. Weitere Informationen finden Sie unter Netzwerk und Subnetzwerk angeben. Erstellen Sie beim Konfigurieren Ihres Netzwerks Firewallregeln, mit denen die Dataflow-Worker-Maschinen die Kafka-Broker erreichen können.

Wenn Sie VPC Service Controls verwenden, platzieren Sie den Kafka-Cluster innerhalb des VPC Service Controls-Perimeters oder weiten Sie die Perimeter auf das autorisierte VPN oder Cloud Interconnect aus.

Wenn Ihr Kafka-Cluster außerhalb von Google Cloud bereitgestellt wird, müssen Sie eine Netzwerkverbindung zwischen Dataflow und dem Kafka-Cluster erstellen. Es gibt mehrere Netzwerkoptionen mit unterschiedlichen Vor- und Nachteilen:

Dedicated Interconnect ist die beste Option für eine vorhersagbare Leistung und Zuverlässigkeit. Die Einrichtung kann jedoch länger dauern, da die neuen Verbindungen durch Dritte bereitgestellt werden müssen. Mit einer Topologie auf Basis einer öffentlichen IP-Adresse können Sie schnell starten, da nur wenig Netzwerkarbeit erforderlich ist.

In den nächsten beiden Abschnitten werden diese Optionen ausführlicher beschrieben.

Freigegebener RFC 1918-Adressraum

Sowohl Dedicated Interconnect als auch IPsec-VPN bieten Ihnen direkten Zugriff auf RFC 1918-IP-Adressen in Ihrer Virtual Private Cloud (VPC), was Ihre Kafka-Konfiguration vereinfachen kann. Wenn Sie eine VPN-basierte Topologie verwenden, sollten Sie ein VPN mit hohem Durchsatz erstellen.

Normalerweise startet Dataflow Instanzen in Ihrem standardmäßigen VPC-Netzwerk. In einer privaten Netzwerktopologie mit explizit in Cloud Router definierten Routen, die Subnetzwerke in Google Cloud mit diesem Kafka-Cluster verbinden, müssen Sie die Standorte Ihrer Dataflow-Instanzen besser steuern können. Sie können mit Dataflow die Ausführungsparameter network und subnetwork konfigurieren.

Achten Sie darauf, dass das entsprechende Subnetzwerk über genügend IP-Adressen verfügt, damit Dataflow Instanzen beim horizontalen Skalieren starten kann. Wenn Sie zum Starten der Dataflow-Instanzen ein separates Netzwerk erstellen, ist außerdem wichtig, dass Sie eine Firewallregel haben, die TCP-Traffic zwischen allen virtuellen Maschinen im Projekt zulässt. Im Standardnetzwerk ist diese Firewallregel bereits konfiguriert.

Öffentlicher IP-Adressraum

Diese Architektur verwendet zum Schutz des Traffics zwischen externen Clients und Kafka Transport Layer Security (TLS). Die Kommunikation zwischen den Brokern erfolgt in Klartext. Wenn sich der Kafka-Listener an eine Netzwerkschnittstelle bindet, die sowohl für die interne als auch für die externe Kommunikation verwendet wird, ist die Konfiguration des Listeners unkompliziert. In vielen Szenarien unterscheiden sich die extern veröffentlichten Adressen der Kafka-Broker im Cluster jedoch von den internen Netzwerkschnittstellen, die Kafka verwendet. In solchen Szenarien können Sie das Attribut advertised.listeners verwenden:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

Externe Clients verbinden sich über Port 9093 über einen „SSL“-Kanal her und interne Clients verbinden sich über einen Klartextkanal über Port 9092. Wenn Sie unter advertised.listeners eine Adresse angeben, verwenden Sie DNS-Namen (in diesem Beispiel kafkabroker-n.mydomain.com), die für externen und internen Traffic in dieselbe Instanz aufgelöst werden. Öffentliche IP-Adressen funktionieren eventuell nicht, da die Adressen möglicherweise für den internen Traffic nicht richtig aufgelöst werden.

Logging

Das Logging von KafkaIO kann sehr ausführlich sein. Sie können die Loggingebene in der Produktion so reduzieren:

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

Weitere Informationen finden Sie unter Logebenen für Pipeline-Worker festlegen.