Ciclo di vita della pipeline

Questa pagina fornisce una panoramica del ciclo di vita della pipeline dal codice della pipeline a un job Dataflow.

In questa pagina vengono spiegati i seguenti concetti:

  • Che cos'è un grafico di esecuzione e in che modo una pipeline Apache Beam diventa un job Dataflow
  • In che modo Dataflow gestisce gli errori
  • Dataflow parallelizza e distribuisce automaticamente la logica di elaborazione della pipeline ai worker
  • Ottimizzazioni dei job che Dataflow potrebbe apportare

Grafico di esecuzione

Quando esegui la pipeline Dataflow, Dataflow crea un grafico di esecuzione a partire dal codice che costruisce l'oggetto Pipeline, incluse tutte le trasformazioni e le funzioni di elaborazione associate, come gli oggetti DoFn. Questo è il grafico di esecuzione della pipeline, la fase è denominata tempo di creazione del grafico.

Durante la creazione dei grafici, Apache Beam esegue in locale il codice dal punto di ingresso principale del codice della pipeline, fermandosi in corrispondenza delle chiamate a un passaggio di origine, sink o trasformazione e trasforma queste chiamate in nodi del grafico. Di conseguenza, una porzione di codice nel punto di ingresso di una pipeline (metodo main di Java e Go o il livello principale di uno script Python) viene eseguita localmente sulla macchina che esegue la pipeline. Lo stesso codice dichiarato nel metodo di un oggetto DoFn viene eseguito nei worker Dataflow.

Ad esempio, l'esempio WordCount incluso negli SDK Apache Beam contiene una serie di trasformazioni per leggere, estrarre, contare, formattare e scrivere le singole parole in una raccolta di testo, insieme a un conteggio delle occorrenze per ogni parola. Il seguente diagramma mostra in che modo le trasformazioni nella pipeline di WordCount vengono espanse in un grafico di esecuzione:

Le trasformazioni nel programma di esempio WordCount espanse in un grafico di esecuzione dei passaggi che devono essere eseguiti dal servizio Dataflow.

Figura 1: grafico di esecuzione di esempio del conteggio di parole

Il grafico di esecuzione è spesso diverso dall'ordine in cui hai specificato le trasformazioni quando hai creato la pipeline. Questa differenza esiste perché il servizio Dataflow esegue varie ottimizzazioni e fusioni sul grafico di esecuzione prima dell'esecuzione su risorse cloud gestite. Il servizio Dataflow rispetta le dipendenze dei dati durante l'esecuzione della pipeline. Tuttavia, i passaggi senza dipendenze dei dati tra loro possono essere eseguiti in qualsiasi ordine.

Per visualizzare il grafico di esecuzione non ottimizzata generato da Dataflow per la tua pipeline, seleziona il job nell'interfaccia di monitoraggio di Dataflow. Per maggiori informazioni sulla visualizzazione dei job, consulta Utilizzare l'interfaccia di monitoraggio di Dataflow.

Durante la creazione dei grafici, Apache Beam verifica che tutte le risorse a cui fa riferimento la pipeline, come bucket Cloud Storage, tabelle BigQuery e argomenti o abbonamenti Pub/Sub, esistano e siano effettivamente accessibili. La convalida viene eseguita tramite chiamate API standard ai rispettivi servizi, pertanto è fondamentale che l'account utente utilizzato per eseguire una pipeline disponga di una connettività adeguata ai servizi necessari e che sia autorizzato a chiamare le API dei servizi. Prima di inviare la pipeline al servizio Dataflow, Apache Beam controlla anche la presenza di altri errori e si assicura che il grafico della pipeline non contenga operazioni illegali.

Il grafico di esecuzione viene quindi tradotto in formato JSON e il grafico di esecuzione JSON viene trasmesso all'endpoint del servizio Dataflow.

Il servizio Dataflow convalida quindi il grafico di esecuzione JSON. Una volta convalidato, il grafico diventa un job nel servizio Dataflow. Puoi visualizzare il job, il relativo grafico di esecuzione, lo stato e le informazioni di log utilizzando l'interfaccia di monitoraggio di Dataflow.

Java

Il servizio Dataflow invia una risposta alla macchina su cui esegui il programma Dataflow. Questa risposta viene incapsulata nell'oggetto DataflowPipelineJob, che contiene jobId del job Dataflow. Utilizza jobId per monitorare, monitorare e risolvere i problemi del job mediante l'interfaccia di monitoraggio di Dataflow e l'interfaccia a riga di comando di Dataflow. Per ulteriori informazioni, consulta il riferimento API per DataflowPipelineJob.

Python

Il servizio Dataflow invia una risposta alla macchina su cui esegui il programma Dataflow. Questa risposta viene incapsulata nell'oggetto DataflowPipelineResult, che contiene job_id del job Dataflow. Utilizza job_id per monitorare, monitorare e risolvere i problemi del job mediante l'interfaccia di monitoraggio di Dataflow e l'interfaccia a riga di comando di Dataflow.

Go

Il servizio Dataflow invia una risposta alla macchina su cui esegui il programma Dataflow. Questa risposta viene incapsulata nell'oggetto dataflowPipelineResult, che contiene jobID del job Dataflow. Utilizza jobID per monitorare, monitorare e risolvere i problemi del job mediante l'interfaccia di monitoraggio di Dataflow e l'interfaccia a riga di comando di Dataflow.

La creazione di grafici avviene anche quando esegui la pipeline a livello locale, ma il grafico non viene tradotto in JSON né trasmesso al servizio. Il grafico viene invece eseguito localmente sulla stessa macchina in cui hai avviato il programma Dataflow. Per maggiori informazioni, consulta Configurazione di PipelineOptions per l'esecuzione locale.

Gestione degli errori e delle eccezioni

La pipeline potrebbe generare eccezioni durante l'elaborazione dei dati. Alcuni di questi errori sono temporanei, ad esempio problemi temporanei di accesso a un servizio esterno. Altri errori sono permanenti, come quelli causati da dati di input corrotti o non analizzabili, oppure puntatori nulli durante il calcolo.

Dataflow elabora gli elementi in bundle arbitrari e riprova il bundle completo quando viene generato un errore per qualsiasi elemento del bundle. Durante l'esecuzione in modalità batch, i bundle che includono un elemento con errori vengono ripetuti quattro volte. La pipeline non funziona completamente quando un singolo bundle ha avuto esito negativo per quattro volte. Quando è in esecuzione in modalità flusso, un bundle che include un elemento che presenta errori viene ripetuto a tempo indeterminato, il che potrebbe causare l'arresto permanente della pipeline.

Durante l'elaborazione in modalità batch, potresti notare un numero elevato di singoli errori prima che un job di pipeline fallisca completamente, il che si verifica quando un determinato bundle ha esito negativo dopo quattro nuovi tentativi. Ad esempio, se la tua pipeline tenta di elaborare 100 bundle, Dataflow potrebbe generare diverse centinaia di errori individuali fino a quando un singolo bundle non raggiunge la condizione di quattro errori per l'uscita.

Gli errori dei worker di avvio, come la mancata installazione di pacchetti sui worker, sono temporanei. Questo scenario comporta un numero indefinito di nuovi tentativi e potrebbe causare lo stallo permanente della pipeline.

Parallelizzazione e distribuzione

Il servizio Dataflow carica automaticamente in contemporanea e distribuisce la logica di elaborazione nella pipeline ai worker assegnati affinché eseguano il job. Dataflow utilizza le astrazioni nel modello di programmazione per rappresentare le funzioni di elaborazione parallela. Ad esempio, ParDo si trasforma in una pipeline e fa sì che Dataflow distribuisca automaticamente il codice di elaborazione, rappresentato dagli oggetti DoFn, a più worker da eseguire in parallelo.

Esistono due tipi di parallelismo dei job:

  • Il Parallelismo orizzontale si verifica quando i dati della pipeline vengono suddivisi ed elaborati su più worker contemporaneamente. L'ambiente di runtime Dataflow è basato su un pool di worker distribuiti. Una pipeline ha un potenziale parallelismo più elevato quando il pool contiene più worker, ma questa configurazione ha anche un costo più elevato. In teoria, il parallelismo orizzontale non ha un limite massimo. Tuttavia, Dataflow limita il pool di worker a 4000 worker per ottimizzare l'utilizzo delle risorse a livello di parco risorse.

  • Il Parallelismo verticale si verifica quando i dati della pipeline vengono suddivisi ed elaborati da più core della CPU sullo stesso worker. Ogni worker è alimentato da una VM di Compute Engine. Una VM può eseguire più processi per saturare tutti i core della CPU. Una VM con più core ha un potenziale parallelismo verticale più elevato, ma questa configurazione comporta costi più elevati. Un numero maggiore di core spesso determina un aumento dell'utilizzo della memoria, quindi il numero di core viene generalmente scalato insieme alla dimensione della memoria. Dato il limite fisico delle architetture informatiche, il limite superiore del parallelismo verticale è molto più basso del limite superiore del parallelismo orizzontale.

Parallelismo gestito

Per impostazione predefinita, Dataflow gestisce automaticamente il parallelismo dei job. Dataflow monitora le statistiche di runtime per il job, come l'utilizzo di CPU e memoria, per determinare come scalare il job. A seconda delle impostazioni del job, Dataflow può scalare i job orizzontalmente, chiamata Scalabilità automatica orizzontale o Scalabilità verticale. La scalabilità automatica per il parallelismo ottimizza il costo e le prestazioni dei job.

Per migliorare le prestazioni dei job, Dataflow ottimizza anche le pipeline internamente. Le ottimizzazioni più comuni sono ottimizzazione fusion e ottimizzazione combinata. Unendo i passaggi della pipeline, Dataflow elimina i costi inutili associati al coordinamento delle fasi in un sistema distribuito e all'esecuzione separata di ogni singolo passaggio.

Fattori che influenzano il parallelismo

I seguenti fattori influiscono sul funzionamento del parallelismo nei job Dataflow.

Origine di input

Quando un'origine di input non consente il parallelismo, la fase di importazione dell'origine di input può diventare un collo di bottiglia in un job Dataflow. Ad esempio, quando importi dati da un singolo file di testo compresso, Dataflow non può caricare in contemporanea i dati di input. Poiché la maggior parte dei formati di compressione non può essere divisa arbitrariamente in shard durante l'importazione, Dataflow deve leggere i dati in sequenza dall'inizio del file. La velocità effettiva complessiva della pipeline è rallentata dalla parte non parallela della pipeline. La soluzione a questo problema è usare un'origine di input più scalabile.

In alcuni casi, anche la step fusion riduce il parallelismo. Quando l'origine di input non consente il parallelismo, se Dataflow fonde il passaggio di importazione dati con i passaggi successivi e assegna questo passaggio unito a un singolo thread, l'intera pipeline potrebbe essere eseguita più lentamente.

Per evitare questo scenario, inserisci un passaggio Reshuffle dopo il passaggio di importazione dell'origine di input. Per ulteriori informazioni, consulta la sezione Impedire la fusione di questo documento.

Fanout predefinito e forma dei dati

Il fanout predefinito di un singolo passaggio di trasformazione può diventare un collo di bottiglia e limitare il parallelismo. Ad esempio, la trasformazione ParDo "con fan-out elevato" può causare la fusione per limitare la capacità di Dataflow di ottimizzare l'utilizzo dei worker. In un'operazione di questo tipo, potresti avere una raccolta di input con relativamente pochi elementi, ma ParDo produce un output con centinaia o migliaia di volte più elementi, seguito da un altro ParDo. Se il servizio Dataflow fonde insieme queste operazioni ParDo, il parallelismo in questo passaggio è limitato al massimo al numero di elementi nella raccolta di input, anche se l'elemento PCollection intermedio contiene molti più elementi.

Per potenziali soluzioni, consulta la sezione Impedire la fusione di questo documento.

Forma dei dati

La forma dei dati, che si tratti di dati di input o di dati intermedi, può limitare il parallelismo. Ad esempio, quando un passaggio GroupByKey su una chiave naturale, ad esempio una città, è seguito da un passaggio map o Combine, Dataflow fonde i due passaggi. Quando lo spazio chiave è piccolo, ad esempio, cinque città e una chiave è molto calcolata, ad esempio una grande città, la maggior parte degli elementi nell'output del passaggio GroupByKey viene distribuita in un unico processo. Questo processo diventa un collo di bottiglia e rallenta il job.

In questo esempio, puoi ridistribuire i risultati del passaggio GroupByKey in uno spazio di chiavi artificiale più grande anziché utilizzare le chiavi naturali. Inserisci un passaggio Reshuffle tra il passaggio GroupByKey e il passaggio map o Combine. Nel passaggio Reshuffle, crea lo spazio della chiave artificiale, ad esempio utilizzando una funzione hash, per superare il parallelismo limitato causato dalla forma dei dati.

Per ulteriori informazioni, consulta la sezione Impedire la fusione di questo documento.

Sink di output

Un sink è una trasformazione che scrive in un sistema di archiviazione dati esterno, ad esempio un file o un database. In pratica, i sink sono modellati e implementati come oggetti DoFn standard e vengono utilizzati per materializzare un PCollection ai sistemi esterni. In questo caso, PCollection contiene i risultati finali della pipeline. I thread che le API sink di chiamata possono essere eseguiti in parallelo per scrivere dati nei sistemi esterni. Per impostazione predefinita, non viene eseguito alcun coordinamento tra i thread. Senza un livello intermedio che esegua il buffer delle richieste di scrittura e il flusso di controllo, il sistema esterno può sovraccaricarsi e ridurre la velocità effettiva di scrittura. Lo scale up delle risorse con l'aggiunta di più parallelismo potrebbe rallentare ulteriormente la pipeline.

La soluzione a questo problema è ridurre il parallelismo nella fase di scrittura. Puoi aggiungere un passaggio GroupByKey subito prima del passaggio di scrittura. Il passaggio GroupByKey raggruppa i dati di output in un insieme più ridotto di batch per ridurre le chiamate RPC totali e le connessioni ai sistemi esterni. Ad esempio, utilizza GroupByKey per creare uno spazio hash di 50 punti dati su 1 milione.

Lo svantaggio di questo approccio è che introduce un limite di parallelismo impostato come hardcoded. Un'altra opzione è implementare il backoff esponenziale nel sink durante la scrittura dei dati. Questa opzione può fornire una limitazione del client al minimo indispensabile.

Monitora il parallelismo

Per monitorare il parallelismo, puoi utilizzare la console Google Cloud per visualizzare eventuali elementi in ritardo rilevati. Per maggiori informazioni, consulta Risolvere i problemi relativi agli elementi in ritardo nei job batch e Risolvere i problemi relativi agli elementi in ritardo nei job di elaborazione in modalità flusso.

Ottimizzazione di Fusion

Dopo aver convalidato la forma JSON del grafico di esecuzione della pipeline, il servizio Dataflow potrebbe modificare il grafico per eseguire ottimizzazioni. Le ottimizzazioni possono includere l'unione di più passaggi o trasformazioni nel grafico di esecuzione della pipeline in singoli passaggi. I passaggi di fusione impediscono al servizio Dataflow di dover materializzare ogni intermedio PCollection nella tua pipeline, il che può essere costoso in termini di memoria e overhead di elaborazione.

Sebbene tutte le trasformazioni specificate nella costruzione della pipeline vengano eseguite sul servizio, per garantire l'esecuzione più efficiente della pipeline, le trasformazioni potrebbero essere eseguite in un ordine diverso o nell'ambito di una trasformazione fusa più grande. Il servizio Dataflow rispetta le dipendenze dei dati tra i passaggi del grafico di esecuzione, ma altrimenti i passaggi potrebbero essere eseguiti in qualsiasi ordine.

Esempio di fusione

Il seguente diagramma mostra in che modo il grafico di esecuzione dell'esempio WordCount incluso nell'SDK Apache Beam per Java potrebbe essere ottimizzato e fuso dal servizio Dataflow per un'esecuzione efficiente:

Il grafico di esecuzione per il programma di esempio WordCount ottimizzato e con passaggi fusi dal servizio Dataflow.

Figura 2: grafico di esecuzione ottimizzata di esempio di conteggio di parole

Prevenzione della fusione

In alcuni casi, Dataflow potrebbe indovinare erroneamente il modo ottimale per fondere le operazioni nella pipeline, il che può limitare la capacità di Dataflow di utilizzare tutti i worker disponibili. In questi casi, puoi impedire a Dataflow di eseguire ottimizzazioni di fusione.

Puoi impedire la fase di fusione aggiungendo un'operazione alla pipeline che obbliga il servizio Dataflow a materializzare l'elemento PCollection intermedio. Valuta la possibilità di utilizzare una delle seguenti operazioni:

  • Inserisci un GroupByKey e separa il gruppo dopo i primi ParDo. Il servizio Dataflow non fonde mai ParDo operazioni in un'aggregazione.
  • Passa il tuo PCollection intermedio come input laterale a un altro ParDo. Il servizio Dataflow materializza sempre gli input laterali.
  • Inserisci un passaggio Reshuffle. Reshuffle impedisce la fusione, effettua controlli dei dati ed esegue la deduplicazione dei record. Il riordinamento è supportato da Dataflow anche se è contrassegnato come deprecato nella documentazione di Apache Beam.

Monitora fusione

Puoi accedere al grafico ottimizzato e alle fasi unite nella console Google Cloud, utilizzando gcloud CLI o l'API.

Console

Per visualizzare le fasi e i passaggi fusi del grafico nella console, nella scheda Dettagli esecuzione del job Dataflow, apri la visualizzazione grafico del flusso di lavoro Stage.

Per visualizzare i passaggi dei componenti fusi per una fase, fai clic sulla fase fusa nel grafico. Nel riquadro Informazioni fase, la riga Passaggi del componente mostra le fasi combinate. A volte le parti di una singola trasformazione composita sono fuse in più fasi.

gcloud

Per accedere al grafico ottimizzato e alle fasi unite utilizzando gcloud CLI, esegui questo comando gcloud:

  gcloud dataflow jobs describe --full JOB_ID --format json

Sostituisci JOB_ID con l'ID del job Dataflow.

Per estrarre i bit pertinenti, collega l'output del comando gcloud a jq:

gcloud dataflow jobs describe --full JOB_ID --format json | jq '.pipelineDescription.executionPipelineStage\[\] | {"stage_id": .id, "stage_name": .name, "fused_steps": .componentTransform }'

Per visualizzare la descrizione delle fasi unite nel file di risposta di output, all'interno dell'array ComponentTransform, visualizza l'oggetto ExecutionStageSummary.

API

Per accedere al grafico ottimizzato e alle fasi unite utilizzando l'API, chiama project.locations.jobs.get.

Per visualizzare la descrizione delle fasi unite nel file di risposta di output, all'interno dell'array ComponentTransform, visualizza l'oggetto ExecutionStageSummary.

Combina l'ottimizzazione

Le operazioni di aggregazione sono un concetto importante nell'elaborazione dati su larga scala. L'aggregazione riunisce dati concettualmente lontani, rendendoli estremamente utili per la correlazione. Il modello di programmazione Dataflow rappresenta le operazioni di aggregazione come le trasformazioni GroupByKey, CoGroupByKey e Combine.

Le operazioni di aggregazione di Dataflow combinano i dati dell'intero set di dati, inclusi quelli che potrebbero essere distribuiti su più worker. Durante queste operazioni di aggregazione, spesso la soluzione più efficiente è combinare il maggior numero possibile di dati a livello locale prima di combinare i dati tra le istanze. Quando applichi una trasformazione GroupByKey o un'altra trasformazione di aggregazione, il servizio Dataflow esegue automaticamente la combinazione parziale a livello locale prima dell'operazione di raggruppamento principale.

Durante l'esecuzione di combinazioni parziali o multilivello, il servizio Dataflow prende decisioni diverse a seconda che la pipeline stia lavorando con dati in batch o in flussi. Per i dati limitati, il servizio favorisce l'efficienza e eseguerà la massima combinazione locale possibile. Per i dati non limitati, il servizio favorisce una latenza inferiore e potrebbe non eseguire la combinazione parziale, perché potrebbe aumentare la latenza.