Probar, sincronizar y desplegar tus DAGs desde GitHub

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta guía se explica cómo crear una canalización de CI/CD para probar, sincronizar e implementar DAGs en tu entorno de Cloud Composer desde tu repositorio de GitHub.

Si solo quieres sincronizar datos de otros servicios, consulta el artículo Transferir datos de otros servicios.

Descripción general de los flujos de procesamiento de CI/CD

Diagrama de arquitectura que muestra los pasos del flujo. Las comprobaciones previas y la revisión de la solicitud de extracción se encuentran en la sección de GitHub, mientras que la sincronización de DAG y la verificación manual de DAG se encuentran en la sección Google Cloud .
Figura 1. Diagrama de arquitectura que muestra los pasos del flujo (haga clic para ampliar)

El flujo de procesamiento de CI/CD que prueba, sincroniza y despliega DAGs tiene los siguientes pasos:

  1. Haces un cambio en un DAG y lo insertas en una rama de desarrollo de tu repositorio.

  2. Abre una solicitud de extracción en la rama principal de tu repositorio.

  3. Cloud Build ejecuta pruebas unitarias para comprobar que tu DAG sea válido.

  4. Tu solicitud de extracción se ha aprobado y se ha combinado con la rama principal de tu repositorio.

  5. Cloud Build sincroniza tu entorno de desarrollo de Cloud Composer con estos nuevos cambios.

  6. Verificas que el DAG se comporta como esperabas en tu entorno de desarrollo.

  7. Si tu DAG funciona como esperabas, súbelo a tu entorno de producción de Cloud Composer.

Objetivos

Antes de empezar

  • En esta guía se presupone que trabajas con dos entornos de Cloud Composer idénticos: un entorno de desarrollo y un entorno de producción.

    En esta guía, solo vas a configurar una canalización de CI/CD para tu entorno de desarrollo. Asegúrate de que el entorno que usas no sea un entorno de producción.

  • En esta guía se da por hecho que tienes tus DAGs y sus pruebas almacenados en un repositorio de GitHub.

    La pipeline de CI/CD de ejemplo muestra el contenido de un repositorio de ejemplo. Los DAGs y las pruebas se almacenan en el directorio dags/, mientras que los archivos de requisitos, el archivo de restricciones y los archivos de configuración de Cloud Build se almacenan en el nivel superior. La utilidad de sincronización de DAG y sus requisitos se encuentran en el directorio utils.

    Esta estructura se puede usar en entornos de Airflow 1, Airflow 2, Cloud Composer 1 y Cloud Composer 2.

Crear una tarea de comprobación previa y pruebas unitarias

La primera tarea de Cloud Build ejecuta una comprobación previa al envío, que ejecuta pruebas unitarias de tus DAGs.

Añadir pruebas unitarias

Si aún no lo has hecho, escribe pruebas unitarias para tus DAGs. Guarda estas pruebas junto con los DAGs de tu repositorio. Cada prueba debe tener el sufijo _test. Por ejemplo, el archivo de prueba del DAG en example_dag.py es example_dag_test.py. Estas son las pruebas que se ejecutan como comprobación previa en tu repositorio.

Crear una configuración YAML de Cloud Build para la comprobación previa a la confirmación

En tu repositorio, crea un archivo YAML llamado test-dags.cloudbuild.yaml que configure tu tarea de Cloud Build para las comprobaciones previas a la confirmación. En él, hay tres pasos:

  1. Instala las dependencias que necesiten tus DAGs.
  2. Instala las dependencias que necesiten tus pruebas unitarias.
  3. Ejecuta las pruebas de DAG.

steps:
  # install dependencies
  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements.txt", "-c", "constraints.txt", "--user"]

  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements-test.txt", "--user"]

  # run in python 3.8 which is latest version in Cloud Composer
  - name: python:3.8-slim
    entrypoint: python3.8
    args: ["-m", "pytest", "-s", "dags/"]

Crea el activador de Cloud Build para la comprobación previa a la confirmación

Sigue la guía Crear repositorios desde GitHub para crear un activador basado en una aplicación de GitHub con las siguientes configuraciones:

  • Nombre: test-dags

  • Evento: solicitud de extracción

  • Origen: repositorio. Elige el repositorio.

  • Fuente - Rama base: ^main$ (cambia main por el nombre de la rama base de tu repositorio, si es necesario)

  • Fuente: control de comentarios (no obligatorio)

  • Configuración de compilación: archivo de configuración de Cloud Build: /test-dags.cloudbuild.yaml (ruta al archivo de compilación)

Crear un trabajo de sincronización de DAGs y añadir una secuencia de comandos de utilidad de DAGs

A continuación, configura una tarea de Cloud Build que ejecute una secuencia de comandos de utilidad de DAGs. La secuencia de comandos de utilidad de este trabajo sincroniza tus DAGs con tu entorno de Cloud Composer después de que se combinen en la rama principal de tu repositorio.

Añadir la secuencia de comandos de utilidad de los DAGs

Añade la secuencia de comandos de la utilidad DAG a tu repositorio. Esta secuencia de comandos de utilidad copia todos los archivos DAG del directorio dags/ de tu repositorio en un directorio temporal e ignora todos los archivos Python que no sean DAG. A continuación, la secuencia de comandos usa la biblioteca de cliente de Cloud Storage para subir todos los archivos de ese directorio temporal al directorio dags/ del bucket de tu entorno de Cloud Composer.

from __future__ import annotations

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile

# Imports the Google Cloud client library
from google.cloud import storage


def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")
    return (temp_dir, dags)


def upload_dags_to_composer(
    dags_directory: str, bucket_name: str, name_replacement: str = "dags/"
) -> None:
    """
    Given a directory, this function moves all DAG files from that directory
    to a temporary directory, then uploads all contents of the temporary directory
    to a given cloud storage bucket
    Args:
        dags_directory (str): a fully qualified path to a directory that contains a "dags/" subdirectory
        bucket_name (str): the GCS bucket of the Cloud Composer environment to upload DAGs to
        name_replacement (str, optional): the name of the "dags/" subdirectory that will be used when constructing the temporary directory path name Defaults to "dags/".
    """
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            dag = dag.replace(f"{temp_dir}/", name_replacement)

            try:
                # Upload to your bucket
                blob = bucket.blob(dag)
                blob.upload_from_filename(dag)
                print(f"File {dag} uploaded to {bucket_name}/{dag}.")
            except FileNotFoundError:
                current_directory = os.listdir()
                print(
                    f"{name_replacement} directory not found in {current_directory}, you may need to override the default value of name_replacement to point to a relative directory"
                )
                raise

    else:
        print("No DAGs to upload.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)

Crear una configuración YAML de Cloud Build para sincronizar DAGs

En tu repositorio, crea un archivo YAML llamado add-dags-to-composer.cloudbuild.yaml que configure tu trabajo de Cloud Build para sincronizar los DAGs. En él, hay dos pasos:

  1. Instala las dependencias que necesita la secuencia de comandos de utilidad de los DAGs.

  2. Ejecuta la secuencia de comandos de la utilidad para sincronizar los DAGs de tu repositorio con tu entorno de Cloud Composer.

steps:
  # install dependencies
  - name: python
    entrypoint: pip
    args: ["install", "-r", "utils/requirements.txt", "--user"]

  # run
  - name: python
    entrypoint: python
    args: ["utils/add_dags_to_composer.py", "--dags_directory=${_DAGS_DIRECTORY}", "--dags_bucket=${_DAGS_BUCKET}"]

Crea el activador de Cloud Build

Sigue la guía Crear repositorios desde GitHub para crear un activador basado en una aplicación de GitHub con las siguientes configuraciones:

  • Nombre: add-dags-to-composer

  • Evento: inserción en una rama

  • Origen: repositorio. Elige el repositorio.

  • Fuente - Rama base: ^main$ (cambia main por el nombre de la rama base de tu repositorio, si es necesario)

  • Fuente: filtro de archivos incluidos (glob): dags/**

  • Configuración de compilación: archivo de configuración de Cloud Build: /add-dags-to-composer.cloudbuild.yaml (ruta al archivo de compilación)

En la configuración avanzada, añade dos variables de sustitución:

  • _DAGS_DIRECTORY: el directorio en el que se encuentran los DAGs en tu repositorio. Si usas el repositorio de ejemplo de esta guía, es dags/.

  • _DAGS_BUCKET: el segmento de Cloud Storage que contiene el directorio dags/ de tu entorno de desarrollo de Cloud Composer. Omite el prefijo gs://. Por ejemplo: us-central1-example-env-1234ab56-bucket.

Probar tu flujo de procesamiento de CI/CD

En esta sección, sigue un flujo de desarrollo de DAG que utilice los activadores de Cloud Build que acabas de crear.

Ejecutar una tarea de envío previo

Crea una solicitud de extracción en tu rama principal para probar la compilación. Busca la comprobación previa en la página. Haz clic en Detalles y elige Ver más detalles en Google Cloud Build para ver los registros de compilación en laGoogle Cloud consola.

Captura de pantalla de una comprobación de GitHub llamada test-dags con una flecha roja que apunta al nombre del proyecto entre paréntesis
Imagen 2. Captura de pantalla del estado de la comprobación previa al envío de Cloud Build en GitHub (haz clic para ampliar)

Si la comprobación previa al envío ha fallado, consulta Solucionar errores de compilación.

Validar que tu DAG funciona en tu entorno de desarrollo de Cloud Composer

Una vez que se apruebe tu solicitud de extracción, combínala con tu rama principal. Usa la consolaGoogle Cloud para ver los resultados de tu compilación. Si tienes muchos activadores de Cloud Build, puedes filtrar tus compilaciones por el nombre del activador add-dags-to-composer.

Una vez que el trabajo de sincronización de Cloud Build se haya completado correctamente, el DAG sincronizado aparecerá en tu entorno de desarrollo de Cloud Composer. Ahí puedes comprobar que el DAG funciona correctamente.

Añade el DAG a tu entorno de producción

Una vez que el DAG funcione correctamente, añádelo manualmente a tu entorno de producción. Para ello, sube el archivo DAG al directorio dags/ del segmento de tu entorno de producción de Cloud Composer.

Si la tarea de sincronización de tu DAG ha fallado o si tu DAG no se comporta como esperabas en tu entorno de desarrollo de Cloud Composer, consulta Solucionar errores de compilación.

Solucionar errores de compilación

En esta sección se explica cómo abordar situaciones habituales en las que se produce un error de compilación.

¿Qué ocurre si mi comprobación previa no se ha realizado correctamente?

En tu solicitud de extracción, haz clic en Detalles y elige Ver más detalles en Google Cloud Build para ver los registros de compilación en la consola deGoogle Cloud . Usa estos registros para depurar el problema con tu DAG. Una vez que haya resuelto los problemas, confirme la corrección y envíela a su rama. La comprobación previa se vuelve a ejecutar y puedes seguir iterando con los registros como herramienta de depuración.

¿Qué ocurre si falla mi trabajo de sincronización de DAG?

Usa la consola Google Cloud para ver los resultados de tu compilación. Si tienes muchos activadores de Cloud Build, puedes filtrar tus compilaciones por el nombre del activador add-dags-to-composer. Examina los registros del trabajo de compilación y resuelve los errores. Si necesitas más ayuda para resolver los errores, utiliza los canales de asistencia.

¿Qué ocurre si mi DAG no funciona correctamente en mi entorno de Cloud Composer?

Si tu DAG no funciona como esperabas en tu entorno de desarrollo de Cloud Composer, no lo asciendas manualmente a tu entorno de producción de Cloud Composer. En su lugar, haz una de las siguientes acciones:

  • Revierte la solicitud de extracción con los cambios que han dañado tu DAG para restaurarlo al estado inmediatamente anterior a los cambios (esto también revierte todos los demás archivos de esa solicitud de extracción).
  • Crea una solicitud de extracción para revertir manualmente los cambios del DAG dañado.
  • Crea una solicitud de extracción para corregir los errores de tu DAG.

Si sigues alguno de estos pasos, se activará una nueva comprobación previa al envío y, al combinar, se activará el trabajo de sincronización de DAG.

Siguientes pasos