Lire des données depuis Apache Kafka vers Dataflow

Ce document explique comment lire des données depuis Apache Kafka vers Dataflow. Il inclut des conseils et des bonnes pratiques en termes de performances.

Dans la plupart des cas d'utilisation, envisagez d'utiliser le connecteur d'E/S géré pour effectuer des opérations de lecture à partir de Kafka.

Si vous avez besoin d'un ajustement des performances plus avancé, envisagez d'utiliser le connecteur KafkaIO. Le connecteur KafkaIO est disponible pour Java ou à l'aide du framework de pipelines multilangage pour Python et Go.

Parallélisme

Le parallélisme est limité par deux facteurs: le nombre maximal de nœuds de calcul (max_num_workers) et le nombre de partitions Kafka. Par défaut, Dataflow utilise une distribution ramifiée de parallélisme de 4 x max_num_workers. Toutefois, la distribution ramifiée est limitée par le nombre de partitions. Par exemple, si 100 vCPU sont disponibles, mais que le pipeline ne lit que 10 partitions Kafka, le parallélisme maximal est de 10.

Pour optimiser le parallélisme, nous vous recommandons d'avoir au moins quatre fois la taille de la partition Kafka max_num_workers. Si votre tâche utilise Runner v2, envisagez de définir le parallélisme encore plus haut. Un bon point de départ consiste à définir le nombre de partitions sur deux fois le nombre de processeurs virtuels de nœuds de calcul.

Si vous ne pouvez pas augmenter le nombre de partitions, vous pouvez augmenter le parallélisme en appelant KafkaIO.Read.withRedistribute. Cette méthode ajoute une transformation Redistribute au pipeline, ce qui fournit une indication à Dataflow pour redistribuer et mettre en parallèle les données plus efficacement. Vous pouvez également spécifier le nombre optimal de fragments à utiliser lors de l'étape de redistribution en appelant KafkaIO.Read.withRedistributeNumKeys. Dataflow traite cette valeur comme une indication d'optimisation. La redistribution des données ajoute une surcharge supplémentaire pour effectuer l'étape de brassage. Pour en savoir plus, consultez la section Empêcher la fusion.

Essayez de vous assurer que la charge entre les partitions est relativement uniforme et non biaisée. Si la charge est biaisée, cela peut entraîner une mauvaise utilisation des nœuds de calcul. Les nœuds de calcul qui lisent à partir de partitions avec une charge plus légère peuvent être relativement inactifs, tandis que les nœuds de calcul qui lisent à partir de partitions avec une charge importante peuvent prendre du retard. Dataflow fournit des métriques pour le backlog par partition.

Si la charge est biaisée, l'équilibrage de charge dynamique peut aider à la répartir. Par exemple, Dataflow peut allouer un nœud de calcul pour lire à partir de plusieurs partitions à faible volume et allouer un autre nœud de calcul pour lire à partir d'une seule partition à volume élevé. Toutefois, deux nœuds de calcul ne peuvent pas lire à partir de la même partition. Par conséquent, une partition fortement chargée peut toujours entraîner un retard du pipeline.

Bonnes pratiques

Cette section contient des recommandations pour lire des données depuis Kafka vers Dataflow.

Sujets à faible volume

Un scénario courant consiste à lire de nombreux sujets à faible volume en même temps (par exemple, un sujet par client). Créez des jobs Dataflow distinctes pour chaque sujet n'est pas rentable, car chaque job nécessite au moins un nœud de calcul complet. Envisagez plutôt les options suivantes:

  • Fusionnez les sujets. Combinez les sujets avant qu'ils ne soient ingérés dans Dataflow. L'ingestion de quelques sujets à volume élevé est beaucoup plus efficace que l'ingestion de nombreux sujets à faible volume. Chaque sujet à volume élevé peut être traité par un seul job Dataflow qui utilise pleinement ses nœuds de calcul.

  • Lisez plusieurs sujets. Si vous ne pouvez pas combiner des sujets avant de les ingérer dans Dataflow, envisagez de créer un pipeline qui lit à partir de plusieurs sujets. Cette approche permet à Dataflow d'attribuer plusieurs sujets au même nœud de calcul. Vous pouvez implémenter cette approche de deux façons:

    • Étape de lecture unique. Créez une seule instance du connecteur KafkaIO et configurez-la pour lire plusieurs sujets. Filtrez ensuite par nom de sujet pour appliquer une logique différente par sujet. Pour obtenir un exemple de code, consultez la section Lire à partir de plusieurs sujets. Envisagez cette option si tous vos sujets sont regroupés dans le même cluster. L'inconvénient est que les problèmes liés à un seul récepteur ou à une seule transformation peuvent entraîner l'accumulation de retards pour tous les sujets.

      Pour les cas d'utilisation plus avancés, transmettez un ensemble d'objets KafkaSourceDescriptor qui spécifient les sujets à lire. L'utilisation de KafkaSourceDescriptor vous permet de mettre à jour la liste des sujets ultérieurement, si nécessaire. Cette fonctionnalité nécessite Java avec Runner v2.

    • Étapes de lecture multiples Pour lire des sujets situés dans différents clusters, votre pipeline peut inclure plusieurs instances KafkaIO. Pendant l'exécution du job, vous pouvez mettre à jour des sources individuelles à l'aide de mappages de transformation. La définition d'un nouveau cluster ou d'un nouveau sujet n'est possible qu'avec Runner v2. L'observabilité est un défi potentiel avec cette approche, car vous devez surveiller chaque transformation de lecture individuelle au lieu de vous appuyer sur des métriques au niveau du pipeline.

Committer à nouveau dans Kafka

Par défaut, le connecteur KafkaIO n'utilise pas les décalages Kafka pour suivre la progression et n'effectue pas le commit dans Kafka. Si vous appelez commitOffsetsInFinalize, le connecteur s'efforce de revenir à Kafka après le commit des enregistrements dans Dataflow. Les enregistrements ayant fait l'objet d'un commit dans Dataflow peuvent ne pas être entièrement traités. Par conséquent, si vous annulez le pipeline, un décalage peut être validé sans que les enregistrements soient entièrement traités.

Étant donné que le paramètre enable.auto.commit=True effectue des commits des décalages dès qu'il lit des données à partir de Kafka sans aucun traitement par Dataflow, il n'est pas recommandé d'utiliser cette option. Nous vous recommandons de définir à la fois enable.auto.commit=False et commitOffsetsInFinalize=True. Si vous définissez enable.auto.commit sur True, des données peuvent être perdues si le pipeline est interrompu pendant le traitement. Les enregistrements déjà validés sur Kafka peuvent être supprimés.

Filigranes

Par défaut, le connecteur KafkaIO utilise le temps de traitement actuel pour attribuer le filigrane de sortie et l'heure de l'événement. Pour modifier ce comportement, appelez withTimestampPolicyFactory et attribuez un paramètre TimestampPolicy. Beam fournit des implémentations de TimestampPolicy qui calculent le filigrane en fonction de l'heure d'ajout au journal de Kafka ou de l'heure de création du message.

Considérations liées aux exécuteurs

Le connecteur KafkaIO comporte deux implémentations sous-jacentes pour les lectures Kafka, l'ancien paramètre ReadFromKafkaViaUnbounded et le nouveau paramètre ReadFromKafkaViaSDF. Dataflow choisit automatiquement la meilleure implémentation pour votre job en fonction de la langue de votre SDK et des exigences du job. Évitez de demander explicitement un exécuteur ou une implémentation de Kafka, sauf si vous avez besoin de fonctionnalités spécifiques disponibles uniquement dans cette implémentation. Pour en savoir plus sur le choix d'un exécuteur, consultez la section Utiliser l'exécuteur Dataflow v2.

Si votre pipeline utilise withTopic ou withTopics, l'ancienne implémentation interroge Kafka au moment de la création du pipeline pour les partitions disponibles. La machine qui crée le pipeline doit être autorisée à se connecter à Kafka. Si vous recevez une erreur d'autorisation, vérifiez que vous disposez des autorisations nécessaires pour vous connecter à Kafka localement. Vous pouvez éviter ce problème en utilisant withTopicPartitions, qui ne se connecte pas à Kafka au moment de la création du pipeline.

Déployer en production

Lorsque vous déployez votre solution en production, nous vous recommandons d'utiliser des modèles Flex. En utilisant un modèle Flex, le pipeline est lancé à partir d'un environnement cohérent, ce qui peut aider à atténuer les problèmes de configuration locaux.

La journalisation à partir de KafkaIO peut être assez verbeuse. Envisagez de réduire le niveau de journalisation en production, comme suit :

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

Pour en savoir plus, consultez Définir les niveaux de journalisation des nœuds de calcul d'un pipeline.

Configurer la mise en réseau

Par défaut, Dataflow lance les instances au sein de votre réseau cloud privé virtuel (VPC) par défaut. Selon votre configuration Kafka, vous devrez peut-être configurer un réseau et un sous-réseau différents pour Dataflow. Pour plus d'informations, consultez la section Spécifier un réseau et un sous-réseau. Lors de la configuration de votre réseau, créez des règles de pare-feu permettant aux machines de nœud de calcul Dataflow d'atteindre les agents Kafka.

Si vous utilisez VPC Service Controls, placez le cluster Kafka dans le périmètre VPC Service Controls, ou étendez les périmètres au VPN autorisé ou à Cloud Interconnect.

Si votre cluster Kafka est déployé en dehors de Google Cloud, vous devez créer une connexion réseau entre Dataflow et le cluster Kafka. Il existe plusieurs options de mise en réseau avec différents compromis :

Dedicated Interconnect est la meilleure option pour obtenir des performances et une fiabilité prévisibles. Toutefois, sa mise en place peut prendre plus de temps que pour les autres options, car des organisations tierces doivent approvisionner les nouveaux circuits. Avec une topologie basée sur une adresse IP publique, vous pouvez commencer rapidement à travailler, car la mise en réseau ne nécessite que peu d'efforts.

Les deux sections suivantes décrivent ces options plus en détail.

Espace d'adressage RFC 1918 partagé

Dedicated Interconnect et le VPN IPsec vous donnent tous deux un accès direct aux adresses IP RFC 1918 de votre cloud privé virtuel (VPC), ce qui peut simplifier la configuration de Kafka. Si vous utilisez une topologie basée sur un VPN, envisagez de configurer un VPN à haut débit.

Par défaut, Dataflow lance les instances sur votre réseau VPC par défaut. Dans la topologie d'un réseau privé avec des routes explicitement définies dans Cloud Router qui connectent des sous-réseaux de Google Cloud à ce cluster Kafka, vous avez besoin de contrôler davantage l'emplacement des instances Dataflow. Vous pouvez utiliser Dataflow pour configurer les paramètres d'exécution de network et subnetwork.

Assurez-vous que le sous-réseau correspondant dispose de suffisamment d'adresses IP disponibles pour permettre à Dataflow de lancer des instances lorsqu'il tente d'effectuer un scaling horizontal. De même, lorsque vous créez un réseau distinct pour le lancement de vos instances Dataflow, assurez-vous qu'une règle de pare-feu active le trafic TCP entre toutes les machines virtuelles du projet. Cette règle de pare-feu est déjà configurée sur le réseau par défaut.

Espace d'adressage IP public

Cette architecture utilise Transport Layer Security (TLS) pour sécuriser le trafic entre les clients externes et Kafka, et utilise du trafic non chiffré pour les communications entre agents. Lorsque l'écouteur Kafka se lie à une interface réseau utilisée à la fois pour la communication interne et externe, la configuration de l'écouteur s'effectue facilement. Toutefois, dans de nombreux scénarios, les adresses présentées en externe des agents Kafka du cluster diffèrent des interfaces réseau internes utilisées par Kafka. Dans de tels scénarios, vous pouvez utiliser la propriété advertised.listeners :

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

Les clients externes se connectent à l'aide du port 9093 via un canal SSL, et les clients internes à l'aide du port 9092 via un canal de texte brut. Lorsque vous spécifiez une adresse sous advertised.listeners, utilisez des noms DNS (kafkabroker-n.mydomain.com, dans cet exemple) qui correspondent à la même instance pour le trafic externe et interne. L'utilisation d'adresses IP publiques peut ne pas fonctionner, car les adresses risquent de ne pas être résolues pour le trafic interne.

Régler Kafka

Les paramètres de votre cluster Kafka et de votre client Kafka peuvent avoir un impact important sur les performances. En particulier, les paramètres suivants peuvent être trop bas. Cette section propose des points de départ, mais vous devez tester ces valeurs en fonction de votre charge de travail.

  • unboundedReaderMaxElements. La valeur par défaut est 10 000. Une valeur plus élevée, telle que 100 000, peut augmenter la taille des groupes, ce qui peut améliorer considérablement les performances si votre pipeline inclut des agrégations. Toutefois, une valeur plus élevée peut également augmenter la latence. Pour définir la valeur, utilisez setUnboundedReaderMaxElements. Ce paramètre ne s'applique pas à Runner v2.

  • unboundedReaderMaxReadTimeMs. La valeur par défaut est de 10 000 ms. Une valeur plus élevée, par exemple 20 000 ms, peut augmenter la taille du groupe, tandis qu'une valeur inférieure, par exemple 5 000 ms, peut réduire la latence ou le backlog. Pour définir la valeur, utilisez setUnboundedReaderMaxReadTimeMs. Ce paramètre ne s'applique pas à Runner v2.

  • max.poll.records. La valeur par défaut est de 500. Une valeur plus élevée peut être plus efficace en récupérant plus d'enregistrements entrants ensemble, en particulier lorsque vous utilisez Runner v2. Pour définir la valeur, appelez withConsumerConfigUpdates.

  • fetch.max.bytes. La valeur par défaut est de 1 Mo. Une valeur plus élevée peut améliorer le débit en réduisant le nombre de requêtes, en particulier lorsque vous utilisez Runner V2. Toutefois, si elle est trop élevée, la latence peut augmenter, bien que le traitement en aval soit plus susceptible d'être le principal goulot d'étranglement. La valeur de départ recommandée est de 100 Mo. Pour définir la valeur, appelez withConsumerConfigUpdates.

  • max.partition.fetch.bytes. La valeur par défaut est de 1 Mo. Ce paramètre définit la quantité maximale de données par partition que le serveur renvoie. Augmenter cette valeur peut améliorer le débit en réduisant le nombre de requêtes, en particulier lorsque vous utilisez Runner v2. Toutefois, si elle est trop élevée, la latence peut augmenter, bien que le traitement en aval soit plus susceptible d'être le principal goulot d'étranglement. La valeur de départ recommandée est de 100 Mo. Pour définir la valeur, appelez withConsumerConfigUpdates.

  • consumerPollingTimeout. La valeur par défaut est de 2 secondes. Si le client consommateur expire avant de pouvoir lire des enregistrements, essayez de définir une valeur plus élevée. Ce paramètre est le plus souvent pertinent lorsque vous effectuez des lectures interrégionales ou des lectures avec un réseau lent. Pour définir la valeur, appelez withConsumerPollingTimeout.

Assurez-vous que receive.buffer.bytes est suffisamment grand pour gérer la taille des messages. Si la valeur est trop faible, les journaux peuvent indiquer que les consommateurs sont continuellement recréés et recherchent un décalage spécifique.

Exemples

Les exemples de code suivants montrent comment créer des pipelines Dataflow qui lisent à partir de Kafka. Lorsque vous utilisez les identifiants par défaut de l'application avec le gestionnaire de rappel fourni par Google Cloud Managed Service pour Apache Kafka, la version 3.7.0 ou ultérieure de kafka-clients est requise.

Lire à partir d'un seul sujet

Cet exemple utilise le connecteur d'E/S géré. Il montre comment lire à partir d'un sujet Kafka et écrire les charges utiles de message dans des fichiers texte.

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import com.google.common.collect.ImmutableMap;
import java.io.UnsupportedEncodingException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KafkaRead {

  public static Pipeline createPipeline(Options options) {

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("bootstrap_servers", options.getBootstrapServer())
        .put("topic", options.getTopic())
        .put("data_format", "RAW")
        .put("max_read_time_seconds", 15)
        .put("auto_offset_reset_config", "earliest")
        .build();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    pipeline
        // Read messages from Kafka.
        .apply(Managed.read(Managed.KAFKA).withConfig(config)).getSinglePCollection()
        // Get the payload of each message and convert to a string.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              var bytes = row.getBytes("payload");
              try {
                return new String(bytes, "UTF-8");
              } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
              }
            })))
        // Write the payload to a text file.
        .apply(TextIO
            .write()
            .to(options.getOutputPath())
            .withSuffix(".txt")
            .withNumShards(1));
    return pipeline;
  }
}

Python

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import argparse

import apache_beam as beam

from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
    #     --output=$CLOUD_STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic")
            parser.add_argument("--bootstrap_server")
            parser.add_argument("--output")

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Read messages from an Apache Kafka topic.
            | ReadFromKafka(
                consumer_config={"bootstrap.servers": options.bootstrap_server},
                topics=[options.topic],
                with_metadata=False,
                max_num_records=5,
                start_read_time=0,
            )
            # The previous step creates a key-value collection, keyed by message ID.
            # The values are the message payloads.
            | beam.Values()
            # Subdivide the output into fixed 5-second windows.
            | beam.WindowInto(window.FixedWindows(5))
            | WriteToText(
                file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
            )
        )

Lire à partir de plusieurs sujets

Cet exemple utilise le connecteur KafkaIO. Il montre comment lire à partir de plusieurs sujets Kafka et appliquer une logique de pipeline distincte pour chaque sujet.

Pour des cas d'utilisation plus avancés, transmettez de manière dynamique un ensemble d'objets KafkaSourceDescriptor afin de pouvoir mettre à jour la liste des sujets à lire. Cette approche nécessite Java avec Runner v2.

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class KafkaReadTopics {

  public static Pipeline createPipeline(Options options) {
    String topic1 = options.getTopic1();
    String topic2 = options.getTopic2();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    var allTopics = pipeline
        .apply(KafkaIO.<Long, String>read()
            .withTopics(List.of(topic1, topic2))
            .withBootstrapServers(options.getBootstrapServer())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withMaxReadTime(Duration.standardSeconds(10))
            .withStartReadTime(Instant.EPOCH)
        );

    // Create separate pipeline branches for each topic.
    // The first branch filters on topic1.
    allTopics
        .apply(Filter.by(record -> record.getTopic().equals(topic1)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(record -> record.getKV().getValue()))
        .apply(TextIO.write()
            .to(topic1)
            .withSuffix(".txt")
            .withNumShards(1)
        );

    // The second branch filters on topic2.
    allTopics
        .apply(Filter.by(record -> record.getTopic().equals(topic2)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(record -> record.getKV().getValue()))
        .apply(TextIO.write()
            .to(topic2)
            .withSuffix(".txt")
            .withNumShards(1)
        );
    return pipeline;
  }
}

Python

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import argparse

import apache_beam as beam

from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #   --bootstrap_server=$BOOTSTRAP_SERVER --output=$STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument('--bootstrap_server')
            parser.add_argument('--output')

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        # Read from two Kafka topics.
        all_topics = pipeline | ReadFromKafka(consumer_config={
                "bootstrap.servers": options.bootstrap_server
            },
            topics=["topic1", "topic2"],
            with_metadata=True,
            max_num_records=10,
            start_read_time=0
        )

        # Filter messages from one topic into one branch of the pipeline.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic1')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic1" >> WriteToText(
                file_path_prefix=options.output + '/topic1/output',
                file_name_suffix='.txt',
                num_shards=1))

        # Filter messages from the other topic.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic2')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic2" >> WriteToText(
                file_path_prefix=options.output + '/topic2/output',
                file_name_suffix='.txt',
                num_shards=1))

Étape suivante