Leggi da Apache Kafka a Dataflow

Questo documento descrive come leggere i dati da Apache Kafka in Dataflow.

Il connettore I/O Apache Beam Kafka (KafkaIO) è disponibile in modo nativo per Java, e anche per Python e Go utilizzando il framework di pipeline multilingue di Apache Beam.

Per le pipeline Java, ti consigliamo di utilizzare il connettore I/O gestito per leggere da Kafka.

Parallelismo

Il parallelismo è vincolato da due fattori: numero massimo di worker (max_num_workers) e il numero di partizioni Kafka. Dataflow: il valore predefinito è un fanout di parallelismo di 4 x max_num_workers. Tuttavia, il fanout è limitato dal numero di partizioni. Ad esempio, se sono disponibili 100 vCPU, ma la pipeline legge solo da 10 partizioni Kafka, il parallelismo massimo è 10:

Per massimizzare il parallelismo, si consiglia di avere almeno 4 x max_num_workers partizioni Kafka. Se il tuo job utilizza Runner v2, valuta la possibilità di impostare un parallelismo ancora più alto. Un buon punto di partenza è avere partizioni uguali al doppio del numero di worker di memoria per vCPU.

Se non è possibile aumentare il numero di partizioni, valuta la possibilità di inserire un Reshuffle o Redistribute dopo il passaggio di lettura di Kafka. Questo passaggio consente Dataflow per ridistribuire e parallelizzare ulteriormente i dati efficiente, ma comporta un ulteriore overhead per eseguire il passaggio di shuffling. Per maggiori informazioni, consulta Fattori che influiscono sul parallelismo.

Cerca di assicurarti che il carico tra le partizioni sia relativamente uniforme e non disallineate. Se il carico è sbilanciato, può comportare un cattivo utilizzo dei worker. I worker che leggono da partizioni con un carico più leggero potrebbero essere relativamente inattivi, mentre quelli che leggono da partizioni con un carico elevato potrebbero rimanere indietro. Dataflow fornisce metriche per il backlog per partizione.

Se il carico è sbilanciato, il bilanciamento dinamico del lavoro può contribuire a distribuirlo. Ad esempio, Dataflow potrebbe allocare worker per leggere da più partizioni di volume ridotto e allocare un altro worker per leggere da una singola partizione con volumi elevati. Tuttavia, due worker non possono leggere dalla stessa partizione, quindi una partizione molto caricata può comunque causare la pipeline per rimanere indietro.

Best practice

Questa sezione contiene suggerimenti per la lettura da Kafka a e Dataflow.

Argomenti con volume ridotto

Uno scenario comune è leggere contemporaneamente da molti argomenti a basso volume, ad esempio un argomento per cliente. Creazione di report I job Dataflow per ogni argomento sono inefficienti in termini di costi, richiede almeno un worker completo. Valuta invece le seguenti opzioni:

  • Unisci gli argomenti. Combina gli argomenti prima che vengano importati in Dataflow. L'importazione di pochi argomenti ad alto volume è molto più efficiente rispetto all'importazione di molti argomenti a basso volume. Ogni argomento ad alto volume può essere gestito da un singolo job Dataflow che utilizza completamente i suoi worker.

  • Leggi più argomenti. Se non puoi combinare gli argomenti prima di importarli in Dataflow, valuta la possibilità di creare una pipeline che legga più argomenti. Questo approccio consente a Dataflow di assegnare diversi argomenti allo stesso worker. Esistono due modi per implementare questo approccio:

    • Singolo passaggio di lettura. Crea una singola istanza del connettore KafkaIO e configurarlo per leggere più argomenti. Quindi filtra per nome dell'argomento per e applicare logiche diverse per argomento. Per un codice di esempio, consulta Leggere da più argomenti. Valuta questa opzione se tutti i tuoi argomenti sono collocati nello stesso cluster. Uno svantaggio è che i problemi con un singolo sink o una singola trasformazione potrebbero causare l'accumulo di backlog per tutti gli argomenti.

      Per casi d'uso più avanzati, trasmetti un insieme di KafkaSourceDescriptor che specificano gli argomenti dai quali eseguire la lettura. Utilizzo KafkaSourceDescriptor ti consente di aggiornare l'elenco di argomenti in un secondo momento, se necessario. Questa funzionalità richiede Java con Runner v2.

    • Più passaggi di lettura. Per leggere da argomenti situati in cluster diversi, la pipeline può includere diverse istanze KafkaIO. Durante l'esecuzione del job, puoi aggiornare le singole origini utilizzando le mappature delle trasformazioni. L'impostazione di un nuovo argomento o cluster è supportata solo se utilizzi Runner 2. L'osservabilità è una potenziale sfida con questo approccio, perché monitorare ogni singola trasformazione di lettura invece di fare affidamento a livello di pipeline.

Conferma in Kafka in corso...

Per impostazione predefinita, il connettore KafkaIO non utilizza gli offset Kafka per monitorare l'avanzamento e non esegue il commit in Kafka. Se chiami commitOffsetsInFinalize, il connettore fa del suo meglio per eseguire il commit in Kafka dopo che i record sono stati sottoposti a commit in Dataflow. I record impegnati in Dataflow potrebbero non l'elaborazione completa, quindi se annullare la pipeline, un offset potrebbe essere eseguito senza che i record vengano elaborati completamente.

Perché l'impostazione di enable.auto.commit=True comporta l'esecuzione del commit non appena vengono letti Kafka senza alcuna elaborazione da parte di Dataflow. L'uso di questa opzione non è consigliato. È consigliabile impostare sia enable.auto.commit=False sia commitOffsetsInFinalize=True Se imposti Da enable.auto.commit a True, i dati possono andare persi se la pipeline viene interrotta durante l'elaborazione. I record già sottoposti a commit su Kafka potrebbero essere eliminati.

Filigrane

Per impostazione predefinita, il connettore KafkaIO utilizza il tempo di elaborazione corrente per assegnare la marcatura temporale in uscita e la data e l'ora dell'evento. Per modificare questo comportamento, chiama withTimestampPolicyFactory e assegna un TimestampPolicy. Beam fornisce implementazioni di TimestampPolicy che calcolano la filigrana in base a l'ora di aggiunta dei log di Kafka o l'ora di creazione del messaggio.

Considerazioni relative al runner

Il connettore KafkaIO ha due implementazioni sottostanti per le letture Kafka, la versione precedente di ReadFromKafkaViaUnbounded e quella più recente ReadFromKafkaViaSDF. Dataflow: sceglie automaticamente la migliore implementazione per il tuo job in base al tuo SDK requisiti linguistici e lavorativi. Evita di richiedere esplicitamente un'implementazione di Kafka o di un runner, a meno che tu non abbia bisogno di funzionalità specifiche disponibili solo in quell'implementazione. Per ulteriori informazioni sulla scelta di un runner, vedi Usa Dataflow Runner v2.

Se la tua pipeline utilizza withTopic o withTopics, la precedente implementazione esegue query su Kafka al momento della creazione della pipeline per partizioni disponibili. La macchina che crea la pipeline deve disporre dell'autorizzazione per connettersi a Kafka. Se ricevi un errore di autorizzazione, verifica di disporre delle autorizzazioni per connetterti a Kafka localmente. Puoi evitare questo problema utilizzando withTopicPartitions, che non si connette a Kafka al momento della costruzione della pipeline.

Distribuzione in produzione

Quando esegui il deployment della soluzione in produzione, ti consigliamo di utilizzare Modelli flessibili. Utilizzando un modello flessibile, la pipeline viene avviata da un ambiente coerente, può aiutare a mitigare i problemi di configurazione locale.

I log di KafkaIO possono essere piuttosto dettagliati. Valuta la possibilità di ridurre il logging di produzione in questo modo:

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

Per ulteriori informazioni, vedi Imposta i livelli di log del worker della pipeline.

Configurazione del networking

Per impostazione predefinita, Dataflow avvia le istanze all'interno una rete VPC (Virtual Private Cloud). A seconda della configurazione di Kafka, potrebbe essere necessario configurare una rete e una sottorete diverse per Dataflow. Per ulteriori informazioni, vedi Specifica una rete e una subnet. Quando configuri la rete, crea regole firewall che consentano alle macchine worker di Dataflow di raggiungere i broker Kafka.

Se utilizzi Controlli di servizio VPC, posiziona il cluster Kafka all'interno il perimetro dei Controlli di servizio VPC oppure estendere i perimetri alla VPN autorizzata o a Cloud Interconnect.

Se il deployment del cluster Kafka viene eseguito all'esterno di Google Cloud, devi creare una connessione di rete tra Dataflow e il cluster Kafka. Esistono diverse opzioni di networking con diversi compromessi:

Dedicated Interconnect è l'opzione migliore per prestazioni e affidabilità, ma la configurazione può richiedere più tempo le parti interessate devono fornire i nuovi circuiti. Con una topologia basata su IP pubblici, possono iniziare rapidamente perché il lavoro di networking deve essere ridotto.

Le due sezioni successive descrivono queste opzioni in modo più dettagliato.

Spazio di indirizzi RFC 1918 condiviso

Sia Dedicated Interconnect che IPsec VPN ti offrono l'accesso diretto a indirizzi IP RFC 1918 nel tuo Virtual Private Cloud (VPC), della tua configurazione Kafka. Se utilizzi una topologia basata su VPN, valuta la possibilità di impostare una VPN a velocità effettiva elevata.

Per impostazione predefinita, Dataflow avvia le istanze sulla rete VPC predefinita. In una topologia di rete privata con route definite esplicitamente nel router Cloud che collegano le subnet in Google Cloud a quel cluster Kafka, con maggiore controllo sulla posizione delle istanze Dataflow. Puoi utilizzare Dataflow per configurare i parametri di esecuzione network e subnetwork.

Assicurati che la subnet corrispondente abbia un numero sufficiente di indirizzi IP disponibili per consentire a Dataflow di avviare le istanze quando tenta di fare lo scale out. Inoltre, anche quando crei una rete separata per avviare Istanze Dataflow, assicurati di avere una regola firewall che abilita il traffico TCP tra tutte le macchine virtuali nel progetto. La rete predefinita ha già questa regola firewall configurata.

Spazio degli indirizzi IP pubblici

Questa architettura utilizza Transport Layer Security (TLS) per proteggere il traffico. tra client esterni e Kafka e utilizza il traffico non criptato per i broker la comunicazione. Quando l'ascoltatore Kafka si associa a un'interfaccia di rete utilizzata sia per la comunicazione interna che esterna, la configurazione dell'ascoltatore è semplice. Tuttavia, in molti scenari, gli indirizzi annunciati esternamente dei broker Kafka nel cluster sono diversi dalle interfacce di rete interne usate da Kafka. In questi casi, puoi utilizzare la proprietà advertised.listeners:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

I client esterni si connettono utilizzando la porta 9093 tramite un canale "SSL", mentre i client interni si connettono utilizzando la porta 9092 tramite un canale non criptato. Quando specifichi un indirizzo in advertised.listeners, utilizza nomi DNS (kafkabroker-n.mydomain.com in questo esempio) che risolvono nella stessa istanza sia per il traffico esterno che per quello interno. L'utilizzo di indirizzi IP pubblici potrebbe non funzionare perché potrebbero non essere risolti per il traffico interno.

Ottimizza Kafka

Le impostazioni del cluster Kafka e del client Kafka possono influire notevolmente delle prestazioni. In particolare, le seguenti impostazioni potrebbero essere troppo basse. Questa sezione fornisce alcuni punti di partenza suggeriti, ma dovresti fare esperimenti con questi valori per il tuo carico di lavoro specifico.

  • unboundedReaderMaxElements. Il valore predefinito è 10.000. Un valore più alto, ad esempio 100.000 possono aumentare la dimensione set, che possono migliorare significativamente le prestazioni se la pipeline include aggregazioni. Tuttavia, un valore più elevato potrebbe anche aumentare la latenza. Per impostare il valore, utilizza setUnboundedReaderMaxElements. Questa impostazione non si applica a Runner v2.

  • unboundedReaderMaxReadTimeMs. Il valore predefinito è 10.000 ms. Un valore più elevato, come 20.000 ms, può aumentare le dimensioni del bundle, mentre un valore più basso, come 5000 ms, può ridurre la latenza o l'accumulo. Per impostare il valore, utilizza setUnboundedReaderMaxReadTimeMs. Questa impostazione non si applica a Runner v2.

  • max.poll.records. Il valore predefinito è 500. Un valore più alto potrebbe avere un rendimento migliore recuperando più record in entrata contemporaneamente, in particolare se utilizzi Runner 2. Per impostare il valore, chiama withConsumerConfigUpdates.

  • fetch.max.bytes. Il valore predefinito è 1 MB. Un valore più alto potrebbe migliorare la velocità effettiva di riducendo il numero di richieste, soprattutto quando si utilizza Runner v2. Tuttavia, impostarlo su un valore troppo elevato potrebbe aumentare la latenza, anche se è più probabile che il collo di bottiglia principale sia rappresentato dall'elaborazione a valle. Un valore iniziale consigliato è 100 MB. Per impostare il valore, chiama withConsumerConfigUpdates.

  • max.partition.fetch.bytes. Il valore predefinito è 1 MB. Questo parametro imposta il valore massimo quantità di dati per partizione restituita dal server. L'aumento del valore può migliorare il throughput riducendo il numero di richieste, in particolare quando si utilizza Runner 2. Tuttavia, un valore troppo elevato potrebbe aumentare la latenza, anche se è più probabile che il collo di bottiglia principale sia l'elaborazione a valle. Un file consigliato il valore iniziale è 100 MB. Per impostare il valore, chiama withConsumerConfigUpdates.

  • consumerPollingTimeout. Il valore predefinito è 2 secondi. Se il client consumer scade prima di poter leggere i record, prova a impostare un valore più alto. Questa impostazione è più spesso pertinente quando si eseguono letture tra regioni o con un in ogni rete. Per impostare il valore, chiama withConsumerPollingTimeout.

Assicurati che receive.buffer.bytes sia abbastanza grande da gestire le dimensioni dei messaggi. Se il valore è troppo piccolo, i log potrebbero indicare che i consumatori vengono continuamente ricreati e cercano un offset specifico.

Esempi

Gli esempi di codice riportati di seguito mostrano come creare pipeline Dataflow che leggono da Kafka. Quando utilizzi Credenziali predefinite dell'applicazione insieme alla Google Cloud Managed Service per Apache Kafka ha fornito il gestore di callback, è richiesto kafka-clients versione 3.7.0 o successiva.

Leggi di un singolo argomento

Questo esempio legge da un argomento Kafka e scrive i payload dei messaggi in file di testo.

Java

Per eseguire l'autenticazione in Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import com.google.common.collect.ImmutableMap;
import java.io.UnsupportedEncodingException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KafkaRead {

  public static Pipeline createPipeline(Options options) {

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("bootstrap_servers", options.getBootstrapServer())
        .put("topic", options.getTopic())
        .put("data_format", "RAW")
        .put("max_read_time_seconds", 15)
        .put("auto_offset_reset_config", "earliest")
        .build();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    pipeline
        // Read messages from Kafka.
        .apply(Managed.read(Managed.KAFKA).withConfig(config)).getSinglePCollection()
        // Get the payload of each message and convert to a string.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              var bytes = row.getBytes("payload");
              try {
                return new String(bytes, "UTF-8");
              } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
              }
            })))
        // Write the payload to a text file.
        .apply(TextIO
            .write()
            .to(options.getOutputPath())
            .withSuffix(".txt")
            .withNumShards(1));
    return pipeline;
  }
}

Python

Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import argparse

import apache_beam as beam

from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
    #     --output=$CLOUD_STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic")
            parser.add_argument("--bootstrap_server")
            parser.add_argument("--output")

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Read messages from an Apache Kafka topic.
            | ReadFromKafka(
                consumer_config={"bootstrap.servers": options.bootstrap_server},
                topics=[options.topic],
                with_metadata=False,
                max_num_records=5,
                start_read_time=0,
            )
            # The previous step creates a key-value collection, keyed by message ID.
            # The values are the message payloads.
            | beam.Values()
            # Subdivide the output into fixed 5-second windows.
            | beam.WindowInto(window.FixedWindows(5))
            | WriteToText(
                file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
            )
        )

Leggi da più argomenti

Questo esempio legge da diversi argomenti Kafka e applica una logica di pipeline distinta per ogni argomento.

Per casi d'uso più avanzati, passa dinamicamente un insieme KafkaSourceDescriptor di oggetti per consentirti di aggiornarli l'elenco degli argomenti da cui leggere. Questo approccio richiede Java con Runner v2.

Python

Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import argparse

import apache_beam as beam

from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #   --bootstrap_server=$BOOTSTRAP_SERVER --output=$STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument('--bootstrap_server')
            parser.add_argument('--output')

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        # Read from two Kafka topics.
        all_topics = pipeline | ReadFromKafka(consumer_config={
                "bootstrap.servers": options.bootstrap_server
            },
            topics=["topic1", "topic2"],
            with_metadata=True,
            max_num_records=10,
            start_read_time=0
        )

        # Filter messages from one topic into one branch of the pipeline.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic1')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic1" >> WriteToText(
                file_path_prefix=options.output + '/topic1/output',
                file_name_suffix='.txt',
                num_shards=1))

        # Filter messages from the other topic.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic2')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic2" >> WriteToText(
                file_path_prefix=options.output + '/topic2/output',
                file_name_suffix='.txt',
                num_shards=1))