Ajouter et mettre à jour des DAG

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Cette page explique comment gérer les DAG dans Cloud Composer environnement.

Cloud Composer utilise un bucket Cloud Storage pour stocker les DAG de votre environnement Cloud Composer. Votre environnement synchronise les DAG de ce bucket avec les composants Airflow tels que les nœuds de calcul et les programmeurs Airflow.

Avant de commencer

  • Étant donné qu'Apache Airflow n'offre pas une forte isolation DAG, nous vous recommandons de maintenir des environnements de production et de test distincts afin d'éviter les interférences au niveau des DAG. Pour en savoir plus, reportez-vous à la section Tester les DAG.
  • Assurez-vous que votre compte dispose d'autorisations suffisantes pour gérer les DAG.
  • Les modifications apportées au DAG sont propagées vers Airflow dans un délai de 3 à 5 minutes. Vous pouvez voir qu'il s'agit dans l'interface Web Airflow.

Accéder au bucket de votre environnement

Pour accéder au bucket associé à votre environnement :

Console

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, recherchez une ligne avec le nom de votre environnement. Dans la colonne Dossier des DAG, cliquez sur le lien DAG. La page Informations sur le bucket s'ouvre. Il affiche le contenu du dossier /dags dans le bucket de votre environnement.

gcloud

La gcloud CLI propose des commandes distinctes ajouter et supprimer des DAG dans le bucket de votre environnement.

Si vous souhaitez interagir avec le bucket de votre environnement, vous pouvez également utiliser la Google Cloud CLI. Pour obtenir l'adresse du bucket de votre environnement, exécutez la commande suivante de la CLI gcloud :

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.

Exemple :

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

Rédigez une requête API environments.get. Dans la ressource Environment, dans la ressource EnvironmentConfig, dans la ressource dagGcsPrefix, vous trouverez l'adresse du bucket de votre environnement.

Exemple :

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

Utilisez la bibliothèque google-auth pour obtenir des identifiants et la bibliothèque requests pour appeler l'API REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

Ajouter ou mettre à jour un DAG

Pour ajouter ou mettre à jour un DAG, déplacez le fichier Python .py du DAG vers dans le dossier /dags du bucket de l'environnement.

Console

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, recherchez une ligne portant le nom de votre environnement, puis dans la colonne Dossier des DAG, cliquez sur le lien DAG. La page Informations sur le bucket s'ouvre. Il affiche le contenu du dossier /dags dans le bucket de votre environnement.

  3. Cliquez sur Importer des fichiers. Sélectionnez ensuite le fichier Python .py pour le DAG à l'aide de la boîte de dialogue du navigateur, puis confirmez.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.
  • LOCAL_FILE_TO_UPLOAD est le fichier .py Python du DAG.

Exemple :

gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

Mettre à jour un DAG contenant des exécutions de DAG actives

Si vous mettez à jour un DAG contenant des exécutions DAG actives :

  • Toutes les tâches en cours d'exécution finissent par utiliser le fichier DAG d'origine.
  • Toutes les tâches planifiées, mais qui ne sont pas en cours d'exécution, utiliseront le fichier DAG mis à jour.
  • Toutes les tâches qui ne figurent plus dans le fichier DAG mis à jour sont marquées comme supprimés.

Mettre à jour les DAG qui s'exécutent fréquemment

Après l'importation d'un fichier DAG, Airflow met un certain temps à le charger et mettre à jour le DAG. Si votre DAG s'exécute fréquemment, vous pouvez pour s'assurer que le DAG utilise la version mise à jour du fichier DAG. Pour ce faire :

  1. Suspendez le DAG dans l'interface utilisateur d'Airflow.

  2. Importez un fichier DAG mis à jour.

  3. Attendez que les mises à jour s'affichent dans l'interface utilisateur d'Airflow. Cela signifie que le DAG a été correctement analysé par le programmeur et mis à jour dans la base de données Airflow.

    Si l'interface utilisateur d'Airflow affiche les DAG mis à jour, cela ne garantit pas qu'ils utilisent bien la version mise à jour du fichier DAG. Ce comportement est dû au fait que les fichiers DAG sont synchronisés indépendamment pour les programmeurs et les nœuds de calcul.

  4. Vous pouvez prolonger le temps d'attente pour vous assurer que le fichier DAG est synchronisé avec tous les nœuds de calcul de votre environnement. La synchronisation a lieu plusieurs fois par minute. Dans un environnement qui fonctionne normalement, il suffit d'attendre 20 à 30 secondes pour que tous les nœuds de calcul soient synchronisés.

  5. (Facultatif) Pour être certain que tous les nœuds de calcul disposent de la nouvelle version du fichier DAG, consultez les journaux de chaque nœud de calcul. Pour ce faire :

    1. Ouvrez l'onglet Journaux de votre environnement dans la console Google Cloud.

    2. Accédez à Journaux Composer > Infrastructure > de synchronisation Cloud Storage, et inspectez les journaux de chaque nœud de calcul dans votre environnement. Recherchez le journal Syncing dags directory le plus récent avec un code temporel après l'importation du nouveau fichier DAG. Si Finished syncing s'affiche après l'élément de journal, les DAG sont correctement synchronisés sur ce nœud de calcul.

  6. Relancez le DAG.

Supprimer un DAG dans votre environnement

Pour supprimer un DAG, supprimez le fichier Python .py du DAG du /dags dans le bucket de votre environnement.

Console

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, recherchez une ligne avec le nom de votre environnement. Dans la colonne Dossier des DAG, cliquez sur le lien DAG. La La page Informations sur le bucket s'ouvre. Il affiche le contenu du dossier /dags dans le bucket de votre environnement.

  3. Sélectionnez le fichier DAG, cliquez sur Supprimer, puis confirmez l'opération.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.
  • DAG_FILE par le fichier Python .py pour le DAG ;

Exemple :

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

Supprimer un DAG de l'interface utilisateur d'Airflow

Pour supprimer les métadonnées d'un DAG à partir de l'interface Web Airflow, procédez comme suit:

Interface utilisateur d'Airflow

  1. Accédez à l'interface utilisateur d'Airflow pour votre environnement.
  2. Pour le DAG, cliquez sur Supprimer le DAG.

gcloud

Exécutez la commande suivante dans la gcloud CLI:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.
  • DAG_NAME est le nom du DAG à supprimer.

Étape suivante