Ce document explique comment écrire des données depuis Dataflow vers Apache Kafka.
Le connecteur d'E/S Apache Beam Kafka (KafkaIO
) est nativement disponible pour Java, et est également disponible pour Python et Go à l'aide du framework de pipelines multilangage Apache Beam.
Pour les pipelines Java, envisagez d'utiliser le connecteur d'E/S géré pour effectuer des opérations de lecture à partir de Kafka.
Traitement de type "exactement une fois"
Par défaut, le connecteur KafkaIO
ne propose pas de sémantique de type "exactement une fois" pour les opérations d'écriture. Cela signifie que les données peuvent être écrites plusieurs fois dans votre sujet Kafka. Pour activer les écritures de type "exactement une fois", appelez la méthode withEOS
. Les écritures de type "exactement une fois" garantissent que les données sont écrites précisément une seule fois dans le sujet Kafka de destination.
Toutefois, cela augmente également le coût du pipeline et réduit le débit.
Si vous n'avez pas d'exigences strictes concernant la sémantique de type "exactement une fois", et que la logique de votre pipeline peut gérer les enregistrements en double, envisagez d'activer le mode "au moins une fois" pour l'ensemble du pipeline afin de réduire les coûts. Pour en savoir plus, consultez la section Définir le mode de traitement par flux du pipeline.
Drainages de pipeline
La sémantique de type "exactement une fois" n'est pas garantie si vous videz le pipeline. La seule garantie est qu'aucune donnée confirmée n'est perdue. Par conséquent, certaines données peuvent être traitées pendant le drainage du pipeline, sans que les décalages de lecture ne soient validés dans Kafka. Pour obtenir une sémantique de type "au moins une fois" pour Kafka lorsque vous modifiez un pipeline, mettez à jour le pipeline au lieu d'annuler le job et d'en démarrer une autre.
Régler Kafka pour la sémantique de type "exactement une fois"
L'ajustement de transaction.max.timeout.ms
et de transactional.id.expiration.ms
peut compléter votre stratégie globale de tolérance aux pannes et votre stratégie de distribution de type "exactement une fois".
Toutefois, leur impact dépend de la nature de l'indisponibilité et de votre configuration spécifique. Définissez transaction.max.timeout.ms
sur une valeur proche de la durée de conservation de vos sujets Kafka, pour éviter la duplication de données résultant des indisponibilités de l'agent Kafka.
Si un courtier Kafka devient temporairement indisponible (par exemple, en raison d'une défaillance sur une partition réseau ou un nœud) et qu'un producteur a des transactions en cours, celles-ci peuvent expirer. L'augmentation de la valeur de transaction.max.timeout.ms
permet aux transactions de disposer d'un délai étendu après la récupération du courtier, ce qui évite de devoir potentiellement relancer les transactions et renvoyer des messages. Cette solution permet indirectement de conserver la sémantique de type "exactement une fois", en réduisant le risque de messages en double résultant de transactions relancées. En revanche, un délai d'expiration plus court peut permettre d'éliminer plus rapidement les ID de transaction inactifs, ce qui réduit l'utilisation potentielle des ressources.
Configurer la mise en réseau
Par défaut, Dataflow lance les instances au sein de votre réseau cloud privé virtuel (VPC) par défaut. Selon votre configuration Kafka, vous devrez peut-être configurer un réseau et un sous-réseau différents pour Dataflow. Pour plus d'informations, consultez la section Spécifier un réseau et un sous-réseau. Lors de la configuration de votre réseau, créez des règles de pare-feu permettant aux machines de nœud de calcul Dataflow d'atteindre les agents Kafka.
Si vous utilisez VPC Service Controls, placez le cluster Kafka dans le périmètre VPC Service Controls, ou étendez les périmètres au VPN autorisé ou à Cloud Interconnect.
Si votre cluster Kafka est déployé en dehors de Google Cloud, vous devez créer une connexion réseau entre Dataflow et le cluster Kafka. Il existe plusieurs options de mise en réseau avec différents compromis :
- Connectez-vous à l'aide d'un espace d'adressage RFC 1918 partagé en utilisant l'un des éléments suivants :
- Atteignez votre cluster Kafka hébergé en externe via des adresses IP publiques en utilisant l'une des méthodes suivantes :
- Internet public
- Appairage direct
- Appairage opérateur
Dedicated Interconnect est la meilleure option pour obtenir des performances et une fiabilité prévisibles. Toutefois, sa mise en place peut prendre plus de temps que pour les autres options, car des organisations tierces doivent approvisionner les nouveaux circuits. Avec une topologie basée sur une adresse IP publique, vous pouvez commencer rapidement à travailler, car la mise en réseau ne nécessite que peu d'efforts.
Les deux sections suivantes décrivent ces options plus en détail.
Espace d'adressage RFC 1918 partagé
Dedicated Interconnect et le VPN IPsec vous donnent tous deux un accès direct aux adresses IP RFC 1918 de votre cloud privé virtuel (VPC), ce qui peut simplifier la configuration de Kafka. Si vous utilisez une topologie basée sur un VPN, envisagez de configurer un VPN à haut débit.
Par défaut, Dataflow lance les instances sur votre réseau VPC par défaut. Dans la topologie d'un réseau privé avec des routes explicitement définies dans Cloud Router qui connectent des sous-réseaux de Google Cloud à ce cluster Kafka, vous avez besoin de contrôler davantage l'emplacement des instances Dataflow. Vous pouvez utiliser Dataflow pour configurer les paramètres d'exécution de network
et subnetwork
.
Assurez-vous que le sous-réseau correspondant dispose de suffisamment d'adresses IP disponibles pour permettre à Dataflow de lancer des instances lorsqu'il tente d'effectuer un scaling horizontal. De même, lorsque vous créez un réseau distinct pour le lancement de vos instances Dataflow, assurez-vous qu'une règle de pare-feu active le trafic TCP entre toutes les machines virtuelles du projet. Cette règle de pare-feu est déjà configurée sur le réseau par défaut.
Espace d'adressage IP public
Cette architecture utilise Transport Layer Security (TLS) pour sécuriser le trafic entre les clients externes et Kafka, et utilise du trafic non chiffré pour les communications entre agents. Lorsque l'écouteur Kafka se lie à une interface réseau utilisée à la fois pour la communication interne et externe, la configuration de l'écouteur s'effectue facilement. Toutefois, dans de nombreux scénarios, les adresses présentées en externe des agents Kafka du cluster diffèrent des interfaces réseau internes utilisées par Kafka. Dans de tels scénarios, vous pouvez utiliser la propriété advertised.listeners
:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
Les clients externes se connectent à l'aide du port 9093 via un canal SSL, et les clients internes à l'aide du port 9092 via un canal de texte brut. Lorsque vous spécifiez une adresse sous advertised.listeners
, utilisez des noms DNS (kafkabroker-n.mydomain.com
, dans cet exemple) qui correspondent à la même instance pour le trafic externe et interne. L'utilisation d'adresses IP publiques peut ne pas fonctionner, car les adresses risquent de ne pas être résolues pour le trafic interne.
Journalisation
La journalisation à partir de KafkaIO
peut être assez verbeuse. Envisagez de réduire le niveau de journalisation en production, comme suit :
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
Pour en savoir plus, consultez Définir les niveaux de journalisation des nœuds de calcul d'un pipeline.