Apache Airflow-DAG in Cloud Composer 1 (Google Cloud CLI) ausführen
Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
In dieser Kurzanleitung erfahren Sie, wie Sie eine Cloud Composer-Umgebung erstellen und einen Apache Airflow-DAG in Cloud Composer 1 ausführen.
Wenn Sie Airflow noch nicht kennen, finden Sie in der Apache Airflow-Dokumentation im Airflow-Konzepte-Tutorial weitere Informationen zu den Konzepten und Objekten von Airflow und deren Anwendung.
Wenn Sie stattdessen die Google Cloud Console verwenden möchten, lesen Sie den Abschnitt Apache Airflow-DAG in Cloud Composer ausführen
Wenn Sie eine Umgebung mit Terraform erstellen möchten, lesen Sie den Hilfeartikel Umgebungen erstellen (Terraform).
Hinweise
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Cloud Composer API:
gcloud services enable composer.googleapis.com
-
Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Ausführen der Kurzanleitung benötigen:
-
So rufen Sie die Cloud Composer-Umgebung auf, erstellen sie und verwalten sie:
-
Administrator für Umgebung und Storage-Objekte (
roles/composer.environmentAndStorageObjectAdmin
) -
Service Account User (
roles/iam.serviceAccountUser
)
-
Administrator für Umgebung und Storage-Objekte (
-
So rufen Sie Logs auf:
Loganzeige (
roles/logging.viewer
)
Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.
Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.
-
So rufen Sie die Cloud Composer-Umgebung auf, erstellen sie und verwalten sie:
Umgebung erstellen
Erstellen Sie in der Region us-central1
eine neue Umgebung mit dem Namen example-environment
und der neuesten Cloud Composer 1-Version.
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-1.20.12-airflow-1.10.15
DAG-Datei erstellen
Ein Airflow-DAG ist eine Sammlung strukturierter Aufgaben, die Sie planen und ausführen möchten. DAGs werden in Standard-Python-Dateien definiert.
In dieser Anleitung wird ein Beispiel für einen Airflow-DAG verwendet, der in der Datei quickstart.py
definiert ist.
Der Python-Code in dieser Datei führt Folgendes aus:
- Erstellt den DAG
composer_sample_dag
. Dieser DAG wird täglich ausgeführt. - Die Aufgabe
print_dag_run_conf
wird ausgeführt. Sie gibt mithilfe des bash-Operators die Konfiguration der DAG-Ausführung aus.
Speichern Sie eine Kopie der Datei quickstart.py
auf Ihrem lokalen Computer:
DAG-Datei in den Bucket Ihrer Umgebung hochladen
Jede Cloud Composer-Umgebung hat ein Cloud Storage-
Bucket verknüpft. Airflow in Cloud Composer plant nur DAGs, die sich in diesem Bucket im Ordner /dags
befinden.
Laden Sie zum Planen des DAG quickstart.py
von Ihrem lokalen Computer auf den
Ordner /dags
der Umgebung:
Wenn Sie quickstart.py
mit der Google Cloud CLI hochladen möchten, führen Sie den folgenden Befehl im Ordner aus, in dem sich die quickstart.py
-Datei befindet:
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
DAG aufrufen
Nachdem Sie die DAG-Datei hochgeladen haben, geschieht in Airflow Folgendes:
- Die hochgeladene DAG-Datei wird geparst. Es kann einige Minuten dauern, DAG, der für Airflow verfügbar gemacht wird.
- Fügt den DAG der Liste der verfügbaren DAGs hinzu.
- Führt den DAG gemäß dem Zeitplan aus, den Sie in der DAG-Datei angegeben haben.
Prüfen Sie, ob Ihr DAG ohne Fehler verarbeitet wird und in Airflow verfügbar ist. Rufen Sie ihn dazu in der DAG-Benutzeroberfläche auf. Die DAG-Benutzeroberfläche ist die Cloud Composer-Oberfläche, über die Sie DAG-Informationen in der Google Cloud Console aufrufen können. Cloud Composer bietet auch Zugriff auf die Airflow-Benutzeroberfläche, eine native Airflow-Weboberfläche.
Warten Sie etwa fünf Minuten, damit Airflow die DAG-Datei verarbeitet hat die Sie zuvor hochgeladen haben, und um die erste DAG-Ausführung abzuschließen (Erläuterung später).
Führen Sie in der Google Cloud CLI den folgenden Befehl aus. Dieser Befehl führt den Befehl
dags list
Befehl der Airflow-Befehlszeile zum Auflisten von DAGs in Ihren zu verbessern.gcloud composer environments run example-environment \ --location us-central1 \ dags list
Prüfen Sie, ob der DAG
composer_quickstart
in der Ausgabe des Befehls aufgeführt ist.Beispielausgabe:
Executing the command: [ airflow dags list ]... Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb Use ctrl-c to interrupt the command dag_id | filepath | owner | paused ====================+=======================+==================+======= airflow_monitoring | airflow_monitoring.py | airflow | False composer_quickstart | dag-quickstart-af2.py | Composer Example | False
Details zur DAG-Ausführung ansehen
Eine einzelne Ausführung eines DAG wird als DAG-Ausführung bezeichnet. Airflow führt sofort eine DAG-Ausführung für den Beispiel-DAG aus, da das Startdatum in der DAG-Datei auf „gestern“ festgelegt ist. So gleicht Airflow den Zeitplan des angegebenen DAGs nach.
Der Beispiel-DAG enthält eine Aufgabe, print_dag_run_conf
, mit der der Befehl echo
in der Console ausgeführt wird. Dieser Befehl gibt Metainformationen zum DAG aus
(Numerische Kennung der DAG-Ausführung).
Führen Sie in der Google Cloud CLI den folgenden Befehl aus. Mit diesem Befehl werden DAG-Ausführungen aufgelistet
für den DAG composer_quickstart
:
gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart
Beispielausgabe:
dag_id | run_id | state | execution_date | start_date | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
Die Airflow-Befehlszeile bietet keinen Befehl zum Ansehen von Aufgabenprotokollen. Sie haben auch andere Möglichkeiten, Airflow-Aufgabenprotokolle aufzurufen: über die Cloud Composer DAG-Benutzeroberfläche, die Airflow-Benutzeroberfläche oder Cloud Logging. In diesem Leitfaden erfahren Sie, wie Sie Cloud Logging nach Logs aus einem bestimmten DAG-Lauf abfragen.
Führen Sie in der Google Cloud CLI den folgenden Befehl aus. Dieser Befehl liest Logs aus
Cloud Logging für eine bestimmte DAG-Ausführung des DAG composer_quickstart
. Mit dem Argument --format
wird die Ausgabe so formatiert, dass nur der Text der Protokollmeldung angezeigt wird.
gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"
Ersetzen Sie:
RUN_ID
durch den Wertrun_id
aus der Ausgabe des zuvor ausgeführten Befehlstasks states-for-dag-run
ersetzen. Beispiel:2024-02-17T15:38:38.969307+00:00
.
Beispielausgabe:
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
Bereinigen
Löschen Sie das Google Cloud-Projekt mit den Ressourcen, damit Ihrem Google Cloud-Konto die auf dieser Seite verwendeten Ressourcen nicht in Rechnung gestellt werden.
Löschen Sie die Ressourcen, die in dieser Anleitung verwendet wurden:
Löschen Sie die Cloud Composer-Umgebung.
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Wählen Sie
example-environment
aus und klicken Sie auf Löschen.Warten Sie, bis die Umgebung gelöscht ist.
Löschen Sie den Bucket Ihrer Umgebung. Durch das Löschen der Cloud Composer-Umgebung wird dessen Bucket nicht gelöscht.
Rufen Sie in der Google Cloud Console Speicher auf > Browser.
Wählen Sie den Bucket der Umgebung aus und klicken Sie auf Löschen. Dieser Bucket kann beispielsweise
us-central1-example-environ-c1616fe8-bucket
heißen.
Löschen Sie den nichtflüchtigen Speicher der Redis-Warteschlange Ihrer Umgebung. Durch das Löschen der Cloud Composer-Umgebung wird der nichtflüchtige Speicher nicht gelöscht.
Wechseln Sie in der Google Cloud Console zu Compute Engine > Laufwerke.
Wählen Sie den nichtflüchtigen Speicher der Redis-Warteschlange der Umgebung aus und klicken Sie auf Löschen.
Das Laufwerk kann beispielsweise
gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee
heißen. Laufwerke für Cloud Composer 1 haben immer den TypStandard persistent disk
und die Größe 2 GB.
Nächste Schritte