Questo documento descrive come scrivere dati da Dataflow in Apache Kafka.
Il connettore Kafka I/O di Apache Beam (KafkaIO
) è disponibile in modo nativo per
Java,
ed è disponibile anche
Python
e Vai
usando il framework di pipeline multilingua di Apache Beam.
Per le pipeline Java, valuta la possibilità di utilizzare Connettore I/O gestito da cui leggere Kafka,
Elaborazione "exactly-once"
Per impostazione predefinita, il connettore KafkaIO
non fornisce
semantica "exactly-once" per le scritture. Ciò significa che i dati potrebbero essere scritti nell'argomento Kafka più volte. Per attivare le scritture esattamente una volta, chiama il metodo withEOS
. Scrittura "exactly-once"
garantire che i dati vengano scritti esattamente una volta nell'argomento Kafka di destinazione.
Tuttavia, aumenta anche il costo della pipeline e diminuisce la velocità effettiva.
Se non hai requisiti rigorosi per la semantica esatta e la logica della pipeline può gestire i record duplicati, ti consigliamo di attivare la modalità almeno una volta per l'intera pipeline per ridurre i costi. Per ulteriori informazioni, vedi Imposta la modalità flusso di dati della pipeline.
Scarichi della pipeline
Se svuota la pipeline, la semantica "exactly-once" non è garantita. L'unica garanzia è che non vengono persi dati confermati. Di conseguenza, alcuni dati potrebbero essere elaborati durante lo svuotamento della pipeline, senza il commit degli offset di lettura su Kafka. Per ottenere la semantica "at-least-once" per Kafka quando modifichi un pipeline, aggiorna la pipeline anziché annullare il job e iniziarne uno nuovo.
Ottimizza Kafka per la semantica "exactly-once"
La modifica di transaction.max.timeout.ms
e transactional.id.expiration.ms
può
completano la tua strategia generale di tolleranza di errore
e di pubblicazione "exactly-once".
Tuttavia, il loro impatto dipende dalla natura dell'interruzione e dalle tue
configurazione. Imposta transaction.max.timeout.ms
in prossimità del periodo di conservazione di
i tuoi argomenti Kafka per evitare la duplicazione dei dati causata dalle interruzioni dei broker Kafka.
Se un broker Kafka diventa temporaneamente non disponibile (ad esempio a causa di una partizione della rete o di un errore del nodo) e un produttore ha transazioni in corso, queste transazioni potrebbero scadere. L'aumento del valore di
transaction.max.timeout.ms
consente alle transazioni di avere più tempo per essere completate dopo il recupero di un
broker, evitando potenzialmente la necessità di riavviare le transazioni e di
inviare nuovamente i messaggi. Questa mitigazione contribuisce indirettamente a mantenere la semantica esattamente una volta, riducendo la possibilità di messaggi duplicati causati dai riavvii delle transazioni. D'altra parte, una scadenza più breve può aiutare a pulire
ID transazionali inattivi più rapidamente, riducendo il potenziale utilizzo di risorse.
Configurazione del networking
Per impostazione predefinita, Dataflow avvia le istanze all'interno della rete Virtual Private Cloud (VPC) predefinita. A seconda della configurazione di Kafka, potrebbe essere necessario configurare una rete e una sottorete diverse per Dataflow. Per saperne di più, consulta la sezione Specificare una rete e una sottorete. Quando configuri la rete, crea regole firewall che consentano alle macchine worker di Dataflow di raggiungere i broker Kafka.
Se utilizzi Controlli di servizio VPC, posiziona il cluster Kafka all'interno del perimetro di Controlli di servizio VPC oppure estendi i perimetri alla VPN o all'interconnessione Cloud autorizzata.
Se il deployment del cluster Kafka viene eseguito all'esterno di Google Cloud, devi creare una connessione di rete tra Dataflow e il cluster Kafka. Esistono diverse opzioni di rete con diversi compromessi:
- Esegui la connessione utilizzando uno spazio di indirizzi RFC 1918 condiviso, utilizzando una delle seguenti opzioni:
- Raggiungi il tuo cluster Kafka ospitato esternamente tramite indirizzi IP pubblici utilizzando uno dei seguenti metodi:
- Rete internet pubblica
- Peering diretto
- Peering con operatori
Dedicated Interconnect è l'opzione migliore per prestazioni e affidabilità, ma la configurazione può richiedere più tempo le parti interessate devono fornire i nuovi circuiti. Con una topologia basata su IP pubblici, possono iniziare rapidamente perché il lavoro di networking deve essere ridotto.
Le due sezioni seguenti descrivono queste opzioni in maggiore dettaglio.
Spazio di indirizzi RFC 1918 condiviso
Sia l'interconnessione dedicata che la VPN IPsec ti consentono di accedere direttamente agli indirizzi IP RFC 1918 nel tuo Virtual Private Cloud (VPC), il che può semplificare la configurazione di Kafka. Se utilizzi una topologia basata su VPN, ti consigliamo di configurare una VPN a velocità effettiva elevata.
Per impostazione predefinita, Dataflow avvia le istanze sulla
rete VPC predefinita. In una topologia di rete privata con
percorsi definiti esplicitamente in Cloud Router
che connettono le sottoreti in Google Cloud a quel cluster Kafka, hai bisogno di
più controllo su dove posizionare le istanze Dataflow. Tu
puoi usare Dataflow per configurare network
e subnetwork
parametri di esecuzione.
Assicurati che la subnet corrispondente abbia un numero sufficiente di indirizzi IP disponibili per consentire a Dataflow di avviare le istanze quando tenta di fare lo scale out. Inoltre, anche quando crei una rete separata per avviare Istanze Dataflow, assicurati di avere una regola firewall che abilita il traffico TCP tra tutte le macchine virtuali nel progetto. La rete predefinita ha già questa regola firewall configurata.
Spazio degli indirizzi IP pubblici
Questa architettura utilizza Transport Layer Security (TLS) per proteggere il traffico tra i client esterni e Kafka e utilizza il traffico non criptato per la comunicazione tra broker. Quando il listener Kafka si associa a un'interfaccia di rete utilizzata
sia per le comunicazioni interne che esterne,
in modo diretto. Tuttavia, in molti scenari, gli indirizzi
annunciati esternamente
dei broker Kafka nel cluster sono diversi dalle interfacce di rete interne
usate da Kafka. In questi scenari, puoi utilizzare advertised.listeners
proprietà:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
I client esterni si connettono utilizzando la porta 9093 tramite un protocollo "SSL" canale e interno
I client si connettono utilizzando la porta 9092 tramite un canale di testo non crittografato. Se specifichi
indirizzo advertised.listeners
, usa nomi DNS
(kafkabroker-n.mydomain.com
, in questo esempio) che si risolvono nella stessa istanza
sia per il traffico interno che esterno. L'utilizzo di indirizzi IP pubblici potrebbe non funzionare
perché gli indirizzi potrebbero non riuscire a risolvere il problema per il traffico interno.
Logging
La registrazione da KafkaIO
può essere piuttosto complessa. Valuta la possibilità di ridurre il livello di registrazione in produzione come segue:
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
Per ulteriori informazioni, consulta Impostare i livelli di log dei worker della pipeline.