Scrivere da Dataflow ad Apache Kafka

Questo documento descrive come scrivere dati da Dataflow in Apache Kafka.

Per la maggior parte dei casi d'uso, ti consigliamo di utilizzare il connettore I/O gestito per scrivere in Kafka.

Se hai bisogno di una ottimizzazione delle prestazioni più avanzata, valuta la possibilità di utilizzare il connettoreKafkaIO. Il connettore KafkaIO è disponibile per Java o utilizzando il framework di pipeline multilingue per Python e Go.

Elaborazione "exactly-once"

Per impostazione predefinita, il connettore KafkaIO non fornisce semantica exactly-once per le scritture. Ciò significa che i dati potrebbero essere scritti nell'argomento Kafka più volte. Per attivare le scritture esattamente una volta, chiama il metodo withEOS. Le scritture esattamente una volta garantiscono che i dati vengano scritti nell'argomento Kafka di destinazione esattamente una volta. Tuttavia, aumenta anche il costo della pipeline e diminuisce il throughput.

Se non hai requisiti rigorosi per la semantica esatta e la logica della pipeline può gestire i record duplicati, ti consigliamo di attivare la modalità almeno una volta per l'intera pipeline per ridurre i costi. Per ulteriori informazioni, consulta Impostare la modalità di streaming della pipeline.

Scarichi della pipeline

Se svuoti la pipeline, la semantica exactly-once non è garantita. L'unica garanzia è che non vengono persi dati confermati. Di conseguenza, alcuni dati potrebbero essere elaborati durante lo svuotamento della pipeline, senza il commit degli offset di lettura su Kafka. Per ottenere la semantica almeno una volta per Kafka quando modifichi una pipeline, aggiorna la pipeline anziché annullare il job e avviarne uno nuovo.

Ottimizza Kafka per la semantica esatta

La regolazione di transaction.max.timeout.ms e transactional.id.expiration.ms può complementare la tua strategia complessiva di tolleranza degli errori e di consegna exactly-once. Tuttavia, il loro impatto dipende dalla natura dell'interruzione e dalla tua configurazione specifica. Imposta transaction.max.timeout.ms vicino al tempo di conservazione dei tuoi argomenti Kafka per evitare la duplicazione dei dati causata da interruzioni del broker Kafka.

Se un broker Kafka diventa temporaneamente non disponibile (ad esempio a causa di una partizione della rete o di un errore del nodo) e un produttore ha transazioni in corso, queste transazioni potrebbero scadere. L'aumento del valore di transaction.max.timeout.ms consente alle transazioni di avere più tempo per essere completate dopo il recupero di un broker, evitando potenzialmente la necessità di riavviare le transazioni e di inviare nuovamente i messaggi. Questa mitigazione contribuisce indirettamente a mantenere la semantica esattamente una volta, riducendo la possibilità di messaggi duplicati causati dai riavvii delle transazioni. D'altra parte, una scadenza più breve può contribuire a ripulire più rapidamente gli ID transazioni inattivi, riducendo il potenziale utilizzo delle risorse.

Configurazione del networking

Per impostazione predefinita, Dataflow avvia le istanze all'interno della rete Virtual Private Cloud (VPC) predefinita. A seconda della configurazione di Kafka, potrebbe essere necessario configurare una rete e una sottorete diverse per Dataflow. Per saperne di più, consulta la sezione Specificare una rete e una sottorete. Quando configuri la rete, crea regole firewall che consentano alle macchine worker di Dataflow di raggiungere i broker Kafka.

Se utilizzi Controlli di servizio VPC, posiziona il cluster Kafka all'interno del perimetro di Controlli di servizio VPC oppure estendi i perimetri alla VPN o a Cloud Interconnect autorizzati.

Se il cluster Kafka è dipiegato al di fuori di Google Cloud, devi creare una connessione di rete tra Dataflow e il cluster Kafka. Esistono diverse opzioni di rete con diversi compromessi:

L'interconnessione dedicata è l'opzione migliore per prestazioni e affidabilità prevedibili, ma la configurazione può richiedere più tempo perché terze parti devono eseguire il provisioning dei nuovi circuiti. Con una topologia basata su IP pubblico, puoi iniziare rapidamente perché non è richiesta molta configurazione di rete.

Le due sezioni seguenti descrivono queste opzioni in maggiore dettaglio.

Spazio di indirizzi RFC 1918 condiviso

Sia l'interconnessione dedicata che la VPN IPsec ti consentono di accedere direttamente agli indirizzi IP RFC 1918 nel tuo Virtual Private Cloud (VPC), il che può semplificare la configurazione di Kafka. Se utilizzi una topologia basata su VPN, ti consigliamo di configurare una VPN a velocità effettiva elevata.

Per impostazione predefinita, Dataflow avvia le istanze sulla rete VPC predefinita. In una topologia di rete privata con route definiti esplicitamente in Cloud Router che connettono le sottoreti in Google Cloud al cluster Kafka, hai bisogno di più controllo su dove posizionare le istanze Dataflow. Puoi utilizzare Dataflow per configurare i parametri di esecuzione network e subnetwork.

Assicurati che la sottorete corrispondente abbia un numero sufficiente di indirizzi IP disponibili su cui Dataflow possa avviare istanze quando tenta di eseguire il ridimensionamento orizzontale. Inoltre, quando crei una rete separata per l'avvio delle istanze Dataflow, assicurati di avere una regola firewall che consenta il traffico TCP tra tutte le macchine virtuali del progetto. La rete predefinita ha già configurato questa regola firewall.

Spazio degli indirizzi IP pubblici

Questa architettura utilizza Transport Layer Security (TLS) per proteggere il traffico tra i client esterni e Kafka e utilizza il traffico non criptato per la comunicazione tra broker. Quando l'ascoltatore Kafka si associa a un'interfaccia di rete utilizzata sia per la comunicazione interna che esterna, la configurazione dell'ascoltatore è semplice. Tuttavia, in molti scenari, gli indirizzi pubblicizzati esternamente degli broker Kafka nel cluster sono diversi dalle interfacce di rete interne utilizzate da Kafka. In questi casi, puoi utilizzare la proprietà advertised.listeners:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

I client esterni si connettono tramite la porta 9093 tramite un canale "SSL", mentre i client interni si connettono tramite la porta 9092 tramite un canale in testo non cifrato. Quando specifichi un indirizzo in advertised.listeners, utilizza nomi DNS (kafkabroker-n.mydomain.com in questo esempio) che risolvono nella stessa istanza sia per il traffico esterno che per quello interno. L'utilizzo di indirizzi IP pubblici potrebbe non funzionare perché potrebbero non essere risolti per il traffico interno.

Logging

I log di KafkaIO possono essere piuttosto dettagliati. Valuta la possibilità di ridurre il livello di registrazione in produzione come segue:

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

Per ulteriori informazioni, consulta Impostare i livelli di log dei worker della pipeline.

Passaggi successivi