Ajouter et mettre à jour des DAG

Cloud Composer 1 | Cloud Composer 2

Cette page explique comment gérer les DAG dans votre environnement Cloud Composer.

Cloud Composer stocke les DAG de votre environnement Cloud Composer à l'aide d'un bucket Cloud Storage. Votre environnement synchronise les DAG de ce bucket avec les composants Airflow tels que les nœuds de calcul et les programmeurs Airflow.

Avant de commencer

  • Étant donné qu'Apache Airflow n'offre pas une forte isolation DAG, nous vous recommandons de maintenir des environnements de production et de test distincts afin d'éviter les interférences au niveau des DAG. Pour en savoir plus, reportez-vous à la section Tester les DAG.
  • Assurez-vous que votre compte dispose d'autorisations suffisantes pour gérer les DAG.
  • Les modifications apportées au DAG se propagent à Airflow en 3 à 5 minutes. Vous pouvez consulter l'état de la tâche dans l'interface Web Airflow.

Accéder au bucket de votre environnement

Pour accéder au bucket associé à votre environnement, procédez comme suit:

Console

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, recherchez une ligne portant le nom de votre environnement et, dans la colonne Dossier des DAG, cliquez sur le lien DAG. La page Informations sur le bucket s'ouvre. Elle affiche le contenu du dossier /dags du bucket de votre environnement.

gcloud

La gcloud CLI dispose de commandes distinctes pour ajouter et supprimer des DAG dans le bucket de votre environnement.

Si vous souhaitez interagir avec le bucket de votre environnement, vous pouvez également utiliser l'outil de ligne de commande gsutil. Pour obtenir l'adresse du bucket de votre environnement, exécutez la commande gcloud CLI suivante:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.

Exemple :

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

Rédigez une requête API environments.get. Dans la ressource Environment, dans la ressource EnvironmentConfig, dans la ressource dagGcsPrefix, vous trouverez l'adresse du bucket de votre environnement.

Exemple :

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

Utilisez la bibliothèque google-auth pour obtenir les identifiants et utilisez la bibliothèque requests pour appeler l'API REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

Ajouter ou mettre à jour un DAG

Pour ajouter ou mettre à jour un DAG, déplacez le fichier Python .py du DAG vers le dossier /dags du bucket de l'environnement.

Console

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, recherchez une ligne portant le nom de votre environnement et, dans la colonne Dossier des DAG, cliquez sur le lien DAG. La page Informations sur le bucket s'ouvre. Elle affiche le contenu du dossier /dags du bucket de votre environnement.

  3. Cliquez sur Importer des fichiers. Sélectionnez ensuite le fichier Python .py pour le DAG à l'aide de la boîte de dialogue du navigateur, puis confirmez.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.
  • LOCAL_FILE_TO_UPLOAD est le fichier Python .py du DAG.

Exemple :

gcloud beta composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

Mettre à jour un DAG contenant des exécutions de DAG actives

Si vous mettez à jour un DAG contenant des exécutions DAG actives :

  • Toutes les tâches en cours d'exécution finissent par utiliser le fichier DAG d'origine.
  • Toutes les tâches planifiées, mais qui ne sont pas en cours d'exécution, utiliseront le fichier DAG mis à jour.
  • Toutes les tâches qui ne sont plus présentes dans le fichier DAG mis à jour sont marquées comme supprimées.

Mettre à jour des DAG qui s'exécutent régulièrement

Après l'importation d'un fichier DAG, Airflow peut mettre un certain temps à charger ce fichier et à mettre à jour le DAG. Si votre DAG s'exécute fréquemment, vous pouvez vous assurer qu'il utilise la version mise à jour du fichier DAG. Pour ce faire :

  1. Suspendez le DAG dans l'interface utilisateur d'Airflow.
  2. Importez un fichier DAG mis à jour.
  3. Attendez que les mises à jour s'affichent dans l'interface utilisateur d'Airflow. Cela signifie que le DAG a été correctement analysé par le programmeur et mis à jour dans la base de données Airflow.

    Si l'interface utilisateur d'Airflow affiche les DAG mis à jour, cela ne garantit pas qu'ils utilisent bien la version mise à jour du fichier DAG. Ce comportement est dû au fait que les fichiers DAG sont synchronisés indépendamment pour les programmeurs et les nœuds de calcul.

  4. Vous pouvez prolonger le temps d'attente pour vous assurer que le fichier DAG est synchronisé avec tous les nœuds de calcul de votre environnement. La synchronisation a lieu plusieurs fois par minute. Dans un environnement qui fonctionne normalement, il suffit d'attendre 20 à 30 secondes pour que tous les nœuds de calcul soient synchronisés.

  5. (Facultatif) Pour être certain que tous les nœuds de calcul disposent de la nouvelle version du fichier DAG, consultez les journaux de chaque nœud de calcul. Pour ce faire :

    1. Ouvrez l'onglet Journaux de votre environnement dans la console Google Cloud.

    2. Accédez à l'élément Journaux Composer > Infrastructure > Synchronisation Cloud Storage et inspectez les journaux de chaque nœud de calcul de votre environnement. Recherchez l'élément de journal Syncing dags directory le plus récent doté d'un horodatage postérieur à l'importation du nouveau fichier DAG. Si Finished syncing s'affiche après l'élément de journal, les DAG sont correctement synchronisés sur ce nœud de calcul.

  6. Relancez le DAG.

Supprimer un DAG

Cette section explique comment supprimer un DAG.

Supprimer un DAG dans votre environnement

Pour supprimer un DAG, supprimez le fichier Python .py du DAG du dossier /dags de l'environnement, dans le bucket de votre environnement.

Console

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, recherchez une ligne portant le nom de votre environnement et, dans la colonne Dossier des DAG, cliquez sur le lien DAG. La page Informations sur le bucket s'ouvre. Elle affiche le contenu du dossier /dags du bucket de votre environnement.

  3. Sélectionnez le fichier DAG, cliquez sur Supprimer, puis confirmez l'opération.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.
  • DAG_FILE par le fichier Python .py pour le DAG.

Exemple :

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

Supprimer un DAG de l'interface utilisateur Airflow

Pour supprimer les métadonnées d'un DAG de l'interface Web Airflow, procédez comme suit:

Interface utilisateur d'Airflow

  1. Accédez à l'interface utilisateur d'Airflow pour votre environnement.
  2. Pour le DAG, cliquez sur Supprimer le DAG.

gcloud

Exécutez la commande suivante dans la gcloud CLI:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.
  • DAG_NAME est le nom du DAG à supprimer.

Étapes suivantes