Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Diese Anleitung ist eine Änderung des Artikels Datenanalyse-DAG in Google Cloud ausführen. Darin wird gezeigt, wie Sie Ihre Cloud Composer-Umgebung mit Microsoft Azure verbinden, um dort gespeicherte Daten zu verwenden. Es wird gezeigt, wie Sie mit Cloud Composer einen Apache Airflow-DAG erstellen. Der DAG verknüpft Daten aus einem öffentlichen BigQuery-Dataset mit einer in einem Azure Blob Storage gespeicherten CSV-Datei und führt dann einen Dataproc Serverless-Batchjob aus, um die zusammengeführten Daten zu verarbeiten.
Das öffentliche BigQuery-Dataset in dieser Anleitung ist ghcn_d, eine integrierte Datenbank mit globalen Klimazusammenfassungen. Die CSV-Datei enthält Informationen zu den Datumsangaben und Namen von Feiertagen in den USA von 1997 bis 2021.
Die Frage, die wir mithilfe des DAG beantworten möchten, lautet: „Wie warm war es in Chicago in den letzten 25 Jahren an Thanksgiving?“
Lernziele
- Cloud Composer-Umgebung in der Standardkonfiguration erstellen
- Blob in Azure erstellen
- Leeres BigQuery-Dataset erstellen
- Neuen Cloud Storage-Bucket erstellen
- Erstellen Sie einen DAG und führen Sie ihn aus, der die folgenden Aufgaben enthält:
- Externes Dataset aus Azure Blob Storage in Cloud Storage laden
- Externes Dataset aus Cloud Storage in BigQuery laden
- Zwei Datasets in BigQuery verknüpfen
- PySpark-Job zur Datenanalyse ausführen
Hinweise
APIs aktivieren
Aktivieren Sie folgende APIs:
Console
Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs aktivieren.
gcloud
Aktivieren Sie die Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Berechtigungen erteilen
Gewähren Sie Ihrem Nutzerkonto die folgenden Rollen und Berechtigungen:
Gewähren Sie Rollen zum Verwalten von Cloud Composer-Umgebungen und Umgebungs-Buckets.
Gewähren Sie die Rolle BigQuery-Dateninhaber (
roles/bigquery.dataOwner
), um ein BigQuery-Dataset zu erstellen.Gewähren Sie die Rolle Storage-Administrator (
roles/storage.admin
), um einen Cloud Storage-Bucket zu erstellen.
Cloud Composer-Umgebung erstellen und vorbereiten
Erstellen Sie eine Cloud Composer-Umgebung mit Standardparametern:
- Wählen Sie eine Region in den USA aus.
- Wählen Sie die neueste Cloud Composer-Version aus.
Gewähren Sie dem Dienstkonto, das in Ihrer Cloud Composer-Umgebung verwendet wird, die folgenden Rollen, damit die Airflow-Worker DAG-Aufgaben erfolgreich ausführen können:
- BigQuery-Nutzer (
roles/bigquery.user
) - BigQuery-Dateninhaber (
roles/bigquery.dataOwner
) - Dienstkontonutzer (
roles/iam.serviceAccountUser
) - Dataproc-Bearbeiter (
roles/dataproc.editor
) - Dataproc-Worker (
roles/dataproc.worker
)
- BigQuery-Nutzer (
Zugehörige Ressourcen in Google Cloud erstellen und ändern
Installieren Sie das PyPI-Paket
apache-airflow-providers-microsoft-azure
in der Cloud Composer-Umgebung.Erstellen Sie ein leeres BigQuery-Dataset mit den folgenden Parametern:
- Name:
holiday_weather
- Region:
US
- Name:
Erstellen Sie einen neuen Cloud Storage-Bucket am multiregionalen Standort
US
.Führen Sie den folgenden Befehl aus, um den privaten Google-Zugriff im Standardsubnetz in der Region zu aktivieren, in der Sie Dataproc Serverless ausführen möchten, um die Netzwerkanforderungen zu erfüllen. Wir empfehlen, dieselbe Region wie in Ihrer Cloud Composer-Umgebung zu verwenden.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Zugehörige Ressourcen in Azure erstellen
Erstellen Sie ein Speicherkonto mit den Standardeinstellungen.
Rufen Sie den Zugriffsschlüssel und den Verbindungsstring für Ihr Speicherkonto ab.
Erstellen Sie einen Container mit Standardoptionen im neu erstellten Speicherkonto.
Weisen Sie dem im vorherigen Schritt erstellten Container die Rolle Storage Blob Delegator zu.
Laden Sie holidays.csv hoch, um ein Block-Blob mit Standardoptionen im Azure-Portal zu erstellen.
Erstellen Sie ein SAS-Token für das Block-Blob, das Sie im vorherigen Schritt im Azure-Portal erstellt haben.
- Signierungsmethode: Nutzerdelegierungsschlüssel
- Berechtigungen: Lesen
- Zulässige IP-Adresse: Keine
- Zulässige Protokolle: nur HTTPS
Von Cloud Composer mit Azure verbinden
Fügen Sie die Microsoft Azure-Verbindung über die Airflow-UI hinzu:
Gehen Sie zu Verwaltung > Verbindungen.
Erstellen Sie eine neue Verbindung mit der folgenden Konfiguration:
- Verbindungs-ID:
azure_blob_connection
- Verbindungstyp:
Azure Blob Storage
- Blob Storage Login (Blob-Speicheranmeldung): Name Ihres Speicherkontos
- Blob Storage Key (Blob-Speicherschlüssel): der Zugriffsschlüssel für Ihr Speicherkonto
- Blob Storage Account Connection String:Ihr Verbindungsstring für das Speicherkonto.
- SAS-Token:das von Ihrem Blob generierte SAS-Token
- Verbindungs-ID:
Datenverarbeitung mit Dataproc Serverless
PySpark-Beispieljob ansehen
Der folgende Code ist ein PySpark-Beispieljob, der die Temperatur von Zehntelgrad in Celsius in Grad Celsius umwandelt. Dieser Job wandelt Temperaturdaten aus dem Dataset in ein anderes Format um.
PySpark-Datei in Cloud Storage hochladen
So laden Sie die PySpark-Datei in Cloud Storage hoch:
Speichern Sie data_analytics_process.py auf Ihrem lokalen Computer.
Rufen Sie in der Google Cloud Console die Seite Cloud Storage-Browser auf:
Klicken Sie auf den Namen des Buckets, den Sie zuvor erstellt haben.
Klicken Sie auf dem Tab Objekte des Buckets auf die Schaltfläche Dateien hochladen, wählen Sie im angezeigten Dialogfeld
data_analytics_process.py
aus und klicken Sie auf Öffnen.
Datenanalyse-DAG
Beispiel-DAG ansehen
Der DAG verwendet mehrere Operatoren, um die Daten zu transformieren und zu vereinheitlichen:
Der
AzureBlobStorageToGCSOperator
überträgt die Datei holidays.csv aus dem Azure-Block-Blob in den Cloud Storage-Bucket.Der
GCSToBigQueryOperator
nimmt die Datei holidays.csv aus Cloud Storage in eine neue Tabelle im BigQuery-Datasetholidays_weather
auf, das Sie zuvor erstellt haben.Der
DataprocCreateBatchOperator
erstellt einen PySpark-Batchjob mit Dataproc Serverless und führt ihn aus.Über
BigQueryInsertJobOperator
werden die Daten aus holidays.csv in der Spalte „Date“ mit Wetterdaten aus dem öffentlichen BigQuery-Dataset ghcn_d zusammengeführt. DieBigQueryInsertJobOperator
-Aufgaben werden dynamisch mit einer For-Schleife generiert und befinden sich zur besseren Lesbarkeit in einerTaskGroup
in der Grafikansicht der Airflow-UI.
Variablen über die Airflow-UI hinzufügen
In Airflow sind Variablen eine universelle Möglichkeit, beliebige Einstellungen oder Konfigurationen als einfachen Schlüssel/Wert-Speicher zu speichern und abzurufen. Dieser DAG verwendet Airflow-Variablen zum Speichern gängiger Werte. So fügen Sie sie Ihrer Umgebung hinzu:
Greifen Sie über die Cloud Composer-Konsole auf die Airflow-UI zu.
Klicken Sie auf Admin > Variablen.
Fügen Sie die folgenden Variablen hinzu:
gcp_project
: Ihre Projekt-ID.gcs_bucket
: der Name des Buckets, den Sie zuvor erstellt haben (ohne das Präfixgs://
).gce_region
: Die Region, in der der Dataproc-Job gespeichert werden soll und die die Anforderungen an serverloses Dataproc-Netzwerk erfüllt. Das ist die Region, in der Sie den privaten Google-Zugriff zuvor aktiviert haben.dataproc_service_account
: das Dienstkonto für Ihre Cloud Composer-Umgebung. Sie finden dieses Dienstkonto auf dem Tab „Umgebungskonfiguration“ für Ihre Cloud Composer-Umgebung.azure_blob_name
: der Name des Blobs, das Sie zuvor erstellt haben.azure_container_name
: der Name des Containers, den Sie zuvor erstellt haben.
DAG in den Bucket Ihrer Umgebung hochladen
Cloud Composer plant DAGs, die sich im Ordner /dags
des Buckets Ihrer Umgebung befinden. So laden Sie den DAG über die Google Cloud Console hoch:
Speichern Sie azureblobstoretogcsoperator_tutorial.py auf Ihrem lokalen Computer.
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen in der Spalte DAG-Ordner auf den Link DAGs. Der DAGs-Ordner Ihrer Umgebung wird geöffnet.
Klicken Sie auf Dateien hochladen.
Wählen Sie
azureblobstoretogcsoperator_tutorial.py
auf Ihrem lokalen Computer aus und klicken Sie auf Open (Öffnen).
DAG auslösen
Klicken Sie in Ihrer Cloud Composer-Umgebung auf den Tab DAGs.
Klicken Sie auf die DAG-ID
azure_blob_to_gcs_dag
.Klicken Sie auf DAG auslösen.
Warten Sie etwa fünf bis zehn Minuten, bis ein grünes Häkchen angezeigt wird, das anzeigt, dass die Aufgaben erfolgreich abgeschlossen wurden.
Erfolg des DAG validieren
Öffnen Sie in der Google Cloud Console die Seite BigQuery.
Klicken Sie im Explorer-Bereich auf den Namen Ihres Projekts.
Klicken Sie auf
holidays_weather_joined
.Klicken Sie auf „Vorschau“, um die resultierende Tabelle zu sehen. Beachten Sie, dass die Zahlen in der Wertspalte in Zehntelgrad Celsius angegeben sind.
Klicken Sie auf
holidays_weather_normalized
.Klicken Sie auf „Vorschau“, um die resultierende Tabelle zu sehen. Die Zahlen in der Wertspalte sind in Grad Celsius angegeben.
Bereinigen
Löschen Sie einzelne Ressourcen, die Sie für diese Anleitung erstellt haben:
Löschen Sie den Cloud Storage-Bucket, den Sie für diese Anleitung erstellt haben.
Löschen Sie die Cloud Composer-Umgebung, einschließlich des manuellen Löschens des Buckets der Umgebung.