"exactly-once" in Dataflow

Dataflow supporta l'elaborazione "exactly-once" dei record. Questa pagina spiega in che modo Dataflow implementa l'elaborazione "exactly-once" garantendo anche una bassa latenza.

Panoramica

Le pipeline batch utilizzano sempre l'elaborazione "exactly-once". Le pipeline di flusso utilizzano l'elaborazione "exactly-once" per impostazione predefinita, ma può anche elaborazione "at-least-once".

L'elaborazione "exactly-once" fornisce garanzie sui risultati dell'elaborazione inclusi i risultati di ogni fase della pipeline. Nello specifico, per ogni che arriva alla pipeline da una sorgente o a una fase da un fase precedente, Dataflow garantisce quanto segue:

  • Il record viene elaborato senza andare perso.
  • Eventuali risultati dell'elaborazione che rimangono all'interno della pipeline vengono mostrati al massimo una volta sola.

In altre parole, i record vengono elaborati almeno una volta e i risultati una sola volta.

L'elaborazione "exactly-once" garantisce che i risultati siano accurati, senza duplicati record nell'output. Dataflow è ottimizzato per ridurre al minimo la latenza mantenendo la semantica "exactly-once". Tuttavia, l'elaborazione "exactly-once" comporta costi per l'esecuzione della deduplicazione. Per casi d'uso che possono tollerare i duplicati spesso è possibile ridurre i costi e migliorare la latenza abilitando l'opzione . Per ulteriori informazioni sulla scelta tra "exactly-once" e "Almeno una volta" i flussi di dati, vedi Imposta la modalità flusso di dati della pipeline.

Dati in ritardo

L'elaborazione "exactly-once" garantisce la accuratezza della pipeline: se la pipeline elabora un record, Dataflow garantisce che questo venga riflesso nell'output e che il record non sia duplicato.

In una pipeline in modalità flusso, tuttavia, l'elaborazione "exactly-once" non può garantire che i risultati sono complete perché i record potrebbero arrivare in ritardo. Ad esempio: supponi che la tua pipeline esegua un'aggregazione in una finestra temporale, ad esempio Count. Con l'elaborazione "exactly-once", il risultato è preciso per i record che arrivano all'interno della finestra in modo tempestivo, ma quelli in ritardo potrebbero è caduto.

In genere, non c'è modo di garantire la completezza in una pipeline in modalità flusso, perché in teoria i record possono arrivare arbitrariamente in ritardo. Nel caso limite, deve aspettare in eterno per produrre un risultato. In pratica, Apache Beam consente di configurare la soglia per eliminare i dati in ritardo quando emettere risultati aggregati. Per ulteriori informazioni, vedi Filigrane e dati in ritardo nella documentazione di Apache Beam.

Effetti collaterali

Non è garantito che gli effetti collaterali abbiano una semantica "exactly-once". È importante sottolineare che include la scrittura dell'output in un archivio esterno, a meno che il sink implementa la semantica "exactly-once".

Nello specifico, Dataflow non garantisce che ogni record venga inviato per ogni trasformazione esattamente una volta. A causa di nuovi tentativi o errori del worker, Dataflow potrebbe inviare più volte un record attraverso una trasformazione, o anche contemporaneamente su più worker.

Nell'ambito dell'elaborazione "exactly-once", Dataflow deduplica come output. Tuttavia, se il codice di una trasformazione presenta degli effetti collaterali, potrebbe verificarsi più volte. Ad esempio, se una trasformazione rende un servizio remoto questa chiamata può essere effettuata più volte per lo stesso record. Effetti collaterali può causare la perdita di dati in alcune situazioni. Ad esempio, supponiamo che Transform legge un file per produrre l'output, quindi lo elimina immediatamente senza attendere il commit dell'output. Se si verifica un errore quando eseguendo il commit del risultato, Dataflow riprova a eseguire la trasformazione, ma ora transform non riesce a leggere il file eliminato.

Logging

L'output di log dell'elaborazione indica che l'elaborazione è avvenuta, ma non indicano se è stato eseguito il commit dei dati. Pertanto, i file di log potrebbero indicare che i dati sono stati elaborati più volte anche se i risultati delle i dati vengono impegnati nell'archiviazione permanente solo una volta. Inoltre, i log non riflettono sempre i dati elaborati e impegnati. I log potrebbero essere eliminati a causa limitazione o perdita a causa di altri problemi del servizio di logging.

Streaming "exactly-once"

Questa sezione spiega come Dataflow esegue l'implementazione "exactly-once" per i job di flussi di dati, incluso il modo in cui Dataflow gestisce complessità come l'elaborazione non deterministica, i dati in ritardo e il codice personalizzato.

Shuffling dei flussi di dati Dataflow

I job Dataflow in modalità flusso vengono eseguiti su molti worker diversi in parallelo assegnando intervalli di lavoro a ciascun worker. Anche se i compiti potrebbero cambiare tempo in risposta a errori del worker, scalabilità automatica o altri eventi, dopo ogni trasformazione GroupByKey, tutti i record con la stessa chiave elaborati sullo stesso worker. La trasformazione GroupByKey viene spesso utilizzata da elementi trasformazioni come Count, FileIO e così via. Per garantire che i record di una determinata chiave finiscano sullo stesso worker, i worker Dataflow eseguono lo shuffling dei dati tra loro mediante chiamate di procedura remota (RPC).

Per garantire che i record non vadano persi durante lo shuffling, Dataflow utilizza il backup upstream. Con il backup upstream, il worker che invia i nuovi tentativi dei record RPC finché non riceve una conferma positiva della ricezione del record. Gli effetti collaterali dell'elaborazione del record sono di archiviazione a valle. Se il worker che invia i record diventa non disponibile, Dataflow continua a riprovare RPC, per garantire che ogni record venga pubblicato almeno una volta.

Poiché questi nuovi tentativi potrebbero creare duplicati, ogni messaggio è codificato con un univoco. Ogni destinatario archivia un catalogo di tutti gli ID che sono già stati visti ed elaborati. Quando viene ricevuto un record, Dataflow cerca il suo ID nel catalogo. Se l'ID viene trovato, il record è già stato ricevuto viene eseguito come duplicato. Per assicurarti che gli ID record sono stabili, ogni output, da un passaggio all'altro, viene sottoposto a controlli di archiviazione. Di conseguenza, se lo stesso inviato più volte a causa di chiamate RPC ripetute, viene inviato nello spazio di archiviazione una sola volta.

Garantire una bassa latenza

Affinché l'elaborazione "exactly-once" sia utilizzabile, l'I/O deve essere ridotto, in particolare impedendo l'I/O su ogni record. Per raggiungere questo obiettivo, Dataflow utilizza i filtri Bloom e la garbage collection.

Filtri Fioritura

Filtri Bloom sono strutture di dati compatte che consentono rapidi controlli di impostazione delle iscrizioni. Nel Dataflow, ogni worker mantiene un filtro Bloom per ogni ID che vede. Quando arriva un nuovo ID record, il worker cerca l'ID nel filtro. Se il filtro restituisce false, il record non è un duplicato e il worker non cerca l'ID nello spazio di archiviazione stabile.

Dataflow mantiene un insieme di filtri Bloom in sequenza in bucket per tempo. Quando arriva un record, Dataflow sceglie da verificare in base al timestamp di sistema. Questo passaggio impedisce al Bloom filtri dalla saturazione man mano che i filtri vengono garbage-collect e inoltre vincola la da analizzare all'avvio.

Garbage collection

Per evitare di riempire lo spazio di archiviazione con ID record, Dataflow utilizza la garbage per rimuovere i vecchi record. Dataflow utilizza il sistema timestamp per calcolare una filigrana di garbage collection.

Questa filigrana si basa sulla quantità di tempo fisico di attesa di una determinata fase. Di conseguenza, fornisce anche informazioni su quali parti della pipeline sono lento. Questi metadati costituiscono la base per la metrica Tempo di sistema Interfaccia di monitoraggio di Dataflow.

Se arriva un record con un timestamp precedente alla filigrana e se gli ID per il momento attuale sono già garbage collection, il record viene ignorato. Poiché la filigrana bassa attiva la garbage collection non avanza finché non vengono riconosciuti, questi record arrivati in ritardo sono duplicati.

Sorgenti non deterministiche

Dataflow utilizza l'SDK Apache Beam per leggere i dati nelle pipeline. Se l'elaborazione non riesce, Dataflow potrebbe riprovare a leggere da un'origine. Nel tale situazione, Dataflow deve garantire che ogni record univoco prodotti da una fonte vengono registrati esattamente una volta. Per le fonti deterministiche, come come Pub/Sub Lite o Kafka, i record vengono letti in base a un offset registrato, per ridurre la necessità di questo passaggio.

Poiché Dataflow non può assegnare automaticamente gli ID record, le origini non deterministiche devono indicare a Dataflow gli ID record al fine di evitare duplicati. Quando una fonte fornisce ID univoci per ogni record, il connettore usa uno shuffling della pipeline per rimuovere i duplicati. I record con lo stesso ID vengono esclusi. Per un esempio di come Dataflow esegue l'implementazione "exactly-once" quando utilizzi Pub/Sub come origine, consulta le Elaborazione "exactly-once" della pagina Flussi di dati con Pub/Sub.

Quando esegui DoFn personalizzati all'interno della pipeline, Dataflow non garantisce che questo codice venga eseguito una sola volta per record. Per garantire l'elaborazione "at-least-once" in caso di errori del worker, Dataflow può eseguire un determinato record attraverso una più volte oppure eseguire lo stesso record contemporaneamente su più worker. Se includi nella pipeline codice che, ad esempio, contatta un servizio esterno, le azioni potrebbero essere eseguite più di una volta per un determinato record.

Per rendere efficace l'elaborazione non deterministica, usa checkpoint. Quando utilizzi i checkpoint, ogni output viene eseguito il checkpoint di una trasformazione verso uno spazio di archiviazione stabile con il suo ID univoco prima di passare alla fase successiva. Nuovi tentativi nel Distribuzione shuffling di Dataflow inoltra l'output che è stato controllato. Anche se il codice può essere eseguito più volte, Dataflow garantisce che viene archiviato l'output di una sola di queste esecuzioni. Dataflow utilizza un archivio coerente che impedisce ai duplicati di vengono scritte nello spazio di archiviazione stabile.

Distribuzione dell'output "exactly-once"

L'SDK Apache Beam include lavandini integrati progettati per garantire che non producano duplicati. Se possibile, utilizza una di queste lavandini integrati.

Per scrivi il tuo sink, l'approccio migliore consiste nel rendere l'oggetto funzione idempotente in modo da poter riprovare ogni volta che serve senza causare effetti collaterali indesiderati. Tuttavia, spesso è presente qualche componente della trasformazione che implementa la funzionalità del sink non è deterministica e potrebbe cambiare se viene ritentato.

Ad esempio, in un'aggregazione con finestra, l'insieme di record al suo interno potrebbe non essere deterministici. In particolare, la finestra potrebbe tentare di accendersi con elemento e0, e1, e2. Il worker potrebbe arrestarsi in modo anomalo prima di eseguire il commit della finestra ma non prima che questi elementi vengano inviati come effetto collaterale. Quando il worker si riavvia, la finestra si attiva di nuovo e arriva un elemento finale e3. Poiché questo elemento arriva prima del commit della finestra, non viene conteggiato come in ritardo pertanto, DoFn viene richiamato con gli elementi e0, e1, e2, e3. Questi elementi vengono quindi inviati all'operazione effetto collaterale. L'idempotenza non aiuta in questo scenario, vengono inviati ogni volta set di record logici diversi.

Per affrontare la non determinabilità in Dataflow, utilizza l'Reshuffle integrata e trasformerai automaticamente. Quando Dataflow esegue lo shuffling dei dati, Dataflow scrive in modo che tutti gli elementi generati non deterministici siano stabili se e operazioni vengono tentate di nuovo dopo lo shuffle. Utilizzo delle garanzie di trasformazione Reshuffle che solo una versione dell'output di DoFn può superare il limite di shuffling. Il seguente pattern garantisce che l'operazione effetto collaterale riceva sempre un record deterministico in output:

c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
 .apply(GroupByKey.<..>.create())
 .apply(new PrepareOutputData())
 .apply(Reshuffle.<..>of())
 .apply(WriteToSideEffect());

per assicurarti che il runner Dataflow sappia che gli elementi devono essere stabile prima di eseguire un DoFn, aggiungi RequiresStableInput di lettura per DoFn.

Scopri di più