Prueba, sincroniza e implementa tus DAG desde GitHub

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

En esta guía, se explica cómo crear una canalización de CI/CD para probar, sincronizar y implementar DAG en tu entorno de Cloud Composer desde tu GitHub en un repositorio de confianza.

Si solo quieres sincronizar datos de otros servicios, consulta Cómo transferir datos de otros servicios.

Descripción general de la canalización de CI/CD

Diagrama de arquitectura en el que se muestran los pasos del flujo. La revisión previa al envío y la revisión de PR se encuentran en la sección de GitHub, y la sincronización de DAG y la verificación manual de DAG se encuentran en la sección de Google Cloud.
Figura 1. Diagrama de arquitectura que muestra los pasos del flujo (haz clic para ampliar)

La canalización de CI/CD que prueba, sincroniza e implementa DAG tiene la los siguientes pasos:

  1. Realizas un cambio en un DAG y lo envías a una rama de desarrollo en tu repositorio.

  2. Abres una solicitud de extracción en la rama principal de tu repositorio.

  3. Cloud Build ejecuta pruebas de unidades para verificar que tu DAG sea válido.

  4. Se aprobó tu solicitud de extracción y se fusionó en la rama principal de tu repositorio.

  5. Cloud Build sincroniza tu entorno de Cloud Composer de desarrollo con estos cambios nuevos.

  6. Verificas que el DAG se comporte como se espera en tu entorno de desarrollo.

  7. Si tu DAG funciona como se espera, súbela a tu entorno de producción de Cloud Composer.

Objetivos

Antes de comenzar

  • En esta guía, se da por sentado que trabajas con dos entornos de Cloud Composer idénticos: un entorno de desarrollo y un entorno de producción.

    A los efectos de esta guía, configurarás una canalización de CI/CD solo para tu entorno de desarrollo. Asegúrate de que el entorno que usas no es un entorno de producción.

  • En esta guía, se supone que tienes tus DAG y sus pruebas almacenados en un GitHub en un repositorio de confianza.

    El ejemplo de canalización de CI/CD muestra el contenido de un repositorio de ejemplo. Los DAG y las pruebas se almacenados en el directorio dags/, con los archivos de requisitos, las restricciones de configuración de Terraform y los archivos de configuración de Cloud Build almacenados en el nivel superior. La utilidad de sincronización de DAG y sus requisitos se encuentran en utils.

    Esta estructura se puede usar para Airflow 1, Airflow 2, Cloud Composer 1 y Cloud Composer 2.

Crea un trabajo de verificación previo al envío y pruebas de unidades

El primer trabajo de Cloud Build ejecuta una verificación previa al envío, que ejecuta la unidad pruebas para tus DAG.

Cómo agregar pruebas de unidades

Si aún no lo hiciste, crea pruebas de unidades para tus DAG. Guarda estas pruebas junto con los DAG en tu repositorio, cada uno con el sufijo _test. Por ejemplo, la prueba del DAG en example_dag.py es example_dag_test.py. Estas son las pruebas que se ejecutan como una verificación previa al envío en tu repositorio.

Crea la configuración de YAML de Cloud Build para la verificación previa al envío

En tu repositorio, crea un archivo YAML llamado test-dags.cloudbuild.yaml que configura tu trabajo de Cloud Build para las verificaciones del envío previo. En él hay tres pasos:

  1. Instala las dependencias que necesitan tus DAG.
  2. Instala las dependencias que necesitan tus pruebas de unidades.
  3. Ejecuta las pruebas de DAG.

steps:
  # install dependencies
  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements.txt", "-c", "constraints.txt", "--user"]

  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements-test.txt", "--user"]

  # run in python 3.8 which is latest version in Cloud Composer
  - name: python:3.8-slim
    entrypoint: python3.8
    args: ["-m", "pytest", "-s", "dags/"]

Crea el activador de Cloud Build para la verificación previa al envío

Sigue las instrucciones de la guía Cómo compilar repositorios desde GitHub para crear un activador basado en la app de GitHub con la siguiente configuración:

  • Nombre: test-dags

  • Evento: solicitud de extracción

  • Fuente: Repositorio: Elige tu repositorio.

  • Fuente: Rama base: ^main$ (cambia main al nombre de la rama base de tu repositorio, si es necesario)

  • Fuente (control de comentarios): no obligatorio

  • Configuración de compilación: Archivo de configuración de Cloud Build: /test-dags.cloudbuild.yaml (la ruta de acceso a tu archivo de compilación)

Crea un trabajo de sincronización de DAG y agrega una secuencia de comandos de utilidad de DAG

A continuación, configura un trabajo de Cloud Build que ejecute una secuencia de comandos de utilidad de DAGs. El de utilidad en este trabajo sincroniza tus DAG con tu El entorno de Cloud Composer después de combinarlos con la rama principal en tu repositorio.

Agrega la secuencia de comandos de utilidad de DAG

Agregar la secuencia de comandos de utilidad del DAG a tu repositorio Esta secuencia de comandos de utilidad copia todos los archivos DAG del directorio dags/ de tu en un directorio temporal, ignorando todos los archivos de Python que no sean DAG. Luego, la secuencia de comandos usa la biblioteca cliente de Cloud Storage para subir todos los archivos de ese directorio temporal al directorio dags/ en el bucket de tu entorno de Cloud Composer.

from __future__ import annotations

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile

# Imports the Google Cloud client library
from google.cloud import storage


def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")
    return (temp_dir, dags)


def upload_dags_to_composer(
    dags_directory: str, bucket_name: str, name_replacement: str = "dags/"
) -> None:
    """
    Given a directory, this function moves all DAG files from that directory
    to a temporary directory, then uploads all contents of the temporary directory
    to a given cloud storage bucket
    Args:
        dags_directory (str): a fully qualified path to a directory that contains a "dags/" subdirectory
        bucket_name (str): the GCS bucket of the Cloud Composer environment to upload DAGs to
        name_replacement (str, optional): the name of the "dags/" subdirectory that will be used when constructing the temporary directory path name Defaults to "dags/".
    """
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            dag = dag.replace(f"{temp_dir}/", name_replacement)

            try:
                # Upload to your bucket
                blob = bucket.blob(dag)
                blob.upload_from_filename(dag)
                print(f"File {dag} uploaded to {bucket_name}/{dag}.")
            except FileNotFoundError:
                current_directory = os.listdir()
                print(
                    f"{name_replacement} directory not found in {current_directory}, you may need to override the default value of name_replacement to point to a relative directory"
                )
                raise

    else:
        print("No DAGs to upload.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)

Crea una configuración YAML de Cloud Build para sincronizar DAG

En tu repositorio, crea un archivo YAML llamado add-dags-to-composer.cloudbuild.yaml que configure tu trabajo de Cloud Build para sincronizar DAG. En él, hay dos pasos:

  1. Instala las dependencias que necesita la secuencia de comandos de la utilidad de DAG.

  2. Ejecuta la secuencia de comandos de utilidad para sincronizar los DAG de tu repositorio con tu entorno de Cloud Composer.

steps:
  # install dependencies
  - name: python
    entrypoint: pip
    args: ["install", "-r", "utils/requirements.txt", "--user"]

  # run
  - name: python
    entrypoint: python
    args: ["utils/add_dags_to_composer.py", "--dags_directory=${_DAGS_DIRECTORY}", "--dags_bucket=${_DAGS_BUCKET}"]

Crea el activador de Cloud Build

Sigue los pasos que se indican en Compila repositorios desde GitHub. para crear un activador basado en la app de GitHub con los siguientes parámetros de configuración:

  • Nombre: add-dags-to-composer

  • Evento: Enviar a una rama

  • Fuente: Repositorio: Elige tu repositorio.

  • Fuente: Rama base: ^main$ (cambia main por el nombre de tu la rama base del repositorio, si es necesario)

  • Fuente: filtro de archivos incluidos (glob): dags/**

  • Configuración de compilación: Archivo de configuración de Cloud Build: /add-dags-to-composer.cloudbuild.yaml (la ruta de acceso al archivo de compilación)

En la Configuración avanzada, agrega dos variables de sustitución:

  • _DAGS_DIRECTORY: Es el directorio en el que se encuentran los DAG en tu repositorio. Si usas el repositorio de ejemplo de esta guía, es dags/.

  • _DAGS_BUCKET: Es el bucket de Cloud Storage que contiene la Directorio dags/ en tu Cloud Composer de desarrollo en un entorno de nube. Omite el prefijo gs://. Por ejemplo: us-central1-example-env-1234ab56-bucket

Prueba tu canalización de CI/CD

En esta sección, sigue un flujo de desarrollo de DAG que usa tu y creaste activadores de Cloud Build.

Ejecuta un trabajo de envío previo

Crea una solicitud de extracción en tu rama principal para probar la compilación. Ubica tu verificación previa al envío en la página. Haz clic en Detalles y elige Consulta más detalles en Google Cloud Build para ver los registros de tu compilación en la Consola de Google Cloud

Captura de pantalla de una verificación de GitHub llamada test-dags con una flecha roja que apunta al nombre del proyecto entre paréntesis
Figura 2. Captura de pantalla del estado de la verificación previa al envío de Cloud Build en GitHub (haz clic para ampliar)

Si falló la verificación del envío previo, consulta Cómo abordar fallas de compilación.

Valida que tu DAG funcione en tu entorno de Cloud Composer de desarrollo

Una vez que se apruebe la solicitud de extracción, combínala con la rama principal. Usa la consola de Google Cloud para ver los resultados de la compilación. Si tienes muchos activadores de Cloud Build, puedes filtrar tus compilaciones según el nombre del activador add-dags-to-composer.

Una vez que se complete correctamente la tarea de sincronización de Cloud Build, el DAG sincronizado aparecerá en tu entorno de Cloud Composer de desarrollo. Ahí puedes y validar que el DAG funcione como se espera.

Agrega el DAG a tu entorno de producción

Después de que el DAG funcione como se espera, agrégalo a tu producción de forma manual en un entorno de nube. Para ello, sube el archivo DAG al directorio dags/ en el bucket de tu entorno de Cloud Composer de producción.

Si tu trabajo de sincronización de DAG falló o si tu DAG no se comporta como se espera en tu entorno de Cloud Composer de desarrollo, consulta Cómo abordar las fallas de compilación.

Cómo abordar fallas de compilación

En esta sección, se explica cómo abordar situaciones comunes de fallas de compilación.

¿Qué sucede si mi verificación previa al envío falla?

En tu solicitud de extracción, haz clic en Detalles y elige Consulta más detalles en Google Cloud Build para ver los registros de tu compilación en la Consola de Google Cloud Usa estos registros para depurar el problema de tus en el DAG. Cuando resuelvas los problemas, confirma la solución y envíala a tu . La verificación previa al envío se vuelve a ejecutar, y puedes seguir iterando con los registros como herramienta de depuración.

¿Qué sucede si mi trabajo de sincronización de DAG falla?

Usa la consola de Google Cloud para ver los resultados de la compilación. Si tienes muchas de Cloud Build, puedes filtrar tus compilaciones según el nombre del activador add-dags-to-composer Examina los registros del trabajo de compilación y resuelve el errores. Si necesitas ayuda adicional para resolver los errores, utiliza canales de asistencia.

¿Qué sucede si mi DAG no funciona correctamente en mi entorno de Cloud Composer?

Si tu DAG no funciona como se espera en tu entorno de Cloud Composer de desarrollo, no lo promociones manualmente a tu entorno de Cloud Composer de producción. Como alternativa, toma una de las siguientes medidas:

  • Reverte la solicitud de extracción con los cambios que rompieron tu DAG para restablecerlo al estado que tenía inmediatamente antes de realizar los cambios (esto también revertirá todos los demás archivos de esa solicitud de extracción).
  • Crea una solicitud de extracción nueva para revertir de forma manual los cambios en el DAG dañado.
  • Crea una solicitud de extracción nueva y corrige los errores en tu DAG.

Seguir cualquiera de estos pasos activa una nueva verificación previa al envío y, luego de la combinación, el trabajo de sincronización de DAG.

¿Qué sigue?