Pipeline aufbauen

Ihr Dataflow-Programm stellt eine Datenverarbeitungspipeline von Anfang bis Ende dar. Dieser Abschnitt erklärt die Funktionsweise, wie die Klassen in den Dataflow-SDKs verwendet werden, um eine Pipeline zu erstellen. Um eine Pipeline zu konstruieren, die die Klassen in den Dataflow-SDKs verwendet, muss Ihr Programm folgende allgemeine Schritte ausführen:

  • Erstellen Sie ein Pipeline-Objekt.
  • Verwenden Sie die Transformation Read oder Create, um eine oder mehrere PCollections für Ihre Pipelinedaten zu erstellen.
  • Wenden Sie Transformationen auf jede PCollection an. Transformationen können die Elemente in einer PCollection ändern, filtern, gruppieren, analysieren oder auch ausführen. Jede Transformation erstellt eine neue Ausgabe-PCollection, auf die Sie weitere Transformationen anwenden können, bis die Verarbeitung abgeschlossen ist.
  • Schreiben Sie die abschließenden, transformierten PCollections oder geben Sie sie auf andere Weise aus.
  • Ausführen der Pipeline.

In der unten stehenden einfachen Beispielpipeline sehen Sie ein komplettes Beispiel, das jeden einzelnen allgemeinen Schritt beschreibt.

Pipelineobjekt erstellen

Ein Dataflow-Programm beginnt häufig durch die Erstellung eines Pipeline-Objekts.

In den Dataflow-SDKs wird jede Pipeline durch einen expliziten Pipeline-Objekttyp dargestellt. Jedes Pipeline-Objekt ist eine unabhängige Einheit, die sowohl die Daten, mit denen die Pipeline arbeitet, als auch die Transformationen, die auf diesen Daten angewendet werden, zusammenfasst.

Java

Deklarieren Sie zum Erstellen einer Pipeline ein Pipeline-Objekt und fügen Sie diesem einige Konfigurationsoptionen hinzu. Sie übergeben die Konfigurationsoptionen, indem Sie ein Objekt vom Typ PipelineOptions erstellen, das Sie mithilfe der statischen Methode PipelineOptionsFactory.create() erstellen können.

// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();

// Then create the pipeline.
Pipeline p = Pipeline.create(options);

Pipelineoptionen konfigurieren

Mithilfe der Pipeline-Optionen können Sie verschiedene Aspekte Ihrer Pipeline konfigurieren. Diese können Folgendes beinhalten:

  • Wo Ihre Pipeline läuft
  • Wo Ihr Pipelinejob Dateien präsentiert
  • Mit welchem Cloud Platform-Projekt Ihre Pipeline verknüpft ist
  • Wie viele Compute Engine-Instanzen Ihre Pipeline als Worker verwendet

Die Properties der Pipeline-Optionen umfassen Informationen über Ihr Cloud Platform-Projekt, die für den Cloud Dataflow-Dienst erforderlich sind, wie Ihre Projekt-ID und Cloud Storage-Staging-Standorte. Über die Pipeline-Optionen können Sie auch steuern, wie viele Worker der Dataflow-Dienst Ihren Pipelinejobs zuweisen soll, und wo die Statusmeldungen Ihrer Pipelineobs hingeleitet werden sollen.

Eine Haupt-Property in den Pipeline-Optionen, die bestimmt, wo Ihre Pipeline ausführt wird (entweder auf dem Cloud Dataflow-Dienst oder lokal), ist der Pipeline-Runner. Die Pipeline-Runner-Property gibt auch an, ob die Ausführung Ihrer Pipeline asynchron oder blockiert sein soll.

Java

Sie können zwar die Attribute des PipelineOptions-Objekts mithilfe von Setter-Methoden direkt in Ihrem Pipelineprogramm festlegen (PipelineOptions.set[OptionName]), es empfiehlt sich jedoch, die Werte mithilfe von Befehlszeilenoptionen zu übergeben. Das Dataflow-SDK für Java bietet eine PipelineOptionsFactory-Klasse, die die Befehlszeilenoptionen, die Ihrer Pipeline hinzugefügt wurden, parst und überprüft. Mithilfe von Befehlszeilenoptionen zur Bestimmung des PipelineRunner und anderer Felder in PipelineOptions zur Laufzeit können Sie denselben Code verwenden, um Ihre Pipeline sowohl lokal als auch in der Cloud zu erstellen und auszuführen.

Unter Ausführungsparameter festlegen finden Sie weitere Informationen, wie Pipelineoptionen programmatisch für die Ausführung in der Cloud oder im lokalen Modus festgelegt werden. Die WordCount-Beispielpipeline zeigt auch, wie Pipelineoptionen während der Laufzeit unter Verwendung der Befehlszeilenoptionen festgelegt werden.

Daten in meine Pipeline einlesen

Um eine erste PCollection für Ihre Pipeline zu erstellen, fügen Sie Ihrem Pipelineobjekt eine Root-Transformation hinzu. Eine Root-Transformation erstellt eine PCollection entweder aus einer externen Datenquelle oder aus einigen lokalen Daten, die Sie angeben.

Java

Im Dataflow-Java-SDK gibt es zwei Arten von Stammtransformationen: Read und Create. Read wandelt gelesene Daten aus einer externen Quelle wie BigQuery oder einer Textdatei in Google Cloud Storage um. Create-Transformationen erstellen eine PCollection aus einer speicherinternen java.util.Collection.

Der folgende Beispielcode zeigt, wie Sie einen apply für TextIO.Read-Root-Transformationen zum Einlesen von Daten aus einer Textdatei in Google Cloud Storage anwenden. Die Transformation wird auf ein Pipeline-Objekt p angewendet und gibt ein Pipeline-Dataset im Format PCollection<String> zurück:

PCollection<String> lines = p.apply(
  TextIO.Read.named("ReadMyFile").from("gs://some/inputData.txt"));

Transformationen zur Verarbeitung von Pipelinedaten anwenden

Zur Verwendung von Transformationen in einer Pipeline nutzen Sie apply, um die betreffenden Transformationen auf die PCollection anzuwenden, die Sie transformieren möchten.

Java

Zum Anwenden einer Transformation rufen Sie die Methode apply auf jeder PCollection auf, die Sie verarbeiten möchten, und übergeben Sie das gewünschte Transformationsobjekt als Argument.

Die Dataflow-SDKs enthalten eine Reihe verschiedener Transformationen, die Sie auf die PCollections Ihrer Pipeline anwenden können. Dazu gehören auch allgemeine Kerntransformationen wie ParDo oder Combine. Im SDK sind auch vorgefertigte zusammengesetzte Transformationen enthalten, die eine oder mehrere Kerntransformationen in einem nützlichen Verarbeitungsmuster kombinieren, wie beispielsweise das Zählen oder Kombinieren von Elementen in einer Sammlung. Sie können ebenfalls Ihre eigenen, komplexeren zusammengesetzten Transformationen definieren, um sie den Anforderungen Ihrer Pipeline genau anzupassen.

Java

Im Dataflow-Java-SDK ist jede Transformation eine abgeleitete Klasse der PTransform-Basisklasse. Wenn Sie apply auf einer PCollection aufrufen, übergeben Sie die PTransform, die Sie als Argument verwenden möchten.

Der folgende Code zeigt, wie apply in PCollection-Strings umgewandelt werden kann. Die Transformation ist eine benutzerdefinierte Transformation, die den Inhalt jedes Strings umkehrt und eine neue PCollection ausgibt, die die umgekehrten Strings enthält.

Die Eingabe ist ein PCollection<String> namens words; der Code übergibt eine Instanz eines PTransform-Objekts mit dem Namen ReverseWords an apply und speichert den Rückgabewert als PCollection<String> namens reversedWords.

PCollection<String> words = ...;

PCollection<String> reversedWords = words.apply(new ReverseWords());

Endgültige Pipelinedaten schreiben oder ausgeben

Sobald Ihre Pipeline alle ihre Transformationen angewendet hat, müssen Sie normalerweise die Ergebnisse ausgeben. Zur Ausgabe der endgültigen PCollections Ihrer Pipeline wenden Sie eine Write-Transformation in PCollection an. Write-Transformationen können die Elemente einer PCollection in eine externe Datensenke ausgeben, z. B. eine Datei in Google Cloud Storage oder eine BigQuery-Tabelle. Sie können jederzeit Write für die Ausgabe von PCollection in Ihrer Pipeline anwenden, obwohl Sie normalerweise Daten am Ende Ihrer Pipeline schreiben.

Java

Der folgende Beispielcode zeigt, wie apply eine TextIO.Write-Transformation nutzt, um eine PCollection eines String als Textdatei in Google Cloud Storage auszugeben:

PCollection<String> filteredWords = ...;
filteredWords.apply(TextIO.Write.named("WriteMyFile").to("gs://some/outputData.txt"));

Pipeline ausführen

Nachdem Sie Ihre Pipeline erstellt haben, verwenden Sie die Methode run zum Ausführen der Pipeline. Pipelines werden asynchron ausgeführt: Das von Ihnen erstellte Programm sendet eine Spezifikation für Ihre Pipeline zu einem Pipeline-Runner, der dann die eigentliche Serie von Pipelinevorgängen erstellt und ausführt. Sie können angeben, wo Ihre Pipeline ausgeführt wird: entweder lokal zu Test- und Fehlerbehebungszwecken oder auf dem von Cloud Dataflow verwalteten Dienst. Unter Ausführungsparameter festlegen finden Sie weitere Informationen zu Pipeline-Runnern, zur Konfiguration von Pipelineoptionen sowie zur lokalen und cloudbasierten Ausführung.

In den Dataflow-SDKs geben Sie einen PipelineRunner in Ihren Pipelineoptionen an, wenn Sie Ihr Pipeline-Objekt erstellen. Wenn Sie mit der Konstruktion Ihrer Pipeline fertig sind, rufen Sie run für Ihr Pipelineobjekt so auf:

Java

p.run();