Ciclo di vita della pipeline

Questa pagina fornisce una panoramica del ciclo di vita della pipeline, dal codice alla del job Dataflow.

Questa pagina illustra i seguenti concetti:

  • Che cos'è un grafico di esecuzione e come una pipeline Apache Beam diventa Job Dataflow
  • In che modo Dataflow gestisce gli errori
  • In che modo Dataflow esegue automaticamente il parallelismo e la distribuzione della logica di elaborazione nella pipeline ai worker che eseguono il job
  • Ottimizzazioni dei job che Dataflow potrebbe rendere

Grafico di esecuzione

Quando esegui la pipeline Dataflow, Dataflow crea un grafico di esecuzione dal codice che costruisce l'oggetto Pipeline, incluse tutte le trasformazioni e le relative funzioni di elaborazione associate, come oggetti DoFn. Questo è il grafico di esecuzione della pipeline, dove la fase viene chiamata tempo di creazione del grafico.

Durante la creazione del grafico, Apache Beam esegue localmente il codice il principale punto di ingresso del codice della pipeline, fermandosi alle chiamate a origini, o di trasformazione, trasformando queste chiamate in nodi del grafico. Di conseguenza, una porzione di codice nel punto di ingresso di una pipeline (Java e Go main o il livello più alto di uno script Python) viene eseguito localmente sulla macchina esegue la pipeline. Lo stesso codice dichiarato in un metodo di un oggetto DoFn viene eseguita nei worker Dataflow.

Ad esempio, il valore WordCount incluso negli SDK Apache Beam, contiene una serie di trasformazioni leggere, estrarre, contare, formattare e scrivere le singole parole di una raccolta di testo, insieme a un conteggio di occorrenze per ogni parola. Il seguente diagramma mostra come le trasformazioni nella pipeline WordCount vengono espanse in un grafo di esecuzione:

Le trasformazioni nel programma di esempio WordCount sono state espanse in un grafico di esecuzione
di passi da eseguire dal servizio Dataflow.

Figura 1: grafico di esecuzione di esempio di conteggio parole

Il grafico di esecuzione è spesso diverso dall'ordine in cui hai specificato i quando crei la pipeline. Questa differenza esiste perché Il servizio Dataflow esegue varie ottimizzazioni e fusioni di esecuzione prima che venga eseguito su risorse cloud gestite. La Il servizio Dataflow rispetta le dipendenze dei dati durante l'esecuzione una pipeline o un blocco note personalizzato. Tuttavia, i passaggi senza dipendenze tra i dati possono essere eseguiti in qualsiasi ordine.

Per vedere il grafico di esecuzione non ottimizzato di Dataflow generati per la pipeline, seleziona il job Interfaccia di monitoraggio di Dataflow. Per ulteriori informazioni sulla visualizzazione offerte di lavoro, consulta Utilizzare l'interfaccia di monitoraggio di Dataflow.

Durante la costruzione del grafo, Apache Beam convalida che tutte le risorse a cui fa riferimento la pipeline, ad esempio i bucket Cloud Storage, le tabelle BigQuery e gli argomenti o gli abbonamenti Pub/Sub, esistano effettivamente e siano accessibili. La convalida viene eseguita tramite di chiamate API standard ai rispettivi servizi, quindi è fondamentale che l'utente L'account utilizzato per eseguire una pipeline disponga di una connettività adeguata ai servizi necessari ed è autorizzato a chiamare le API dei servizi. Prima di inviare la pipeline nel servizio Dataflow, Apache Beam controlla anche altri errori, e garantisce che il grafico della pipeline non contenga operazioni illegali.

Il grafico di esecuzione viene quindi tradotto in formato JSON e l'esecuzione in JSON viene trasmesso all'endpoint del servizio Dataflow.

Il servizio Dataflow convalida quindi il grafico di esecuzione JSON. Quando il grafico viene convalidato, diventa un job nel servizio Dataflow. Puoi visualizzare il job, il relativo grafico di esecuzione, lo stato e il log utilizzando l'interfaccia di monitoraggio di Dataflow.

Java

Il servizio Dataflow invia una risposta alla macchina su cui esegui il tuo programma Dataflow. Questa risposta è incapsulata nell'oggetto DataflowPipelineJob, che contiene il jobId del tuo job Dataflow. Utilizza jobId per monitorare, tracciare e risolvere i problemi del job tramite l' Interfaccia di monitoraggio di Dataflow e l'interfaccia a riga di comando di Dataflow. Per ulteriori informazioni, consulta Riferimento API per DataflowPipelineJob.

Python

Il servizio Dataflow invia una risposta alla macchina su cui esegui il tuo programma Dataflow. Questa risposta è incapsulata nell'oggetto DataflowPipelineResult, che contiene il valore job_id del tuo job Dataflow. Utilizza l'job_id per monitorare e tracciare il job, nonché per risolvere eventuali problemi utilizzando Interfaccia di monitoraggio di Dataflow e ai Interfaccia a riga di comando di Dataflow.

Vai

Il servizio Dataflow invia una risposta alla macchina su cui esegui il tuo programma Dataflow. Questa risposta è incapsulata nell'oggetto dataflowPipelineResult, che contiene il valore jobID del tuo job Dataflow. Utilizza l'jobID per monitorare e tracciare il job, nonché per risolvere eventuali problemi utilizzando Interfaccia di monitoraggio di Dataflow e ai Interfaccia a riga di comando di Dataflow.

La creazione del grafico avviene anche quando esegui la pipeline localmente, ma il grafico non viene tradotto in JSON né trasmesso al servizio. Il grafico viene invece eseguito localmente sulla stessa macchina in cui hai avviato programma Dataflow. Per ulteriori informazioni, consulta la pagina sulla configurazione di PipelineOptions per l'esecuzione locale.

Gestione di errori ed eccezioni

La pipeline potrebbe generare eccezioni durante l'elaborazione dei dati. Alcuni di questi errori sono temporanei, ad esempio difficoltà temporanee di accesso a un servizio esterno. Altro gli errori sono permanenti, ad esempio errori causati da dati di input corrotti o non analizzabili, o puntatori nulli durante il calcolo.

Dataflow elabora gli elementi in pacchetti arbitrari e riprova il pacchetto completo quando viene generato un errore per un elemento del pacchetto. Quando in modalità batch, i bundle che includono un elemento con errori vengono ritentati quattro volte. Si verifica un errore completo della pipeline quando un singolo bundle presenta errori per quattro volte. Quando in esecuzione in modalità flusso di dati, viene ritentato un bundle che include un elemento con errori a tempo indeterminato, il che potrebbe causare il blocco permanente della pipeline.

Durante l'elaborazione in modalità batch, potresti notare singoli errori prima di un errore completo di un job della pipeline, che si verifica quando un determinato bundle fallisce dopo quattro tentativi. Ad esempio, se la tua pipeline tenta di elaborare 100 pacchetti, Dataflow generare diverse centinaia di singoli errori finché un singolo bundle non raggiunge condizione "4-failure" per l'uscita.

Gli errori del worker di avvio, come la mancata installazione di pacchetti sui worker, sono: transitorio. Questo scenario comporta nuovi tentativi indefiniti e potrebbe causare la pipeline si blocchi definitivamente.

Parallelizzazione e distribuzione

Il servizio Dataflow lo parallelizza e distribuisce automaticamente la logica di elaborazione della pipeline ai worker assegnati per l'esecuzione il tuo lavoro. Dataflow utilizza le astrazioni modello di programmazione per rappresentare funzioni di elaborazione parallela. Ad esempio, ParDo trasforma in una pipeline fanno sì che Dataflow distribuisca automaticamente il codice di elaborazione, rappresentati da DoFn oggetti, a più worker da eseguire in parallelo.

Esistono due tipi di parallelismo dei lavori:

  • Il parallelismo orizzontale si verifica quando i dati della pipeline vengono suddivisi ed elaborati più worker contemporaneamente. Il runtime di Dataflow è alimentato da un pool di worker distribuiti. Una pipeline ha un parallelismo potenziale più alto quando il pool contiene più worker, ma anche questa configurazione ha un costo maggiore. In teoria, il parallelismo horizontale non ha un limite superiore. Tuttavia, Dataflow limita il pool di worker a 4000 per ottimizzare l'utilizzo delle risorse a livello di parco.

  • Il parallelismo verticale si verifica quando i dati della pipeline vengono suddivisi ed elaborati da più core della CPU sullo stesso worker. Ogni worker è alimentato da un alla VM di Compute Engine. Una VM può eseguire più processi per saturare tutti i suoi CPU Cores. Una VM con più core ha un potenziale più elevato parallelismo verticale, ma questa configurazione comporta costi più alti. Spesso è necessario un numero di core più alto comporta un aumento dell'utilizzo della memoria, quindi il numero di core solitamente è scalate insieme alle dimensioni della memoria. Dato il limite fisico del computer di architetture, il limite superiore di parallelismo verticale è molto più basso limite superiore del parallelismo orizzontale.

Parallelismo gestito

Per impostazione predefinita, Dataflow gestisce automaticamente il parallelismo dei job. Dataflow monitora le statistiche di runtime per il job, ad esempio la CPU e memoria utilizzata per determinare come scalare il job. A seconda delle impostazioni del job, Dataflow può eseguire lo scaling dei job in orizzontale, definito scalabilità automatica orizzontale, o in verticale, definito scalabilità verticale. La scalabilità automatica per il parallelismo ottimizza i costi del job le prestazioni lavorative.

Per migliorare le prestazioni dei job, Dataflow ottimizza anche le pipeline internamente. Le ottimizzazioni tipiche sono l'ottimizzazione della fusione e combinare l'ottimizzazione. Unendo i passaggi della pipeline, Dataflow elimina i costi non necessari associati al coordinamento dei passaggi in un sistema distribuito e all'esecuzione di ogni singolo passaggio separatamente.

Fattori che influiscono sul parallelismo

I seguenti fattori influenzano il funzionamento del parallelismo di job Dataflow.

Origine di input

Quando un'origine di input non consente il parallelismo, il passaggio di importazione dell'origine di input può diventare un collo di bottiglia in un job Dataflow. Ad esempio, quando importi dati da un singolo file di testo compresso, Dataflow non può caricare i dati di input. Poiché la maggior parte dei formati di compressione non può essere suddivisa arbitrariamente più piccoli durante l'importazione, Dataflow deve leggere i dati in sequenza all'inizio del file. La velocità effettiva complessiva della pipeline viene rallentata parte non parallela della pipeline. La soluzione a questo problema consiste nel utilizza un'origine di input più scalabile.

In alcuni casi, la step fusion riduce anche il parallelismo. Quando l'origine di input non consente il parallelismo, se Dataflow unisce il passaggio di importazione dati con i passaggi successivi e assegna questo passaggio integrato in un singolo thread, l'intera pipeline potrebbe essere eseguita più lentamente.

Per evitare questo scenario, inserisci un passaggio Reshuffle dopo il passaggio di importazione della fonte di input. Per ulteriori informazioni, consulta Sezione Impedisci fusione di questo documento.

Fanout e forma dei dati predefiniti

Il fan-out predefinito di un singolo passaggio di trasformazione può diventare un collo di bottiglia e limitare il parallelismo. Ad esempio, "fan-out elevato" La trasformazione ParDo può causare fusione per limitare la capacità di Dataflow di ottimizzare l'utilizzo dei worker. Nel un'operazione di questo tipo, potresti avere una raccolta di input con ma ParDo produce un output con centinaia o migliaia di volte tanti elementi, seguiti da altri ParDo. Se la Dataflow il servizio fonde insieme queste operazioni ParDo, il parallelismo in questo passaggio è limitato al massimo al numero di elementi nella raccolta di input, anche se il campo PCollection intermedio contiene molti più elementi.

Per potenziali soluzioni, consulta la sezione Impedire la fusione di questo documento.

Forma dei dati

La forma dei dati, che si tratti di dati di input o intermedi, può limitare il parallelismo. Ad esempio, quando un GroupByKey sale su una chiave naturale, come una città, seguito da un passaggio map o Combine, Dataflow fonde i due passaggi. Quando lo spazio delle chiavi è piccolo, ad esempio cinque città, e una chiave è molto caldo, ad esempio una grande città, la maggior parte degli elementi nell'output di GroupByKey vengono distribuiti in un unico processo. Questo processo diventa un collo di bottiglia e rallenta fino al completamento del lavoro.

In questo esempio, puoi ridistribuire i risultati del passaggio GroupByKey in un uno spazio artificiale più ampio rispetto ai tasti naturali. Inserisci un Reshuffle passaggio tra il passaggio GroupByKey e il passaggio map o Combine. Nel passaggio Reshuffle, crea lo spazio della chiave artificiale, ad esempio utilizzando una hash, per superare il limitato parallelismo causato dalla forma dei dati.

Per ulteriori informazioni, consulta Sezione Impedisci fusione di questo documento.

Sink di output

Un sink è una trasformazione che scrive in un sistema di archiviazione dati esterno, come o un database. In pratica, i sink vengono modellati e implementati come standard DoFn e vengono utilizzati per materializzare un PCollection ai sistemi esterni. In questo caso, PCollection contiene i risultati finali della pipeline. Thread che le API di sink delle chiamate possono essere eseguite in parallelo per scrivere dati nei sistemi esterni. Di per impostazione predefinita, non si verifica alcun coordinamento tra i thread. Senza una intermedia per il buffering delle richieste di scrittura e del flusso di controllo, il sistema esterno si sovraccarichi e si riducono la velocità effettiva di scrittura. Fai lo scale up delle risorse aggiungendo altre il parallelismo potrebbe rallentare ulteriormente la pipeline.

La soluzione a questo problema consiste nel ridurre il parallelismo nella fase di scrittura. Puoi aggiungere un passaggio GroupByKey subito prima del passaggio di scrittura. GroupByKey i gruppi di passaggi inviano i dati in un insieme più piccolo di batch per ridurre il numero totale di chiamate RPC e connessioni a sistemi esterni. Ad esempio, utilizza un GroupByKey per creare un spazio hash di 50 punti dati su 1 milione.

Lo svantaggio di questo approccio è che introduce un limite hardcoded al parallelismo. Un'altra opzione è implementare il backoff esponenziale nel sink durante la scrittura e i dati di Google Cloud. Questa opzione può fornire una limitazione del client minima.

Monitora il parallelismo

Per monitorare il parallelismo, puoi utilizzare la console Google Cloud per visualizzare in ritardo. Per ulteriori informazioni, consulta Risolvere i problemi relativi ai job in ritardo nei job batch e Risolvere i problemi relativi ai job in ritardo nei job streaming.

Ottimizzazione della fusione

Dopo aver convalidato il formato JSON del grafico di esecuzione della pipeline, Il servizio Dataflow potrebbe modificare il grafico per eseguire le ottimizzazioni. Le ottimizzazioni possono includere la fusione di più passaggi o trasformazioni il grafico di esecuzione della pipeline in singoli passaggi. L'unione dei passaggi impedisce dal servizio Dataflow alla necessità di materializzare ogni intermedio PCollection nella pipeline, il che può essere costoso in termini di memoria sull'overhead di elaborazione.

Sebbene tutte le trasformazioni specificate nella creazione della pipeline siano eseguiti sul servizio, al fine di garantire l'esecuzione più efficiente del pipeline, le trasformazioni possono essere eseguite in un ordine diverso o come parte di una trasformazione fusa più grande. Il servizio Dataflow rispetta i dati dipendenze tra i passaggi nel grafico di esecuzione, ma altrimenti i passaggi potrebbero essere eseguiti in qualsiasi ordine.

Esempio di Fusion

Il seguente diagramma mostra come il grafico di esecuzione Esempio di WordCount incluso con L'SDK Apache Beam per Java può essere ottimizzato e integrato Servizio Dataflow per un'esecuzione efficiente:

Il grafo di esecuzione del programma di esempio WordCount ottimizzato e con i passaggi uniti
dal servizio Dataflow.

Figura 2: esempio di grafico di esecuzione ottimizzato per il conteggio parole

Prevenzione della fusione

In alcuni casi, Dataflow potrebbe indovinare erroneamente il modo ottimale per fondere le operazioni nella pipeline, Abilità di Dataflow di utilizzare tutti i worker disponibili. In questi casi, puoi impedire a Dataflow di eseguire ottimizzazioni della fusione.

Puoi impedire la fusione dei passaggi aggiungendo alla pipeline un'operazione che obbliga il servizio Dataflow a materializzare l'intermedio PCollection. Prendi in considerazione l'utilizzo di una delle seguenti operazioni:

  • Inserisci un GroupByKey e annulla il raggruppamento dopo il primo ParDo. Il servizio Dataflow non unisce mai le operazioni ParDo in un'aggregazione.
  • Supera il tuo livello PCollection intermedio come input aggiuntivo a un altro ParDo. Il servizio Dataflow materializza sempre gli input laterali.
  • Inserisci un passaggio Reshuffle. Reshuffle impedisce la fusione, esegue i controlli di sicurezza sui dati ed esegue la deduplica dei record. Il rimpasto è supportato da Dataflow anche se è contrassegnato come deprecato nel documentazione di Apache Beam.

Monitora Fusion

Puoi accedere al grafico ottimizzato e alle fasi raggruppate nella console Google Cloud. con gcloud CLI o l'API.

Console

Per visualizzare le fasi e i passaggi del grafico Nella console, nella scheda Dettagli esecuzione relativa al tuo Job Dataflow, apri Flusso di lavoro della fase visualizzazione grafico.

Per visualizzare i passaggi del componente integrati per una fase, nel grafico fai clic su nella fase di fusione. Nel riquadro Informazioni sulla fase, la riga Passaggi dei componenti mostra tutte le fasi. A volte parti di una singola trasformazione composita vengono fuse in più fasi.

gcloud

Per accedere al grafico ottimizzato e alle fasi unite utilizzando il gcloud CLI, esegui questo comando gcloud:

  gcloud dataflow jobs describe --full JOB_ID --format json

Sostituisci JOB_ID con l'ID del tuo job Dataflow.

Per estrarre i bit pertinenti, invia l'output del comando gcloud a jq:

gcloud dataflow jobs describe --full JOB_ID --format json | jq '.pipelineDescription.executionPipelineStage\[\] | {"stage_id": .id, "stage_name": .name, "fused_steps": .componentTransform }'

Per vedere la descrizione delle fasi fusi nel file di risposta di output, all'interno della ComponentTransform osserva ExecutionStageSummary .

API

Per accedere al grafico ottimizzato e alle fasi fuse utilizzando l'API, chiama project.locations.jobs.get.

Per visualizzare la descrizione delle fasi fuse nel file di risposta di output, consulta l'oggetto ExecutionStageSummary all'interno dell'array ComponentTransform.

Combina ottimizzazione

Le operazioni di aggregazione sono un concetto importante nell'elaborazione dati su larga scala. L'aggregazione riunisce dati concettualmente lontani, in modo che estremamente utile per la correlazione. Il modello di programmazione Dataflow rappresenta le operazioni di aggregazione come le trasformazioni GroupByKey, CoGroupByKey e Combine.

Le operazioni di aggregazione di Dataflow combinano i dati dell'intero set di dati, inclusi quelli che potrebbero essere distribuiti su più worker. Durante queste operazioni di aggregazione, spesso è più efficiente combinare il maggior numero possibile di dati localmente prima di combinarli tra le istanze. Quando applichi un GroupByKey o altra trasformazione di aggregazione, il servizio Dataflow esegue automaticamente la combinazione parziale in locale prima del raggruppamento principale operativa.

Quando si esegue una combinazione parziale o multilivello, il Dataflow servizio prende decisioni diverse a seconda che la pipeline stia utilizzando o meno in modalità flusso o batch. Per i dati limitati, il servizio favorisce l'efficienza eseguirà il maggior numero possibile di combinazioni locali. Per i dati illimitati, favorisce una latenza più bassa e potrebbe non eseguire una combinazione parziale, potrebbe aumentare la latenza.