Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Auf dieser Seite wird beschrieben, wie Sie mit DataflowTemplateOperator
Dataflow-Pipelines aus Cloud Composer starten.
Die Pipeline „Cloud Storage Text für BigQuery“ ist eine Batchpipeline, mit der Sie in Cloud Storage gespeicherte Textdateien hochladen, diese mit einer von Ihnen bereitgestellten benutzerdefinierten JavaScript-Funktion (User Defined Function, UDF) transformieren und das Ergebnis in BigQuery ausgeben können.
Übersicht
Bevor der Workflow gestartet wird, müssen folgende Entitäten erstellt werden:
Eine leere BigQuery-Tabelle aus einem leeren Dataset, die die folgenden Spalten mit Informationen enthält:
location
,average_temperature
,month
und optionalinches_of_rain
,is_current
undlatest_measurement
.Eine JSON-Datei, die die Daten aus der Datei
.txt
in das richtige Format für das Schema der BigQuery-Tabelle normalisiert, wird normalisiert. Das JSON-Objekt hat ein Array vonBigQuery Schema
, in dem jedes Objekt einen Spaltennamen, einen Eingabetyp und ein Pflichtfeld enthält.Eine
.txt
-Eingabedatei, die die Daten enthält, die per Batch-Upload in die BigQuery-Tabelle geladen werden sollen.Eine in JavaScript geschriebene benutzerdefinierte Funktion, die jede Zeile der
.txt
-Datei in die für unsere Tabelle relevanten Variablen umwandelt.Eine Airflow-DAG-Datei, die auf den Speicherort dieser Dateien verweist.
Als Nächstes laden Sie die Datei
.txt
, die UDF-Datei.js
und die Schemadatei.json
in einen Cloud Storage-Bucket hoch. Außerdem laden Sie den DAG in Ihre Cloud Composer-Umgebung hoch.Nachdem der DAG hochgeladen wurde, führt Airflow eine Aufgabe daraus aus. Mit dieser Aufgabe wird eine Dataflow-Pipeline gestartet, die die benutzerdefinierte Funktion auf die Datei
.txt
anwendet und sie gemäß dem JSON-Schema formatiert.Schließlich werden die Daten in die zuvor erstellte BigQuery-Tabelle hochgeladen.
Hinweise
- Für diese Anleitung sind JavaScript-Kenntnisse erforderlich, um die benutzerdefinierte Funktion zu schreiben.
- In diesem Leitfaden wird davon ausgegangen, dass Sie bereits eine Cloud Composer-Umgebung haben. Informationen zum Erstellen einer Umgebung finden Sie unter Umgebung erstellen. Sie können jede Version von Cloud Composer mit dieser Anleitung verwenden.
Enable the Cloud Composer, Dataflow, Cloud Storage, BigQuery APIs.
Prüfen Sie, ob Sie die folgenden Berechtigungen haben:
- Cloud Composer-Rollen: Sie können eine Umgebung erstellen (falls Sie noch keine haben), Objekte im Bucket der Umgebung verwalten, DAGs ausführen und auf die Airflow-UI zugreifen.
- Cloud Storage-Rollen: Erstellen Sie einen Bucket und verwalten Sie Objekte darin.
- BigQuery-Rollen: Dataset und Tabelle erstellen, Daten in der Tabelle ändern, Tabellenschema und ‑metadaten ändern.
- Dataflow-Rollen: Dataflow-Jobs ansehen.
Achten Sie darauf, dass das Dienstkonto Ihrer Umgebung die Berechtigungen zum Erstellen von Dataflow-Jobs, zum Zugriff auf den Cloud Storage-Bucket sowie zum Lesen und Aktualisieren von Daten für die Tabelle in BigQuery hat.
Leere BigQuery-Tabelle mit einer Schemadefinition erstellen
Erstellen Sie eine BigQuery-Tabelle mit einer Schemadefinition. Sie werden diese Schemadefinition später in dieser Anleitung verwenden. Diese BigQuery-Tabelle enthält die Ergebnisse des Batch-Uploads.
So erstellen Sie eine leere Tabelle mit einer Schemadefinition:
Console
Rufen Sie in der Google Cloud Console die Seite „BigQuery“ auf:
Maximieren Sie im Navigationsbereich im Abschnitt Ressourcen Ihr Projekt.
Klicken Sie im Detailbereich auf Dataset erstellen.
Geben Sie auf der Seite „Dataset erstellen“ im Abschnitt Dataset-ID
average_weather
als Namen für Ihr Dataset ein. Lassen Sie für alle anderen Felder die Standardwerte unverändert.Klicken Sie auf Dataset erstellen.
Kehren Sie zum Navigationsbereich zurück und maximieren Sie im Abschnitt Ressourcen Ihr Projekt. Klicken Sie dann auf das Dataset
average_weather
.Klicken Sie im Detailfeld auf Tabelle erstellen.
Wählen Sie auf der Seite Tabelle erstellen im Abschnitt Quelle die Option Leere Tabelle aus.
Gehen Sie auf der Seite Create table (Tabelle erstellen) im Abschnitt Destination (Ziel) so vor:
Wählen Sie für Dataset-Name das Dataset
average_weather
aus.Geben Sie im Feld Tabellenname den Namen
average_weather
ein.Achten Sie darauf, dass der Tabellentyp auf Native Tabelle eingestellt ist.
Geben Sie im Abschnitt Schema die Schemadefinition ein. Sie haben folgende Möglichkeiten:
Geben Sie Schemainformationen manuell ein, indem Sie Als Text bearbeiten aktivieren und das Tabellenschema als JSON-Array eingeben. Geben Sie Folgendes in die Felder ein:
[ { "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" }, { "name": "average_temperature", "type": "INTEGER", "mode": "REQUIRED" }, { "name": "month", "type": "STRING", "mode": "REQUIRED" }, { "name": "inches_of_rain", "type": "NUMERIC" }, { "name": "is_current", "type": "BOOLEAN" }, { "name": "latest_measurement", "type": "DATE" } ]
Geben Sie das Schema mit Feld hinzufügen manuell ein:
Behalten Sie für Partitions- und Clustereinstellungen den Standardwert
No partitioning
bei.Übernehmen Sie im Abschnitt Erweiterte Optionen ebenfalls den Standardwert für Verschlüsselung, und zwar
Google-owned and managed key
.Klicken Sie auf Tabelle erstellen.
bq
Verwenden Sie den Befehl bq mk
, um ein leeres Dataset und eine Tabelle in diesem Dataset zu erstellen.
Führen Sie den folgenden Befehl aus, um ein Dataset des durchschnittlichen globalen Wetters zu erstellen:
bq --location=LOCATION mk \
--dataset PROJECT_ID:average_weather
Ersetzen Sie Folgendes:
LOCATION
: die Region, in der sich die Umgebung befindet.PROJECT_ID
: die Projekt-ID.
Führen Sie den folgenden Befehl aus, um in diesem Dataset eine leere Tabelle mit der Schemadefinition zu erstellen:
bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE
Nachdem die Tabelle erstellt wurde, können Sie die Ablaufzeit, die Beschreibung und die Labels der Tabelle aktualisieren. Ebenso können Sie die Schemadefinition ändern.
Python
Speichern Sie diesen Code als dataflowtemplateoperator_create_dataset_and_table_helper.py
und aktualisieren Sie die darin enthaltenen Variablen, um Ihr Projekt und Ihren Standort widerzuspiegeln, und führen Sie ihn dann mit dem folgenden Befehl aus:
python dataflowtemplateoperator_create_dataset_and_table_helper.py
Python
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Cloud Storage-Bucket erstellen
Erstellen Sie einen Bucket, der alle für den Workflow erforderlichen Dateien enthält. Der DAG, den Sie später in dieser Anleitung erstellen, verweist auf die Dateien, die Sie in diesen Storage-Bucket hochladen. So erstellen Sie einen neuen Storage-Bucket:
Console
Öffnen Sie Cloud Storage in der Google Cloud Console.
Klicken Sie auf Bucket erstellen, um das Formular zum Erstellen eines Buckets zu öffnen.
Geben Sie die Bucket-Informationen ein und klicken Sie zum Ausführen der einzelnen Schritte jeweils auf Weiter:
Geben Sie einen global eindeutigen Namen für den Bucket an. In diesem Leitfaden wird
bucketName
als Beispiel verwendet.Wählen Sie Region als Standorttyp aus. Wählen Sie als Nächstes einen Standort aus, an dem die Bucket-Daten gespeichert werden sollen.
Wählen Sie Standard als Standard-Speicherklasse für Ihre Daten aus.
Wählen Sie Einheitliche Zugriffssteuerung für den Zugriff auf Ihre Objekte aus.
Klicken Sie auf Fertig.
gcloud
Führen Sie den Befehl gcloud storage buckets create
aus:
gcloud storage buckets create gs://bucketName/
Ersetzen Sie Folgendes:
bucketName
: Der Name des Buckets, den Sie zuvor in diesem Leitfaden erstellt haben.
Codebeispiele
C#
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Go
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Java
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Python
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Ruby
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
BigQuery-Schema im JSON-Format für Ihre Ausgabetabelle erstellen
Erstellen Sie eine JSON-formatierte BigQuery-Schemadatei, die der zuvor erstellten Ausgabetabelle entspricht. Die Feldnamen, -typen und -modi müssen mit den zuvor in Ihrem BigQuery-Tabellenschema definierten Feldern übereinstimmen. Diese Datei normalisiert die Daten aus der Datei .txt
in ein Format, das mit Ihrem BigQuery-Schema kompatibel ist. Geben Sie dieser Datei den Namen jsonSchema.json
.
{
"BigQuery Schema": [
{
"name": "location",
"type": "GEOGRAPHY",
"mode": "REQUIRED"
},
{
"name": "average_temperature",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "month",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "inches_of_rain",
"type": "NUMERIC"
},
{
"name": "is_current",
"type": "BOOLEAN"
},
{
"name": "latest_measurement",
"type": "DATE"
}]
}
JavaScript-Datei erstellen, um Ihre Daten zu formatieren
In dieser Datei definieren Sie Ihre UDF (User Defined Function), die die Logik bereitstellt, um die Textzeilen in Ihrer Eingabedatei zu transformieren. Beachten Sie, dass diese Funktion jede Textzeile in Ihrer Eingabedatei als eigenes Argument verwendet, sodass die Funktion einmal für jede Zeile Ihrer Eingabedatei ausgeführt wird. Geben Sie dieser Datei den Namen transformCSVtoJSON.js
.
Eingabedatei erstellen
Diese Datei enthält die Informationen, die Sie in Ihre BigQuery-Tabelle hochladen möchten. Kopieren Sie diese Datei lokal und nennen Sie sie inputFile.txt
.
POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null
Dateien in Ihren Bucket hochladen
Laden Sie die folgenden Dateien in den Cloud Storage-Bucket hoch, den Sie zuvor erstellt haben:
- JSON-formatiertes BigQuery-Schema (
.json
) - Benutzerdefinierte JavaScript-Funktion (
transformCSVtoJSON.js
) Die Eingabedatei für den zu verarbeitenden Text (
.txt
)
Console
- Wechseln Sie in der Google Cloud Console unter „Cloud Storage“ zur Seite Buckets.
Klicken Sie in der Liste der Buckets auf Ihren Bucket.
Führen Sie im Tab „Objekte“ für den Bucket eine der folgenden Aktionen aus:
Fügen Sie die gewünschten Dateien per Drag-and-drop von Ihrem Desktop oder Dateimanager in den Hauptbereich der Google Cloud -Konsole ein.
Klicken Sie auf die Schaltfläche Dateien hochladen, wählen Sie im angezeigten Dialogfeld die Dateien aus, die Sie hochladen möchten, und klicken Sie auf Öffnen.
gcloud
Führen Sie den Befehl gcloud storage cp
aus:
gcloud storage cp OBJECT_LOCATION gs://bucketName
Ersetzen Sie Folgendes:
bucketName
: Der Name des Buckets, den Sie zuvor in diesem Leitfaden erstellt haben.OBJECT_LOCATION
: der lokale Pfad zu Ihrem Objekt. Beispiel:Desktop/transformCSVtoJSON.js
Codebeispiele
Python
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Ruby
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
DataflowTemplateOperator konfigurieren
Bevor Sie den DAG ausführen, legen Sie die folgenden Airflow-Variablen fest.
Airflow-Variable | Wert |
---|---|
project_id
|
Die Projekt-ID. Beispiel: example-project . |
gce_zone
|
Compute Engine-Zone, in der der Dataflow-Cluster erstellt werden muss. Beispiel: us-central1-a . Weitere Informationen zu gültigen Zonen finden Sie unter Regionen und Zonen. |
bucket_path
|
Der Speicherort des zuvor erstellten Cloud Storage-Bucket. Beispiel: gs://example-bucket |
Nun verweisen Sie auf die Dateien, die Sie zuvor erstellt haben, um einen DAG zu erstellen, mit dem der Dataflow-Workflow gestartet wird. Kopieren Sie diesen DAG und speichern Sie ihn lokal als composer-dataflow-dag.py
.
DAG in Cloud Storage hochladen
Laden Sie den DAG in den Ordner /dags
im Bucket Ihrer Umgebung hoch. Sobald der Upload erfolgreich abgeschlossen wurde, können Sie ihn in der Cloud Composer-Umgebung auf den Link DAGs-Ordner sehen.
Status der Aufgabe ansehen
- Rufen Sie die Airflow-Weboberfläche auf.
- Klicken Sie auf der Seite „DAGs“ auf den DAG-Namen, z. B.
composerDataflowDAG
. - Klicken Sie auf der DAGs-Detailseite auf Grafikansicht.
Prüfen Sie den Status:
Failed
: Die Aufgabe ist rot umrandet. Sie können auch den Mauszeiger über die Aufgabe halten und nach State: Failed suchen.Success
: Die Aufgabe ist grün umrandet. Sie können auch den Mauszeiger über die Aufgabe halten und nach State: Success suchen.
Nach einigen Minuten können Sie die Ergebnisse in Dataflow und BigQuery prüfen.
Job in Dataflow ansehen
Rufen Sie in der Google Cloud Console die Seite Dataflow auf.
Der Name Ihres Jobs lautet
dataflow_operator_transform_csv_to_bq
. Eine eindeutige ID wird am Ende des Namens mit einem Bindestrich angehängt. Beispiel:Klicken Sie auf den Namen, um die Jobdetails aufzurufen.
Ergebnisse in BigQuery anzeigen
Rufen Sie in der Google Cloud Console die Seite BigQuery auf.
Sie können Abfragen mit Standard-SQL senden. Verwenden Sie die folgende Abfrage, um die Zeilen anzuzeigen, die Ihrer Tabelle hinzugefügt wurden:
SELECT * FROM projectId.average_weather.average_weather