Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En este tutorial se explica cómo diagnosticar y solucionar problemas de programación de tareas y de análisis que provocan un funcionamiento incorrecto del programador, errores de análisis, latencia y fallos en las tareas.
Introducción
El programador de Airflow se ve afectado principalmente por dos factores: la programación de tareas y el análisis de DAGs. Si hay problemas en alguno de esos factores, puede afectar negativamente al estado y al rendimiento del entorno.
A veces, se programan demasiadas tareas simultáneamente. En esta situación, la cola se llena y las tareas permanecen en el estado "programado" o se vuelven a programar después de ponerse en cola, lo que puede provocar errores en las tareas y latencia en el rendimiento.
Otro problema habitual es la latencia de análisis y los errores causados por la complejidad del código de un DAG. Por ejemplo, un código de DAG que contenga variables de Airflow en el nivel superior del código puede provocar retrasos en el análisis, sobrecarga de la base de datos, errores de programación y tiempos de espera de DAGs.
En este tutorial, diagnosticarás los DAGs de ejemplo y aprenderás a solucionar problemas de programación y análisis, mejorar la programación de los DAGs y optimizar el código de los DAGs y las configuraciones del entorno para mejorar el rendimiento.
Objetivos
En esta sección se enumeran los objetivos de los ejemplos de este tutorial.
Ejemplo: el programador no funciona correctamente y la latencia se debe a la alta simultaneidad de tareas
Sube el DAG de ejemplo que se ejecuta varias veces simultáneamente y diagnostica el mal funcionamiento del programador y los problemas de latencia con Cloud Monitoring.
Optimiza el código de tu DAG consolidando las tareas y evalúa el impacto en el rendimiento.
Distribuye las tareas de forma más uniforme a lo largo del tiempo y evalúa el impacto en el rendimiento.
Optimiza las configuraciones de Airflow y del entorno, y evalúa el impacto.
Ejemplo: errores de análisis de DAG y latencia causados por código complejo
Sube el DAG de ejemplo con variables de Airflow y diagnostica problemas de análisis con Cloud Monitoring.
Optimiza el código del DAG evitando las variables de Airflow en el nivel superior del código y evalúa el impacto en el tiempo de análisis.
Optimizar las configuraciones de Airflow y del entorno, y evaluar el impacto en el tiempo de análisis.
Costes
En este tutorial se usan los siguientes componentes facturables de Google Cloud:
- Cloud Composer (consulta los costes adicionales).
- Cloud Monitoring
Cuando termines este tutorial, puedes evitar que se te siga facturando eliminando los recursos que has creado. Para obtener más información, consulta Limpiar.
Antes de empezar
En esta sección se describen las acciones que debes llevar a cabo antes de empezar el tutorial.
Crear y configurar un proyecto
Para este tutorial, necesitas un Google Cloud proyecto. Configura el proyecto de la siguiente manera:
En la Google Cloud consola, selecciona o crea un proyecto:
Comprueba que la facturación esté habilitada en tu proyecto. Consulta cómo comprobar si la facturación está habilitada en un proyecto.
Asegúrate de que el usuario de tu proyecto Google Cloud tenga los siguientes roles para crear los recursos necesarios:
- Administrador de objetos de entorno y almacenamiento
(
roles/composer.environmentAndStorageObjectAdmin
) - Administrador de Compute (
roles/compute.admin
)
- Administrador de objetos de entorno y almacenamiento
(
Habilitar APIs en tu proyecto
Enable the Cloud Composer API.
Crear un entorno de Cloud Composer
Crea un entorno de Cloud Composer 2.
.Como parte de la creación del entorno,
asignas el rol Extensión del agente de servicio de la API de Cloud Composer v2
(roles/composer.ServiceAgentV2Ext
) a la cuenta del agente de servicio de Composer. Cloud Composer usa esta cuenta para realizar operaciones en tu proyecto Google Cloud .
Ejemplo: el programador no funciona correctamente y la tarea falla debido a problemas de programación
En este ejemplo se muestra cómo depurar un error de funcionamiento del programador y la latencia causada por una alta simultaneidad de tareas.
Subir el DAG de ejemplo a tu entorno
Sube el siguiente DAG de ejemplo al entorno que has creado en los pasos anteriores. En este tutorial, el DAG se llama dag_10_tasks_200_seconds_1
.
Este DAG tiene 200 tareas. Cada tarea espera 1 segundo e imprime "Complete!". El DAG se activa automáticamente una vez que se ha subido. Cloud Composer ejecuta este DAG 10 veces y todas las ejecuciones de DAG se realizan en paralelo.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 200
seconds = 1
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2023, 11, 22, 20, 0),
end_date=datetime(2023, 11, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Diagnosticar problemas de funcionamiento incorrecto del programador y de fallos en las tareas
Una vez que se hayan completado las ejecuciones del DAG, abre la interfaz de usuario de Airflow y haz clic en el DAG dag_10_tasks_200_seconds_1
. Verá que se han completado 10 ejecuciones de DAGs y que cada una tiene 200 tareas completadas.
Revisa los registros de tareas de Airflow:
En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Registros y, a continuación, a Todos los registros > Registros de Airflow > Workers > Ver en Explorador de registros.
En el histograma de registros, puede ver los errores y las advertencias indicados con los colores rojo y naranja:

El DAG de ejemplo ha generado unas 130 advertencias y 60 errores. Haz clic en cualquier columna que contenga barras amarillas y rojas. Verá algunas de las siguientes advertencias y errores en los registros:
State of this instance has been externally set to success. Terminating
instance.
Received SIGTERM. Terminating subprocesses.
worker: Warm shutdown (MainProcess).
Estos registros pueden indicar que el uso de recursos ha superado los límites y que el trabajador se ha reiniciado.
Si una tarea de Airflow permanece en la cola durante demasiado tiempo, el programador la marca como fallida y como pendiente de reintento, y la vuelve a programar para su ejecución. Una forma de observar los síntomas de esta situación es consultar el gráfico con el número de tareas en cola. Si los picos de este gráfico no disminuyen en unos 10 minutos, es probable que se produzcan fallos en las tareas (sin registros).
Revisa la información de monitorización:
Vaya a la pestaña Monitorización y seleccione Resumen.
Consulta el gráfico Tareas de Airflow.
Imagen 2. Gráfico de tareas de Airflow (haz clic en la imagen para ampliarla) En el gráfico de tareas de Airflow, hay un pico en las tareas en cola que dura más de 10 minutos, lo que podría significar que no hay suficientes recursos en tu entorno para procesar todas las tareas programadas.
Consulta el gráfico Trabajadores activos:
Imagen 3. Gráfico de trabajadores activos (haz clic para ampliar) El gráfico Trabajadores activos indica que el DAG ha activado el autoescalado hasta el límite máximo permitido de tres trabajadores durante la ejecución del DAG.
Los gráficos de uso de recursos pueden indicar la falta de capacidad de los trabajadores de Airflow para ejecutar las tareas en cola. En la pestaña Monitorización, selecciona Trabajadores y consulta los gráficos Uso total de la CPU de los trabajadores y Uso total de la memoria de los trabajadores.
Imagen 4. Gráfico de uso de CPU total de los trabajadores (haz clic para ampliarlo) Imagen 5. Gráfico de uso de memoria total de los trabajadores (haz clic en la imagen para ampliarla) Los gráficos indican que la ejecución de demasiadas tareas simultáneamente ha provocado que se alcance el límite de la CPU. Los recursos se habían usado durante más de 30 minutos, lo que es incluso más que la duración total de 200 tareas en 10 ejecuciones de DAG que se ejecutan una tras otra.
Estos son los indicadores de que la cola se está llenando y de que no hay recursos para procesar todas las tareas programadas.
Consolidar tareas
El código actual crea muchos DAGs y tareas sin recursos suficientes para procesar todas las tareas en paralelo, lo que provoca que la cola se llene. Si las tareas permanecen en la cola demasiado tiempo, es posible que se reprogramen o fallen. En estas situaciones, deberías optar por un número menor de tareas más consolidadas.
El siguiente DAG de ejemplo cambia el número de tareas del ejemplo inicial de 200 a 20 y aumenta el tiempo de espera de 1 a 10 segundos para imitar tareas más consolidadas que hacen la misma cantidad de trabajo.
Sube el siguiente DAG de ejemplo al entorno que has creado. En este tutorial, el DAG se llama dag_10_tasks_20_seconds_10
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Evalúa el impacto de las tareas más consolidadas en los procesos de programación:
Espera a que se completen las ejecuciones del DAG.
En la interfaz de usuario de Airflow, en la página DAGs, haz clic en el DAG
dag_10_tasks_20_seconds_10
. Verá 10 ejecuciones de DAG, cada una con 20 tareas completadas.En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Registros y, a continuación, a Todos los registros > Registros de Airflow > Workers > Ver en Explorador de registros.
En el segundo ejemplo, con tareas más consolidadas, se han producido aproximadamente 10 advertencias y 7 errores. En el histograma, puede comparar el número de errores y advertencias del ejemplo inicial (valores anteriores) y del segundo ejemplo (valores posteriores).
Imagen 6. Histograma de los registros de los trabajadores de Airflow después de que se consolidaran las tareas (haz clic para ampliar) Si comparas el primer ejemplo con el más consolidado, verás que hay muchos menos errores y advertencias en el segundo. Sin embargo, los mismos errores relacionados con el cierre en caliente siguen apareciendo en los registros debido a la sobrecarga de recursos.
En la pestaña Monitorización, selecciona Trabajadores y consulta los gráficos.
Si comparas el gráfico Tareas de Airflow del primer ejemplo (valores anteriores) con el gráfico del segundo ejemplo, en el que las tareas están más consolidadas, verás que el pico de tareas en cola duró menos tiempo cuando las tareas estaban más consolidadas. Sin embargo, ha durado casi 10 minutos, lo que sigue siendo un tiempo inferior al óptimo.
Imagen 7. Gráfico de tareas de Airflow después de que se hayan consolidado las tareas (haz clic para ampliar) En el gráfico Trabajadores activos, puedes ver que el primer ejemplo (en la parte izquierda del gráfico) usó recursos durante un periodo mucho más prolongado que el segundo, aunque ambos ejemplos imitan la misma cantidad de trabajo.
Imagen 8. Gráfico de trabajadores activos después de que se consolidaran las tareas (haz clic para ampliar) Revisa los gráficos de consumo de recursos de los trabajadores. Aunque la diferencia entre los recursos utilizados en el ejemplo con más tareas consolidadas y el ejemplo inicial es bastante significativa, el uso de la CPU sigue aumentando hasta el 70% del límite.
Imagen 9. Gráfico de uso de CPU total de los trabajadores después de que se consolidaran las tareas (haz clic para ampliarlo) Imagen 10. Gráfico del uso total de memoria de los trabajadores después de que se consolidaran las tareas (haz clic para ampliarlo)
Distribuir las tareas de forma más uniforme a lo largo del tiempo
Si hay demasiadas tareas simultáneas, la cola se llenará, lo que provocará que las tareas se queden atascadas en la cola o se reprogramen. En los pasos anteriores, has reducido el número de tareas consolidándolas. Sin embargo, los registros de salida y la monitorización indican que el número de tareas simultáneas sigue siendo inferior al óptimo.
Puedes controlar el número de ejecuciones de tareas simultáneas implementando una programación o estableciendo límites en el número de tareas que se pueden ejecutar simultáneamente.
En este tutorial, distribuyes las tareas de forma más uniforme a lo largo del tiempo añadiendo parámetros a nivel de DAG al DAG dag_10_tasks_20_seconds_10
:
Añade el argumento
max_active_runs=1
al gestor de contexto de DAG. Este argumento establece un límite de una sola instancia de una ejecución de DAG en un momento dado.Añade el argumento
max_active_tasks=5
al gestor de contexto de DAG. Este argumento controla el número máximo de instancias de tareas que se pueden ejecutar simultáneamente en cada DAG.
Sube el siguiente DAG de ejemplo al entorno que has creado. En este tutorial, el DAG se llama dag_10_tasks_20_seconds_10_scheduled.py
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
max_active_runs=active_runs,
max_active_tasks=active_tasks,
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Evalúa el impacto de distribuir las tareas a lo largo del tiempo en los procesos de programación:
Espera a que se completen las ejecuciones del DAG.
En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Registros y, a continuación, a Todos los registros > Registros de Airflow > Workers > Ver en Explorador de registros.
En el histograma, puedes ver que el tercer DAG, que tiene un número limitado de tareas y ejecuciones activas, no ha generado ninguna advertencia ni ningún error, y que la distribución de los registros es más uniforme en comparación con los valores anteriores.
Imagen 11. Histograma de los registros de los trabajadores de Airflow después de que las tareas se hayan consolidado y distribuido a lo largo del tiempo (haz clic para ampliar)
Las tareas del ejemplo de dag_10_tasks_20_seconds_10_scheduled
, que tiene un número limitado de tareas activas y ejecuciones, no han provocado presión en los recursos porque las tareas se han puesto en cola de forma uniforme.
Después de seguir los pasos descritos, has optimizado el uso de recursos consolidando tareas pequeñas y distribuyéndolas de forma más uniforme a lo largo del tiempo.
Optimizar las configuraciones del entorno
Puedes ajustar las configuraciones de tu entorno para asegurarte de que siempre haya capacidad en los trabajadores de Airflow para ejecutar las tareas en cola.
Número de trabajadores y simultaneidad de trabajadores
Puedes ajustar el número máximo de trabajadores para que Cloud Composer escale automáticamente tu entorno dentro de los límites establecidos.
El parámetro [celery]worker_concurrency
define el número máximo de tareas que un solo trabajador puede recoger de la cola de tareas. Al cambiar este parámetro, se ajusta el número de tareas que puede ejecutar un solo trabajador al mismo tiempo.
Puedes cambiar esta opción de configuración de Airflow anulándola. De forma predeterminada, la simultaneidad de los trabajadores se establece en función del número de instancias de tareas simultáneas ligeras que un trabajador puede admitir. Esto significa que su valor depende de los límites de recursos de los trabajadores.
El valor de simultaneidad de los trabajadores no depende del número de trabajadores de tu entorno.
El número de trabajadores y la simultaneidad de los trabajadores funcionan en combinación entre sí, y el rendimiento de tu entorno depende en gran medida de ambos parámetros. Puedes tener en cuenta los siguientes aspectos para elegir la combinación correcta:
Varias tareas rápidas que se ejecutan en paralelo. Puedes aumentar la simultaneidad de los trabajadores cuando haya tareas en la cola y tus trabajadores usen un porcentaje bajo de sus CPUs y memoria al mismo tiempo. Sin embargo, en determinadas circunstancias, es posible que la cola nunca se llene, lo que provocará que el autoescalado nunca se active. Si las tareas pequeñas terminan de ejecutarse cuando los nuevos trabajadores están listos, un trabajador puede encargarse de las tareas restantes y no habrá tareas para los trabajadores recién creados.
En estas situaciones, se recomienda aumentar el número mínimo de trabajadores y la simultaneidad de los trabajadores para evitar un escalado excesivo.
Varias tareas largas se ejecutan en paralelo. La alta simultaneidad de los trabajadores impide que el sistema escale el número de trabajadores. Si varias tareas consumen muchos recursos y tardan mucho en completarse, una alta simultaneidad de los trabajadores puede provocar que la cola nunca se llene y que todas las tareas las realice un solo trabajador, lo que provoca problemas de rendimiento. En estas situaciones, se recomienda aumentar el número máximo de trabajadores y reducir la simultaneidad de los trabajadores.
La importancia del paralelismo
Los programadores de Airflow controlan la programación de las ejecuciones de DAGs y de las tareas individuales de los DAGs. La opción de configuración [core]parallelism
Airflow controla cuántas tareas puede poner en cola el programador de Airflow en la cola del ejecutor después de que se hayan cumplido todas las dependencias de estas tareas.
El paralelismo es un mecanismo de protección de Airflow que determina cuántas tareas se pueden ejecutar al mismo tiempo por cada programador, independientemente del número de trabajadores. El valor de paralelismo, multiplicado por el número de programadores de tu clúster, es el número máximo de instancias de tareas que puede poner en cola tu entorno.
Normalmente, [core]parallelism
se define como el producto de un número máximo de trabajadores y [celery]worker_concurrency
. También se ve afectada por el pool.
Puedes cambiar esta opción de configuración de Airflow anulándola. Para obtener más información sobre cómo ajustar las configuraciones de Airflow relacionadas con el escalado, consulta Configuración de escalado de Airflow.
Buscar configuraciones de entorno óptimas
La forma recomendada de solucionar los problemas de programación es consolidar las tareas pequeñas en tareas más grandes y distribuir las tareas de forma más uniforme a lo largo del tiempo. Además de optimizar el código de DAG, también puedes optimizar las configuraciones del entorno para que tenga capacidad suficiente para ejecutar varias tareas simultáneamente.
Por ejemplo, supongamos que consolidas las tareas de tu DAG tanto como sea posible, pero que limitar las tareas activas para distribuirlas de forma más uniforme a lo largo del tiempo no es la solución preferida para tu caso práctico concreto.
Puedes ajustar los parámetros de paralelismo, número de trabajadores y simultaneidad de los trabajadores para ejecutar el DAG dag_10_tasks_20_seconds_10
sin limitar las tareas activas. En este ejemplo, el DAG se ejecuta 10 veces y cada ejecución contiene 20 tareas pequeñas.
Si quieres ejecutarlos todos a la vez, sigue estos pasos:
Necesitarás un entorno de mayor tamaño, ya que controla los parámetros de rendimiento de la infraestructura gestionada de Cloud Composer de tu entorno.
Los trabajadores de Airflow deben poder ejecutar 20 tareas simultáneamente, lo que significa que debes definir la simultaneidad de los trabajadores en 20.
Los trabajadores necesitan suficiente CPU y memoria para gestionar todas las tareas. La simultaneidad de los trabajadores se ve afectada por la CPU y la memoria de los trabajadores, por lo que necesitarás al menos
worker_concurrency / 12
de CPU yleast worker_concurrency / 8
de memoria.Deberá aumentar el paralelismo para que coincida con la mayor simultaneidad de los trabajadores. Para que los trabajadores puedan recoger 20 tareas de la cola, el programador tendrá que programar esas 20 tareas primero.
Ajusta las configuraciones de tu entorno de la siguiente manera:
En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Configuración del entorno.
Busca la configuración de Recursos > Cargas de trabajo y haz clic en Editar.
En la sección Worker (Trabajador), en el campo Memory (Memoria), especifica el nuevo límite de memoria para los trabajadores de Airflow. En este tutorial, usa 4 GB.
En el campo CPU, especifica el nuevo límite de CPU para los trabajadores de Airflow. En este tutorial, usa 2 vCPUs.
Guarda los cambios y espera varios minutos a que se reinicien los trabajadores de Airflow.
A continuación, anula las opciones de configuración de paralelismo y simultaneidad de trabajadores de Airflow:
Ve a la pestaña Anulaciones de configuración de Airflow.
Haz clic en Editar y, a continuación, en Añadir anulación de configuración de Airflow.
Anula la configuración de paralelismo:
Sección Clave Valor core
parallelism
20
Haz clic en Añadir anulación de configuración de Airflow y anula la configuración de concurrencia de los workers:
Sección Clave Valor celery
worker_concurrency
20
Haz clic en Guardar y espera a que el entorno actualice su configuración.
Vuelve a activar el mismo DAG de ejemplo con las configuraciones ajustadas:
En la interfaz de usuario de Airflow, ve a la página DAGs (DAGs).
Busca el
dag_10_tasks_20_seconds_10
DAG y elimínalo.Una vez que se haya eliminado el DAG, Airflow comprobará la carpeta DAGs del bucket de tu entorno y volverá a ejecutar el DAG automáticamente.
Una vez que se hayan completado las ejecuciones del DAG, vuelve a revisar el histograma de registros. En el diagrama, puedes ver que el dag_10_tasks_20_seconds_10
ejemplo con más tareas consolidadas no ha generado ningún error ni advertencia al ejecutarse con la configuración de entorno ajustada. Compara los resultados con los datos anteriores del diagrama, donde el mismo ejemplo generó errores y advertencias al ejecutarse con la configuración de entorno predeterminada.

Las configuraciones de entorno y de Airflow desempeñan un papel fundamental en la programación de tareas. Sin embargo, no es posible aumentar las configuraciones más allá de ciertos límites.
Te recomendamos que optimices el código DAG, consolides las tareas y uses la programación para optimizar el rendimiento y la eficiencia.
Ejemplo: errores de análisis de DAG y latencia debido a un código de DAG complejo
En este ejemplo, investigas la latencia de análisis de un DAG de ejemplo que imita un exceso de variables de Airflow.
Crear una variable de Airflow
Antes de subir el código de muestra, crea una variable de Airflow.
En la Google Cloud consola, ve a la página Entornos.
En la columna Servidor web de Airflow, siga el enlace Airflow de su entorno.
Vaya a Administrar > Variables > Añadir un nuevo registro.
Establece los valores siguientes:
- Tecla:
example_var
- Valor:
test_airflow_variable
- Tecla:
Subir el DAG de ejemplo a tu entorno
Sube el siguiente DAG de ejemplo al entorno que has creado en los pasos anteriores. En este tutorial, el DAG se llama dag_for_loop_airflow_variable
.
Este DAG contiene un bucle for que se ejecuta 1000 veces e imita un exceso de variables de Airflow. En cada iteración se lee la variable example_var
y se genera una tarea. Cada tarea contiene un comando que imprime el valor de la variable.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable',
default_args=default_args,
catchup=False,
schedule_interval="@daily"
)
for i in range(1000):
a = Variable.get('example_var', 'N/A')
task = BashOperator(
task_id=f'task_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': a}
)
Diagnosticar los problemas de análisis
El tiempo de análisis de DAG es el tiempo que tarda el programador de Airflow en leer un archivo de DAG y analizarlo. Para que el programador de Airflow pueda programar cualquier tarea de un DAG, debe analizar el archivo de DAG para descubrir la estructura del DAG y las tareas definidas.
Si un DAG tarda mucho en analizarse, consumirá la capacidad del programador y podría reducir el rendimiento de las ejecuciones de DAGs.
Para monitorizar el tiempo de análisis de los DAGs, siga estos pasos:
Ejecuta el
dags report
comando de la CLI de Airflow en gcloud CLI para ver el tiempo de análisis de todos tus DAGs:gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ dags report
Haz los cambios siguientes:
ENVIRONMENT_NAME
: el nombre de tu entorno.LOCATION
: la región en la que se encuentra el entorno.
En el resultado del comando, busca el valor de duración del DAG
dag_for_loop_airflow_variables
. Un valor alto puede indicar que este DAG no se ha implementado de forma óptima. Si tienes varios DAGs, puedes identificar en la tabla de resultados cuáles tienen un tiempo de análisis prolongado.Ejemplo:
file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:14.773594 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /airflow_monitoring | 0:00:00.003035 | 1 | 1 | airflow_monitoring .py
Inspecciona los tiempos de análisis de DAG en la Google Cloud consola:
- En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Registros y, a continuación, a Todos los registros > Administrador del procesador de DAG.
Revisa los
dag-processor-manager
registros e identifica posibles problemas.Figura 13. Los registros del administrador del procesador de DAG muestran los tiempos de análisis de DAG (haga clic para ampliar)
Si el tiempo total de análisis de los DAG supera los 10 segundos, es posible que los programadores estén sobrecargados con el análisis de los DAG y no puedan ejecutarlos de forma eficaz.
Optimizar el código DAG
Te recomendamos que evites el código Python de nivel superior innecesario en tus DAGs. Los DAGs con muchas importaciones, variables y funciones fuera del DAG aumentan los tiempos de análisis del programador de Airflow. Esto reduce el rendimiento y la escalabilidad de Cloud Composer y Airflow. Si se leen demasiadas variables de Airflow, el tiempo de análisis será largo y la carga de la base de datos será alta. Si este código está en un archivo DAG, estas funciones se ejecutan en cada latido del programador, lo que puede ser lento.
Los campos de plantilla de Airflow te permiten incorporar valores de variables de Airflow y plantillas de Jinja en tus DAGs. De esta forma, se evita la ejecución innecesaria de funciones durante los latidos del programador.
Para implementar el ejemplo de DAG de una forma más adecuada, no uses variables de Airflow en el código Python de nivel superior de los DAGs. En su lugar, pasa las variables de Airflow a los operadores a través de una plantilla Jinja, lo que retrasará la lectura del valor hasta que se ejecute la tarea.
Sube la nueva versión del DAG de ejemplo a tu entorno. En este tutorial, el DAG se llama dag_for_loop_airflow_variable_optimized
.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable_optimized',
default_args=default_args,
catchup=False,
schedule_interval='@daily'
)
for i in range(1000):
task = BashOperator(
task_id=f'bash_use_variable_good_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': '{{ var.value.get("example_var") }}'},
)
Inspecciona el nuevo tiempo de análisis del DAG:
Espera a que se complete la ejecución del DAG.
Vuelve a ejecutar el comando
dags report
para ver el tiempo de análisis de todos tus DAGs:file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:37.000369 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /dag_for_loop_airfl | 0:00:01.109457 | 1 | 1000 | dag_for_loop_airflow ow_variable_optimiz | | | | _variable_optimized ed.py | | | | /airflow_monitoring | 0:00:00.040510 | 1 | 1 | airflow_monitoring .py | | | |
Vuelve a revisar los registros de
dag-processor-manager
y analiza la duración del análisis.Imagen 14. Los registros del administrador del procesador de DAG muestran los tiempos de análisis de DAG después de que se optimizara el código de DAG (haz clic para ampliar)
Al sustituir las variables de entorno por plantillas de Airflow, simplificaste el código del DAG y redujiste la latencia de análisis unas diez veces.
Optimizar las configuraciones del entorno de Airflow
El programador de Airflow intenta constantemente activar nuevas tareas y analiza todos los DAGs de tu bucket de entorno. Si tus DAGs tienen un tiempo de análisis prolongado y el programador consume muchos recursos, puedes optimizar las configuraciones del programador de Airflow para que este use los recursos de forma más eficiente.
En este tutorial, los archivos DAG tardan mucho en analizarse y los ciclos de análisis empiezan a superponerse, lo que agota la capacidad del programador. En nuestro ejemplo, el primer DAG tarda más de 5 segundos en analizarse, por lo que configuraremos el programador para que se ejecute con menos frecuencia y así usar los recursos de forma más eficiente. Se anulará la opción de configuración de Airflow scheduler_heartbeat_sec
. Esta configuración define la frecuencia con la que debe ejecutarse el programador (en segundos). De forma predeterminada, el valor es de 5 segundos.
Puedes cambiar esta opción de configuración de Airflow anulándola.
Anula la opción de configuración de scheduler_heartbeat_sec
Airflow:
En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Anulaciones de configuración de Airflow.
Haz clic en Editar y, a continuación, en Añadir anulación de configuración de Airflow.
Anula la opción de configuración de Airflow:
Sección Clave Valor scheduler
scheduler_heartbeat_sec
10
Haz clic en Guardar y espera a que el entorno actualice su configuración.
Consulta las métricas del programador:
Ve a la pestaña Monitorización y selecciona Programadores.
En el gráfico Latido de programador, haga clic en el botón Más opciones (tres puntos) y, a continuación, en Ver en Explorador de métricas.

En el gráfico, verás que el programador se ejecuta dos veces con menos frecuencia después de cambiar la configuración predeterminada de 5 a 10 segundos. Al reducir la frecuencia de los latidos, te aseguras de que el programador no empiece a ejecutarse mientras esté en curso el ciclo de análisis anterior y de que no se agote la capacidad de recursos del programador.
Asignar más recursos al programador
En Cloud Composer 2, puedes asignar más recursos de CPU y memoria al programador. De esta forma, puedes aumentar el rendimiento de tu programador y acelerar el tiempo de análisis de tu DAG.
Asigna más CPU y memoria al programador:
En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Configuración del entorno.
Busca la configuración de Recursos > Cargas de trabajo y haz clic en Editar.
En la sección Programador, en el campo Memoria, especifica el nuevo límite de memoria. En este tutorial, usa 4 GB.
En el campo CPU, especifica el nuevo límite de CPU. En este tutorial, usa 2 vCPUs.
Guarda los cambios y espera varios minutos a que se reinicien los programadores de Airflow.
Ve a la pestaña Registros y, a continuación, a Todos los registros > Administrador del procesador de DAG.
Revisa los registros de
dag-processor-manager
y compara la duración del análisis de los DAG de ejemplo:Imagen 16. Los registros del administrador del procesador de DAG muestran los tiempos de análisis de DAG después de que se asignaran más recursos al programador (haga clic para ampliar)
Al asignar más recursos al programador, has aumentado su capacidad y has reducido la latencia de análisis de forma significativa en comparación con las configuraciones predeterminadas del entorno. Con más recursos, el programador puede analizar los DAGs más rápido. Sin embargo, los costes asociados a los recursos de Cloud Composer también aumentarán. Además, no es posible aumentar los recursos más allá de un límite determinado.
Te recomendamos que asignes recursos solo después de haber implementado las posibles optimizaciones del código DAG y de la configuración de Airflow.
Limpieza
Para evitar que se apliquen cargos en tu cuenta de Google Cloud por los recursos utilizados en este tutorial, elimina el proyecto que contiene los recursos o conserva el proyecto y elimina los recursos.
Eliminar el proyecto
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Eliminar recursos concretos
Si tienes previsto consultar varios tutoriales y guías de inicio rápido, reutilizar los proyectos puede ayudarte a no superar los límites de cuota de proyectos.
Elimina el entorno de Cloud Composer. También eliminarás el contenedor del entorno durante este procedimiento.
Siguientes pasos
- Optimizar el rendimiento y los costes del entorno
- Entornos de escalabilidad
- Solucionar problemas de DAGs