"exactly-once" in Dataflow

Dataflow supporta l'elaborazione "exactly-once" dei record. Questa pagina spiega in che modo Dataflow implementa l'elaborazione "exactly-once" garantendo al contempo una bassa latenza.

Panoramica

Le pipeline batch utilizzano sempre l'elaborazione "exactly-once". Le pipeline in modalità flusso utilizzano l'elaborazione "exactly-once" per impostazione predefinita, ma possono anche utilizzare l'elaborazione at-least-once.

L'elaborazione "exactly-once" fornisce garanzie sui risultati dell'elaborazione dei record, inclusi i risultati di ogni fase della pipeline. In particolare, per ogni record che arriva alla pipeline da un'origine o arriva in una fase da una fase precedente, Dataflow garantisce quanto segue:

  • Il record viene elaborato e non viene perso.
  • I risultati dell'elaborazione che rimangono nella pipeline vengono visualizzati al massimo una volta.

In altre parole, i record vengono elaborati almeno una volta e i risultati vengono impegnati esattamente una volta.

L'elaborazione "exactly-once" garantisce che i risultati siano accurati, senza record duplicati nell'output. Dataflow è ottimizzato per ridurre al minimo la latenza mantenendo la semantica "exactly-once". Tuttavia, l'elaborazione "exactly-once" comporta ancora dei costi per l'esecuzione della deduplicazione. Per i casi d'uso che possono tollerare i record duplicati, spesso puoi ridurre i costi e migliorare la latenza abilitando la modalità atleast-once. Per saperne di più sulla scelta tra i flussi di dati "exactly-once" e at-least-once, consulta Impostare la modalità di elaborazione in modalità flusso della pipeline.

Dati in ritardo

L'elaborazione "exactly-once" garantisce l'accuratezza della pipeline: se la pipeline elabora un record, Dataflow fa in modo che il record venga riportato nell'output e che il record non sia duplicato.

In una pipeline in modalità flusso, tuttavia, l'elaborazione "exactly-once" non può garantire che i risultati siano completati, perché i record potrebbero arrivare in ritardo. Ad esempio, supponi che la pipeline esegua un'aggregazione in una finestra temporale, come Count. Con l'elaborazione "exactly-once", il risultato è accurato per i record che arrivano all'interno della finestra in modo tempestivo, ma i record tardivi potrebbero essere eliminati.

In genere, non c'è modo di garantire la completezza in una pipeline in modalità flusso, perché in teoria i record possono arrivare in ritardo arbitrario. Nel caso limitato, dovresti attendere all'infinito per produrre un risultato. Più in pratica, Apache Beam ti consente di configurare la soglia per rilasciare i dati in ritardo e quando emettere risultati aggregati. Per ulteriori informazioni, consulta la sezione Filigrane e dati recenti nella documentazione di Apache Beam.

Effetti collaterali

Non è garantito che gli effetti collaterali abbiano una semantica "exactly-once". È importante sottolineare che ciò include la scrittura di output in un archivio esterno, a meno che il sink non implementi anche la semantica "exactly-once".

In particolare, Dataflow non garantisce che ogni record passi tramite ogni trasformazione esattamente una volta. A causa di nuovi tentativi o errori dei worker, Dataflow potrebbe inviare un record attraverso una trasformazione più volte o anche contemporaneamente su più worker.

Nell'ambito dell'elaborazione "exactly-once", Dataflow deduplica gli output. Tuttavia, se il codice in una trasformazione ha effetti collaterali, questi potrebbero verificarsi più volte. Ad esempio, se una trasformazione effettua una chiamata di servizio remoto, questa potrebbe essere effettuata più volte per lo stesso record. Gli effetti collaterali possono anche portare alla perdita di dati in alcune situazioni. Ad esempio, supponi che una trasformazione legga un file per produrre un output, quindi lo elimini immediatamente senza attendere che venga eseguito il commit dell'output. Se si verifica un errore durante il commit del risultato, Dataflow riprova a eseguire la trasformazione, ma ora non può leggere il file eliminato.

Logging

L'output del log dall'elaborazione indica che l'elaborazione è avvenuta, ma non indica se è stato eseguito il commit dei dati. Di conseguenza, i file di log potrebbero indicare che i dati sono stati elaborati più volte, anche se i risultati dei dati elaborati vengono impegnati nell'archiviazione permanente solo una volta. Inoltre, i log non riflettono sempre i dati elaborati e di cui è stato eseguito il commit. I log potrebbero venire eliminati a causa della limitazione o persi a causa di altri problemi del servizio di logging.

Streaming "exactly-once"

Questa sezione spiega in che modo Dataflow implementa l'elaborazione "exactly-once" per i job di flusso, incluso il modo in cui Dataflow gestisce complessità come l'elaborazione non deterministica, i dati tardivi e il codice personalizzato.

shuffling dei flussi di dati Dataflow

I job Dataflow vengono eseguiti su vari worker in parallelo assegnando intervalli di lavoro a ciascun worker. Sebbene le assegnazioni possano cambiare nel tempo in risposta agli errori dei worker, alla scalabilità automatica o ad altri eventi, dopo ogni trasformazione di GroupByKey, tutti i record con la stessa chiave vengono elaborati sullo stesso worker. La trasformazione GroupByKey viene spesso utilizzata da trasformazioni composite, come Count, FileIO e così via. Per garantire che i record per una determinata chiave finiscano sullo stesso worker, i worker Dataflow eseguono lo shuffling dei dati tra loro utilizzando le chiamate di procedura remota (RPC).

Per garantire che i record non vadano persi durante l'shuffling, Dataflow utilizza il backup upstream. Con il backup upstream, il worker che invia i record riprova gli RPC fino a quando non riceve una conferma di ricezione del record. Gli effetti collaterali dell'elaborazione del record vengono applicati all'archiviazione permanente a valle. Se il worker che invia i record non è più disponibile, Dataflow continua a riprovare gli RPC, assicurando che ogni record venga consegnato almeno una volta.

Poiché questi nuovi tentativi potrebbero creare duplicati, ogni messaggio è contrassegnato con un ID univoco. Ogni destinatario memorizza un catalogo di tutti gli ID che sono già stati visti ed elaborati. Alla ricezione di un record, Dataflow ne cerca l'ID nel catalogo. Se viene trovato l'ID, il record è già stato ricevuto e confermato e viene eliminato come duplicato. Per garantire che gli ID record siano stabili, ogni output, da un passaggio all'altro, viene controllato nello spazio di archiviazione. Di conseguenza, se lo stesso messaggio viene inviato più volte a causa di chiamate RPC ripetute, viene impegnato nell'archiviazione una sola volta.

Garanzia di bassa latenza

Affinché l'elaborazione "exactly-once" sia attuabile, l'I/O deve essere ridotto, in particolare impedendo l'I/O su ogni record. Per raggiungere questo obiettivo, Dataflow utilizza i filtri Bloom e la garbage collection.

Filtri Fiori

I filtri Bloom sono strutture di dati compatte che consentono di eseguire rapidamente i controlli relativi all'appartenenza al set. In Dataflow, ogni worker mantiene un filtro Bloom per ogni ID che vede. Quando arriva un nuovo ID record, il worker cerca l'ID nel filtro. Se il filtro restituisce false, il record non è un duplicato e il worker non cerca l'ID nello spazio di archiviazione stabile.

Dataflow mantiene un insieme di filtri Bloom aggiornati raggruppati in base al tempo. Quando arriva un record, Dataflow sceglie il filtro appropriato da verificare in base al timestamp di sistema. Questo passaggio impedisce ai filtri Bloom di saturare man mano che i filtri vengono raccolti inutili e limita la quantità di dati da analizzare all'avvio.

Garbage collection

Per evitare di riempire lo spazio di archiviazione con ID record, Dataflow utilizza la garbage collection per rimuovere i record precedenti. Dataflow utilizza il timestamp di sistema per calcolare una filigrana di garbage collection.

Questa filigrana si basa sul tempo fisico di attesa in una determinata fase. Pertanto, fornisce anche informazioni sulle parti della pipeline lente. Questi metadati sono la base per la metrica di ritardo di sistema mostrata nell'interfaccia di monitoraggio di Dataflow.

Se un record arriva con un timestamp precedente alla filigrana e se gli ID per questo periodo di tempo sono già stati garbage collection, il record viene ignorato. Poiché la filigrana bassa che attiva la garbage collection non avanza finché le pubblicazioni dei record non vengono confermate, questi record in ritardo sono duplicati.

Origini non deterministiche

Dataflow utilizza l'SDK Apache Beam per leggere i dati nelle pipeline. Se l'elaborazione non va a buon fine, Dataflow potrebbe riprovare a leggere da un'origine. In questa situazione, Dataflow deve garantire che ogni record univoco prodotto da un'origine venga registrato esattamente una volta. Per le origini deterministiche, come Pub/Sub Lite o Kafka, i record vengono letti in base a un offset registrato, riducendo la necessità di questo passaggio.

Poiché Dataflow non può assegnare automaticamente gli ID record, le origini non deterministiche devono indicare a Dataflow quali sono gli ID record per evitare duplicati. Quando un'origine fornisce ID univoci per ogni record, il connettore utilizza uno shuffling nella pipeline per rimuovere i duplicati. I record con lo stesso ID vengono filtrati. Per un esempio di come Dataflow implementa l'elaborazione "exactly-once" quando si utilizza Pub/Sub come origine, consulta la sezione Elaborazione "exactly-once" nella pagina Flussi di dati con Pub/Sub.

Quando esegui DoFn personalizzati come parte della tua pipeline, Dataflow non garantisce che questo codice venga eseguito una sola volta per record. Per garantire l'elaborazione almeno una volta in caso di errori dei worker, Dataflow potrebbe eseguire un determinato record attraverso una trasformazione più volte oppure eseguire lo stesso record contemporaneamente su più worker. Se includi la tua pipeline con codice che esegue operazioni come contattare un servizio esterno, le azioni potrebbero essere eseguite più di una volta per un determinato record.

Per rendere l'elaborazione non deterministica efficacemente deterministica, utilizza il checkpointing. Quando utilizzi il checkpointing, ogni output di una trasformazione viene sottoposto a checkpoint nello spazio di archiviazione stabile con il suo ID univoco prima di passare alla fase successiva. I nuovi tentativi nella distribuzione shuffling di Dataflow inoltrano l'output che è stato sottoposto a checkpoint. Sebbene il codice possa essere eseguito più volte, Dataflow garantisce l'archiviazione dell'output di una sola di queste esecuzioni. Dataflow utilizza un archivio coerente che impedisce la scrittura dei duplicati nello spazio di archiviazione stabile.

Distribuzione dell'output "exactly-once"

L'SDK Apache Beam include sink integrati progettati per garantire che non producano duplicati. Se possibile, utilizza uno di questi sink integrati.

Se devi scrivere il tuo sink, l'approccio migliore è rendere l'oggetto funzione idempotente in modo che possa essere ripetuto ogni volta che è necessario senza causare effetti collaterali indesiderati. Tuttavia, spesso alcuni componenti della trasformazione che implementano la funzionalità del sink non sono deterministici e potrebbero cambiare se viene riprovata.

Ad esempio, in un'aggregazione a finestre, l'insieme di record nella finestra potrebbe essere non deterministico. In particolare, la finestra potrebbe tentare di attivarsi con gli elementi e0, e1, e2. Il worker potrebbe arrestarsi in modo anomalo prima di eseguire il commit dell'elaborazione delle finestre, ma non prima che questi elementi vengano inviati come effetto collaterale. Quando il worker si riavvia, la finestra si attiva di nuovo e arriva un elemento e3 in ritardo. Poiché questo elemento arriva prima del commit della finestra, non viene conteggiato come dati in ritardo, quindi DoFn viene richiamato con gli elementi e0, e1, e2, e3. Questi elementi vengono quindi inviati all'operazione di effetto collaterale. L'idempotenza non è utile in questo scenario perché vengono inviati ogni volta set di record logici diversi.

Per affrontare l'indeterminatezza in Dataflow, utilizza la trasformazione Reshuffle integrata. Quando Dataflow esegue lo shuffling dei dati, Dataflow scrive i dati in modo duraturo in modo che tutti gli elementi non generati in modo deterministico siano stabili nel caso in cui le operazioni vengano tentate di nuovo dopo l'shuffling. L'uso della trasformazione Reshuffle garantisce che solo una versione dell'output di DoFn possa superare un limite di shuffling. Il seguente pattern garantisce che l'operazione di effetto collaterale riceva sempre un record deterministico per l'output:

c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
 .apply(GroupByKey.<..>.create())
 .apply(new PrepareOutputData())
 .apply(Reshuffle.<..>of())
 .apply(WriteToSideEffect());

Per assicurarti che l'esecutore di Dataflow sappia che gli elementi devono essere stabili prima di eseguire un comando DoFn, aggiungi l'annotazione RequiresStableInput a DoFn.

Scopri di più