Grafici job Dataflow

Quando selezioni un job Dataflow specifico, l'interfaccia di monitoraggio fornisce una rappresentazione grafica della pipeline: il grafico del job. La pagina del grafico dei job nella console fornisce anche un riepilogo del job, un log del job e informazioni su ogni passaggio della pipeline.

Il grafico dei job di una pipeline rappresenta ogni trasformazione nella pipeline come un riquadro. Ogni casella contiene il nome della trasformazione e informazioni sullo stato del job, tra cui:

  • In esecuzione: il passaggio è in esecuzione.
  • In coda: il passaggio in un job FlexRS è in coda.
  • Riuscito: il passaggio è stato completato correttamente.
  • Arrestato: il passaggio è stato interrotto perché il job è stato interrotto.
  • Sconosciuto: il passaggio non è riuscito a segnalare lo stato.
  • Non riuscito: il passaggio non è stato completato.

Grafico job di base

Codice pipeline:

Java


  // Read the lines of the input text.
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
     // Count the words.
     .apply(new CountWords())
     // Write the formatted word counts to output.
     .apply("WriteCounts", TextIO.write().to(options.getOutput()));

Python


(
    pipeline
    # Read the lines of the input text.
    | 'ReadLines' >> beam.io.ReadFromText(args.input_file)
    # Count the words.
    | CountWords()
    # Write the formatted word counts to output.
    | 'WriteCounts' >> beam.io.WriteToText(args.output_path))

Go


  // Create the pipeline.
  p := beam.NewPipeline()
    s := p.Root()
  // Read the lines of the input text.
  lines := textio.Read(s, *input)
  // Count the words.
  counted := beam.ParDo(s, CountWords, lines)
  // Write the formatted word counts to output.
  textio.Write(s, *output, formatted)

Grafico delle offerte di lavoro:

Il grafico di esecuzione per una pipeline WordCount come mostrato nell'interfaccia di monitoraggio di Dataflow.

Figura 1: il codice della pipeline per una pipeline WordCount mostrato con il grafico di esecuzione risultante nell'interfaccia di monitoraggio di Dataflow.

Trasformazioni composite

Nel grafico del job, le trasformazioni composite, che contengono più trasformazioni secondarie nidificate, sono espandibili. Le trasformazioni composte espandibili sono contrassegnate da una freccia nel grafico. Fai clic sulla freccia per espandere la trasformazione e visualizzare le trasformazioni secondarie al suo interno.

Codice pipeline:

Java


  // The CountWords Composite Transform
  // inside the WordCount pipeline.

  public static class CountWords
    extends PTransform<PCollection<String>, PCollection<String>> {

    @Override
    public PCollection<String> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

Python


# The CountWords Composite Transform inside the WordCount pipeline.
@beam.ptransform_fn
def CountWords(pcoll):
  return (
      pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement()
      # Format each word and count into a printable string.
      | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))

Go


  // The CountWords Composite Transform inside the WordCount pipeline.
  func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("CountWords")

    // Convert lines of text into individual words.
    col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)

    // Count the number of times each word occurs.
    return stats.Count(s, col)
  }
Grafico delle offerte di lavoro:

Il grafico del job per una pipeline di CountWords con la trasformazione CountWords espansa per mostrare le trasformazioni dei suoi componenti.

Figura 2: il codice della pipeline per i passaggi secondari della trasformazione di CountWords mostrato con il grafico espanso del job per l'intera pipeline.

Nel codice della pipeline, potresti aver richiamato la trasformazione composita come segue:

result = transform.apply(input);

Le trasformazioni composite richiamate in questo modo omettono la nidificazione prevista e potrebbero quindi apparire espanse nell'interfaccia di monitoraggio di Dataflow. La pipeline potrebbe anche generare avvisi o errori relativi a nomi univoci stabili al momento dell'esecuzione.

Per evitare questi problemi, assicurati di richiamare le trasformazioni utilizzando il formato consigliato:

result = input.apply(transform);

Trasforma nomi

Dataflow ha diversi modi per ottenere il nome della trasformazione mostrato nel grafico del job di monitoraggio.

Java

  • Dataflow può utilizzare un nome assegnato da te quando applichi la trasformazione. Il primo argomento che fornisci al metodo apply è il nome della trasformazione.
  • Dataflow può dedurre il nome della trasformazione dal nome della classe (se hai creato una trasformazione personalizzata) o dal nome dell'oggetto funzione DoFn (se utilizzi una trasformazione di core come ParDo).

Python

  • Dataflow può utilizzare un nome assegnato da te quando applichi la trasformazione. Puoi impostare il nome della trasformazione specificando l'argomento label della trasformazione.
  • Dataflow può dedurre il nome della trasformazione dal nome della classe (se hai creato una trasformazione personalizzata) o dal nome dell'oggetto funzione DoFn (se utilizzi una trasformazione di core come ParDo).

Go

  • Dataflow può utilizzare un nome assegnato da te quando applichi la trasformazione. Puoi impostare il nome della trasformazione specificando Scope.
  • Dataflow può dedurre il nome della trasformazione, dal nome dello struttura se utilizzi un DoFn strutturale o dal nome della funzione se utilizzi un DoFn funzionale.

Comprendere le metriche

Questa sezione fornisce dettagli sulle metriche associate al grafico del job.

Tempo totale di esecuzione

Quando fai clic su un passaggio, la metrica Tempo definito viene visualizzata nel riquadro Informazioni passaggio. Il tempo di esecuzione fornisce il tempo totale approssimativo trascorso in tutti i thread di tutti i worker per le azioni seguenti:

  • Inizializzazione del passaggio
  • Elaborazione dei dati
  • Riproduzione casuale dei dati
  • Fine del passaggio

Per i passaggi composti, tempo totale di esecuzione è la somma del tempo speso nei passaggi del componente. Questa stima può aiutarti a identificare i passaggi lenti e a diagnosticare quale parte della pipeline richiede più tempo del necessario.

Puoi visualizzare il tempo necessario per l'esecuzione di un passaggio nella tua pipeline.
Figura 3: la metrica Tempo firewall può aiutarti a garantire che la pipeline funzioni in modo efficiente.

Metriche di input secondario

Le metriche di input laterale mostrano in che modo i pattern e gli algoritmi di accesso all'input lato influiscono sulle prestazioni della pipeline. Quando la pipeline utilizza un input laterale, Dataflow scrive la raccolta in un livello persistente, ad esempio un disco, e le trasformazioni leggono da questa raccolta permanente. Queste operazioni di lettura e scrittura influiscono sul tempo di esecuzione del job.

L'interfaccia di monitoraggio di Dataflow mostra le metriche di input laterale quando selezioni una trasformazione che crea o utilizza una raccolta di input laterale. Puoi visualizzare le metriche nella sezione Metriche di input laterale del riquadro Informazioni passaggio.

Trasformazioni che creano un input secondario

Se la trasformazione selezionata crea una raccolta di input laterale, la sezione Metriche di input laterale mostra il nome della raccolta e le seguenti metriche:

  • Tempo di scrittura: il tempo dedicato alla scrittura della raccolta di input lato.
  • Byte scritti: il numero totale di byte scritti nella raccolta di input laterale.
  • Tempo e byte letti dall'input laterale:una tabella contenente metriche aggiuntive per tutte le trasformazioni che utilizzano la raccolta di input laterale, chiamate consumer di input lato.

La tabella Tempo e byte letti dall'input laterale contiene le seguenti informazioni per ogni consumer di input laterale:

  • Consumatore di input secondario: il nome della trasformazione del consumer di input secondario.
  • Tempo di lettura: il tempo trascorso da questo consumatore a leggere la raccolta di input collaterali.
  • Byte letti: il numero di byte letti da questo consumer dalla raccolta di input laterale.

Se la tua pipeline ha una trasformazione composita che crea un input laterale, espandi la trasformazione composita finché non vedi la sottotrasformazione composita che crea l'input laterale. Poi, seleziona la trasformazione secondaria per visualizzare la sezione Metriche di input laterale.

La Figura 4 mostra le metriche di input laterale per una trasformazione che crea una raccolta di input secondari.

Puoi selezionare la trasformazione secondaria e le relative metriche di input laterale sono visibili nel riquadro laterale delle informazioni sul passaggio.
Figura 4: il grafico del job contiene una trasformazione composita espansa (MakeMapView). La trasformazione secondaria che crea l'input laterale (CreateDataflowView) è selezionata e le metriche di input laterale sono visibili nel riquadro laterale Informazioni sul passaggio.

Trasformazioni che consumano uno o più input collaterali

Se la trasformazione selezionata utilizza uno o più input collaterali, la sezione Metriche di input laterale mostra la tabella Tempo e byte letti dall'input laterale. Questa tabella contiene le seguenti informazioni per ogni raccolta di input secondari:

  • Raccolta di input laterale: il nome della raccolta di input laterale.
  • Tempo dedicato alla lettura: il tempo impiegato dalla trasformazione per leggere questa raccolta di input secondari.
  • Byte letti: il numero di byte letti dalla trasformazione da questa raccolta di input lato.

Se la tua pipeline ha una trasformazione composita che legge un input laterale, espandi la trasformazione composita finché non vedi la sottotrasformazione specifica che legge l'input laterale. Poi, seleziona la trasformazione secondaria per visualizzare la sezione Metriche di input laterale.

La Figura 5 mostra le metriche di input laterale per una trasformazione che legge da una raccolta di input aggiuntivi.

Puoi selezionare la trasformazione e le relative metriche di input laterale sono visibili nel riquadro laterale delle informazioni sul passaggio.
Figura 5: la trasformazione JoinBothCollections legge da una raccolta di input laterale. Nel grafico del job è selezionata l'opzione JoinBothCollections e le metriche di input laterale sono visibili nel riquadro laterale Informazioni sul passaggio.

Identifica i problemi di prestazioni degli input secondari

La reiterazione è un problema comune relativo alle prestazioni dell'input secondario. Se l'input secondario è troppo grande, i worker non possono memorizzare nella cache l'intera raccolta.PCollection Di conseguenza, i worker devono leggere ripetutamente la raccolta di input lato permanente.

Nella Figura 6, le metriche di input laterale mostrano che i byte totali letti dalla raccolta di input laterale sono molto più grandi delle dimensioni della raccolta, ovvero i byte totali scritti.

Puoi selezionare la trasformazione e le relative metriche di input laterale sono visibili nel riquadro laterale delle informazioni sul passaggio.
Figura 6: un esempio di reiterazione. La raccolta di input laterale è di 563 MB e la somma dei byte letti dalle trasformazioni di consumo è di quasi 12 GB.

Per migliorare le prestazioni di questa pipeline, riprogetta l'algoritmo per evitare l'iterazione o il recupero dei dati di input aggiuntivi. In questo esempio, la pipeline crea il prodotto cartesiano di due raccolte. L'algoritmo esegue l'iterazione nell'intera raccolta di input secondari per ogni elemento della raccolta principale. Puoi migliorare il modello di accesso della pipeline raggruppando più elementi della raccolta principale. Questa modifica riduce il numero di volte in cui i worker devono rileggere la raccolta di input aggiuntivi.

Un altro problema comune di prestazioni può verificarsi se la pipeline esegue un join applicando ParDo con uno o più input collaterali di grandi dimensioni. In questo caso, i worker dedicano una grande percentuale del tempo di elaborazione alla lettura dell'operazione di join dalle raccolte di input laterali.

La figura 7 mostra alcuni esempi di metriche di input laterale per questo problema:

Puoi selezionare la trasformazione e le relative metriche di input laterale sono visibili nel riquadro laterale delle informazioni sul passaggio.
Figura 7: la trasformazione JoinBothCollections ha un tempo di elaborazione totale di 18 minuti e 31 secondi. I worker passano la maggior parte del tempo di elaborazione (10 minuti e 3 secondi) a leggere la raccolta di input lato di 10 GB.

Per migliorare le prestazioni di questa pipeline, utilizza CoGroupByKey anziché input secondari.