Dataflow-Jobgrafiken

Wenn Sie einen bestimmten Dataflow-Job auswählen, bietet die Monitoring-Oberfläche eine grafische Darstellung Ihrer Pipeline: die Jobgrafik. Die Seite „Jobgrafik“ in der Console enthält auch eine Jobübersicht, ein Joblog und Informationen zu jedem Schritt in der Pipeline.

Darin wird jede Transformation in der Pipeline als Feld dargestellt. Jedes Feld enthält den Transformationsnamen und Informationen zum Jobstatus, darunter:

  • Aktiv: Der Schritt wird ausgeführt.
  • Queued (In der Warteschlange): Der Schritt in einem FlexRS-Job wurde in die Warteschlange gestellt.
  • Succeeded (Erfolgreich): Der Schritt wurde erfolgreich abgeschlossen.
  • Angehalten: Der Schritt wurde angehalten, weil der Job angehalten wurde.
  • Unknown (Unbekannt): In diesem Schritt konnte der Status nicht gemeldet werden.
  • Failed (Fehlgeschlagen): Der Schritt konnte nicht abgeschlossen werden.

Einfache Jobgrafik

Pipelinecode:

Java


  // Read the lines of the input text.
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
     // Count the words.
     .apply(new CountWords())
     // Write the formatted word counts to output.
     .apply("WriteCounts", TextIO.write().to(options.getOutput()));

Python


(
    pipeline
    # Read the lines of the input text.
    | 'ReadLines' >> beam.io.ReadFromText(args.input_file)
    # Count the words.
    | CountWords()
    # Write the formatted word counts to output.
    | 'WriteCounts' >> beam.io.WriteToText(args.output_path))

Go


  // Create the pipeline.
  p := beam.NewPipeline()
    s := p.Root()
  // Read the lines of the input text.
  lines := textio.Read(s, *input)
  // Count the words.
  counted := beam.ParDo(s, CountWords, lines)
  // Write the formatted word counts to output.
  textio.Write(s, *output, formatted)

Jobgrafik:

Die Ausführungsgrafik für eine WordCount-Pipeline wie in der Dataflow-Überwachungsoberfläche angezeigt.

Abbildung 1: Der Pipelinecode für eine WordCount-Pipeline, wie sie in der Dataflow-Überwachungsoberfläche angezeigt wird.

Zusammengesetzte Transformationen

In der Ausführungsgrafik lassen sich zusammengesetzte Transformationen – solche, die mehrere verschachtelte Untertransformationen enthalten – maximieren. Diese Transformationen sind in der Grafik mit einem Pfeil markiert. Klicken Sie auf den Pfeil, um die Transformation zu maximieren und die darin enthaltenen Untertransformationen anzeigen zu lassen.

Pipelinecode:

Java


  // The CountWords Composite Transform
  // inside the WordCount pipeline.

  public static class CountWords
    extends PTransform<PCollection<String>, PCollection<String>> {

    @Override
    public PCollection<String> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

Python


# The CountWords Composite Transform inside the WordCount pipeline.
@beam.ptransform_fn
def CountWords(pcoll):
  return (
      pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement()
      # Format each word and count into a printable string.
      | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))

Go


  // The CountWords Composite Transform inside the WordCount pipeline.
  func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("CountWords")

    // Convert lines of text into individual words.
    col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)

    // Count the number of times each word occurs.
    return stats.Count(s, col)
  }
Jobgrafik:

Die Jobgrafik für eine WordCount-Pipeline mit erweiterter CountWords-Transformation, um die darin enthaltenen Untertransformationen zu zeigen.

Abbildung 2: Der Pipelinecode für die Teilschritte der CountWords-Transformation, die mit der erweiterten Jobgrafik der gesamten Pipeline angezeigt wird.

In Ihrem Pipeline-Code haben Sie Ihre zusammengesetzte Transformation möglicherweise so aufgerufen:

result = transform.apply(input);

In zusammengesetzten Transformationen, die auf diese Weise aufgerufen werden, wird die erwartete Verschachtelung weggelassen. Aus diesem Grund können sie in der Monitoring-Oberfläche von Dataflow erweitert angezeigt werden. Ihre Pipeline kann während der Pipelineausführung außerdem Warnmeldungen oder Fehler zu stabilen eindeutigen Namen generieren.

Achten Sie darauf, Ihre Transformation im empfohlenen Format aufzurufen, um diese Probleme zu vermeiden:

result = input.apply(transform);

Transformationsnamen

Dataflow hat verschiedene Möglichkeiten, den Transformationsnamen zu ermitteln, der in der Jobgrafik der Überwachung angezeigt wird.

Java

  • Dataflow kann den Namen verwenden, den Sie beim Anwenden der Transformation zuweisen. Das erste Argument, das Sie an die Methode apply geben, ist Ihr Transformationsname.
  • Dataflow kann den Transformationsnamen ableiten – entweder vom Klassennamen, wenn Sie eine benutzerdefinierte Transformation erstellt haben, oder vom Namen des Funktionsobjekts DoFn, wenn Sie eine Kerntransformation wie ParDo verwenden.

Python

  • Dataflow kann den Namen verwenden, den Sie beim Anwenden der Transformation zuweisen. Durch das Angeben des Arguments label der Transformation können Sie den Transformationsnamen angeben.
  • Dataflow kann den Transformationsnamen ableiten – entweder vom Klassennamen, wenn Sie eine benutzerdefinierte Transformation erstellt haben, oder vom Namen des Funktionsobjekts DoFn, wenn Sie eine Kerntransformation wie ParDo verwenden.

Go

  • Dataflow kann den Namen verwenden, den Sie beim Anwenden der Transformation zuweisen. Durch Angabe von Scope können Sie den Transformationsnamen festlegen.
  • Dataflow kann den Transformationsnamen ableiten, entweder aus dem struct-Namen, wenn Sie einen strukturellen DoFn verwenden, oder aus dem Funktionsnamen, wenn Sie einen funktionalen DoFn verwenden.

Messwerte interpretieren

Dieser Abschnitt enthält Details zu den Messwerten, die mit der Jobgrafik verknüpft sind.

Echtzeit

Wenn Sie auf einen Schritt klicken, wird der Messwert Echtzeit im Bereich Schrittinformationen angezeigt. Die Echtzeit gibt die ungefähre Zeit an, die in allen Threads in allen Workern für die folgenden Aktionen aufgewendet wurde:

  • Schritt initialisieren
  • Daten verarbeiten
  • Daten nach dem Zufallsprinzip verteilen
  • Schritt beenden

Bei zusammengesetzten Schritten zeigt die Echtzeit die Summe der für die einzelnen Schritte aufgewendeten Zeit an. Mit diesem Wert können Sie Schritte ermitteln, die mehr Zeit benötigen, und so feststellen, welcher Teil Ihrer Pipeline mehr Zeit in Anspruch nimmt, als er sollte.

Sie können sehen, wie lange es dauert, einen Schritt in Ihrer Pipeline auszuführen.
Abbildung 3: Der Messwert für Wall time (Echtzeit) gibt an, ob Ihre Pipeline effizient ausgeführt wird.

Messwerte zu Nebeneingaben

Messwerte zu Nebeneingaben zeigen an, wie sich die Zugriffsmuster und -algorithmen Ihrer Nebeneingaben auf die Leistung der Pipeline auswirken. Wenn Ihre Pipeline eine Nebeneingabe verwendet, schreibt Dataflow die Sammlung in eine nichtflüchtige Ebene, z. B. auf eine Festplatte, und wandelt die Leseergebnisse aus dieser nichtflüchtigen Sammlung um. Diese Lese- und Schreibvorgänge beeinflussen die Ausführungszeit Ihres Jobs.

Die Monitoring-Oberfläche von Dataflow zeigt Messwerte zu Nebeneingaben an, wenn Sie eine Transformation auswählen, die eine Sammlung von Nebeneingaben erstellt oder verwendet. Sie können sich die Messwerte im Bereich Schrittinformationen unter Messwerte zu Nebeneingaben ansehen.

Transformationen, die eine Nebeneingabe erstellen

Wenn die ausgewählte Transformation eine Sammlung von Nebeneingaben erstellt, werden im Bereich Messwerte zu Nebeneingaben der Name der Sammlung und die folgenden Messwerte angezeigt.

  • Zeit zum Schreiben: Die Ausführungszeit für das Schreiben der Sammlung von Nebeneingaben.
  • Geschriebene Byte: Gesamtzahl der Byte, die in die Sammlung der Nebeneingaben geschrieben wurden.
  • Zeit und Byte zum Lesen von Nebeneingaben: Eine Tabelle mit zusätzlichen Messwerten für alle Transformationen, die auf die Sammlung der Nebeneingaben zugreifen. Sie werden als Nutzer von Nebeneingaben bezeichnet.

Unter Zeit und Byte zum Lesen von Nebeneingaben werden für jeden Nutzer der Nebeneingaben folgende Informationen angezeigt:

  • Nutzer der Nebeneingabe: Transformationsname des Nutzers der Nebeneingabe
  • Zeit zum Lesen: Zeit, die für das Lesen der Nebeneingabesammlung benötigt wurde.
  • Gelesene Byte: Anzahl der Byte, die dieser Nutzer aus der Nebeneingabesammlung gelesen hat

Wenn Ihre Pipeline eine zusammengesetzte Transformation enthält, die eine Nebeneingabe erzeugt, erweitern Sie die zusammengesetzte Transformation, bis Sie die spezifische Subtransformation sehen, von der die Nebeneingabe erzeugt wird. Wählen Sie dann diese Subtransformation aus, um den Abschnitt Messwerte zu Nebeneingaben zu sehen.

Abbildung 4 zeigt Messwerte zu Nebeneingaben für eine Transformation, die eine Sammlung von Nebeneingaben erstellt.

Sie können die Subtransformation auswählen, um deren Messwerte zu Nebeneingaben auf der Infoseite des Schritts anzeigen zu lassen.
Abbildung 4: Die Jobgrafik enthält eine erweiterte zusammengesetzte Transformation (MakeMapView). Die Untertransformation, durch die die Nebeneingabe (CreateDataflowView) erstellt wird, ist markiert und die Messwerte zu Nebeneingaben werden in der Seitenleiste Details zum Schritt angezeigt.

Transformationen, die eine oder mehrere Nebeneingaben nutzen

Wenn die ausgewählte Transformation eine oder mehrere Nebeneingaben nutzt, wird im Bereich Messwerte zu Nebeneingaben die Tabelle Zeit und Byte zum Lesen von Nebeneingaben angezeigt. Diese Tabelle enthält für jede Nebeneingabesammlung folgende Informationen:

  • Nebeneingabesammlung: Name der Nebeneingabesammlung
  • Zeit zum Lesen: Zum Lesen dieser Nebeneingabesammlung benötigte Zeit
  • Gelesene Byte: Anzahl der Byte, die die Transformation aus dieser Nebeneingabesammlung gelesen hat

Wenn Ihre Pipeline eine zusammengesetzte Transformation enthält, die eine Nebeneingabe liest, erweitern Sie die zusammengesetzte Transformation, bis Sie die bestimmte Subtransformation sehen, die die Nebeneingabe liest. Wählen Sie dann diese Subtransformation aus, um den Abschnitt Messwerte zu Nebeneingaben zu sehen.

Abbildung 5 zeigt Messwerte zu Nebeneingaben für eine Transformation, die aus einer Sammlung von Nebeneingaben liest.

Sie können die Transformation auswählen, um deren Messwerte zu Nebeneingaben auf der Infoseite des Schritts anzeigen zu lassen.
Abbildung 5: Die Transformation JoinBothCollections liest aus einer Sammlung von Nebeneingaben. JoinBothCollections ist in der Jobgrafik markiert und die Messwerte zu Nebeneingaben werden in der Seitenleiste Details zum Schritt angezeigt.

Leistungsprobleme bei Nebeneingaben identifizieren

Ein häufig auftretendes Leistungsproblem bei Nebeneingaben wird durch Wiederholung verursacht. Wenn die Nebeneingabesammlung PCollection zu groß ist, können die Worker nicht die gesamte Sammlung im Cache speichern. Das bedeutet, die Worker müssen mehrmals aus der nichtflüchtigen Nebeneingabesammlung lesen.

In Abbildung 6 zeigen die Messwerte der Nebeneingaben, dass die Gesamtzahl der Byte, die aus der Sammlung von Nebeneingaben gelesen wurden, viel höher ist als die Größe der Sammlung selbst (insgesamt geschriebene Byte).

Sie können die Transformation auswählen, um deren Messwerte zu Nebeneingaben auf der Infoseite des Schritts anzeigen zu lassen.
Abbildung 6: Ein Beispiel für eine Wiederholung. Die Nebeneingabesammlung ist 563 MB groß und die Summe der Byte, die von den auf die Sammlung zugreifenden Transformationen gelesen werden, beträgt fast 12 GB.

Um die Leistung dieser Pipeline zu verbessern, müssen Sie den Algorithmus neu gestalten, um zu vermeiden, dass die Nebeneingabedaten iteriert oder wiederholt abgerufen werden. In diesem Fall erstellt die Pipeline das kartesische Produkt zweier Sammlungen. Der Algorithmus durchläuft die gesamte Nebeneingabesammlung für jedes Element der Hauptsammlung. Sie können das Zugriffsmuster der Pipeline verbessern, indem Sie mehrere Elemente der Hauptsammlung gruppieren. Diese Änderung verringert die Häufigkeit, mit der Worker das Lesen der Nebeneingabesammlung wiederholen müssen.

Ein weiteres Leistungsproblem kann auftreten, wenn Ihre Pipeline einen Join vornimmt. Dabei wird ein ParDo auf eine oder mehrere große Nebeneingaben angewendet. In diesem Fall können die Worker einen hohen prozentualen Anteil der Verarbeitungszeit für die Join-Vorgänge aus den Sammlungen der Nebeneingaben lesen.

Abbildung 7 zeigt Beispielmesswerte für Nebeneingaben, die dieses Problem verdeutlichen:

Sie können die Transformation auswählen, um deren Messwerte zu Nebeneingaben auf der Infoseite des Schritts anzeigen zu lassen.
Abbildung 7: Die Transformation JoinBothCollections hat eine Verarbeitungszeit von 18 Minuten und 31 Sekunden. Worker verbringen den Großteil der Verarbeitungszeit (10 Min. und 3 Sek.) mit dem Lesen der 10 GB großen Sammlung von Nebeneingaben.

Um die Leistung dieser Pipeline zu verbessern, können Sie anstelle von Nebeneingaben CoGroupByKey verwenden.