Apache Airflow-DAG in Cloud Composer 3 (Google Cloud CLI) ausführen
Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
In dieser Kurzanleitung erfahren Sie, wie Sie eine Cloud Composer-Umgebung erstellen und einen Apache Airflow-DAG in Cloud Composer 3 ausführen.
Wenn Sie Airflow noch nicht kennen, finden Sie in der Anleitung zu Airflow-Konzepten in der Apache Airflow-Dokumentation weitere Informationen zu Airflow-Konzepten, -Objekten und ihrer Verwendung.
Wenn Sie stattdessen die Google Cloud Console verwenden möchten, lesen Sie Apache Airflow-DAG in Cloud Composer ausführen.
Wie Sie eine Umgebung mit Terraform erstellen, erfahren Sie unter Umgebungen erstellen (Terraform).
Hinweise
- Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
- Installieren Sie die Google Cloud CLI.
-
Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.
- Installieren Sie die Google Cloud CLI.
-
Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.
-
Aktivieren Sie die Cloud Composer API:
gcloud services enable composer.googleapis.com
-
Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Ausführen der Kurzanleitung benötigen:
-
So erstellen und verwalten Sie die Cloud Composer-Umgebung:
-
Administrator für Umgebung und Storage-Objekte (
roles/composer.environmentAndStorageObjectAdmin
) -
Dienstkontonutzer (
roles/iam.serviceAccountUser
)
-
Administrator für Umgebung und Storage-Objekte (
-
So rufen Sie Logs auf:
Loganzeige (
roles/logging.viewer
)
Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff verwalten.
Möglicherweise können Sie die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.
-
So erstellen und verwalten Sie die Cloud Composer-Umgebung:
Umgebung erstellen
Erstellen Sie eine neue Umgebung mit dem Namen example-environment
in der Region us-central1
mit der neuesten Version von Cloud Composer 3.
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-3-airflow-2.7.3-build.6
DAG-Datei erstellen
Ein Airflow-DAG ist eine Sammlung organisierter Aufgaben, die Sie planen und ausführen möchten. DAGs werden in Standard-Python-Dateien definiert.
In dieser Anleitung wird ein Beispiel für einen Airflow-DAG verwendet, der in der Datei quickstart.py
definiert ist.
Der Python-Code in dieser Datei führt Folgendes aus:
- Erstellt den DAG
composer_sample_dag
. Dieser DAG wird täglich ausgeführt. - Führt die Aufgabe
print_dag_run_conf
aus. Die Aufgabe gibt die Konfiguration der DAG-Ausführung mithilfe des bash-Operators aus.
Speichern Sie eine Kopie der Datei quickstart.py
auf Ihrem lokalen Computer:
DAG-Datei in den Bucket Ihrer Umgebung hochladen
Jeder Cloud Composer-Umgebung ist ein Cloud Storage-Bucket zugeordnet. Airflow in Cloud Composer plant nur DAGs, die sich im Ordner /dags
in diesem Bucket befinden.
Laden Sie zum Planen Ihres DAG quickstart.py
von Ihrem lokalen Computer in den Ordner /dags
Ihrer Umgebung hoch:
Führen Sie zum Hochladen von quickstart.py
mit der Google Cloud CLI den folgenden Befehl in dem Ordner aus, in dem sich die Datei quickstart.py
befindet:
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
DAG aufrufen
Nach dem Hochladen der DAG-Datei führt Airflow die folgenden Schritte aus:
- Analysiert die von Ihnen hochgeladene DAG-Datei. Es kann einige Minuten dauern, bis der DAG für Airflow verfügbar ist.
- Fügt den DAG der Liste der verfügbaren DAGs hinzu.
- Führt den DAG gemäß dem Zeitplan aus, den Sie in der DAG-Datei angegeben haben.
Prüfen Sie, ob der DAG fehlerfrei verarbeitet wird und in Airflow verfügbar ist. Rufen Sie ihn dazu in der DAG-UI auf. Die DAG-UI ist die Cloud Composer-Oberfläche zum Aufrufen von DAG-Informationen in der Google Cloud Console. Cloud Composer bietet außerdem Zugriff auf die Airflow-UI, eine native Airflow-Weboberfläche.
Warten Sie etwa fünf Minuten, damit Airflow die zuvor hochgeladene DAG-Datei verarbeitet und die erste DAG-Ausführung abgeschlossen hat (wird später erläutert).
Führen Sie in der Google Cloud CLI den folgenden Befehl aus. Mit diesem Befehl wird der Airflow-Befehl
dags list
ausgeführt, mit dem die DAGs in Ihrer Umgebung aufgelistet werden.gcloud composer environments run example-environment \ --location us-central1 \ dags list
Prüfen Sie, ob der DAG
composer_quickstart
in der Ausgabe des Befehls aufgeführt ist.Beispielausgabe:
Executing the command: [ airflow dags list ]... Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb Use ctrl-c to interrupt the command dag_id | filepath | owner | paused ====================+=======================+==================+======= airflow_monitoring | airflow_monitoring.py | airflow | False composer_quickstart | dag-quickstart-af2.py | Composer Example | False
Details zur DAG-Ausführung ansehen
Eine einzelne Ausführung eines DAG wird als DAG-Ausführung bezeichnet. Airflow führt sofort eine DAG-Ausführung für den Beispiel-DAG aus, da das Startdatum in der DAG-Datei auf gestern festgelegt ist. Auf diese Weise entspricht Airflow dem Zeitplan des angegebenen DAG.
Der Beispiel-DAG enthält die Aufgabe print_dag_run_conf
, mit der der Befehl echo
in der Console ausgeführt wird. Dieser Befehl gibt Metainformationen über den DAG (die numerische Kennung der DAG-Ausführung) aus.
Führen Sie in der Google Cloud CLI den folgenden Befehl aus. Mit diesem Befehl werden die DAG-Ausführungen für den DAG composer_quickstart
aufgelistet:
gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart
Beispielausgabe:
dag_id | run_id | state | execution_date | start_date | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
Die Airflow-Befehlszeile bietet keinen Befehl zum Aufrufen von Aufgabenlogs. Sie können andere Methoden zum Aufrufen von Airflow-Aufgabenlogs verwenden: die Cloud Composer-DAG-UI, die Airflow-UI oder Cloud Logging. In dieser Anleitung wird beschrieben, wie Sie Cloud Logging nach Logs aus einer bestimmten DAG-Ausführung abfragen können.
Führen Sie in der Google Cloud CLI den folgenden Befehl aus. Mit diesem Befehl werden Logs aus Cloud Logging für eine bestimmte DAG-Ausführung des DAG composer_quickstart
gelesen. Das Argument --format
formatiert die Ausgabe so, dass nur der Text der Lognachricht angezeigt wird.
gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"
Ersetzen Sie:
RUN_ID
durch den Wertrun_id
aus der Ausgabe des zuvor ausgeführten Befehlstasks states-for-dag-run
. Beispiel:2024-02-17T15:38:38.969307+00:00
.
Beispielausgabe:
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
Bereinigen
Löschen Sie das Google Cloud-Projekt mit den Ressourcen, damit Ihrem Google Cloud-Konto die auf dieser Seite verwendeten Ressourcen nicht in Rechnung gestellt werden.
Löschen Sie die in dieser Anleitung verwendeten Ressourcen:
Löschen Sie die Cloud Composer-Umgebung.
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Wählen Sie
example-environment
aus und klicken Sie auf Löschen.Warten Sie, bis die Umgebung gelöscht ist.
Löschen Sie den Bucket Ihrer Umgebung. Durch das Löschen der Cloud Composer-Umgebung wird dessen Bucket nicht gelöscht.
Rufen Sie in der Google Cloud Console die Seite Storage > Browser auf.
Wählen Sie den Bucket der Umgebung aus und klicken Sie auf Löschen. Dieser Bucket kann beispielsweise
us-central1-example-environ-c1616fe8-bucket
heißen.
Löschen Sie den nichtflüchtigen Speicher der Redis-Warteschlange Ihrer Umgebung. Durch das Löschen der Cloud Composer-Umgebung wird der nichtflüchtige Speicher nicht gelöscht.
Wechseln Sie in der Google Cloud Console zu Compute Engine > Laufwerke.
Wählen Sie den nichtflüchtigen Speicher der Redis-Warteschlange der Umgebung aus und klicken Sie auf Löschen.
Dieses Laufwerk kann beispielsweise den Namen
gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee
haben. Laufwerke für Cloud Composer 1 haben immer den TypStandard persistent disk
und eine Größe von 2 GB.
Nächste Schritte