Ejecuta un DAG de Apache Airflow en Cloud Composer 2

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

En esta guía de inicio rápido, se muestra cómo crear un entorno de Cloud Composer y ejecutar un DAG de Apache Airflow en Cloud Composer 2.

Antes de comenzar

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Cloud Composer API.

    Enable the API

  7. Si quieres obtener los permisos que necesitas para completar esta guía de inicio rápido, pídele a tu administrador que te otorgue los siguientes roles de IAM en tu proyecto:

    Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

    También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.

Crear un entorno

  1. En la consola de Google Cloud, ve a la página Crear entorno.

    Ir a Crear entorno

  1. Si este es el primer entorno del proyecto, Otorga los permisos necesarios a la cuenta de servicio de Cloud Composer. .

    Agregas la cuenta de agente de servicio de Cloud Composer como un principal nuevo en la cuenta de servicio de tu entorno y le otorgas el rol de extensión de agente de servicio de la API de Cloud Composer v2.

    Confirma que usas la cuenta de servicio prevista para tu entorno y haz clic en Otorgar.

  2. En el campo Nombre, ingresa example-environment.

  3. En la lista desplegable Ubicación, selecciona una región para el entorno de Cloud Composer. En esta guía, se usa la región us-central1.

  4. Para otras opciones de configuración del entorno, usa los valores predeterminados proporcionados.

  5. Haz clic en Crear y espera a que se cree el entorno.

  6. Cuando termine, aparecerá una marca de verificación verde junto al nombre del entorno.

Crea un archivo DAG

Un DAG de Airflow es una colección de tareas organizadas que deseas programar y ejecutar. Los DAG se definen en archivos estándares de Python.

En esta guía, se usa un ejemplo de DAG de Airflow definido en el archivo quickstart.py. El código de Python de este archivo hace lo siguiente:

  1. Crea un DAG composer_sample_dag. Este DAG se ejecuta todos los días.
  2. Ejecuta una tarea print_dag_run_conf. La tarea imprime la configuración de ejecución del DAG con el operador bash.

Guarda una copia del archivo quickstart.py en tu máquina local:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Sube el archivo DAG al bucket de tu entorno

Cada entorno de Cloud Composer tiene un bucket de Cloud Storage asociado. Airflow en Cloud Composer programa solo los DAG que se encuentran en la carpeta /dags de este bucket.

Para programar el DAG, sube quickstart.py desde tu máquina local a tu la carpeta /dags del entorno:

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haz clic en el nombre de tu entorno, example-environment. Se abrirá la página Detalles del entorno.

  3. Haz clic en Abrir carpeta de DAG. Se abrirá la página Detalles del bucket.

  4. Haz clic en Subir archivos y, luego, selecciona tu copia de quickstart.py.

  5. Para subir el archivo, haz clic en Abrir.

Cómo ver el DAG

Después de subir el archivo DAG, Airflow hace lo siguiente:

  1. Analiza el archivo DAG que subiste. Pueden pasar unos minutos hasta que DAG esté disponible para Airflow.
  2. Agrega el DAG a la lista de DAGs disponibles.
  3. Ejecuta el DAG de acuerdo con el programa que proporcionaste en el archivo DAG.

Para verificar que tu DAG se procese sin errores y esté disponible en Airflow, obsérvalo en la IU de DAG. La IU del DAG es la interfaz de Cloud Composer para visualizar Información sobre el DAG en la consola de Google Cloud. Cloud Composer también proporciona acceso a la IU de Airflow, que es una interfaz web nativa de Airflow.

  1. Espera unos cinco minutos para darle tiempo a Airflow de procesar el archivo DAG que subiste anteriormente y completar la primera ejecución del DAG (se explica más adelante).

  2. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  3. En la lista de entornos, haz clic en el nombre de tu entorno, example-environment. Se abrirá la página Detalles del entorno.

  4. Ve a la pestaña DAG.

  5. Verifica que el DAG composer_quickstart esté presente en la lista de DAG.

    La lista de DAGs muestra el DAG composer_quickstart con información adicional, como el estado y la programación.
    Figura 1: En la lista de DAG, se muestra DAG composer_quickstart (haz clic para ampliar)

Cómo ver los detalles de la ejecución del DAG

Una sola ejecución de un DAG se denomina ejecución de DAG. Airflow de inmediato ejecuta una ejecución de DAG para el DAG de ejemplo porque la fecha de inicio en el archivo DAG se estableció en ayer. De esta manera, Airflow se pone al día con los DAG especificados de un proyecto.

El DAG de ejemplo contiene una tarea, print_dag_run_conf, que ejecuta el comando echo en la consola. Este comando genera información meta sobre el DAG (identificador numérico de la ejecución de DAG).

  1. En la pestaña DAGs, haz clic en composer_quickstart. Se abrirá la pestaña Runs del DAG.

  2. En la lista de ejecuciones de DAG, haz clic en la primera entrada.

    En la lista de ejecuciones de DAG, se muestra la ejecución de DAG reciente (su fecha de ejecución y estado).
    Figura 2: La lista de ejecuciones de DAG para el DAG composer_quickstart (haz clic para ampliar)
  3. Se muestran los detalles de ejecución de DAG, que detallan la información sobre tareas del DAG de ejemplo.

    La lista de tareas con una entrada print_dag_run_conf, su inicio
    hora, hora de finalización y duración
    Figura 3: La lista de tareas que se ejecutaron en la ejecución del DAG (haz clic para ampliar)
  4. En la sección Registros de la ejecución del DAG, se enumeran los registros de todas las tareas de la ejecución del DAG. Puedes ver el resultado del comando echo en los registros.

    Entradas de registro de la tarea, una de las cuales es "Resultado" y la otra enumera un identificador
    Figura 4: Registros de la tarea print_dag_run_conf (haz clic para ampliar)

Limpia

Sigue estos pasos para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que usaste en esta página.

Borra los recursos que se usaron en este instructivo:

  1. Borra el entorno de Cloud Composer:

    1. En la consola de Google Cloud, ve a la página Entornos.

      Ir a Entornos

    2. Selecciona example-environment y haz clic en Borrar.

    3. Espera hasta que se borre el entorno.

  2. Borra el bucket de tu entorno. Aunque borres el entorno de Cloud Composer, no se borra el bucket.

    1. En la consola de Google Cloud, ve a Almacenamiento > Navegador.

      Ir a Almacenamiento > Navegador

    2. Selecciona el bucket del entorno y haz clic en Borrar. Por ejemplo, este bucket puede llamarse us-central1-example-environ-c1616fe8-bucket.

  3. Borra el disco persistente de la cola de Redis de tu entorno. Aunque borres el entorno de Cloud Composer, no se borrará el disco persistente.

    1. En la consola de Google Cloud, ve a Compute Engine > Discos.

      Ir a Discos

    2. Selecciona el disco persistente de la cola de Redis del entorno y haz clic en Borrar.

      Por ejemplo, este disco se puede con el nombre pvc-02bc4842-2312-4347-8519-d87bdcd31115. Los discos de Cloud Composer 2 siempre tienen el tipo Balanced persistent disk y un tamaño de 2 GB.

¿Qué sigue?