Exécuter un DAG Apache Airflow dans Cloud Composer 2

Cloud Composer 1 | Cloud Composer 2

Cette page explique comment créer un environnement Cloud Composer et exécuter un DAG Apache Airflow dans Cloud Composer 2.

Si vous êtes un nouvel utilisateur d'Airflow, consultez ce tutoriel pour en savoir plus sur les concepts, les objets et leur utilisation Airflow.

Avant de commencer

  1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
  2. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

    Accéder au sélecteur de projet

  3. Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier si la facturation est activée sur un projet.

  4. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

    Accéder au sélecteur de projet

  5. Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier si la facturation est activée sur un projet.

  6. Activez l'API Cloud Composer

    Activer l'API

Créez un environnement

Console

  1. Dans Google Cloud Console, accédez à la page Créer un environnement.

    Accéder à la page "Créer un environnement"

  2. Dans le champ Nom, saisissez example-environment.

  3. Dans la liste déroulante Emplacement, sélectionnez une région pour l'environnement Cloud Composer. Pour plus d'informations sur la sélection d'une région, consultez la page Régions disponibles.

  4. Pour les autres options de configuration de l'environnement, utilisez les valeurs par défaut fournies.

  5. Pour créer l'environnement, cliquez sur Créer.

  6. Attendez que l'environnement soit créé. Lorsque vous avez terminé, une coche verte s'affiche à côté du nom de l'environnement.

gcloud

gcloud beta composer environments create ENVIRONMENT_NAME \
    --location LOCATION \
    --image-version IMAGE_VERSION

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement. Ce guide de démarrage rapide utilise example-environment.
  • LOCATION par une région pour l'environnement Cloud Composer. Pour plus d'informations sur la sélection d'une région, consultez la page Régions disponibles.
  • IMAGE_VERSION par le nom de l'image Cloud Composer. Ce guide utilise composer-2.0.11-airflow-2.2.3 pour créer un environnement avec la dernière image Cloud Composer 2.

Exemple :

gcloud beta composer environments create example-environment \
    --location us-central1 \
    --image-version composer-2.0.11-airflow-2.2.3

Terraform

Pour configurer cet environnement à l'aide de Terraform, ajoutez le bloc de ressources suivant à votre configuration Terraform, puis exécutez terraform apply.

Pour en savoir plus sur l'utilisation de Terraform pour créer un environnement Cloud Composer, consultez la documentation Terraform.

resource "google_composer_environment" "example" {
  provider = google-beta
  name = "ENVIRONMENT_NAME"
  region = "LOCATION"

  config {
    software_config {
      image_version = "IMAGE_VERSION"
    }
  }
}
  • ENVIRONMENT_NAME par le nom de l'environnement. Ce guide de démarrage rapide utilise example-environment.
  • LOCATION par une région pour l'environnement Cloud Composer. Pour plus d'informations sur la sélection d'une région, consultez la page Régions disponibles.
  • IMAGE_VERSION par le nom de l'image Cloud Composer. Ce guide utilise composer-2.0.11-airflow-2.2.3 pour créer un environnement avec la dernière image Cloud Composer 2.

Exemple :

resource "google_composer_environment" "example" {
  name = "example-environment"
  region = "us-central1"

  config {
    software_config {
      image_version = "composer-2.0.11-airflow-2.2.3"
    }
  }

}

Afficher les détails de l'environnement

Une fois la création de l'environnement terminée, vous pouvez afficher les informations de votre environnement, telles que la version de Cloud Composer, l'URL de l'interface Web Airflow et le dossier des DAG dans Cloud Storage.

Pour afficher les informations sur l'environnement, procédez comme suit :

  1. Dans Google Cloud Console, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Pour afficher la page Détails de l'environnement, cliquez sur le nom de votre environnement, example-environment.

Créer un DAG

Un DAG Airflow est un ensemble de tâches organisées que vous souhaitez programmer et exécuter. Les DAG sont définis dans des fichiers Python standards.

Le code Python dans quickstart.py effectue les opérations suivantes :

  1. Il crée un DAG, composer_sample_dag, lequel s'exécute une fois par jour.
  2. Il exécute une tâche, print_dag_run_conf, qui imprime la configuration de l'exécution du DAG à l'aide de l'opérateur bash.

Pour créer un DAG, créez une copie du fichier quickstart.py sur votre machine locale.

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with models.DAG(
        'composer_quickstart',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

Importer le DAG dans Cloud Storage

Cloud Composer ne programme que les DAG qui se trouvent dans le dossier /dags du bucket Cloud Storage de l'environnement.

Pour programmer votre DAG, importez le fichier quickstart.py à partir de votre ordinateur local dans le dossier /dags de votre environnement.

Console

  1. Dans Google Cloud Console, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Pour ouvrir le dossier /dags, suivez le lien Dossier des graphes orientés acycliques pour example-environment.

  3. Sur la page "Informations sur le bucket", cliquez sur Importer des fichiers, puis sélectionnez votre copie locale de quickstart.py.

  4. Pour importer le fichier, cliquez sur Ouvrir.

    Une fois que vous avez importé votre DAG, Cloud Composer l'ajoute à Airflow et planifie son exécution immédiate. L'affichage du DAG dans l'interface Web Airflow peut prendre quelques minutes.

gcloud

Pour importer quickstart.py avec gcloud, exécutez la commande suivante :

gcloud composer environments storage dags import \
--environment example-environment  --location us-central1 \
--source quickstart.py

Afficher le DAG dans l'interface utilisateur Airflow

Chaque environnement Cloud Composer dispose d'un serveur Web qui exécute l'interface Web Airflow. Vous pouvez gérer les DAG à partir de l'interface Web d'Airflow.

Pour afficher le DAG dans l'interface Web Airflow, procédez comme suit :

  1. Dans Google Cloud Console, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Pour ouvrir l'interface Web Airflow, cliquez sur le lien Airflow pour example-environment. L'interface utilisateur Web Airflow s'ouvre dans une nouvelle fenêtre du navigateur.

  3. Dans la barre d'outils Airflow, accédez à la page des DAG.

  4. Pour ouvrir la page de détails des DAG, cliquez sur composer_sample_dag.

    Page des DAG dans l'interface utilisateur Airflow
    Figure 1. Page des DAG dans l'interface utilisateur Airflow (cliquez pour agrandir)

    La page du DAG présente l'arborescence, soit une représentation graphique des tâches et des dépendances du workflow.

    Affichage de l'arborescence pour le DAG composer_sample_dags
    Figure 2. Affichage sous forme d'arborescence pour le DAG composer_sample_dags

Afficher les détails de l'instance de tâche dans les journaux Airflow

Le DAG que vous avez programmé inclut la tâche print_dag_run_conf. La tâche imprime la configuration de l'exécution du DAG, que vous pouvez consulter dans les journaux Airflow de l'instance de tâche.

Pour afficher les détails de l'instance de tâche, procédez comme suit :

  1. Dans l'arborescence du DAG figurant dans l'interface Web Airflow, cliquez sur Graph View (Vue graphique).

    Si vous maintenez le pointeur sur la tâche print_dag_run_conf, son état s'affiche.

    Affichage de l'arborescence pour le DAG composer_sample_dags
    Figure 3 : État de la tâche "print_dag_run_conf"
  2. Cliquez sur la tâche print_dag_run_conf.

    Dans le menu contextuel de l'instance de tâche, vous pouvez obtenir des métadonnées et effectuer certaines actions.

    Menu contextuel de l'instance de tâche pour la tâche composer_sample_dags
    Figure 4. Menu contextuel de l'instance de tâche pour la tâche composer_sample_dags
  3. Dans le menu contextuel de l'instance de tâche, cliquez sur Log (Journal).

  4. Dans le journal, recherchez Running command: ['bash' pour voir le résultat de l'opérateur bash.

    [2021-10-04 15:27:21,029] {subprocess.py:63} INFO - Running command:
    ['bash', '-c', 'echo 735']
    [2021-10-04 15:27:21,167] {subprocess.py:74} INFO - Output:
    [2021-10-04 15:27:21,168] {subprocess.py:78} INFO - 735
    [2021-10-04 15:27:21,168] {subprocess.py:82} INFO - Command exited with
    return code 0
    

Effectuer un nettoyage

Pour éviter que les ressources utilisées sur cette page ne soient facturées sur votre compte Google Cloud, procédez comme suit :

Supprimez les ressources utilisées dans ce tutoriel:

  1. Supprimez l'environnement Cloud Composer.

    1. Dans Google Cloud Console, accédez à la page Environnements.

      Accéder à la page Environnements

    2. Sélectionnez example-environment, puis cliquez sur Supprimer.

    3. Attendez que l'environnement soit supprimé.

  2. Supprimez le bucket de votre environnement. La suppression de l'environnement Cloud Composer ne supprime pas son bucket.

    1. Dans Google Cloud Console, accédez à la page Stockage (> Navigateur).

      Accéder à Cloud Storage > Navigateur

    2. Sélectionnez le bucket de l'environnement, puis cliquez sur Supprimer. Par exemple, ce bucket peut être nommé us-central1-example-environ-c1616fe8-bucket.

  3. Supprimez le disque persistant de la file d'attente Redis de votre environnement. La suppression de l'environnement Cloud Composer ne supprime pas son disque persistant.

    1. Dans Google Cloud Console, accédez à l'outil Compute Engine &Disks.

      Accéder à la page Disques

    2. Sélectionnez le disque persistant de la file d'attente Redis de l'environnement, puis cliquez sur Supprimer.

      Par exemple, ce disque peut être nommé pvc-02bc4842-2312-4347-8519-d87bdcd31115. Les disques Cloud Composer 2 ont toujours le type Balanced persistent disk et la taille de 2 Go.

Étapes suivantes