"exactly-once" in Dataflow

Dataflow supporta l'elaborazione "exactly-once" dei record. Questa pagina spiega come Dataflow implementa l'elaborazione "exactly-once", garantendo al contempo una bassa latenza.

Panoramica

Le pipeline batch utilizzano sempre l'elaborazione "exactly-once". Le pipeline in modalità flusso utilizzano l'elaborazione "exactly-once" per impostazione predefinita, ma possono anche utilizzare l'elaborazione "at-least-once".

L'elaborazione "exactly-once" fornisce garanzie sui risultati dell'elaborazione dei record, inclusi i risultati di ogni fase della pipeline. Nello specifico, per ogni record che arriva alla pipeline da un'origine o a una fase di una precedente, Dataflow garantisce quanto segue:

  • Il record viene elaborato senza andare perso.
  • Eventuali risultati dell'elaborazione che rimangono all'interno della pipeline vengono riportati al massimo una volta.

In altre parole, i record vengono elaborati almeno una volta e i risultati vengono impegnati esattamente una volta.

L'elaborazione "exactly-once" garantisce che i risultati siano accurati, senza record duplicati nell'output. Dataflow è ottimizzato per ridurre al minimo la latenza mantenendo la semantica "exactly-once". Tuttavia, l'elaborazione "exactly-once" comporta costi per l'esecuzione della deduplicazione. Per i casi d'uso che tollerano record duplicati, spesso puoi ridurre i costi e migliorare la latenza abilitando la modalità "Almeno una volta". Per ulteriori informazioni sulla scelta tra i flussi di dati "exactly-once" e "at-least-once", consulta Impostare la modalità di flusso della pipeline.

Dati in ritardo

L'elaborazione "exactly-once" garantisce l'accuratezza della pipeline: se la pipeline elabora un record, Dataflow garantisce che il record sia riportato nell'output e che il record non sia duplicato.

In una pipeline in modalità flusso, tuttavia, l'elaborazione "exactly-once" non può garantire che i risultati siano completati, perché i record potrebbero arrivare in ritardo. Ad esempio, supponi che la tua pipeline esegua un'aggregazione in una finestra temporale, come Count. Con l'elaborazione "exactly-once", il risultato è accurato per i record che arrivano all'interno della finestra in modo tempestivo, ma quelli in ritardo potrebbero essere eliminati.

In generale, non c'è modo di garantire la completezza in una pipeline in modalità flusso, perché in teoria i record possono arrivare arbitrariamente in ritardo. Nel caso limite, devi attendere in eterno per produrre un risultato. In pratica, Apache Beam consente di configurare la soglia per eliminare i dati in ritardo e quando emettere risultati aggregati. Per ulteriori informazioni, consulta Filigrane e dati in ritardo nella documentazione di Apache Beam.

Effetti collaterali

Non è garantito che gli effetti collaterali abbiano una semantica "exactly-once". È importante sottolineare che questo include la scrittura dell'output in un archivio esterno, a meno che il sink non implementi anche la semantica "exactly-once".

Nello specifico, Dataflow non garantisce che ogni record passi attraverso ogni trasformazione esattamente una volta. A causa di nuovi tentativi o errori del worker, Dataflow può inviare un record tramite una trasformazione più volte o anche contemporaneamente su più worker.

Nell'ambito dell'elaborazione "exactly-once", Dataflow deduplica gli output. Tuttavia, se il codice in una trasformazione presenta effetti collaterali, questi effetti potrebbero verificarsi più volte. Ad esempio, se una trasformazione effettua una chiamata di servizio remoto, questa chiamata potrebbe essere effettuata più volte per lo stesso record. Gli effetti collaterali possono persino portare alla perdita di dati in alcune situazioni. Ad esempio, supponiamo che una trasformazione legga un file per produrre l'output, quindi lo elimini immediatamente senza attendere il commit dell'output. Se si verifica un errore durante il commit del risultato, Dataflow riprova a eseguire la trasformazione, ma ora quest'ultima non riesce a leggere il file eliminato.

Logging

L'output di log dell'elaborazione indica che l'elaborazione è avvenuta, ma non indica se è stato eseguito il commit dei dati. Pertanto, i file di log potrebbero indicare che i dati sono stati elaborati più volte anche se i risultati dei dati elaborati sono impegnati nell'archiviazione permanente una sola volta. Inoltre, non sempre i log riflettono i dati elaborati e di cui è stato eseguito il commit. I log potrebbero essere eliminati a causa della limitazione o andare persi a causa di altri problemi del servizio di logging.

Streaming "exactly-once"

Questa sezione spiega in che modo Dataflow implementa l'elaborazione "exactly-once" per i job di flussi, incluso il modo in cui Dataflow gestisce complessità come l'elaborazione non deterministica, i dati in ritardo e il codice personalizzato.

Shuffling dei flussi di dati Dataflow

I job Dataflow in flusso vengono eseguiti su molti worker diversi in parallelo assegnando intervalli di lavoro a ciascun worker. Sebbene le assegnazioni possano cambiare nel tempo in risposta a errori del worker, scalabilità automatica o altri eventi, dopo ogni trasformazione GroupByKey, tutti i record con la stessa chiave vengono elaborati sullo stesso worker. La trasformazione GroupByKey viene spesso utilizzata da trasformazioni composte, come Count, FileIO e così via. Per garantire che i record di una determinata chiave finiscano sullo stesso worker, i worker Dataflow eseguono lo shuffling dei dati tra loro utilizzando chiamate di procedura remota (RPC).

Per garantire che i record non vadano persi durante lo shuffling, Dataflow utilizza il backup upstream. Con il backup upstream, il worker che invia i record dei record tenta di nuovo RPC fino a quando non riceve una conferma positiva della ricezione del record. Gli effetti collaterali dell'elaborazione del record sono impegnati nell'archiviazione permanente a valle. Se il worker che invia i record non è più disponibile, Dataflow continua a riprovare gli RPC, per garantire che ogni record venga consegnato almeno una volta.

Poiché questi nuovi tentativi potrebbero creare duplicati, ogni messaggio viene codificato con un ID univoco. Ogni destinatario archivia un catalogo di tutti gli ID che sono già stati visti ed elaborati. Quando viene ricevuto un record, Dataflow cerca il suo ID nel catalogo. Se l'ID viene trovato, il record è già stato ricevuto e sottoposto a commit e viene eliminato come duplicato. Per garantire che gli ID record siano stabili, ogni output, da un passaggio all'altro, viene sottoposto a checkpoint per l'archiviazione. Di conseguenza, se lo stesso messaggio viene inviato più volte a causa di chiamate RPC ripetute, viene impegnato nell'archiviazione una sola volta.

Garantire una bassa latenza

Affinché l'elaborazione "exactly-once" sia utilizzabile, l'I/O deve essere ridotto, in particolare impedendo l'I/O in ogni record. Per raggiungere questo obiettivo, Dataflow usa i filtri di Bloom e la garbage collection.

Filtri Fioritura

I filtri Bloom sono strutture dati compatte che consentono di impostare rapidamente i controlli di appartenenza. In Dataflow, ogni worker mantiene un filtro Bloom di ogni ID che vede. Quando arriva un nuovo ID record, il worker cerca l'ID nel filtro. Se il filtro restituisce false, il record non è un duplicato e il worker non cerca l'ID nello spazio di archiviazione stabile.

Dataflow mantiene un insieme di filtri Bloom in sequenza in bucket per tempo. Quando arriva un record, Dataflow sceglie il filtro appropriato da controllare in base al timestamp del sistema. Questo passaggio impedisce la saturazione dei filtri Bloom quando i filtri vengono garbage-collect e vincola la quantità di dati da analizzare all'avvio.

Garbage collection

Per evitare di riempire lo spazio di archiviazione con ID record, Dataflow utilizza la garbage collection per rimuovere i vecchi record. Dataflow usa il timestamp di sistema per calcolare una filigrana di garbage collection.

Questa filigrana si basa sulla quantità di tempo fisico di attesa in una determinata fase. Di conseguenza, fornisce anche informazioni sulle parti della pipeline che sono lente. Questi metadati costituiscono la base per la metrica Ritardo sistema mostrata nell'interfaccia di monitoraggio di Dataflow.

Se un record arriva con un timestamp precedente alla filigrana e se gli ID per il momento sono già stati garbage collection, il record viene ignorato. Poiché la bassa filigrana che attiva la garbage collection non avanza finché non vengono confermati i caricamenti dei record, questi record arrivati in ritardo sono duplicati.

Sorgenti non deterministiche

Dataflow utilizza l'SDK Apache Beam per leggere i dati nelle pipeline. Se l'elaborazione non riesce, Dataflow potrebbe riprovare a leggere da un'origine. In questa situazione, Dataflow deve garantire che ogni record univoco prodotto da un'origine venga registrato una sola volta. Per le origini deterministiche, come Pub/Sub Lite o Kafka, i record vengono letti in base a un offset registrato, mitigando la necessità di questo passaggio.

Poiché Dataflow non può assegnare automaticamente gli ID dei record, le origini non deterministiche devono indicare a Dataflow quali sono gli ID dei record per evitare duplicati. Quando un'origine fornisce ID univoci per ogni record, il connettore utilizza uno shuffling nella pipeline per rimuovere i duplicati. I record con lo stesso ID vengono esclusi. Per un esempio di come Dataflow implementa l'elaborazione "exactly-once" quando si utilizza Pub/Sub come origine, consulta la sezione Elaborazione "exactly-once" nella pagina Flussi di dati con Pub/Sub.

Quando esegui DoFn personalizzati all'interno della pipeline, Dataflow non garantisce che questo codice venga eseguito una sola volta per record. Per garantire l'elaborazione almeno una volta in caso di errori del worker, Dataflow può eseguire un determinato record tramite una trasformazione più volte oppure eseguire lo stesso record contemporaneamente su più worker. Se nella pipeline includi codice per eseguire operazioni come contattare un servizio esterno, le azioni potrebbero essere eseguite più di una volta per un determinato record.

Per rendere efficace l'elaborazione non deterministica, utilizza il checkpoint. Quando utilizzi il checkpoint, ogni output di una trasformazione viene sottoposto a un checkpoint dello spazio di archiviazione stabile con il suo ID univoco prima di essere consegnato alla fase successiva. I nuovi tentativi nella distribuzione shuffle di Dataflow inoltrano l'output che è stato controllato. Anche se il codice può essere eseguito più volte, Dataflow garantisce che venga archiviato l'output di una sola di queste esecuzioni. Dataflow usa un archivio coerente che impedisce la scrittura di duplicati in uno spazio di archiviazione stabile.

Distribuzione dell'output "exactly-once"

L'SDK Apache Beam include sink integrati progettati per garantire che non producano duplicati. Quando possibile, usa uno di questi sink integrati.

Se devi scrivere il tuo sink, l'approccio migliore consiste nel rendere l'oggetto della funzione idempotente in modo da poter essere ritentati tutte le volte che è necessario senza causare effetti collaterali indesiderati. Tuttavia, spesso alcuni componenti della trasformazione che implementano la funzionalità del sink non sono deterministici e potrebbero cambiare se ritentati.

Ad esempio, in un'aggregazione con finestra, l'insieme di record nella finestra potrebbe essere non deterministico. Nello specifico, la finestra potrebbe tentare di attivarsi con gli elementi e0, e1, e2. Il worker potrebbe arrestarsi in modo anomalo prima di eseguire il commit dell'elaborazione della finestra, ma non prima che gli elementi vengano inviati come effetto collaterale. Quando l'utente riavvia, la finestra si attiva di nuovo e arriva l'elemento e3 tardivo. Poiché questo elemento arriva prima del commit della finestra, non viene conteggiato come dati in ritardo, quindi DoFn viene richiamato con gli elementi e0, e1, e2, e3. Questi elementi vengono poi inviati all'operazione dell'effetto collaterale. L'idempotenza non è utile in questo scenario, perché ogni volta vengono inviati set di record logici diversi.

Per affrontare la non determinabilità in Dataflow, utilizza la trasformazione Reshuffle integrata. Quando Dataflow esegue lo shuffling dei dati, Dataflow scrive i dati in modo duraturo, in modo che tutti gli elementi generati in modo non deterministico siano stabili se si ritentano operazioni dopo lo shuffling. L'utilizzo della trasformazione Reshuffle garantisce che solo una versione dell'output di DoFn possa superare il limite di shuffling. Il seguente pattern garantisce che l'operazione effetto collaterale riceva sempre un record deterministico da generare:

c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
 .apply(GroupByKey.<..>.create())
 .apply(new PrepareOutputData())
 .apply(Reshuffle.<..>of())
 .apply(WriteToSideEffect());

Per assicurarti che l'esecutore di Dataflow sappia che gli elementi devono essere stabili prima di eseguire un DoFn, aggiungi l'annotazione RequiresStableInput a DoFn.

Scopri di più