Questo documento descrive come leggere i dati da Apache Kafka in Dataflow e include suggerimenti e best practice per le prestazioni.
Per la maggior parte dei casi d'uso, ti consigliamo di utilizzare il connettore I/O gestito per leggere da Kafka.
Se hai bisogno di una ottimizzazione delle prestazioni più avanzata, valuta la possibilità di utilizzare il connettoreKafkaIO
. Il connettore KafkaIO
è disponibile per
Java
o utilizzando il
framework di pipeline multilingue
per Python
e Go.
Parallelismo
Il parallelismo è limitato da due fattori: il numero massimo di worker (max_num_workers
) e il numero di partizioni Kafka. Per impostazione predefinita, Dataflow utilizza un fanout di parallelismo pari a 4 x max_num_workers
. Tuttavia, il fanout è limitato dal numero di partizioni. Ad esempio, se sono disponibili 100 vCPU, ma la pipeline legge solo da 10 partizioni Kafka, il parallelismo massimo è 10.
Per massimizzare il parallelismo, è consigliabile avere almeno 4 x
max_num_workers
partizioni Kafka. Se il tuo job utilizza
Runner v2, valuta la possibilità di impostare un parallelismo ancora più elevato.
Un buon punto di partenza è avere partizioni pari al doppio del numero di vCPU worker.
Se non puoi aumentare il numero di partizioni, puoi aumentare il parallelismo chiamando KafkaIO.Read.withRedistribute
. Questo metodo aggiunge
una trasformazione Redistribute
alla pipeline, che fornisce un suggerimento a
Dataflow per ridistribuire e parallelizzare i dati in modo più
efficiente. Puoi anche specificare il numero ottimale di shard da utilizzare nel passaggio di ridistribuzione chiamando KafkaIO.Read.withRedistributeNumKeys
.
Dataflow tratta questo valore come un suggerimento di ottimizzazione.
La ridistribuzione dei dati comporta un sovraccarico aggiuntivo per eseguire il
passaggio di ordinamento casuale. Per ulteriori informazioni, consulta
Impedire la fusione.
Cerca di assicurarti che il carico tra le partizioni sia relativamente uniforme e non scompensato. Se il carico è sbilanciato, può comportare un cattivo utilizzo dei worker. I worker che leggono da partizioni con un carico più leggero potrebbero essere relativamente inattivi, mentre quelli che leggono da partizioni con un carico elevato potrebbero rimanere indietro. Dataflow fornisce metriche per il backlog per partizione.
Se il carico è sbilanciato, il bilanciamento dinamico del lavoro può contribuire a distribuirlo. Ad esempio, Dataflow potrebbe allocare un worker per leggere da più partizioni a volume ridotto e un altro per leggere da una singola partizione a volume elevato. Tuttavia, due worker non possono leggere dalla stessa partizione, quindi una partizione con un carico elevato può comunque causare il ritardo della pipeline.
Best practice
Questa sezione contiene consigli per la lettura da Kafka in Dataflow.
Argomenti con volume ridotto
Uno scenario comune è leggere contemporaneamente da molti argomenti a basso volume, ad esempio un argomento per cliente. La creazione di job Dataflow distinti per ogni argomento non è conveniente in termini di costi, perché ogni job richiede almeno un worker completo. Valuta invece le seguenti opzioni:
Unisci gli argomenti. Combina gli argomenti prima che vengano importati in Dataflow. L'importazione di pochi argomenti ad alto volume è molto più efficiente rispetto all'importazione di molti argomenti a basso volume. Ogni argomento ad alto volume può essere gestito da un singolo job Dataflow che utilizza completamente i suoi worker.
Leggi più argomenti. Se non riesci a combinare gli argomenti prima di importarli in Dataflow, ti consigliamo di creare una pipeline che legga da più argomenti. Questo approccio consente a Dataflow di assegnare diversi argomenti allo stesso worker. Esistono due modi per implementare questo approccio:
Passaggio di lettura singola. Crea una singola istanza del connettore
KafkaIO
e configurala in modo che legga più argomenti. Quindi filtra per nome dell'argomento per applicare una logica diversa per ogni argomento. Per un codice di esempio, consulta Leggere da più argomenti. Valuta questa opzione se tutti i tuoi argomenti sono collocati nello stesso cluster. Uno svantaggio è che i problemi con un singolo sink o una singola trasformazione potrebbero causare l'accumulo di backlog per tutti gli argomenti.Per casi d'uso più avanzati, passa un insieme di oggetti
KafkaSourceDescriptor
che specificano gli argomenti da leggere. L'utilizzo diKafkaSourceDescriptor
ti consente di aggiornare l'elenco di argomenti in un secondo momento, se necessario. Questa funzionalità richiede Java con Runner v2.Più passaggi di lettura. Per leggere da argomenti in cluster diversi, la pipeline può includere diverse istanze di
KafkaIO
. Durante l'esecuzione del job, puoi aggiornare le singole origini utilizzando le mappature delle trasformazioni. L'impostazione di un nuovo argomento o cluster è supportata solo quando utilizzi Runner v2. L'osservabilità è una potenziale sfida con questo approccio, perché devi monitorare ogni singola trasformazione di lettura anziché fare affidamento su metriche a livello di pipeline.
Eseguire il commit di nuovo in Kafka
Per impostazione predefinita, il connettore KafkaIO
non utilizza gli offset Kafka per monitorare l'avanzamento e non esegue il commit in Kafka. Se chiami
commitOffsetsInFinalize
, il connettore fa del suo meglio per eseguire il commit in Kafka dopo che i record sono stati sottoposti a commit in
Dataflow. I record committati in Dataflow potrebbero non essere completamente elaborati, pertanto se annulli la pipeline, un offset potrebbe essere committato senza che i record vengano mai completamente elaborati.
Poiché l'impostazione enable.auto.commit=True
esegue il commit degli offset non appena vengono letti da Kafka senza alcuna elaborazione da parte di Dataflow, l'utilizzo di questa opzione non è consigliato.
Ti consigliamo di impostare sia enable.auto.commit=False
che
commitOffsetsInFinalize=True
. Se imposti enable.auto.commit
su True
, i dati possono andare persi se la pipeline viene interrotta durante l'elaborazione. I record già committati su Kafka potrebbero essere eliminati.
Filigrane
Per impostazione predefinita, il connettore KafkaIO
utilizza il tempo di elaborazione corrente per assegnare la marcatura temporale in uscita e la data e l'ora dell'evento. Per modificare questo comportamento, chiama
withTimestampPolicyFactory
e assegna un
TimestampPolicy
. Beam fornisce implementazioni di TimestampPolicy
che calcolano la marcatura temporale in base all'ora di accodamento del log di Kafka o all'ora di creazione del messaggio.
Considerazioni sul runner
Il connettore KafkaIO
ha due implementazioni sottostanti per le letture di Kafka, la precedente ReadFromKafkaViaUnbounded
e la più recente ReadFromKafkaViaSDF
. Dataflow sceglie automaticamente l'implementazione migliore per il tuo job in base al linguaggio SDK e ai requisiti del job. Evita di richiedere esplicitamente un'implementazione di Kafka o di un runner, a meno che tu non abbia bisogno di funzionalità specifiche disponibili solo in quella implementazione. Per ulteriori informazioni sulla scelta di un runner, consulta Utilizzare Dataflow Runner v2.
Se la pipeline utilizza withTopic
o withTopics
,
l'implementazione precedente esegue query su Kafka al momento della compilazione della pipeline per le
partizioni disponibili. La macchina che crea la pipeline deve disporre dell'autorizzazione per connettersi a Kafka. Se ricevi un errore di autorizzazione, verifica di disporre delle autorizzazioni per connetterti a Kafka localmente. Puoi evitare questo problema utilizzando
withTopicPartitions
, che non si connette a Kafka
al momento della creazione della pipeline.
Distribuzione in produzione
Quando esegui il deployment della soluzione in produzione, ti consigliamo di utilizzare modelli flessibili. Se utilizzi un modello flessibile, la pipeline viene lanciata da un ambiente coerente, il che può contribuire ad attenuare i problemi di configurazione locale.
I log di KafkaIO
possono essere piuttosto dettagliati. Valuta la possibilità di ridurre il livello di registrazione in produzione come segue:
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
Per ulteriori informazioni, consulta Impostare i livelli di log dei worker della pipeline.
Configurazione del networking
Per impostazione predefinita, Dataflow avvia le istanze all'interno della rete Virtual Private Cloud (VPC) predefinita. A seconda della configurazione di Kafka, potrebbe essere necessario configurare una rete e una sottorete diverse per Dataflow. Per saperne di più, consulta la sezione Specificare una rete e una sottorete. Quando configuri la rete, crea regole firewall che consentano alle macchine worker di Dataflow di raggiungere i broker Kafka.
Se utilizzi Controlli di servizio VPC, posiziona il cluster Kafka all'interno del perimetro di Controlli di servizio VPC oppure estendi i perimetri alla VPN o a Cloud Interconnect autorizzati.
Se il cluster Kafka è dipiegato al di fuori di Google Cloud, devi creare una connessione di rete tra Dataflow e il cluster Kafka. Esistono diverse opzioni di rete con diversi compromessi:
- Esegui la connessione utilizzando uno spazio di indirizzi RFC 1918 condiviso, utilizzando una delle seguenti opzioni:
- Raggiungi il tuo cluster Kafka ospitato esternamente tramite indirizzi IP pubblici utilizzando uno dei seguenti metodi:
- Rete internet pubblica
- Peering diretto
- Peering con operatori
L'interconnessione dedicata è l'opzione migliore per prestazioni e affidabilità prevedibili, ma la configurazione può richiedere più tempo perché terze parti devono eseguire il provisioning dei nuovi circuiti. Con una topologia basata su IP pubblico, puoi iniziare rapidamente perché non è richiesta molta configurazione di rete.
Le due sezioni seguenti descrivono queste opzioni in maggiore dettaglio.
Spazio di indirizzi RFC 1918 condiviso
Sia l'interconnessione dedicata che la VPN IPsec ti consentono di accedere direttamente agli indirizzi IP RFC 1918 nel tuo Virtual Private Cloud (VPC), il che può semplificare la configurazione di Kafka. Se utilizzi una topologia basata su VPN, ti consigliamo di configurare una VPN a velocità effettiva elevata.
Per impostazione predefinita, Dataflow avvia le istanze sulla
rete VPC predefinita. In una topologia di rete privata con
route definiti esplicitamente in Cloud Router
che connettono le sottoreti in Google Cloud al cluster Kafka, hai bisogno di
più controllo su dove posizionare le istanze Dataflow. Puoi
utilizzare Dataflow per configurare i parametri di esecuzione network
e subnetwork
.
Assicurati che la sottorete corrispondente abbia un numero sufficiente di indirizzi IP disponibili su cui Dataflow possa avviare istanze quando tenta di eseguire il ridimensionamento orizzontale. Inoltre, quando crei una rete separata per l'avvio delle istanze Dataflow, assicurati di avere una regola firewall che consenta il traffico TCP tra tutte le macchine virtuali del progetto. La rete predefinita ha già configurato questa regola firewall.
Spazio degli indirizzi IP pubblici
Questa architettura utilizza Transport Layer Security
(TLS) per proteggere il traffico tra i client esterni e Kafka e utilizza il traffico non criptato per la comunicazione tra broker. Quando l'ascoltatore Kafka si associa a un'interfaccia di rete utilizzata sia per la comunicazione interna che esterna, la configurazione dell'ascoltatore è semplice. Tuttavia, in molti scenari, gli indirizzi pubblicizzati esternamente
degli broker Kafka nel cluster sono diversi dalle interfacce di rete interne
utilizzate da Kafka. In questi casi, puoi utilizzare la proprietà advertised.listeners
:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
I client esterni si connettono tramite la porta 9093 tramite un canale "SSL", mentre i client interni si connettono tramite la porta 9092 tramite un canale in testo non cifrato. Quando specifichi un indirizzo in advertised.listeners
, utilizza nomi DNS (kafkabroker-n.mydomain.com
in questo esempio) che risolvono nella stessa istanza sia per il traffico esterno che per quello interno. L'utilizzo di indirizzi IP pubblici potrebbe non funzionare
perché potrebbero non essere risolti per il traffico interno.
Ottimizza Kafka
Le impostazioni del cluster Kafka e del client Kafka possono avere un impatto significativo sul funzionamento. In particolare, le seguenti impostazioni potrebbero essere troppo basse. Questa sezione fornisce alcuni punti di partenza suggeriti, ma dovresti fare esperimenti con questi valori per il tuo carico di lavoro specifico.
unboundedReaderMaxElements
. Il valore predefinito è 10.000. Un valore più elevato, ad esempio 100.000, può aumentare le dimensioni dei bundle, il che può migliorare notevolmente le prestazioni se la pipeline include aggregazioni. Tuttavia, un valore più elevato potrebbe anche aumentare la latenza. Per impostare il valore, utilizzasetUnboundedReaderMaxElements
. Questa impostazione non si applica a Runner v2.unboundedReaderMaxReadTimeMs
. Il valore predefinito è 10.000 ms. Un valore più elevato, come 20.000 ms, può aumentare le dimensioni del bundle, mentre un valore più basso, come 5000 ms, può ridurre la latenza o l'accumulo. Per impostare il valore, utilizzasetUnboundedReaderMaxReadTimeMs
. Questa impostazione non si applica a Runner v2.max.poll.records
. Il valore predefinito è 500. Un valore più elevato potrebbe avere un rendimento migliore recuperando più record in entrata contemporaneamente, in particolare se utilizzi Runner 2. Per impostare il valore, chiamawithConsumerConfigUpdates
.fetch.max.bytes
. Il valore predefinito è 1 MB. Un valore più elevato potrebbe migliorare il throughput riducendo il numero di richieste, in particolare quando si utilizza Runner 2. Tuttavia, impostarlo su un valore troppo elevato potrebbe aumentare la latenza, anche se è più probabile che il collo di bottiglia principale sia rappresentato dall'elaborazione a valle. Un valore iniziale consigliato è 100 MB. Per impostare il valore, chiamawithConsumerConfigUpdates
.max.partition.fetch.bytes
. Il valore predefinito è 1 MB. Questo parametro imposta la quantità massima di dati per partizione restituita dal server. L'aumento del valore può migliorare il throughput riducendo il numero di richieste, in particolare quando si utilizza Runner 2. Tuttavia, un valore troppo elevato potrebbe aumentare la latenza, anche se è più probabile che il collo di bottiglia principale sia l'elaborazione a valle. Un valore iniziale consigliato è 100 MB. Per impostare il valore, chiamawithConsumerConfigUpdates
.consumerPollingTimeout
. Il valore predefinito è 2 secondi. Se il client consumer scade prima di poter leggere i record, prova a impostare un valore più alto. Questa impostazione è spesso pertinente quando vengono eseguite letture tra regioni o letture con una rete lenta. Per impostare il valore, chiamawithConsumerPollingTimeout
.
Assicurati che receive.buffer.bytes
sia abbastanza grande da gestire le dimensioni dei messaggi. Se il valore è troppo piccolo, i log potrebbero indicare che i consumatori vengono continuamente ricreati e cercano un offset specifico.
Esempi
Gli esempi di codice riportati di seguito mostrano come creare pipeline Dataflow
che leggono da Kafka. Quando utilizzi le credenziali predefinite dell'applicazione in combinazione con il gestore dei callback fornito da Google Cloud Managed Service per Apache Kafka, è necessaria la versione 3.7.0 o successive di kafka-clients
.
Leggere da un singolo argomento
Questo esempio utilizza il connettore I/O gestito. Mostra come leggere da un argomento Kafka e scrivere i payload dei messaggi in file di testo.
Java
Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.
Python
Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.
Leggere da più argomenti
Questo esempio utilizza il connettore KafkaIO
. Mostra come leggere da più argomenti Kafka e applicare una logica di pipeline separata per ogni argomento.
Per casi d'uso più avanzati, passa dinamicamente un insieme di oggetti KafkaSourceDescriptor
, in modo da poter aggiornare l'elenco di argomenti da leggere. Questo approccio richiede Java con Runner v2.
Java
Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.
Python
Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.
Passaggi successivi
- Scrivere in Apache Kafka.
- Scopri di più sull'I/O gestita.