Exécuter un DAG Apache Airflow dans Cloud Composer 1 (Google Cloud CLI)

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Ce guide de démarrage rapide explique comment créer un environnement Cloud Composer et exécuter un DAG Apache Airflow dans Cloud Composer 1.

Avant de commencer

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Si vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à la gcloud CLI avec votre identité fédérée.

  4. Pour initialiser la gcloud CLI, exécutez la commande suivante :

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Install the Google Cloud CLI.

  8. Si vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à la gcloud CLI avec votre identité fédérée.

  9. Pour initialiser la gcloud CLI, exécutez la commande suivante :

    gcloud init
  10. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  11. Verify that billing is enabled for your Google Cloud project.

  12. Enable the Cloud Composer API:

    gcloud services enable composer.googleapis.com
  13. Pour obtenir les autorisations nécessaires pour suivre ce guide de démarrage rapide, demandez à votre administrateur de vous accorder les rôles IAM suivants sur votre projet :

    Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

    Vous pouvez également obtenir les autorisations requises avec des rôles personnalisés ou d'autres rôles prédéfinis.

  14. Créer le compte de service d'un environnement

    Lorsque vous créez un environnement, vous spécifiez un compte de service. Ce compte de service est appelé compte de service de l'environnement. Votre environnement utilise ce compte de service pour effectuer la plupart des opérations.

    Le compte de service de votre environnement n'est pas un compte utilisateur. Un compte de service est un type particulier de compte utilisé par une application ou une instance de machine virtuelle (VM), et non par une personne.

    Pour créer un compte de service pour votre environnement :

    1. Créez un compte de service, comme décrit dans la documentation Identity and Access Management.

    2. Attribuez-lui un rôle, comme décrit dans la documentation Identity and Access Management. Le rôle requis est Nœud de calcul Composer (composer.worker).

    Créez un environnement

    Créez un environnement nommé example-environment dans la région us-central1, avec la dernière version de Cloud Composer 1.

    gcloud composer environments create example-environment \
        --location us-central1 \
        --image-version composer-1.20.12-airflow-1.10.15
    

    Créer un fichier DAG

    Un DAG Airflow est un ensemble de tâches organisées que vous souhaitez programmer et exécuter. Les DAG sont définis dans des fichiers Python standards.

    Ce guide utilise un exemple de DAG Airflow défini dans le fichier quickstart.py. Le code Python de ce fichier effectue les opérations suivantes :

    1. Il crée un DAG, composer_sample_dag, Ce DAG s'exécute tous les jours.
    2. Il exécute une tâche, print_dag_run_conf, qui imprime la configuration de l'exécution du DAG à l'aide de l'opérateur bash.

    Enregistrez une copie du fichier quickstart.py sur votre ordinateur local :

    import datetime
    
    from airflow import models
    from airflow.operators import bash
    
    # If you are running Airflow in more than one time zone
    # see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
    # for best practices
    YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
    
    default_args = {
        "owner": "Composer Example",
        "depends_on_past": False,
        "email": [""],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": datetime.timedelta(minutes=5),
        "start_date": YESTERDAY,
    }
    
    with models.DAG(
        "composer_quickstart",
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1),
    ) as dag:
        # Print the dag_run id from the Airflow logs
        print_dag_run_conf = bash.BashOperator(
            task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
        )

    Importer le fichier DAG dans le bucket de votre environnement

    Chaque environnement Cloud Composer est associé à un bucket Cloud Storage. Airflow dans Cloud Composer ne programme que les DAG qui se trouvent dans le dossier /dags de ce bucket.

    Pour programmer votre DAG, importez le fichier quickstart.py à partir de votre ordinateur local dans le dossier /dags de votre environnement :

    Pour importer quickstart.py avec Google Cloud CLI, exécutez la commande suivante dans le dossier où se trouve le fichier quickstart.py :

    gcloud composer environments storage dags import \
    --environment example-environment --location us-central1 \
    --source quickstart.py
    

    Afficher le DAG

    Une fois le fichier DAG importé, Airflow effectue les opérations suivantes :

    1. Analyse le fichier DAG que vous avez importé. L'affichage du DAG dans Airflow peut prendre quelques minutes.
    2. Ajoute le DAG à la liste des DAG disponibles.
    3. Exécute le DAG selon le calendrier que vous avez fourni dans le fichier DAG.

    Vérifiez que votre DAG est traité sans erreur et qu'il est disponible dans Airflow en l'affichant dans l'UI DAG. L'UI DAG est l'interface Cloud Composer permettant d'afficher les informations sur les DAG dans la console Google Cloud . Cloud Composer permet également d'accéder à l'interface utilisateur Airflow, qui est une interface Web Airflow native.

    1. Patientez environ cinq minutes pour laisser le temps à Airflow de traiter le fichier DAG que vous avez importé précédemment et de terminer la première exécution du DAG (expliquée plus loin).

    2. Exécutez la commande suivante dans Google Cloud CLI. Cette commande exécute la commande de CLI Airflow dags list qui liste les DAG dans votre environnement.

      gcloud composer environments run example-environment \
      --location us-central1 \
      dags list
      
    3. Vérifiez que le DAG composer_quickstart est listé dans le résultat de la commande.

      Exemple de résultat :

      Executing the command: [ airflow dags list ]...
      Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
      Use ctrl-c to interrupt the command
      dag_id              | filepath              | owner            | paused
      ====================+=======================+==================+=======
      airflow_monitoring  | airflow_monitoring.py | airflow          | False
      composer_quickstart | dag-quickstart-af2.py | Composer Example | False
      

    Afficher les détails d'une exécution de DAG

    Une seule exécution d'un DAG est appelée exécution de DAG. Airflow exécute immédiatement une exécution DAG pour l'exemple de DAG, car la date de début dans le fichier DAG est définie sur "hier". Airflow rattrape ainsi le calendrier du DAG spécifié.

    L'exemple de DAG contient une tâche, print_dag_run_conf, qui exécute la commande echo dans la console. Cette commande génère des métadonnées sur le DAG (identifiant numérique de l'exécution du DAG).

    Exécutez la commande suivante dans Google Cloud CLI. Cette commande liste les exécutions de DAG pour le DAG composer_quickstart :

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list-runs -- --dag-id composer_quickstart
    

    Exemple de résultat :

    dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
    ====================+=============================================+=========+==================================+==================================+=================================
    composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
    

    La CLI Airflow ne fournit pas de commande permettant d'afficher les journaux des tâches. Vous pouvez utiliser d'autres méthodes pour afficher les journaux des tâches Airflow : l'interface utilisateur DAG Cloud Composer, l'interface utilisateur Airflow ou Cloud Logging. Ce guide explique comment interroger Cloud Logging pour obtenir les journaux d'une exécution de DAG spécifique.

    Exécutez la commande suivante dans Google Cloud CLI. Cette commande lit les journaux de Cloud Logging pour une exécution de DAG spécifique du DAG composer_quickstart. L'argument --format met en forme la sortie de sorte que seul le texte du message de journal soit affiché.

    gcloud logging read \
    --format="value(textPayload)" \
    --order=asc \
    "resource.type=cloud_composer_environment \
    resource.labels.location=us-central1 \
    resource.labels.environment_name=example-environment \
    labels.workflow=composer_quickstart \
    (labels.\"execution-date\"=\"RUN_ID\")"
    

    Remplacez :

    • RUN_ID par la valeur run_id issue du résultat de la commande tasks states-for-dag-run que vous avez exécutée précédemment. Par exemple, 2024-02-17T15:38:38.969307+00:00.

    Exemple de résultat :

    ...
    
    Starting attempt 1 of 2
    Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
    15:38:38.969307+00:00
    Started process 22544 to run task
    
    ...
    
    Running command: ['/usr/bin/bash', '-c', 'echo 115746']
    Output:
    115746
    
    ...
    
    Command exited with return code 0
    Marking task as SUCCESS. dag_id=composer_quickstart,
    task_id=print_dag_run_conf, execution_date=20240217T153838,
    start_date=20240218T153841, end_date=20240218T153841
    Task exited with return code 0
    0 downstream tasks scheduled from follow-on schedule check
    

    Effectuer un nettoyage

    Pour éviter que les ressources utilisées dans cette démonstration soient facturées sur votre compte Google Cloud , supprimez le projet Google Cloud qui les contient.

    Supprimez les ressources utilisées dans ce tutoriel :

    1. Supprimez l'environnement Cloud Composer.

      1. Dans la console Google Cloud , accédez à la page Environnements.

        Accéder à la page Environnements

      2. Sélectionnez example-environment, puis cliquez sur Supprimer.

      3. Attendez que l'environnement soit supprimé.

    2. Supprimez le bucket de votre environnement. La suppression de l'environnement Cloud Composer ne supprime pas son bucket.

      1. Dans la console Google Cloud , accédez à la page Stockage > Navigateur.

        Accéder à Cloud Storage > Navigateur

      2. Sélectionnez le bucket de l'environnement, puis cliquez sur Supprimer. Par exemple, ce bucket peut être nommé us-central1-example-environ-c1616fe8-bucket.

    Étapes suivantes