Ciclo di vita della pipeline

Questa pagina fornisce una panoramica del ciclo di vita della pipeline, dal codice alla del job Dataflow.

In questa pagina vengono spiegati i seguenti concetti:

  • Che cos'è un grafico di esecuzione e come una pipeline Apache Beam diventa Job Dataflow
  • In che modo Dataflow gestisce gli errori
  • In che modo Dataflow carica e distribuisce automaticamente logica di elaborazione nella pipeline per i worker che eseguono il job
  • Ottimizzazioni dei job che Dataflow potrebbe rendere

Grafico di esecuzione

Quando esegui la pipeline Dataflow, Dataflow crea un grafico di esecuzione dal codice che costruisce l'oggetto Pipeline, incluse tutte le trasformazioni e le relative funzioni di elaborazione associate, come oggetti DoFn. Questo è il grafico di esecuzione della pipeline, dove la fase viene chiamata tempo di creazione del grafico.

Durante la creazione del grafico, Apache Beam esegue localmente il codice il principale punto di ingresso del codice della pipeline, fermandosi alle chiamate a origini, o di trasformazione, trasformando queste chiamate in nodi del grafico. Di conseguenza, una porzione di codice nel punto di ingresso di una pipeline (Java e Go main o il livello più alto di uno script Python) viene eseguito localmente sulla macchina esegue la pipeline. Lo stesso codice dichiarato in un metodo di un oggetto DoFn viene eseguita nei worker Dataflow.

Ad esempio, il valore WordCount incluso negli SDK Apache Beam, contiene una serie di trasformazioni leggere, estrarre, contare, formattare e scrivere le singole parole di una raccolta di testo, insieme a un conteggio di occorrenze per ogni parola. Il seguente diagramma mostra come le trasformazioni nella pipeline di conteggio parole vengono espanse grafico di esecuzione:

Le trasformazioni nel programma di esempio WordCount sono state espanse in un grafico di esecuzione
di passi da eseguire dal servizio Dataflow.

Figura 1: grafico di esecuzione di esempio di conteggio parole

Il grafico di esecuzione è spesso diverso dall'ordine in cui hai specificato i quando crei la pipeline. Questa differenza esiste perché Il servizio Dataflow esegue varie ottimizzazioni e fusioni di esecuzione prima che venga eseguito su risorse cloud gestite. La Il servizio Dataflow rispetta le dipendenze dei dati durante l'esecuzione una pipeline o un blocco note personalizzato. Tuttavia, i passaggi senza dipendenze tra i dati possono essere eseguiti in qualsiasi ordine.

Per vedere il grafico di esecuzione non ottimizzato di Dataflow generati per la pipeline, seleziona il job Interfaccia di monitoraggio di Dataflow. Per ulteriori informazioni sulla visualizzazione offerte di lavoro, consulta Utilizzare l'interfaccia di monitoraggio di Dataflow.

Durante la creazione dei grafici, Apache Beam verifica che tutte le risorse a cui fa riferimento la pipeline, come i bucket Cloud Storage, tabelle BigQuery e argomenti Pub/Sub o esistono e sono accessibili. La convalida viene eseguita tramite di chiamate API standard ai rispettivi servizi, quindi è fondamentale che l'utente L'account utilizzato per eseguire una pipeline disponga di una connettività adeguata ai servizi necessari ed è autorizzato a chiamare le API dei servizi. Prima di inviare la pipeline nel servizio Dataflow, Apache Beam controlla anche altri errori, e garantisce che il grafico della pipeline non contenga operazioni illegali.

Il grafico di esecuzione viene quindi tradotto in formato JSON e l'esecuzione in JSON viene trasmesso all'endpoint del servizio Dataflow.

Il servizio Dataflow convalida quindi il grafico di esecuzione JSON. Quando convalidato, diventa un job in Dataflow completamente gestito di Google Cloud. Puoi visualizzare il job, il relativo grafico di esecuzione, lo stato e il log utilizzando l'interfaccia di monitoraggio di Dataflow.

Java

Il servizio Dataflow invia una risposta alla macchina su cui esegui il tuo programma Dataflow. Questa risposta è incapsulata nell'oggetto DataflowPipelineJob, che contiene il valore jobId del tuo job Dataflow. Utilizza jobId per monitorare, tracciare e risolvere i problemi del job tramite l' Interfaccia di monitoraggio di Dataflow e l'interfaccia a riga di comando di Dataflow. Per ulteriori informazioni, consulta Riferimento API per DataflowPipelineJob.

Python

Il servizio Dataflow invia una risposta alla macchina su cui esegui il tuo programma Dataflow. Questa risposta è incapsulata nell'oggetto DataflowPipelineResult, che contiene il valore job_id del tuo job Dataflow. Utilizza l'job_id per monitorare e tracciare il job, nonché per risolvere eventuali problemi utilizzando Interfaccia di monitoraggio di Dataflow e ai Interfaccia a riga di comando di Dataflow.

Vai

Il servizio Dataflow invia una risposta alla macchina su cui esegui il tuo programma Dataflow. Questa risposta è incapsulata nell'oggetto dataflowPipelineResult, che contiene il valore jobID del tuo job Dataflow. Utilizza l'jobID per monitorare e tracciare il job, nonché per risolvere eventuali problemi utilizzando Interfaccia di monitoraggio di Dataflow e ai Interfaccia a riga di comando di Dataflow.

La creazione del grafico avviene anche quando esegui la pipeline localmente, ma il grafico non viene tradotto in JSON né trasmesso al servizio. Il grafico viene invece eseguito localmente sulla stessa macchina in cui hai avviato programma Dataflow. Per ulteriori informazioni, vedi Configurazione di PipelineOptions per l'esecuzione locale.

Gestione di errori ed eccezioni

La pipeline potrebbe generare eccezioni durante l'elaborazione dei dati. Alcuni di questi errori sono temporanei, ad esempio difficoltà temporanee di accesso a un servizio esterno. Altro gli errori sono permanenti, ad esempio errori causati da dati di input corrotti o non analizzabili, o puntatori nulli durante il calcolo.

Dataflow elabora gli elementi in bundle arbitrari e riprova il bundle completo quando viene generato un errore per un qualsiasi elemento del bundle. Quando in modalità batch, i bundle che includono un elemento con errori vengono ritentati quattro volte. Si verifica un errore completo della pipeline quando un singolo bundle presenta errori per quattro volte. Quando in esecuzione in modalità flusso di dati, viene ritentato un bundle che include un elemento con errori a tempo indeterminato, il che potrebbe causare il blocco permanente della pipeline.

Durante l'elaborazione in modalità batch, potresti notare singoli errori prima di un errore completo di un job della pipeline, che si verifica quando un determinato bundle fallisce dopo quattro tentativi. Ad esempio, se la tua pipeline tenta di elaborare 100 pacchetti, Dataflow generare diverse centinaia di singoli errori finché un singolo bundle non raggiunge condizione "4-failure" per l'uscita.

Gli errori del worker di avvio, come la mancata installazione di pacchetti sui worker, sono: transitorio. Questo scenario comporta nuovi tentativi indefiniti e potrebbe causare la pipeline si blocchi definitivamente.

Parallelizzazione e distribuzione

Il servizio Dataflow lo parallelizza e distribuisce automaticamente la logica di elaborazione della pipeline ai worker assegnati per l'esecuzione il tuo lavoro. Dataflow utilizza le astrazioni modello di programmazione per rappresentare funzioni di elaborazione parallela. Ad esempio, ParDo trasforma in una pipeline fanno sì che Dataflow distribuisca automaticamente il codice di elaborazione, rappresentati da DoFn oggetti, a più worker da eseguire in parallelo.

Esistono due tipi di parallelismo dei lavori:

  • Il parallelismo orizzontale si verifica quando i dati della pipeline vengono suddivisi ed elaborati più worker contemporaneamente. Il runtime di Dataflow è alimentato da un pool di worker distribuiti. Una pipeline ha un parallelismo potenziale più alto quando il pool contiene più worker, ma anche questa configurazione ha un costo maggiore. In teoria, orizzontale il parallelismo non ha un limite massimo. Tuttavia, Dataflow limita il pool di worker a 4000 worker per ottimizzare l'utilizzo delle risorse a livello di parco risorse.

  • Il parallelismo verticale si verifica quando i dati della pipeline vengono suddivisi ed elaborati più core CPU sullo stesso worker. Ogni worker è alimentato da un alla VM di Compute Engine. Una VM può eseguire più processi per saturare tutti i suoi Core della CPU. Una VM con più core ha un potenziale più elevato parallelismo verticale, ma questa configurazione comporta costi più alti. Spesso è necessario un numero di core più alto comporta un aumento dell'utilizzo della memoria, quindi il numero di core solitamente è scalate insieme alle dimensioni della memoria. Dato il limite fisico del computer di architetture, il limite superiore di parallelismo verticale è molto più basso limite superiore del parallelismo orizzontale.

Parallelismo gestito

Per impostazione predefinita, Dataflow gestisce automaticamente il parallelismo dei job. Dataflow monitora le statistiche di runtime per il job, ad esempio la CPU e memoria utilizzata per determinare come scalare il job. A seconda del lavoro le impostazioni, Dataflow può scalare i job orizzontalmente, Scalabilità automatica orizzontale oppure in verticale, noto come scalabilità verticale. La scalabilità automatica per il parallelismo ottimizza i costi del job le prestazioni lavorative.

Per migliorare le prestazioni del job, Dataflow ottimizza anche le pipeline internamente. Le ottimizzazioni tipiche sono l'ottimizzazione della fusione e combinare l'ottimizzazione. Unendo i passaggi della pipeline, Dataflow elimina le i costi associati al coordinamento delle varie fasi in un sistema distribuito ed eseguire ogni singolo passaggio separatamente.

Fattori che influenzano il parallelismo

I seguenti fattori influiscono sul funzionamento del parallelismo di job Dataflow.

Origine di input

Quando un'origine di input non consente il parallelismo, il passaggio di importazione dell'origine di input può un collo di bottiglia in un job Dataflow. Ad esempio, quando importi dati da un singolo file di testo compresso, Dataflow non può caricare i dati di input. Poiché la maggior parte dei formati di compressione non può essere suddivisa arbitrariamente più piccoli durante l'importazione, Dataflow deve leggere i dati in sequenza all'inizio del file. La velocità effettiva complessiva della pipeline viene rallentata parte non parallela della pipeline. La soluzione a questo problema consiste nel utilizza un'origine di input più scalabile.

In alcuni casi, la step fusion riduce anche il parallelismo. Quando l'origine di input non consente il parallelismo, se Dataflow unisce il passaggio di importazione dati con i passaggi successivi e assegna questo passaggio integrato in un singolo thread, l'intera pipeline potrebbe essere eseguita più lentamente.

Per evitare questo scenario, inserisci un passaggio Reshuffle dopo l'origine di input fase di importazione. Per ulteriori informazioni, consulta Sezione Impedisci fusione di questo documento.

Fanout e forma dei dati predefiniti

Il fan-out predefinito di un singolo passaggio di trasformazione può diventare un collo di bottiglia e limitare il parallelismo. Ad esempio, "fan-out elevato" La trasformazione ParDo può causare fusione per limitare la capacità di Dataflow di ottimizzare l'utilizzo dei worker. Nella un'operazione di questo tipo, potresti avere una raccolta di input con ma ParDo produce un output con centinaia o migliaia di volte tanti elementi, seguiti da altri ParDo. Se la Dataflow il servizio fonde insieme queste operazioni ParDo, il parallelismo in questo passaggio è limitato al massimo al numero di elementi nella raccolta di input, anche se il campo PCollection intermedio contiene molti più elementi.

Per le potenziali soluzioni, vedi Sezione Impedisci fusione di questo documento.

Forma dei dati

La forma dei dati, che si tratti di dati di input o intermedi, può limitare il parallelismo. Ad esempio, quando un GroupByKey sale su una chiave naturale, come una città, seguito da un passaggio map o Combine, Dataflow fonde i due passaggi. Quando lo spazio delle chiavi è piccolo, ad esempio cinque città, e una chiave è molto caldo, ad esempio una grande città, la maggior parte degli elementi nell'output di GroupByKey vengono distribuiti in un unico processo. Questo processo diventa un collo di bottiglia e rallenta fino al completamento del lavoro.

In questo esempio, puoi ridistribuire i risultati del passaggio GroupByKey in un uno spazio artificiale più ampio rispetto ai tasti naturali. Inserisci un Reshuffle passaggio tra il passaggio GroupByKey e il passaggio map o Combine. Nel passaggio Reshuffle, crea lo spazio della chiave artificiale, ad esempio utilizzando un hash, per superare il limitato parallelismo causato dalla forma dei dati.

Per ulteriori informazioni, consulta Sezione Impedisci fusione di questo documento.

Sink di output

Un sink è una trasformazione che scrive in un sistema di archiviazione dati esterno, come o un database. In pratica, i sink vengono modellati e implementati come standard DoFn e vengono utilizzati per materializzare un PCollection ai sistemi esterni. In questo caso, PCollection contiene i risultati finali della pipeline. Thread che le API di sink delle chiamate possono essere eseguite in parallelo per scrivere dati nei sistemi esterni. Di per impostazione predefinita, non si verifica alcun coordinamento tra i thread. Senza una intermedia per il buffering delle richieste di scrittura e del flusso di controllo, il sistema esterno si sovraccarichi e si riducono la velocità effettiva di scrittura. Fai lo scale up delle risorse aggiungendo altre il parallelismo potrebbe rallentare ulteriormente la pipeline.

La soluzione a questo problema consiste nel ridurre il parallelismo nella fase di scrittura. Puoi aggiungere un passaggio GroupByKey subito prima di quello di scrittura. GroupByKey i gruppi di passaggi inviano i dati in un insieme più piccolo di batch per ridurre il numero totale di chiamate RPC e connessioni a sistemi esterni. Ad esempio, utilizza GroupByKey per creare un di 50 punti dati su 1 milione.

Lo svantaggio di questo approccio è che introduce un limite hardcoded al parallelismo. Un'altra opzione è implementare il backoff esponenziale nel sink durante la scrittura e i dati di Google Cloud. Questa opzione può offrire una limitazione minima del client.

Monitora il parallelismo

Per monitorare il parallelismo, puoi utilizzare la console Google Cloud per visualizzare in ritardo. Per ulteriori informazioni, vedi Risolvi i problemi relativi agli elementi in ritardo nei job batch e Risolvi i problemi degli elementi in ritardo nei job di flussi di dati.

Ottimizzazione Fusion

Dopo aver convalidato il formato JSON del grafico di esecuzione della pipeline, Il servizio Dataflow potrebbe modificare il grafico per eseguire le ottimizzazioni. Le ottimizzazioni possono includere la fusione di più passaggi o trasformazioni il grafico di esecuzione della pipeline in singoli passaggi. L'unione dei passaggi impedisce dal servizio Dataflow alla necessità di materializzare ogni intermedio PCollection nella pipeline, il che può essere costoso in termini di memoria sull'overhead di elaborazione.

Sebbene tutte le trasformazioni specificate nella creazione della pipeline siano eseguiti sul servizio, al fine di garantire l'esecuzione più efficiente del pipeline, le trasformazioni possono essere eseguite in un ordine diverso o come parte di una trasformazione fusa più grande. Il servizio Dataflow rispetta i dati dipendenze tra i passaggi nel grafico di esecuzione, ma altrimenti i passaggi potrebbero essere eseguiti in qualsiasi ordine.

Esempio di Fusion

Il seguente diagramma mostra come il grafico di esecuzione Esempio di WordCount incluso con L'SDK Apache Beam per Java può essere ottimizzato e integrato Servizio Dataflow per un'esecuzione efficiente:

Il grafico di esecuzione per il programma di esempio WordCount ottimizzato e con i passi fusi
dal servizio Dataflow.

Figura 2: esempio di grafico di esecuzione ottimizzato per il conteggio parole

Prevenzione della fusione

In alcuni casi, Dataflow potrebbe indovinare erroneamente il modo ottimale per fondere le operazioni nella pipeline, Abilità di Dataflow di utilizzare tutti i worker disponibili. In questi casi, puoi impedire a Dataflow di eseguire ottimizzazioni della fusione.

Puoi impedire la fusione dei passaggi aggiungendo alla pipeline un'operazione che obbliga il servizio Dataflow a materializzare l'intermedio PCollection. Prendi in considerazione l'utilizzo di una delle seguenti operazioni:

  • Inserisci un GroupByKey e separali dopo il primo ParDo. La Il servizio Dataflow non fonde mai le operazioni ParDo in un e aggregazione.
  • Supera il tuo livello PCollection intermedio come input aggiuntivo a un altro ParDo. Il servizio Dataflow si materializza sempre input aggiuntivi.
  • Inserisci un passaggio Reshuffle. Reshuffle impedisce la fusione, i checkpoint dei dati ed esegue la deduplicazione dei record. Il rimpasto è supportato da Dataflow anche se è contrassegnato come deprecato nel documentazione di Apache Beam.

Monitora Fusion

Puoi accedere al grafico ottimizzato e alle fasi raggruppate nella console Google Cloud. con gcloud CLI o l'API.

Console

Per visualizzare le fasi e i passaggi del grafico Nella console, nella scheda Dettagli esecuzione relativa al tuo Job Dataflow, apri Flusso di lavoro della fase visualizzazione grafico.

Per visualizzare i passaggi del componente integrati per una fase, nel grafico fai clic su nella fase di fusione. Nel riquadro Informazioni sulla fase, la riga Passaggi dei componenti mostra tutte le fasi. A volte le parti di una singola trasformazione composita sono fusi in più fasi.

gcloud

Per accedere al grafico ottimizzato e alle fasi unite utilizzando il gcloud CLI, esegui questo comando gcloud:

  gcloud dataflow jobs describe --full JOB_ID --format json

Sostituisci JOB_ID con l'ID del tuo job Dataflow.

Per estrarre i bit pertinenti, invia l'output del comando gcloud a jq:

gcloud dataflow jobs describe --full JOB_ID --format json | jq '.pipelineDescription.executionPipelineStage\[\] | {"stage_id": .id, "stage_name": .name, "fused_steps": .componentTransform }'

Per vedere la descrizione delle fasi fusi nel file di risposta di output, all'interno della ComponentTransform osserva ExecutionStageSummary .

API

Per accedere al grafico ottimizzato e alle fasi unite utilizzando l'API, richiama project.locations.jobs.get

Per vedere la descrizione delle fasi fusi nel file di risposta di output, all'interno della ComponentTransform osserva ExecutionStageSummary .

Combina ottimizzazione

Le operazioni di aggregazione sono un concetto importante nell'elaborazione dati su larga scala. L'aggregazione riunisce dati concettualmente lontani, in modo che estremamente utile per la correlazione. Il modello di programmazione Dataflow rappresenta le operazioni di aggregazione come GroupByKey, CoGroupByKey e Combine trasforma.

Le operazioni di aggregazione di Dataflow combinano dati nell'intero tra cui quelli che potrebbero essere distribuiti su più worker. Durante l'esecuzione di aggregazione, spesso è più efficiente combinare quanti più dati localmente prima di combinare i dati tra le istanze. Quando applichi un GroupByKey o altra trasformazione di aggregazione, il servizio Dataflow esegue automaticamente la combinazione parziale in locale prima del raggruppamento principale operativa.

Quando si esegue una combinazione parziale o multilivello, il Dataflow servizio prende decisioni diverse a seconda che la pipeline stia utilizzando o meno in modalità flusso o batch. Per i dati limitati, il servizio favorisce l'efficienza eseguirà il maggior numero possibile di combinazioni locali. Per i dati illimitati, favorisce una latenza più bassa e potrebbe non eseguire una combinazione parziale, potrebbe aumentare la latenza.