Ciclo di vita della pipeline

Questa pagina fornisce una panoramica del ciclo di vita della pipeline dal codice della pipeline a un job Dataflow.

Questa pagina illustra i seguenti concetti:

  • Che cos'è un grafo di esecuzione e come una pipeline Apache Beam diventa un job Dataflow
  • Come Dataflow gestisce gli errori
  • In che modo Dataflow esegue automaticamente il parallelismo e la distribuzione della logica di elaborazione nella pipeline ai worker che eseguono il job
  • Ottimizzazioni dei job che potrebbero essere eseguite da Dataflow

Grafico di esecuzione

Quando esegui la pipeline Dataflow, Dataflow crea un grafo di esecuzione dal codice che genera l'oggetto Pipeline, incluse tutte le trasformazioni e le relative funzioni di elaborazione associate, come gli oggetti DoFn. Questo è il grafico di esecuzione della pipeline e la fase è chiamata tempo di costruzione del grafico.

Durante la costruzione del grafico, Apache Beam esegue localmente il codice dal punto di contatto principale del codice della pipeline, fermandosi alle chiamate a un passaggio di origine, destinazione o trasformazione e trasformando queste chiamate in nodi del grafico. Di conseguenza, un frammento di codice nel punto di contatto di una pipeline (metodo main Java e Go o il livello superiore di uno script Python) viene eseguito localmente sulla macchina che esegue la pipeline. Lo stesso codice dichiarato in un metodo di un oggetto DoFn viene eseguito nei worker di Dataflow.

Ad esempio, l'esempio WordCount incluso negli SDK Apache Beam contiene una serie di trasformazioni per leggere, estrarre, conteggiare, formattare e scrivere le singole parole in una raccolta di testo, insieme a un conteggio delle occorrenze per ogni parola. Il seguente diagramma mostra come le trasformazioni nella pipeline WordCount vengono espanse in un grafico di esecuzione:

Le trasformazioni nel programma di esempio WordCount sono state espanse in un grafo di esecuzione
di passaggi da eseguire dal servizio Dataflow.

Figura 1: grafico di esecuzione di esempio di WordCount

Il grafico di esecuzione spesso è diverso dall'ordine in cui hai specificato le trasformazioni durante la costruzione della pipeline. Questa differenza esiste perché il servizio Dataflow esegue varie ottimizzazioni e fusioni sul grafico di esecuzione prima di eseguirlo sulle risorse cloud gestite. Il servizio Dataflow rispetta le dipendenze dei dati durante l'esecuzione della pipeline. Tuttavia, i passaggi senza dipendenze di dati tra loro possono essere eseguiti in qualsiasi ordine.

Per visualizzare il grafico di esecuzione non ottimizzato generato da Dataflow per la pipeline, seleziona il job nell'interfaccia di monitoraggio di Dataflow. Per ulteriori informazioni sulla visualizzazione dei job, consulta Utilizzare l'interfaccia di monitoraggio di Dataflow.

Durante la costruzione del grafo, Apache Beam convalida che tutte le risorse a cui fa riferimento la pipeline, ad esempio i bucket Cloud Storage, le tabelle BigQuery e gli argomenti o gli abbonamenti Pub/Sub, esistano effettivamente e siano accessibili. La convalida viene eseguita tramite chiamate API standard ai rispettivi servizi, pertanto è fondamentale che l'account utente utilizzato per eseguire una pipeline abbia la connettività corretta ai servizi necessari e sia autorizzato a chiamare le API dei servizi. Prima di inviare la pipeline al servizio Dataflow, Apache Beam controlla anche la presenza di altri errori e assicura che il grafico della pipeline non contenga operazioni illegali.

Il grafo di esecuzione viene quindi tradotto in formato JSON e il grafo di esecuzione JSON viene trasmesso all'endpoint del servizio Dataflow.

Il servizio Dataflow convalida quindi il grafico di esecuzione JSON. Quando il grafico viene convalidato, diventa un job nel servizio Dataflow. Puoi visualizzare il job, il relativo grafico di esecuzione, lo stato e le informazioni dei log utilizzando l'interfaccia di monitoraggio di Dataflow.

Java

Il servizio Dataflow invia una risposta alla macchina su cui esegui il programma Dataflow. Questa risposta è incapsulata nell'oggetto DataflowPipelineJob, che contiene il jobId del tuo job Dataflow. Utilizza jobId per monitorare, monitorare e risolvere i problemi del tuo job utilizzando l'interfaccia di monitoraggio di Dataflow e l'interfaccia a riga di comando di Dataflow. Per ulteriori informazioni, consulta il riferimento all'API per DataflowPipelineJob.

Python

Il servizio Dataflow invia una risposta alla macchina su cui esegui il programma Dataflow. Questa risposta è incapsulata nell'oggetto DataflowPipelineResult, che contiene il job_id del tuo job Dataflow. Utilizza job_id per monitorare, monitorare e risolvere i problemi del tuo job utilizzando l'interfaccia di monitoraggio di Dataflow e l'interfaccia a riga di comando di Dataflow.

Go

Il servizio Dataflow invia una risposta alla macchina su cui esegui il programma Dataflow. Questa risposta è incapsulata nell'oggetto dataflowPipelineResult, che contiene il jobID del tuo job Dataflow. Utilizza jobID per monitorare, monitorare e risolvere i problemi del tuo job utilizzando l'interfaccia di monitoraggio di Dataflow e l'interfaccia a riga di comando di Dataflow.

La costruzione del grafico avviene anche quando esegui la pipeline localmente, ma il grafico non viene tradotto in JSON o trasmesso al servizio. Il grafo viene invece eseguito localmente sulla stessa macchina su cui hai avviato il programma Dataflow. Per ulteriori informazioni, consulta la pagina sulla configurazione di PipelineOptions per l'esecuzione locale.

Gestione di errori ed eccezioni

La pipeline potrebbe generare eccezioni durante l'elaborazione dei dati. Alcuni di questi errori sono temporanei, ad esempio difficoltà temporanee di accesso a un servizio esterno. Altri errori sono permanenti, ad esempio quelli causati da dati di input danneggiati o non decodificabili o da puntatori null durante il calcolo.

Dataflow elabora gli elementi in pacchetti arbitrari e riprova il pacchetto completo quando viene generato un errore per un elemento del pacchetto. Quando viene eseguito in modalità batch, i bundle che includono un elemento con errori vengono riprovati quattro volte. La pipeline non va a buon fine quando un singolo bundle non va a buon fine quattro volte. Quando viene eseguito in modalità di streaming, un bundle che include un elemento con errori viene tentato nuovamente indefinitamente, il che potrebbe causare l'interruzione permanente della pipeline.

Durante l'elaborazione in modalità batch, potresti notare un numero elevato di singoli errori prima che un job della pipeline non riesca completamente, il che si verifica quando un determinato bundle non riesce dopo quattro tentativi di ripetizione. Ad esempio, se la pipeline tenta di elaborare 100 pacchetti, Dataflow potrebbe generare diverse centinaia di singoli errori finché un singolo pacchetto non raggiunge la condizione di quattro errori per l'uscita.

Gli errori dei worker all'avvio, come la mancata installazione dei pacchetti sui worker, sono temporanei. Questo scenario comporta ripetuti tentativi e potrebbe causare il blocco permanente della pipeline.

Parallelizzazione e distribuzione

Il servizio Dataflow esegue automaticamente il parallelismo e la distribuzione della logica di elaborazione nella pipeline ai worker che assegni per eseguire il job. Dataflow utilizza le astrazioni nel modello di programmazione per rappresentare funzioni di elaborazione parallela. Ad esempio, le trasformazioni ParDo in una pipeline fanno sì che Dataflow distribuisca automaticamente il codice di elaborazione, rappresentato dagli oggetti DoFn, a più worker da eseguire in parallelo.

Esistono due tipi di parallelismo dei job:

  • Il parallelismo orizzontale si verifica quando i dati della pipeline vengono suddivisi ed elaborati su più worker contemporaneamente. L'ambiente di runtime di Dataflow è basato su un pool di worker distribuiti. Una pipeline ha un potenziale parallelismo più elevato quando il pool contiene più worker, ma questa configurazione ha anche un costo più elevato. In teoria, il parallelismo horizontale non ha un limite superiore. Tuttavia, Dataflow limita il pool di worker a 4000 per ottimizzare l'utilizzo delle risorse a livello di parco.

  • Il parallelismo verticale si verifica quando i dati della pipeline vengono suddivisi ed elaborati da più core della CPU sullo stesso worker. Ogni worker è basato su una VM Compute Engine. Una VM può eseguire più processi per saturare tutti i suoi core della CPU. Una VM con più core ha un potenziale parallelismo verticale più elevato, ma questa configurazione comporta costi più elevati. Un numero maggiore di core spesso porta a un aumento dell'utilizzo della memoria, pertanto il numero di core viene solitamente scalato insieme alle dimensioni della memoria. Dato il limite fisico delle architetture dei computer, il limite superiore del parallelismo verticale è molto più basso del limite superiore del parallelismo orizzontale.

Parallelismo gestito

Per impostazione predefinita, Dataflow gestisce automaticamente il parallelismo dei job. Dataflow monitora le statistiche di runtime del job, ad esempio l'utilizzo della CPU e della memoria, per determinare come scalare il job. A seconda delle impostazioni del job, Dataflow può scalare i job orizzontalmente, indicata come scalabilità automatica orizzontale, o verticalmente, indicata come scalabilità verticale. La scalabilità automatica per il parallelismo ottimizza il costo e le prestazioni del job.

Per migliorare le prestazioni dei job, Dataflow ottimizza anche le pipeline internamente. Le ottimizzazioni più comuni sono l'ottimizzazione della fusione e l'ottimizzazione combinata. Unendo i passaggi della pipeline, Dataflow elimina i costi non necessari associati al coordinamento dei passaggi in un sistema distribuito e all'esecuzione di ogni singolo passaggio separatamente.

Fattori che influiscono sul parallelismo

I seguenti fattori influiscono sul funzionamento del parallelismo nei job Dataflow.

Origine di input

Quando un'origine di input non consente il parallelismo, il passaggio di importazione dell'origine di input può diventare un collo di bottiglia in un job Dataflow. Ad esempio, quando importi i dati da un singolo file di testo compresso, Dataflow non può eseguire il parallelismo dei dati di input. Poiché la maggior parte dei formati di compressione non può essere divisa arbitrariamente in frammenti durante l'importazione, Dataflow deve leggere i dati in sequenza dall'inizio del file. Il throughput complessivo della pipeline viene rallentato dalla parte non parallela della pipeline. La soluzione a questo problema consiste nell'utilizzare un'origine di input più scalabile.

In alcuni casi, l'unione dei passaggi riduce anche il parallelismo. Quando l'origine di input non consente il parallelismo, se Dataflow riunisce il passaggio di importazione dati con i passaggi successivi e assegna questo passaggio combinato a un singolo thread, l'intera pipeline potrebbe funzionare più lentamente.

Per evitare questo scenario, inserisci un passaggio Redistribute dopo il passaggio di importazione della fonte di input. Per saperne di più, consulta la sezione Impedire la fusione di questo documento.

Fanout e forma dei dati predefiniti

La distribuzione predefinita di un singolo passaggio di trasformazione può diventare un collo di bottiglia e limitare il parallelismo. Ad esempio, la trasformazione ParDo "ad albero" può causare la fusione per limitare la capacità di Dataflow di ottimizzare l'utilizzo dei worker. In una tale operazione, potresti avere una raccolta di input con relativamente pochi elementi, ma ParDo produce un output con centinaia o migliaia di volte più elementi, seguito da un altro ParDo. Se il servizio Dataflow unisce queste operazioni ParDo, il parallelismo in questo passaggio è limitato al massimo al numero di elementi nella raccolta di input, anche se la PCollection intermedia contiene molti più elementi.

Per potenziali soluzioni, consulta la sezione Impedire la fusione di questo documento.

Forma dei dati

La forma dei dati, che si tratti di dati di input o intermedi, può limitare il parallelismo. Ad esempio, quando un passaggio GroupByKey su una chiave naturale, ad esempio una città, è seguito da un passaggio map o Combine, Dataflow unisce i due passaggi. Quando lo spazio delle chiavi è ridotto, ad esempio cinque città, e una chiave è molto frequentemente utilizzata, ad esempio una grande città, la maggior parte degli elementi nell'output del GroupByKey passaggio viene distribuita a un processo. Questo processo diventa un collo di bottiglia e rallenta il job.

In questo esempio, puoi ridistribuire i risultati del passaggio GroupByKey in uno spazio di chiavi artificiali più grande anziché utilizzare le chiavi naturali. Inserisci un passaggio Redistribute tra il passaggio GroupByKey e il passaggio map o Combine. Nel passaggio Redistribute, crea lo spazio delle chiavi artificiali, ad esempio utilizzando una funzione hash, per superare il parallelismo limitato causato dalla forma dei dati.

Per saperne di più, consulta la sezione Impedire la fusione di questo documento.

Sink di output

Un sink è una trasformazione che scrive in un sistema di archiviazione dati esterno, ad esempio un file o un database. In pratica, gli sink vengono modellati e implementati come oggetti DoFn standard e vengono utilizzati per materializzare un PCollection in sistemi esterni. In questo caso, PCollection contiene i risultati finali della pipeline. I thread che chiamano le API di destinazione possono essere eseguiti in parallelo per scrivere i dati nei sistemi esterni. Per impostazione predefinita, non viene eseguito alcun coordinamento tra i thread. Senza un livello intermedio per mettere in coda le richieste di scrittura e controllare il flusso, il sistema esterno può sovraccaricarsi e ridurre il throughput di scrittura. L'aumento delle risorse tramite l'aggiunta di un maggiore parallelismo potrebbe rallentare ulteriormente la pipeline.

La soluzione a questo problema consiste nel ridurre il parallelismo nel passaggio di scrittura. Puoi aggiungere un passaggio GroupByKey subito prima del passaggio di scrittura. Il GroupByKey passaggio raggruppa i dati di output in un insieme più piccolo di batch per ridurre le chiamate RPC e le connessioni ai sistemi esterni. Ad esempio, utilizza un GroupByKey per creare un spazio hash di 50 punti dati su 1 milione.

Lo svantaggio di questo approccio è che introduce un limite hardcoded al parallelismo. Un'altra opzione è implementare il backoff esponenziale nel sink durante la scrittura degli dati. Questa opzione può fornire una limitazione del client minima.

Monitorare il parallelismo

Per monitorare il parallelismo, puoi utilizzare la console Google Cloud per visualizzare eventuali elementi in ritardo rilevati. Per ulteriori informazioni, consulta Risolvere i problemi relativi ai job in ritardo nei job batch e Risolvere i problemi relativi ai job in ritardo nei job di streaming.

Ottimizzazione della fusione

Dopo aver convalidato il formato JSON del grafo di esecuzione della pipeline, il servizio Dataflow potrebbe modificare il grafo per eseguire ottimizzazioni. Le ottimizzazioni possono includere l'unione di più passaggi o trasformazioni nel grafico di esecuzione della pipeline in singoli passaggi. L'unione dei passaggi impedisce al servizio Dataflow di dover materializzare ogni PCollection intermedio nella pipeline, il che può essere costoso in termini di memoria e overhead di elaborazione.

Sebbene tutte le trasformazioni specificate durante la creazione della pipeline vengano eseguite sul servizio, per garantire l'esecuzione più efficiente della pipeline, le trasformazioni potrebbero essere eseguite in un ordine diverso o nell'ambito di una trasformazione combinata più grande. Il servizio Dataflow rispetta le dipendenze dei dati tra i passaggi nel grafo di esecuzione, ma in caso contrario i passaggi potrebbero essere eseguiti in qualsiasi ordine.

Esempio di fusione

Il seguente diagramma mostra come il grafo di esecuzione dell'esempio WordCount incluso nell'SDK Apache Beam per Java possa essere ottimizzato e unito dal servizio Dataflow per un'esecuzione efficiente:

Il grafo di esecuzione del programma di esempio WordCount ottimizzato e con i passaggi uniti
dal servizio Dataflow.

Figura 2: grafico di esecuzione ottimizzato di WordCount

Impedire la fusione

In alcuni casi, Dataflow potrebbe indovinare erroneamente il modo ottimale per unire le operazioni nella pipeline, il che può limitare la capacità di Dataflow di utilizzare tutti i worker disponibili. In questi casi, puoi fornire un suggerimento a Dataflow per ridistribuire i dati utilizzando una trasformazione Redistribute.

Per aggiungere una trasformazione Redistribute, chiama uno dei seguenti metodi:

  • Redistribute.arbitrarily: indica che i dati potrebbero essere sbilanciati. Dataflow sceglie l'algoritmo migliore per ridistribuire i dati.

  • Redistribute.byKey: indica che è probabile che un PCollection di coppie chiave-valore sia sbilanciato e debba essere ridistribuito in base alle chiavi. In genere, Dataflow colloca tutti gli elementi di una singola chiave nello stesso thread di lavoro. Tuttavia, la co-localizzazione delle chiavi non è garantita e gli elementi vengono elaborati in modo indipendente.

Se la pipeline contiene una trasformazione Redistribute, in genere Dataflow impedisce la fusione dei passaggi precedenti e successivi alla trasformazione Redistribute e mescola i dati in modo che i passaggi a valle della trasformazione Redistribute abbiano un parallelismo più ottimale.

Fusione dei monitor

Puoi accedere al grafo ottimizzato e alle fasi fuse nella console Google Cloud , utilizzando gcloud CLI o l'API.

Console

Per visualizzare le fasi e i passaggi uniti del grafico nella console, nella scheda Dettagli esecuzione del tuo job Dataflow, apri la visualizzazione del grafico Flusso di lavoro delle fasi.

Per visualizzare i passaggi dei componenti uniti per una fase, fai clic sulla fase unita nel grafico. Nel riquadro Informazioni sulla fase, la riga Passaggi del componente visualizza le fasi fuse. A volte parti di una singola trasformazione composita vengono fuse in più fasi.

gcloud

Per accedere al grafo ottimizzato e alle fasi fuse utilizzando gcloud CLI, esegui il seguente comando gcloud:

  gcloud dataflow jobs describe --full JOB_ID --format json

Sostituisci JOB_ID con l'ID del tuo job Dataflow.

Per estrarre i bit pertinenti, invia l'output del comando gcloud a jq:

gcloud dataflow jobs describe --full JOB_ID --format json | jq '.pipelineDescription.executionPipelineStage\[\] | {"stage_id": .id, "stage_name": .name, "fused_steps": .componentTransform }'

Per visualizzare la descrizione delle fasi fuse nel file di risposta di output, consulta l'oggetto ExecutionStageSummary all'interno dell'array ComponentTransform.

API

Per accedere al grafico ottimizzato e alle fasi fuse utilizzando l'API, chiama project.locations.jobs.get.

Per visualizzare la descrizione delle fasi fuse nel file di risposta di output, consulta l'oggetto ExecutionStageSummary all'interno dell'array ComponentTransform.

Ottimizzazione combinata

Le operazioni di aggregazione sono un concetto importante nell'elaborazione dei dati su larga scala. L'aggregazione riunisce dati concettualmente molto distanti, il che la rende estremamente utile per la correlazione. Il modello di programmazione Dataflow rappresenta le operazioni di aggregazione come le trasformazioni GroupByKey, CoGroupByKey e Combine.

Le operazioni di aggregazione di Dataflow combinano i dati dell'intero set di dati, inclusi quelli che potrebbero essere distribuiti su più worker. Durante queste operazioni di aggregazione, spesso è più efficiente combinare il maggior numero possibile di dati localmente prima di combinarli tra le istanze. Quando applichi una trasformazione GroupByKey o un'altra trasformazione di aggregazione, il servizio Dataflow esegue automaticamente la combinazione parziale localmente prima dell'operazione di raggruppamento principale.

Quando esegui una combinazione parziale o a più livelli, il servizio Dataflow prende decisioni diverse a seconda che la pipeline utilizzi dati batch o in streaming. Per i dati con limiti, il servizio favorisce l'efficienza e svolge il maggior numero possibile di combinazioni locali. Per i dati illimitati, il servizio favorisce una latenza più bassa e potrebbe non eseguire la combinazione parziale, in quanto potrebbe aumentare la latenza.