Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Dieses Tutorial ist eine Modifikation von Datenanalyse-DAG in Google Cloud ausführen wie Sie Ihre Cloud Composer-Umgebung mit Amazon verbinden können. Webdienste zur Nutzung der dort gespeicherten Daten. Es zeigt, wie Sie zum Erstellen eines Apache Airflow-DAG Der DAG führt Daten aus einem öffentlichen BigQuery-Dataset und einer CSV-Datei zusammen, die in einem Amazon Web Services (AWS) S3-Bucket gespeichert ist. Anschließend wird ein Dataproc Serverless-Batchjob ausgeführt, um die zusammengeführten Daten zu verarbeiten.
Das öffentliche BigQuery-Dataset in dieser Anleitung ist ghcn_d, einer integrierten Datenbank mit Klimazusammenfassungen auf der ganzen Welt. Die CSV-Datei enthält Informationen zu den Daten und Namen von Feiertagen in den USA von 1997 bis 2021.
Mit dem DAG möchten wir die Frage beantworten: „Wie warm war es in Chicago an Thanksgiving in den letzten 25 Jahren?“
Lernziele
- Cloud Composer-Umgebung in der Standardkonfiguration erstellen
- Bucket in AWS S3 erstellen
- Leeres BigQuery-Dataset erstellen
- Neuen Cloud Storage-Bucket erstellen
- Erstellen Sie einen DAG und führen Sie ihn aus, der die folgenden Aufgaben enthält:
- Externes Dataset aus S3 in Cloud Storage laden
- Externes Dataset aus Cloud Storage laden in BigQuery
- Zwei Datasets in BigQuery verknüpfen
- PySpark-Job für Datenanalyse ausführen
Hinweise
Berechtigungen in AWS verwalten
Folgen Sie dem Abschnitt „Richtlinien mit dem visuellen Editor erstellen“ der Anleitung zum Erstellen von IAM-Richtlinien in AWS, um eine benutzerdefinierte IAM-Richtlinie für AWS S3 mit der folgenden Konfiguration zu erstellen:
- Dienst: S3
- ListAllMyBuckets (
s3:ListAllMyBuckets
): Zeigt Ihren S3-Bucket an. - CreateBucket (
s3:CreateBucket
) zum Erstellen eines Buckets - PutBucketOwnershipControls (
s3:PutBucketOwnershipControls
) zum Erstellen eines Buckets - ListBucket (
s3:ListBucket
): Gewährt die Berechtigung zum Auflisten von Objekten in einem S3-Bucket. - PutObject (
s3:PutObject
) zum Hochladen von Dateien in einen Bucket - GetBucketVersioning (
s3:GetBucketVersioning
), zum Löschen eines Objekts in einem Bucket - DeleteObject (
s3:DeleteObject
) zum Löschen eines Objekts in einem Bucket - ListBucketVersions (
s3:ListBucketVersions
) zum Löschen eines Buckets - DeleteBucket (
s3:DeleteBucket
): zum Löschen eines Buckets - Ressourcen:Wählen Sie „Beliebig“ aus. neben „Bucket“ und „object“ zu gewähren Berechtigungen für Ressourcen dieses Typs.
- Tag: Keines
- Name: TutorialPolicy
Weitere Informationen finden Sie im Liste der in Amazon S3 unterstützten Aktionen finden Sie weitere Informationen zu den oben aufgeführten Konfigurationen.
Fügen Sie Ihrer Identität die IAM-Richtlinie TutorialPolicy hinzu.
APIs aktivieren
Aktivieren Sie folgende APIs:
Console
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Berechtigungen erteilen
Weisen Sie Ihrem Nutzerkonto die folgenden Rollen und Berechtigungen zu:
Weisen Sie Rollen zum Verwalten von Cloud Composer-Umgebungen und Umgebungs-Buckets zu.
Weisen Sie die Rolle BigQuery-Dateninhaber (
roles/bigquery.dataOwner
) zu, um ein BigQuery-Dataset zu erstellen.Weisen Sie die Rolle Storage-Administrator (
roles/storage.admin
) zu, um einen Cloud Storage-Bucket zu erstellen.
Cloud Composer-Umgebung erstellen und vorbereiten
Erstellen Sie eine Cloud Composer-Umgebung mit den Standardparametern:
- Wählen Sie eine Region in den USA aus.
- Wählen Sie die neueste Cloud Composer-Version aus.
Weisen Sie dem Dienstkonto, das in Ihrer Cloud Composer-Umgebung verwendet wird, die folgenden Rollen zu, damit die Airflow-Worker DAG-Aufgaben erfolgreich ausführen können:
- BigQuery-Nutzer (
roles/bigquery.user
) - BigQuery-Dateninhaber (
roles/bigquery.dataOwner
) - Dienstkontonutzer (
roles/iam.serviceAccountUser
) - Dataproc-Bearbeiter (
roles/dataproc.editor
) - Dataproc-Worker (
roles/dataproc.worker
)
- BigQuery-Nutzer (
Zugehörige Ressourcen in Google Cloud erstellen und ändern
apache-airflow-providers-amazon
installieren PyPI-Paket in Ihrem Cloud Composer-Umgebung.Leeres BigQuery-Dataset erstellen mit den folgenden Parametern:
- Name:
holiday_weather
- Region:
US
- Name:
Neuen Cloud Storage-Bucket erstellen in der Multiregion
US
.Führen Sie den folgenden Befehl aus, um privaten Google-Zugriff im Standardsubnetz in der Region zu aktivieren, in der Sie Dataproc Serverless ausführen möchten, um die Netzwerkanforderungen zu erfüllen. Mi. empfehlen, dieselbe Region wie in Cloud Composer zu verwenden zu verbessern.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Zugehörige Ressourcen in AWS erstellen
Erstellen Sie einen S3-Bucket mit den Standardeinstellungen in Ihrer bevorzugten Region.
Verbindung von Cloud Composer zu AWS herstellen
- AWS-Zugriffsschlüssel-ID und geheimen Zugriffsschlüssel abrufen
Fügen Sie Ihre AWS S3-Verbindung über die Airflow-Benutzeroberfläche hinzu:
- Klicken Sie auf Verwaltung > Verbindungen.
Erstellen Sie eine neue Verbindung mit der folgenden Konfiguration:
- Verbindungs-ID:
aws_s3_connection
- Verbindungstyp:
Amazon S3
- Extras:
{"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}
- Verbindungs-ID:
Datenverarbeitung mit Dataproc Serverless
Beispiel für einen PySpark-Job
Der folgende Code ist ein Beispiel für einen PySpark-Job, mit dem Temperaturen von Zehntelgraden Celsius in Grad Celsius umgewandelt werden. Dieser Job erzielt eine Conversion Temperaturdaten aus dem Dataset in ein anderes Format konvertieren.
PySpark-Datei in Cloud Storage hochladen
So laden Sie die PySpark-Datei in Cloud Storage hoch:
Speichern Sie data_analytics_process.py auf Ihrem lokalen Computer.
Rufen Sie in der Google Cloud Console die Seite Cloud Storage-Browser auf:
Klicken Sie auf den Namen des zuvor erstellten Buckets.
Klicken Sie auf dem Tab Objekte des Buckets auf die Schaltfläche Dateien hochladen. wählen Sie
data_analytics_process.py
im angezeigten Dialogfeld aus und klicken Sie auf Öffnen.
CSV-Datei in AWS S3 hochladen
So laden Sie die Datei holidays.csv
hoch:
- Speichern Sie
holidays.csv
auf Ihrem lokalen Computer. - Folgen Sie AWS-Leitfaden um die Datei in den Bucket hochzuladen.
DAG für Datenanalyse
Beispiel-DAG ansehen
Der DAG verwendet mehrere Operatoren, um die Daten zu transformieren und zu vereinheitlichen:
Die
S3ToGCSOperator
überträgt die Datei holidays.csv aus Ihrem AWS S3-Bucket zu Ihrem Cloud Storage-Bucket hinzufügen.Die
GCSToBigQueryOperator
die Datei holidays.csv aus Cloud Storage in eine neue Tabelle in BigQueryholidays_weather
-Dataset, das Sie zuvor erstellt haben.Mit
DataprocCreateBatchOperator
wird ein PySpark-Batchjob mit Dataproc Serverless erstellt und ausgeführt.Die
BigQueryInsertJobOperator
werden die Daten aus holidays.csv „Datum“ Spalte mit Wetterdaten aus dem öffentlichen BigQuery-Dataset ghcn_d verwendet. DieBigQueryInsertJobOperator
-Aufgaben sind dynamisch mit einer For-Schleife generiert, und diese Aufgaben befinden sichTaskGroup
für eine bessere Lesbarkeit in der Grafikansicht der Airflow-UI.
Variablen über die Airflow-Benutzeroberfläche hinzufügen
In Airflow variables sind eine universelle Möglichkeit, beliebige Einstellungen als einfachen Schlüssel/Wert-Speicher. In diesem DAG werden Airflow-Variablen verwendet, um gängige Werte zu speichern. So fügen Sie sie Ihrer Umgebung hinzu:
Greifen Sie über die Cloud Composer-Konsole auf die Airflow-UI zu.
Klicken Sie auf Verwaltung > Variablen.
Fügen Sie die folgenden Variablen hinzu:
s3_bucket
: der Name des zuvor erstellten S3-Buckets.gcp_project
: Ihre Projekt-ID.gcs_bucket
: der Name des zuvor erstellten Buckets (ohne das Präfixgs://
).gce_region
: die Region, in der Sie Ihren Dataproc-Job ausführen möchten, der die Anforderungen an die Dataproc Serverless-Netzwerkkonfiguration erfüllt. Das ist die Region, in der Sie den privaten Google-Zugriff zuvor aktiviert haben.dataproc_service_account
: das Dienstkonto für Ihre Cloud Composer-Umgebung. Sie finden diesen Dienst auf dem Tab für die Umgebungskonfiguration Cloud Composer-Umgebung.
DAG in den Bucket Ihrer Umgebung hochladen
Cloud Composer plant DAGs, die sich im Ordner /dags
des Buckets Ihrer Umgebung befinden. So laden Sie den DAG mithilfe der
Google Cloud Console:
Speichern Sie auf Ihrem lokalen Computer s3togcsoperator_tutorial.py
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen in der Spalte DAG-Ordner auf den Link DAGs. Der DAGs-Ordner Ihrer Umgebung wird geöffnet.
Klicken Sie auf Dateien hochladen.
Wählen Sie
s3togcsoperator_tutorial.py
auf Ihrem lokalen Computer aus und klicken Sie auf Öffnen.
DAG auslösen
Klicken Sie in Ihrer Cloud Composer-Umgebung auf den Tab DAGs.
Klicken Sie auf die DAG-ID
s3_to_gcs_dag
.Klicken Sie auf DAG auslösen.
Warten Sie etwa fünf bis zehn Minuten, bis ein grünes Häkchen angezeigt wird, das signalisiert, dass die Aufgaben erfolgreich abgeschlossen wurden.
Erfolg des DAG prüfen
Öffnen Sie in der Google Cloud Console die Seite BigQuery.
Klicken Sie im Explorer-Bereich auf den Namen Ihres Projekts.
Klicken Sie auf
holidays_weather_joined
.Klicken Sie auf „Vorschau“, um die resultierende Tabelle zu sehen. Die Zahlen in der Spalte „Wert“ sind in Zehntelgrad Celsius angegeben.
Klicken Sie auf
holidays_weather_normalized
.Klicken Sie auf „Vorschau“, um die resultierende Tabelle zu sehen. Beachten Sie, dass die Zahlen in den Wertspalte sind in Grad Celsius angegeben.
Bereinigen
Löschen Sie einzelne Ressourcen, die Sie für diese Anleitung erstellt haben:
Löschen Sie die Datei
holidays.csv
. in Ihrem AWS S3-Bucket.Löschen Sie den Cloud Storage-Bucket, den Sie die für diese Anleitung erstellt wurden.
Löschen Sie die Cloud Composer-Umgebung, einschließlich den Bucket der Umgebung manuell löschen.