Exécuter un DAG Apache Airflow dans Cloud Composer 2

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Ce guide de démarrage rapide explique comment créer un environnement Cloud Composer et exécuter un DAG Apache Airflow dans Cloud Composer 2.

Avant de commencer

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Cloud Composer API.

    Enable the API

  7. Pour obtenir les autorisations nécessaires pour suivre ce guide de démarrage rapide, demandez à votre administrateur de vous accorder les rôles IAM suivants sur votre projet :

    Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.

    Vous pouvez également obtenir les autorisations requises via des rôles personnalisés ou d'autres rôles prédéfinis.

Créez un environnement

  1. Dans la console Google Cloud, accédez à la page Créer un environnement.

    Accéder à la page "Créer un environnement"

  1. S'il s'agit du premier environnement de votre projet, la section Accorder les autorisations requises pour le compte de service Cloud Composer s'affiche.

    Vous ajoutez un compte d'agent de service Cloud Composer en tant que nouveau sur le compte de service de votre environnement et accordez le rôle Extension d'agent de service de l'API Cloud Composer v2.

    Vérifiez que vous utilisez le compte de service prévu pour votre environnement, puis cliquez sur Accorder.

  2. Dans le champ Nom, saisissez example-environment.

  3. Dans la liste déroulante Emplacement, sélectionnez une région pour l'environnement Cloud Composer. Ce guide utilise la région us-central1.

  4. Pour les autres options de configuration de l'environnement, utilisez les valeurs par défaut fournies.

  5. Cliquez sur Créer, puis attendez que l'environnement soit créé.

  6. Lorsque vous avez terminé, une coche verte s'affiche à côté du nom de l'environnement.

Créer un fichier DAG

Un DAG Airflow est un ensemble de tâches organisées que vous voulez planifier et exécuter. Les DAG sont définis dans des fichiers Python standards.

Ce guide utilise un exemple de DAG Airflow défini dans le fichier quickstart.py. Le code Python de ce fichier effectue les opérations suivantes:

  1. Il crée un DAG, composer_sample_dag, Ce DAG s'exécute tous les jours.
  2. Il exécute une tâche, print_dag_run_conf, qui imprime la configuration de l'exécution du DAG à l'aide de l'opérateur bash.

Enregistrez une copie du fichier quickstart.py sur votre ordinateur local:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Importez le fichier DAG dans le bucket de votre environnement.

Chaque environnement Cloud Composer est associé à un bucket Cloud Storage. Airflow dans Cloud Composer ne programme que les DAG qui se trouvent dans le dossier /dags de ce bucket.

Pour planifier votre DAG, importez quickstart.py depuis votre ordinateur local vers votre dossier /dags de l'environnement:

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement, example-environment. La page Détails de l'environnement s'ouvre.

  3. Cliquez sur Ouvrir le dossier des DAG. La page Informations sur le bucket s'ouvre.

  4. Cliquez sur Importer des fichiers, puis sélectionnez votre copie de quickstart.py.

  5. Pour importer le fichier, cliquez sur Ouvrir.

Afficher le DAG

Une fois le fichier DAG importé, Airflow effectue les opérations suivantes:

  1. Analyse le fichier DAG que vous avez importé. L'affichage du DAG dans Airflow peut prendre quelques minutes.
  2. Ajoute le DAG à la liste des DAG disponibles.
  3. Exécute le DAG selon le calendrier que vous avez fourni dans le fichier DAG.

Vérifiez que votre DAG est traité sans erreur et qu'il est disponible dans Airflow en l'affichant dans l'UI DAG. L'UI DAG est l'interface Cloud Composer permettant d'afficher les informations sur les DAG dans la console Google Cloud. Cloud Composer fournit aussi un accès à l'interface utilisateur Airflow, qui est une interface Web Airflow native ; de commande.

  1. Attendez environ cinq minutes pour laisser Airflow traiter le fichier DAG que vous avez importé précédemment et terminer la première exécution du DAG (expliqué plus loin).

  2. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  3. Dans la liste des environnements, cliquez sur le nom de votre environnement, example-environment La page Détails de l'environnement s'ouvre.

  4. Accédez à l'onglet DAG.

  5. Vérifier que le DAG composer_quickstart est présent dans la liste des DAG.

    La liste des DAG affiche le DAG composer_quickstart avec des informations supplémentaires telles que l'état et la planification.
    Figure 1 La liste des DAG affiche le DAG composer_quickstart (cliquez pour agrandir)

Afficher les détails de l'exécution du DAG

Une seule exécution d'un DAG est appelée exécution de DAG. Airflow immédiatement exécute une exécution du DAG pour l'exemple de DAG, car la date de début du fichier DAG est défini sur hier. De cette manière, Airflow rattrape la planification du DAG spécifié.

L'exemple de DAG contient une tâche, print_dag_run_conf, qui exécute la commande echo dans la console. Cette commande affiche des métadonnées sur le DAG (identifiant numérique de l'exécution du DAG).

  1. Dans l'onglet DAG, cliquez sur composer_quickstart. L'onglet Exécutions de la Le DAG s'ouvre.

  2. Dans la liste des exécutions du DAG, cliquez sur la première entrée.

    La liste des exécutions de DAG affiche l'exécution de DAG récente (sa date d'exécution et son état).
    Figure 2 : La liste des exécutions de DAG pour DAG composer_quickstart (cliquez pour agrandir)
  3. Les détails de l'exécution du DAG s'affichent. Il contient des informations sur les individus de l'exemple de DAG.

    Liste des tâches avec une entrée print_dag_run_conf, dont le
    l'heure de fin, l'heure de fin et la durée
    Figure 3 : Liste des tâches exécutées lors de l'exécution du DAG (cliquez pour agrandir)
  4. La section Journaux de l'exécution du DAG répertorie les journaux de toutes les tâches associées à l'exécution du DAG. Vous pouvez voir le résultat de la commande echo dans les journaux.

    Entrées de journal de la tâche, l'une d'elles étant "Output" (Sortie) et l'autre listant un identifiant
    Figure 4 : Journaux de la tâche print_dag_run_conf (cliquez pour agrandir)

Effectuer un nettoyage

Pour éviter que les ressources utilisées sur cette page soient facturées sur votre compte Google Cloud, procédez comme suit :

Supprimez les ressources utilisées dans ce tutoriel:

  1. Supprimez l'environnement Cloud Composer.

    1. Dans la console Google Cloud, accédez à la page Environnements.

      Accéder à la page Environnements

    2. Sélectionnez example-environment, puis cliquez sur Supprimer.

    3. Attendez que l'environnement soit supprimé.

  2. Supprimez le bucket de votre environnement. La suppression de l'environnement Cloud Composer ne supprime pas son bucket.

    1. Dans la console Google Cloud, accédez à Stockage > Navigateur.

      Accéder à Cloud Storage > Navigateur

    2. Sélectionnez le bucket de l'environnement, puis cliquez sur Supprimer. Par exemple, ce bucket peut être nommé us-central1-example-environ-c1616fe8-bucket.

  3. Supprimez le disque persistant de la file d'attente Redis de votre environnement. La suppression de l'environnement Cloud Composer ne supprime pas son disque persistant.

    1. Dans la console Google Cloud, accédez à Compute Engine > Disques.

      Accéder à la page Disques

    2. Sélectionnez le disque persistant de la file d'attente Redis de l'environnement, puis cliquez sur Supprimer.

      Par exemple, ce disque peut être nommé pvc-02bc4842-2312-4347-8519-d87bdcd31115. Disques pour Cloud Composer 2 possède toujours le type Balanced persistent disk et est de 2 Go.

Étape suivante