Führen Sie zuerst die Schritte im Schnellstart aus, bevor Sie fortfahren.
Die WordCount-Beispiele zeigen, wie eine Verarbeitungspipeline eingerichtet wird, die Text lesen, Textzeilen in einzelne Wörter tokenisieren und für jedes dieser Wörter eine Häufigkeitszählung durchführen kann. Die Dataflow-SDKs enthalten eine Serie dieser vier aufeinanderfolgenden immer detaillierter werdenden WordCount-Beispiele, die aufeinander aufbauen. Bei dem Eingabetext handelt es sich bei allen Beispielen um verschiedene Shakespeare-Texte.
Bei jedem WordCount-Beispiel werden unterschiedliche Konzepte in die Dataflow-SDKs eingeführt.
- Minimal WordCount demonstriert die allgemeinen Grundsätze, die bei der Erstellung einer Dataflow-Pipeline von Bedeutung sind.
- WordCount führt einige der am häufigsten verwendeten Best Practices für die Erstellung wiederverwendbarer und wartbarer Pipelines ein.
- Debugging WordCount führt Verfahren zur Protokollierung und zur Fehlerbehebung ein.
- Windowed WordCount zeigt, wie Sie mit dem Dataflow-Programmiermodell sowohl begrenzte als auch unbegrenzte Datasets verarbeiten können.
Als Erstes sollten Sie das einfachste Beispiel verstehen: Minimal WordCount. Nachdem Sie sich mit den allgemeinen Grundsätzen für die Erstellung einer Pipeline vertraut gemacht haben, fahren Sie mit den Best Practices für das Schreiben von Dataflow-Programmen in WordCount fort. Anschließend lesen Sie sich Debugging WordCount durch, um zu verstehen, wie übliche Verfahren zum Logging und zur Fehlerbehebung angewendet werden. Zu guter Letzt erfahren Sie, wie Sie dasselbe Rechenmuster in begrenzten und unbegrenzten Datasets in Windowed WordCount verwenden.
MinimalWordCount
Minimal WordCount zeigt eine einfache Pipeline, die einen Textblock aus einer Datei in Google Cloud Storage liest, Transformationen anwendet, um Wörter zu tokenisieren und zu zählen, und die Daten in eine Ausgabedatei in einem Cloud Storage-Bucket schreibt. In diesem Beispiel werden die Speicherorte für die Eingabe- und Ausgabedateien als vordefinierter Code aufgenommen und es wird keine Fehlersuche durchgeführt. Es sollen nur die allerwichtigsten Schritte bei der Erstellung einer Dataflow-Pipeline gezeigt werden. In späteren Beispielen werden die Eingabe- und Ausgabequellen parametrisiert sowie weitere Best Practices vorgestellt.
Java
- Pipeline erstellen
- Transformationen auf die Pipeline anwenden
- Eingabe lesen (in diesem Beispiel: Textdateien lesen)
ParDo
-Transformationen anwenden- Von SDK bereitgestellte Transformationen anwenden (in diesem Beispiel
Count
) - Ausgabe schreiben (in diesem Beispiel: in Google Cloud Storage schreiben)
- Pipeline ausführen
In den folgenden Abschnitten werden diese Konzepte ausführlich erklärt und Ausschnitte des relevanten Codes aus der Minimal WordCount-Pipeline bereitgestellt.
Pipeline erstellen
Bei der Erstellung einer Cloud Dataflow-Pipeline muss als Erstes ein Objekt Pipeline-Optionen erstellt werden. Mit diesem Objekt können wir verschiedene Optionen für unsere Pipeline festlegen, beispielsweise den Pipeline-Runner, der unsere Pipeline ausführt, die ID unseres Projekts und den Staging-Ort für die Pipeline, an dem ihre Dateien gespeichert werden. Über ihn kann in der Cloud auf Ihre JAR-Dateien zugegriffen werden. Zwar legen wir die Optionen in diesem Beispiel programmatisch fest, doch immer öfter werden Befehlszeilenargumente verwendet, um Pipeline-Optionen festzulegen.
In unserem Beispiel haben wir BlockingDataflowPipelineRunner
als PipelineRunner
angegeben, damit unsere Pipeline mit dem Google Cloud Dataflow-Dienst in der Cloud ausgeführt wird. Es gibt weitere Optionen, die Sie für die Ausführung Ihrer Pipeline in der Cloud festlegen können. Sie können diese Option auch vollständig weglassen. In diesem Fall wird Ihre Pipeline vom Standard-Runner lokal ausgeführt. Diese Optionen werden in den nächsten beiden WordCount-Beispielen vorgestellt und unter Ausführungsparameter festlegen genauer erläutert.
Java
DataflowPipelineOptions options = PipelineOptionsFactory.create() .as(DataflowPipelineOptions.class); options.setRunner(BlockingDataflowPipelineRunner.class); options.setProject("SET-YOUR-PROJECT-ID-HERE"); // The 'gs' URI means that this is a Google Cloud Storage path options.setStagingLocation("gs://SET-YOUR-BUCKET-NAME-HERE");
Im nächsten Schritt erstellen Sie ein Pipeline
-Objekt mit den soeben erstellten Optionen. Das Pipeline
-Objekt baut die auszuführende Transformationsgrafik auf, die mit dieser bestimmten Pipeline verknüpft ist.
Java
Pipeline p = Pipeline.create(options);
Ausführliche Informationen zum Pipeline-Objekt und seiner Funktionsweise finden Sie unter Pipelines.
Pipelinetransformationen
Die Pipeline Minimal WordCount enthält mehrere Transformationen, die Daten in die Pipeline einlesen, ändern oder auf andere Weise transformieren und die Ergebnisse ausgeben. Jede Transformation stellt einen Vorgang in der Pipeline dar.
Bei jeder Transformation wird eine Art von Eingabe verwendet (Daten oder sonstiges) und es werden Ausgabedaten produziert.
Die Eingabe- und Ausgabedaten werden von der SDK-Klasse PCollection
dargestellt.
PCollection ist eine spezielle Klasse, die von der Dataflow-SDK bereitgestellt wird, die Sie für die Darstellung eines Datasets mit beliebiger Größe verwenden können. Dies schließt unbegrenzte Datasets ein.
Abbildung 1 zeigt den Datenfluss der Pipeline:

Die Minimal WordCount-Pipeline enthält fünf Transformationen:
- Die Transformation Read einer Textdatei wird auf das
Pipeline
-Objekt angewendet und erzeugt einenPCollection
als Ausgabe. Jedes Element in der AusgabePCollection
stellt eine Textzeile aus der Eingabedatei dar. - Eine Transformation vom Typ ParDo, die eine
DoFn
(inline als anonyme Klasse definiert) für jedes Element aufruft, das die Textzeilen in einzelne Wörter tokenisiert. Die Eingabe für diese Transformation istPCollection
der Textzeilen, die von der vorherigenTextIO.Read
-Transformation generiert wurden. Die TransformationParDo
gibt einen neuenPCollection
aus, wobei jedes Element ein einzelnes Wort im Text darstellt. - Die vom SDK bereitgestellte
Count
-Transformation ist eine generische Transformation, die einenPCollection
eines beliebigen Typs verwendet und einenPCollection
mit Schlüssel/Wert-Paaren zurückgibt. Jeder Schlüssel stellt ein eindeutiges Element aus der Eingabesammlung dar und jeder Wert stellt dar, wie häufig dieser Schlüssel in der Eingabesammlung erschienen ist.
In dieser Pipeline ist die Eingabe fürCount
derPCollection
einzelner Wörter, die durch den vorangegangenenParDo
generiert wurden, und die Ausgabe ist einPCollection
von Schlüssel/Wert-Paaren, wobei jeder Schlüssel ein eindeutiges Wort im Text darstellt und der zugehörige Wert die Häufigkeitszahl für jedes Wort ist. - Als Nächstes wird eine Transformation ausführt, die jedes Schlüssel/Wert-Paar eindeutiger Wörter und die Anzahl der Vorkommen als druckbaren String formatiert, der als Ausgabedatei ausgegeben werden kann.
- Eine Textdatei Write. Diese Transformation verwendet die letzte
PCollection
formatierterString
s als Eingabe und gibt jedes Element in einer Ausgabetextdatei aus. Jedes Element in der AusgabePCollection
stellt eine Textzeile aus der entstehenden Ausgabedatei dar.
Java
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/kinglear.txt"))
Java
Sie können Ihrer Transformation einen Namen geben, der in der Dataflow Monitoring Interface angezeigt wird. Verwenden Sie dazu den Vorgang .named()
, wie in diesem Beispiel dargestellt. Bei Ausführung Ihrer Pipeline durch den Dataflow-Dienst wird in der Monitoring-Oberfläche die Ausführungszeit der jeweiligen ParDo
-Transformation angegeben.
.apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() { @Override public void processElement(ProcessContext c) { for (String word : c.element().split("[^a-zA-Z']+")) { if (!word.isEmpty()) { c.output(word); } } } }))
Java
.apply(Count.<String>perElement())
Java
MapElements
ist eine zusammengesetzte Transformation auf höherer Ebene, die ein einfaches ParDo einschließt. Für jedes Element in der Eingabe PCollection
wendet MapElements
eine Funktion an, die genau ein Ausgabeelement generiert. Dieser MapElements
ruft einen SimpleFunction
auf (definiert als anonyme Klasse), der die Formatierung durchführt. Als Eingabe verwendet dieser MapElements
einen PCollection
mit Schlüssel/Wert-Paaren, die von Count
generiert wurden, und produziert einen neuen PCollection
mit druckbaren Strings.
.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() { @Override public String apply(KV<String, Long> element) { return element.getKey() + ": " + element.getValue(); } }))
Java
.apply(TextIO.Write.to("gs://my-bucket/counts.txt"));
Beachten Sie, dass die Write
-Transformation einen trivialen Ergebniswert vom Typ PDone
erzeugt, der in diesem Fall ignoriert wird.
Pipeline ausführen
Führen Sie die Pipeline durch Aufrufen der Methode run
aus. Diese sendet Ihre Pipeline zur Ausführung an den Pipeline-Runner, den Sie beim Erstellen der Pipeline angegeben haben.
Java
p.run();
WordCount-Beispiel
Dieses WordCount-Beispiel führt einige empfohlene Programmierverfahren ein, die die Durchführung von Lese- und Schreibvorgängen in Ihrer Pipeline und deren Wartung erleichtern. Obwohl sie nicht explizit erforderlich sind, können diese Verfahren die Flexibilität bei der Ausführung Ihrer Pipeline erhöhen, bei Tests Ihrer Pipeline helfen und dazu beitragen, dass der Code Ihrer Pipeline wiederverwendet werden kann.
In diesem Abschnitt wird davon ausgegangen, dass Sie mit den grundlegenden Konzepten für die Erstellung einer Pipeline vertraut sind. Wenn Sie glauben, dass Sie noch nicht so weit sind, lesen Sie oben den Abschnitt Minimal WordCount.
Java
- Anwenden eines
ParDo
mit einem explizitenDoFn
- Zusammengesetzte Transformationen erstellen
- Parametrisierbaren
PipelineOptions
anwenden
In den folgenden Abschnitten werden diese Schlüsselkonzepte ausführlich erklärt und der Pipelinecode in kleinere Abschnitte aufgeschlüsselt.
Explizite DoFns angeben
Wenn Sie Transformationen vom Typ ParDo
verwenden, ist es erforderlich, dass Sie den Verarbeitungsvorgang angeben, der auf jedes Element in der Eingabe PCollection
angewendet werden soll. Dieser Verarbeitungsvorgang ist eine Unterklasse der SDK-Klasse DoFn
. Durch die Beispielpipeline im vorigen Abschnitt "Minimal WordCount" werden die DoFn
-Unterklassen für jedes ParDo
-Element in der Zeile als anonyme innere Klasseninstanz erstellt.
Es ist jedoch häufig sinnvoll, DoFn
auf globaler Ebene zu definieren, was den Einheitentest vereinfacht und den ParDo
-Code lesbarer macht.
Wie im vorherigen Beispiel (Minimal WordCount) erwähnt, zeigt die Dataflow Monitoring Interface beim Ausführen Ihrer Pipeline an, wann jede ParDo
-Transformation ausgeführt wird. Der Dataflow-Dienst generiert automatisch Transformationsnamen für ParDo
-Transformationen aus dem Namen des DoFn
, den Sie weitergeben. Beispiel: ParDo
, der den FormatAsTextFn()
anwendet, wird in der Monitoring-Oberfläche als ParDo(FormatAsText)
angezeigt.
Java
In diesem Beispiel sind DoFn
s als statische Klassen definiert:
/** A DoFn that converts a Word and Count into a printable string. */ static class FormatAsTextFn extends DoFn<KV<String, Long>, String> { ... @Override public void processElement(ProcessContext c) { ... } } public static void main(String[] args) throws IOException { Pipeline p = ... // Our pipeline passes an instance of static FormatAsTextFn() to the ParDo transform. p.apply(...) .apply(...) .apply(ParDo.of(new FormatAsTextFn())) ... }
Ausführliche Informationen zum Erstellen und Festlegen von abgeleiteten DoFn
-Klassen für Ihre ParDo
-Transformationen finden Sie unter Parallele Verarbeitung mit ParDo.
Zusammengesetzte Transformationen erstellen
Sie können einen Verarbeitungsvorgang, der aus mehreren Transformationen oder ParDo
-Schritten besteht, als abgeleiteten Klasse von PTransform
erstellen. Mit abgeleiteten PTransform
-Klassen können Sie komplexe wiederverwendbare Transformationen erstellen, die die Struktur Ihrer Pipeline klarer und modularer machen und Einheitentests vereinfachen.
Da die logische Struktur der Pipeline mit abgeleiteten PTransform
-Klassen explizit wird, können Sie damit auch einfacher die Pipeline überwachen. Bei der Erstellung der endgültigen, optimierten Struktur Ihrer Pipeline durch den Dataflow-Dienst kann die Struktur Ihrer Pipeline anhand der von Ihnen erstellten Transformationen auf der Dataflow Monitoring-Oberfläche genauer wiedergeben werden.
Java
In diesem Beispiel werden zwei Transformationen als abgeleitete PTransform
-Klasse CountWords
gekapselt. CountWords
enthält das ParDo
-Element, das ExtractWordsFn
ausführt, und die vom SDK bereitgestellte Count
-Transformation.
Wenn CountWords
definiert ist, geben wir die zugehörige absolute Eingabe und Ausgabe an. Die Eingabe ist der PCollection<String>
für den Extraktionsvorgang und die Ausgabe ist der PCollection<KV<String, Long>>
, der vom Zählvorgang produziert wird.
public static class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> { @Override public PCollection<KV<String, Long>> apply(PCollection<String> lines) { // Convert lines of text into individual words. PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // Count the number of times each word occurs. PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement()); return wordCounts; } } public static void main(String[] args) throws IOException { Pipeline p = ... p.apply(...) .apply(new CountWords()) ... }
Parametrisierbaren PipelineOptions
anwenden
Im vorigen Beispiel "Minimal WordCount" haben wir bei der Erstellung unserer Pipeline verschiedene Ausführungsoptionen festgelegt. In diesem Beispiel definieren wir eigene benutzerdefinierte Konfigurationsoptionen. Dazu erweitern wir PipelineOptions
.
Sie können Ihre eigenen Argumente hinzufügen, damit sie vom Befehlszeilen-Parser verarbeitet werden, und entsprechende Standardwerte für sie angeben. Anschließend können Sie in Ihrem Pipeline-Code auf die Optionswerte zugreifen.
Im Beispiel "Minimal WordCount" haben wir Pipeline-Optionen als vordefinierten Code aufgenommen. In der Regel werden PipelineOptions
allerdings durch das Parsen des Befehlszeilenarguments erstellt.
Java
public static interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); ... } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); ... }
Beispiel "Debugging WordCount"
Im Beispiel Debugging WordCount werden einige Best Practices für die Instrumentierung Ihres Pipeline-Codes erläutert. Dataflow Monitoring Interface und Dataflow-Aggregatoren liefern während der Ausführung der Pipeline zusätzliche Einblicke und Informationen.
Java
Sie können auch das DataflowAssert-Element des SDKs verwenden, um die Ausgabe Ihrer Transformationen in den verschiedenen Phasen Ihrer Pipeline zu testen.
Java
- Logs in der Dataflow-Überwachungsoberfläche anzeigen
- Log-Ebenen der Dataflow-Worker kontrollieren
Aggregators
s erstellen- Pipeline über
DataflowAssert
testen
In den folgenden Abschnitten werden diese Schlüsselkonzepte ausführlich erklärt und der Pipelinecode in kleinere Abschnitte aufgeschlüsselt.
Logs in der Dataflow-Überwachungsoberfläche anzeigen
Google Cloud Logging aggregiert die Logs aller Worker Ihres Dataflow-Jobs an einem einzigen Speicherort in der Google Cloud Console. Sie können mit der Dataflow-Überwachungsoberfläche nach den Logs aus allen Compute Engine-Instanzen suchen, die Dataflow hochgefahren hat, um Ihren Dataflow-Job abzuschließen, und auf diese Logs zugreifen. Sie können Logging-Anweisungen in die DoFn
-Instanzen Ihrer Pipeline einfügen, die während der Ausführung Ihrer Pipeline auf der Monitoring-Oberfläche angezeigt werden.
Java
Der folgende SLF4J-Logger verwendet den vollständig qualifizierten Klassennamen von FilterTextFn
als Logger-Namen. Von diesem Namen wird auf alle Log-Anweisungen verwiesen, die von diesem Logger ausgegeben wurden. Mit den entsprechenden Einstellungen auf Log-Ebene sind sie in der Dataflow-Überwachungsoberfläche sichtbar.
import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DebuggingWordCount { public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> { ... private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class); ... public void processElement(ProcessContext c) { if (filter.matcher(c.element().getKey()).matches()) { // Log at the "DEBUG" level each element that we match. When executing this pipeline // using the Dataflow service, these log lines will appear in the Dataflow Monitoring UI // only if the log level is set to "DEBUG" or lower. LOG.debug("Matched: " + c.element().getKey()); ... } else { // Log at the "TRACE" level each element that is not matched. Different log levels // can be used to control the verbosity of logging providing an effective mechanism // to filter less important information. LOG.trace("Did not match: " + c.element().getKey()); ... } } } }
Log-Ebenen der Dataflow-Worker kontrollieren
Java
Dataflow-Worker, die Nutzercode ausführen, melden sich entsprechend ihrer Konfiguration standardmäßig mit Log-Ebene INFO oder höher bei Cloud Logging an. Sie können Log-Ebenen für bestimmte Logging-Namespaces überschreiben, indem Sie Folgendes angeben:
--workerLogLevelOverrides={"Name1"="Level1","Name2"="Level2",...}
Beispiel: Sie geben bei Ausführung der Pipeline mit dem Dataflow-Dienst
--workerLogLevelOverrides={"com.example":"DEBUG"}
an, dass die Monitoring Interface für das Paket com.example
neben den Logs der Ebene DEBUG oder höher nur Logs der Ebene INFO oder höher enthält.
Zusätzlich können Sie die standardmäßige Dataflow-Worker-Logging-Konfiguration überschreiben. Geben Sie dafür Folgendes an:
--defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>
Beispiel: Sie geben bei Ausführung der Pipeline mit dem Dataflow-Dienst
--defaultWorkerLogLevel=DEBUG
an, dass die Monitoring Interface alle Logs der Ebene DEBUG oder höher enthält. Beachten Sie, dass eine Änderung der standardmäßigen Ebene des Worker-Logs in TRACE oder DEBUG die Anzahl der Logging-Informationen erheblich erhöhen wird.
Aggregatoren erstellen
Ein benutzerdefinierter Aggregator kann Werte in Ihrer Pipeline verfolgen, während sie ausgeführt wird. Diese Werte werden in der Dataflow Monitoring-Oberfläche angezeigt, wenn die Pipeline mit dem Dataflow-Dienst ausgeführt wird.
Aggregatoren können erst sichtbar werden, wenn das System mit der Ausführung der ParDo
-Transformation beginnt, die sie erstellt hat, und/oder ihr erster Wert geändert wird. Sie sind dann in der Überwachungsoberfläche unter Jobzusammenfassung sichtbar.
Die folgenden benutzerdefinierten Aggregatoren verfolgen die Anzahl der übereinstimmenden und nicht übereinstimmenden Wörter.
Java
public class DebuggingWordCount { /** A DoFn that filters for a specific key based upon a regular expression. */ public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> { ... private final Aggregator<Long, Long> matchedWords = createAggregator("matchedWords", new Sum.SumLongFn()); private final Aggregator<Long, Long> unmatchedWords = createAggregator("umatchedWords", new Sum.SumLongFn()); @Override public void processElement(ProcessContext c) { if (filter.matcher(c.element().getKey()).matches()) { ... matchedWords.addValue(1L); ... } else { ... unmatchedWords.addValue(1L); } } } }
Aggregatoren in Batch- und Streaming-Pipelines
Aggregatoren in Batch-Pipelines sorgen für Einheitlichkeit. Sie werden exakt einmal für erfolgreiche Bundles und nicht für fehlgeschlagene Bundles festgelegt.
In Streaming-Pipelines bieten Aggregatoren mehr nachsichtige Semantik. Der Beitrag aus erfolgreichen Bündeln ist die bestmögliche Leistung und kann sich bei fehlgeschlagenen Bündeln im endgültigen Wert widerspiegeln.
Pipeline über DataflowAssert
testen
Java
DataflowAssert ist eine Sammlung praktischer PTransforms
im Stil von Hamcrest Collection Matchers, die bei der Entwicklung von Pipelinetests zur Validierung der Inhalte von PCollections
verwendet werden. DataflowAssert
eignet sich am besten für Einheitentests mit kleinen Datasets. Hier wird es jedoch als Schulungstool demonstriert.
Mit dem folgenden Test soll verifiziert werden, ob die Anzahl der gefilterten Wörter mit der erwarteten Anzahl übereinstimmt. Beachten Sie, dass DataflowAssert
keine Ausgabe liefert. Die erfolgreiche Ausführung der Pipeline ist Indiz dafür, dass die Erwartungen erfüllt wurden. Erfahren Sie, wie Sie die Pipeline testen können. Unter DebuggingWordCountTest können Sie ein Beispiel für einen Einheitentest aufrufen.
public static void main(String[] args) { ... List<KV<String, Long>> expectedResults = Arrays.asList( KV.of("Flourish", 3L), KV.of("stomach", 1L)); DataflowAssert.that(filteredWords).containsInAnyOrder(expectedResults); ... }
WindowedWordCount
Java
In dem Beispiel WindowedWordCount
werden genau wie in den vorangegangenen Beispielen Wörter in Text gezählt, doch dieses Mal werden einige erweiterte Konzepte vorgestellt. Die Eingabe für WindowedWordCount
kann ein festes Dataset sein (wie in den vorherigen Beispielen) oder ein unbegrenzter Datenstrom.
Das Dataflow-SDK ist praktisch, weil es Ihnen die Erstellung einer einzelnen Pipeline ermöglicht, die sowohl begrenzte als auch unbegrenzte Eingabetypen verarbeiten kann. Wenn die Eingabe unbegrenzt ist, werden auch alle PCollections
s der Pipeline unbegrenzt sein. Dasselbe gilt für die begrenzte Eingabe.
Bevor Sie diesen Abschnitt lesen, sollten Sie mit den allgemeinen Grundsätzen für die Erstellung einer Pipeline vertraut sein.
Neue Konzepte:- Unbegrenzte und begrenzte Eingabe lesen
- Zeitstempel zu Daten hinzufügen
- Windowing
- Unbegrenzte und begrenzte Ausgabe schreiben
In den folgenden Abschnitten werden diese Schlüsselkonzepte ausführlich erklärt und der Pipelinecode in kleinere Abschnitte aufgeschlüsselt.
Unbegrenzte und begrenzte Eingabe lesen
Die Eingabe für WindowedWordCount
kann entweder begrenzt oder unbegrenzt sein. Wenn Ihre Eingabe über eine feste Anzahl an Elementen verfügt, wird sie als "begrenztes" Dataset bezeichnet. Wird Ihre Eingabe kontinuierlich aktualisiert, wird sie als "unbegrenzt" bezeichnet. Unter Begrenzte und unbegrenzte PCollections erfahren Sie mehr über Eingabetypen.
In diesem Beispiel können Sie auswählen, ob die Eingabe begrenzt oder unbegrenzt ist. Denken Sie daran, dass die Eingabe bei allen Beispielen eine Sammlung von Shakespeare-Texten ist. Es handelt sich also um eine begrenzte Eingabe. Um allerdings die neuen Konzepte in diesem Beispiel zu erklären, handelt es sich hier um eine Wiederholung von Shakespeare-Texten.
Wenn Ihre Eingabe in diesem Beispiel nicht begrenzt ist, wird sie von einem Google Cloud Pub/Sub-Thema gelesen. In diesem Fall ist die auf die Pipeline angewendete Read
-Transformation PubSubIO.Read
. Andernfalls wird die Eingabe von Google Cloud Storage gelesen.
public static void main(String[] args) throws IOException { ... PCollection<String> input; if (options.isUnbounded()) { LOG.info("Reading from PubSub."); // Read from the Pub/Sub topic. A topic will be created if it wasn't specified as an arg. input = pipeline.apply(PubsubIO.Read.topic(options.getPubsubTopic())); } else { // Else, this is a bounded pipeline. Read from the Google Cloud Storage file. input = pipeline.apply(TextIO.Read.from(options.getInputFile())) ... } ... }
Zeitstempel zu Daten hinzufügen
Jedem Element in einer PCollection
ist ein Zeitstempel zugeordnet. Der Zeitstempel wird jedem Element von der Quelle zugewiesen, die die PCollection
erstellt. Wenn Sie in diesem Beispiel eine unbegrenzte Eingabe für Ihre Pipeline auswählen, kommen die Zeitstempel von der Pub/Sub-Datenquelle. Wenn Sie eine begrenzte Eingabe auswählen, setzt die AddTimestampsFn
benannte und von ParDo
aufgerufene DoFn
-Methode einen Zeitstempel für jedes Element in PCollection
fest.
public static void main(String[] args) throws IOException { ... input = pipeline .apply(...) // Add an element timestamp, using an artificial time. .apply(ParDo.of(new AddTimestampFn())); }
Unten sehen Sie den Code für AddTimestampsFn
, ein von ParDo
aufgerufenes DoFn
-Element, der das Datenelement des Zeitstempels festlegt, das von dem Element selbst angegeben wurde. Wenn es sich bei den Elementen beispielsweise um Logzeilen handelt, könnte ParDo
die Zeitüberschreitung des Logstrings analysieren und als Zeitstempel des Elements festlegen. Da die Originalwerke von Shakespeare keine Zeitstempel enthalten, haben wir zufällige Zeitstempel erstellt, um das Konzept zu veranschaulichen. Jede Zeile des Eingabetexts erhält irgendwann in einem 2-stündigen Zeitraum einen zufälligen verknüpften Zeitstempel.
static class AddTimestampFn extends DoFn<String, String> { private static final Duration RAND_RANGE = Duration.standardHours(2); private final Instant minTimestamp; AddTimestampFn() { this.minTimestamp = new Instant(System.currentTimeMillis()); } @Override public void processElement(ProcessContext c) { // Generate a timestamp that falls somewhere in the past 2 hours. long randMillis = (long) (Math.random() * RAND_RANGE.getMillis()); Instant randomTimestamp = minTimestamp.plus(randMillis); // Set the data element with that timestamp. c.outputWithTimestamp(c.element(), new Instant(randomTimestamp)); } }
Unter Zeitstempel der PCollection-Elemente erfahren Sie mehr über Zeitstempel.
Windowing
Das Dataflow SDK verwendet ein Konzept namens Windowing, um eine PCollection
gemäß den Zeitstempeln der einzelnen Elemente zu unterteilen.
Dataflow-Transformationen, die mehrere Elemente zusammenfassen, verarbeiten jede PCollection
als eine Abfolge mehrerer, finiter Fenster, obwohl die gesamte Sammlung selbst möglicherweise von unendlicher Größe (unbegrenzt) ist.
Das WindowingWordCount
-Beispiel wendet Windowing mit festgelegten Zeitfenstern an, in denen jedes Fenster ein festes Zeitintervall darstellt. Die Größe des festen Fensters für dieses Beispiel wird auf eine Minute zurückgesetzt. Sie können diese Einstellung mit einer Befehlszeilenoption ändern. Anschließend wendet die Pipeline die CountWords
-Transformation an.
PCollection<KV<String, Long>> wordCounts = input .apply(Window.<String>into( FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))) .apply(new WordCount.CountWords());
Unbegrenzte und begrenzte Ausgabe schreiben
Da Ihre Eingabe entweder begrenzt oder unbegrenzt sein kann, gilt auch dasselbe für unsere Ausgabe-PCollection
. Wir müssen eine geeignete Senke auswählen. Einige Ausgabesenken unterstützen nur die begrenzte Ausgabe oder nur die unbegrenzte Ausgabe. Bei einer Textdatei handelt es sich beispielsweise um eine Senke, die nur begrenzte Daten erhalten kann. Die BigQuery-Ausgabequelle unterstützt sowohl die begrenzte als auch die unbegrenzte Eingabe.
In diesem Beispiel streamen wir die Ergebnisse in eine BigQuery-Tabelle. Die Ergebnisse werden dann für eine BigQuery-Tabelle formatiert und anschließend mit BigQueryIO.Write
in BigQuery geschrieben.
wordCounts.apply(ParDo.of(new FormatAsTableRowFn())) .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));