Grafici dei job Dataflow

Quando selezioni un job Dataflow specifico, l'interfaccia di monitoraggio fornisce una rappresentazione grafica della pipeline: il grafico del job. La pagina del grafico del job nella console fornisce anche un riepilogo del job, un log del job e informazioni su ogni passaggio della pipeline.

Il grafico del job di una pipeline rappresenta ogni trasformazione nella pipeline come un riquadro. Ogni casella contiene il nome della trasformazione e le informazioni sullo stato del job, che includono quanto segue:

  • In esecuzione: il passaggio è in esecuzione.
  • In coda: il passaggio di un job FlexRS è in coda.
  • Riuscito: il passaggio è stato completato correttamente.
  • Arrestato: il passaggio è stato interrotto perché il job è stato arrestato.
  • Sconosciuto: non è stato possibile segnalare lo stato del passaggio.
  • Non riuscito: il passaggio non è stato completato.

Per impostazione predefinita, la pagina del grafico del job mostra la Visualizzazione grafico. Per visualizzare il grafico del job sotto forma di tabella, nella visualizzazione dei passaggi del job seleziona Visualizzazione tabella. La visualizzazione tabella contiene le stesse informazioni in un formato diverso. La visualizzazione tabella è utile nei seguenti scenari:

  • Il job prevede molte fasi, che rendono difficile la navigazione nel grafico del job.
  • Vuoi ordinare i passaggi del lavoro in base a una proprietà specifica. Ad esempio, puoi ordinare la tabella per tempo di esecuzione per identificare i passaggi lenti.

Grafico del job di base

Codice pipeline:

Java

  // Read the lines of the input text.
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
     // Count the words.
     .apply(new CountWords())
     // Write the formatted word counts to output.
     .apply("WriteCounts", TextIO.write().to(options.getOutput()));

Python

(
    pipeline
    # Read the lines of the input text.
    | 'ReadLines' >> beam.io.ReadFromText(args.input_file)
    # Count the words.
    | CountWords()
    # Write the formatted word counts to output.
    | 'WriteCounts' >> beam.io.WriteToText(args.output_path))

Go

  // Create the pipeline.
  p := beam.NewPipeline()
    s := p.Root()
  // Read the lines of the input text.
  lines := textio.Read(s, *input)
  // Count the words.
  counted := beam.ParDo(s, CountWords, lines)
  // Write the formatted word counts to output.
  textio.Write(s, *output, formatted)

Grafico del job:

Il grafico di esecuzione per una pipeline WordCount come mostrato nell'interfaccia di monitoraggio di Dataflow.

Figura 1: il codice della pipeline per una pipeline WordCount mostrata con il grafico di esecuzione risultante nell'interfaccia di monitoraggio di Dataflow.

Trasformazioni composte

Nel grafico del job, le trasformazioni composte che contengono più sottotrasformazioni nidificate sono espandibili. Le trasformazioni composte espandibili sono contrassegnate da una freccia nel grafico. Fai clic sulla freccia per espandere la trasformazione e visualizzare le sottotrasformazioni al suo interno.

Codice pipeline:

Java

  // The CountWords Composite Transform
  // inside the WordCount pipeline.

  public static class CountWords
    extends PTransform<PCollection<String>, PCollection<String>> {

    @Override
    public PCollection<String> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

Python

# The CountWords Composite Transform inside the WordCount pipeline.
@beam.ptransform_fn
def CountWords(pcoll):
  return (
      pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement()
      # Format each word and count into a printable string.
      | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))

Go

  // The CountWords Composite Transform inside the WordCount pipeline.
  func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("CountWords")

    // Convert lines of text into individual words.
    col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)

    // Count the number of times each word occurs.
    return stats.Count(s, col)
  }
Grafico del job:

Grafico del job per una pipeline WordCount con la trasformazione CountWords espansa
              per mostrare le trasformazioni del suo componente.

Figura 2: codice della pipeline per i passaggi secondari della trasformazione CountWords mostrato con il grafico del job espanso per l'intera pipeline.

Nel codice della pipeline, potresti aver richiamato la trasformazione composita come segue:

result = transform.apply(input);

Le trasformazioni composte richiamate in questo modo omettono la nidificazione prevista e potrebbero quindi apparire espanse nell'interfaccia di monitoraggio di Dataflow. La pipeline potrebbe anche generare avvisi o errori relativi a nomi univoci stabili al momento dell'esecuzione della pipeline.

Per evitare questi problemi, assicurati di richiamare le trasformazioni utilizzando il formato consigliato:

result = input.apply(transform);

Trasforma nomi

Dataflow può avere il nome della trasformazione mostrato nel grafico del job di monitoraggio in vari modi.

Java

  • Dataflow può utilizzare un nome che assegni quando applichi la trasformazione. Il primo argomento che fornisci al metodo apply è il nome della trasformazione.
  • Dataflow può dedurre il nome della trasformazione: dal nome della classe (se hai creato una trasformazione personalizzata) o dal nome dell'oggetto della funzione DoFn (se utilizzi una trasformazione di base come ParDo).

Python

  • Dataflow può utilizzare un nome che assegni quando applichi la trasformazione. Puoi impostare il nome della trasformazione specificando l'argomento label della trasformazione.
  • Dataflow può dedurre il nome della trasformazione: dal nome della classe (se hai creato una trasformazione personalizzata) o dal nome dell'oggetto della funzione DoFn (se utilizzi una trasformazione di base come ParDo).

Go

  • Dataflow può utilizzare un nome che assegni quando applichi la trasformazione. Puoi impostare il nome della trasformazione specificando Scope.
  • Dataflow può dedurre il nome della trasformazione, dal nome della struttura se utilizzi una DoFn strutturale o dal nome della funzione se utilizzi una DoFn funzionale.

Comprendere le metriche

Questa sezione fornisce dettagli sulle metriche associate al grafico del job.

Tempo totale di esecuzione

Quando fai clic su un passaggio, la metrica Tempo di muro viene visualizzata nel riquadro Informazioni passaggio. Il tempo di esecuzione fornisce il tempo totale approssimativo trascorso su tutti i thread di tutti i worker per le seguenti azioni:

  • Inizializzazione del passaggio
  • Elaborazione dei dati
  • Riproduzione casuale dei dati
  • Fine del passaggio

Per i passaggi compositi, il tempo totale di esecuzione indica la somma del tempo speso nei passaggi del componente. Questa stima può aiutarti a identificare i passaggi lenti e a diagnosticare quale parte della pipeline sta richiedendo più tempo del necessario.

Puoi visualizzare il tempo necessario per l&#39;esecuzione di un passaggio nella pipeline.
Figura 3: la metrica Tempo a muro può aiutarti a garantire che la pipeline funzioni in modo efficiente.

Metriche di input aggiuntive

Le metriche di input aggiuntivi mostrano in che modo i pattern e gli algoritmi di accesso tramite input aggiuntivi influiscono sulle prestazioni della pipeline. Quando la pipeline utilizza un input aggiuntivo, Dataflow scrive la raccolta in un livello permanente, ad esempio un disco, e le trasformazioni vengono lette da questa raccolta permanente. Queste operazioni di lettura e scrittura influiscono sul tempo di esecuzione del job.

L'interfaccia di monitoraggio di Dataflow mostra le metriche di input aggiuntivo quando seleziona una trasformazione che crea o utilizza una raccolta di input. Puoi visualizzare le metriche nella sezione Metriche di input aggiuntivo del riquadro Informazioni sul passaggio.

Trasformazioni che creano un input aggiuntivo

Se la trasformazione selezionata crea una raccolta di input aggiuntivi, la sezione Metriche di input aggiuntivo mostra il nome della raccolta, insieme alle seguenti metriche:

  • Tempo dedicato alla scrittura: il tempo dedicato a scrivere la raccolta di input aggiuntivi.
  • Byte scritti: il numero totale di byte scritti nella raccolta di input aggiuntivi.
  • Tempo e byte letti dall'input aggiuntivo: una tabella che contiene metriche aggiuntive per tutte le trasformazioni che utilizzano la raccolta di input aggiuntivi, denominate consumer di input lato.

La tabella Tempo e byte letti dall'input aggiuntivo contiene le seguenti informazioni per ogni consumer di input aggiuntivo:

  • Consumatore di input aggiuntivo: il nome della trasformazione del consumer di input aggiuntivo.
  • Tempo dedicato alla lettura: il tempo che questo consumatore ha dedicato alla lettura della raccolta di input aggiuntivi.
  • Byte letti: il numero di byte letti da questo consumer dalla raccolta di input laterali.

Se la pipeline ha una trasformazione composita che crea un input laterale, espandi la trasformazione composita fino a quando non vedi la sottotrasformazione specifica che crea l'input laterale. Quindi, seleziona la sottotrasformazione per visualizzare la sezione Metriche di input aggiuntivo.

La figura 4 mostra le metriche di input aggiuntivo per una trasformazione che crea una raccolta di input aggiuntivi.

Puoi selezionare la sottotrasformazione e le relative metriche di input laterali sono visibili nel riquadro laterale delle informazioni sui passaggi.
Figura 4: il grafico del job presenta una trasformazione composita espansa (MakeMapView). La trasformazione secondaria che crea l'input laterale (CreateDataflowView) è selezionata e le metriche di input laterali sono visibili nel riquadro laterale Informazioni sul passaggio.

Trasformazioni che consumano uno o più input aggiuntivi

Se la trasformazione selezionata utilizza uno o più input aggiuntivi, la sezione Metriche di input aggiuntivo mostra la tabella Tempo e byte letti dall'input aggiuntivo. Questa tabella contiene le seguenti informazioni per ogni raccolta di input aggiuntivi:

  • Raccolta di input aggiuntivi: il nome della raccolta di input aggiuntivi.
  • Tempo dedicato alla lettura: il tempo impiegato dalla trasformazione per leggere questa raccolta di input laterale.
  • Byte letti: il numero di byte letti dalla trasformazione da questa raccolta di input lato.

Se la pipeline ha una trasformazione composita che legge un input laterale, espandi la trasformazione composita fino a quando non vedi la sottotrasformazione specifica che legge l'input laterale. Quindi, seleziona la sottotrasformazione per visualizzare la sezione Metriche di input aggiuntivo.

La figura 5 mostra le metriche di input aggiuntivo per una trasformazione che legge da una raccolta di input aggiuntivi.

Puoi selezionare la trasformazione e le relative metriche di input laterali sono visibili nel riquadro laterale delle informazioni sui passaggi.
Figura 5: la trasformazione JoinBothCollections legge da una raccolta di input laterali. L'opzione JoinBothCollections è selezionata nel grafico del job e le metriche di input laterali sono visibili nel riquadro laterale Informazioni passaggio.

Identificare i problemi di prestazioni degli input collaterali

La ripetizione è un problema comune di prestazioni degli input. Se l'input aggiuntivo PCollection è troppo grande, i worker non possono memorizzare nella cache l'intera raccolta. Di conseguenza, i worker devono leggere ripetutamente dalla raccolta di input lato permanente.

Nella Figura 6, le metriche di input aggiuntivo mostrano che i byte totali letti dalla raccolta di input aggiuntivi sono molto più grandi delle dimensioni della raccolta (byte totali scritti).

Puoi selezionare la trasformazione e le relative metriche di input laterali sono visibili nel riquadro laterale delle informazioni sui passaggi.
Figura 6: esempio di ripetizione. La raccolta di input aggiuntivi è di 563 MB e la somma dei byte letti dal consumo di trasformazioni è di quasi 12 GB.

Per migliorare le prestazioni di questa pipeline, riprogetta l'algoritmo per evitare di ripetere o recuperare nuovamente i dati di input aggiuntivi. In questo esempio, la pipeline crea un prodotto cartesiano di due raccolte. L'algoritmo esegue l'iterazione dell'intera raccolta di input aggiuntivi per ogni elemento della raccolta principale. Puoi migliorare il pattern di accesso della pipeline raggruppando più elementi della raccolta principale in batch. Questa modifica riduce il numero di volte in cui i worker devono rileggere la raccolta di input aggiuntivi.

Un altro problema comune di prestazioni può verificarsi se la pipeline esegue un join applicando un valore ParDo con uno o più input aggiuntivi di grandi dimensioni. In questo caso, i worker dedicano un'ampia percentuale del tempo di elaborazione all'operazione di join leggendo le raccolte di input laterali.

La figura 7 mostra esempi di metriche di input secondarie per questo problema:

Puoi selezionare la trasformazione e le relative metriche di input laterali sono visibili nel riquadro laterale delle informazioni sui passaggi.
Figura 7: la trasformazione JoinBothCollections ha un tempo di elaborazione totale di 18 minuti e 31 secondi. I worker trascorrono la maggior parte del tempo di elaborazione (10 minuti e 3 secondi) a leggere dalla raccolta di input laterale di 10 GB.

Per migliorare le prestazioni di questa pipeline, utilizza CoGroupByKey anziché gli input aggiuntivi.