Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Dieses Tutorial ist eine Modifikation von Führen Sie einen Datenanalyse-DAG in Google Cloud aus, der zeigt, wie Sie Ihre Cloud Composer-Umgebung mit Microsoft Azure verbinden, um dort gespeicherte Daten zu nutzen. Darin wird gezeigt, wie Sie mit Cloud Composer einen Apache Airflow-DAG erstellen. Der DAG führt Daten aus einem öffentlichen BigQuery-Dataset und einer CSV-Datei zusammen, die in einem Azure Blob Storage gespeichert ist. Anschließend wird ein Dataproc Serverless-Batchjob ausgeführt, um die zusammengeführten Daten zu verarbeiten.
Das öffentliche BigQuery-Dataset in dieser Anleitung ist ghcn_d, einer integrierten Datenbank mit Klimazusammenfassungen auf der ganzen Welt. Die CSV-Datei enthält Informationen zu den Daten und Namen von US-Feiertagen von 1997 bis 2021.
Die Frage, die wir mithilfe des DAG beantworten möchten, lautet: „Wie warm war es in Chicago? in den letzten 25 Jahren an Thanksgiving?“
Lernziele
- Cloud Composer-Umgebung in der Standardkonfiguration erstellen
- Blob in Azure erstellen
- Leeres BigQuery-Dataset erstellen
- Neuen Cloud Storage-Bucket erstellen
- Erstellen Sie einen DAG und führen Sie ihn aus, der die folgenden Aufgaben enthält:
- Externen Datensatz aus Azure Blob Storage in Cloud Storage laden
- Externes Dataset aus Cloud Storage in BigQuery laden
- Zwei Datasets in BigQuery verknüpfen
- PySpark-Job für Datenanalyse ausführen
Hinweise
APIs aktivieren
Aktivieren Sie folgende APIs:
Console
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Berechtigungen erteilen
Gewähren Sie Ihrem Nutzerkonto die folgenden Rollen und Berechtigungen:
Weisen Sie Rollen zum Verwalten von Cloud Composer-Umgebungen und Umgebungs-Buckets zu.
Gewähren Sie die Rolle BigQuery-Dateninhaber (
roles/bigquery.dataOwner
), um ein BigQuery-Dataset zu erstellen.Weisen Sie die Rolle Storage-Administrator (
roles/storage.admin
) zu, um einen Cloud Storage-Bucket zu erstellen.
Cloud Composer-Umgebung erstellen und vorbereiten
Erstellen Sie eine Cloud Composer-Umgebung mit den Standardparametern:
- Wählen Sie eine Region in den USA aus.
- Wählen Sie die neueste Cloud Composer-Version aus.
Weisen Sie dem Dienstkonto, das in Ihrem Cloud Composer-Umgebung, damit die Airflow-Worker DAG-Aufgaben erfolgreich ausgeführt:
- BigQuery-Nutzer (
roles/bigquery.user
) - BigQuery-Dateninhaber (
roles/bigquery.dataOwner
) - Dienstkontonutzer (
roles/iam.serviceAccountUser
) - Dataproc-Bearbeiter (
roles/dataproc.editor
) - Dataproc-Worker (
roles/dataproc.worker
)
- BigQuery-Nutzer (
Zugehörige Ressourcen in Google Cloud erstellen und ändern
Installieren Sie das
apache-airflow-providers-microsoft-azure
PyPI-Paket in Ihrer Cloud Composer-Umgebung.Erstellen Sie ein leeres BigQuery-Dataset mit den folgenden Parametern:
- Name:
holiday_weather
- Region:
US
- Name:
Erstellen Sie einen neuen Cloud Storage-Bucket in der Multi-Region
US
.Führen Sie den folgenden Befehl aus, um privaten Google-Zugriff im Standardsubnetz in der Region zu aktivieren, in der Sie Dataproc Serverless ausführen möchten, um die Netzwerkanforderungen zu erfüllen. Mi. empfehlen, dieselbe Region wie in Cloud Composer zu verwenden zu verbessern.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Zugehörige Ressourcen in Azure erstellen
Speicherplatzkonto erstellen mit den Standardeinstellungen.
Zugriffsschlüssel und Verbindungsstring abrufen für Ihr Speicherkonto.
Container erstellen mit den Standardoptionen in Ihrem neu erstellten Speicherkonto.
Weisen Sie dem Container, den Sie im vorherigen Schritt erstellt haben, die Rolle Storage Blob Delegator zu.
Laden Sie holidays.csv hoch, um mit den Standardoptionen im Azure-Portal einen Block-Blob zu erstellen.
Erstellen Sie ein SAS-Token für den Blockblob, den Sie im vorherigen Schritt im Azure-Portal erstellt haben.
- Signierungsmethode: Nutzerdelegierungsschlüssel
- Berechtigungen: Lesen
- Zulässige IP-Adresse: Keine
- Zulässige Protokolle: Nur HTTPS
Von Cloud Composer mit Azure verbinden
So fügen Sie über die Airflow-Benutzeroberfläche eine Microsoft Azure-Verbindung hinzu:
Gehen Sie zu Verwaltung > Verbindungen.
Erstellen Sie eine neue Verbindung mit der folgenden Konfiguration:
- Verbindungs-ID:
azure_blob_connection
- Verbindungstyp:
Azure Blob Storage
- Blob Storage-Anmeldung: der Name Ihres Speicherkontos
- Blob Storage Key (Blob-Speicherschlüssel): der Zugriffsschlüssel für Ihr Speicherkonto
- Blob Storage Account Connection String:Ihr Speicherkonto Verbindungsstring
- SAS-Token:das von Ihrem Blob generierte SAS-Token
- Verbindungs-ID:
Datenverarbeitung mit Dataproc Serverless
PySpark-Beispieljob ansehen
Der folgende Code ist ein Beispiel für einen PySpark-Job, mit dem Temperaturen von Zehntelgraden Celsius in Grad Celsius umgewandelt werden. Dieser Job erzielt eine Conversion Temperaturdaten aus dem Dataset in ein anderes Format konvertieren.
PySpark-Datei in Cloud Storage hochladen
So laden Sie die PySpark-Datei in Cloud Storage hoch:
Speichern data_analytics_process.py auf Ihren lokalen Computer übertragen.
Rufen Sie in der Google Cloud Console die Seite Cloud Storage-Browser auf:
Klicken Sie auf den Namen des zuvor erstellten Buckets.
Klicken Sie auf dem Tab Objekte des Buckets auf die Schaltfläche Dateien hochladen. wählen Sie
data_analytics_process.py
im angezeigten Dialogfeld aus und klicken Sie auf Öffnen.
Datenanalyse-DAG
Beispiel-DAG ansehen
Der DAG verwendet mehrere Operatoren, um die Daten zu transformieren und zu vereinheitlichen:
Die
AzureBlobStorageToGCSOperator
überträgt die Datei holidays.csv aus Ihrem Azure-Block-Blob in Ihren Cloud Storage-Bucket.Mit
GCSToBigQueryOperator
wird die Datei holidays.csv aus Cloud Storage in eine neue Tabelle im BigQuery-Datasetholidays_weather
aufgenommen, das Sie zuvor erstellt haben.Mit
DataprocCreateBatchOperator
wird ein PySpark-Batchjob mit Dataproc Serverless erstellt und ausgeführt.Die
BigQueryInsertJobOperator
werden die Daten aus holidays.csv „Datum“ Spalte mit Wetterdaten aus dem öffentlichen BigQuery-Dataset ghcn_d verwendet. DieBigQueryInsertJobOperator
-Aufgaben werden dynamisch mit einer For-Schleife generiert und befinden sich in einemTaskGroup
, um die Lesbarkeit in der Grafikansicht der Airflow-Benutzeroberfläche zu verbessern.
Variablen über die Airflow-Benutzeroberfläche hinzufügen
In Airflow variables sind eine universelle Möglichkeit, beliebige Einstellungen als einfachen Schlüssel/Wert-Speicher. In diesem DAG werden Airflow-Variablen verwendet, um gängige Werte zu speichern. So fügen Sie sie Ihrer Umgebung hinzu:
Greifen Sie über die Cloud Composer-Konsole auf die Airflow-UI zu.
Klicken Sie auf Admin > Variablen.
Fügen Sie die folgenden Variablen hinzu:
gcp_project
: Ihre Projekt-ID.gcs_bucket
: der Name des Buckets, den Sie zuvor erstellt haben (ohne das Präfixgs://
).gce_region
ist die Region, in der die Dataproc-Job, der die Dataproc Serverless-Netzwerkanforderungen. Das ist die Region, in der Sie den privaten Google-Zugriff zuvor aktiviert haben.dataproc_service_account
: das Dienstkonto für Ihr Cloud Composer-Umgebung. Sie finden dieses Dienstkonto auf dem Tab „Umgebungskonfiguration“ Ihrer Cloud Composer-Umgebung.azure_blob_name
: der Name des Blobs, das Sie zuvor erstellt haben.azure_container_name
: Der Name des zuvor erstellten Containers.
DAG in den Bucket Ihrer Umgebung hochladen
Cloud Composer plant DAGs, die sich in der
/dags
-Ordner im Bucket Ihrer Umgebung. So laden Sie den DAG mit der Google Cloud Console hoch:
Speichern Sie azureblobstoretogcsoperator_tutorial.py auf Ihrem lokalen Computer.
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen in der Spalte DAG-Ordner auf den Link DAGs. Der DAGs-Ordner Ihrer Umgebung wird geöffnet.
Klicken Sie auf Dateien hochladen.
Wählen Sie
azureblobstoretogcsoperator_tutorial.py
auf Ihrem lokalen Computer aus und klicken Sie auf Öffnen.
DAG auslösen
Klicken Sie in Ihrer Cloud Composer-Umgebung auf den Tab DAGs.
Klicken Sie auf die DAG-ID
azure_blob_to_gcs_dag
.Klicken Sie auf DAG auslösen.
Warten Sie etwa fünf bis zehn Minuten, bis Sie ein grünes Häkchen sehen. Aufgaben erfolgreich abgeschlossen wurden.
Erfolg des DAG validieren
Öffnen Sie in der Google Cloud Console die Seite BigQuery.
Klicken Sie im Explorer-Bereich auf den Namen Ihres Projekts.
Klicken Sie auf
holidays_weather_joined
.Klicken Sie auf „Vorschau“, um die resultierende Tabelle aufzurufen. Beachten Sie, dass die Zahlen in den Wertspalte in Zehntelgrad Celsius angegeben.
Klicken Sie auf
holidays_weather_normalized
.Klicken Sie auf „Vorschau“, um die resultierende Tabelle zu sehen. Die Zahlen in der Spalte „Wert“ sind in Grad Celsius angegeben.
Bereinigen
Löschen Sie einzelne Ressourcen, die Sie für diese Anleitung erstellt haben:
Löschen Sie den Cloud Storage-Bucket, den Sie die für diese Anleitung erstellt wurden.
Löschen Sie die Cloud Composer-Umgebung, einschließlich des manuellen Löschens des Buckets der Umgebung.