Ajouter et mettre à jour des DAG

Cloud Composer 1 | Cloud Composer 2

Cette page explique comment gérer les DAG dans votre environnement Cloud Composer.

Cloud Composer utilise un bucket Cloud Storage pour stocker les DAG de votre environnement Cloud Composer. Votre environnement synchronise lesDAG de ce bucket avec des composants Airflow tels que les nœuds de calcul et les programmeurs Airflow.

Avant de commencer

  • Étant donné qu'Apache Airflow n'offre pas une forte isolation DAG, nous vous recommandons de maintenir des environnements de production et de test distincts afin d'éviter les interférences au niveau des DAG. Pour en savoir plus, reportez-vous à la section Tester les DAG.
  • Assurez-vous que votre compte dispose d'autorisations suffisantes pour gérer les DAG.
  • Les modifications apportées au DAG se produisent dans Airflow sous trois à cinq minutes. Vous pouvez consulter l'état des tâches dans l'interface Web Airflow.

Accéder au bucket de votre environnement

Pour accéder au bucket associé à votre environnement:

Console

  1. Dans Google Cloud Console, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, recherchez une ligne comportant le nom de votre environnement et, dans la colonne DAGs folder, cliquez sur le lien DAGs (DAG). La page Bucket details (Informations sur le bucket) s'ouvre. Elle affiche le contenu du dossier /dags dans le bucket de votre environnement.

gcloud

La CLI gcloud dispose de commandes distinctes pour ajouter et supprimer des DAG dans votre bucket d'environnement.

Pour interagir avec le bucket de votre environnement, vous pouvez également utiliser l'outil de ligne de commande gsutil. Pour obtenir l'adresse du bucket de votre environnement, exécutez la commande CLI gcloud suivante:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.

Exemple :

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

Rédigez une requête API environments.get. Dans la ressource Environment, dans la ressource EnvironmentConfig, dans celle-ci, dagGcsPrefixest l'adresse de votre environnement.

Exemple :

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

Utilisez la bibliothèque google-auth pour obtenir des identifiants, et la bibliothèque requests pour appeler l'API REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
response = authed_session.request('GET', environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data['config']['dagGcsPrefix'])

Ajouter ou mettre à jour un DAG

Pour ajouter ou mettre à jour un DAG, déplacez le fichier Python .py de ce DAG vers le dossier /dags du bucket de l'environnement.

Console

  1. Dans Google Cloud Console, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, recherchez une ligne comportant le nom de votre environnement et, dans la colonne DAGs folder, cliquez sur le lien DAGs (DAG). La page Bucket details (Informations sur le bucket) s'ouvre. Elle affiche le contenu du dossier /dags dans le bucket de votre environnement.

  3. Cliquez sur Upload files (Importer des fichiers). Sélectionnez ensuite le fichier Python .py pour le DAG à l'aide de la boîte de dialogue du navigateur, puis confirmez.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.
  • LOCAL_FILE_TO_UPLOAD est le fichier Python .py pour le DAG.

Exemple :

gcloud beta composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

Mettre à jour un DAG contenant des exécutions de DAG actives

Si vous mettez à jour un DAG contenant des exécutions DAG actives :

  • Toutes les tâches en cours d'exécution finissent par utiliser le fichier DAG d'origine.
  • Toutes les tâches planifiées, mais qui ne sont pas en cours d'exécution, utiliseront le fichier DAG mis à jour.
  • Toutes les tâches qui ne sont plus présentes dans le fichier DAG mis à jour sont marquées comme supprimées.

Mettre à jour des DAG qui s'exécutent régulièrement

Une fois que vous avez importé un fichier DAG, Airflow peut se charger et mettre à jour le DAG. Si votre DAG s'exécute fréquemment, vous pouvez vous assurer qu'il utilise la version mise à jour du fichier DAG. Pour ce faire :

  1. Suspendez le DAG dans l'interface utilisateur d'Airflow.
  2. Importez un fichier DAG mis à jour.
  3. Attendez que les mises à jour s'affichent dans l'interface utilisateur d'Airflow. Cela signifie que le DAG a été correctement analysé par le programmeur et mis à jour dans la base de données Airflow.

    Si l'interface utilisateur d'Airflow affiche les DAG mis à jour, cela ne garantit pas qu'ils utilisent bien la version mise à jour du fichier DAG. Ce comportement est dû au fait que les fichiers DAG sont synchronisés indépendamment pour les programmeurs et les nœuds de calcul.

  4. Vous pouvez prolonger le temps d'attente pour vous assurer que le fichier DAG est synchronisé avec tous les nœuds de calcul de votre environnement. La synchronisation a lieu plusieurs fois par minute. Dans un environnement qui fonctionne normalement, il suffit d'attendre 20 à 30 secondes pour que tous les nœuds de calcul soient synchronisés.

  5. (Facultatif) Pour être certain que tous les nœuds de calcul disposent de la nouvelle version du fichier DAG, consultez les journaux de chaque nœud de calcul. Pour ce faire :

    1. Ouvrez l'onglet Journaux de votre environnement dans Google Cloud Console.

    2. Accédez à l'élément Journaux Composer > Infrastructure > Synchronisation Cloud Storage et inspectez les journaux de chaque nœud de calcul de votre environnement. Recherchez l'élément de journal Syncing dags directory le plus récent doté d'un horodatage postérieur à l'importation du nouveau fichier DAG. Si Finished syncing s'affiche après l'élément de journal, les DAG sont correctement synchronisés sur ce nœud de calcul.

  6. Relancez le DAG.

Supprimer un DAG

Cette section explique comment supprimer un DAG.

Supprimer un DAG dans votre environnement

Pour supprimer un DAG, supprimez le fichier Python .py du DAG situé dans le dossier /dags de l'environnement de votre environnement.

Console

  1. Dans Google Cloud Console, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, recherchez une ligne comportant le nom de votre environnement et, dans la colonne DAGs folder, cliquez sur le lien DAGs (DAG). La page Bucket details (Informations sur le bucket) s'ouvre. Elle affiche le contenu du dossier /dags dans votre bucket d'environnement.

  3. Sélectionnez le fichier DAG, cliquez sur Supprimer, puis confirmez l'opération.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.
  • DAG_FILE par le fichier Python .py pour le DAG.

Exemple :

gcloud beta composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

Supprimer un DAG de l'interface utilisateur Airflow

Pour supprimer les métadonnées d'un DAG à partir de l'interface Web Airflow:

Interface utilisateur d'Airflow

  1. Accédez à l'interface utilisateur Airflow pour votre environnement.
  2. Pour le DAG, cliquez sur Supprimer le DAG.

gcloud

Exécutez la commande suivante dans la CLI gcloud:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région où se trouve l'environnement.
  • DAG_NAME est le nom du DAG à supprimer.

Étapes suivantes