Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Auf dieser Seite wird beschrieben, wie du den DataflowTemplateOperator
zum Starten verwendest.
Dataflow-Pipelines von
Cloud Composer
Pipeline von Cloud Storage Text für BigQuery
ist eine Batch-Pipeline, mit der Sie Textdateien hochladen können,
Cloud Storage, transformieren Sie sie mithilfe einer von Ihnen bereitgestellten benutzerdefinierten JavaScript-Funktion (User Defined Function, UDF) und geben die Ergebnisse in
BigQuery
Übersicht
Bevor Sie mit dem Workflow beginnen, erstellen Sie die folgenden Entitäten:
Eine leere BigQuery-Tabelle aus einem leeren Dataset, die die folgenden Spalten mit Informationen enthält:
location
,average_temperature
,month
und optionalinches_of_rain
,is_current
undlatest_measurement
.Eine JSON-Datei, die die Daten aus dem
.txt
normalisiert in das richtige Format für die Schema. Das JSON-Objekt hat ein Array vonBigQuery Schema
, wobei enthält jedes Objekt einen Spaltennamen, den Eingabetyp und die Angabe, kein Pflichtfeld.Eine
.txt
-Eingabedatei mit den Daten, die im Batch hochgeladen werden in die BigQuery-Tabelle.Eine in JavaScript geschriebene benutzerdefinierte Funktion, die jede Zeile der
.txt
-Datei in die für unsere Tabelle relevanten Variablen umwandelt.Eine Airflow-DAG-Datei, die auf den Speicherort dieser Dateien.
Als Nächstes laden Sie die Datei
.txt
, die UDF-Datei.js
und das Schema.json
hoch in einen Cloud Storage-Bucket speichern. Außerdem laden Sie den DAG in Ihre Cloud Composer-Umgebung.Nachdem der DAG hochgeladen wurde, führt Airflow eine Aufgabe davon aus. Bei dieser Aufgabe eine Dataflow-Pipeline starten, Benutzerdefinierte Funktion zur Datei
.txt
hinzu und formatieren Sie sie entsprechend dem JSON-Schema.Schließlich werden die Daten in die BigQuery-Tabelle die Sie zuvor erstellt haben.
Hinweise
- Für dieses Handbuch müssen Sie mit JavaScript vertraut sein, um schreiben zu können. die benutzerdefinierte Funktion.
- In diesem Leitfaden wird davon ausgegangen, dass Sie bereits über zu verbessern. Informationen zum Erstellen einer Umgebung finden Sie unter Umgebung erstellen. Sie können Verwenden Sie in dieser Anleitung eine beliebige Version von Cloud Composer.
-
Cloud Composer, Dataflow, Cloud Storage, BigQuery APIs aktivieren.
Leere BigQuery-Tabelle mit einer Schemadefinition erstellen
Erstellen Sie eine BigQuery-Tabelle mit einer Schemadefinition. Ich wird diese Schemadefinition später in diesem Leitfaden verwenden. Dieses Die BigQuery-Tabelle enthält die Ergebnisse des Batch-Uploads.
So erstellen Sie eine leere Tabelle mit einer Schemadefinition:
Console
Rufen Sie in der Google Cloud Console die Seite „BigQuery“ auf. Seite:
Maximieren Sie im Navigationsbereich im Abschnitt Ressourcen Ihr Projekt.
Klicken Sie im Detailbereich auf Dataset erstellen.
Geben Sie auf der Seite „Dataset erstellen“ im Abschnitt Dataset-ID einen Namen für Ihren Dataset
average_weather
. Übernehmen Sie für alle anderen Felder die Standardwerte. Bundesstaat.Klicken Sie auf Dataset erstellen.
Kehren Sie zum Navigationsbereich zurück. Maximieren Sie im Abschnitt Ressourcen für Ihr Projekt. Klicken Sie dann auf das Dataset
average_weather
.Klicken Sie im Detailfeld auf Tabelle erstellen.
Wählen Sie auf der Seite Tabelle erstellen im Abschnitt Quelle die Option Leere Tabelle.
Gehen Sie auf der Seite Create table (Tabelle erstellen) im Abschnitt Destination (Ziel) so vor:
Wählen Sie für Dataset-Name das Dataset
average_weather
aus.Geben Sie im Feld Tabellenname den Namen
average_weather
ein.Achten Sie darauf, dass der Tabellentyp auf Native Tabelle eingestellt ist.
Geben Sie im Abschnitt Schema die Schemadefinition ein. Sie können einen der folgenden Ansätze verwenden:
Geben Sie Schemainformationen manuell ein, indem Sie Als Text bearbeiten aktivieren und geben Sie das Tabellenschema als JSON-Array ein. Geben Sie Folgendes ein: Felder:
[ { "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" }, { "name": "average_temperature", "type": "INTEGER", "mode": "REQUIRED" }, { "name": "month", "type": "STRING", "mode": "REQUIRED" }, { "name": "inches_of_rain", "type": "NUMERIC" }, { "name": "is_current", "type": "BOOLEAN" }, { "name": "latest_measurement", "type": "DATE" } ]
Geben Sie das Schema mit Feld hinzufügen manuell ein:
Behalten Sie für Partitions- und Clustereinstellungen die Standardeinstellung bei Wert
No partitioning
.Lassen Sie im Abschnitt Erweiterte Optionen für Verschlüsselung das Feld Standardwert
Google-managed key
.Klicken Sie auf Tabelle erstellen.
bq
Verwenden Sie den Befehl bq mk
, um ein leeres Dataset und eine Tabelle in diesem
Dataset.
Führen Sie den folgenden Befehl aus, um ein Dataset mit dem durchschnittlichen globalen Wetter zu erstellen:
bq --location=LOCATION mk \
--dataset PROJECT_ID:average_weather
Ersetzen Sie Folgendes:
LOCATION
: Region, in der sich die Umgebung befindet.PROJECT_ID
: die Projekt-ID.
Führen Sie den folgenden Befehl aus, um eine leere Tabelle in diesem Dataset mit Schemadefinition:
bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE
Nachdem die Tabelle erstellt wurde, können Sie Update die Ablaufzeit, die Beschreibung und die Labels der Tabelle. Ebenso können Sie die Schemadefinition ändern.
Python
Diesen Code speichern unter
dataflowtemplateoperator_create_dataset_and_table_helper.py
und aktualisieren Sie die darin enthaltenen Variablen, um Ihr Projekt und Ihren Standort widerzuspiegeln.
führen Sie ihn mit dem folgenden Befehl aus:
python dataflowtemplateoperator_create_dataset_and_table_helper.py
Python
Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Cloud Storage-Bucket erstellen
Erstellen Sie einen Bucket für alle Dateien, die für den Workflow benötigt werden. Der DAG, den Sie erstellt, wird auf die Dateien verwiesen, die Sie in dieses Storage-Bucket. So erstellen Sie einen neuen Storage-Bucket:
Console
Öffnen Sie Cloud Storage in der Google Cloud Console.
Klicken Sie auf Bucket erstellen, um das Formular zum Erstellen eines Buckets zu öffnen.
Geben Sie die Bucket-Informationen ein und klicken Sie zum Ausführen der einzelnen Schritte jeweils auf Weiter:
Geben Sie einen global eindeutigen Namen für den Bucket an. In diesem Leitfaden werden
bucketName
als Beispiel.Wählen Sie Region als Standorttyp aus. Wählen Sie als Nächstes Standort, an dem die Bucket-Daten gespeichert werden.
Wählen Sie Standard als Standard-Speicherklasse für Ihre Daten aus.
Wählen Sie Einheitliche Zugriffssteuerung für den Zugriff auf Ihre Objekte aus.
Klicken Sie auf Fertig.
gsutil
Führen Sie den Befehl gsutil mb
aus:
gsutil mb gs://bucketName/
Ersetzen Sie Folgendes:
bucketName
: der Name des Buckets, den Sie zuvor in diesem Abschnitt erstellt haben. .
Codebeispiele
C#
Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Go
Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Java
Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Python
Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Ruby
Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
BigQuery-Schema im JSON-Format für Ihre Ausgabetabelle erstellen
Erstellen Sie eine JSON-formatierte BigQuery-Schemadatei, die der zuvor erstellten Ausgabetabelle entspricht. Die Feldnamen, -typen und -modi
müssen mit denen übereinstimmen, die zuvor in der BigQuery-Tabelle definiert wurden
Schema. Durch diese Datei werden die Daten aus der Datei .txt
in ein Format umgewandelt.
sind mit Ihrem BigQuery-Schema kompatibel. Datei benennen
jsonSchema.json
{
"BigQuery Schema": [
{
"name": "location",
"type": "GEOGRAPHY",
"mode": "REQUIRED"
},
{
"name": "average_temperature",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "month",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "inches_of_rain",
"type": "NUMERIC"
},
{
"name": "is_current",
"type": "BOOLEAN"
},
{
"name": "latest_measurement",
"type": "DATE"
}]
}
JavaScript-Datei zur Formatierung Ihrer Daten erstellen
In dieser Datei definieren Sie Ihre benutzerdefinierte Funktion, die
die Logik zur Transformation der Textzeilen in Ihrer Eingabedatei. Beachten Sie, dass dies
verwendet jede Textzeile in Ihrer Eingabedatei als eigenes Argument,
wird die Funktion für jede Zeile Ihrer Eingabedatei einmal ausgeführt. Datei benennen
transformCSVtoJSON.js
Eingabedatei erstellen
Diese Datei enthält die Informationen, die Sie in Ihren
BigQuery-Tabelle. Datei lokal kopieren und benennen
inputFile.txt
POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null
Dateien in den Bucket hochladen
Laden Sie die folgenden Dateien in den von Ihnen erstellten Cloud Storage-Bucket hoch früher:
- JSON-formatiertes BigQuery-Schema (
.json
) - Benutzerdefinierte JavaScript-Funktion (
transformCSVtoJSON.js
) Die Eingabedatei für den zu verarbeitenden Text (
.txt
)
Console
- Wechseln Sie in der Cloud Console zur Seite Cloud Storage-Buckets.
Klicken Sie in der Liste der Buckets auf Ihren Bucket.
Führen Sie auf dem Tab „Objekte“ für den Bucket einen der folgenden Schritte aus:
Fügen Sie die gewünschten Dateien per Drag-and-drop von Ihrem Desktop oder Dateimanager in den Hauptbereich der Google Cloud Console.
Klicken Sie auf die Schaltfläche Dateien hochladen und wählen Sie die Dateien aus, die Sie hochladen möchten. und klicken Sie auf Öffnen.
gsutil
Führen Sie den Befehl gsutil cp
aus:
gsutil cp OBJECT_LOCATION gs://bucketName
Ersetzen Sie Folgendes:
bucketName
: der Name des Buckets, den Sie zuvor in dieses Leitfadens.OBJECT_LOCATION
: der lokale Pfad zu Ihrem Objekt. Beispiel:Desktop/transformCSVtoJSON.js
Codebeispiele
Python
Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Ruby
Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
DataflowTemplateOperator konfigurieren
Bevor Sie den DAG ausführen, legen Sie die folgenden Airflow-Variablen fest.
Airflow-Variable | Wert |
---|---|
project_id
|
Projekt-ID |
gce_zone
|
Compute Engine-Zone, in der sich der Dataflow-Cluster befindet müssen erstellt werden |
bucket_path
|
Der Speicherort des von Ihnen erstellten Cloud Storage-Bucket früher |
Nun verweisen Sie auf die Dateien, die Sie zuvor erstellt haben, um einen DAG zu erstellen, mit dem der Dataflow-Workflow gestartet wird. Diesen DAG kopieren und lokal speichern
als composer-dataflow-dag.py
.
Airflow 2
Airflow 1
DAG in Cloud Storage hochladen
Laden Sie den DAG in den Ordner /dags
der Umgebung hoch.
Bucket. Sobald der Upload erfolgreich abgeschlossen wurde,
Klicken Sie in Cloud Composer auf den Link DAGs-Ordner.
Seite „Umgebungen“
Aufgabenstatus ansehen
- Rufen Sie die Airflow-Weboberfläche auf.
- Klicken Sie auf der Seite „DAGs“ auf den DAG-Namen, z. B.
composerDataflowDAG
. - Klicken Sie auf der DAGs-Detailseite auf Grafikansicht.
Prüfen Sie den Status:
Failed
: Die Aufgabe wird in einem roten Kästchen angezeigt. Sie können auch den Mauszeiger auf eine Aufgabe bewegen und nach Status: Fehlgeschlagen suchen.Success
: Die Aufgabe wird von einem grünen Feld umgeben. Sie können auch den Mauszeiger über die Aufgabe halten und Status: Erfolgreich.
Nach wenigen Minuten können Sie die Ergebnisse in Dataflow BigQuery
Job in Dataflow ansehen
Rufen Sie in der Google Cloud Console die Seite Dataflow auf.
Der Job hat den Namen „
dataflow_operator_transform_csv_to_bq
“ mit einer eindeutigen ID am Ende des Namens durch einen Bindestrich hinzugefügt. Beispiel:Klicken Sie auf den Namen, um die Informationen zum Stellenangebot.
Ergebnisse in BigQuery anzeigen
Öffnen Sie in der Google Cloud Console die Seite BigQuery.
Sie können Abfragen mit Standard-SQL senden. Verwenden Sie die folgende Abfrage, um die Zeilen anzuzeigen, die Ihrer Tabelle hinzugefügt wurden:
SELECT * FROM projectId.average_weather.average_weather