Tester, synchroniser et déployer vos DAG depuis GitHub

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Ce guide explique comment créer un pipeline CI/CD pour tester, synchroniser et déployer des DAG dans votre environnement Cloud Composer à partir de votre dépôt GitHub.

Si vous souhaitez uniquement synchroniser les données d'autres services, consultez Transférer des données depuis d'autres services.

Présentation du pipeline CI/CD

Diagramme d'architecture montrant les étapes du flux. La vérification avant commit et l'examen des demandes de pull se trouvent dans la section GitHub, tandis que la synchronisation des DAG et la vérification manuelle des DAG se trouvent dans la section Google Cloud .
Figure 1. Schéma de l'architecture montrant les étapes du flux (cliquez pour agrandir)

Le pipeline CI/CD permettant de tester, de synchroniser et de déployer des DAG comprend les étapes suivantes :

  1. Vous apportez une modification à un DAG et la transférez vers une branche de développement dans votre dépôt.

  2. Vous ouvrez une demande d'extraction pour la branche principale de votre dépôt.

  3. Cloud Build exécute des tests unitaires pour vérifier que votre DAG est valide.

  4. Votre demande d'extraction est approuvée et fusionnée dans la branche principale de votre dépôt.

  5. Cloud Build synchronise votre environnement de développement Cloud Composer avec ces nouvelles modifications.

  6. Vérifiez que le DAG se comporte comme prévu dans votre environnement de développement.

  7. Si votre DAG fonctionne comme prévu, importez-le dans votre environnement Cloud Composer de production.

Objectifs

Avant de commencer

  • Ce guide part du principe que vous travaillez avec deux environnements Cloud Composer identiques : un environnement de développement et un environnement de production.

    Dans ce guide, vous ne configurez un pipeline CI/CD que pour votre environnement de développement. Assurez-vous que l'environnement que vous utilisez n'est pas un environnement de production.

  • Ce guide suppose que vos DAG et leurs tests sont stockés dans un dépôt GitHub.

    L'exemple de pipeline CI/CD montre le contenu d'un dépôt exemple. Les DAG et les tests sont stockés dans le répertoire dags/, avec les fichiers d'exigences, le fichier de contraintes et les fichiers de configuration Cloud Build stockés au niveau supérieur. L'utilitaire de synchronisation des DAG et ses exigences se trouvent dans le répertoire utils.

Créer une tâche de vérification avant l'envoi et des tests unitaires

La première tâche Cloud Build exécute une vérification avant l'envoi, qui exécute des tests unitaires pour vos DAG.

Ajouter des tests unitaires

Si vous ne l'avez pas déjà fait, créez des tests unitaires pour vos DAG. Enregistrez ces tests à côté des DAG dans votre dépôt, chacun avec le suffixe _test. Par exemple, le fichier de test du DAG dans example_dag.py est example_dag_test.py. Il s'agit des tests qui s'exécutent en tant que vérification de pré-envoi dans votre dépôt.

Créer une configuration YAML Cloud Build pour la vérification avant l'envoi

Dans votre dépôt, créez un fichier YAML nommé test-dags.cloudbuild.yaml qui configure votre job Cloud Build pour les vérifications avant l'envoi. Il comporte trois étapes :

  1. Installez les dépendances requises par vos DAG.
  2. Installez les dépendances nécessaires à vos tests unitaires.
  3. Exécutez les tests du DAG.

steps:
  # install dependencies
  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements.txt", "-c", "constraints.txt", "--user"]

  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements-test.txt", "--user"]

  # run in python 3.8 which is latest version in Cloud Composer
  - name: python:3.8-slim
    entrypoint: python3.8
    args: ["-m", "pytest", "-s", "dags/"]

Créer le déclencheur Cloud Build pour la vérification avant l'envoi

Suivez le guide Créer des dépôts à partir de GitHub pour créer un déclencheur basé sur une application GitHub avec les configurations suivantes :

  • Nom : test-dags

  • Événement : demande d'extraction

  • Source : dépôt : sélectionnez votre dépôt.

  • Source : branche de base : ^main$ (remplacez main par le nom de la branche de base de votre dépôt, si nécessaire)

  • Source : contrôle des commentaires non requis

  • Configuration de compilation : fichier de configuration Cloud Build : /test-dags.cloudbuild.yaml (chemin d'accès à votre fichier de compilation)

Créer un job de synchronisation de DAG et ajouter un script utilitaire de DAG

Ensuite, configurez un job Cloud Build qui exécute un script utilitaire DAG. Le script utilitaire de ce job synchronise vos DAG avec votre environnement Cloud Composer une fois qu'ils ont été fusionnés dans la branche principale de votre dépôt.

Ajouter le script utilitaire DAG

Ajoutez le script utilitaire DAG à votre dépôt. Ce script utilitaire copie tous les fichiers DAG du répertoire dags/ de votre dépôt dans un répertoire temporaire, en ignorant tous les fichiers Python non DAG. Le script utilise ensuite la bibliothèque cliente Cloud Storage pour importer tous les fichiers de ce répertoire temporaire dans le répertoire dags/ du bucket de votre environnement Cloud Composer.

from __future__ import annotations

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile

# Imports the Google Cloud client library
from google.cloud import storage


def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")
    return (temp_dir, dags)


def upload_dags_to_composer(
    dags_directory: str, bucket_name: str, name_replacement: str = "dags/"
) -> None:
    """
    Given a directory, this function moves all DAG files from that directory
    to a temporary directory, then uploads all contents of the temporary directory
    to a given cloud storage bucket
    Args:
        dags_directory (str): a fully qualified path to a directory that contains a "dags/" subdirectory
        bucket_name (str): the GCS bucket of the Cloud Composer environment to upload DAGs to
        name_replacement (str, optional): the name of the "dags/" subdirectory that will be used when constructing the temporary directory path name Defaults to "dags/".
    """
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            dag = dag.replace(f"{temp_dir}/", name_replacement)

            try:
                # Upload to your bucket
                blob = bucket.blob(dag)
                blob.upload_from_filename(dag)
                print(f"File {dag} uploaded to {bucket_name}/{dag}.")
            except FileNotFoundError:
                current_directory = os.listdir()
                print(
                    f"{name_replacement} directory not found in {current_directory}, you may need to override the default value of name_replacement to point to a relative directory"
                )
                raise

    else:
        print("No DAGs to upload.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)

Créer une configuration YAML Cloud Build pour synchroniser les DAG

Dans votre dépôt, créez un fichier YAML nommé add-dags-to-composer.cloudbuild.yaml qui configure votre job Cloud Build pour la synchronisation des DAG. Il comporte deux étapes :

  1. Installez les dépendances nécessaires au script utilitaire DAG.

  2. Exécutez le script utilitaire pour synchroniser les DAG de votre dépôt avec votre environnement Cloud Composer.

steps:
  # install dependencies
  - name: python
    entrypoint: pip
    args: ["install", "-r", "utils/requirements.txt", "--user"]

  # run
  - name: python
    entrypoint: python
    args: ["utils/add_dags_to_composer.py", "--dags_directory=${_DAGS_DIRECTORY}", "--dags_bucket=${_DAGS_BUCKET}"]

Créer le déclencheur Cloud Build

Suivez le guide Créer des dépôts à partir de GitHub pour créer un déclencheur basé sur une application GitHub avec les configurations suivantes :

  • Nom : add-dags-to-composer

  • Événement : Déployer sur une branche

  • Source : dépôt : sélectionnez votre dépôt.

  • Source : branche de base : ^main$ (remplacez main par le nom de la branche de base de votre dépôt, si nécessaire)

  • Source : filtre par fichiers inclus (glob) : dags/**

  • Configuration de compilation : fichier de configuration Cloud Build : /add-dags-to-composer.cloudbuild.yaml (chemin d'accès à votre fichier de compilation)

Dans la configuration avancée, ajoutez deux variables de substitution :

  • _DAGS_DIRECTORY : répertoire dans lequel se trouvent les DAG dans votre dépôt. Si vous utilisez l'exemple de dépôt de ce guide, il s'agit de dags/.

  • _DAGS_BUCKET : bucket Cloud Storage contenant le répertoire dags/ dans votre environnement Cloud Composer de développement. Omettez le préfixe gs://. Par exemple, us-central1-example-env-1234ab56-bucket.

Tester votre pipeline CI/CD

Dans cette section, suivez un workflow de développement de DAG qui utilise les déclencheurs Cloud Build que vous venez de créer.

Exécuter un job de pré-commit

Créez une demande d'extraction vers votre branche principale pour tester votre compilation. Localisez votre vérification avant l'envoi sur la page. Cliquez sur Détails, puis sélectionnez Afficher plus d'informations sur Google Cloud Build pour afficher les journaux de compilation dans la consoleGoogle Cloud .

Capture d'écran d'une vérification GitHub appelée "test-dags" avec une flèche rouge pointant vers le nom du projet entre parenthèses
Figure 2. Capture d'écran de l'état de la vérification avant envoi Cloud Build sur GitHub (cliquez pour agrandir)

Si votre vérification avant envoi a échoué, consultez Résoudre les échecs de compilation.

Valider le fonctionnement de votre DAG dans votre environnement de développement Cloud Composer

Une fois votre demande d'extraction approuvée, fusionnez-la avec votre branche principale. Utilisez la consoleGoogle Cloud pour afficher les résultats de votre compilation. Si vous disposez de nombreux déclencheurs Cloud Build, vous pouvez filtrer vos compilations sur le nom du déclencheur add-dags-to-composer.

Une fois le job de synchronisation Cloud Build réussi, le DAG synchronisé s'affiche dans votre environnement de développement Cloud Composer. Vous pourrez y vérifier que le DAG fonctionne comme prévu.

Ajouter le DAG à votre environnement de production

Une fois que le DAG fonctionne comme prévu, ajoutez-le manuellement à votre environnement de production. Pour ce faire, importez le fichier DAG dans le répertoire dags/ du bucket de votre environnement Cloud Composer de production.

Si votre tâche de synchronisation de DAG a échoué ou si votre DAG ne se comporte pas comme prévu dans votre environnement de développement Cloud Composer, consultez Résoudre les échecs de compilation.

Résoudre les échecs de compilation

Cette section explique comment résoudre les scénarios d'échec de compilation courants.

Que faire si ma vérification avant envoi a échoué ?

Dans votre demande d'extraction, cliquez sur Détails, puis sélectionnez Afficher plus de détails sur Google Cloud Build pour afficher les journaux de compilation dans la consoleGoogle Cloud . Utilisez ces journaux pour vous aider à déboguer le problème lié à votre DAG. Une fois les problèmes résolus, validez la correction et envoyez-la à votre branche. La vérification avant envoi s'exécute à nouveau. Vous pouvez continuer à itérer en utilisant les journaux comme outil de débogage.

Que faire si mon job de synchronisation DAG a échoué ?

Utilisez la console Google Cloud pour afficher les résultats de votre compilation. Si vous disposez de nombreux déclencheurs Cloud Build, vous pouvez filtrer vos compilations sur le nom du déclencheur add-dags-to-composer. Examinez les journaux du job de compilation et corrigez les erreurs. Si vous avez besoin d'aide supplémentaire pour résoudre les erreurs, utilisez les canaux d'assistance.

Que faire si mon DAG ne fonctionne pas correctement dans mon environnement Cloud Composer ?

Si votre DAG ne fonctionne pas comme prévu dans votre environnement de développement Cloud Composer, ne le promouvez pas manuellement dans votre environnement de production Cloud Composer. Effectuez plutôt l'une des opérations suivantes :

  • Rétablissez la requête d'extraction avec les modifications qui ont cassé votre DAG pour le restaurer à l'état immédiatement antérieur à vos modifications (cela rétablit également tous les autres fichiers de cette demande d'extraction d'extraction).
  • Créez une demande d'extraction pour rétablir manuellement les modifications apportées au DAG défectueux.
  • Créez une demande d'extraction pour corriger les erreurs dans votre DAG.

Chacune de ces étapes déclenche une nouvelle vérification avant l'envoi et, lors de la fusion, le job de synchronisation du DAG.

Étapes suivantes