Leggere da Apache Kafka in Dataflow

Questo documento descrive come leggere i dati da Apache Kafka in Dataflow e include suggerimenti e best practice per le prestazioni.

Per la maggior parte dei casi d'uso, ti consigliamo di utilizzare il connettore I/O gestito per leggere da Kafka.

Se hai bisogno di una ottimizzazione delle prestazioni più avanzata, valuta la possibilità di utilizzare il connettoreKafkaIO. Il connettore KafkaIO è disponibile per Java o utilizzando il framework di pipeline multilingue per Python e Go.

Parallelismo

Il parallelismo è limitato da due fattori: il numero massimo di worker (max_num_workers) e il numero di partizioni Kafka. Per impostazione predefinita, Dataflow utilizza un fanout di parallelismo pari a 4 x max_num_workers. Tuttavia, il fanout è limitato dal numero di partizioni. Ad esempio, se sono disponibili 100 vCPU, ma la pipeline legge solo da 10 partizioni Kafka, il parallelismo massimo è 10.

Per massimizzare il parallelismo, è consigliabile avere almeno 4 x max_num_workers partizioni Kafka. Se il tuo job utilizza Runner v2, valuta la possibilità di impostare un parallelismo ancora più elevato. Un buon punto di partenza è avere partizioni pari al doppio del numero di vCPU worker.

Se non puoi aumentare il numero di partizioni, puoi aumentare il parallelismo chiamando KafkaIO.Read.withRedistribute. Questo metodo aggiunge una trasformazione Redistribute alla pipeline, che fornisce un suggerimento a Dataflow per ridistribuire e parallelizzare i dati in modo più efficiente. Puoi anche specificare il numero ottimale di shard da utilizzare nel passaggio di ridistribuzione chiamando KafkaIO.Read.withRedistributeNumKeys. Dataflow tratta questo valore come un suggerimento di ottimizzazione. La ridistribuzione dei dati comporta un sovraccarico aggiuntivo per eseguire il passaggio di ordinamento casuale. Per ulteriori informazioni, consulta Impedire la fusione.

Cerca di assicurarti che il carico tra le partizioni sia relativamente uniforme e non scompensato. Se il carico è sbilanciato, può comportare un cattivo utilizzo dei worker. I worker che leggono da partizioni con un carico più leggero potrebbero essere relativamente inattivi, mentre quelli che leggono da partizioni con un carico elevato potrebbero rimanere indietro. Dataflow fornisce metriche per il backlog per partizione.

Se il carico è sbilanciato, il bilanciamento dinamico del lavoro può contribuire a distribuirlo. Ad esempio, Dataflow potrebbe allocare un worker per leggere da più partizioni a volume ridotto e un altro per leggere da una singola partizione a volume elevato. Tuttavia, due worker non possono leggere dalla stessa partizione, quindi una partizione con un carico elevato può comunque causare il ritardo della pipeline.

Best practice

Questa sezione contiene consigli per la lettura da Kafka in Dataflow.

Argomenti con volume ridotto

Uno scenario comune è leggere contemporaneamente da molti argomenti a basso volume, ad esempio un argomento per cliente. La creazione di job Dataflow distinti per ogni argomento non è conveniente in termini di costi, perché ogni job richiede almeno un worker completo. Valuta invece le seguenti opzioni:

  • Unisci gli argomenti. Combina gli argomenti prima che vengano importati in Dataflow. L'importazione di pochi argomenti ad alto volume è molto più efficiente rispetto all'importazione di molti argomenti a basso volume. Ogni argomento ad alto volume può essere gestito da un singolo job Dataflow che utilizza completamente i suoi worker.

  • Leggi più argomenti. Se non riesci a combinare gli argomenti prima di importarli in Dataflow, ti consigliamo di creare una pipeline che legga da più argomenti. Questo approccio consente a Dataflow di assegnare diversi argomenti allo stesso worker. Esistono due modi per implementare questo approccio:

    • Passaggio di lettura singola. Crea una singola istanza del connettore KafkaIO e configurala in modo che legga più argomenti. Quindi filtra per nome dell'argomento per applicare una logica diversa per ogni argomento. Per un codice di esempio, consulta Leggere da più argomenti. Valuta questa opzione se tutti i tuoi argomenti sono collocati nello stesso cluster. Uno svantaggio è che i problemi con un singolo sink o una singola trasformazione potrebbero causare l'accumulo di backlog per tutti gli argomenti.

      Per casi d'uso più avanzati, passa un insieme di oggetti KafkaSourceDescriptor che specificano gli argomenti da leggere. L'utilizzo di KafkaSourceDescriptor ti consente di aggiornare l'elenco di argomenti in un secondo momento, se necessario. Questa funzionalità richiede Java con Runner v2.

    • Più passaggi di lettura. Per leggere da argomenti in cluster diversi, la pipeline può includere diverse istanze di KafkaIO. Durante l'esecuzione del job, puoi aggiornare le singole origini utilizzando le mappature delle trasformazioni. L'impostazione di un nuovo argomento o cluster è supportata solo quando utilizzi Runner v2. L'osservabilità è una potenziale sfida con questo approccio, perché devi monitorare ogni singola trasformazione di lettura anziché fare affidamento su metriche a livello di pipeline.

Eseguire il commit di nuovo in Kafka

Per impostazione predefinita, il connettore KafkaIO non utilizza gli offset Kafka per monitorare l'avanzamento e non esegue il commit in Kafka. Se chiami commitOffsetsInFinalize, il connettore fa del suo meglio per eseguire il commit in Kafka dopo che i record sono stati sottoposti a commit in Dataflow. I record committati in Dataflow potrebbero non essere completamente elaborati, pertanto se annulli la pipeline, un offset potrebbe essere committato senza che i record vengano mai completamente elaborati.

Poiché l'impostazione enable.auto.commit=True esegue il commit degli offset non appena vengono letti da Kafka senza alcuna elaborazione da parte di Dataflow, l'utilizzo di questa opzione non è consigliato. Ti consigliamo di impostare sia enable.auto.commit=False che commitOffsetsInFinalize=True. Se imposti enable.auto.commit su True, i dati possono andare persi se la pipeline viene interrotta durante l'elaborazione. I record già committati su Kafka potrebbero essere eliminati.

Filigrane

Per impostazione predefinita, il connettore KafkaIO utilizza il tempo di elaborazione corrente per assegnare la marcatura temporale in uscita e la data e l'ora dell'evento. Per modificare questo comportamento, chiama withTimestampPolicyFactory e assegna un TimestampPolicy. Beam fornisce implementazioni di TimestampPolicy che calcolano la marcatura temporale in base all'ora di accodamento del log di Kafka o all'ora di creazione del messaggio.

Considerazioni sul runner

Il connettore KafkaIO ha due implementazioni sottostanti per le letture di Kafka, la precedente ReadFromKafkaViaUnbounded e la più recente ReadFromKafkaViaSDF. Dataflow sceglie automaticamente l'implementazione migliore per il tuo job in base al linguaggio SDK e ai requisiti del job. Evita di richiedere esplicitamente un'implementazione di Kafka o di un runner, a meno che tu non abbia bisogno di funzionalità specifiche disponibili solo in quella implementazione. Per ulteriori informazioni sulla scelta di un runner, consulta Utilizzare Dataflow Runner v2.

Se la pipeline utilizza withTopic o withTopics, l'implementazione precedente esegue query su Kafka al momento della compilazione della pipeline per le partizioni disponibili. La macchina che crea la pipeline deve disporre dell'autorizzazione per connettersi a Kafka. Se ricevi un errore di autorizzazione, verifica di disporre delle autorizzazioni per connetterti a Kafka localmente. Puoi evitare questo problema utilizzando withTopicPartitions, che non si connette a Kafka al momento della creazione della pipeline.

Distribuzione in produzione

Quando esegui il deployment della soluzione in produzione, ti consigliamo di utilizzare modelli flessibili. Se utilizzi un modello flessibile, la pipeline viene lanciata da un ambiente coerente, il che può contribuire ad attenuare i problemi di configurazione locale.

I log di KafkaIO possono essere piuttosto dettagliati. Valuta la possibilità di ridurre il livello di registrazione in produzione come segue:

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

Per ulteriori informazioni, consulta Impostare i livelli di log dei worker della pipeline.

Configurazione del networking

Per impostazione predefinita, Dataflow avvia le istanze all'interno della rete Virtual Private Cloud (VPC) predefinita. A seconda della configurazione di Kafka, potrebbe essere necessario configurare una rete e una sottorete diverse per Dataflow. Per saperne di più, consulta la sezione Specificare una rete e una sottorete. Quando configuri la rete, crea regole firewall che consentano alle macchine worker di Dataflow di raggiungere i broker Kafka.

Se utilizzi Controlli di servizio VPC, posiziona il cluster Kafka all'interno del perimetro di Controlli di servizio VPC oppure estendi i perimetri alla VPN o a Cloud Interconnect autorizzati.

Se il cluster Kafka è dipiegato al di fuori di Google Cloud, devi creare una connessione di rete tra Dataflow e il cluster Kafka. Esistono diverse opzioni di rete con diversi compromessi:

L'interconnessione dedicata è l'opzione migliore per prestazioni e affidabilità prevedibili, ma la configurazione può richiedere più tempo perché terze parti devono eseguire il provisioning dei nuovi circuiti. Con una topologia basata su IP pubblico, puoi iniziare rapidamente perché non è richiesta molta configurazione di rete.

Le due sezioni seguenti descrivono queste opzioni in maggiore dettaglio.

Spazio di indirizzi RFC 1918 condiviso

Sia l'interconnessione dedicata che la VPN IPsec ti consentono di accedere direttamente agli indirizzi IP RFC 1918 nel tuo Virtual Private Cloud (VPC), il che può semplificare la configurazione di Kafka. Se utilizzi una topologia basata su VPN, ti consigliamo di configurare una VPN a velocità effettiva elevata.

Per impostazione predefinita, Dataflow avvia le istanze sulla rete VPC predefinita. In una topologia di rete privata con route definiti esplicitamente in Cloud Router che connettono le sottoreti in Google Cloud al cluster Kafka, hai bisogno di più controllo su dove posizionare le istanze Dataflow. Puoi utilizzare Dataflow per configurare i parametri di esecuzione network e subnetwork.

Assicurati che la sottorete corrispondente abbia un numero sufficiente di indirizzi IP disponibili su cui Dataflow possa avviare istanze quando tenta di eseguire il ridimensionamento orizzontale. Inoltre, quando crei una rete separata per l'avvio delle istanze Dataflow, assicurati di avere una regola firewall che consenta il traffico TCP tra tutte le macchine virtuali del progetto. La rete predefinita ha già configurato questa regola firewall.

Spazio degli indirizzi IP pubblici

Questa architettura utilizza Transport Layer Security (TLS) per proteggere il traffico tra i client esterni e Kafka e utilizza il traffico non criptato per la comunicazione tra broker. Quando l'ascoltatore Kafka si associa a un'interfaccia di rete utilizzata sia per la comunicazione interna che esterna, la configurazione dell'ascoltatore è semplice. Tuttavia, in molti scenari, gli indirizzi pubblicizzati esternamente degli broker Kafka nel cluster sono diversi dalle interfacce di rete interne utilizzate da Kafka. In questi casi, puoi utilizzare la proprietà advertised.listeners:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

I client esterni si connettono tramite la porta 9093 tramite un canale "SSL", mentre i client interni si connettono tramite la porta 9092 tramite un canale in testo non cifrato. Quando specifichi un indirizzo in advertised.listeners, utilizza nomi DNS (kafkabroker-n.mydomain.com in questo esempio) che risolvono nella stessa istanza sia per il traffico esterno che per quello interno. L'utilizzo di indirizzi IP pubblici potrebbe non funzionare perché potrebbero non essere risolti per il traffico interno.

Ottimizza Kafka

Le impostazioni del cluster Kafka e del client Kafka possono avere un impatto significativo sul funzionamento. In particolare, le seguenti impostazioni potrebbero essere troppo basse. Questa sezione fornisce alcuni punti di partenza suggeriti, ma dovresti fare esperimenti con questi valori per il tuo carico di lavoro specifico.

  • unboundedReaderMaxElements. Il valore predefinito è 10.000. Un valore più elevato, ad esempio 100.000, può aumentare le dimensioni dei bundle, il che può migliorare notevolmente le prestazioni se la pipeline include aggregazioni. Tuttavia, un valore più elevato potrebbe anche aumentare la latenza. Per impostare il valore, utilizza setUnboundedReaderMaxElements. Questa impostazione non si applica a Runner v2.

  • unboundedReaderMaxReadTimeMs. Il valore predefinito è 10.000 ms. Un valore più elevato, come 20.000 ms, può aumentare le dimensioni del bundle, mentre un valore più basso, come 5000 ms, può ridurre la latenza o l'accumulo. Per impostare il valore, utilizza setUnboundedReaderMaxReadTimeMs. Questa impostazione non si applica a Runner v2.

  • max.poll.records. Il valore predefinito è 500. Un valore più elevato potrebbe avere un rendimento migliore recuperando più record in entrata contemporaneamente, in particolare se utilizzi Runner 2. Per impostare il valore, chiama withConsumerConfigUpdates.

  • fetch.max.bytes. Il valore predefinito è 1 MB. Un valore più elevato potrebbe migliorare il throughput riducendo il numero di richieste, in particolare quando si utilizza Runner 2. Tuttavia, impostarlo su un valore troppo elevato potrebbe aumentare la latenza, anche se è più probabile che il collo di bottiglia principale sia rappresentato dall'elaborazione a valle. Un valore iniziale consigliato è 100 MB. Per impostare il valore, chiama withConsumerConfigUpdates.

  • max.partition.fetch.bytes. Il valore predefinito è 1 MB. Questo parametro imposta la quantità massima di dati per partizione restituita dal server. L'aumento del valore può migliorare il throughput riducendo il numero di richieste, in particolare quando si utilizza Runner 2. Tuttavia, un valore troppo elevato potrebbe aumentare la latenza, anche se è più probabile che il collo di bottiglia principale sia l'elaborazione a valle. Un valore iniziale consigliato è 100 MB. Per impostare il valore, chiama withConsumerConfigUpdates.

  • consumerPollingTimeout. Il valore predefinito è 2 secondi. Se il client consumer scade prima di poter leggere i record, prova a impostare un valore più alto. Questa impostazione è spesso pertinente quando vengono eseguite letture tra regioni o letture con una rete lenta. Per impostare il valore, chiama withConsumerPollingTimeout.

Assicurati che receive.buffer.bytes sia abbastanza grande da gestire le dimensioni dei messaggi. Se il valore è troppo piccolo, i log potrebbero indicare che i consumatori vengono continuamente ricreati e cercano un offset specifico.

Esempi

Gli esempi di codice riportati di seguito mostrano come creare pipeline Dataflow che leggono da Kafka. Quando utilizzi le credenziali predefinite dell'applicazione in combinazione con il gestore dei callback fornito da Google Cloud Managed Service per Apache Kafka, è necessaria la versione 3.7.0 o successive di kafka-clients.

Leggere da un singolo argomento

Questo esempio utilizza il connettore I/O gestito. Mostra come leggere da un argomento Kafka e scrivere i payload dei messaggi in file di testo.

Java

Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import com.google.common.collect.ImmutableMap;
import java.io.UnsupportedEncodingException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KafkaRead {

  public static Pipeline createPipeline(Options options) {

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("bootstrap_servers", options.getBootstrapServer())
        .put("topic", options.getTopic())
        .put("data_format", "RAW")
        .put("max_read_time_seconds", 15)
        .put("auto_offset_reset_config", "earliest")
        .build();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    pipeline
        // Read messages from Kafka.
        .apply(Managed.read(Managed.KAFKA).withConfig(config)).getSinglePCollection()
        // Get the payload of each message and convert to a string.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              var bytes = row.getBytes("payload");
              try {
                return new String(bytes, "UTF-8");
              } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
              }
            })))
        // Write the payload to a text file.
        .apply(TextIO
            .write()
            .to(options.getOutputPath())
            .withSuffix(".txt")
            .withNumShards(1));
    return pipeline;
  }
}

Python

Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import argparse

import apache_beam as beam

from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
    #     --output=$CLOUD_STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic")
            parser.add_argument("--bootstrap_server")
            parser.add_argument("--output")

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Read messages from an Apache Kafka topic.
            | ReadFromKafka(
                consumer_config={"bootstrap.servers": options.bootstrap_server},
                topics=[options.topic],
                with_metadata=False,
                max_num_records=5,
                start_read_time=0,
            )
            # The previous step creates a key-value collection, keyed by message ID.
            # The values are the message payloads.
            | beam.Values()
            # Subdivide the output into fixed 5-second windows.
            | beam.WindowInto(window.FixedWindows(5))
            | WriteToText(
                file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
            )
        )

Leggere da più argomenti

Questo esempio utilizza il connettore KafkaIO. Mostra come leggere da più argomenti Kafka e applicare una logica di pipeline separata per ogni argomento.

Per casi d'uso più avanzati, passa dinamicamente un insieme di oggetti KafkaSourceDescriptor, in modo da poter aggiornare l'elenco di argomenti da leggere. Questo approccio richiede Java con Runner v2.

Java

Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class KafkaReadTopics {

  public static Pipeline createPipeline(Options options) {
    String topic1 = options.getTopic1();
    String topic2 = options.getTopic2();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    var allTopics = pipeline
        .apply(KafkaIO.<Long, String>read()
            .withTopics(List.of(topic1, topic2))
            .withBootstrapServers(options.getBootstrapServer())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withMaxReadTime(Duration.standardSeconds(10))
            .withStartReadTime(Instant.EPOCH)
        );

    // Create separate pipeline branches for each topic.
    // The first branch filters on topic1.
    allTopics
        .apply(Filter.by(record -> record.getTopic().equals(topic1)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(record -> record.getKV().getValue()))
        .apply(TextIO.write()
            .to(topic1)
            .withSuffix(".txt")
            .withNumShards(1)
        );

    // The second branch filters on topic2.
    allTopics
        .apply(Filter.by(record -> record.getTopic().equals(topic2)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(record -> record.getKV().getValue()))
        .apply(TextIO.write()
            .to(topic2)
            .withSuffix(".txt")
            .withNumShards(1)
        );
    return pipeline;
  }
}

Python

Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import argparse

import apache_beam as beam

from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #   --bootstrap_server=$BOOTSTRAP_SERVER --output=$STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument('--bootstrap_server')
            parser.add_argument('--output')

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        # Read from two Kafka topics.
        all_topics = pipeline | ReadFromKafka(consumer_config={
                "bootstrap.servers": options.bootstrap_server
            },
            topics=["topic1", "topic2"],
            with_metadata=True,
            max_num_records=10,
            start_read_time=0
        )

        # Filter messages from one topic into one branch of the pipeline.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic1')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic1" >> WriteToText(
                file_path_prefix=options.output + '/topic1/output',
                file_name_suffix='.txt',
                num_shards=1))

        # Filter messages from the other topic.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic2')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic2" >> WriteToText(
                file_path_prefix=options.output + '/topic2/output',
                file_name_suffix='.txt',
                num_shards=1))

Passaggi successivi