Questo documento descrive come leggere i dati da Apache Kafka in Dataflow.
Il connettore I/O Apache Beam Kafka (KafkaIO
) è disponibile in modo nativo per
Java,
e anche per
Python
e Go
utilizzando il framework di pipeline multilingue di Apache Beam.
Per le pipeline Java, ti consigliamo di utilizzare il connettore I/O gestito per leggere da Kafka.
Parallelismo
Il parallelismo è vincolato da due fattori:
numero massimo di worker
(max_num_workers
) e il numero di partizioni Kafka. Dataflow:
il valore predefinito è un fanout di parallelismo di 4 x max_num_workers
. Tuttavia, il fanout è limitato dal numero di partizioni. Ad esempio, se sono disponibili 100 vCPU,
ma la pipeline legge solo da 10 partizioni Kafka, il parallelismo massimo è
10:
Per massimizzare il parallelismo, si consiglia di avere almeno 4 x
max_num_workers
partizioni Kafka. Se il tuo job utilizza
Runner v2, valuta la possibilità di impostare un parallelismo ancora più alto.
Un buon punto di partenza è avere partizioni uguali al doppio del numero di worker
di memoria per vCPU.
Se non è possibile aumentare il numero di partizioni, valuta la possibilità di inserire un Reshuffle
o Redistribute
dopo il passaggio di lettura di Kafka. Questo passaggio consente
Dataflow per ridistribuire e parallelizzare ulteriormente i dati
efficiente, ma comporta un ulteriore overhead per eseguire il passaggio di shuffling. Per maggiori informazioni, consulta Fattori che influiscono sul parallelismo.
Cerca di assicurarti che il carico tra le partizioni sia relativamente uniforme e non disallineate. Se il carico è sbilanciato, può comportare un cattivo utilizzo dei worker. I worker che leggono da partizioni con un carico più leggero potrebbero essere relativamente inattivi, mentre quelli che leggono da partizioni con un carico elevato potrebbero rimanere indietro. Dataflow fornisce metriche per il backlog per partizione.
Se il carico è sbilanciato, il bilanciamento dinamico del lavoro può contribuire a distribuirlo. Ad esempio, Dataflow potrebbe allocare worker per leggere da più partizioni di volume ridotto e allocare un altro worker per leggere da una singola partizione con volumi elevati. Tuttavia, due worker non possono leggere dalla stessa partizione, quindi una partizione molto caricata può comunque causare la pipeline per rimanere indietro.
Best practice
Questa sezione contiene suggerimenti per la lettura da Kafka a e Dataflow.
Argomenti con volume ridotto
Uno scenario comune è leggere contemporaneamente da molti argomenti a basso volume, ad esempio un argomento per cliente. Creazione di report I job Dataflow per ogni argomento sono inefficienti in termini di costi, richiede almeno un worker completo. Valuta invece le seguenti opzioni:
Unisci gli argomenti. Combina gli argomenti prima che vengano importati in Dataflow. L'importazione di pochi argomenti ad alto volume è molto più efficiente rispetto all'importazione di molti argomenti a basso volume. Ogni argomento ad alto volume può essere gestito da un singolo job Dataflow che utilizza completamente i suoi worker.
Leggi più argomenti. Se non puoi combinare gli argomenti prima di importarli in Dataflow, valuta la possibilità di creare una pipeline che legga più argomenti. Questo approccio consente a Dataflow di assegnare diversi argomenti allo stesso worker. Esistono due modi per implementare questo approccio:
Singolo passaggio di lettura. Crea una singola istanza del connettore
KafkaIO
e configurarlo per leggere più argomenti. Quindi filtra per nome dell'argomento per e applicare logiche diverse per argomento. Per un codice di esempio, consulta Leggere da più argomenti. Valuta questa opzione se tutti i tuoi argomenti sono collocati nello stesso cluster. Uno svantaggio è che i problemi con un singolo sink o una singola trasformazione potrebbero causare l'accumulo di backlog per tutti gli argomenti.Per casi d'uso più avanzati, trasmetti un insieme di
KafkaSourceDescriptor
che specificano gli argomenti dai quali eseguire la lettura. UtilizzoKafkaSourceDescriptor
ti consente di aggiornare l'elenco di argomenti in un secondo momento, se necessario. Questa funzionalità richiede Java con Runner v2.Più passaggi di lettura. Per leggere da argomenti situati in cluster diversi, la pipeline può includere diverse istanze
KafkaIO
. Durante l'esecuzione del job, puoi aggiornare le singole origini utilizzando le mappature delle trasformazioni. L'impostazione di un nuovo argomento o cluster è supportata solo se utilizzi Runner 2. L'osservabilità è una potenziale sfida con questo approccio, perché monitorare ogni singola trasformazione di lettura invece di fare affidamento a livello di pipeline.
Conferma in Kafka in corso...
Per impostazione predefinita, il connettore KafkaIO
non utilizza gli offset Kafka per monitorare l'avanzamento
e non esegue il commit in Kafka. Se chiami
commitOffsetsInFinalize
, il connettore fa del suo meglio per eseguire il commit in Kafka dopo che i record sono stati sottoposti a commit in
Dataflow. I record impegnati in Dataflow potrebbero non
l'elaborazione completa, quindi se
annullare la pipeline, un offset
potrebbe essere eseguito senza che i record vengano elaborati completamente.
Perché l'impostazione di enable.auto.commit=True
comporta l'esecuzione del commit non appena vengono letti
Kafka senza alcuna elaborazione da parte di Dataflow. L'uso di questa opzione non è consigliato.
È consigliabile impostare sia enable.auto.commit=False
sia
commitOffsetsInFinalize=True
Se imposti
Da enable.auto.commit
a True
, i dati possono andare persi se la pipeline viene interrotta
durante l'elaborazione. I record già sottoposti a commit su Kafka potrebbero essere eliminati.
Filigrane
Per impostazione predefinita, il connettore KafkaIO
utilizza il tempo di elaborazione corrente per assegnare la marcatura temporale in uscita e la data e l'ora dell'evento. Per modificare questo comportamento, chiama
withTimestampPolicyFactory
e assegna un
TimestampPolicy
. Beam fornisce
implementazioni di TimestampPolicy
che calcolano la filigrana in base a
l'ora di aggiunta dei log di Kafka o l'ora di creazione del messaggio.
Considerazioni relative al runner
Il connettore KafkaIO
ha due implementazioni sottostanti per le letture Kafka,
la versione precedente di ReadFromKafkaViaUnbounded
e quella più recente
ReadFromKafkaViaSDF
. Dataflow:
sceglie automaticamente la migliore implementazione per il tuo job in base al tuo SDK
requisiti linguistici e lavorativi. Evita di richiedere esplicitamente un'implementazione di Kafka o di un runner, a meno che tu non abbia bisogno di funzionalità specifiche disponibili solo in quell'implementazione. Per ulteriori informazioni sulla scelta di un runner, vedi
Usa Dataflow Runner v2.
Se la tua pipeline utilizza withTopic
o withTopics
,
la precedente implementazione esegue query su Kafka al momento della creazione della pipeline per
partizioni disponibili. La macchina che crea la pipeline deve disporre dell'autorizzazione per connettersi a Kafka. Se ricevi un errore di autorizzazione, verifica di disporre delle autorizzazioni per connetterti a Kafka localmente. Puoi evitare questo problema utilizzando
withTopicPartitions
, che non si connette a Kafka
al momento della costruzione della pipeline.
Distribuzione in produzione
Quando esegui il deployment della soluzione in produzione, ti consigliamo di utilizzare Modelli flessibili. Utilizzando un modello flessibile, la pipeline viene avviata da un ambiente coerente, può aiutare a mitigare i problemi di configurazione locale.
I log di KafkaIO
possono essere piuttosto dettagliati. Valuta la possibilità di ridurre il logging
di produzione in questo modo:
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
Per ulteriori informazioni, vedi Imposta i livelli di log del worker della pipeline.
Configurazione del networking
Per impostazione predefinita, Dataflow avvia le istanze all'interno una rete VPC (Virtual Private Cloud). A seconda della configurazione di Kafka, potrebbe essere necessario configurare una rete e una sottorete diverse per Dataflow. Per ulteriori informazioni, vedi Specifica una rete e una subnet. Quando configuri la rete, crea regole firewall che consentano alle macchine worker di Dataflow di raggiungere i broker Kafka.
Se utilizzi Controlli di servizio VPC, posiziona il cluster Kafka all'interno il perimetro dei Controlli di servizio VPC oppure estendere i perimetri alla VPN autorizzata o a Cloud Interconnect.
Se il deployment del cluster Kafka viene eseguito all'esterno di Google Cloud, devi creare una connessione di rete tra Dataflow e il cluster Kafka. Esistono diverse opzioni di networking con diversi compromessi:
- Connettiti utilizzando uno spazio di indirizzi RFC 1918 condiviso usando uno dei seguenti elementi:
- Raggiungi il tuo cluster Kafka ospitato esternamente tramite indirizzi IP pubblici utilizzando uno dei seguenti metodi:
- Rete internet pubblica
- Peering diretto
- Peering con operatori
Dedicated Interconnect è l'opzione migliore per prestazioni e affidabilità, ma la configurazione può richiedere più tempo le parti interessate devono fornire i nuovi circuiti. Con una topologia basata su IP pubblici, possono iniziare rapidamente perché il lavoro di networking deve essere ridotto.
Le due sezioni successive descrivono queste opzioni in modo più dettagliato.
Spazio di indirizzi RFC 1918 condiviso
Sia Dedicated Interconnect che IPsec VPN ti offrono l'accesso diretto a indirizzi IP RFC 1918 nel tuo Virtual Private Cloud (VPC), della tua configurazione Kafka. Se utilizzi una topologia basata su VPN, valuta la possibilità di impostare una VPN a velocità effettiva elevata.
Per impostazione predefinita, Dataflow avvia le istanze sulla
rete VPC predefinita. In una topologia di rete privata con
route definite esplicitamente nel router Cloud
che collegano le subnet in Google Cloud a quel cluster Kafka,
con maggiore controllo sulla posizione
delle istanze Dataflow. Puoi
utilizzare Dataflow per configurare i parametri di esecuzione network
e subnetwork
.
Assicurati che la subnet corrispondente abbia un numero sufficiente di indirizzi IP disponibili per consentire a Dataflow di avviare le istanze quando tenta di fare lo scale out. Inoltre, anche quando crei una rete separata per avviare Istanze Dataflow, assicurati di avere una regola firewall che abilita il traffico TCP tra tutte le macchine virtuali nel progetto. La rete predefinita ha già questa regola firewall configurata.
Spazio degli indirizzi IP pubblici
Questa architettura utilizza Transport Layer Security
(TLS) per proteggere il traffico.
tra client esterni e Kafka e utilizza il traffico non criptato per i broker
la comunicazione. Quando l'ascoltatore Kafka si associa a un'interfaccia di rete utilizzata sia per la comunicazione interna che esterna, la configurazione dell'ascoltatore è semplice. Tuttavia, in molti scenari, gli indirizzi
annunciati esternamente
dei broker Kafka nel cluster sono diversi dalle interfacce di rete interne
usate da Kafka. In questi casi, puoi utilizzare la proprietà advertised.listeners
:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
I client esterni si connettono utilizzando la porta 9093 tramite un canale "SSL", mentre i client interni si connettono utilizzando la porta 9092 tramite un canale non criptato. Quando specifichi un indirizzo in advertised.listeners
, utilizza nomi DNS (kafkabroker-n.mydomain.com
in questo esempio) che risolvono nella stessa istanza sia per il traffico esterno che per quello interno. L'utilizzo di indirizzi IP pubblici potrebbe non funzionare
perché potrebbero non essere risolti per il traffico interno.
Ottimizza Kafka
Le impostazioni del cluster Kafka e del client Kafka possono influire notevolmente delle prestazioni. In particolare, le seguenti impostazioni potrebbero essere troppo basse. Questa sezione fornisce alcuni punti di partenza suggeriti, ma dovresti fare esperimenti con questi valori per il tuo carico di lavoro specifico.
unboundedReaderMaxElements
. Il valore predefinito è 10.000. Un valore più alto, ad esempio 100.000 possono aumentare la dimensione set, che possono migliorare significativamente le prestazioni se la pipeline include aggregazioni. Tuttavia, un valore più elevato potrebbe anche aumentare la latenza. Per impostare il valore, utilizzasetUnboundedReaderMaxElements
. Questa impostazione non si applica a Runner v2.unboundedReaderMaxReadTimeMs
. Il valore predefinito è 10.000 ms. Un valore più elevato, come 20.000 ms, può aumentare le dimensioni del bundle, mentre un valore più basso, come 5000 ms, può ridurre la latenza o l'accumulo. Per impostare il valore, utilizzasetUnboundedReaderMaxReadTimeMs
. Questa impostazione non si applica a Runner v2.max.poll.records
. Il valore predefinito è 500. Un valore più alto potrebbe avere un rendimento migliore recuperando più record in entrata contemporaneamente, in particolare se utilizzi Runner 2. Per impostare il valore, chiamawithConsumerConfigUpdates
.fetch.max.bytes
. Il valore predefinito è 1 MB. Un valore più alto potrebbe migliorare la velocità effettiva di riducendo il numero di richieste, soprattutto quando si utilizza Runner v2. Tuttavia, impostarlo su un valore troppo elevato potrebbe aumentare la latenza, anche se è più probabile che il collo di bottiglia principale sia rappresentato dall'elaborazione a valle. Un valore iniziale consigliato è 100 MB. Per impostare il valore, chiamawithConsumerConfigUpdates
.max.partition.fetch.bytes
. Il valore predefinito è 1 MB. Questo parametro imposta il valore massimo quantità di dati per partizione restituita dal server. L'aumento del valore può migliorare il throughput riducendo il numero di richieste, in particolare quando si utilizza Runner 2. Tuttavia, un valore troppo elevato potrebbe aumentare la latenza, anche se è più probabile che il collo di bottiglia principale sia l'elaborazione a valle. Un file consigliato il valore iniziale è 100 MB. Per impostare il valore, chiamawithConsumerConfigUpdates
.consumerPollingTimeout
. Il valore predefinito è 2 secondi. Se il client consumer scade prima di poter leggere i record, prova a impostare un valore più alto. Questa impostazione è più spesso pertinente quando si eseguono letture tra regioni o con un in ogni rete. Per impostare il valore, chiamawithConsumerPollingTimeout
.
Assicurati che receive.buffer.bytes
sia abbastanza grande da gestire le dimensioni dei messaggi. Se il valore è troppo piccolo, i log potrebbero indicare che i consumatori vengono continuamente ricreati e cercano un offset specifico.
Esempi
Gli esempi di codice riportati di seguito mostrano come creare pipeline Dataflow
che leggono da Kafka. Quando utilizzi Credenziali predefinite dell'applicazione insieme alla
Google Cloud Managed Service per Apache Kafka ha fornito il gestore di callback, è richiesto kafka-clients
versione 3.7.0 o successiva.
Leggi di un singolo argomento
Questo esempio legge da un argomento Kafka e scrive i payload dei messaggi in file di testo.
Java
Per eseguire l'autenticazione in Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.
Python
Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.
Leggi da più argomenti
Questo esempio legge da diversi argomenti Kafka e applica una logica di pipeline distinta per ogni argomento.
Per casi d'uso più avanzati, passa dinamicamente un insieme
KafkaSourceDescriptor
di oggetti per consentirti di aggiornarli
l'elenco degli argomenti da cui leggere. Questo approccio richiede Java con Runner v2.
Python
Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.