Apache Airflow-DAG in Cloud Composer 2 (Google Cloud CLI) ausführen
Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
In dieser Kurzanleitung erfahren Sie, wie Sie eine Cloud Composer-Umgebung erstellen. und führen Sie einen Apache Airflow-DAG in Cloud Composer 2 aus.
Wenn Sie Airflow noch nicht kennen, lesen Sie die Anleitung zu Airflow-Konzepten in Apache Airflow finden Sie weitere Informationen zu Airflow-Konzepten, -Objekten und deren Nutzung.
Wenn Sie stattdessen die Google Cloud Console verwenden möchten, lesen Sie den Hilfeartikel Apache Airflow-DAG in Cloud Composer ausführen.
Wie Sie eine Umgebung mit Terraform erstellen, erfahren Sie unter Umgebungen erstellen (Terraform)
Hinweise
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Cloud Composer API:
gcloud services enable composer.googleapis.com
-
Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Ausführen der Kurzanleitung benötigen:
-
So rufen Sie die Cloud Composer-Umgebung auf, erstellen sie und verwalten sie:
-
Administrator für Umgebung und Storage-Objekte (
roles/composer.environmentAndStorageObjectAdmin
) -
Service Account User (
roles/iam.serviceAccountUser
)
-
Administrator für Umgebung und Storage-Objekte (
-
So können Sie Logs aufrufen:
Loganzeige (
roles/logging.viewer
)
Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.
Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.
-
So rufen Sie die Cloud Composer-Umgebung auf, erstellen sie und verwalten sie:
Umgebung erstellen
Wenn dies die erste Umgebung in Ihrem Projekt ist, fügen Sie das Cloud Composer-Dienstkonto als neuen Hauptberechtigten zum Dienstkonto Ihrer Umgebung hinzu und weisen Sie ihm die Rolle roles/composer.ServiceAgentV2Ext
zu.
Standardmäßig verwendet Ihre Umgebung die Compute Engine-Standarddienstkonto und den Das folgende Beispiel zeigt, wie Sie diese Berechtigung hinzufügen.
# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
--filter="$(gcloud config get-value project)" \
--format="value(PROJECT_NUMBER)" \
--limit=1)
# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
$PROJECT_NUMBER-compute@developer.gserviceaccount.com \
--member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
--role roles/composer.ServiceAgentV2Ext
Erstellen Sie im us-central1
eine neue Umgebung mit dem Namen example-environment
.
Region mit der neuesten Cloud Composer 2-Version.
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-2.9.4-airflow-2.9.1
DAG-Datei erstellen
Ein Airflow-DAG ist eine Sammlung organisierter Aufgaben, die Sie planen und ausführen möchten. DAGs werden in Standard-Python-Dateien definiert.
In dieser Anleitung wird ein Beispiel für einen Airflow-DAG verwendet, der in der Datei quickstart.py
definiert ist.
Der Python-Code in dieser Datei führt Folgendes aus:
- Der DAG
composer_sample_dag
wird erstellt. Dieser DAG wird täglich ausgeführt. - Die Aufgabe
print_dag_run_conf
wird ausgeführt. Sie gibt mithilfe des bash-Operators die Konfiguration der DAG-Ausführung aus.
Speichern Sie eine Kopie der Datei quickstart.py
auf Ihrem lokalen Computer:
DAG-Datei in den Bucket Ihrer Umgebung hochladen
Jede Cloud Composer-Umgebung ist mit einem Cloud Storage-Bucket verknüpft. Airflow in Cloud Composer plant nur DAGs, die sich in diesem Bucket im Ordner /dags
befinden.
Laden Sie zum Planen des DAG quickstart.py
von Ihrem lokalen Computer auf den
Ordner /dags
der Umgebung:
Führen Sie den folgenden Befehl aus, um quickstart.py
mit der Google Cloud CLI hochzuladen:
den Ordner, in dem sich die Datei quickstart.py
befindet:
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
DAG aufrufen
Nachdem Sie die DAG-Datei hochgeladen haben, geschieht in Airflow Folgendes:
- Analysiert die von Ihnen hochgeladene DAG-Datei. Es kann einige Minuten dauern, bis der DAG für Airflow verfügbar ist.
- Der DAG wird der Liste der verfügbaren DAGs hinzugefügt.
- Führt den DAG gemäß dem Zeitplan aus, den Sie in der DAG-Datei angegeben haben.
Prüfen Sie, ob Ihr DAG fehlerfrei verarbeitet wird und in Airflow verfügbar ist, indem Sie in der DAG-UI ansehen. DAG-UI ist die Cloud Composer-Oberfläche zum Ansehen DAG-Informationen in der Google Cloud Console Cloud Composer bietet auch Zugriff auf die Airflow-Benutzeroberfläche, eine native Airflow-Weboberfläche.
Warten Sie etwa fünf Minuten, damit Airflow die DAG-Datei verarbeitet hat die Sie zuvor hochgeladen haben, und um die erste DAG-Ausführung abzuschließen (Erläuterung später).
Führen Sie in der Google Cloud CLI den folgenden Befehl aus. Dieser Befehl führt den Befehl
dags list
Befehl der Airflow-Befehlszeile zum Auflisten von DAGs in Ihren zu verbessern.gcloud composer environments run example-environment \ --location us-central1 \ dags list
Prüfen Sie, ob die DAG
composer_quickstart
in der Ausgabe des Befehls aufgeführt ist.Beispielausgabe:
Executing the command: [ airflow dags list ]... Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb Use ctrl-c to interrupt the command dag_id | filepath | owner | paused ====================+=======================+==================+======= airflow_monitoring | airflow_monitoring.py | airflow | False composer_quickstart | dag-quickstart-af2.py | Composer Example | False
Details zur DAG-Ausführung ansehen
Eine einzelne Ausführung eines DAG wird als DAG-Ausführung bezeichnet. Airflow führt sofort eine DAG-Ausführung für den Beispiel-DAG aus, da das Startdatum in der DAG-Datei auf „gestern“ festgelegt ist. So gleicht Airflow den Zeitplan des angegebenen DAGs nach.
Der Beispiel-DAG enthält die Aufgabe print_dag_run_conf
, die den echo
ausführt.
in der Konsole. Dieser Befehl gibt Metainformationen zum DAG aus
(Numerische Kennung der DAG-Ausführung).
Führen Sie in der Google Cloud CLI den folgenden Befehl aus. Mit diesem Befehl werden DAG-Ausführungen aufgelistet
für den DAG composer_quickstart
:
gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart
Beispielausgabe:
dag_id | run_id | state | execution_date | start_date | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
Die Airflow-Befehlszeile bietet keinen Befehl zum Aufrufen von Aufgabenlogs. Sie können andere Methoden zum Anzeigen von Airflow-Aufgabenlogs: Cloud Composer-DAG-UI, Airflow-UI oder Cloud Logging In diesem Leitfaden erfahren Sie, wie Sie Cloud Logging nach Logs aus einem bestimmten DAG-Lauf abfragen.
Führen Sie in der Google Cloud CLI den folgenden Befehl aus. Mit diesem Befehl werden Logs aus Cloud Logging für eine bestimmte DAG-Ausführung der composer_quickstart
-DAG gelesen. Mit dem Argument --format
wird die Ausgabe so formatiert, dass nur der Text der Protokollmeldung angezeigt wird.
gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"
Ersetzen Sie:
RUN_ID
durch den Wertrun_id
aus der Ausgabe dertasks states-for-dag-run
-Befehl, den Sie zuvor ausgeführt haben. Beispiel:2024-02-17T15:38:38.969307+00:00
.
Beispielausgabe:
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
Bereinigen
Löschen Sie das Google Cloud-Projekt mit den Ressourcen, damit Ihrem Google Cloud-Konto die auf dieser Seite verwendeten Ressourcen nicht in Rechnung gestellt werden.
Löschen Sie die Ressourcen, die in dieser Anleitung verwendet wurden:
Löschen Sie die Cloud Composer-Umgebung.
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Wählen Sie
example-environment
aus und klicken Sie auf Löschen.Warten Sie, bis die Umgebung gelöscht ist.
Löschen Sie den Bucket Ihrer Umgebung. Durch das Löschen der Cloud Composer-Umgebung wird dessen Bucket nicht gelöscht.
Rufen Sie in der Google Cloud Console Speicher auf > Browser.
Wählen Sie den Bucket der Umgebung aus und klicken Sie auf Löschen. Dieser Bucket kann beispielsweise
us-central1-example-environ-c1616fe8-bucket
heißen.
Löschen Sie den nichtflüchtigen Speicher der Redis-Warteschlange Ihrer Umgebung. Durch das Löschen der Cloud Composer-Umgebung wird der nichtflüchtige Speicher nicht gelöscht.
Gehen Sie in der Google Cloud Console zu Compute Engine > Laufwerke:
Wählen Sie den nichtflüchtigen Speicher der Redis-Warteschlange der Umgebung aus und klicken Sie auf Löschen.
Dieses Laufwerk kann z. B. mit dem Namen
pvc-02bc4842-2312-4347-8519-d87bdcd31115
. Laufwerke für Cloud Composer 2 hat immer den TypBalanced persistent disk
und so groß wie 2 GB.
Nächste Schritte