Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
In dieser Anleitung wird gezeigt, wie Sie mit Cloud Composer ein Apache Airflow-DAG Die DAG verknüpft Daten aus einem öffentlichen BigQuery-Dataset mit einer gespeicherten CSV-Datei in einem Cloud Storage-Bucket Serverloser Dataproc-Batchjob zum Verarbeiten der zusammengeführten Daten.
Das öffentliche BigQuery-Dataset in dieser Anleitung ist ghcn_d, eine integrierte Datenbank mit Klimazusammenfassungen auf der ganzen Welt. Die CSV-Datei enthält Informationen zu den Daten und Namen von Feiertagen in den USA von 1997 bis 2021.
Mit dem DAG möchten wir die Frage beantworten: „Wie warm war es in Chicago an Thanksgiving in den letzten 25 Jahren?“
Lernziele
- Cloud Composer-Umgebung in der Standardkonfiguration erstellen
- Leeres BigQuery-Dataset erstellen
- Neuen Cloud Storage-Bucket erstellen
- Erstellen und führen Sie einen DAG mit den folgenden Aufgaben aus:
- Externes Dataset aus Cloud Storage in BigQuery laden
- Zwei Datasets in BigQuery zusammenführen
- PySpark-Job zur Datenanalyse ausführen
Hinweise
APIs aktivieren
Aktivieren Sie folgende APIs:
Console
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Berechtigungen erteilen
Weisen Sie Ihrem Nutzerkonto die folgenden Rollen und Berechtigungen zu:
Gewähren Sie Rollen zum Verwalten von Cloud Composer-Umgebungen und Umgebungs-Buckets.
Gewähren Sie die Rolle BigQuery-Dateninhaber (
roles/bigquery.dataOwner
), um ein BigQuery-Dataset zu erstellen.Weisen Sie die Rolle Storage-Administrator (
roles/storage.admin
) zu, um einen Cloud Storage-Bucket zu erstellen.
Cloud Composer-Umgebung erstellen und vorbereiten
Erstellen Sie eine Cloud Composer-Umgebung mit Standardeinstellung. Parameter:
- Wählen Sie eine Region in den USA aus.
- Wählen Sie die neueste Cloud Composer-Version aus.
Weisen Sie dem Dienstkonto, das in Ihrer Cloud Composer-Umgebung verwendet wird, die folgenden Rollen zu, damit die Airflow-Worker DAG-Aufgaben erfolgreich ausführen können:
- BigQuery-Nutzer (
roles/bigquery.user
) - BigQuery-Dateninhaber (
roles/bigquery.dataOwner
) - Dienstkontonutzer (
roles/iam.serviceAccountUser
) - Dataproc-Bearbeiter (
roles/dataproc.editor
) - Dataproc-Worker (
roles/dataproc.worker
)
- BigQuery-Nutzer (
Zugehörige Ressourcen erstellen
Leeres BigQuery-Dataset erstellen mit den folgenden Parametern:
- Name:
holiday_weather
- Region:
US
- Name:
Erstellen Sie einen neuen Cloud Storage-Bucket in der Multi-Region
US
.Führen Sie den folgenden Befehl aus, Privaten Google-Zugriff aktivieren im Standardsubnetz in der Region, in der Sie ausführen möchten Serverlose Dataproc-Funktionen für die Ausführung Netzwerkanforderungen. Wir empfehlen, dieselbe Region wie für Ihre Cloud Composer-Umgebung zu verwenden.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Datenverarbeitung mit Dataproc Serverless
Beispiel für einen PySpark-Job
Der folgende Code ist ein PySpark-Beispieljob, der die Temperatur von Zehntelgrad in Celsius in Grad Celsius. Mit diesem Job werden Temperaturdaten aus dem Datensatz in ein anderes Format konvertiert.
Nachweisdateien in Cloud Storage hochladen
So laden Sie die PySpark-Datei und das in holidays.csv
gespeicherte Dataset hoch:
Speichern Sie data_analytics_process.py auf Ihrem lokalen Computer.
Speichern Sie die Datei holidays.csv auf Ihrem lokalen Computer.
Rufen Sie in der Google Cloud Console die Seite Cloud Storage-Browser auf:
Klicken Sie auf den Namen des Buckets, den Sie zuvor erstellt haben.
Klicken Sie auf dem Tab Objekte des Buckets auf die Schaltfläche Dateien hochladen. wählen Sie
data_analytics_process.py
undholidays.csv
aus dem Dialogfeld aus, angezeigt wird, und klicken Sie auf Öffnen.
Datenanalyse-DAG
Beispiel-DAG ansehen
Der DAG verwendet mehrere Operatoren, um die Daten zu transformieren und zu vereinheitlichen:
Mit
GCSToBigQueryOperator
wird die Datei holidays.csv aus Cloud Storage in eine neue Tabelle im BigQuery-Datasetholidays_weather
aufgenommen, das Sie zuvor erstellt haben.Die
DataprocCreateBatchOperator
einen PySpark-Batch-Job mithilfe von Dataproc ServerlessMit
BigQueryInsertJobOperator
werden die Daten aus holidays.csv in der Spalte „Datum“ mit Wetterdaten aus dem öffentlichen BigQuery-Dataset ghcn_d zusammengeführt. DieBigQueryInsertJobOperator
-Aufgaben sind dynamisch mit einer For-Schleife generiert, und diese Aufgaben befinden sichTaskGroup
für eine bessere Lesbarkeit in der Grafikansicht der Airflow-UI.
Variablen über die Airflow-Benutzeroberfläche hinzufügen
In Airflow variables sind eine universelle Möglichkeit, beliebige Einstellungen als einfachen Schlüssel/Wert-Speicher. In diesem DAG werden Airflow-Variablen verwendet, um gängige Werte zu speichern. So fügen Sie sie Ihrer Umgebung hinzu:
Greifen Sie über die Cloud Composer-Konsole auf die Airflow-UI zu.
Klicken Sie auf Admin > Variablen.
Fügen Sie die folgenden Variablen hinzu:
gcp_project
: Ihre Projekt-ID.gcs_bucket
: der Name des Buckets, den Sie zuvor erstellt haben (ohne das Präfixgs://
).gce_region
: die Region, in der Sie Ihren Dataproc-Job ausführen möchten, der die Anforderungen an die Dataproc Serverless-Netzwerkkonfiguration erfüllt. Das ist die Region, in der Sie den privaten Google-Zugriff zuvor aktiviert haben.dataproc_service_account
: das Dienstkonto für Ihre Cloud Composer-Umgebung. Sie finden diesen Dienst auf dem Tab für die Umgebungskonfiguration Cloud Composer-Umgebung.
DAG in den Bucket Ihrer Umgebung hochladen
Cloud Composer plant DAGs, die sich im Ordner /dags
des Buckets Ihrer Umgebung befinden. So laden Sie den DAG mit der Google Cloud Console hoch:
Speichern Sie data_analytics_dag.py auf Ihrem lokalen Computer.
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen in der Spalte DAG-Ordner auf den Link DAGs. Der DAGs-Ordner Ihrer Umgebung wird geöffnet.
Klicken Sie auf Dateien hochladen.
Wählen Sie
data_analytics_dag.py
auf Ihrem lokalen Computer aus und klicken Sie auf Öffnen.
DAG auslösen
Klicken Sie in Ihrer Cloud Composer-Umgebung auf den Tab DAGs.
Klicken Sie auf die DAG-ID
data_analytics_dag
.Klicken Sie auf DAG auslösen.
Warten Sie etwa fünf bis zehn Minuten, bis ein grünes Häkchen angezeigt wird, das signalisiert, dass die Aufgaben erfolgreich abgeschlossen wurden.
Erfolg des DAG prüfen
Öffnen Sie in der Google Cloud Console die Seite BigQuery.
Klicken Sie im Explorer-Bereich auf den Namen Ihres Projekts.
Klicken Sie auf
holidays_weather_joined
.Klicken Sie auf „Vorschau“, um die resultierende Tabelle aufzurufen. Beachten Sie, dass die Zahlen in den Wertspalte in Zehntelgrad Celsius angegeben.
Klicken Sie auf
holidays_weather_normalized
.Klicken Sie auf „Vorschau“, um die resultierende Tabelle aufzurufen. Die Zahlen in der Spalte „Wert“ sind in Grad Celsius angegeben.
Detaillierte Informationen zu Dataproc Serverless (optional)
Sie können eine erweiterte Version dieses DAG mit einem komplexeren PySpark-Datenverarbeitungsablauf ausprobieren. Dataproc-Erweiterung für das Beispiel für die Datenanalyse auf GitHub
Bereinigen
Löschen Sie die einzelnen Ressourcen, die Sie für diese Anleitung erstellt haben:
Löschen Sie den Cloud Storage-Bucket, den Sie die für diese Anleitung erstellt wurden.
Löschen Sie die Cloud Composer-Umgebung, einschließlich des manuellen Löschens des Buckets der Umgebung.