Exécuter un DAG Apache Airflow dans Cloud Composer 3 (Google Cloud CLI)
Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Ce guide de démarrage rapide explique comment créer un environnement Cloud Composer et exécuter un DAG Apache Airflow dans Cloud Composer 3.
Si vous êtes un nouvel utilisateur d'Airflow, consultez le tutoriel sur les concepts Airflow dans la documentation Apache Airflow pour en savoir plus sur les concepts, les objets et leur utilisation.
Si vous préférez utiliser la console Google Cloud, consultez Exécuter un DAG Apache Airflow dans Cloud Composer.
Si vous souhaitez créer un environnement à l'aide de Terraform, consultez Créer des environnements (Terraform)
Avant de commencer
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Cloud Composer API:
gcloud services enable composer.googleapis.com
-
Pour obtenir les autorisations nécessaires pour suivre ce guide de démarrage rapide, demandez à votre administrateur de vous accorder les rôles IAM suivants sur votre projet :
-
Pour afficher, créez et gérez l'environnement Cloud Composer:
-
Administrateur de l'environnement et des objets Storage (
roles/composer.environmentAndStorageObjectAdmin
) -
Utilisateur du compte de service (
roles/iam.serviceAccountUser
)
-
Administrateur de l'environnement et des objets Storage (
-
Pour afficher les journaux :
Lecteur de journaux (
roles/logging.viewer
)
Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.
Vous pouvez également obtenir les autorisations requises via des rôles personnalisés ou d'autres rôles prédéfinis.
-
Pour afficher, créez et gérez l'environnement Cloud Composer:
Créez un environnement
Créez un environnement nommé example-environment
dans la région us-central1
, avec la dernière version de Cloud Composer 3.
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-3-airflow-2.9.1-build.7
Créer un fichier DAG
Un DAG Airflow est un ensemble de tâches organisées que vous souhaitez programmer et exécuter. Les DAG sont définis dans des fichiers Python standards.
Ce guide utilise un exemple de DAG Airflow défini dans le fichier quickstart.py
.
Le code Python de ce fichier effectue les opérations suivantes :
- Il crée un DAG,
composer_sample_dag
, Ce DAG s'exécute tous les jours. - Il exécute une tâche,
print_dag_run_conf
, qui imprime la configuration de l'exécution du DAG à l'aide de l'opérateur bash.
Enregistrez une copie du fichier quickstart.py
sur votre ordinateur local :
Importer le fichier DAG dans le bucket de votre environnement
Chaque environnement Cloud Composer est associé à un bucket Cloud Storage. Planifications Airflow dans Cloud Composer uniquement
DAG situés dans le dossier /dags
de ce bucket.
Pour programmer votre DAG, importez le fichier quickstart.py
à partir de votre ordinateur local dans le dossier /dags
de votre environnement :
Pour importer quickstart.py
avec Google Cloud CLI, exécutez la commande suivante dans le dossier où se trouve le fichier quickstart.py
:
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
Afficher le DAG
Une fois le fichier DAG importé, Airflow effectue les opérations suivantes:
- Analyse le fichier DAG que vous avez importé. L'opération peut prendre quelques minutes DAG pour être mis à la disposition d'Airflow
- Ajoute le DAG à la liste des DAG disponibles.
- Exécute le DAG selon la planification que vous avez indiquée dans le fichier DAG.
Vérifiez que votre DAG est traité sans erreur et qu'il est disponible dans Airflow en l'affichant dans l'UI DAG. L'UI du DAG est l'interface Cloud Composer permettant d'afficher Informations sur le DAG dans la console Google Cloud Cloud Composer fournit aussi un accès à l'interface utilisateur Airflow, qui est une interface Web Airflow native ; de commande.
Attendez environ cinq minutes pour laisser à Airflow le temps de traiter le fichier DAG que vous avez importé précédemment, et pour terminer la première exécution du DAG (expliqué plus tard).
Exécutez la commande suivante dans la Google Cloud CLI. Cette commande exécute la commande CLI Airflow
dags list
qui liste les DAG de votre environnement.gcloud composer environments run example-environment \ --location us-central1 \ dags list
Vérifiez que le DAG
composer_quickstart
est listé dans le résultat de la commande.Exemple de résultat :
Executing the command: [ airflow dags list ]... Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb Use ctrl-c to interrupt the command dag_id | filepath | owner | paused ====================+=======================+==================+======= airflow_monitoring | airflow_monitoring.py | airflow | False composer_quickstart | dag-quickstart-af2.py | Composer Example | False
Afficher les détails de l'exécution du DAG
Une seule exécution d'un DAG est appelée exécution de DAG. Airflow immédiatement exécute une exécution du DAG pour l'exemple de DAG, car la date de début du fichier DAG est défini sur hier. De cette manière, Airflow rattrape la planification du DAG spécifié.
L'exemple de DAG contient une tâche, print_dag_run_conf
, qui exécute la commande echo
dans la console. Cette commande affiche des métadonnées sur le DAG (identifiant numérique de l'exécution du DAG).
Exécutez la commande suivante dans la Google Cloud CLI. Cette commande liste les exécutions de DAG pour le DAG composer_quickstart
:
gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart
Exemple de résultat :
dag_id | run_id | state | execution_date | start_date | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
La CLI Airflow ne fournit pas de commande permettant d'afficher les journaux des tâches. Vous pouvez utiliser d'autres méthodes pour afficher les journaux des tâches Airflow : l'interface utilisateur du DAG Cloud Composer, l'interface utilisateur d'Airflow ou Cloud Logging. Ce guide montre un moyen d'interroger Cloud Logging pour obtenir les journaux d'une exécution de DAG spécifique.
Exécutez la commande suivante dans la Google Cloud CLI. Cette commande lit les journaux de Cloud Logging pour une exécution de DAG spécifique du DAG composer_quickstart
. L'argument --format
met en forme la sortie de sorte que seul le texte du message de journal s'affiche.
gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"
Remplacez :
RUN_ID
par la valeurrun_id
issue de la sortie de la commandetasks states-for-dag-run
que vous avez exécutée précédemment. Par exemple,2024-02-17T15:38:38.969307+00:00
.
Exemple de résultat :
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
Effectuer un nettoyage
Pour éviter que les ressources utilisées sur cette page ne soient facturées sur votre compte Google Cloud, supprimez le projet Google Cloud contenant les ressources.
Supprimez les ressources utilisées dans ce tutoriel:
Supprimez l'environnement Cloud Composer.
Dans la console Google Cloud, accédez à la page Environnements.
Sélectionnez
example-environment
, puis cliquez sur Supprimer.Attendez que l'environnement soit supprimé.
Supprimez le bucket de votre environnement. La suppression de l'environnement Cloud Composer ne supprime pas son bucket.
Dans la console Google Cloud, accédez à Stockage > Navigateur.
Sélectionnez le bucket de l'environnement, puis cliquez sur Supprimer. Par exemple, ce bucket peut être nommé
us-central1-example-environ-c1616fe8-bucket
.
Supprimez le disque persistant de la file d'attente Redis de votre environnement. La suppression de l'environnement Cloud Composer ne supprime pas son disque persistant.
Dans la console Google Cloud, accédez à Compute Engine > Disques.
Sélectionnez le disque persistant de la file d'attente Redis de l'environnement, puis cliquez sur Supprimer.
Par exemple, ce disque peut être nommé
gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee
. Les disques pour Cloud Composer 1 ont toujours le typeStandard persistent disk
et la taille de 2 Go.
Étape suivante