Wenn Sie einen bestimmten Dataflow-Job auswählen, bietet die Monitoring-Oberfläche eine grafische Darstellung Ihrer Pipeline: die Jobgrafik. Die Seite „Jobgrafik“ in der Console enthält auch eine Jobübersicht, ein Joblog und Informationen zu jedem Schritt in der Pipeline.
Darin wird jede Transformation in der Pipeline als Feld dargestellt. Jedes Feld enthält den Transformationsnamen und Informationen zum Jobstatus, darunter:
- Aktiv: Der Schritt wird ausgeführt.
- Queued (In der Warteschlange): Der Schritt in einem FlexRS-Job wurde in die Warteschlange gestellt.
- Succeeded (Erfolgreich): Der Schritt wurde erfolgreich abgeschlossen.
- Angehalten: Der Schritt wurde angehalten, weil der Job angehalten wurde.
- Unknown (Unbekannt): In diesem Schritt konnte der Status nicht gemeldet werden.
- Failed (Fehlgeschlagen): Der Schritt konnte nicht abgeschlossen werden.
Standardmäßig wird auf der Seite für die Jobgrafik die Diagrammansicht angezeigt. Job-Grafik als Tabelle ansehen. Dazu Wählen Sie unter Ansicht mit Jobschritten die Option Tabellenansicht aus. Tabellenansicht enthält die gleichen Informationen in einem anderen Format. Die Tabellenansicht ist in folgenden Fällen hilfreich:
- Ihr Job besteht aus vielen Phasen, was die Navigation in der Jobgrafik erschwert.
- Sie möchten die Jobschritte nach einer bestimmten Property sortieren. Sie können beispielsweise die Tabelle nach Echtzeit soprtieren, um langsame Schritte zu identifizieren.
Einfache Jobgrafik
Pipelinecode:
Java// Read the lines of the input text. p.apply("ReadLines", TextIO.read().from(options.getInputFile())) // Count the words. .apply(new CountWords()) // Write the formatted word counts to output. .apply("WriteCounts", TextIO.write().to(options.getOutput())); Python( pipeline # Read the lines of the input text. | 'ReadLines' >> beam.io.ReadFromText(args.input_file) # Count the words. | CountWords() # Write the formatted word counts to output. | 'WriteCounts' >> beam.io.WriteToText(args.output_path)) Go// Create the pipeline. p := beam.NewPipeline() s := p.Root() // Read the lines of the input text. lines := textio.Read(s, *input) // Count the words. counted := beam.ParDo(s, CountWords, lines) // Write the formatted word counts to output. textio.Write(s, *output, formatted) |
Jobgrafik:
|
Zusammengesetzte Transformationen
In der Ausführungsgrafik lassen sich zusammengesetzte Transformationen – solche, die mehrere verschachtelte Untertransformationen enthalten – maximieren. Diese Transformationen sind in der Grafik mit einem Pfeil markiert. Klicken Sie auf den Pfeil, um die Transformation zu maximieren und die darin enthaltenen Untertransformationen anzeigen zu lassen.
Pipelinecode:
Java// The CountWords Composite Transform // inside the WordCount pipeline. public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> { @Override public PCollection<String> apply(PCollection<String> lines) { // Convert lines of text into individual words. PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // Count the number of times each word occurs. PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement()); return wordCounts; } } Python# The CountWords Composite Transform inside the WordCount pipeline. @beam.ptransform_fn def CountWords(pcoll): return ( pcoll # Convert lines of text into individual words. | 'ExtractWords' >> beam.ParDo(ExtractWordsFn()) # Count the number of times each word occurs. | beam.combiners.Count.PerElement() # Format each word and count into a printable string. | 'FormatCounts' >> beam.ParDo(FormatCountsFn())) Go// The CountWords Composite Transform inside the WordCount pipeline. func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Count the number of times each word occurs. return stats.Count(s, col) } |
Jobgrafik:
|
In Ihrem Pipeline-Code haben Sie Ihre zusammengesetzte Transformation möglicherweise so aufgerufen:
result = transform.apply(input);
In zusammengesetzten Transformationen, die auf diese Weise aufgerufen werden, wird die erwartete Verschachtelung weggelassen. Aus diesem Grund können sie in der Monitoring-Oberfläche von Dataflow erweitert angezeigt werden. Ihre Pipeline kann während der Pipelineausführung außerdem Warnmeldungen oder Fehler zu stabilen eindeutigen Namen generieren.
Achten Sie darauf, Ihre Transformation im empfohlenen Format aufzurufen, um diese Probleme zu vermeiden:
result = input.apply(transform);
Transformationsnamen
Dataflow hat verschiedene Möglichkeiten, den Transformationsnamen zu ermitteln, der in der Jobgrafik der Überwachung angezeigt wird.
Java
- Dataflow kann den Namen verwenden, den Sie beim Anwenden der Transformation zuweisen. Das erste Argument, das Sie an die Methode
apply
geben, ist Ihr Transformationsname. - Dataflow kann den Transformationsnamen ableiten – entweder vom Klassennamen, wenn Sie eine benutzerdefinierte Transformation erstellt haben, oder vom Namen des Funktionsobjekts
DoFn
, wenn Sie eine Kerntransformation wieParDo
verwenden.
Python
- Dataflow kann den Namen verwenden, den Sie beim Anwenden der Transformation zuweisen. Durch das Angeben des Arguments
label
der Transformation können Sie den Transformationsnamen angeben. - Dataflow kann den Transformationsnamen ableiten – entweder vom Klassennamen, wenn Sie eine benutzerdefinierte Transformation erstellt haben, oder vom Namen des Funktionsobjekts
DoFn
, wenn Sie eine Kerntransformation wieParDo
verwenden.
Go
- Dataflow kann den Namen verwenden, den Sie beim Anwenden der Transformation zuweisen. Durch Angabe von
Scope
können Sie den Transformationsnamen festlegen. - Dataflow kann den Transformationsnamen ableiten, entweder aus dem struct-Namen, wenn Sie einen strukturellen
DoFn
verwenden, oder aus dem Funktionsnamen, wenn Sie einen funktionalenDoFn
verwenden.
Messwerte interpretieren
Dieser Abschnitt enthält Details zu den Messwerten, die mit der Jobgrafik verknüpft sind.
Echtzeit
Wenn Sie auf einen Schritt klicken, wird der Messwert Echtzeit im Bereich Schrittinformationen angezeigt. Die Echtzeit gibt die ungefähre Zeit an, die in allen Threads in allen Workern für die folgenden Aktionen aufgewendet wurde:
- Schritt initialisieren
- Daten verarbeiten
- Daten nach dem Zufallsprinzip verteilen
- Schritt beenden
Bei zusammengesetzten Schritten zeigt die Echtzeit die Summe der für die einzelnen Schritte aufgewendeten Zeit an. Mit diesem Wert können Sie Schritte ermitteln, die mehr Zeit benötigen, und so feststellen, welcher Teil Ihrer Pipeline mehr Zeit in Anspruch nimmt, als er sollte.
Messwerte zu Nebeneingaben
Messwerte zu Nebeneingaben zeigen an, wie sich die Zugriffsmuster und -algorithmen Ihrer Nebeneingaben auf die Leistung der Pipeline auswirken. Wenn Ihre Pipeline eine Nebeneingabe verwendet, schreibt Dataflow die Sammlung in eine nichtflüchtige Ebene, z. B. auf eine Festplatte, und wandelt die Leseergebnisse aus dieser nichtflüchtigen Sammlung um. Diese Lese- und Schreibvorgänge beeinflussen die Ausführungszeit Ihres Jobs.
Die Monitoring-Oberfläche von Dataflow zeigt Messwerte zu Nebeneingaben an, wenn Sie eine Transformation auswählen, die eine Sammlung von Nebeneingaben erstellt oder verwendet. Sie können sich die Messwerte im Bereich Schrittinformationen unter Messwerte zu Nebeneingaben ansehen.
Transformationen, die eine Nebeneingabe erstellen
Wenn die ausgewählte Transformation eine Sammlung von Nebeneingaben erstellt, werden im Bereich Messwerte zu Nebeneingaben der Name der Sammlung und die folgenden Messwerte angezeigt.
- Zeit zum Schreiben: Die Ausführungszeit für das Schreiben der Sammlung von Nebeneingaben.
- Geschriebene Byte: Gesamtzahl der Byte, die in die Sammlung der Nebeneingaben geschrieben wurden.
- Zeit und Byte zum Lesen von Nebeneingaben: Eine Tabelle mit zusätzlichen Messwerten für alle Transformationen, die auf die Sammlung der Nebeneingaben zugreifen. Sie werden als Nutzer von Nebeneingaben bezeichnet.
Unter Zeit und Byte zum Lesen von Nebeneingaben werden für jeden Nutzer der Nebeneingaben folgende Informationen angezeigt:
- Nutzer der Nebeneingabe: Transformationsname des Nutzers der Nebeneingabe
- Zeit zum Lesen: Zeit, die für das Lesen der Nebeneingabesammlung benötigt wurde.
- Gelesene Byte: Anzahl der Byte, die dieser Nutzer aus der Nebeneingabesammlung gelesen hat
Wenn Ihre Pipeline eine zusammengesetzte Transformation enthält, die eine Nebeneingabe erzeugt, erweitern Sie die zusammengesetzte Transformation, bis Sie die spezifische Subtransformation sehen, von der die Nebeneingabe erzeugt wird. Wählen Sie dann diese Subtransformation aus, um den Abschnitt Messwerte zu Nebeneingaben zu sehen.
Abbildung 4 zeigt Messwerte zu Nebeneingaben für eine Transformation, die eine Sammlung von Nebeneingaben erstellt.
<ph type="x-smartling-placeholder">Transformationen, die eine oder mehrere Nebeneingaben nutzen
Wenn die ausgewählte Transformation eine oder mehrere Nebeneingaben nutzt, wird im Bereich Messwerte zu Nebeneingaben die Tabelle Zeit und Byte zum Lesen von Nebeneingaben angezeigt. Diese Tabelle enthält für jede Nebeneingabesammlung folgende Informationen:
- Nebeneingabesammlung: Name der Nebeneingabesammlung
- Zeit zum Lesen: Zum Lesen dieser Nebeneingabesammlung benötigte Zeit
- Gelesene Byte: Anzahl der Byte, die die Transformation aus dieser Nebeneingabesammlung gelesen hat
Wenn Ihre Pipeline eine zusammengesetzte Transformation enthält, die eine Nebeneingabe liest, erweitern Sie die zusammengesetzte Transformation, bis Sie die bestimmte Subtransformation sehen, die die Nebeneingabe liest. Wählen Sie dann diese Subtransformation aus, um den Abschnitt Messwerte zu Nebeneingaben zu sehen.
Abbildung 5 zeigt Messwerte zu Nebeneingaben für eine Transformation, die aus einer Sammlung von Nebeneingaben liest.
Leistungsprobleme bei Nebeneingaben identifizieren
Ein häufig auftretendes Leistungsproblem bei Nebeneingaben wird durch Wiederholung verursacht. Wenn die Nebeneingabesammlung PCollection
zu groß ist, können die Worker nicht die gesamte Sammlung im Cache speichern.
Das bedeutet, die Worker müssen mehrmals aus der nichtflüchtigen Nebeneingabesammlung lesen.
In Abbildung 6 zeigen die Messwerte der Nebeneingaben, dass die Gesamtzahl der Byte, die aus der Sammlung von Nebeneingaben gelesen wurden, viel höher ist als die Größe der Sammlung selbst (insgesamt geschriebene Byte).
Um die Leistung dieser Pipeline zu verbessern, müssen Sie den Algorithmus neu gestalten, um zu vermeiden, dass die Nebeneingabedaten iteriert oder wiederholt abgerufen werden. In diesem Fall erstellt die Pipeline das kartesische Produkt zweier Sammlungen. Der Algorithmus durchläuft die gesamte Nebeneingabesammlung für jedes Element der Hauptsammlung. Sie können das Zugriffsmuster der Pipeline verbessern, indem Sie mehrere Elemente der Hauptsammlung gruppieren. Diese Änderung verringert die Häufigkeit, mit der Worker das Lesen der Nebeneingabesammlung wiederholen müssen.
Ein weiteres Leistungsproblem kann auftreten, wenn Ihre Pipeline einen Join vornimmt. Dabei wird ein ParDo
auf eine oder mehrere große Nebeneingaben angewendet. In diesem Fall können die Worker einen hohen prozentualen Anteil der Verarbeitungszeit für die Join-Vorgänge aus den Sammlungen der Nebeneingaben lesen.
Abbildung 7 zeigt Beispielmesswerte für Nebeneingaben, die dieses Problem verdeutlichen:
Um die Leistung dieser Pipeline zu verbessern, können Sie anstelle von Nebeneingaben CoGroupByKey verwenden.