Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta página se explica cómo funcionan la programación y la activación de DAGs en Airflow, cómo definir una programación para un DAG y cómo activarlo o pausarlo manualmente.
Acerca de los DAGs de Airflow en Cloud Composer
Los DAGs de Airflow en Cloud Composer se ejecutan en uno o varios entornos de Cloud Composer de tu proyecto. Sube los archivos de origen de tus DAGs de Airflow a un segmento de Cloud Storage asociado a un entorno. La instancia de Airflow del entorno analiza estos archivos y programa las ejecuciones de DAGs, tal como se define en la programación de cada DAG. Durante la ejecución de un DAG, Airflow programa y ejecuta las tareas individuales que componen un DAG en una secuencia definida por el DAG.
Para obtener más información sobre los conceptos básicos de Airflow, como los DAGs, las ejecuciones de DAGs, las tareas o los operadores de Airflow, consulta la página Conceptos básicos de la documentación de Airflow.
Acerca de la programación de DAGs en Airflow
Airflow ofrece los siguientes conceptos para su mecanismo de programación:
- Fecha lógica
Representa una fecha en la que se ejecuta una ejecución de DAG concreta.
No es la fecha real en la que Airflow ejecuta un DAG, sino un periodo de tiempo que debe procesar una ejecución de DAG concreta. Por ejemplo, en un DAG programado para ejecutarse todos los días a las 12:00, la fecha lógica también sería las 12:00 de un día concreto. Como se ejecuta dos veces al día, el periodo que debe procesar son las últimas 12 horas. Al mismo tiempo, la lógica definida en el DAG puede que no use la fecha lógica o el intervalo de tiempo. Por ejemplo, un DAG puede ejecutar la misma secuencia de comandos una vez al día sin usar el valor de la fecha lógica.
En las versiones de Airflow anteriores a la 2.2, esta fecha se denomina fecha de ejecución.
- Fecha de ejecución
Representa la fecha en la que se ejecuta un DAG en concreto.
Por ejemplo, en un DAG programado para ejecutarse todos los días a las 12:00, la ejecución real puede producirse a las 12:05, un tiempo después de que haya pasado la fecha lógica.
- Intervalo de programación
Representa cuándo y con qué frecuencia se debe ejecutar un DAG, en términos de fechas lógicas.
Por ejemplo, una programación diaria significa que un DAG se ejecuta una vez al día y que las fechas lógicas de sus ejecuciones tienen intervalos de 24 horas.
- Fecha de inicio
Especifica cuándo quieres que Airflow empiece a programar tu DAG.
Las tareas de tu DAG pueden tener fechas de inicio individuales o puedes especificar una única fecha de inicio para todas las tareas. Airflow programa las ejecuciones de DAG en función de la fecha de inicio mínima de las tareas de tu DAG y del intervalo de programación.
- Puesta al día, relleno y reintentos
Mecanismos para ejecutar ejecuciones de DAGs de fechas anteriores.
Catchup ejecuta las ejecuciones de DAG que aún no se han ejecutado. Por ejemplo, si el DAG se ha pausado durante un periodo prolongado y, después, se ha reanudado. Puedes usar el relleno para ejecutar ejecuciones de DAG en un intervalo de fechas determinado. Los reintentos especifican cuántos intentos debe hacer Airflow al ejecutar tareas de un DAG.
La programación funciona de la siguiente manera:
Una vez que haya pasado la fecha de inicio, Airflow esperará a la siguiente aparición del intervalo de programación.
Airflow programa la primera ejecución del DAG para que se produzca al final de este intervalo de programación.
Por ejemplo, si se programa un DAG para que se ejecute cada hora y la fecha de inicio es hoy a las 12:00, la primera ejecución del DAG se producirá hoy a las 13:00.
En la sección Programar un DAG de Airflow de este documento se describe cómo configurar la programación de tus DAGs con estos conceptos. Para obtener más información sobre las ejecuciones y la programación de DAGs, consulta Ejecuciones de DAGs en la documentación de Airflow.
Acerca de las formas de activar un DAG
Airflow ofrece las siguientes formas de activar un DAG:
Activar según una programación. Airflow activa el DAG automáticamente según la programación especificada en el archivo DAG.
Activar manualmente. Puedes activar un DAG manualmente desde laGoogle Cloud consola, la interfaz de usuario de Airflow o ejecutando un comando de la CLI de Airflow desde la CLI de Google Cloud.
Se activa en respuesta a eventos. La forma estándar de activar un DAG en respuesta a eventos es usar un sensor.
Otras formas de activar DAGs:
Activar de forma programática. Puedes activar un DAG mediante la API REST de Airflow. Por ejemplo, desde una secuencia de comandos de Python.
Se activan mediante programación en respuesta a eventos. Puedes activar DAGs en respuesta a eventos mediante funciones de Cloud Run y la API REST de Airflow.
Antes de empezar
- Asegúrate de que tu cuenta tenga un rol que pueda gestionar objetos en los contenedores del entorno, así como ver y activar DAGs. Para obtener más información, consulta Control de acceso.
Programar un DAG de Airflow
Defines una programación para un DAG en el archivo DAG. Edita la definición del DAG de la siguiente manera:
Localiza y edita el archivo DAG en tu ordenador. Si no tienes el archivo DAG, puedes descargar una copia del bucket del entorno. En el caso de un DAG nuevo, puedes definir todos los parámetros al crear el archivo DAG.
En el parámetro
schedule_interval
, define la programación. Puedes usar una expresión cron, como0 0 * * *
, o un preajuste, como@daily
. Para obtener más información, consulta Cron y los intervalos de tiempo en la documentación de Airflow.Airflow determina las fechas lógicas de las ejecuciones de DAGs en función de la programación que hayas definido.
En el parámetro
start_date
, defina la fecha de inicio.Airflow determina la fecha lógica de la primera ejecución del DAG mediante este parámetro.
(Opcional) En el parámetro
catchup
, define si Airflow debe ejecutar todas las ejecuciones anteriores de este DAG desde la fecha de inicio hasta la fecha actual que aún no se hayan ejecutado.Las ejecuciones de DAGs que se realicen durante la puesta al día tendrán su fecha lógica en el pasado y su fecha de ejecución reflejará el momento en el que se ejecutó la ejecución del DAG.
.(Opcional) En el parámetro
retries
, define cuántas veces debe reintentar Airflow las tareas que han fallado (cada DAG consta de una o varias tareas individuales). De forma predeterminada, las tareas de Cloud Composer se reintentan dos veces.Sube la nueva versión del DAG al bucket del entorno.
Espera a que Airflow analice correctamente el DAG. Por ejemplo, puedes consultar la lista de DAGs de tu entorno en laGoogle Cloud consola o en la interfaz de usuario de Airflow.
La siguiente definición de DAG de ejemplo se ejecuta dos veces al día, a las 00:00 y a las 12:00. Su fecha de inicio es el 1 de enero del 2024, pero Airflow no la ejecuta para fechas anteriores después de que la subas o la pauses porque la puesta al día está inhabilitada.
El DAG contiene una tarea llamada insert_query_job
, que inserta una fila en una tabla con el operador BigQueryInsertJobOperator
. Este operador es uno de los Google Cloud operadores de BigQuery, que puedes usar para gestionar conjuntos de datos y tablas, ejecutar consultas y validar datos.
Si falla una ejecución concreta de esta tarea, Airflow la vuelve a intentar cuatro veces más con el intervalo de reintento predeterminado. La fecha lógica de estos reintentos sigue siendo la misma.
La consulta SQL de esta fila usa plantillas de Airflow para escribir la fecha lógica y el nombre del DAG en la fila.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_example_scheduling_dag",
start_date=datetime.datetime(2024, 1, 1),
schedule_interval='0 */12 * * *',
catchup=False
) as dag:
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
retries=4,
configuration={
"query": {
# schema: date (string), description (string)
# example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
"query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
"useLegacySql": False,
"priority": "BATCH",
}
},
location="us-central1"
)
insert_query_job
Para probar este DAG, puedes activarlo manualmente y, a continuación, ver los registros de ejecución de la tarea.
Más ejemplos de parámetros de programación
Los siguientes ejemplos de parámetros de programación ilustran cómo funciona la programación con diferentes combinaciones de parámetros:
Si
start_date
esdatetime(2024, 4, 4, 16, 25)
yschedule_interval
es30 16 * * *
, la primera ejecución del DAG se producirá a las 16:30 del 5 de abril del 2024.Si
start_date
esdatetime(2024, 4, 4, 16, 35)
yschedule_interval
es30 16 * * *
, la primera ejecución del DAG se producirá a las 16:30 del 6 de abril del 2024. Como la fecha de inicio es posterior al intervalo de programación del 4 de abril del 2024, el DAG no se ejecuta el 5 de abril del 2024. En su lugar, el intervalo de programación finaliza a las 16:35 del 5 de abril del 2024, por lo que la siguiente ejecución del DAG se programa para las 16:30 del día siguiente.Si
start_date
esdatetime(2024, 4, 4)
yschedule_interval
es@daily
, la primera ejecución del DAG se programará para las 00:00 del 5 de abril del 2024.Si
start_date
esdatetime(2024, 4, 4, 16, 30)
yschedule_interval
es0 * * * *
, la primera ejecución del DAG se programará para las 18:00 del 4 de abril del 2024. Una vez que haya pasado la fecha y la hora especificadas, Airflow programará una ejecución de DAG para que se produzca en el minuto 0 de cada hora. El momento más cercano en el que esto ocurre es a las 17:00. En ese momento, Airflow programa una ejecución de DAG para que se produzca al final del intervalo de programación, es decir, a las 18:00.
Activar un DAG manualmente
Cuando activas manualmente un DAG de Airflow, Airflow ejecuta el DAG una vez, independientemente de la programación especificada en el archivo del DAG.
Consola
La interfaz de usuario de DAG es compatible con Cloud Composer 1.17.8 y versiones posteriores.
Para activar un DAG desde la consola, sigue estos pasos: Google Cloud
En la Google Cloud consola, ve a la página Entornos.
Selecciona un entorno para ver sus detalles.
En la página Detalles del entorno, vaya a la pestaña DAGs.
Haz clic en el nombre de un DAG.
En la página Detalles del DAG, haz clic en Activar DAG. Se crea una nueva ejecución de DAG.
Google Cloud
Interfaz de usuario de Airflow
Para activar un DAG desde la interfaz de usuario de Airflow, sigue estos pasos:
En la Google Cloud consola, ve a la página Entornos.
En la columna Servidor web de Airflow, siga el enlace Airflow de su entorno.
Inicia sesión con la cuenta de Google que tenga los permisos adecuados.
En la interfaz web de Airflow, en la página DAGs (DAGs), en la columna Actions (Acciones) de su DAG, haga clic en el botón Trigger DAG (Activar DAG).
gcloud
En Airflow 1.10.12 o versiones anteriores, ejecuta el comando trigger_dag
de la CLI de Airflow:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
trigger_dag -- DAG_ID
En Airflow 1.10.14 o versiones posteriores, incluido Airflow 2, ejecuta el comando de la CLI de Airflow dags trigger
:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags trigger -- DAG_ID
Haz los cambios siguientes:
ENVIRONMENT_NAME
: el nombre de tu entorno.LOCATION
: la región en la que se encuentra el entorno.DAG_ID
: el nombre del DAG.
Para obtener más información sobre cómo ejecutar comandos de la CLI de Airflow en entornos de Cloud Composer, consulta Ejecutar comandos de la CLI de Airflow.
Para obtener más información sobre los comandos de la CLI de Airflow disponibles, consulta la referencia del comando gcloud composer environments run
.
Ver los registros y los detalles de las ejecuciones de DAG
En la Google Cloud consola, puedes hacer lo siguiente:
- Consulta los estados de las ejecuciones de DAGs anteriores y los detalles de los DAGs.
- Consulta los registros detallados de todas las ejecuciones de DAG y todas las tareas de estos DAG.
- Consulta las estadísticas de DAG.
Además, Cloud Composer proporciona acceso a la interfaz de usuario de Airflow, que es la interfaz web de Airflow.
Pausar un DAG
Consola
La interfaz de usuario de DAG es compatible con Cloud Composer 1.17.8 y versiones posteriores.
Para pausar un DAG desde la consola Google Cloud :
En la Google Cloud consola, ve a la página Entornos.
Selecciona un entorno para ver sus detalles.
En la página Detalles del entorno, vaya a la pestaña DAGs.
Haz clic en el nombre de un DAG.
En la página Detalles del DAG, haz clic en Pausar DAG.
Interfaz de usuario de Airflow
Para pausar un DAG desde la interfaz de usuario de Airflow, sigue estos pasos:
- En la Google Cloud consola, ve a la página Entornos.
En la columna Servidor web de Airflow, siga el enlace Airflow de su entorno.
Inicia sesión con la cuenta de Google que tenga los permisos adecuados.
En la interfaz web de Airflow, en la página DAGs (DAGs), haz clic en el interruptor situado junto al nombre del DAG.
gcloud
En Airflow 1.10.12 o versiones anteriores, ejecuta el comando pause
de la CLI de Airflow:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
pause -- DAG_ID
En Airflow 1.10.14 o versiones posteriores, incluido Airflow 2, ejecuta el comando de la CLI de Airflow dags pause
:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags pause -- DAG_ID
Haz los cambios siguientes:
ENVIRONMENT_NAME
: el nombre de tu entorno.LOCATION
: la región en la que se encuentra el entorno.DAG_ID
: el nombre del DAG.
Para obtener más información sobre cómo ejecutar comandos de la CLI de Airflow en entornos de Cloud Composer, consulta Ejecutar comandos de la CLI de Airflow.
Para obtener más información sobre los comandos de la CLI de Airflow disponibles, consulta la referencia del comando gcloud composer environments run
.