Programa y activa DAG de Airflow

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

En esta página, se explica cómo funcionan la programación y la activación del DAG en Airflow, y se explica definir un programa para un DAG y cómo activarlo manualmente o pausarlo.

Acerca de los DAG de Airflow en Cloud Composer

Los DAG de Airflow en Cloud Composer se ejecutan en uno o más entornos de Cloud Composer en tu en un proyecto final. Subes los archivos fuente de tus DAG de Airflow a un bucket de Cloud Storage asociado a un entorno. Luego, la instancia de Airflow del entorno analiza estos archivos y programa las ejecuciones de DAG, según lo definido por la programación de cada DAG. Durante una ejecución de DAG, Airflow programa y ejecuta tareas individuales que conforman un DAG en una secuencia definida por el DAG.

Para obtener más información sobre los conceptos básicos de Airflow, como los DAG de Airflow, las ejecuciones de DAG, las tareas o los operadores, consulta la página Conceptos básicos en la documentación de Airflow.

Información acerca de la programación de DAG en Airflow

Airflow proporciona los siguientes conceptos para su mecanismo de programación:

Fecha lógica

Representa una fecha para la que se ejecuta una ejecución de DAG en particular.

Esta no es la fecha real en la que Airflow ejecuta un DAG, sino un período que una ejecución de DAG en particular debe procesar. Por ejemplo, para un DAG que está programado para ejecutarse todos los días a las 12:00, la fecha lógica también sería 12:00 en un día específico. Dado que se ejecuta dos veces al día, el período que debe es de las últimas 12 horas. Al mismo tiempo, la lógica definida en el Es posible que el DAG en sí no use la fecha lógica o el intervalo de tiempo. Por ejemplo, un DAG podría ejecutar la misma secuencia de comandos una vez al día sin usar el de la fecha lógica.

En las versiones de Airflow anteriores a la 2.2, esta fecha se denomina fecha de ejecución.

Fecha de la ejecución

Representa la fecha en la que se ejecuta una ejecución de DAG en particular.

Por ejemplo, para un DAG que está programado para ejecutarse todos los días a las 12:00, la ejecución real del DAG puede ocurrir a las 12:05, algún tiempo después de que pase la fecha lógica.

Intervalo de programas

Representa cuándo y con qué frecuencia se debe ejecutar un DAG, en términos de fechas lógicas.

Por ejemplo, una programación diaria significa que un DAG se ejecuta una vez al día y las fechas lógicas de sus ejecuciones de DAG tienen intervalos de 24 horas.

Fecha de inicio

Especifica cuándo deseas que Airflow comience a programar el DAG.

Las tareas del DAG pueden tener fechas de inicio individuales, o bien puedes especificar una sola fecha de inicio para todas las tareas. En función de la fecha de inicio mínima para las tareas en tu DAG y en el intervalo de programación, Airflow programa las ejecuciones de DAG.

Actualización, reabastecimiento y reintentos

Mecanismos para ejecutar ejecuciones de DAG de fechas anteriores.

La actualización ejecuta ejecuciones de DAG que aún no se ejecutaron, por ejemplo, si el DAG estuvo detenido durante un período prolongado y, luego, se reanudó. Puedes usar el reabastecimiento para ejecutar ejecuciones de DAG para un período determinado. Reintentos especificar cuántos intentos debe realizar Airflow cuando ejecute tareas desde un DAG

La programación funciona de la siguiente manera:

  1. Una vez transcurrido la fecha de inicio, Airflow espera el próximo caso de la durante un intervalo de programación.

  2. Airflow programa la primera ejecución del DAG para que se realice al final de este intervalo de programación.

    Por ejemplo, si un DAG está programado para ejecutarse a cada hora y la fecha de inicio es a las 12:00 p.m. de hoy, la primera ejecución del DAG se realiza hoy a las 13:00 p.m.

En la sección Programa un DAG de Airflow de este documento, se describe cómo configurar la programación de tus DAG con estos conceptos. Para ver más sobre las ejecuciones y la programación de DAG, consulta Ejecuciones de DAG en la documentación de Airflow.

Información acerca de las formas de activar un DAG

Airflow proporciona las siguientes formas de activar un DAG:

  • Activación en función de un programa: Airflow activa el DAG automáticamente según la programación que se especificó en el archivo DAG.

  • Activación manual: Puedes activar un DAG de forma manual desde la consola de Google Cloud, la IU de Airflow o ejecutando un comando de la CLI de Airflow desde Google Cloud CLI.

  • Activar en respuesta a eventos La forma estándar de activar un DAG en respuesta a eventos es usar un sensor.

Otras maneras de activar los DAG:

Antes de comenzar

  • Asegúrate de que tu cuenta tenga un rol que pueda administrar objetos en la en buckets de entorno y visualizar y activar DAG. Para obtener más información, consulta Guía de control de acceso.

Programa un DAG de Airflow

Puedes definir un programa para un DAG en el archivo DAG. Edita la definición del DAG en de la siguiente manera:

  1. Localiza y edita el archivo DAG en tu computadora. Si no tienes el archivo DAG, puedes descargar su copia del bucket del entorno. Para un DAG nuevo, puedes definir todos los parámetros cuando creas el archivo DAG.

  2. En el parámetro schedule_interval, define la programación. Puedes usar una expresión de Cron, como 0 0 * * *, o un valor predeterminado, como @daily. Para Para obtener más información, consulta Intervalos de tiempo y cron en la documentación de Airflow.

    Airflow determina fechas lógicas para las ejecuciones de DAG en función del programa que que establezcas.

  3. En el parámetro start_date, define la fecha de inicio.

    Airflow determina la fecha lógica de la primera ejecución de DAG con este parámetro.

  4. (Opcional) En el parámetro catchup, define si Airflow debe ejecutar todas las ejecuciones anteriores de este DAG desde la fecha de inicio hasta la fecha actual que aún no se ejecutaron.

    Las ejecuciones de DAG que se ejecuten durante la puesta en marcha tendrán su fecha lógica en el pasada y su fecha de ejecución reflejará la hora en la que la ejecución del DAG ejecutado.

  5. (Opcional) En el parámetro retries, define cuántas veces Airflow debe reintentarlas tareas que fallaron (cada DAG consta de una o más tareas individuales). De forma predeterminada, las tareas de Cloud Composer se reintentan dos veces.

  6. Sube la versión nueva del DAG al entorno bucket.

  7. Espera hasta que Airflow analice correctamente el DAG. Por ejemplo, puedes verificar la lista de DAG de tu entorno en la La consola de Google Cloud o en la IU de Airflow.

En el siguiente ejemplo, la definición de DAG se ejecuta dos veces al día a las 00:00 y a las 12:00. Es la fecha de inicio está configurada para el 1 de enero de 2024, pero Airflow no la ejecuta en fechas anteriores después de subirlo o pausarlo, ya que la actualización está inhabilitada.

El DAG contiene una tarea llamada insert_query_job, que inserta una fila en una tabla con el operador BigQueryInsertJobOperator. Este operador es uno de Operadores de BigQuery de Google Cloud, que puedes usar para administrar tablas y conjuntos de datos, ejecutar consultas y validar datos. Si falla una ejecución específica de esta tarea, Airflow lo vuelve a intentar cuatro veces más veces con el intervalo de reintentos predeterminado. La fecha lógica para estos reintentos sigue siendo el mismo.

La consulta de SQL para esta fila usa plantillas de Airflow para escribir la fecha y el nombre lógicos de DAG en la fila.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_example_scheduling_dag",
  start_date=datetime.datetime(2024, 1, 1),
  schedule_interval='0 */12 * * *',
  catchup=False
  ) as dag:

  insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    retries=4,
    configuration={
        "query": {
            # schema: date (string), description (string)
            # example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
            "query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location="us-central1"
  )

  insert_query_job

Para probar este DAG, puedes activarlo de forma manual y y, luego, consulta los registros de ejecución de tareas.

Más ejemplos de parámetros de programación

A continuación, se muestran ejemplos de cómo funciona la programación con diferentes combinaciones de parámetros:

  • Si start_date es datetime(2024, 4, 4, 16, 25) y schedule_interval es 30 16 * * *, la primera ejecución del DAG se realiza el 5 de abril de 2024 a las 16:30.

  • Si start_date es datetime(2024, 4, 4, 16, 35) y schedule_interval es 30 16 * * *, la primera ejecución de DAG se realizará el 6 de abril de 2024 a las 16:30. Debido a que la fecha de inicio es posterior al intervalo de programación el 4 de abril de 2024, la ejecución del DAG no se realiza el 5 de abril de 2024. En cambio, el cronograma el intervalo finaliza a las 16:35 del 5 de abril de 2024, por lo que se programó la próxima ejecución de DAG para las 4:30 del día siguiente.

  • Si start_date es datetime(2024, 4, 4) y schedule_interval es @daily, entonces la primera ejecución del DAG se programa para las 00:00 del 5 de abril de 2024.

  • Si start_date es datetime(2024, 4, 4, 16, 30) y schedule_interval es 0 * * * *, la primera ejecución de DAG es programado para el 4 de abril de 2024 a las 6:00 p.m. Después de que pase la fecha y hora especificadas, Airflow programa una ejecución de DAG en el minuto 0 de cada hora. El momento más cercano en que esto sucede es a las 17:00. En este momento, Airflow programa una ejecución de DAG para que se realice al final del intervalo de programación, es decir, a las 18:00.

Cómo activar un DAG de forma manual

Cuando activas un DAG de Airflow de forma manual, Airflow ejecuta el DAG una vez, independientemente del programa especificado en el archivo DAG.

Console

Para activar un DAG desde la consola de Google Cloud, sigue estos pasos:

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. Selecciona un entorno para ver sus detalles.

  3. En la página Detalles del entorno, ve a la pestaña DAG.

  4. Haz clic en el nombre de un DAG.

  5. En la página Detalles del DAG, haz clic en Activar DAG. Una nueva ejecución de DAG crear.

IU de Airflow

Para activar un DAG desde la IU de Airflow, sigue estos pasos:

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la columna Servidor web de Airflow, sigue el vínculo de Airflow para tu entorno.

  3. Accede con la Cuenta de Google que tiene los permisos correspondientes.

  4. En la interfaz web de Airflow, en la página DAG, en la columna Vínculos de tu DAG, haz clic en el botón Activar DAG.

  5. Especifica la configuración de la ejecución del DAG (opcional).

  6. Haz clic en Activar.

gcloud

Ejecuta el comando dags trigger de la CLI de Airflow:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • DAG_ID: Es el nombre del DAG.

Para obtener más información sobre cómo ejecutar comandos de la CLI de Airflow en entornos de Cloud Composer, consulta Cómo ejecutar comandos de la CLI de Airflow.

Para obtener más información sobre los comandos disponibles de la CLI de Airflow, consulta la referencia de comandos gcloud composer environments run.

Consulta los registros y los detalles de la ejecución de DAG

En la consola de Google Cloud, puedes hacer lo siguiente:

Además, Cloud Composer proporciona acceso a los La IU de Airflow, que es la interfaz web de Airflow

Cómo pausar un DAG

Console

Para pausar un DAG desde la consola de Google Cloud, sigue estos pasos:

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. Selecciona un entorno para ver sus detalles.

  3. En la página Detalles del entorno, ve a la pestaña DAG.

  4. Haz clic en el nombre de un DAG.

  5. En la página Detalles del DAG, haz clic en Pausar DAG.

IU de Airflow

Para pausar un DAG desde la IU de Airflow, sigue estos pasos:

  1. En la consola de Google Cloud, ve a la página Entornos.

Ir a Entornos

  1. En la columna Servidor web de Airflow, sigue el vínculo de Airflow para tu entorno.

  2. Accede con la Cuenta de Google que tiene los permisos correspondientes.

  3. En la interfaz web de Airflow, en la página DAG, haz clic en con el botón de activación junto al nombre del DAG.

gcloud

Ejecuta el comando dags pause de la CLI de Airflow:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags pause -- DAG_ID

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • DAG_ID: Es el nombre del DAG.

Si deseas obtener más información para ejecutar comandos de la CLI de Airflow en Cloud Composer, consulta Ejecuta comandos de la CLI de Airflow.

Para obtener más información sobre los comandos disponibles de la CLI de Airflow, consulta la referencia de comandos gcloud composer environments run.

¿Qué sigue?