Quando selezioni un job Dataflow specifico, l'interfaccia di monitoraggio fornisce una rappresentazione grafica della pipeline: il grafico del job. La pagina del grafico del job nella console fornisce anche un riepilogo del job, una log del job e informazioni su ogni passaggio della pipeline.
Il grafico del job di una pipeline rappresenta ogni trasformazione nella pipeline come un riquadro. Ciascuna contiene il nome della trasformazione e le informazioni sullo stato del job, include:
- In esecuzione: il passaggio è in esecuzione.
- In coda: il passaggio di un job FlexRS è in coda.
- Riuscito: il passaggio è stato completato correttamente.
- Arrestato: il passaggio è stato interrotto perché il job è stato arrestato.
- Sconosciuto: non è stato possibile segnalare lo stato del passaggio.
- Non riuscito: il passaggio non è stato completato.
Per impostazione predefinita, la pagina del grafico del job mostra la Visualizzazione grafico. Per visualizzare il job grafico come tabella, nella visualizzazione Passaggi del job seleziona Visualizzazione tabella. La visualizzazione tabella contiene le stesse informazioni in un formato diverso. La visualizzazione tabella è utili nei seguenti scenari:
- Il job prevede molte fasi, che rendono difficile la navigazione nel grafico del job.
- Vuoi ordinare i passaggi del lavoro in base a una proprietà specifica. Ad esempio, puoi ordinare la tabella in base al tempo di esecuzione per identificare i passaggi lenti.
Grafico del job di base
Codice pipeline:
Java// Read the lines of the input text. p.apply("ReadLines", TextIO.read().from(options.getInputFile())) // Count the words. .apply(new CountWords()) // Write the formatted word counts to output. .apply("WriteCounts", TextIO.write().to(options.getOutput())); Python( pipeline # Read the lines of the input text. | 'ReadLines' >> beam.io.ReadFromText(args.input_file) # Count the words. | CountWords() # Write the formatted word counts to output. | 'WriteCounts' >> beam.io.WriteToText(args.output_path)) Vai// Create the pipeline. p := beam.NewPipeline() s := p.Root() // Read the lines of the input text. lines := textio.Read(s, *input) // Count the words. counted := beam.ParDo(s, CountWords, lines) // Write the formatted word counts to output. textio.Write(s, *output, formatted) |
Grafico del job:
|
Trasformazioni composte
Nel grafico del job, trasformazioni composte, che contengono più sottotrasformazioni nidificate, sono espandibili. Le trasformazioni composte espandibili sono contrassegnate da una freccia nel grafico. Fai clic sull' Freccia per espandere la trasformazione e visualizzare le sottotrasformazioni al suo interno.
Codice pipeline:
Java// The CountWords Composite Transform // inside the WordCount pipeline. public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> { @Override public PCollection<String> apply(PCollection<String> lines) { // Convert lines of text into individual words. PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // Count the number of times each word occurs. PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement()); return wordCounts; } } Python# The CountWords Composite Transform inside the WordCount pipeline. @beam.ptransform_fn def CountWords(pcoll): return ( pcoll # Convert lines of text into individual words. | 'ExtractWords' >> beam.ParDo(ExtractWordsFn()) # Count the number of times each word occurs. | beam.combiners.Count.PerElement() # Format each word and count into a printable string. | 'FormatCounts' >> beam.ParDo(FormatCountsFn())) Vai// The CountWords Composite Transform inside the WordCount pipeline. func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Count the number of times each word occurs. return stats.Count(s, col) } |
Grafico del job:
|
Nel codice della pipeline, potresti aver richiamato la trasformazione composita come segue:
result = transform.apply(input);
Le trasformazioni composte richiamate in questo modo omettono la nidificazione prevista e potrebbero pertanto appaiono espansi Interfaccia. La pipeline potrebbe anche generare avvisi o errori relativi alla nomi univoci al momento dell'esecuzione della pipeline.
Per evitare questi problemi, assicurati di richiamare le trasformazioni utilizzando formato consigliato:
result = input.apply(transform);
Trasforma nomi
Dataflow ha diversi modi per ottenere il nome della trasformazione mostrato nel grafico del job di monitoraggio.
Java
- Dataflow può utilizzare un nome che assegni quando applichi la trasformazione. Il primo
che fornisci al metodo
apply
è il nome della trasformazione. - Dataflow può dedurre il nome della trasformazione, sia dal nome della classe (se hai creato una
trasformazione personalizzata) o il nome dell'oggetto funzione
DoFn
(se utilizzi un oggetto trasformazioni del core comeParDo
).
Python
- Dataflow può utilizzare un nome che assegni quando applichi la trasformazione. Puoi impostare
del nome della trasformazione specificando l'argomento
label
della trasformazione. - Dataflow può dedurre il nome della trasformazione, sia dal nome della classe (se hai creato una
trasformazione personalizzata) o il nome dell'oggetto funzione
DoFn
(se utilizzi un oggetto trasformazioni del core comeParDo
).
Vai
- Dataflow può utilizzare un nome che assegni quando applichi la trasformazione. Puoi impostare
del nome della trasformazione specificando
Scope
. - Dataflow può dedurre il nome della trasformazione, sia dal
struct se utilizzi un
DoFn
strutturale o dalla funzione se utilizzi unDoFn
funzionale.
Comprendere le metriche
Questa sezione fornisce dettagli sulle metriche associate al grafico del job.
Tempo totale di esecuzione
Quando fai clic su un passaggio, la metrica Tempo di muro viene visualizzata nella Riquadro Informazioni passaggio. Il tempo di esecuzione fornisce il tempo totale approssimativo trascorso in tutti i thread in tutti i worker sulle seguenti azioni:
- Inizializzazione del passaggio
- Elaborazione dei dati
- Riproduzione casuale dei dati
- Fine del passaggio
Per i passaggi compositi, il tempo totale di esecuzione indica la somma del tempo trascorso nel componente passaggi. Questa stima può aiutarti a identificare i passaggi lenti e a diagnosticare quale parte del la pipeline sta richiedendo più tempo del necessario.
Metriche di input aggiuntive
Le metriche di input secondarie mostrano in che modo input aggiuntivo i pattern e gli algoritmi di accesso influiscono sulle prestazioni della pipeline. Quando usa un input aggiuntivo, Dataflow scrive la raccolta in una permanente, come un disco, e le trasformazioni lette da questo livello . Queste operazioni di lettura e scrittura influiscono sul tempo di esecuzione del job.
L'interfaccia di monitoraggio di Dataflow mostra le metriche di input secondario quando selezioni una trasformazione che crea o utilizza una raccolta di input aggiuntivi. Puoi visualizza le metriche nella sezione Metriche di input aggiuntivo del riquadro Informazioni sul passaggio.
Trasformazioni che creano un input aggiuntivo
Se la trasformazione selezionata crea una raccolta di input laterale, La sezione Metriche di input aggiuntivo mostra il nome della raccolta, insieme con le seguenti metriche:
- Tempo dedicato alla scrittura: il tempo dedicato a scrivere il raccolta di input aggiuntivi.
- Byte scritti:il numero totale. di byte scritti nella raccolta di input laterali.
- Tempo e byte letti dall'input aggiuntivo: una tabella che contiene e metriche aggiuntive per tutte le trasformazioni che utilizzano la raccolta di input aggiuntivi. chiamati consumatori di input aggiuntivi.
Il tempo e byte letti dall'input aggiuntivo contiene le seguenti informazioni per ogni consumatore di input:
- Client input aggiuntivo: il nome della trasformazione del lato un consumatore di input.
- Tempo dedicato alla lettura: il tempo che questo consumatore ha dedicato alla lettura. la raccolta di input aggiuntivi.
- Byte letti: il numero di byte letti da questo consumer. la raccolta di input aggiuntivi.
Se la pipeline ha una trasformazione composita che crea un input aggiuntivo, espandi la trasformazione composita fino a vediamo la sottotrasformazione specifica che crea l'input aggiuntivo. Quindi, seleziona la sottotrasformazione per visualizzare le metriche di input aggiuntivo. .
La figura 4 mostra le metriche di input laterali per una trasformazione che crea un lato raccolta di input.
Trasformazioni che consumano uno o più input aggiuntivi
Se la trasformazione selezionata utilizza uno o più input aggiuntivi, La sezione Metriche di input aggiuntivo mostra Tempo e byte letti dall'input aggiuntivo. Questa tabella contiene le seguenti informazioni per ogni raccolta di input lato:
- Raccolta input laterale:il nome dell'input aggiuntivo. .
- Tempo dedicato alla lettura: il tempo che la trasformazione ha dedicato a leggere. da questa raccolta di input.
- Byte letti:il numero di byte da cui viene letta la trasformazione. da questa raccolta di input.
Se la pipeline ha una trasformazione composita che legge un input aggiuntivo, espandi la trasformazione composita fino a vediamo la sottotrasformazione specifica che legge l'input aggiuntivo. Quindi, seleziona la sottotrasformazione per visualizzare le metriche di input aggiuntivo. .
La figura 5 mostra le metriche di input laterali per una trasformazione che legge da un raccolta di input aggiuntivi.
Identificare i problemi di prestazioni degli input collaterali
La ripetizione è un problema comune di prestazioni degli input. Se il tuo input aggiuntivo
PCollection
è troppo grande. I worker non possono memorizzare nella cache l'intera raccolta.
Di conseguenza, i worker devono leggere ripetutamente dall'input lato permanente
.
Nella figura 6, le metriche di input secondario mostrano che il totale dei byte letti la raccolta di input aggiuntivi è molto più grande delle dimensioni della raccolta, dei byte totali scritto.
Per migliorare le prestazioni di questa pipeline, riprogetta l'algoritmo in evitare di ripetere o recuperare i dati di input aggiuntivi. In questo esempio, la pipeline crea il prodotto cartesiano di due collezioni. L'algoritmo ripete l'intera raccolta di input aggiuntivi per ogni elemento dell'elemento . Puoi migliorare il pattern di accesso alla pipeline eseguendo il batch insieme a più elementi della raccolta principale. Questa modifica riduce di volte in cui i worker devono rileggere la raccolta di input aggiuntivi.
Se la pipeline esegue un join, può verificarsi un altro problema di prestazioni comune.
applicando un valore ParDo
con uno o più input laterali grandi. In questo caso,
i worker dedicano un'ampia percentuale del tempo di elaborazione all'operazione di join
le raccolte di input laterali.
La figura 7 mostra esempi di metriche di input secondarie per questo problema:
Per migliorare le prestazioni di questa pipeline, utilizza CoGroupByKey invece che come input aggiuntivi.