Questa pagina fornisce una panoramica del ciclo di vita della pipeline, dal codice alla del job Dataflow.
Questa pagina illustra i seguenti concetti:
- Che cos'è un grafico di esecuzione e come una pipeline Apache Beam diventa Job Dataflow
- In che modo Dataflow gestisce gli errori
- In che modo Dataflow esegue automaticamente il parallelismo e la distribuzione della logica di elaborazione nella pipeline ai worker che eseguono il job
- Ottimizzazioni dei job che Dataflow potrebbe rendere
Grafico di esecuzione
Quando esegui la pipeline Dataflow, Dataflow
crea un grafico di esecuzione dal codice che costruisce l'oggetto Pipeline
,
incluse tutte le trasformazioni e le relative funzioni di elaborazione associate,
come oggetti DoFn
. Questo è il grafico di esecuzione della pipeline, dove la fase viene chiamata
tempo di creazione del grafico.
Durante la creazione del grafico, Apache Beam esegue localmente il codice
il principale punto di ingresso del codice della pipeline, fermandosi alle chiamate a origini,
o di trasformazione, trasformando queste chiamate in nodi del grafico.
Di conseguenza, una porzione di codice nel punto di ingresso di una pipeline (Java e Go main
o il livello più alto di uno script Python) viene eseguito localmente sulla macchina
esegue la pipeline. Lo stesso codice dichiarato in un metodo di un oggetto DoFn
viene eseguita nei worker Dataflow.
Ad esempio, il valore WordCount incluso negli SDK Apache Beam, contiene una serie di trasformazioni leggere, estrarre, contare, formattare e scrivere le singole parole di una raccolta di testo, insieme a un conteggio di occorrenze per ogni parola. Il seguente diagramma mostra come le trasformazioni nella pipeline WordCount vengono espanse in un grafo di esecuzione:
Figura 1: grafico di esecuzione di esempio di conteggio parole
Il grafico di esecuzione è spesso diverso dall'ordine in cui hai specificato i quando crei la pipeline. Questa differenza esiste perché Il servizio Dataflow esegue varie ottimizzazioni e fusioni di esecuzione prima che venga eseguito su risorse cloud gestite. La Il servizio Dataflow rispetta le dipendenze dei dati durante l'esecuzione una pipeline o un blocco note personalizzato. Tuttavia, i passaggi senza dipendenze tra i dati possono essere eseguiti in qualsiasi ordine.
Per vedere il grafico di esecuzione non ottimizzato di Dataflow generati per la pipeline, seleziona il job Interfaccia di monitoraggio di Dataflow. Per ulteriori informazioni sulla visualizzazione offerte di lavoro, consulta Utilizzare l'interfaccia di monitoraggio di Dataflow.
Durante la costruzione del grafo, Apache Beam convalida che tutte le risorse a cui fa riferimento la pipeline, ad esempio i bucket Cloud Storage, le tabelle BigQuery e gli argomenti o gli abbonamenti Pub/Sub, esistano effettivamente e siano accessibili. La convalida viene eseguita tramite di chiamate API standard ai rispettivi servizi, quindi è fondamentale che l'utente L'account utilizzato per eseguire una pipeline disponga di una connettività adeguata ai servizi necessari ed è autorizzato a chiamare le API dei servizi. Prima di inviare la pipeline nel servizio Dataflow, Apache Beam controlla anche altri errori, e garantisce che il grafico della pipeline non contenga operazioni illegali.
Il grafico di esecuzione viene quindi tradotto in formato JSON e l'esecuzione in JSON viene trasmesso all'endpoint del servizio Dataflow.
Il servizio Dataflow convalida quindi il grafico di esecuzione JSON. Quando il grafico viene convalidato, diventa un job nel servizio Dataflow. Puoi visualizzare il job, il relativo grafico di esecuzione, lo stato e il log utilizzando l'interfaccia di monitoraggio di Dataflow.
Java
Il servizio Dataflow invia una risposta alla macchina su cui esegui
il tuo programma Dataflow. Questa risposta è incapsulata nell'oggetto
DataflowPipelineJob
, che contiene il jobId
del tuo job Dataflow.
Utilizza jobId
per monitorare, tracciare e risolvere i problemi del job tramite l'
Interfaccia di monitoraggio di Dataflow
e l'interfaccia a riga di comando di Dataflow.
Per ulteriori informazioni, consulta
Riferimento API per DataflowPipelineJob.
Python
Il servizio Dataflow invia una risposta alla macchina su cui esegui
il tuo programma Dataflow. Questa risposta è incapsulata nell'oggetto
DataflowPipelineResult
, che contiene il valore job_id
del tuo job Dataflow.
Utilizza l'job_id
per monitorare e tracciare il job, nonché per risolvere eventuali problemi
utilizzando
Interfaccia di monitoraggio di Dataflow
e ai
Interfaccia a riga di comando di Dataflow.
Vai
Il servizio Dataflow invia una risposta alla macchina su cui esegui
il tuo programma Dataflow. Questa risposta è incapsulata nell'oggetto
dataflowPipelineResult
, che contiene il valore jobID
del tuo job Dataflow.
Utilizza l'jobID
per monitorare e tracciare il job, nonché per risolvere eventuali problemi
utilizzando
Interfaccia di monitoraggio di Dataflow
e ai
Interfaccia a riga di comando di Dataflow.
La creazione del grafico avviene anche quando esegui la pipeline localmente, ma il grafico non viene tradotto in JSON né trasmesso al servizio. Il grafico viene invece eseguito localmente sulla stessa macchina in cui hai avviato programma Dataflow. Per ulteriori informazioni, consulta la pagina sulla configurazione di PipelineOptions per l'esecuzione locale.
Gestione di errori ed eccezioni
La pipeline potrebbe generare eccezioni durante l'elaborazione dei dati. Alcuni di questi errori sono temporanei, ad esempio difficoltà temporanee di accesso a un servizio esterno. Altro gli errori sono permanenti, ad esempio errori causati da dati di input corrotti o non analizzabili, o puntatori nulli durante il calcolo.
Dataflow elabora gli elementi in pacchetti arbitrari e riprova il pacchetto completo quando viene generato un errore per un elemento del pacchetto. Quando in modalità batch, i bundle che includono un elemento con errori vengono ritentati quattro volte. Si verifica un errore completo della pipeline quando un singolo bundle presenta errori per quattro volte. Quando in esecuzione in modalità flusso di dati, viene ritentato un bundle che include un elemento con errori a tempo indeterminato, il che potrebbe causare il blocco permanente della pipeline.
Durante l'elaborazione in modalità batch, potresti notare singoli errori prima di un errore completo di un job della pipeline, che si verifica quando un determinato bundle fallisce dopo quattro tentativi. Ad esempio, se la tua pipeline tenta di elaborare 100 pacchetti, Dataflow generare diverse centinaia di singoli errori finché un singolo bundle non raggiunge condizione "4-failure" per l'uscita.
Gli errori del worker di avvio, come la mancata installazione di pacchetti sui worker, sono: transitorio. Questo scenario comporta nuovi tentativi indefiniti e potrebbe causare la pipeline si blocchi definitivamente.
Parallelizzazione e distribuzione
Il servizio Dataflow lo parallelizza e distribuisce automaticamente
la logica di elaborazione della pipeline ai worker assegnati per l'esecuzione
il tuo lavoro. Dataflow utilizza le astrazioni
modello di programmazione per rappresentare
funzioni di elaborazione parallela. Ad esempio, ParDo
trasforma in una pipeline
fanno sì che Dataflow distribuisca automaticamente il codice di elaborazione,
rappresentati da DoFn
oggetti, a più worker da eseguire in parallelo.
Esistono due tipi di parallelismo dei lavori:
Il parallelismo orizzontale si verifica quando i dati della pipeline vengono suddivisi ed elaborati più worker contemporaneamente. Il runtime di Dataflow è alimentato da un pool di worker distribuiti. Una pipeline ha un parallelismo potenziale più alto quando il pool contiene più worker, ma anche questa configurazione ha un costo maggiore. In teoria, il parallelismo horizontale non ha un limite superiore. Tuttavia, Dataflow limita il pool di worker a 4000 per ottimizzare l'utilizzo delle risorse a livello di parco.
Il parallelismo verticale si verifica quando i dati della pipeline vengono suddivisi ed elaborati da più core della CPU sullo stesso worker. Ogni worker è alimentato da un alla VM di Compute Engine. Una VM può eseguire più processi per saturare tutti i suoi CPU Cores. Una VM con più core ha un potenziale più elevato parallelismo verticale, ma questa configurazione comporta costi più alti. Spesso è necessario un numero di core più alto comporta un aumento dell'utilizzo della memoria, quindi il numero di core solitamente è scalate insieme alle dimensioni della memoria. Dato il limite fisico del computer di architetture, il limite superiore di parallelismo verticale è molto più basso limite superiore del parallelismo orizzontale.
Parallelismo gestito
Per impostazione predefinita, Dataflow gestisce automaticamente il parallelismo dei job. Dataflow monitora le statistiche di runtime per il job, ad esempio la CPU e memoria utilizzata per determinare come scalare il job. A seconda delle impostazioni del job, Dataflow può eseguire lo scaling dei job in orizzontale, definito scalabilità automatica orizzontale, o in verticale, definito scalabilità verticale. La scalabilità automatica per il parallelismo ottimizza i costi del job le prestazioni lavorative.
Per migliorare le prestazioni dei job, Dataflow ottimizza anche le pipeline internamente. Le ottimizzazioni tipiche sono l'ottimizzazione della fusione e combinare l'ottimizzazione. Unendo i passaggi della pipeline, Dataflow elimina i costi non necessari associati al coordinamento dei passaggi in un sistema distribuito e all'esecuzione di ogni singolo passaggio separatamente.
Fattori che influiscono sul parallelismo
I seguenti fattori influenzano il funzionamento del parallelismo di job Dataflow.
Origine di input
Quando un'origine di input non consente il parallelismo, il passaggio di importazione dell'origine di input può diventare un collo di bottiglia in un job Dataflow. Ad esempio, quando importi dati da un singolo file di testo compresso, Dataflow non può caricare i dati di input. Poiché la maggior parte dei formati di compressione non può essere suddivisa arbitrariamente più piccoli durante l'importazione, Dataflow deve leggere i dati in sequenza all'inizio del file. La velocità effettiva complessiva della pipeline viene rallentata parte non parallela della pipeline. La soluzione a questo problema consiste nel utilizza un'origine di input più scalabile.
In alcuni casi, la step fusion riduce anche il parallelismo. Quando l'origine di input non consente il parallelismo, se Dataflow unisce il passaggio di importazione dati con i passaggi successivi e assegna questo passaggio integrato in un singolo thread, l'intera pipeline potrebbe essere eseguita più lentamente.
Per evitare questo scenario, inserisci un passaggio Reshuffle
dopo il passaggio di importazione della fonte di input. Per ulteriori informazioni, consulta
Sezione Impedisci fusione di questo documento.
Fanout e forma dei dati predefiniti
Il fan-out predefinito di un singolo passaggio di trasformazione può diventare un
collo di bottiglia e limitare il parallelismo. Ad esempio, "fan-out elevato" La trasformazione ParDo
può causare
fusione per limitare la capacità di Dataflow di ottimizzare l'utilizzo dei worker. Nel
un'operazione di questo tipo, potresti avere una raccolta di input con
ma ParDo
produce un output con centinaia o migliaia di volte
tanti elementi, seguiti da altri ParDo
. Se la Dataflow
il servizio fonde insieme queste operazioni ParDo
, il parallelismo in questo passaggio è
limitato al massimo al numero di elementi nella raccolta di input, anche se
il campo PCollection
intermedio contiene molti più elementi.
Per potenziali soluzioni, consulta la sezione Impedire la fusione di questo documento.
Forma dei dati
La forma dei dati, che si tratti di dati di input o intermedi, può limitare il parallelismo.
Ad esempio, quando un GroupByKey
sale su una chiave naturale, come una città,
seguito da un passaggio map
o Combine
, Dataflow fonde i due
passaggi. Quando lo spazio delle chiavi è piccolo, ad esempio cinque città, e una chiave è molto
caldo, ad esempio una grande città, la maggior parte degli elementi nell'output di GroupByKey
vengono distribuiti in un unico processo. Questo processo diventa un collo di bottiglia e rallenta
fino al completamento del lavoro.
In questo esempio, puoi ridistribuire i risultati del passaggio GroupByKey
in un
uno spazio artificiale più ampio rispetto ai tasti naturali. Inserisci un
Reshuffle
passaggio tra il passaggio GroupByKey
e il passaggio map
o Combine
.
Nel passaggio Reshuffle
, crea lo spazio della chiave artificiale, ad esempio utilizzando una
hash
, per superare il limitato parallelismo causato dalla forma dei dati.
Per ulteriori informazioni, consulta Sezione Impedisci fusione di questo documento.
Sink di output
Un sink è una trasformazione che scrive in un sistema di archiviazione dati esterno, come
o un database. In pratica, i sink vengono modellati e implementati come standard
DoFn
e vengono utilizzati per materializzare un PCollection
ai sistemi esterni.
In questo caso, PCollection
contiene i risultati finali della pipeline. Thread che
le API di sink delle chiamate possono essere eseguite in parallelo per scrivere dati nei sistemi esterni. Di
per impostazione predefinita, non si verifica alcun coordinamento tra i thread. Senza una intermedia
per il buffering delle richieste di scrittura e del flusso di controllo, il sistema esterno
si sovraccarichi e si riducono
la velocità effettiva di scrittura. Fai lo scale up delle risorse aggiungendo altre
il parallelismo potrebbe rallentare ulteriormente la pipeline.
La soluzione a questo problema consiste nel ridurre il parallelismo nella fase di scrittura.
Puoi aggiungere un passaggio GroupByKey
subito prima del passaggio di scrittura. GroupByKey
i gruppi di passaggi inviano i dati in un insieme più piccolo di batch per ridurre il numero totale di chiamate RPC
e connessioni a sistemi esterni. Ad esempio, utilizza un GroupByKey
per creare un
spazio hash di 50 punti dati su 1 milione.
Lo svantaggio di questo approccio è che introduce un limite hardcoded al parallelismo. Un'altra opzione è implementare il backoff esponenziale nel sink durante la scrittura e i dati di Google Cloud. Questa opzione può fornire una limitazione del client minima.
Monitora il parallelismo
Per monitorare il parallelismo, puoi utilizzare la console Google Cloud per visualizzare in ritardo. Per ulteriori informazioni, consulta Risolvere i problemi relativi ai job in ritardo nei job batch e Risolvere i problemi relativi ai job in ritardo nei job streaming.
Ottimizzazione della fusione
Dopo aver convalidato il formato JSON del grafico di esecuzione della pipeline,
Il servizio Dataflow potrebbe modificare il grafico per eseguire le ottimizzazioni.
Le ottimizzazioni possono includere la fusione di più passaggi o trasformazioni
il grafico di esecuzione della pipeline in singoli passaggi. L'unione dei passaggi impedisce
dal servizio Dataflow alla necessità di materializzare ogni intermedio
PCollection
nella pipeline, il che può essere costoso in termini di memoria
sull'overhead di elaborazione.
Sebbene tutte le trasformazioni specificate nella creazione della pipeline siano eseguiti sul servizio, al fine di garantire l'esecuzione più efficiente del pipeline, le trasformazioni possono essere eseguite in un ordine diverso o come parte di una trasformazione fusa più grande. Il servizio Dataflow rispetta i dati dipendenze tra i passaggi nel grafico di esecuzione, ma altrimenti i passaggi potrebbero essere eseguiti in qualsiasi ordine.
Esempio di Fusion
Il seguente diagramma mostra come il grafico di esecuzione Esempio di WordCount incluso con L'SDK Apache Beam per Java può essere ottimizzato e integrato Servizio Dataflow per un'esecuzione efficiente:
Figura 2: esempio di grafico di esecuzione ottimizzato per il conteggio parole
Prevenzione della fusione
In alcuni casi, Dataflow potrebbe indovinare erroneamente il modo ottimale per fondere le operazioni nella pipeline, Abilità di Dataflow di utilizzare tutti i worker disponibili. In questi casi, puoi impedire a Dataflow di eseguire ottimizzazioni della fusione.
Puoi impedire la fusione dei passaggi aggiungendo alla pipeline un'operazione che
obbliga il servizio Dataflow a materializzare l'intermedio
PCollection
. Prendi in considerazione l'utilizzo di una delle seguenti operazioni:
- Inserisci un
GroupByKey
e annulla il raggruppamento dopo il primoParDo
. Il servizio Dataflow non unisce mai le operazioniParDo
in un'aggregazione. - Supera il tuo livello
PCollection
intermedio come input aggiuntivo a un altroParDo
. Il servizio Dataflow materializza sempre gli input laterali. - Inserisci un passaggio
Reshuffle
.Reshuffle
impedisce la fusione, esegue i controlli di sicurezza sui dati ed esegue la deduplica dei record. Il rimpasto è supportato da Dataflow anche se è contrassegnato come deprecato nel documentazione di Apache Beam.
Monitora Fusion
Puoi accedere al grafico ottimizzato e alle fasi raggruppate nella console Google Cloud. con gcloud CLI o l'API.
Console
Per visualizzare le fasi e i passaggi del grafico Nella console, nella scheda Dettagli esecuzione relativa al tuo Job Dataflow, apri Flusso di lavoro della fase visualizzazione grafico.
Per visualizzare i passaggi del componente integrati per una fase, nel grafico fai clic su nella fase di fusione. Nel riquadro Informazioni sulla fase, la riga Passaggi dei componenti mostra tutte le fasi. A volte parti di una singola trasformazione composita vengono fuse in più fasi.
gcloud
Per accedere al grafico ottimizzato e alle fasi unite utilizzando il
gcloud CLI, esegui questo comando gcloud
:
gcloud dataflow jobs describe --full JOB_ID --format json
Sostituisci JOB_ID
con l'ID del tuo job Dataflow.
Per estrarre i bit pertinenti, invia l'output del comando gcloud
a jq
:
gcloud dataflow jobs describe --full JOB_ID --format json | jq '.pipelineDescription.executionPipelineStage\[\] | {"stage_id": .id, "stage_name": .name, "fused_steps": .componentTransform }'
Per vedere la descrizione delle fasi fusi nel file di risposta di output, all'interno della
ComponentTransform
osserva
ExecutionStageSummary
.
API
Per accedere al grafico ottimizzato e alle fasi fuse utilizzando l'API, chiama
project.locations.jobs.get
.
Per visualizzare la descrizione delle fasi fuse nel file di risposta di output, consulta l'oggetto
ExecutionStageSummary
all'interno dell'array
ComponentTransform
.
Combina ottimizzazione
Le operazioni di aggregazione sono un concetto importante nell'elaborazione dati su larga scala.
L'aggregazione riunisce dati concettualmente lontani, in modo che
estremamente utile per la correlazione. Il modello di programmazione Dataflow rappresenta le operazioni di aggregazione come le trasformazioni GroupByKey
, CoGroupByKey
e
Combine
.
Le operazioni di aggregazione di Dataflow combinano i dati dell'intero
set di dati, inclusi quelli che potrebbero essere distribuiti su più worker. Durante queste operazioni di aggregazione, spesso è più efficiente combinare il maggior numero possibile di dati localmente prima di combinarli tra le istanze. Quando applichi un
GroupByKey
o altra trasformazione di aggregazione, il servizio Dataflow
esegue automaticamente la combinazione parziale in locale prima del raggruppamento principale
operativa.
Quando si esegue una combinazione parziale o multilivello, il Dataflow servizio prende decisioni diverse a seconda che la pipeline stia utilizzando o meno in modalità flusso o batch. Per i dati limitati, il servizio favorisce l'efficienza eseguirà il maggior numero possibile di combinazioni locali. Per i dati illimitati, favorisce una latenza più bassa e potrebbe non eseguire una combinazione parziale, potrebbe aumentare la latenza.