WordCount-Beispielpipeline

Führen Sie zuerst die Schritte im Schnellstart aus, bevor Sie fortfahren.

Die WordCount-Beispiele zeigen, wie eine Verarbeitungspipeline eingerichtet wird, die Text lesen, Textzeilen in einzelne Wörter tokenisieren und für jedes dieser Wörter eine Häufigkeitszählung durchführen kann. Die Dataflow-SDKs enthalten eine Serie dieser vier aufeinanderfolgenden immer detaillierter werdenden WordCount-Beispiele, die aufeinander aufbauen. Bei dem Eingabetext handelt es sich bei allen Beispielen um verschiedene Shakespeare-Texte.

Bei jedem WordCount-Beispiel werden unterschiedliche Konzepte in die Dataflow-SDKs eingeführt.

  • Minimal WordCount demonstriert die allgemeinen Grundsätze, die bei der Erstellung einer Dataflow-Pipeline von Bedeutung sind.
  • WordCount führt einige der am häufigsten verwendeten Best Practices für die Erstellung wiederverwendbarer und wartbarer Pipelines ein.
  • Debugging WordCount führt Verfahren zur Protokollierung und zur Fehlerbehebung ein.
  • Windowed WordCount zeigt, wie Sie mit dem Dataflow-Programmiermodell sowohl begrenzte als auch unbegrenzte Datasets verarbeiten können.

Als Erstes sollten Sie das einfachste Beispiel verstehen: Minimal WordCount. Nachdem Sie sich mit den allgemeinen Grundsätzen für die Erstellung einer Pipeline vertraut gemacht haben, fahren Sie mit den Best Practices für das Schreiben von Dataflow-Programmen in WordCount fort. Anschließend lesen Sie sich Debugging WordCount durch, um zu verstehen, wie übliche Verfahren zum Logging und zur Fehlerbehebung angewendet werden. Zu guter Letzt erfahren Sie, wie Sie dasselbe Rechenmuster in begrenzten und unbegrenzten Datasets in Windowed WordCount verwenden.

MinimalWordCount

Minimal WordCount zeigt eine einfache Pipeline, die einen Textblock aus einer Datei in Google Cloud Storage liest, Transformationen anwendet, um Wörter zu tokenisieren und zu zählen, und die Daten in eine Ausgabedatei in einem Cloud Storage-Bucket schreibt. In diesem Beispiel werden die Speicherorte für die Eingabe- und Ausgabedateien als vordefinierter Code aufgenommen und es wird keine Fehlersuche durchgeführt. Es sollen nur die allerwichtigsten Schritte bei der Erstellung einer Dataflow-Pipeline gezeigt werden. In späteren Beispielen werden die Eingabe- und Ausgabequellen parametrisiert sowie weitere Best Practices vorgestellt.

Java

Wichtige Konzepte:
  1. Pipeline erstellen
  2. Transformationen auf die Pipeline anwenden
    • Eingabe lesen (in diesem Beispiel: Textdateien lesen)
    • ParDo-Transformationen anwenden
    • Von SDK bereitgestellte Transformationen anwenden (in diesem Beispiel Count)
    • Ausgabe schreiben (in diesem Beispiel: in Google Cloud Storage schreiben)
  3. Pipeline ausführen

In den folgenden Abschnitten werden diese Konzepte ausführlich erklärt und Ausschnitte des relevanten Codes aus der Minimal WordCount-Pipeline bereitgestellt.

Pipeline erstellen

Bei der Erstellung einer Cloud Dataflow-Pipeline muss als Erstes ein Objekt Pipeline-Optionen erstellt werden. Mit diesem Objekt können wir verschiedene Optionen für unsere Pipeline festlegen, beispielsweise den Pipeline-Runner, der unsere Pipeline ausführt, die ID unseres Projekts und den Staging-Ort für die Pipeline, an dem ihre Dateien gespeichert werden. Über ihn kann in der Cloud auf Ihre JAR-Dateien zugegriffen werden. Zwar legen wir die Optionen in diesem Beispiel programmatisch fest, doch immer öfter werden Befehlszeilenargumente verwendet, um Pipeline-Optionen festzulegen.

In unserem Beispiel haben wir BlockingDataflowPipelineRunner als PipelineRunner angegeben, damit unsere Pipeline mit dem Google Cloud Dataflow-Dienst in der Cloud ausgeführt wird. Es gibt weitere Optionen, die Sie für die Ausführung Ihrer Pipeline in der Cloud festlegen können. Sie können diese Option auch vollständig weglassen. In diesem Fall wird Ihre Pipeline vom Standard-Runner lokal ausgeführt. Diese Optionen werden in den nächsten beiden WordCount-Beispielen vorgestellt und unter Ausführungsparameter festlegen genauer erläutert.

Java

DataflowPipelineOptions options = PipelineOptionsFactory.create()
    .as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("SET-YOUR-PROJECT-ID-HERE");
// The 'gs' URI means that this is a Google Cloud Storage path
options.setStagingLocation("gs://SET-YOUR-BUCKET-NAME-HERE");

Im nächsten Schritt erstellen Sie ein Pipeline-Objekt mit den soeben erstellten Optionen. Das Pipeline-Objekt baut die auszuführende Transformationsgrafik auf, die mit dieser bestimmten Pipeline verknüpft ist.

Java

Pipeline p = Pipeline.create(options);

Ausführliche Informationen zum Pipeline-Objekt und seiner Funktionsweise finden Sie unter Pipelines.

Pipelinetransformationen

Die Pipeline Minimal WordCount enthält mehrere Transformationen, die Daten in die Pipeline einlesen, ändern oder auf andere Weise transformieren und die Ergebnisse ausgeben. Jede Transformation stellt einen Vorgang in der Pipeline dar.

Bei jeder Transformation wird eine Art von Eingabe verwendet (Daten oder sonstiges) und es werden Ausgabedaten produziert. Die Eingabe- und Ausgabedaten werden von der SDK-Klasse PCollection dargestellt. PCollection ist eine spezielle Klasse, die von der Dataflow-SDK bereitgestellt wird, die Sie für die Darstellung eines Datasets mit beliebiger Größe verwenden können. Dies schließt unbegrenzte Datasets ein.

Abbildung 1 zeigt den Datenfluss der Pipeline:

Die Pipeline verwendet eine TextIO.Read-Transformation, um eine PCollection aus Daten zu erstellen, die in einer Eingabedatendatei gespeichert wurden. Die CountWords-Transformation produziert eine PCollection mit Wortzählungen aus der PCollection mit reinem Text. TextIO.Write schreibt die formatierten Wortzählungen in eine Ausgabedatendatei.
Abbildung 1: Der Datenfluss der Pipeline

Die Minimal WordCount-Pipeline enthält fünf Transformationen:

  1. Die Transformation Read einer Textdatei wird auf das Pipeline-Objekt angewendet und erzeugt einen PCollection als Ausgabe. Jedes Element in der Ausgabe PCollection stellt eine Textzeile aus der Eingabedatei dar.
  2. Java

    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/kinglear.txt"))
    
  3. Eine Transformation vom Typ ParDo, die eine DoFn (inline als anonyme Klasse definiert) für jedes Element aufruft, das die Textzeilen in einzelne Wörter tokenisiert. Die Eingabe für diese Transformation ist PCollection der Textzeilen, die von der vorherigen TextIO.Read-Transformation generiert wurden. Die Transformation ParDo gibt einen neuen PCollection aus, wobei jedes Element ein einzelnes Wort im Text darstellt.
  4. Java

    Sie können Ihrer Transformation einen Namen geben, der in der Dataflow Monitoring Interface angezeigt wird. Verwenden Sie dazu den Vorgang .named(), wie in diesem Beispiel dargestellt. Bei Ausführung Ihrer Pipeline durch den Dataflow-Dienst wird in der Monitoring-Oberfläche die Ausführungszeit der jeweiligen ParDo-Transformation angegeben.

      .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
         @Override
         public void processElement(ProcessContext c) {
           for (String word : c.element().split("[^a-zA-Z']+")) {
             if (!word.isEmpty()) {
               c.output(word);
             }
           }
         }
      }))
    
  5. Die vom SDK bereitgestellte Count-Transformation ist eine generische Transformation, die einen PCollection eines beliebigen Typs verwendet und einen PCollection mit Schlüssel/Wert-Paaren zurückgibt. Jeder Schlüssel stellt ein eindeutiges Element aus der Eingabesammlung dar und jeder Wert stellt dar, wie häufig dieser Schlüssel in der Eingabesammlung erschienen ist.

    In dieser Pipeline ist die Eingabe für Count der PCollection einzelner Wörter, die durch den vorangegangenen ParDo generiert wurden, und die Ausgabe ist ein PCollection von Schlüssel/Wert-Paaren, wobei jeder Schlüssel ein eindeutiges Wort im Text darstellt und der zugehörige Wert die Häufigkeitszahl für jedes Wort ist.
  6. Java

      .apply(Count.<String>perElement())
    
  7. Als Nächstes wird eine Transformation ausführt, die jedes Schlüssel/Wert-Paar eindeutiger Wörter und die Anzahl der Vorkommen als druckbaren String formatiert, der als Ausgabedatei ausgegeben werden kann.
  8. Java

    MapElements ist eine zusammengesetzte Transformation auf höherer Ebene, die ein einfaches ParDo einschließt. Für jedes Element in der Eingabe PCollection wendet MapElementseine Funktion an, die genau ein Ausgabeelement generiert. Dieser MapElements ruft einen SimpleFunction auf (definiert als anonyme Klasse), der die Formatierung durchführt. Als Eingabe verwendet dieser MapElements einen PCollection mit Schlüssel/Wert-Paaren, die von Count generiert wurden, und produziert einen neuen PCollection mit druckbaren Strings.

      .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
         @Override
         public String apply(KV<String, Long> element) {
           return element.getKey() + ": " + element.getValue();
         }
      }))
    
  9. Eine Textdatei Write. Diese Transformation verwendet die letzte PCollection formatierter Strings als Eingabe und gibt jedes Element in einer Ausgabetextdatei aus. Jedes Element in der Ausgabe PCollection stellt eine Textzeile aus der entstehenden Ausgabedatei dar.
  10. Java

      .apply(TextIO.Write.to("gs://my-bucket/counts.txt"));
    

    Beachten Sie, dass die Write-Transformation einen trivialen Ergebniswert vom Typ PDone erzeugt, der in diesem Fall ignoriert wird.

Pipeline ausführen

Führen Sie die Pipeline durch Aufrufen der Methode run aus. Diese sendet Ihre Pipeline zur Ausführung an den Pipeline-Runner, den Sie beim Erstellen der Pipeline angegeben haben.

Java

p.run();

WordCount-Beispiel

Dieses WordCount-Beispiel führt einige empfohlene Programmierverfahren ein, die die Durchführung von Lese- und Schreibvorgängen in Ihrer Pipeline und deren Wartung erleichtern. Obwohl sie nicht explizit erforderlich sind, können diese Verfahren die Flexibilität bei der Ausführung Ihrer Pipeline erhöhen, bei Tests Ihrer Pipeline helfen und dazu beitragen, dass der Code Ihrer Pipeline wiederverwendet werden kann.

In diesem Abschnitt wird davon ausgegangen, dass Sie mit den grundlegenden Konzepten für die Erstellung einer Pipeline vertraut sind. Wenn Sie glauben, dass Sie noch nicht so weit sind, lesen Sie oben den Abschnitt Minimal WordCount.

Java

Neue Konzepte:
  1. Anwenden eines ParDo mit einem expliziten DoFn
  2. Zusammengesetzte Transformationen erstellen
  3. Parametrisierbaren PipelineOptions anwenden

In den folgenden Abschnitten werden diese Schlüsselkonzepte ausführlich erklärt und der Pipelinecode in kleinere Abschnitte aufgeschlüsselt.

Explizite DoFns angeben

Wenn Sie Transformationen vom Typ ParDo verwenden, ist es erforderlich, dass Sie den Verarbeitungsvorgang angeben, der auf jedes Element in der Eingabe PCollection angewendet werden soll. Dieser Verarbeitungsvorgang ist eine Unterklasse der SDK-Klasse DoFn. Durch die Beispielpipeline im vorigen Abschnitt "Minimal WordCount" werden die DoFn-Unterklassen für jedes ParDo-Element in der Zeile als anonyme innere Klasseninstanz erstellt.

Es ist jedoch häufig sinnvoll, DoFn auf globaler Ebene zu definieren, was den Einheitentest vereinfacht und den ParDo-Code lesbarer macht.

Wie im vorherigen Beispiel (Minimal WordCount) erwähnt, zeigt die Dataflow Monitoring Interface beim Ausführen Ihrer Pipeline an, wann jede ParDo-Transformation ausgeführt wird. Der Dataflow-Dienst generiert automatisch Transformationsnamen für ParDo-Transformationen aus dem Namen des DoFn, den Sie weitergeben. Beispiel: ParDo, der den FormatAsTextFn() anwendet, wird in der Monitoring-Oberfläche als ParDo(FormatAsText) angezeigt.

Java

In diesem Beispiel sind DoFns als statische Klassen definiert:

/** A DoFn that converts a Word and Count into a printable string. */
static class FormatAsTextFn extends DoFn<KV<String, Long>, String> {
  ...

  @Override
  public void processElement(ProcessContext c) {
    ...
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  // Our pipeline passes an instance of static FormatAsTextFn() to the ParDo transform.
  p.apply(...)
   .apply(...)
   .apply(ParDo.of(new FormatAsTextFn()))
   ...
}

Ausführliche Informationen zum Erstellen und Festlegen von abgeleiteten DoFn-Klassen für Ihre ParDo-Transformationen finden Sie unter Parallele Verarbeitung mit ParDo.

Zusammengesetzte Transformationen erstellen

Sie können einen Verarbeitungsvorgang, der aus mehreren Transformationen oder ParDo-Schritten besteht, als abgeleiteten Klasse von PTransform erstellen. Mit abgeleiteten PTransform-Klassen können Sie komplexe wiederverwendbare Transformationen erstellen, die die Struktur Ihrer Pipeline klarer und modularer machen und Einheitentests vereinfachen.

Da die logische Struktur der Pipeline mit abgeleiteten PTransform-Klassen explizit wird, können Sie damit auch einfacher die Pipeline überwachen. Bei der Erstellung der endgültigen, optimierten Struktur Ihrer Pipeline durch den Dataflow-Dienst kann die Struktur Ihrer Pipeline anhand der von Ihnen erstellten Transformationen auf der Dataflow Monitoring-Oberfläche genauer wiedergeben werden.

Java

In diesem Beispiel werden zwei Transformationen als abgeleitete PTransform-Klasse CountWords gekapselt. CountWords enthält das ParDo-Element, das ExtractWordsFn ausführt, und die vom SDK bereitgestellte Count-Transformation.

Wenn CountWords definiert ist, geben wir die zugehörige absolute Eingabe und Ausgabe an. Die Eingabe ist der PCollection<String> für den Extraktionsvorgang und die Ausgabe ist der PCollection<KV<String, Long>>, der vom Zählvorgang produziert wird.

public static class CountWords extends PTransform<PCollection<String>,
    PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> apply(PCollection<String> lines) {

    // Convert lines of text into individual words.
    PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

    // Count the number of times each word occurs.
    PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

    return wordCounts;
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  p.apply(...)
   .apply(new CountWords())
   ...
}

Parametrisierbaren PipelineOptions anwenden

Im vorigen Beispiel "Minimal WordCount" haben wir bei der Erstellung unserer Pipeline verschiedene Ausführungsoptionen festgelegt. In diesem Beispiel definieren wir eigene benutzerdefinierte Konfigurationsoptionen. Dazu erweitern wir PipelineOptions.

Sie können Ihre eigenen Argumente hinzufügen, damit sie vom Befehlszeilen-Parser verarbeitet werden, und entsprechende Standardwerte für sie angeben. Anschließend können Sie in Ihrem Pipeline-Code auf die Optionswerte zugreifen.

Im Beispiel "Minimal WordCount" haben wir Pipeline-Optionen als vordefinierten Code aufgenommen. In der Regel werden PipelineOptions allerdings durch das Parsen des Befehlszeilenarguments erstellt.

Java

public static interface WordCountOptions extends PipelineOptions {
  @Description("Path of the file to read from")
  @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
  String getInputFile();
  void setInputFile(String value);
  ...
}

public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);
  ...
}

Beispiel "Debugging WordCount"

Im Beispiel Debugging WordCount werden einige Best Practices für die Instrumentierung Ihres Pipeline-Codes erläutert. Dataflow Monitoring Interface und Dataflow-Aggregatoren liefern während der Ausführung der Pipeline zusätzliche Einblicke und Informationen.

Java

Sie können auch das DataflowAssert-Element des SDKs verwenden, um die Ausgabe Ihrer Transformationen in den verschiedenen Phasen Ihrer Pipeline zu testen.

Java

Neue Konzepte:
  1. Logs in der Dataflow-Überwachungsoberfläche anzeigen
  2. Log-Ebenen der Dataflow-Worker kontrollieren
  3. Aggregatorss erstellen
  4. Pipeline über DataflowAssert testen

In den folgenden Abschnitten werden diese Schlüsselkonzepte ausführlich erklärt und der Pipelinecode in kleinere Abschnitte aufgeschlüsselt.

Logs in der Dataflow-Überwachungsoberfläche anzeigen

Google Cloud Logging aggregiert die Logs aller Worker Ihres Dataflow-Jobs an einem einzigen Speicherort in der Google Cloud Console. Sie können mit der Dataflow-Überwachungsoberfläche nach den Logs aus allen Compute Engine-Instanzen suchen, die Dataflow hochgefahren hat, um Ihren Dataflow-Job abzuschließen, und auf diese Logs zugreifen. Sie können Logging-Anweisungen in die DoFn-Instanzen Ihrer Pipeline einfügen, die während der Ausführung Ihrer Pipeline auf der Monitoring-Oberfläche angezeigt werden.

Java

Der folgende SLF4J-Logger verwendet den vollständig qualifizierten Klassennamen von FilterTextFn als Logger-Namen. Von diesem Namen wird auf alle Log-Anweisungen verwiesen, die von diesem Logger ausgegeben wurden. Mit den entsprechenden Einstellungen auf Log-Ebene sind sie in der Dataflow-Überwachungsoberfläche sichtbar.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebuggingWordCount {
  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...
    private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class);
    ...

    public void processElement(ProcessContext c) {
      if (filter.matcher(c.element().getKey()).matches()) {
        // Log at the "DEBUG" level each element that we match. When executing this pipeline
        // using the Dataflow service, these log lines will appear in the Dataflow Monitoring UI
        // only if the log level is set to "DEBUG" or lower.
        LOG.debug("Matched: " + c.element().getKey());
        ...
      } else {
        // Log at the "TRACE" level each element that is not matched. Different log levels
        // can be used to control the verbosity of logging providing an effective mechanism
        // to filter less important information.
        LOG.trace("Did not match: " + c.element().getKey());
        ...
      }
    }
  }
}

Log-Ebenen der Dataflow-Worker kontrollieren

Java

Dataflow-Worker, die Nutzercode ausführen, melden sich entsprechend ihrer Konfiguration standardmäßig mit Log-Ebene INFO oder höher bei Cloud Logging an. Sie können Log-Ebenen für bestimmte Logging-Namespaces überschreiben, indem Sie Folgendes angeben:

--workerLogLevelOverrides={"Name1"="Level1","Name2"="Level2",...}

Beispiel: Sie geben bei Ausführung der Pipeline mit dem Dataflow-Dienst

--workerLogLevelOverrides={"com.example":"DEBUG"}

an, dass die Monitoring Interface für das Paket com.example neben den Logs der Ebene DEBUG oder höher nur Logs der Ebene INFO oder höher enthält.

Zusätzlich können Sie die standardmäßige Dataflow-Worker-Logging-Konfiguration überschreiben. Geben Sie dafür Folgendes an:

--defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>

Beispiel: Sie geben bei Ausführung der Pipeline mit dem Dataflow-Dienst

--defaultWorkerLogLevel=DEBUG

an, dass die Monitoring Interface alle Logs der Ebene DEBUG oder höher enthält. Beachten Sie, dass eine Änderung der standardmäßigen Ebene des Worker-Logs in TRACE oder DEBUG die Anzahl der Logging-Informationen erheblich erhöhen wird.

Weitere Informationen über das Logging in Cloud Dataflow

Aggregatoren erstellen

Ein benutzerdefinierter Aggregator kann Werte in Ihrer Pipeline verfolgen, während sie ausgeführt wird. Diese Werte werden in der Dataflow Monitoring-Oberfläche angezeigt, wenn die Pipeline mit dem Dataflow-Dienst ausgeführt wird.

Aggregatoren können erst sichtbar werden, wenn das System mit der Ausführung der ParDo-Transformation beginnt, die sie erstellt hat, und/oder ihr erster Wert geändert wird. Sie sind dann in der Überwachungsoberfläche unter Jobzusammenfassung sichtbar.

Die folgenden benutzerdefinierten Aggregatoren verfolgen die Anzahl der übereinstimmenden und nicht übereinstimmenden Wörter.

Java

public class DebuggingWordCount {
  /** A DoFn that filters for a specific key based upon a regular expression. */
  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...

    private final Aggregator<Long, Long> matchedWords =
        createAggregator("matchedWords", new Sum.SumLongFn());
    private final Aggregator<Long, Long> unmatchedWords =
      createAggregator("umatchedWords", new Sum.SumLongFn());

    @Override
    public void processElement(ProcessContext c) {
      if (filter.matcher(c.element().getKey()).matches()) {
        ...
        matchedWords.addValue(1L);
        ...
      } else {
        ...
        unmatchedWords.addValue(1L);
      }
    }
  }
}

Aggregatoren in Batch- und Streaming-Pipelines

Aggregatoren in Batch-Pipelines sorgen für Einheitlichkeit. Sie werden exakt einmal für erfolgreiche Bundles und nicht für fehlgeschlagene Bundles festgelegt.

In Streaming-Pipelines bieten Aggregatoren mehr nachsichtige Semantik. Der Beitrag aus erfolgreichen Bündeln ist die bestmögliche Leistung und kann sich bei fehlgeschlagenen Bündeln im endgültigen Wert widerspiegeln.

Pipeline über DataflowAssert testen

Java

DataflowAssert ist eine Sammlung praktischer PTransforms im Stil von Hamcrest Collection Matchers, die bei der Entwicklung von Pipelinetests zur Validierung der Inhalte von PCollections verwendet werden. DataflowAssert eignet sich am besten für Einheitentests mit kleinen Datasets. Hier wird es jedoch als Schulungstool demonstriert.

Mit dem folgenden Test soll verifiziert werden, ob die Anzahl der gefilterten Wörter mit der erwarteten Anzahl übereinstimmt. Beachten Sie, dass DataflowAssert keine Ausgabe liefert. Die erfolgreiche Ausführung der Pipeline ist Indiz dafür, dass die Erwartungen erfüllt wurden. Erfahren Sie, wie Sie die Pipeline testen können. Unter DebuggingWordCountTest können Sie ein Beispiel für einen Einheitentest aufrufen.

public static void main(String[] args) {
  ...
  List<KV<String, Long>> expectedResults = Arrays.asList(
        KV.of("Flourish", 3L),
        KV.of("stomach", 1L));
  DataflowAssert.that(filteredWords).containsInAnyOrder(expectedResults);
  ...
}

WindowedWordCount

Java

In dem Beispiel WindowedWordCount werden genau wie in den vorangegangenen Beispielen Wörter in Text gezählt, doch dieses Mal werden einige erweiterte Konzepte vorgestellt. Die Eingabe für WindowedWordCount kann ein festes Dataset sein (wie in den vorherigen Beispielen) oder ein unbegrenzter Datenstrom.

Das Dataflow-SDK ist praktisch, weil es Ihnen die Erstellung einer einzelnen Pipeline ermöglicht, die sowohl begrenzte als auch unbegrenzte Eingabetypen verarbeiten kann. Wenn die Eingabe unbegrenzt ist, werden auch alle PCollectionss der Pipeline unbegrenzt sein. Dasselbe gilt für die begrenzte Eingabe.

Bevor Sie diesen Abschnitt lesen, sollten Sie mit den allgemeinen Grundsätzen für die Erstellung einer Pipeline vertraut sein.

Neue Konzepte:
  1. Unbegrenzte und begrenzte Eingabe lesen
  2. Zeitstempel zu Daten hinzufügen
  3. Windowing
  4. Unbegrenzte und begrenzte Ausgabe schreiben

In den folgenden Abschnitten werden diese Schlüsselkonzepte ausführlich erklärt und der Pipelinecode in kleinere Abschnitte aufgeschlüsselt.

Unbegrenzte und begrenzte Eingabe lesen

Die Eingabe für WindowedWordCount kann entweder begrenzt oder unbegrenzt sein. Wenn Ihre Eingabe über eine feste Anzahl an Elementen verfügt, wird sie als "begrenztes" Dataset bezeichnet. Wird Ihre Eingabe kontinuierlich aktualisiert, wird sie als "unbegrenzt" bezeichnet. Unter Begrenzte und unbegrenzte PCollections erfahren Sie mehr über Eingabetypen.

In diesem Beispiel können Sie auswählen, ob die Eingabe begrenzt oder unbegrenzt ist. Denken Sie daran, dass die Eingabe bei allen Beispielen eine Sammlung von Shakespeare-Texten ist. Es handelt sich also um eine begrenzte Eingabe. Um allerdings die neuen Konzepte in diesem Beispiel zu erklären, handelt es sich hier um eine Wiederholung von Shakespeare-Texten.

Wenn Ihre Eingabe in diesem Beispiel nicht begrenzt ist, wird sie von einem Google Cloud Pub/Sub-Thema gelesen. In diesem Fall ist die auf die Pipeline angewendete Read-Transformation PubSubIO.Read. Andernfalls wird die Eingabe von Google Cloud Storage gelesen.

public static void main(String[] args) throws IOException {
    ...
    PCollection<String> input;
    if (options.isUnbounded()) {
      LOG.info("Reading from PubSub.");
      // Read from the Pub/Sub topic. A topic will be created if it wasn't specified as an arg.
      input = pipeline.apply(PubsubIO.Read.topic(options.getPubsubTopic()));

  } else {
      // Else, this is a bounded pipeline. Read from the Google Cloud Storage file.
      input = pipeline.apply(TextIO.Read.from(options.getInputFile()))
      ...
    }
    ...
}

Zeitstempel zu Daten hinzufügen

Jedem Element in einer PCollection ist ein Zeitstempel zugeordnet. Der Zeitstempel wird jedem Element von der Quelle zugewiesen, die die PCollection erstellt. Wenn Sie in diesem Beispiel eine unbegrenzte Eingabe für Ihre Pipeline auswählen, kommen die Zeitstempel von der Pub/Sub-Datenquelle. Wenn Sie eine begrenzte Eingabe auswählen, setzt die AddTimestampsFn benannte und von ParDo aufgerufene DoFn-Methode einen Zeitstempel für jedes Element in PCollection fest.

public static void main(String[] args) throws IOException {
  ...
  input = pipeline
    .apply(...)
    // Add an element timestamp, using an artificial time.
    .apply(ParDo.of(new AddTimestampFn()));
}

Unten sehen Sie den Code für AddTimestampsFn, ein von ParDo aufgerufenes DoFn-Element, der das Datenelement des Zeitstempels festlegt, das von dem Element selbst angegeben wurde. Wenn es sich bei den Elementen beispielsweise um Logzeilen handelt, könnte ParDo die Zeitüberschreitung des Logstrings analysieren und als Zeitstempel des Elements festlegen. Da die Originalwerke von Shakespeare keine Zeitstempel enthalten, haben wir zufällige Zeitstempel erstellt, um das Konzept zu veranschaulichen. Jede Zeile des Eingabetexts erhält irgendwann in einem 2-stündigen Zeitraum einen zufälligen verknüpften Zeitstempel.

static class AddTimestampFn extends DoFn<String, String> {
  private static final Duration RAND_RANGE = Duration.standardHours(2);
  private final Instant minTimestamp;

  AddTimestampFn() {
    this.minTimestamp = new Instant(System.currentTimeMillis());
  }

  @Override
  public void processElement(ProcessContext c) {
    // Generate a timestamp that falls somewhere in the past 2 hours.
    long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
        Instant randomTimestamp = minTimestamp.plus(randMillis);
    // Set the data element with that timestamp.
    c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
  }
}

Unter Zeitstempel der PCollection-Elemente erfahren Sie mehr über Zeitstempel.

Windowing

Das Dataflow SDK verwendet ein Konzept namens Windowing, um eine PCollection gemäß den Zeitstempeln der einzelnen Elemente zu unterteilen. Dataflow-Transformationen, die mehrere Elemente zusammenfassen, verarbeiten jede PCollection als eine Abfolge mehrerer, finiter Fenster, obwohl die gesamte Sammlung selbst möglicherweise von unendlicher Größe (unbegrenzt) ist.

Das WindowingWordCount-Beispiel wendet Windowing mit festgelegten Zeitfenstern an, in denen jedes Fenster ein festes Zeitintervall darstellt. Die Größe des festen Fensters für dieses Beispiel wird auf eine Minute zurückgesetzt. Sie können diese Einstellung mit einer Befehlszeilenoption ändern. Anschließend wendet die Pipeline die CountWords-Transformation an.

PCollection<KV<String, Long>> wordCounts = input
  .apply(Window.<String>into(
    FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
   .apply(new WordCount.CountWords());

Unbegrenzte und begrenzte Ausgabe schreiben

Da Ihre Eingabe entweder begrenzt oder unbegrenzt sein kann, gilt auch dasselbe für unsere Ausgabe-PCollection. Wir müssen eine geeignete Senke auswählen. Einige Ausgabesenken unterstützen nur die begrenzte Ausgabe oder nur die unbegrenzte Ausgabe. Bei einer Textdatei handelt es sich beispielsweise um eine Senke, die nur begrenzte Daten erhalten kann. Die BigQuery-Ausgabequelle unterstützt sowohl die begrenzte als auch die unbegrenzte Eingabe.

In diesem Beispiel streamen wir die Ergebnisse in eine BigQuery-Tabelle. Die Ergebnisse werden dann für eine BigQuery-Tabelle formatiert und anschließend mit BigQueryIO.Write in BigQuery geschrieben.

wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
  .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));