Questo documento descrive come scrivere dati da Dataflow in Apache Kafka.
Per la maggior parte dei casi d'uso, ti consigliamo di utilizzare il connettore I/O gestito per scrivere in Kafka.
Se hai bisogno di una ottimizzazione delle prestazioni più avanzata, valuta la possibilità di utilizzare il connettoreKafkaIO
. Il connettore KafkaIO
è disponibile per
Java
o utilizzando il
framework di pipeline multilingue
per Python
e Go.
Elaborazione "exactly-once"
Per impostazione predefinita, il connettore KafkaIO
non fornisce
semantica exactly-once per le scritture. Ciò significa che i dati potrebbero essere scritti nell'argomento Kafka più volte. Per attivare le scritture esattamente una volta, chiama il metodo withEOS
. Le scritture esattamente una volta
garantiscono che i dati vengano scritti nell'argomento Kafka di destinazione esattamente una volta.
Tuttavia, aumenta anche il costo della pipeline e diminuisce il throughput.
Se non hai requisiti rigorosi per la semantica esatta e la logica della pipeline può gestire i record duplicati, ti consigliamo di attivare la modalità almeno una volta per l'intera pipeline per ridurre i costi. Per ulteriori informazioni, consulta Impostare la modalità di streaming della pipeline.
Scarichi della pipeline
Se svuoti la pipeline, la semantica exactly-once non è garantita. L'unica garanzia è che non vengono persi dati confermati. Di conseguenza, alcuni dati potrebbero essere elaborati durante lo svuotamento della pipeline, senza il commit degli offset di lettura su Kafka. Per ottenere la semantica almeno una volta per Kafka quando modifichi una pipeline, aggiorna la pipeline anziché annullare il job e avviarne uno nuovo.
Ottimizza Kafka per la semantica esatta
La regolazione di transaction.max.timeout.ms
e transactional.id.expiration.ms
può complementare la tua strategia complessiva di tolleranza degli errori e di consegna exactly-once.
Tuttavia, il loro impatto dipende dalla natura dell'interruzione e dalla tua configurazione specifica. Imposta transaction.max.timeout.ms
vicino al tempo di conservazione dei tuoi argomenti Kafka per evitare la duplicazione dei dati causata da interruzioni del broker Kafka.
Se un broker Kafka diventa temporaneamente non disponibile (ad esempio a causa di una partizione della rete o di un errore del nodo) e un produttore ha transazioni in corso, queste transazioni potrebbero scadere. L'aumento del valore di
transaction.max.timeout.ms
consente alle transazioni di avere più tempo per essere completate dopo il recupero di un
broker, evitando potenzialmente la necessità di riavviare le transazioni e di
inviare nuovamente i messaggi. Questa mitigazione contribuisce indirettamente a mantenere la semantica esattamente una volta, riducendo la possibilità di messaggi duplicati causati dai riavvii delle transazioni. D'altra parte, una scadenza più breve può contribuire a ripulire più rapidamente gli ID transazioni inattivi, riducendo il potenziale utilizzo delle risorse.
Configurazione del networking
Per impostazione predefinita, Dataflow avvia le istanze all'interno della rete Virtual Private Cloud (VPC) predefinita. A seconda della configurazione di Kafka, potrebbe essere necessario configurare una rete e una sottorete diverse per Dataflow. Per saperne di più, consulta la sezione Specificare una rete e una sottorete. Quando configuri la rete, crea regole firewall che consentano alle macchine worker di Dataflow di raggiungere i broker Kafka.
Se utilizzi Controlli di servizio VPC, posiziona il cluster Kafka all'interno del perimetro di Controlli di servizio VPC oppure estendi i perimetri alla VPN o a Cloud Interconnect autorizzati.
Se il cluster Kafka è dipiegato al di fuori di Google Cloud, devi creare una connessione di rete tra Dataflow e il cluster Kafka. Esistono diverse opzioni di rete con diversi compromessi:
- Esegui la connessione utilizzando uno spazio di indirizzi RFC 1918 condiviso, utilizzando una delle seguenti opzioni:
- Raggiungi il tuo cluster Kafka ospitato esternamente tramite indirizzi IP pubblici utilizzando uno dei seguenti metodi:
- Rete internet pubblica
- Peering diretto
- Peering con operatori
L'interconnessione dedicata è l'opzione migliore per prestazioni e affidabilità prevedibili, ma la configurazione può richiedere più tempo perché terze parti devono eseguire il provisioning dei nuovi circuiti. Con una topologia basata su IP pubblico, puoi iniziare rapidamente perché non è richiesta molta configurazione di rete.
Le due sezioni seguenti descrivono queste opzioni in maggiore dettaglio.
Spazio di indirizzi RFC 1918 condiviso
Sia l'interconnessione dedicata che la VPN IPsec ti consentono di accedere direttamente agli indirizzi IP RFC 1918 nel tuo Virtual Private Cloud (VPC), il che può semplificare la configurazione di Kafka. Se utilizzi una topologia basata su VPN, ti consigliamo di configurare una VPN a velocità effettiva elevata.
Per impostazione predefinita, Dataflow avvia le istanze sulla
rete VPC predefinita. In una topologia di rete privata con
route definiti esplicitamente in Cloud Router
che connettono le sottoreti in Google Cloud al cluster Kafka, hai bisogno di
più controllo su dove posizionare le istanze Dataflow. Puoi
utilizzare Dataflow per configurare i parametri di esecuzione network
e subnetwork
.
Assicurati che la sottorete corrispondente abbia un numero sufficiente di indirizzi IP disponibili su cui Dataflow possa avviare istanze quando tenta di eseguire il ridimensionamento orizzontale. Inoltre, quando crei una rete separata per l'avvio delle istanze Dataflow, assicurati di avere una regola firewall che consenta il traffico TCP tra tutte le macchine virtuali del progetto. La rete predefinita ha già configurato questa regola firewall.
Spazio degli indirizzi IP pubblici
Questa architettura utilizza Transport Layer Security
(TLS) per proteggere il traffico tra i client esterni e Kafka e utilizza il traffico non criptato per la comunicazione tra broker. Quando l'ascoltatore Kafka si associa a un'interfaccia di rete utilizzata sia per la comunicazione interna che esterna, la configurazione dell'ascoltatore è semplice. Tuttavia, in molti scenari, gli indirizzi pubblicizzati esternamente
degli broker Kafka nel cluster sono diversi dalle interfacce di rete interne
utilizzate da Kafka. In questi casi, puoi utilizzare la proprietà advertised.listeners
:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
I client esterni si connettono tramite la porta 9093 tramite un canale "SSL", mentre i client interni si connettono tramite la porta 9092 tramite un canale in testo non cifrato. Quando specifichi un indirizzo in advertised.listeners
, utilizza nomi DNS (kafkabroker-n.mydomain.com
in questo esempio) che risolvono nella stessa istanza sia per il traffico esterno che per quello interno. L'utilizzo di indirizzi IP pubblici potrebbe non funzionare
perché potrebbero non essere risolti per il traffico interno.
Logging
I log di KafkaIO
possono essere piuttosto dettagliati. Valuta la possibilità di ridurre il livello di registrazione in produzione come segue:
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
Per ulteriori informazioni, consulta Impostare i livelli di log dei worker della pipeline.
Passaggi successivi
- Leggi da Apache Kafka.
- Scopri di più sull'I/O gestita.