Dataflow supporta l'elaborazione "exactly-once" dei record. Questa pagina spiega come Dataflow implementa l'elaborazione "exactly-once" mentre garantendo anche una bassa latenza.
Panoramica
Le pipeline batch utilizzano sempre l'elaborazione "exactly-once". Le pipeline di flusso utilizzano l'elaborazione "exactly-once" per impostazione predefinita, ma può anche elaborazione "at-least-once".
L'elaborazione exactly-once fornisce garanzie sui risultati dell'elaborazione delle registrazioni, inclusi i risultati di ogni fase della pipeline. Nello specifico, per ogni record che arriva alla pipeline da un'origine o in una fase da una fase precedente, Dataflow garantisce quanto segue:
- Il record viene elaborato senza andare perso.
- Eventuali risultati dell'elaborazione che rimangono all'interno della pipeline vengono mostrati al massimo una volta sola.
In altre parole, i record vengono elaborati almeno una volta e i risultati vengono committati esattamente una volta.
L'elaborazione "exactly-once" garantisce l'accuratezza dei risultati, senza record duplicati nell'output. Dataflow è ottimizzato per ridurre al minimo la latenza mantenendo la semantica "exactly-once". Tuttavia, l'elaborazione "exactly-once" comporta costi per l'esecuzione della deduplicazione. Per casi d'uso che possono tollerare i duplicati spesso è possibile ridurre i costi e migliorare la latenza abilitando l'opzione . Per ulteriori informazioni sulla scelta tra "exactly-once" e "almeno una volta" i flussi di dati, vedi Imposta la modalità flusso di dati della pipeline.
Dati in ritardo
L'elaborazione "exactly-once" garantisce la accuratezza della pipeline: se la pipeline elabora un record, Dataflow garantisce che questo venga riflesso nell'output e che il record non sia duplicato.
In una pipeline in modalità flusso, tuttavia, l'elaborazione "exactly-once" non può garantire che
i risultati sono complete perché i record potrebbero arrivare in ritardo. Ad esempio,
supponendo che la pipeline esegua un'aggregazione in un intervallo di tempo, come
Count
. Con l'elaborazione exactly-once, il risultato è accurato per i record che arrivano in tempo utile all'interno dell'intervallo, ma i record in ritardo potrebbero essere eliminati.
In genere, non c'è modo di garantire la completezza in una pipeline in modalità flusso, perché in teoria i record possono arrivare arbitrariamente in ritardo. Nel caso limite, dovresti attendere all'infinito per produrre un risultato. In termini più pratici, Apache Beam ti consente di configurare la soglia per l'eliminazione dei dati in ritardo e quando emettere i risultati aggregati. Per ulteriori informazioni, vedi Filigrane e dati in ritardo nella documentazione di Apache Beam.
Effetti collaterali
Non è garantito che gli effetti collaterali abbiano una semantica "exactly-once". È importante sottolineare che include la scrittura dell'output in un archivio esterno, a meno che il sink implementa la semantica "exactly-once".
Nello specifico, Dataflow non garantisce che ogni record venga sottoposto a ogni trasformazione esattamente una volta. A causa di ripetuti tentativi o errori dei worker, Dataflow potrebbe inviare un record tramite una trasformazione più volte o anche contemporaneamente su più worker.
Nell'ambito dell'elaborazione "exactly-once", Dataflow deduplica come output. Tuttavia, se il codice di una trasformazione presenta effetti collaterali, questi effetti potrebbero verificarsi più volte. Ad esempio, se una trasformazione rende un servizio remoto questa chiamata può essere effettuata più volte per lo stesso record. Gli effetti collaterali possono persino portare alla perdita di dati in alcune situazioni. Ad esempio, supponiamo che Transform legge un file per produrre l'output, quindi lo elimina immediatamente senza attendere il commit dell'output. Se si verifica un errore quando eseguendo il commit del risultato, Dataflow riprova a eseguire la trasformazione, ma ora transform non riesce a leggere il file eliminato.
Logging
L'output del log dell'elaborazione indica che l'elaborazione è avvenuta, ma non indica se i dati sono stati sottoposti a commit. Pertanto, i file di log potrebbero indicare che i dati sono stati elaborati più volte, anche se i risultati dei dati elaborati vengono applicati allo spazio di archiviazione permanente solo una volta. Inoltre, i log non riflettono sempre i dati elaborati e impegnati. I log potrebbero essere eliminati a causa limitazione o perdita a causa di altri problemi del servizio di logging.
Streaming "exactly-once"
Questa sezione spiega come Dataflow esegue l'implementazione "exactly-once" per i job di flussi di dati, incluso il modo in cui Dataflow gestisce complessità come l'elaborazione non deterministica, i dati in ritardo e il codice personalizzato.
Shuffling dei flussi di dati Dataflow
I job Dataflow in modalità flusso vengono eseguiti su molti worker diversi in parallelo
assegnando intervalli di lavoro a ciascun worker. Sebbene le assegnazioni possano cambiare nel tempo in risposta a errori dei worker, alla scalabilità automatica o ad altri eventi, dopo ogni trasformazione GroupByKey
, tutti i record con la stessa chiave vengono elaborati nello stesso worker. La trasformazione GroupByKey
viene spesso utilizzata dalle trasformazioni composite, come Count
, FileIO
e così via. Per garantire che i record di una determinata chiave finiscano
sullo stesso worker, i worker Dataflow eseguono lo shuffling dei dati tra loro
mediante chiamate di procedura remota (RPC).
Per garantire che i record non vengano persi durante l'ordinamento casuale, Dataflow utilizza il backup a monte. Con il backup a monte, il worker che invia i record riprova le RPC finché non riceve un riconoscimento positivo dell'avvenuta ricezione del record. Gli effetti collaterali dell'elaborazione del record sono di archiviazione a valle. Se il worker che invia i record non è disponibile, Dataflow continua a riprovare le RPC, il che garantisce che ogni record venga inviato almeno una volta.
Poiché questi tentativi di ripetizione potrebbero creare duplicati, a ogni messaggio viene assegnato un ID unico. Ogni ricevente memorizza un catalogo di tutti gli ID già visti ed elaborati. Quando viene ricevuto un record, Dataflow cerca il suo ID nel catalogo. Se l'ID viene trovato, il record è già stato ricevuto viene eseguito come duplicato. Per garantire che gli ID record siano stabili, ogni output da un passaggio all'altro viene sottoposto a checkpoint nello spazio di archiviazione. Di conseguenza, se lo stesso messaggio viene inviato più volte a causa di chiamate RPC ripetute, viene eseguito il commit del messaggio nello spazio di archiviazione solo una volta.
Garantire una bassa latenza
Affinché l'elaborazione esattamente una volta sia fattibile, le operazioni di I/O devono essere ridotte, in particolare impedendo le operazioni di I/O su ogni record. Per raggiungere questo obiettivo, Dataflow utilizza i filtri Bloom e la garbage collection.
Filtri Fioritura
Filtri Bloom sono strutture di dati compatte che consentono rapidi controlli di impostazione delle iscrizioni. Nella Dataflow, ogni worker mantiene un filtro Bloom per ogni ID che vede. Quando arriva un nuovo ID record, il worker lo cerca nel filtro. Se il filtro restituisce false, il record non è un duplicato e il worker non cerca l'ID nello spazio di archiviazione stabile.
Dataflow mantiene un insieme di filtri Bloom incrementali raggruppati in base al tempo. Quando arriva un record, Dataflow sceglie il filtro appropriato da controllare in base al timestamp di sistema. Questo passaggio impedisce ai filtri Bloom di saturarsi man mano che vengono raccolti, nonché limita la quantità di dati da analizzare all'avvio.
Garbage collection
Per evitare di riempire lo spazio di archiviazione con ID record, Dataflow utilizza la garbage per rimuovere i vecchi record. Dataflow utilizza il sistema timestamp per calcolare una filigrana di garbage collection.
Questa filigrana si basa sulla quantità di tempo fisico di attesa di una determinata fase. Fornisce quindi anche informazioni sulle parti della pipeline che sono lente. Questi metadati costituiscono la base per la metrica Tempo di sistema Interfaccia di monitoraggio di Dataflow.
Se arriva un record con un timestamp precedente alla filigrana e se gli ID per il momento attuale sono già garbage collection, il record viene ignorato. Poiché la filigrana bassa attiva la garbage collection non avanza finché non vengono riconosciuti, questi record arrivati in ritardo sono duplicati.
Fonti non deterministiche
Dataflow utilizza l'SDK Apache Beam per leggere i dati nelle pipeline. Se l'elaborazione non riesce, Dataflow potrebbe riprovare a leggere da un'origine. In questo caso, Dataflow deve assicurarsi che ogni record univoco prodotto da un'origine venga registrato esattamente una volta. Per le origini deterministiche, come Pub/Sub Lite o Kafka, i record vengono letti in base a un offset registrato, attenuando la necessità di questo passaggio.
Poiché Dataflow non può assegnare automaticamente gli ID record, le origini non deterministiche devono indicare a Dataflow gli ID record al fine di evitare duplicati. Quando una fonte fornisce ID univoci per ogni record, il connettore usa uno shuffling della pipeline per rimuovere i duplicati. I record con lo stesso ID vengono esclusi. Per un esempio di come Dataflow esegue l'implementazione "exactly-once" quando utilizzi Pub/Sub come origine, consulta le Elaborazione "exactly-once" della pagina Flussi di dati con Pub/Sub.
Quando esegui DoFn
personalizzati all'interno della pipeline, Dataflow non garantisce che questo codice venga eseguito una sola volta per record. Per garantire un'elaborazione almeno una volta in caso di errori dei worker, Dataflow potrebbe eseguire un determinato record tramite una trasformazione più volte o eseguire lo stesso record contemporaneamente su più worker. Se nella pipeline includi codice che esegue operazioni come contattare un servizio esterno, le azioni potrebbero essere eseguite più volte per un determinato record.
Per rendere l'elaborazione non deterministica effettivamente deterministica, utilizza il controllo dei punti di controllo. Quando utilizzi i checkpoint, ogni output di una trasformazione viene sottoposto a checkpoint in uno spazio di archiviazione stabile con il relativo ID univoco prima di essere inviato alla fase successiva. Nuovi tentativi nel Distribuzione shuffling di Dataflow inoltra l'output che è stato controllato. Sebbene il codice possa essere eseguito più volte, Dataflow garantisce che viene memorizzato il risultato di una sola di queste esecuzioni. Dataflow utilizza un archivio coerente che impedisce la scrittura di duplicati in uno spazio di archiviazione stabile.
Output con consegna "exactly-once"
L'SDK Apache Beam include sink integrati che sono progettati per garantire che non producano duplicati. Se possibile, utilizza uno di questi lavandini integrati.
Se devi scrivere il tuo sink, l'approccio migliore è rendere l'oggetto funzione idempotente in modo che possa essere riprovato tutte le volte necessarie senza causare effetti collaterali indesiderati. Tuttavia, spesso è presente un componente della trasformazione che implementa la funzionalità del sink non è deterministica e potrebbe cambiare se viene ritentato.
Ad esempio, in un'aggregazione con finestra, l'insieme di record nella finestra potrebbe essere non deterministico. Nello specifico, la finestra potrebbe tentare di attivarsi con gli elementi e0, e1 ed e2. Il worker potrebbe arrestarsi in modo anomalo prima di eseguire il commit della finestra
ma non prima che questi elementi vengano inviati come effetto collaterale. Quando il worker si riavvia, la finestra viene attivata di nuovo e arriva un elemento in ritardo e3. Poiché questo elemento arriva prima dell'applicazione della finestra, non viene conteggiato come dato in ritardo, pertanto DoFn
viene chiamato di nuovo con gli elementi e0, e1, e2, e3. Questi elementi vengono poi
inviati all'operazione di effetto collaterale. L'idempotenza non è utile in questo scenario, perché ogni volta vengono inviati diversi set di record logici.
Per risolvere la non determinatezza in Dataflow, utilizza la trasformazione Reshuffle
incorporata. Quando Dataflow esegue lo shuffling dei dati, Dataflow scrive
in modo che tutti gli elementi generati non deterministici siano stabili se
e operazioni vengono tentate di nuovo dopo lo shuffle. Utilizzo delle garanzie di trasformazione Reshuffle
che solo una versione dell'output di DoFn
può superare il limite di shuffling.
Il seguente pattern garantisce che l'operazione con effetti collaterali riceva sempre un
record deterministico in output:
c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(GroupByKey.<..>.create())
.apply(new PrepareOutputData())
.apply(Reshuffle.<..>of())
.apply(WriteToSideEffect());
Per assicurarti che il programma di esecuzione di Dataflow sappia che gli elementi devono essere stabili prima di eseguire un DoFn
, aggiungi l'annotazione RequiresStableInput
al DoFn
.
Scopri di più
- Imposta la modalità flusso di dati della pipeline
- Flusso di dati con Pub/Sub
- Streaming Engine: modello di esecuzione per l'elaborazione di dati a bassa latenza e altamente scalabile
- Scopri di più sul modello di esecuzione di Apache Beam
- Dopo Lambda: elaborazione "exactly-once" in Dataflow, parte 1
- After Lambda: elaborazione "exactly-once" in Dataflow, parte 2 (garantire una bassa latenza)
- Dopo Lambda: elaborazione "exactly-once" in Dataflow, parte 3 (sorgenti e sink)