Exactly-once in Dataflow

Dataflow supporta l'elaborazione "exactly-once" dei record. Questa pagina spiega come Dataflow implementa l'elaborazione "exactly-once" garantendo al contempo una bassa latenza.

Panoramica

Le pipeline batch utilizzano sempre l'elaborazione exactly-once. Per impostazione predefinita, le pipeline di streaming utilizzano l'elaborazione "exactly-once", ma possono anche utilizzare l'elaborazione almeno una volta.

L'elaborazione exactly-once fornisce garanzie sui risultati dell'elaborazione delle registrazioni, inclusi i risultati di ogni fase della pipeline. Nello specifico, per ogni record che arriva alla pipeline da un'origine o in una fase da una fase precedente, Dataflow garantisce quanto segue:

  • Il record viene elaborato e non viene perso.
  • Eventuali risultati dell'elaborazione che rimangono all'interno della pipeline vengono riportati al massimo una volta.

In altre parole, i record vengono elaborati almeno una volta e i risultati vengono committati esattamente una volta.

L'elaborazione "exactly-once" garantisce l'accuratezza dei risultati, senza record duplicati nell'output. Dataflow è ottimizzato per ridurre al minimo la latenza mantenendo la semantica exactly-once. Tuttavia, l'elaborazione "exactly-once" comporta comunque costi per eseguire la deduplica. Per i casi d'uso che possono tollerare record duplicati, spesso puoi ridurre i costi e migliorare la latenza attivando la modalità almeno una volta. Per ulteriori informazioni sulla scelta tra lo streaming esattamente una volta e almeno una volta, consulta la sezione Impostare la modalità di streaming della pipeline.

Dati in ritardo

L'elaborazione "exactly-once" garantisce l'accuratezza della pipeline: se la pipeline elabora un record, Dataflow garantisce che il record sia riportato nell'output e che non venga duplicato.

Tuttavia, in una pipeline di streaming, l'elaborazione esattamente una volta non può garantire che i risultati siano completi, perché i record potrebbero arrivare in ritardo. Ad esempio, supponendo che la pipeline esegua un'aggregazione in un intervallo di tempo, ad esempio Count. Con l'elaborazione exactly-once, il risultato è accurato per i record che arrivano in tempo utile all'interno dell'intervallo, ma i record in ritardo potrebbero essere eliminati.

In genere, non è possibile garantire la completezza in una pipeline di streaming, perché in teoria i record possono arrivare in ritardo in modo arbitrario. Nel caso limite, dovresti attendere all'infinito per produrre un risultato. In termini più pratici, Apache Beam ti consente di configurare la soglia per l'eliminazione dei dati in ritardo e quando emettere i risultati aggregati. Per ulteriori informazioni, consulta Watermark e dati in ritardo nella documentazione di Apache Beam.

Effetti collaterali

Non è garantito che gli effetti collaterali abbiano la semantica esattamente una volta. È importante sottolineare che questo include la scrittura dell'output in un magazzino esterno, a meno che l'obiettivo non implementi anche la semantica esattamente una volta.

Nello specifico, Dataflow non garantisce che ogni record venga sottoposto a ogni trasformazione esattamente una volta. A causa di ripetuti tentativi o errori dei worker, Dataflow potrebbe inviare un record tramite una trasformazione più volte o anche contemporaneamente su più worker.

Nell'ambito dell'elaborazione exactly-once, Dataflow deduplica gli output. Tuttavia, se il codice di una trasformazione presenta effetti collaterali, questi effetti potrebbero verificarsi più volte. Ad esempio, se una trasformazione effettua una chiamata a un servizio remoto, questa chiamata potrebbe essere effettuata più volte per lo stesso record. Gli effetti collaterali possono persino portare alla perdita di dati in alcune situazioni. Ad esempio, supponiamo che una trasformazione legga un file per produrre un output, quindi lo elimini immediatamente senza attendere l'commit dell'output. Se si verifica un errore durante il commit del risultato, Dataflow riprova la trasformazione, ma ora la trasformazione non può leggere il file eliminato.

Logging

L'output del log dell'elaborazione indica che l'elaborazione è avvenuta, ma non indica se i dati sono stati sottoposti a commit. Pertanto, i file di log potrebbero indicare che i dati sono stati elaborati più volte, anche se i risultati dei dati elaborati vengono applicati allo spazio di archiviazione permanente solo una volta. Inoltre, i log non rispecchiano sempre i dati elaborati e sottoposti a commit. I log potrebbero essere eliminati a causa della limitazione o persi a causa di altri problemi del servizio di registrazione.

Streaming "exactly-once"

Questa sezione spiega come Dataflow implementa l'elaborazione esattamente una volta per i job in streaming, inclusa la gestione di complessità come l'elaborazione non deterministica, i dati in ritardo e il codice personalizzato.

Dataflow streaming shuffle

I job Dataflow in streaming vengono eseguiti su molti worker diversi in parallelo assegnando intervalli di lavoro a ciascun worker. Sebbene le assegnazioni possano cambiare nel tempo in risposta a errori dei worker, alla scalabilità automatica o ad altri eventi, dopo ogni trasformazione GroupByKey, tutti i record con la stessa chiave vengono elaborati nello stesso worker. La trasformazione GroupByKey viene spesso utilizzata dalle trasformazioni composite, come Count, FileIO e così via. Per garantire che i record per una determinata chiave vengano inseriti nello stesso worker, i worker di Dataflow rimescolano i dati tra loro utilizzando le chiamate di procedura remota (RPC).

Per garantire che i record non vengano persi durante l'ordinamento casuale, Dataflow utilizza il backup a monte. Con il backup a monte, il worker che invia i record riprova le RPC finché non riceve un riscontro positivo che indica che il record è stato ricevuto. Gli effetti collaterali dell'elaborazione del record vengono registrati in un luogo di archiviazione permanente a valle. Se il worker che invia i record non è disponibile, Dataflow continua a riprovare le RPC, il che garantisce che ogni record venga inviato almeno una volta.

Poiché questi tentativi di nuovo invio potrebbero creare duplicati, a ogni messaggio viene assegnato un ID unico. Ogni ricevente memorizza un catalogo di tutti gli ID già visti ed elaborati. Quando viene ricevuto un record, Dataflow cerca il relativo ID nel catalogo. Se l'ID viene trovato, il record è già stato ricevuto e committato e viene eliminato come duplicato. Per garantire che gli ID record siano stabili, ogni output da un passaggio all'altro viene sottoposto a checkpoint nello spazio di archiviazione. Di conseguenza, se lo stesso messaggio viene inviato più volte a causa di chiamate RPC ripetute, viene eseguito il commit del messaggio nello spazio di archiviazione solo una volta.

Garantire una bassa latenza

Affinché l'elaborazione esattamente una volta sia fattibile, le operazioni di I/O devono essere ridotte, in particolare impedendo le operazioni di I/O su ogni record. Per raggiungere questo obiettivo, Dataflow utilizza i filtri Bloom e garbage collection.

Filtri Bloom

I filtri di Bloom sono strutture di dati compatte che consentono di eseguire rapidamente controlli di appartenenza all'insieme. In Dataflow, ogni worker mantiene un filtro Bloom di ogni ID che vede. Quando arriva un nuovo ID record, il worker lo cerca nel filtro. Se il filtro restituisce false, questo record non è un duplicato e il worker non cerca l'ID nello spazio di archiviazione stabile.

Dataflow mantiene un insieme di filtri Bloom incrementali raggruppati in base al tempo. Quando arriva un record, Dataflow sceglie il filtro appropriato da controllare in base al timestamp di sistema. Questo passaggio impedisce ai filtri Bloom di saturarsi man mano che vengono raccolti, nonché limita la quantità di dati che devono essere analizzati all'avvio.

Garbage collection

Per evitare di riempire lo spazio di archiviazione con ID record, Dataflow utilizza la raccolta del garbage per rimuovere i record vecchi. Dataflow utilizza il timestamp del sistema per calcolare una marcatura temporale per la raccolta dei rifiuti.

Questa filigrana si basa sul tempo fisico trascorso in attesa in una determinata fase. Fornisce quindi anche informazioni sulle parti della pipeline che sono lente. Questi metadati sono la base della metrica relativa al ritardo del sistema mostrata nell'interfaccia di monitoraggio di Dataflow.

Se un record arriva con un timestamp precedente alla filigrana e se gli ID per questo momento sono già stati sottoposti a garbage collection, il record viene ignorato. Poiché la marcatura temporale minima che attiva garbage collection non avanza finché i caricamenti dei record non vengono confermati, questi record in ritardo sono duplicati.

Origini non deterministiche

Dataflow utilizza l'SDK Apache Beam per leggere i dati nelle pipeline. Se l'elaborazione non va a buon fine, Dataflow potrebbe riprovare a leggere da una sorgente. In questo caso, Dataflow deve assicurarsi che ogni record univoco prodotto da un'origine venga registrato esattamente una volta. Per le origini deterministiche, come Pub/Sub Lite o Kafka, i record vengono letti in base a un offset registrato, attenuando la necessità di questo passaggio.

Poiché Dataflow non può assegnare automaticamente gli ID record, le origini non deterministiche devono indicare a Dataflow quali sono gli ID record per evitare duplicazioni. Quando un'origine fornisce ID univoci per ogni record, il connettore utilizza un'operazione di mescolamento nella pipeline per rimuovere i duplicati. I record con lo stesso ID vengono esclusi. Per un esempio di come Dataflow implementa l'elaborazione "exactly-once" quando utilizza Pub/Sub come origine, consulta la sezione Elaborazione exactly-once nella pagina Streaming con Pub/Sub.

Quando esegui DoFn personalizzati all'interno della pipeline, Dataflow non garantisce che questo codice venga eseguito una sola volta per record. Per garantire un'elaborazione almeno una volta in caso di errori dei worker, Dataflow potrebbe eseguire un determinato record tramite una trasformazione più volte o eseguire lo stesso record contemporaneamente su più worker. Se nella pipeline includi codice che esegue operazioni come contattare un servizio esterno, le azioni potrebbero essere eseguite più volte per un determinato record.

Per rendere l'elaborazione non deterministica effettivamente deterministica, utilizza il controllo dei punti di controllo. Quando utilizzi i checkpoint, ogni output di una trasformazione viene sottoposto a checkpoint in uno spazio di archiviazione stabile con il relativo ID univoco prima di essere inviato alla fase successiva. I tentativi di nuovo invio nell'ordine casuale di Dataflow ritrasmettono l'output sottoposto a checkpoint. Sebbene il codice possa essere eseguito più volte, Dataflow garantisce che viene memorizzato il risultato di una sola di queste esecuzioni. Dataflow utilizza un archivio coerente che impedisce la scrittura di duplicati in uno spazio di archiviazione stabile.

Output con consegna "exactly-once"

L'SDK Apache Beam include sink integrati che sono progettati per garantire che non producano duplicati. Se possibile, utilizza uno di questi lavandini integrati.

Se devi scrivere il tuo sink, l'approccio migliore è rendere l'oggetto funzione idempotente in modo che possa essere riprovato tutte le volte necessarie senza causare effetti collaterali indesiderati. Tuttavia, spesso alcuni componenti della trasformazione che implementano la funzionalità dell'obiettivo non sono deterministici e potrebbero cambiare se si tenta di nuovo.

Ad esempio, in un'aggregazione con finestra, l'insieme di record nella finestra potrebbe essere non deterministico. Nello specifico, la finestra potrebbe tentare di attivarsi con gli elementi e0, e1 ed e2. Il worker potrebbe arrestarsi in modo anomalo prima di eseguire il commit dell'elaborazione della finestra, ma non prima che questi elementi vengano inviati come effetto collaterale. Quando il worker si riavvia, la finestra viene attivata di nuovo e arriva un elemento in ritardo e3. Poiché questo elemento arriva prima dell'applicazione della finestra, non viene conteggiato come dato in ritardo, pertanto DoFn viene chiamato di nuovo con gli elementi e0, e1, e2, e3. Questi elementi vengono poi inviati all'operazione di effetto collaterale. L'idempotenza non è utile in questo scenario, perché ogni volta vengono inviati diversi set di record logici.

Per risolvere la non determinatezza in Dataflow, utilizza la trasformazione Reshuffle incorporata. Quando Dataflow mescola i dati, li scrive in modo permanente in modo che eventuali elementi generati in modo non deterministico siano stabili se si tentano nuovamente le operazioni dopo la miscelazione. L'utilizzo della trasformazione Reshuffle garantisce che solo una versione dell'output di un DoFn possa superare un confine di ordinamento casuale. Il seguente pattern garantisce che l'operazione con effetti collaterali riceva sempre un record deterministico in output:

c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
 .apply(GroupByKey.<..>.create())
 .apply(new PrepareOutputData())
 .apply(Reshuffle.<..>of())
 .apply(WriteToSideEffect());

Per assicurarti che il programma di esecuzione di Dataflow sappia che gli elementi devono essere stabili prima di eseguire un DoFn, aggiungi l'annotazione RequiresStableInput al DoFn.

Scopri di più