Tester, synchroniser et déployer vos DAG à partir de GitHub

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Ce guide explique comment créer un pipeline CI/CD pour tester, synchroniser et déployer des DAG dans votre environnement Cloud Composer à partir de votre un dépôt de clés.

Si vous souhaitez synchroniser uniquement les données d'autres services, consultez Transférer des données provenant d'autres services

Présentation du pipeline CI/CD

Schéma de l'architecture illustrant les étapes du flux. Le préenvoi et l'examen des demandes d'extraction se trouvent dans la section GitHub, tandis que la synchronisation du DAG et la validation manuelle du DAG se trouvent dans la section Google Cloud.
Figure 1. Schéma de l'architecture illustrant les étapes du flux (cliquez pour agrandir)

Le pipeline CI/CD qui teste, synchronise et déploie les DAG possède procédez comme suit:

  1. Vous apportez une modification à un DAG et vous la déployez dans une branche de développement. dans votre dépôt.

  2. Vous ouvrez une demande d'extraction sur la branche principale de votre dépôt.

  3. Cloud Build exécute des tests unitaires pour vérifier la validité de votre DAG.

  4. Votre demande d'extraction a été approuvée et fusionnée avec la branche principale de votre un dépôt de clés.

  5. Cloud Build synchronise votre l'environnement Cloud Composer.

  6. Vous vérifierez que le DAG se comporte comme prévu dans votre développement environnement.

  7. Si votre DAG fonctionne comme prévu, vous devez l'importer dans votre environnement de production Cloud Composer.

Objectifs

Avant de commencer

  • Ce guide part du principe que vous travaillez avec Environnements Cloud Composer: un environnement de développement et environnement de production.

    Dans le cadre de ce guide, vous ne configurez qu'un pipeline CI/CD pour votre environnement de développement. Assurez-vous que l'environnement que vous utilisez n'est pas un environnement de production.

  • Dans ce guide, nous partons du principe que vos DAG et leurs tests sont stockés dans un dépôt GitHub un dépôt de clés.

    La exemple de pipeline CI/CD illustre le contenu d'un exemple de dépôt. Les DAG et les tests sont stockées dans le répertoire dags/, avec des fichiers d'exigences, les contraintes et les fichiers de configuration Cloud Build stockés au premier niveau. L'utilitaire de synchronisation des DAG et ses exigences se trouvent dans le Répertoire utils.

Créer une tâche de vérification avant envoi et des tests unitaires

Le premier job Cloud Build exécute une vérification avant envoi, qui exécute les unités pour vos DAG.

Ajouter des tests unitaires

Si ce n'est pas déjà fait, tests unitaires pour vos DAG. Enregistrez ces tests en même temps que Graphes orientés acycliques (DAG) de votre dépôt, chacun portant le suffixe _test. Par exemple, le test du DAG dans example_dag.py est example_dag_test.py. Ce sont les qui s'exécutent en tant que contrôle avant envoi dans votre dépôt.

Créer une configuration YAML Cloud Build pour la vérification avant envoi

Dans votre dépôt, créez un fichier YAML nommé test-dags.cloudbuild.yaml qui configure votre job Cloud Build pour les vérifications avant envoi. On y trouve en trois étapes:

  1. Installez les dépendances nécessaires à vos DAG.
  2. Installez les dépendances nécessaires à vos tests unitaires.
  3. Exécuter les tests du DAG

steps:
  # install dependencies
  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements.txt", "-c", "constraints.txt", "--user"]

  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements-test.txt", "--user"]

  # run in python 3.8 which is latest version in Cloud Composer
  - name: python:3.8-slim
    entrypoint: python3.8
    args: ["-m", "pytest", "-s", "dags/"]

Créer le déclencheur Cloud Build pour la vérification avant envoi

Suivez les instructions pour créer des dépôts à partir de GitHub. pour créer un déclencheur basé sur une application GitHub avec les configurations suivantes:

  • Nom : test-dags

  • Événement: demande d'extraction

  • Source – Dépôt: choisissez votre dépôt.

  • Source : branche de base : ^main$ (remplacez main par le nom de votre branche de base du dépôt, si nécessaire)

  • Source – Contrôle des commentaires: non requis

  • Configuration de la compilation – Fichier de configuration Cloud Build: /test-dags.cloudbuild.yaml (chemin d'accès à votre fichier de compilation)

Créer un job de synchronisation des DAG et ajouter le script utilitaire des DAG

Configurez ensuite un job Cloud Build qui exécute un script utilitaire dédié aux DAG. La script utilitaire de ce job synchronise vos DAG avec votre Environnement Cloud Composer après leur fusion avec la branche principale dans votre dépôt.

Ajouter le script de l'utilitaire des DAG

Ajoutez le script de l'utilitaire DAG à votre dépôt. Ce script utilitaire copie tous les fichiers DAG dans le répertoire dags/ de votre vers un répertoire temporaire, en ignorant tous les fichiers Python non DAG. La utilise ensuite la bibliothèque cliente Cloud Storage pour importer tous les fichiers de ce répertoire temporaire vers le répertoire dags/ dans votre bucket de l'environnement Cloud Composer.

from __future__ import annotations

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile

# Imports the Google Cloud client library
from google.cloud import storage


def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")
    return (temp_dir, dags)


def upload_dags_to_composer(
    dags_directory: str, bucket_name: str, name_replacement: str = "dags/"
) -> None:
    """
    Given a directory, this function moves all DAG files from that directory
    to a temporary directory, then uploads all contents of the temporary directory
    to a given cloud storage bucket
    Args:
        dags_directory (str): a fully qualified path to a directory that contains a "dags/" subdirectory
        bucket_name (str): the GCS bucket of the Cloud Composer environment to upload DAGs to
        name_replacement (str, optional): the name of the "dags/" subdirectory that will be used when constructing the temporary directory path name Defaults to "dags/".
    """
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            dag = dag.replace(f"{temp_dir}/", name_replacement)

            try:
                # Upload to your bucket
                blob = bucket.blob(dag)
                blob.upload_from_filename(dag)
                print(f"File {dag} uploaded to {bucket_name}/{dag}.")
            except FileNotFoundError:
                current_directory = os.listdir()
                print(
                    f"{name_replacement} directory not found in {current_directory}, you may need to override the default value of name_replacement to point to a relative directory"
                )
                raise

    else:
        print("No DAGs to upload.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)

Créer une configuration YAML Cloud Build pour synchroniser les DAG

Dans votre dépôt, créez un fichier YAML nommé add-dags-to-composer.cloudbuild.yaml qui configure votre instance Cloud Build de synchronisation des DAG. Il comporte deux étapes:

  1. Installez les dépendances requises par le script de l'utilitaire des DAG.

  2. Exécutez le script de l'utilitaire pour synchroniser les DAG de votre dépôt avec votre Cloud Composer.

steps:
  # install dependencies
  - name: python
    entrypoint: pip
    args: ["install", "-r", "utils/requirements.txt", "--user"]

  # run
  - name: python
    entrypoint: python
    args: ["utils/add_dags_to_composer.py", "--dags_directory=${_DAGS_DIRECTORY}", "--dags_bucket=${_DAGS_BUCKET}"]

Créer le déclencheur Cloud Build

Suivez les instructions pour créer des dépôts à partir de GitHub. pour créer un déclencheur basé sur une application GitHub avec les configurations suivantes:

  • Nom : add-dags-to-composer

  • Événement: Déployer sur une branche

  • Source – Dépôt: choisissez votre dépôt.

  • Source : branche de base : ^main$ (remplacez main par le nom de votre branche de base du dépôt, si nécessaire)

  • Source – Filtre de fichiers inclus (glob): dags/**

  • Configuration de la compilation – Fichier de configuration Cloud Build: /add-dags-to-composer.cloudbuild.yaml (chemin d'accès à votre fichier de compilation)

Dans "Configuration avancée", ajoutez Variables de substitution:

  • _DAGS_DIRECTORY : répertoire dans lequel se trouvent les DAG dans votre dépôt. Si vous utilisez l'exemple de dépôt de ce guide, il s'agit de dags/.

  • _DAGS_BUCKET : bucket Cloud Storage contenant le Répertoire dags/ de votre environnement Cloud Composer de développement environnement. Omettez le préfixe gs://. Par exemple, us-central1-example-env-1234ab56-bucket.

Tester votre pipeline CI/CD

Dans cette section, vous allez suivre un flux de développement de DAG qui utilise vos nouvelles les déclencheurs Cloud Build créés.

Exécuter un job de pré-envoi

Créez une demande d'extraction vers votre branche principale pour tester votre build. Localisez votre sur la page. Cliquez sur Détails et sélectionnez Affichez plus d'informations sur Google Cloud Build pour consulter vos journaux de compilation dans le console Google Cloud.

<ph type="x-smartling-placeholder">
</ph> Capture d&#39;écran d&#39;une vérification GitHub appelée test-dags avec une flèche rouge pointant vers le nom du projet entre parenthèses
Figure 2 : Capture d'écran de l'état de la vérification avant l'envoi de Cloud Build sur GitHub (cliquez pour agrandir)

Si la vérification avant l'envoi a échoué, consultez Résoudre les échecs de compilation.

Vérifier que votre DAG fonctionne dans votre environnement de développement Cloud Composer

Une fois votre demande d'extraction approuvée, fusionnez-la avec votre branche principale. Utilisez les la console Google Cloud pour afficher les résultats de la compilation. Si vous avez de nombreux les déclencheurs Cloud Build, vous pouvez filtrer vos compilations en fonction de leur nom add-dags-to-composer

Une fois la tâche de synchronisation Cloud Build réussie, le DAG synchronisé s'affiche dans votre environnement Cloud Composer de développement. Vous pouvez alors vérifier que le DAG fonctionne comme prévu.

Ajouter le DAG à votre environnement de production

Une fois que le DAG fonctionne comme prévu, ajoutez-le manuellement à votre environnement de production environnement. Pour ce faire, importer le fichier DAG vers le répertoire dags/ de votre environnement Cloud Composer de production bucket de l'environnement.

Si votre job de synchronisation de DAG a échoué ou si votre DAG ne se comporte pas comme prévu dans votre dans l'environnement Cloud Composer de développement, consultez Corriger les échecs de compilation

Résoudre les échecs de compilation

Cette section explique comment résoudre les scénarios courants d'échec de compilation.

Que faire si la vérification avant l'envoi a échoué ?

À partir de votre demande d'extraction, cliquez sur Détails et sélectionnez Affichez plus d'informations sur Google Cloud Build pour consulter vos journaux de compilation dans le console Google Cloud. Ces journaux peuvent vous aider à résoudre le problème lié à votre DAG. Une fois les problèmes résolus, validez la correction et déployez-la ou une autre branche. La vérification avant l'envoi est de nouveau exécutée, et vous pouvez continuer à itérer en utilisant les journaux comme outil de débogage.

Que faire si ma tâche de synchronisation du DAG a échoué ?

Utilisez la console Google Cloud pour : afficher les résultats de la compilation. Si vous avez de nombreux les déclencheurs Cloud Build, vous pouvez filtrer vos compilations en fonction de leur nom add-dags-to-composer Examinez les journaux du job de compilation et résolvez le problème les erreurs. Si vous avez besoin d'une aide supplémentaire pour résoudre les erreurs, utilisez canaux d'assistance.

Que faire si mon DAG ne fonctionne pas correctement dans mon environnement Cloud Composer ?

Si votre DAG ne fonctionne pas comme prévu dans votre processus de développement dans l'environnement Cloud Composer, ne promouvez pas manuellement le DAG l'environnement Cloud Composer de production. Effectuez plutôt l'une des opérations suivantes :

  • Annuler la demande d'extraction avec les modifications qui ont interrompu votre DAG afin de le restaurer à l'état immédiatement avant vos modifications (cela annule également tous les autres fichiers inclus dans cette demande d'extraction).
  • Créez une demande d'extraction pour annuler manuellement les modifications apportées au DAG défectueux.
  • Créez une demande d'extraction pour corriger les erreurs dans votre DAG.

L'exécution de l'une de ces étapes déclenche une nouvelle vérification avant l'envoi. Lors de la fusion, le job de synchronisation du DAG.

Étape suivante