Apache Airflow-DAG in Cloud Composer 3 (Google Cloud CLI) ausführen
Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
In dieser Kurzanleitung erfahren Sie, wie Sie eine Cloud Composer-Umgebung erstellen und einen Apache Airflow-DAG in Cloud Composer 3 ausführen.
Wenn Sie Airflow noch nicht kennen, finden Sie in der Apache Airflow-Dokumentation im Airflow-Konzepte-Tutorial weitere Informationen zu den Konzepten und Objekten von Airflow und deren Anwendung.
Wenn Sie stattdessen die Google Cloud Console verwenden möchten, lesen Sie den Hilfeartikel Apache Airflow-DAG in Cloud Composer ausführen.
Wenn Sie eine Umgebung mit Terraform erstellen möchten, lesen Sie den Hilfeartikel Umgebungen erstellen (Terraform).
Hinweis
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Cloud Composer API:
gcloud services enable composer.googleapis.com
-
Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Ausführen der Kurzanleitung benötigen:
-
So rufen Sie die Cloud Composer-Umgebung auf, erstellen sie und verwalten sie:
-
Administrator für Umgebung und Storage-Objekte (
roles/composer.environmentAndStorageObjectAdmin
) -
Service Account User (
roles/iam.serviceAccountUser
)
-
Administrator für Umgebung und Storage-Objekte (
-
So rufen Sie Logs auf:
Loganzeige (
roles/logging.viewer
)
Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.
Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.
-
So rufen Sie die Cloud Composer-Umgebung auf, erstellen sie und verwalten sie:
Dienstkonto einer Umgebung erstellen
Wenn Sie eine Umgebung erstellen, geben Sie ein Dienstkonto an. Dieses Dienstkonto wird als Dienstkonto der Umgebung bezeichnet. In Ihrer Umgebung wird dieses Dienstkonto für die meisten Vorgänge verwendet.
Das Dienstkonto für Ihre Umgebung ist kein Nutzerkonto. Ein Dienstkonto ist ein spezieller Kontotyp, der nicht von einer Person, sondern von einer Anwendung oder einer VM-Instanz verwendet wird.
So erstellen Sie ein Dienstkonto für Ihre Umgebung:
Erstellen Sie ein neues Dienstkonto, wie in der Dokumentation zur Identitäts- und Zugriffsverwaltung beschrieben.
Weisen Sie ihm eine Rolle zu, wie in der Identity and Access Management-Dokumentation beschrieben. Die erforderliche Rolle ist Composer-Worker (
composer.worker
).
Umgebung erstellen
Erstellen Sie in der Region us-central1
eine neue Umgebung mit dem Namen example-environment
und der neuesten Cloud Composer 3-Version.
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-3-airflow-2.10.2-build.7
DAG-Datei erstellen
Ein Airflow-DAG ist eine Sammlung strukturierter Aufgaben, die Sie planen und ausführen möchten. DAGs werden in Standard-Python-Dateien definiert.
In dieser Anleitung wird ein Beispiel-Airflow-DAG verwendet, das in der Datei quickstart.py
definiert ist.
Der Python-Code in dieser Datei führt Folgendes aus:
- Der DAG
composer_sample_dag
wird erstellt. Dieser DAG wird jeden Tag ausgeführt. - Die Aufgabe
print_dag_run_conf
wird ausgeführt. Sie gibt mithilfe des bash-Operators die Konfiguration der DAG-Ausführung aus.
Speichern Sie eine Kopie der Datei quickstart.py
auf Ihrem lokalen Computer:
DAG-Datei in den Bucket Ihrer Umgebung hochladen
Jede Cloud Composer-Umgebung ist mit einem Cloud Storage-Bucket verknüpft. Airflow in Cloud Composer plant nur DAGs, die sich in diesem Bucket im Ordner /dags
befinden.
Zum Planen Ihres DAG laden Sie quickstart.py
von Ihrem lokalen Computer in den Ordner /dags
Ihrer Umgebung hoch:
Wenn Sie quickstart.py
mit der Google Cloud CLI hochladen möchten, führen Sie im Ordner, in dem sich die quickstart.py
-Datei befindet, den folgenden Befehl aus:
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
DAG aufrufen
Nachdem Sie die DAG-Datei hochgeladen haben, geschieht in Airflow Folgendes:
- Die hochgeladene DAG-Datei wird geparst. Es kann einige Minuten dauern, bis der DAG für Airflow verfügbar ist.
- Der DAG wird der Liste der verfügbaren DAGs hinzugefügt.
- Führt den DAG gemäß dem Zeitplan aus, den Sie in der DAG-Datei angegeben haben.
Prüfen Sie, ob Ihr DAG ohne Fehler verarbeitet wird und in Airflow verfügbar ist. Rufen Sie ihn dazu in der DAG-Benutzeroberfläche auf. Die DAG-Benutzeroberfläche ist die Cloud Composer-Oberfläche, über die Sie DAG-Informationen in der Google Cloud Console aufrufen können. Cloud Composer bietet auch Zugriff auf die Airflow-Benutzeroberfläche, eine native Airflow-Weboberfläche.
Warten Sie etwa fünf Minuten, damit Airflow die zuvor hochgeladene DAG-Datei verarbeiten und die erste DAG-Ausführung abschließen kann (wird später erläutert).
Führen Sie in der Google Cloud CLI den folgenden Befehl aus. Mit diesem Befehl wird der
dags list
-Befehl der Airflow-Befehlszeile ausgeführt, der DAGs in Ihrer Umgebung auflistet.gcloud composer environments run example-environment \ --location us-central1 \ dags list
Prüfen Sie, ob die DAG
composer_quickstart
in der Ausgabe des Befehls aufgeführt ist.Beispielausgabe:
Executing the command: [ airflow dags list ]... Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb Use ctrl-c to interrupt the command dag_id | filepath | owner | paused ====================+=======================+==================+======= airflow_monitoring | airflow_monitoring.py | airflow | False composer_quickstart | dag-quickstart-af2.py | Composer Example | False
Details zur DAG-Ausführung ansehen
Eine einzelne Ausführung eines DAG wird als DAG-Ausführung bezeichnet. Airflow führt sofort eine DAG-Ausführung für den Beispiel-DAG aus, da das Startdatum in der DAG-Datei auf „gestern“ festgelegt ist. So gleicht Airflow den Zeitplan des angegebenen DAGs wieder an.
Der Beispiel-DAG enthält eine Aufgabe, print_dag_run_conf
, die den Befehl echo
in der Console ausführt. Mit diesem Befehl werden Metainformationen zum DAG ausgegeben (numerische Kennung der DAG-Ausführung).
Führen Sie in der Google Cloud CLI den folgenden Befehl aus. Mit diesem Befehl werden DAG-Ausführungen für die DAG composer_quickstart
aufgelistet:
gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart
Beispielausgabe:
dag_id | run_id | state | execution_date | start_date | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
Die Airflow-Befehlszeile bietet keinen Befehl zum Ansehen von Aufgabenprotokollen. Sie haben auch andere Möglichkeiten, Airflow-Aufgabenprotokolle aufzurufen: die Cloud Composer DAG-Benutzeroberfläche, die Airflow-Benutzeroberfläche oder Cloud Logging. In diesem Leitfaden erfahren Sie, wie Sie Cloud Logging nach Logs aus einem bestimmten DAG-Lauf abfragen.
Führen Sie in der Google Cloud CLI den folgenden Befehl aus. Mit diesem Befehl werden Logs aus Cloud Logging für eine bestimmte DAG-Ausführung der composer_quickstart
-DAG gelesen. Mit dem Argument --format
wird die Ausgabe so formatiert, dass nur der Text der Protokollmeldung angezeigt wird.
gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"
Ersetzen Sie:
RUN_ID
durch den Wertrun_id
aus der Ausgabe des zuvor ausgeführten Befehlstasks states-for-dag-run
ersetzen. Beispiel:2024-02-17T15:38:38.969307+00:00
.
Beispielausgabe:
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
Bereinigen
Löschen Sie das Google Cloud-Projekt mit den Ressourcen, damit Ihrem Google Cloud -Konto die auf dieser Seite verwendeten Ressourcen nicht in Rechnung gestellt werden.
Löschen Sie die Ressourcen, die in dieser Anleitung verwendet wurden:
Löschen Sie die Cloud Composer-Umgebung.
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Wählen Sie
example-environment
aus und klicken Sie auf Löschen.Warten Sie, bis die Umgebung gelöscht ist.
Löschen Sie den Bucket Ihrer Umgebung. Durch das Löschen der Cloud Composer-Umgebung wird dessen Bucket nicht gelöscht.
Rufen Sie in der Google Cloud Console die Seite Speicher > Browser auf.
Wählen Sie den Bucket der Umgebung aus und klicken Sie auf Löschen. Dieser Bucket kann beispielsweise
us-central1-example-environ-c1616fe8-bucket
heißen.
Nächste Schritte