Ihr Dataflow-Programm stellt eine Datenverarbeitungspipeline von Anfang bis Ende dar. Dieser Abschnitt erklärt die Funktionsweise, wie die Klassen in den Dataflow-SDKs verwendet werden, um eine Pipeline zu erstellen. Um eine Pipeline zu konstruieren, die die Klassen in den Dataflow-SDKs verwendet, muss Ihr Programm folgende allgemeine Schritte ausführen:
- Erstellen Sie ein
Pipeline
-Objekt. - Verwenden Sie die Transformation Read oder Create, um eine oder mehrere
PCollection
s für Ihre Pipelinedaten zu erstellen. - Wenden Sie Transformationen auf jede
PCollection
an. Transformationen können die Elemente in einerPCollection
ändern, filtern, gruppieren, analysieren oder auch ausführen. Jede Transformation erstellt eine neue Ausgabe-PCollection
, auf die Sie weitere Transformationen anwenden können, bis die Verarbeitung abgeschlossen ist. - Schreiben Sie die abschließenden, transformierten
PCollection
s oder geben Sie sie auf andere Weise aus. - Ausführen der Pipeline.
In der unten stehenden einfachen Beispielpipeline sehen Sie ein komplettes Beispiel, das jeden einzelnen allgemeinen Schritt beschreibt.
Pipelineobjekt erstellen
Ein Dataflow-Programm beginnt häufig durch die Erstellung eines Pipeline
-Objekts.
In den Dataflow-SDKs wird jede Pipeline durch einen expliziten Pipeline
-Objekttyp dargestellt. Jedes Pipeline
-Objekt ist eine unabhängige Einheit, die sowohl die Daten, mit denen die Pipeline arbeitet, als auch die Transformationen, die auf diesen Daten angewendet werden, zusammenfasst.
Java
Deklarieren Sie zum Erstellen einer Pipeline ein Pipeline
-Objekt und fügen Sie diesem einige Konfigurationsoptionen hinzu. Sie übergeben die Konfigurationsoptionen, indem Sie ein Objekt vom Typ PipelineOptions
erstellen, das Sie mithilfe der statischen Methode PipelineOptionsFactory.create()
erstellen können.
// Start by defining the options for the pipeline. PipelineOptions options = PipelineOptionsFactory.create(); // Then create the pipeline. Pipeline p = Pipeline.create(options);
Pipelineoptionen konfigurieren
Mithilfe der Pipeline-Optionen können Sie verschiedene Aspekte Ihrer Pipeline konfigurieren. Diese können Folgendes beinhalten:
- Wo Ihre Pipeline läuft
- Wo Ihr Pipelinejob Dateien präsentiert
- Mit welchem Cloud Platform-Projekt Ihre Pipeline verknüpft ist
- Wie viele Compute Engine-Instanzen Ihre Pipeline als Worker verwendet
Die Properties der Pipeline-Optionen umfassen Informationen über Ihr Cloud Platform-Projekt, die für den Cloud Dataflow-Dienst erforderlich sind, wie Ihre Projekt-ID und Cloud Storage-Staging-Standorte. Über die Pipeline-Optionen können Sie auch steuern, wie viele Worker der Dataflow-Dienst Ihren Pipelinejobs zuweisen soll, und wo die Statusmeldungen Ihrer Pipelineobs hingeleitet werden sollen.
Eine Haupt-Property in den Pipeline-Optionen, die bestimmt, wo Ihre Pipeline ausführt wird (entweder auf dem Cloud Dataflow-Dienst oder lokal), ist der Pipeline-Runner. Die Pipeline-Runner-Property gibt auch an, ob die Ausführung Ihrer Pipeline asynchron oder blockiert sein soll.
Java
Sie können zwar die Attribute des PipelineOptions
-Objekts mithilfe von Setter-Methoden direkt in Ihrem Pipelineprogramm festlegen (PipelineOptions.set[OptionName]
), es empfiehlt sich jedoch, die Werte mithilfe von Befehlszeilenoptionen zu übergeben.
Das Dataflow-SDK für Java bietet eine PipelineOptionsFactory
-Klasse, die die Befehlszeilenoptionen, die Ihrer Pipeline hinzugefügt wurden, parst und überprüft. Mithilfe von Befehlszeilenoptionen zur Bestimmung des PipelineRunner
und anderer Felder in PipelineOptions
zur Laufzeit können Sie denselben Code verwenden, um Ihre Pipeline sowohl lokal als auch in der Cloud zu erstellen und auszuführen.
Unter Ausführungsparameter festlegen finden Sie weitere Informationen, wie Pipelineoptionen programmatisch für die Ausführung in der Cloud oder im lokalen Modus festgelegt werden. Die WordCount-Beispielpipeline zeigt auch, wie Pipelineoptionen während der Laufzeit unter Verwendung der Befehlszeilenoptionen festgelegt werden.
Daten in meine Pipeline einlesen
Um eine erste PCollection
für Ihre Pipeline zu erstellen, fügen Sie Ihrem Pipelineobjekt eine Root-Transformation hinzu. Eine Root-Transformation erstellt eine PCollection
entweder aus einer externen Datenquelle oder aus einigen lokalen Daten, die Sie angeben.
Java
Im Dataflow-Java-SDK gibt es zwei Arten von Stammtransformationen: Read
und Create
. Read
wandelt gelesene Daten aus einer externen Quelle wie BigQuery oder einer Textdatei in Google Cloud Storage um. Create
-Transformationen erstellen eine PCollection
aus einer speicherinternen java.util.Collection
.
Der folgende Beispielcode zeigt, wie Sie einen apply
für TextIO.Read
-Root-Transformationen zum Einlesen von Daten aus einer Textdatei in Google Cloud Storage anwenden. Die Transformation wird auf ein Pipeline
-Objekt p
angewendet und gibt ein Pipeline-Dataset im Format PCollection<String>
zurück:
PCollection<String> lines = p.apply( TextIO.Read.named("ReadMyFile").from("gs://some/inputData.txt"));
Transformationen zur Verarbeitung von Pipelinedaten anwenden
Zur Verwendung von Transformationen in einer Pipeline nutzen Sie apply, um die betreffenden Transformationen auf die PCollection
anzuwenden, die Sie transformieren möchten.
Java
Zum Anwenden einer Transformation rufen Sie die Methode apply
auf jeder PCollection
auf, die Sie verarbeiten möchten, und übergeben Sie das gewünschte Transformationsobjekt als Argument.
Die Dataflow-SDKs enthalten eine Reihe verschiedener Transformationen, die Sie auf die PCollection
s Ihrer Pipeline anwenden können. Dazu gehören auch allgemeine Kerntransformationen wie ParDo oder Combine. Im SDK sind auch vorgefertigte zusammengesetzte Transformationen enthalten, die eine oder mehrere Kerntransformationen in einem nützlichen Verarbeitungsmuster kombinieren, wie beispielsweise das Zählen oder Kombinieren von Elementen in einer Sammlung. Sie können ebenfalls Ihre eigenen, komplexeren zusammengesetzten Transformationen definieren, um sie den Anforderungen Ihrer Pipeline genau anzupassen.
Java
Im Dataflow-Java-SDK ist jede Transformation eine abgeleitete Klasse der PTransform
-Basisklasse.
Wenn Sie apply
auf einer PCollection
aufrufen, übergeben Sie die PTransform
, die Sie als Argument verwenden möchten.
Der folgende Code zeigt, wie apply
in PCollection
-Strings umgewandelt werden kann. Die Transformation ist eine benutzerdefinierte Transformation, die den Inhalt jedes Strings umkehrt und eine neue PCollection
ausgibt, die die umgekehrten Strings enthält.
Die Eingabe ist ein PCollection<String>
namens words
; der Code übergibt eine Instanz eines PTransform
-Objekts mit dem Namen ReverseWords
an apply
und speichert den Rückgabewert als PCollection<String>
namens reversedWords
.
PCollection<String> words = ...; PCollection<String> reversedWords = words.apply(new ReverseWords());
Endgültige Pipelinedaten schreiben oder ausgeben
Sobald Ihre Pipeline alle ihre Transformationen angewendet hat, müssen Sie normalerweise die Ergebnisse ausgeben.
Zur Ausgabe der endgültigen PCollection
s Ihrer Pipeline wenden Sie eine Write
-Transformation in PCollection
an. Write
-Transformationen können die Elemente einer PCollection
in eine externe Datensenke ausgeben, z. B. eine Datei in Google Cloud Storage oder eine BigQuery-Tabelle. Sie können jederzeit Write
für die Ausgabe von PCollection
in Ihrer Pipeline anwenden, obwohl Sie normalerweise Daten am Ende Ihrer Pipeline schreiben.
Java
Der folgende Beispielcode zeigt, wie apply
eine TextIO.Write
-Transformation nutzt, um eine PCollection
eines String
als Textdatei in Google Cloud Storage auszugeben:
PCollection<String> filteredWords = ...; filteredWords.apply(TextIO.Write.named("WriteMyFile").to("gs://some/outputData.txt"));
Pipeline ausführen
Nachdem Sie Ihre Pipeline erstellt haben, verwenden Sie die Methode run
zum Ausführen der Pipeline. Pipelines werden asynchron ausgeführt: Das von Ihnen erstellte Programm sendet eine Spezifikation für Ihre Pipeline zu einem Pipeline-Runner, der dann die eigentliche Serie von Pipelinevorgängen erstellt und ausführt. Sie können angeben, wo Ihre Pipeline ausgeführt wird: entweder lokal zu Test- und Fehlerbehebungszwecken oder auf dem von Cloud Dataflow verwalteten Dienst. Unter Ausführungsparameter festlegen finden Sie weitere Informationen zu Pipeline-Runnern, zur Konfiguration von Pipelineoptionen sowie zur lokalen und cloudbasierten Ausführung.
In den Dataflow-SDKs geben Sie einen PipelineRunner
in Ihren Pipelineoptionen an, wenn Sie Ihr Pipeline
-Objekt erstellen. Wenn Sie mit der Konstruktion Ihrer Pipeline fertig sind, rufen Sie run
für Ihr Pipelineobjekt so auf:
Java
p.run();