Questo documento descrive come leggere i dati da Apache Kafka a Dataflow e include suggerimenti e best practice per il rendimento.
Per la maggior parte dei casi d'uso, valuta la possibilità di utilizzare il connettore I/O gestito per leggere da Kafka.
Se hai bisogno di una ottimizzazione delle prestazioni più avanzata, valuta la possibilità di utilizzare il connettore
KafkaIO
. Il connettore KafkaIO
è disponibile per
Java
o utilizzando il
framework per pipeline multilingue
per Python
e Go.
Parallelismo
Le sezioni seguenti descrivono come configurare il parallelismo durante la lettura da Kafka.
Panoramica
Il parallelismo è limitato da due fattori: il numero massimo di worker (max_num_workers
) e il numero di partizioni Kafka. Dataflow
utilizza per impostazione predefinita un fanout di parallelismo di 4 x max_num_workers
. Tuttavia, il fanout è
limitato dal numero di partizioni. Ad esempio, se sono disponibili 100 vCPU, ma la pipeline legge solo da 10 partizioni Kafka, il parallelismo massimo è 10.
Per massimizzare il parallelismo, ti consigliamo di avere almeno 4 x
max_num_workers
partizioni Kafka. Se il job utilizza
Runner v2, valuta la possibilità di impostare un parallelismo ancora più elevato.
Un buon punto di partenza è avere partizioni pari al doppio del numero di vCPU worker.
Ridistribuisci
Se non riesci ad aumentare il numero di partizioni, puoi aumentare il parallelismo chiamando KafkaIO.Read.withRedistribute
. Questo metodo aggiunge
una trasformazione Redistribute
alla pipeline, che fornisce un suggerimento a
Dataflow per ridistribuire e parallelizzare i dati in modo più
efficiente. Ti consigliamo vivamente di specificare il numero ottimale di shard chiamando KafkaIO.Read.withRedistributeNumKeys
. L'utilizzo
di KafkaIO.Read.withRedistribute
da solo può generare numerose
chiavi, causando problemi di prestazioni. Per saperne di più, consulta Identificare le fasi con parallelismo elevato.
La ridistribuzione dei dati aggiunge un sovraccarico aggiuntivo per eseguire il passaggio di rimescolamento. Per ulteriori informazioni, vedi
Evitare la fusione.
Per ridurre al minimo il costo del rimescolamento della ridistribuzione, chiama
KafkaIO.Read.withOffsetDeduplication
. Questa modalità
riduce al minimo la quantità di dati che devono essere resi persistenti nell'ambito dello shuffle,
fornendo comunque l'elaborazione esattamente una volta.
Se l'elaborazione "exactly-once" non è necessaria, puoi consentire i duplicati chiamando
KafkaIO.Read.withAllowDuplicates
.
La tabella seguente riassume le opzioni di ridistribuzione:
Opzione | Modalità di elaborazione | Apache Beam | Configurazione |
---|---|---|---|
Ridistribuisci input | Exactly-once | v2.60+ | KafkaIO.Read.withRedistribute() |
Consenti duplicati | Almeno una volta | v2.60+ | KafkaIO.Read.withRedistribute().withAllowDuplicates() |
Deduplicazione offset | Exactly-once | v2.65+ | KafkaIO.Read.withRedistribute().withOffsetDeduplication() |
Distorsione del carico
Cerca di assicurarti che il carico tra le partizioni sia relativamente uniforme e non distorto. Se il carico è sbilanciato, può comportare un utilizzo inefficiente dei lavoratori. I worker che leggono dalle partizioni con un carico più leggero potrebbero essere relativamente inattivi, mentre i worker che leggono dalle partizioni con un carico elevato potrebbero rimanere indietro. Dataflow fornisce metriche per il backlog per partizione.
Se il carico è sbilanciato, il bilanciamento dinamico del lavoro può contribuire a distribuire il lavoro. Ad esempio, Dataflow potrebbe allocare un worker per leggere da più partizioni a basso volume e un altro worker per leggere da una singola partizione ad alto volume. Tuttavia, due worker non possono leggere dalla stessa partizione, quindi una partizione molto caricata può comunque causare il ritardo della pipeline.
Best practice
Questa sezione contiene consigli per la lettura da Kafka in Dataflow.
Argomenti con volume basso
Uno scenario comune è leggere da molti argomenti a basso volume contemporaneamente, ad esempio un argomento per cliente. La creazione di job Dataflow separati per ogni argomento è inefficiente in termini di costi, perché ogni job richiede almeno un worker completo. Valuta invece le seguenti opzioni:
Unire gli argomenti. Combina gli argomenti prima che vengano inseriti in Dataflow. L'importazione di pochi argomenti con un volume elevato è molto più efficiente rispetto all'importazione di molti argomenti con un volume basso. Ogni argomento ad alto volume può essere gestito da un singolo job Dataflow che utilizza completamente i suoi worker.
Leggere più argomenti. Se non riesci a combinare gli argomenti prima di importarli in Dataflow, valuta la possibilità di creare una pipeline che legga da più argomenti. Questo approccio consente a Dataflow di assegnare più argomenti allo stesso worker. Esistono due modi per implementare questo approccio:
Singolo passaggio di lettura. Crea una singola istanza del connettore
KafkaIO
e configurala per leggere più argomenti. Quindi, filtra per nome dell'argomento per applicare una logica diversa per ogni argomento. Per un esempio di codice, vedi Lettura da più argomenti. Prendi in considerazione questa opzione se tutti gli argomenti si trovano nello stesso cluster. Uno svantaggio è che i problemi con un singolo sink o una singola trasformazione potrebbero causare l'accumulo di backlog in tutti gli argomenti.Per casi d'uso più avanzati, passa un insieme di oggetti
KafkaSourceDescriptor
che specificano gli argomenti da leggere. L'utilizzo diKafkaSourceDescriptor
ti consente di aggiornare l'elenco degli argomenti in un secondo momento, se necessario. Questa funzionalità richiede Java con Runner v2.Più passaggi di lettura. Per leggere argomenti che si trovano in cluster diversi, la pipeline può includere diverse istanze di
KafkaIO
. Mentre il job è in esecuzione, puoi aggiornare le singole origini utilizzando le mappature di trasformazione. L'impostazione di un nuovo argomento o cluster è supportata solo quando utilizzi Runner v2. L'osservabilità è una potenziale sfida con questo approccio, perché devi monitorare ogni singola trasformazione di lettura anziché fare affidamento sulle metriche a livello di pipeline.
Eseguire il commit di nuovo in Kafka
Per impostazione predefinita, il connettore KafkaIO
non utilizza gli offset Kafka per monitorare l'avanzamento
e non esegue il commit di nuovo in Kafka. Se chiami
commitOffsetsInFinalize
, il connettore fa del suo meglio per eseguire il commit di nuovo in Kafka dopo che i record sono stati sottoposti a commit in
Dataflow. I record di cui è stato eseguito il commit in Dataflow potrebbero non
essere elaborati completamente, quindi se annulli la pipeline, potrebbe essere eseguito il commit di un offset senza che i record vengano mai elaborati completamente.
Poiché l'impostazione enable.auto.commit=True
esegue il commit degli offset non appena vengono letti da
Kafka senza alcuna elaborazione da parte di Dataflow, l'utilizzo di questa opzione non è consigliato.
Il consiglio è di impostare sia enable.auto.commit=False
che
commitOffsetsInFinalize=True
. Se imposti
enable.auto.commit
su True
, i dati possono essere persi se la pipeline viene interrotta
durante l'elaborazione. I record già inviati su Kafka potrebbero essere eliminati.
Filigrane
Per impostazione predefinita, il connettore KafkaIO
utilizza l'ora di elaborazione corrente per assegnare
la filigrana di output
e l'ora dell'evento. Per modificare questo comportamento, chiama
withTimestampPolicyFactory
e assegna un
TimestampPolicy
. Beam fornisce
implementazioni di TimestampPolicy
che calcolano il watermark in base
all'ora di accodamento del log di Kafka o all'ora di creazione del messaggio.
Considerazioni per i runner
Il connettore KafkaIO
ha due implementazioni sottostanti per le letture Kafka: la
ReadFromKafkaViaUnbounded
precedente e la
ReadFromKafkaViaSDF
più recente. Dataflow sceglie automaticamente l'implementazione migliore per il tuo job in base al linguaggio SDK e ai requisiti del job. Evita di richiedere esplicitamente un runner o un'implementazione di Kafka, a meno che tu non abbia bisogno di funzionalità specifiche disponibili solo in quell'implementazione. Per ulteriori informazioni sulla scelta di un runner, consulta
Utilizzare Dataflow Runner v2.
Se la tua pipeline utilizza withTopic
o withTopics
,
l'implementazione precedente esegue query su Kafka al momento della creazione della pipeline per le
partizioni disponibili. La macchina che crea la pipeline deve disporre dell'autorizzazione
per connettersi a Kafka. Se ricevi un errore di autorizzazione, verifica di disporre delle autorizzazioni per connetterti a Kafka localmente. Puoi evitare questo problema utilizzando
withTopicPartitions
, che non si connette a Kafka
durante la creazione della pipeline.
Distribuzione in produzione
Quando esegui il deployment della soluzione in produzione, ti consigliamo di utilizzare i modelli flessibili. Utilizzando un modello flessibile, la pipeline viene avviata da un ambiente coerente, il che può contribuire a mitigare i problemi di configurazione locali.
La registrazione da KafkaIO
può essere piuttosto dettagliata. Valuta la possibilità di ridurre il livello di logging in produzione nel seguente modo:
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
Per saperne di più, vedi Impostare i livelli di log dei worker della pipeline.
Configurazione del networking
Per impostazione predefinita, Dataflow avvia le istanze all'interno della rete Virtual Private Cloud (VPC) predefinita. A seconda della configurazione di Kafka, potresti dover configurare una rete e una subnet diverse per Dataflow. Per saperne di più, consulta Specifica una rete e una subnet. Quando configuri la rete, crea regole firewall che consentano alle macchine worker di Dataflow di raggiungere i broker Kafka.
Se utilizzi Controlli di servizio VPC, posiziona il cluster Kafka all'interno del perimetro di Controlli di servizio VPC oppure estendi i perimetri alla VPN o a Cloud Interconnect autorizzati.
Se il cluster Kafka è implementato al di fuori di Google Cloud, devi creare una connessione di rete tra Dataflow e il cluster Kafka. Esistono diverse opzioni di networking con compromessi diversi:
- Connettiti utilizzando uno spazio di indirizzi RFC 1918 condiviso, utilizzando uno dei seguenti metodi:
- Raggiungi il cluster Kafka ospitato esternamente tramite indirizzi IP pubblici utilizzando uno dei seguenti metodi:
- Rete internet pubblica
- Peering diretto
- Peering con operatori
L'Dedicated Interconnect è l'opzione migliore per prestazioni e affidabilità prevedibili, ma la configurazione può richiedere più tempo perché le terze parti devono eseguire il provisioning dei nuovi circuiti. Con una topologia basata su IP pubblico, puoi iniziare rapidamente perché è necessario eseguire un lavoro di networking minimo.
Le due sezioni successive descrivono queste opzioni in modo più dettagliato.
Spazio di indirizzi RFC 1918 condiviso
Sia l'Dedicated Interconnect che la VPN IPsec ti consentono di accedere direttamente agli indirizzi IP RFC 1918 nel tuo Virtual Private Cloud (VPC), il che può semplificare la configurazione di Kafka. Se utilizzi una topologia basata su VPN, valuta la possibilità di configurare una VPN a velocità effettiva elevata.
Per impostazione predefinita, Dataflow avvia le istanze sulla tua
rete VPC predefinita. In una topologia di rete privata con
route definite in modo esplicito in Cloud Router
che connettono le subnet in Google Cloud a quel cluster Kafka, hai bisogno
di un maggiore controllo sulla posizione delle istanze Dataflow. Puoi
utilizzare Dataflow per configurare i network
e subnetwork
parametri di esecuzione.
Assicurati che la subnet corrispondente disponga di indirizzi IP sufficienti per consentire a Dataflow di avviare le istanze quando tenta di fare lo scale out. Inoltre, quando crei una rete separata per avviare le istanze Dataflow, assicurati di avere una regola firewall che consenta il traffico TCP tra tutte le macchine virtuali del progetto. La rete predefinita ha già questa regola firewall configurata.
Spazio di indirizzi IP pubblici
Questa architettura utilizza Transport Layer Security
(TLS) per proteggere il traffico
tra i client esterni e Kafka e utilizza il traffico non criptato per la comunicazione
tra i broker. Quando il listener Kafka si associa a un'interfaccia di rete utilizzata
sia per la comunicazione interna che per quella esterna, la configurazione del listener è
semplice. Tuttavia, in molti scenari, gli indirizzi pubblicizzati esternamente
dei broker Kafka nel cluster differiscono dalle interfacce di rete interne
utilizzate da Kafka. In questi scenari, puoi utilizzare la proprietà advertised.listeners
:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
I client esterni si connettono utilizzando la porta 9093 tramite un canale "SSL", mentre i client interni si connettono utilizzando la porta 9092 tramite un canale non criptato. Quando specifichi un
indirizzo in advertised.listeners
, utilizza nomi DNS
(kafkabroker-n.mydomain.com
, in questo esempio) che si risolvono nella stessa istanza
sia per il traffico esterno che per quello interno. L'utilizzo di indirizzi IP pubblici potrebbe non funzionare
perché la risoluzione degli indirizzi potrebbe non riuscire per il traffico interno.
Ottimizzare Kafka
Le impostazioni del cluster Kafka e del client Kafka possono avere un grande impatto sulle prestazioni. In particolare, le seguenti impostazioni potrebbero essere troppo basse. Questa sezione fornisce alcuni punti di partenza suggeriti, ma devi sperimentare con questi valori per il tuo carico di lavoro specifico.
unboundedReaderMaxElements
. Il valore predefinito è 10.000. Un valore più alto, ad esempio 100.000, può aumentare le dimensioni dei bundle, il che può migliorare notevolmente le prestazioni se la pipeline include aggregazioni. Tuttavia, un valore più alto potrebbe anche aumentare la latenza. Per impostare il valore, utilizzasetUnboundedReaderMaxElements
. Questa impostazione non si applica a Runner v2. Per Runner v2, utilizza l'opzionesdf_checkpoint_after_output_bytes
del servizio Dataflow.unboundedReaderMaxReadTimeMs
. Il valore predefinito è 10.000 ms. Un valore più alto, ad esempio 20.000 ms, può aumentare le dimensioni del bundle, mentre un valore più basso, ad esempio 5000 ms, può ridurre la latenza o il backlog. Per impostare il valore, utilizzasetUnboundedReaderMaxReadTimeMs
. Questa impostazione non si applica a Runner v2. Per Runner v2, utilizza l'opzionesdf_checkpoint_after_duration
del servizio Dataflow.max.poll.records
. Il valore predefinito è 500. Un valore più alto potrebbe avere un rendimento migliore recuperando più record in entrata insieme, soprattutto quando utilizzi Runner v2. Per impostare il valore, chiamawithConsumerConfigUpdates
.fetch.max.bytes
. Il valore predefinito è 1 MB. Un valore più elevato potrebbe migliorare la velocità effettiva riducendo il numero di richieste, soprattutto quando utilizzi Runner v2. Tuttavia, se lo imposti su un valore troppo elevato, la latenza potrebbe aumentare, anche se l'elaborazione a valle è più probabile che sia il collo di bottiglia principale. Un valore iniziale consigliato è 100 MB. Per impostare il valore, chiamawithConsumerConfigUpdates
.max.partition.fetch.bytes
. Il valore predefinito è 1 MB. Questo parametro imposta la quantità massima di dati per partizione restituita dal server. L'aumento del valore può migliorare la velocità effettiva riducendo il numero di richieste, soprattutto quando si utilizza Runner v2. Tuttavia, se lo imposti su un valore troppo alto, la latenza potrebbe aumentare, anche se l'elaborazione downstream è più probabile che sia il collo di bottiglia principale. Un valore iniziale consigliato è 100 MB. Per impostare il valore, chiamawithConsumerConfigUpdates
.consumerPollingTimeout
. Il valore predefinito è 2 secondi. Se il client consumer scade prima di poter leggere i record, prova a impostare un valore più alto. Questa impostazione è più spesso pertinente quando si eseguono letture tra regioni o letture con una rete lenta. Per impostare il valore, chiamawithConsumerPollingTimeout
.
Assicurati che receive.buffer.bytes
sia abbastanza grande da gestire le dimensioni dei
messaggi. Se il valore è troppo piccolo, i log potrebbero mostrare che i consumatori vengono
continuamente ricreati e cercano un offset specifico.
Esempi
Gli esempi di codice riportati di seguito mostrano come creare pipeline Dataflow
che leggono da Kafka. Quando utilizzi le Credenziali predefinite dell'applicazione#39;applicazione insieme al gestore di callback fornito da Google Cloud Managed Service per Apache Kafka, è richiesta la versione kafka-clients
3.7.0 o successive.
Leggere da un singolo argomento
Questo esempio utilizza il connettore I/O gestito. Mostra come leggere da un argomento Kafka e scrivere i payload dei messaggi in file di testo.
Java
Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.
Python
Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.
Leggere da più argomenti
Questo esempio utilizza il connettore KafkaIO
. Mostra come leggere da più argomenti Kafka e applicare una logica della pipeline separata per ogni argomento.
Per casi d'uso più avanzati, trasmetti dinamicamente un insieme di oggetti
KafkaSourceDescriptor
, in modo da poter aggiornare
l'elenco degli argomenti da leggere. Questo approccio richiede Java con Runner v2.
Java
Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.
Python
Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.
Passaggi successivi
- Scrivi su Apache Kafka.
- Scopri di più su I/O gestito.