Cloud Composer 1 Cloud Composer 2
Diese Anleitung ist eine Änderung von Datenanalyse-DAG in Google Cloud ausführen. Sie zeigt, wie Sie Ihre Cloud Composer-Umgebung mit Amazon Web Services verbinden, um die dort gespeicherten Daten zu verwenden. Es wird gezeigt, wie Sie mit Cloud Composer einen Apache Airflow-DAG erstellen. Der DAG führt Daten aus einem öffentlichen BigQuery-Dataset und einer CSV-Datei zusammen, die in einem AWS-S3-Bucket (Amazon Web Services) gespeichert sind. Anschließend führt er einen Dataproc Serverless-Batchjob aus, um die verknüpften Daten zu verarbeiten.
Das öffentliche BigQuery-Dataset in dieser Anleitung ist ghcn_d, eine integrierte Datenbank mit globalen Klimazusammenfassungen. Die CSV-Datei enthält Informationen zu den Daten und Namen von Feiertagen in den USA von 1997 bis 2021.
Die Frage, die wir mit dem DAG beantworten möchten, lautet: „Wie warm war es in Chicago am Thanksgiving in den letzten 25 Jahren?“
Lernziele
- Cloud Composer-Umgebung in der Standardkonfiguration erstellen
- Bucket in AWS S3 erstellen
- Leeres BigQuery-Dataset erstellen
- Neuen Cloud Storage-Bucket erstellen
- Erstellen Sie einen DAG, der die folgenden Aufgaben enthält, und führen Sie ihn aus:
- Externes Dataset aus S3 in Cloud Storage laden
- Externes Dataset aus Cloud Storage in BigQuery laden
- Zwei Datasets in BigQuery verknüpfen
- PySpark-Data-Analytics-Job ausführen
Hinweise
Berechtigungen in AWS verwalten
Folgen Sie dem Abschnitt „Richtlinien mit dem visuellen Editor erstellen“ der AWS-Anleitung zum Erstellen von IAM-Richtlinien, um eine benutzerdefinierte IAM-Richtlinie für AWS S3 mit der folgenden Konfiguration zu erstellen:
- Dienst:S3
- ListAllMyBuckets (
s3:ListAllMyBuckets
) zum Aufrufen des S3-Buckets - CreateBucket (
s3:CreateBucket
) zum Erstellen eines Buckets - PutBucketOwnershipControls (
s3:PutBucketOwnershipControls
) zum Erstellen eines Buckets - ListBucket (
s3:ListBucket
), um die Berechtigung zum Auflisten von Objekten in einem S3-Bucket zu gewähren - PutObject (
s3:PutObject
) zum Hochladen von Dateien in einen Bucket - GetBucketVersioning (
s3:GetBucketVersioning
) zum Löschen eines Objekts in einem Bucket - DeleteObject (
s3:DeleteObject
) zum Löschen eines Objekts in einem Bucket - ListBucketVersions (
s3:ListBucketVersions
) zum Löschen eines Buckets - DeleteBucket (
s3:DeleteBucket
) zum Löschen eines Buckets - Ressourcen: Wählen Sie neben „Bucket“ und „Objekt“ die Option „Beliebig“ aus, um allen Ressourcen dieses Typs Berechtigungen zu gewähren.
- Tag:Keines
- Name:TutorialPolicy
Weitere Informationen zu den einzelnen Konfigurationen finden Sie in der Liste der in Amazon S3 unterstützten Aktionen.
Ihrer Identität die IAM-Richtlinie TutorialPolicy hinzufügen
APIs aktivieren
Aktivieren Sie folgende APIs:
Console
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Berechtigungen erteilen
Weisen Sie Ihrem Nutzerkonto die folgenden Rollen und Berechtigungen zu:
Gewähren Sie Rollen zum Verwalten von Cloud Composer-Umgebungen und Umgebungs-Buckets.
Weisen Sie die Rolle BigQuery-Dateninhaber (
roles/bigquery.dataOwner
) zu, um ein BigQuery-Dataset zu erstellen.Gewähren Sie die Rolle Storage Admin (
roles/storage.admin
) zum Erstellen eines Cloud Storage-Buckets.
Cloud Composer-Umgebung erstellen und vorbereiten
Erstellen Sie eine Cloud Composer-Umgebung mit Standardparametern:
- Wählen Sie eine Region in den USA aus.
- Wählen Sie die neueste Cloud Composer-Version aus.
Weisen Sie dem Dienstkonto, das in Ihrer Cloud Composer-Umgebung verwendet wird, die folgenden Rollen zu, damit die Airflow-Worker DAG-Aufgaben erfolgreich ausführen können:
- BigQuery-Nutzer (
roles/bigquery.user
) - BigQuery-Dateninhaber (
roles/bigquery.dataOwner
) - Dienstkontonutzer (
roles/iam.serviceAccountUser
) - Dataproc-Editor (
roles/dataproc.editor
) - Dataproc-Worker (
roles/dataproc.worker
)
- BigQuery-Nutzer (
Zugehörige Ressourcen in Google Cloud erstellen und ändern
Installieren Sie das PyPI-Paket
apache-airflow-providers-amazon
in Ihrer Cloud Composer-Umgebung.Erstellen Sie ein leeres BigQuery-Dataset mit den folgenden Parametern:
- Name:
holiday_weather
- Region:
US
- Name:
Erstellen Sie einen neuen Cloud Storage-Bucket am multiregionalen Standort
US
.Führen Sie den folgenden Befehl aus, um den privaten Google-Zugriff im Standardsubnetz der Region zu aktivieren, in der Sie Dataproc Serverless ausführen möchten, um die Netzwerkanforderungen zu erfüllen. Wir empfehlen, dieselbe Region wie in Ihrer Cloud Composer-Umgebung zu verwenden.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Zugehörige Ressourcen in AWS erstellen
Erstellen Sie einen S3-Bucket mit Standardeinstellungen in Ihrer bevorzugten Region.
Von Cloud Composer mit AWS verbinden
- ID und geheimen Zugriffsschlüssel für AWS abrufen
Fügen Sie die AWS S3-Verbindung über die Airflow-UI hinzu:
- Klicken Sie auf Admin > Verbindungen.
Erstellen Sie eine neue Verbindung mit der folgenden Konfiguration:
- Verbindungs-ID:
aws_s3_connection
- Verbindungstyp:
Amazon S3
- Extras:
{"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}
- Verbindungs-ID:
Datenverarbeitung mit Dataproc Serverless
PySpark-Beispiel-Job ansehen
Der folgende Code ist ein PySpark-Beispieljob, der die Temperatur von Zehntelgrad Celsius in Grad Celsius umwandelt. Dieser Job wandelt Temperaturdaten aus dem Dataset in ein anderes Format um.
PySpark-Datei in Cloud Storage hochladen
So laden Sie die PySpark-Datei in Cloud Storage hoch:
Speichern Sie data_analytics_process.py auf Ihrem lokalen Computer.
Rufen Sie in der Google Cloud Console die Seite Cloud Storage-Browser auf:
Klicken Sie auf den Namen des Buckets, den Sie zuvor erstellt haben.
Klicken Sie im Tab Objekte für den Bucket auf die Schaltfläche Dateien hochladen, wählen Sie im angezeigten Dialogfeld
data_analytics_process.py
aus und klicken Sie auf Öffnen.
CSV-Datei in AWS S3 hochladen
So laden Sie die Datei holidays.csv
hoch:
- Speichern Sie
holidays.csv
auf Ihrem lokalen Computer. - Folgen Sie dem AWS-Leitfaden, um die Datei in Ihren Bucket hochzuladen.
Datenanalyse-DAG
Beispiel-DAG ansehen
Der DAG verwendet mehrere Operatoren, um die Daten zu transformieren und zu vereinheitlichen:
S3ToGCSOperator
überträgt die Datei holidays.csv aus Ihrem AWS S3-Bucket in Ihren Cloud Storage-Bucket.Mit
GCSToBigQueryOperator
wird die Datei holidays.csv aus Cloud Storage in eine neue Tabelle im BigQuery-Datasetholidays_weather
aufgenommen, das Sie zuvor erstellt haben.Der
DataprocCreateBatchOperator
erstellt und führt einen PySpark-Batchjob mit Dataproc Serverless aus.Mit
BigQueryInsertJobOperator
werden die Daten aus holidays.csv in der Spalte „Date“ mit Wetterdaten aus dem öffentlichen BigQuery-Dataset ghcn_d zusammengeführt. DieBigQueryInsertJobOperator
-Aufgaben werden mit einer For-Schleife dynamisch generiert. Zur besseren Lesbarkeit in der Grafikansicht der Airflow-UI befinden sich diese Aufgaben in einerTaskGroup
.
Variablen über die Airflow-UI hinzufügen
In Airflow sind Variablen eine universelle Möglichkeit, beliebige Einstellungen oder Konfigurationen als einfachen Speicher für Schlüssel/Wert-Paare zu speichern und abzurufen. Dieser DAG verwendet Airflow-Variablen zum Speichern gängiger Werte. So fügen Sie sie zu Ihrer Umgebung hinzu:
Rufen Sie über die Cloud Composer-Konsole auf die Airflow-UI auf.
Klicken Sie auf Verwaltung > Variablen.
Fügen Sie die folgenden Variablen hinzu:
s3_bucket
: der Name des S3-Buckets, den Sie zuvor erstellt haben.gcp_project
: Ihre Projekt-ID.gcs_bucket
: der Name des zuvor erstellten Buckets (ohne das Präfixgs://
).gce_region
: Die Region, in der Ihr Dataproc-Job ausgeführt werden soll, der die Anforderungen an das serverlose Dataproc-Netzwerk erfüllt. Das ist die Region, in der Sie zuvor den privaten Google-Zugriff aktiviert haben.dataproc_service_account
: das Dienstkonto für Ihre Cloud Composer-Umgebung. Sie finden dieses Dienstkonto auf dem Tab für die Umgebungskonfiguration für Ihre Cloud Composer-Umgebung.
DAG in den Bucket Ihrer Umgebung hochladen
Cloud Composer plant DAGs, die sich im Ordner /dags
im Bucket Ihrer Umgebung befinden. So laden Sie den DAG über die Google Cloud Console hoch:
Speichern Sie s3togcsoperator_tutorial.py auf Ihrem lokalen Computer.
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen in der Spalte DAG-Ordner auf den Link DAGs. Der DAGs-Ordner Ihrer Umgebung wird geöffnet.
Klicken Sie auf Dateien hochladen.
Wählen Sie
s3togcsoperator_tutorial.py
auf Ihrem lokalen Computer aus und klicken Sie auf Öffnen.
DAG auslösen
Klicken Sie in Ihrer Cloud Composer-Umgebung auf den Tab DAGs.
Klicken Sie auf die DAG-ID
s3_to_gcs_dag
.Klicken Sie auf Trigger DAG (DAG auslösen).
Warten Sie etwa fünf bis zehn Minuten, bis Sie ein grünes Häkchen sehen, das anzeigt, dass die Aufgaben erfolgreich abgeschlossen wurden.
Erfolg des DAG validieren
Öffnen Sie in der Google Cloud Console die Seite BigQuery.
Klicken Sie im Explorer-Bereich auf den Namen Ihres Projekts.
Klicken Sie auf
holidays_weather_joined
.Klicken Sie auf „Vorschau“, um die resultierende Tabelle anzusehen. Beachten Sie, dass die Zahlen in der Wertspalte in Zehntelgrad Celsius angegeben sind.
Klicken Sie auf
holidays_weather_normalized
.Klicken Sie auf „Vorschau“, um die resultierende Tabelle anzusehen. Beachten Sie, dass die Zahlen in der Spalte Werte in Grad Celsius angegeben sind.
Bereinigen
Löschen Sie einzelne Ressourcen, die Sie für diese Anleitung erstellt haben:
Löschen Sie die Datei
holidays.csv
in Ihrem AWS S3-Bucket.Löschen Sie den AWS S3-Bucket, den Sie erstellt haben.
Löschen Sie den Cloud Storage-Bucket, den Sie für diese Anleitung erstellt haben.
Löschen Sie die Cloud Composer-Umgebung, einschließlich des Buckets der Umgebung.