Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En este instructivo, se te guiará para diagnosticar y solucionar problemas de programación y análisis de tareas que provocan un funcionamiento incorrecto del programador, errores de análisis y latencia, y fallas en las tareas.
Introducción
El programador de Airflow se ve afectado principalmente por dos factores: la programación de tareas y el análisis de DAG. Los problemas en uno de esos factores pueden tener un impacto negativo en el rendimiento y el estado del entorno.
A veces, se programan demasiadas tareas de forma simultánea. En esta situación, la cola se llena y las tareas permanecen en el estado "programado" o se reprograman después de ponerse en cola, lo que puede provocar fallas en las tareas y latencia en el rendimiento.
Otro problema común son la latencia del análisis y los errores causados por la complejidad del código de un DAG. Por ejemplo, un código de DAG que contiene variables de Airflow en el nivel superior del código puede provocar retrasos en el análisis, sobrecarga de la base de datos, fallas en la programación y tiempos de espera agotados del DAG.
En este instructivo, diagnosticarás los DAG de ejemplo y aprenderás a solucionar problemas de programación y análisis, mejorar la programación de DAGs y optimizar la configuración del entorno y el código de tus DAGs para mejorar el rendimiento.
Objetivos
En esta sección, se enumeran los objetivos de los ejemplos de este instructivo.
Ejemplo: Mal funcionamiento del programador y latencia causada por una alta simultaneidad de tareas
Sube el DAG de muestra que se ejecuta varias veces de forma simultánea y diagnostica el mal funcionamiento del programador y los problemas de latencia con Cloud Monitoring.
Consolida las tareas para optimizar el código de tu DAG y evalúa el impacto en el rendimiento.
Distribuye las tareas de manera más uniforme a lo largo del tiempo y evalúa el impacto en el rendimiento.
Optimiza la configuración de Airflow y del entorno, y evalúa el impacto.
Ejemplo: Errores de análisis y latencia del DAG causados por código complejo
Sube el DAG de muestra con variables de Airflow y diagnostica problemas de análisis con Cloud Monitoring.
Optimiza el código del DAG evitando las variables de Airflow en el nivel superior del código y evalúa el impacto en el tiempo de análisis.
Optimiza la configuración de Airflow y del entorno, y evalúa el impacto en el tiempo de análisis.
Costos
En este instructivo, se usan los siguientes componentes facturables de Google Cloud:
- Cloud Composer (consulta los costos adicionales)
- Cloud Monitoring
Cuando finalices este instructivo, puedes borrar los recursos creados para evitar que se te siga facturando. Para obtener más información, consulta Realiza una limpieza.
Antes de comenzar
En esta sección, se describen las acciones que debes realizar antes de comenzar el instructivo.
Crea y configura un proyecto
Para este instructivo, necesitas un Google Cloud proyecto. Configura el proyecto de la siguiente manera:
En la Google Cloud consola, selecciona o crea un proyecto:
Asegúrate de tener habilitada la facturación para tu proyecto. Obtén información para verificar si la facturación está habilitada en un proyecto.
Asegúrate de que el usuario del proyecto Google Cloud tenga los siguientes roles para crear los recursos necesarios:
- Administrador de objetos de almacenamiento y entorno
(
roles/composer.environmentAndStorageObjectAdmin
) - Administrador de Compute (
roles/compute.admin
)
- Administrador de objetos de almacenamiento y entorno
(
Habilita las API para tu proyecto.
Enable the Cloud Composer API.
Crea tu entorno de Cloud Composer
Crea un entorno de Cloud Composer 2.
Como parte de la creación del entorno, le otorgas el rol Extensión del agente de servicio de la API de Cloud Composer v2 (roles/composer.ServiceAgentV2Ext
) a la cuenta del agente de servicio de Composer. Cloud Composer usa esta cuenta para realizar operaciones en tu proyecto Google Cloud .
Ejemplo: Mal funcionamiento del programador y falla de la tarea debido a problemas de programación
En este ejemplo, se muestra cómo depurar el mal funcionamiento del programador y la latencia causada por la alta simultaneidad de tareas.
Sube el DAG de muestra a tu entorno
Sube el siguiente DAG de ejemplo al entorno que creaste en los pasos anteriores. En este instructivo, este DAG se llama dag_10_tasks_200_seconds_1
.
Este DAG tiene 200 tareas. Cada tarea espera 1 segundo y muestra el mensaje "Complete!". El DAG se activa automáticamente una vez que se sube. Cloud Composer ejecuta este DAG 10 veces, y todas las ejecuciones del DAG se realizan en paralelo.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 200
seconds = 1
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2023, 11, 22, 20, 0),
end_date=datetime(2023, 11, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Diagnosticar los problemas de funcionamiento del programador y de errores de tareas
Una vez que se completen las ejecuciones del DAG, abre la IU de Airflow y haz clic en el DAG dag_10_tasks_200_seconds_1
. Verás que se completaron 10 ejecuciones de DAG en total y que cada una tiene 200 tareas completadas.
Revisa los registros de tareas de Airflow:
En la consola de Google Cloud , ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Registros y, luego, a Todos los registros > Registros de Airflow > Trabajadores > Ver en el Explorador de registros.
En el histograma de registros, puedes ver los errores y las advertencias indicados con colores rojo y naranja:

El DAG de ejemplo generó alrededor de 130 advertencias y 60 errores. Haz clic en cualquier columna que contenga barras amarillas y rojas. Verás algunas de las siguientes advertencias y errores en los registros:
State of this instance has been externally set to success. Terminating
instance.
Received SIGTERM. Terminating subprocesses.
worker: Warm shutdown (MainProcess).
Estos registros pueden indicar que el uso de recursos superó los límites y que el trabajador se reinició.
Si una tarea de Airflow permanece en la cola durante demasiado tiempo, el programador la marca como fallida y como apta para reintentarse, y la reprograma para su ejecución. Una forma de observar los síntomas de esta situación es mirar el gráfico con la cantidad de tareas en cola. Si los picos en este gráfico no disminuyen en aproximadamente 10 minutos, es probable que se produzcan fallas en las tareas (sin registros).
Revisa la información de supervisión:
Ve a la pestaña Monitoring y selecciona Overview.
Revisa el gráfico de tareas de Airflow.
Figura 2. Gráfico de tareas de Airflow (haz clic para ampliar) En el gráfico de tareas de Airflow, se observa un aumento repentino en las tareas en cola que dura más de 10 minutos, lo que podría significar que no hay suficientes recursos en tu entorno para procesar todas las tareas programadas.
Revisa el gráfico de Trabajadores activos:
Figura 3: Gráfico de trabajadores activos (haz clic para ampliar) El gráfico Trabajadores activos indica que el DAG activó el ajuste de escala automático hasta el límite máximo permitido de tres trabajadores durante la ejecución del DAG.
Los gráficos de uso de recursos pueden indicar la falta de capacidad en los trabajadores de Airflow para ejecutar tareas en cola. En la pestaña Supervisión, selecciona Trabajadores y revisa los gráficos de Uso total de CPU de trabajadores y Uso total de memoria de trabajadores.
Figura 4. Gráfico de uso total de CPU de los trabajadores (haz clic para ampliar) Figura 5. Gráfico del uso total de la memoria de los trabajadores (haz clic para ampliar) Los gráficos indican que la ejecución de demasiadas tareas de forma simultánea provocó que se alcanzara el límite de CPU. Los recursos se habían usado durante más de 30 minutos, lo que es incluso más que la duración total de 200 tareas en 10 ejecuciones de DAG que se ejecutan una por una.
Estos son los indicadores de que la cola se está llenando y de que faltan recursos para procesar todas las tareas programadas.
Consolida tus tareas
El código actual crea muchos DAG y tareas sin recursos suficientes para procesar todas las tareas en paralelo, lo que provoca que se llene la cola. Si las tareas permanecen en la cola durante demasiado tiempo, es posible que se reprogramen o fallen. En tales casos, debes elegir una cantidad menor de tareas más consolidadas.
El siguiente DAG de ejemplo cambia la cantidad de tareas del ejemplo inicial de 200 a 20 y aumenta el tiempo de espera de 1 a 10 segundos para imitar tareas más consolidadas que realizan la misma cantidad de trabajo.
Sube el siguiente DAG de muestra al entorno que creaste. En este instructivo, este DAG se llama dag_10_tasks_20_seconds_10
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Evalúa el impacto de las tareas más consolidadas en los procesos de programación:
Espera hasta que se completen las ejecuciones del DAG.
En la IU de Airflow, en la página DAGs, haz clic en el DAG
dag_10_tasks_20_seconds_10
. Verás 10 ejecuciones de DAG, cada una con 20 tareas completadas correctamente.En la consola de Google Cloud , ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Registros y, luego, a Todos los registros > Registros de Airflow > Trabajadores > Ver en el Explorador de registros.
El segundo ejemplo, con tareas más consolidadas, generó aproximadamente 10 advertencias y 7 errores. En el histograma, puedes comparar la cantidad de errores y advertencias en el ejemplo inicial (valores anteriores) y en el segundo ejemplo (valores posteriores).
Figura 6: Histograma de registros de trabajadores de Airflow después de que se consolidaron las tareas (haz clic para ampliar) Cuando comparas el primer ejemplo con el más consolidado, puedes ver que hay muchos menos errores y advertencias en el segundo ejemplo. Sin embargo, los mismos errores relacionados con el cierre en caliente siguen apareciendo en los registros debido a la sobrecarga de recursos.
En la pestaña Supervisión, selecciona Trabajadores y revisa los gráficos.
Cuando comparas el gráfico de tareas de Airflow del primer ejemplo (valores anteriores) con el gráfico del segundo ejemplo con más tareas consolidadas, puedes ver que el aumento repentino de las tareas en cola duró un período más corto cuando las tareas estaban más consolidadas. Sin embargo, duró cerca de 10 minutos, lo que sigue siendo subóptimo.
Figura 7: Gráfico de tareas de Airflow después de que se consolidaron las tareas (haz clic para ampliar) En el gráfico de trabajadores activos, puedes ver que el primer ejemplo (a la izquierda del gráfico) usó recursos durante un período mucho más prolongado que el segundo, aunque ambos ejemplos imitan la misma cantidad de trabajo.
Figura 8. Gráfico de trabajadores activos después de que se consolidaron las tareas (haz clic para ampliar) Revisa los gráficos de consumo de recursos de los trabajadores. Aunque la diferencia entre los recursos utilizados en el ejemplo con más tareas consolidadas y el ejemplo inicial es bastante significativa, el uso de CPU sigue aumentando hasta el 70% del límite.
Figura 9. Gráfico del uso total de CPU de los trabajadores después de que se consolidaron las tareas (haz clic para ampliar) Figura 10. Gráfico del uso total de memoria de los trabajadores después de que se consolidaron las tareas (haz clic para ampliar)
Distribuye las tareas de manera más uniforme a lo largo del tiempo
Si hay demasiadas tareas simultáneas, la cola se llenará, lo que provocará que las tareas queden atascadas en la cola o se reprogramen. En los pasos anteriores, disminuiste la cantidad de tareas consolidándolas. Sin embargo, los registros de salida y la supervisión indicaron que la cantidad de tareas simultáneas sigue siendo subóptima.
Puedes controlar la cantidad de ejecuciones de tareas simultáneas implementando una programación o estableciendo límites para la cantidad de tareas que se pueden ejecutar de forma simultánea.
En este instructivo, distribuirás las tareas de manera más uniforme a lo largo del tiempo agregando parámetros a nivel del DAG en el DAG dag_10_tasks_20_seconds_10
:
Agrega el argumento
max_active_runs=1
al administrador de contexto del DAG. Este argumento establece un límite de solo una instancia de una ejecución de DAG en un momento determinado.Agrega el argumento
max_active_tasks=5
al administrador de contexto del DAG. Este argumento controla la cantidad máxima de instancias de tareas que se pueden ejecutar de forma simultánea en cada DAG.
Sube el siguiente DAG de muestra al entorno que creaste. En este instructivo, este DAG se llama dag_10_tasks_20_seconds_10_scheduled.py
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
max_active_runs=active_runs,
max_active_tasks=active_tasks,
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Evalúa el impacto de la distribución de tareas a lo largo del tiempo en los procesos de programación:
Espera hasta que se completen las ejecuciones del DAG.
En la consola de Google Cloud , ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Registros y, luego, a Todos los registros > Registros de Airflow > Trabajadores > Ver en el Explorador de registros.
En el histograma, puedes ver que el tercer DAG, con una cantidad limitada de tareas y ejecuciones activas, no generó ninguna advertencia ni error, y la distribución de los registros parece más uniforme en comparación con los valores anteriores.
Figura 11. Histograma de registros de trabajadores de Airflow después de que las tareas se consolidaron y distribuyeron a lo largo del tiempo (haz clic para ampliar)
Las tareas del ejemplo de dag_10_tasks_20_seconds_10_scheduled
que tiene una cantidad limitada de tareas activas y ejecuciones no causaron presión de recursos porque las tareas se pusieron en cola de manera uniforme.
Después de realizar los pasos descritos, optimizaste el uso de recursos consolidando tareas pequeñas y distribuyéndolas de manera más uniforme a lo largo del tiempo.
Optimiza la configuración del entorno
Puedes ajustar la configuración de tu entorno para asegurarte de que siempre haya capacidad en los trabajadores de Airflow para ejecutar tareas en cola.
Cantidad de trabajadores y simultaneidad de trabajadores
Puedes ajustar la cantidad máxima de trabajadores para que Cloud Composer escale automáticamente tu entorno dentro de los límites establecidos.
El parámetro [celery]worker_concurrency
define la cantidad máxima de tareas que un solo trabajador puede tomar de la lista de tareas en cola. Cambiar este parámetro ajusta la cantidad de tareas que un solo trabajador puede ejecutar al mismo tiempo.
Puedes cambiar esta opción de configuración de Airflow anulándola. De forma predeterminada, la simultaneidad del trabajador se establece en función de la cantidad de instancias de tareas simultáneas ligeras que un trabajador puede admitir. Esto significa que su valor depende de los límites de recursos del trabajador.
El valor de simultaneidad del trabajador no depende de la cantidad de trabajadores en tu entorno.
La cantidad de trabajadores y la simultaneidad de trabajadores funcionan en combinación, y el rendimiento de tu entorno depende en gran medida de ambos parámetros. Puedes usar las siguientes consideraciones para elegir la combinación correcta:
Varias tareas rápidas que se ejecutan en paralelo. Puedes aumentar la simultaneidad de los trabajadores cuando hay tareas en espera en la cola y tus trabajadores usan un porcentaje bajo de sus CPU y memoria al mismo tiempo. Sin embargo, en ciertas circunstancias, es posible que la cola nunca se llene, lo que hará que el ajuste de escala automático nunca se active. Si las tareas pequeñas finalizan su ejecución cuando los trabajadores nuevos están listos, un trabajador existente puede tomar las tareas restantes y no habrá tareas para los trabajadores recién creados.
En estas situaciones, se recomienda aumentar la cantidad mínima de trabajadores y la simultaneidad de trabajadores para evitar el escalamiento excesivo.
Varias tareas largas que se ejecutan en paralelo. La alta simultaneidad de trabajadores impide que el sistema escale la cantidad de trabajadores. Si varias tareas requieren muchos recursos y tardan mucho en completarse, una alta simultaneidad de trabajadores puede hacer que la cola nunca se llene y que solo un trabajador tome todas las tareas, lo que genera problemas de rendimiento. En estas situaciones, se recomienda aumentar la cantidad máxima de trabajadores y disminuir la simultaneidad de los trabajadores.
La importancia del paralelismo
Los programadores de Airflow controlan la programación de las ejecuciones del DAG y las tareas individuales de los DAG. La opción de configuración [core]parallelism
de Airflow controla cuántas tareas puede poner en cola el programador de Airflow en la cola del ejecutor después de que se cumplen todas las dependencias para estas tareas.
El paralelismo es un mecanismo de protección de Airflow que determina cuántas tareas se pueden ejecutar al mismo tiempo por cada programador, independientemente de la cantidad de trabajadores. El valor de paralelismo, multiplicado por la cantidad de programadores en tu clúster, es la cantidad máxima de instancias de tareas que tu entorno puede poner en cola.
Por lo general, [core]parallelism
se establece como el producto de una cantidad máxima de trabajadores y [celery]worker_concurrency
. También se ve afectado por el pool.
Puedes cambiar esta opción de configuración de Airflow anulándola. Para obtener más información sobre cómo ajustar la configuración de Airflow relacionada con el ajuste de escala, consulta Configuración del ajuste de escala de Airflow.
Cómo encontrar configuraciones óptimas del entorno
La forma recomendada de solucionar los problemas de programación es consolidar las tareas pequeñas en tareas más grandes y distribuir las tareas de manera más uniforme a lo largo del tiempo. Además de optimizar el código del DAG, también puedes optimizar la configuración del entorno para tener capacidad suficiente para ejecutar varias tareas de forma simultánea.
Por ejemplo, supongamos que consolidas las tareas en tu DAG tanto como sea posible, pero limitar las tareas activas para distribuirlas de manera más uniforme a lo largo del tiempo no es una solución preferida para tu caso de uso específico.
Puedes ajustar los parámetros de paralelismo, cantidad de trabajadores y simultaneidad de trabajadores para ejecutar el DAG de dag_10_tasks_20_seconds_10
sin limitar las tareas activas. En este ejemplo, el DAG se ejecuta 10 veces y cada ejecución contiene 20 tareas pequeñas.
Si quieres ejecutarlos todos de forma simultánea, haz lo siguiente:
Necesitarás un tamaño de entorno más grande, ya que controla los parámetros de rendimiento de la infraestructura administrada de Cloud Composer de tu entorno.
Los trabajadores de Airflow deben poder ejecutar 20 tareas de forma simultánea, lo que significa que debes establecer la simultaneidad de los trabajadores en 20.
Los trabajadores necesitan suficiente CPU y memoria para controlar todas las tareas. La simultaneidad de los trabajadores se ve afectada por la CPU y la memoria de los trabajadores, por lo que necesitarás al menos
worker_concurrency / 12
de CPU yleast worker_concurrency / 8
de memoria.Deberás aumentar el paralelismo para que coincida con la mayor simultaneidad de trabajadores. Para que los trabajadores puedan tomar 20 tareas de la cola, el programador deberá programar esas 20 tareas primero.
Ajusta la configuración de tu entorno de la siguiente manera:
En la consola de Google Cloud , ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Configuración del entorno.
Busca la configuración de Recursos > Cargas de trabajo y haz clic en Editar.
En la sección Trabajador, en el campo Memoria, especifica el nuevo límite de memoria para los trabajadores de Airflow. En este instructivo, usa 4 GB.
En el campo CPU, especifica el nuevo límite de CPU para los trabajadores de Airflow. En este instructivo, usa 2 CPU virtuales.
Guarda los cambios y espera varios minutos para que se reinicien los trabajadores de Airflow.
A continuación, anula las opciones de configuración de Airflow para el paralelismo y la simultaneidad de los trabajadores:
Ve a la pestaña Anulaciones de configuración de Airflow.
Haz clic en Editar y, luego, en Agregar anulación de configuración de Airflow.
Anula la configuración de paralelismo:
Sección Clave Valor core
parallelism
20
Haz clic en Agregar anulación de configuración de Airflow y anula la configuración de simultaneidad del trabajador:
Sección Clave Valor celery
worker_concurrency
20
Haz clic en Guardar y espera a que el entorno actualice su configuración.
Vuelve a activar el mismo DAG de ejemplo con los parámetros de configuración ajustados:
En la IU de Airflow, ve a la página DAGs.
Busca el DAG
dag_10_tasks_20_seconds_10
y bórralo.Después de que se borra el DAG, Airflow verifica la carpeta de los DAG en el bucket de tu entorno y vuelve a ejecutar el DAG automáticamente.
Una vez que se completen las ejecuciones del DAG, vuelve a revisar el histograma de registros. En el diagrama, puedes ver que el ejemplo de dag_10_tasks_20_seconds_10
con más tareas consolidadas no generó ningún error ni advertencia cuando se ejecutó con la configuración del entorno ajustada. Compara los resultados con los datos anteriores del diagrama, en los que el mismo ejemplo generó errores y advertencias cuando se ejecutó con la configuración predeterminada del entorno.

Las configuraciones del entorno y de Airflow desempeñan un papel fundamental en la programación de tareas. Sin embargo, no es posible aumentar las configuraciones más allá de ciertos límites.
Recomendamos optimizar el código del DAG, consolidar tareas y usar la programación para optimizar el rendimiento y la eficiencia.
Ejemplo: Errores de análisis y latencia de DAG debido a código de DAG complejo
En este ejemplo, investigarás la latencia del análisis de un DAG de muestra que imita un exceso de variables de Airflow.
Crea una nueva variable de Airflow
Antes de subir el código de muestra, crea una variable de Airflow nueva.
En la consola de Google Cloud , ve a la página Entornos.
En la columna Servidor web de Airflow, sigue el vínculo de Airflow para tu entorno.
Ve a Admin > Variables > Add a new record.
Configura los siguientes valores:
- clave:
example_var
- val:
test_airflow_variable
- clave:
Sube el DAG de muestra a tu entorno
Sube el siguiente DAG de ejemplo al entorno que creaste en los pasos anteriores. En este instructivo, este DAG se llama dag_for_loop_airflow_variable
.
Este DAG contiene un bucle for que se ejecuta 1,000 veces y simula un exceso de variables de Airflow. Cada iteración lee la variable example_var
y genera una tarea. Cada tarea contiene un comando que imprime el valor de la variable.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable',
default_args=default_args,
catchup=False,
schedule_interval="@daily"
)
for i in range(1000):
a = Variable.get('example_var', 'N/A')
task = BashOperator(
task_id=f'task_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': a}
)
Diagnostica los problemas de análisis
El tiempo de análisis del DAG es la cantidad de tiempo que tarda el programador de Airflow en leer un archivo DAG y analizarlo. Antes de que el programador de Airflow pueda programar cualquier tarea desde un DAG, el programador debe analizar el archivo del DAG para descubrir la estructura del DAG y las tareas definidas.
Si un DAG tarda mucho en analizarse, esto consume la capacidad del programador y podría reducir el rendimiento de las ejecuciones del DAG.
Para supervisar el tiempo de análisis del DAG, haz lo siguiente:
Ejecuta el comando de la CLI de Airflow
dags report
en gcloud CLI para ver el tiempo de análisis de todos tus DAG:gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ dags report
Reemplaza lo siguiente:
ENVIRONMENT_NAME
: Es el nombre de tu entorno.LOCATION
: Es la región en la que se encuentra el entorno.
En el resultado del comando, busca el valor de duración del DAG
dag_for_loop_airflow_variables
. Un valor alto puede indicar que este DAG no se implementa de manera óptima. Si tienes varios DAG, en la tabla de resultados, puedes identificar cuáles tienen un tiempo de análisis prolongado.Ejemplo:
file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:14.773594 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /airflow_monitoring | 0:00:00.003035 | 1 | 1 | airflow_monitoring .py
Inspecciona los tiempos de análisis del DAG en la consola de Google Cloud :
- En la consola de Google Cloud , ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Registros y, luego, a Todos los registros > Administrador de procesadores de DAG.
Revisa los registros de
dag-processor-manager
y detecta posibles problemas.Figura 13. Los registros del administrador del procesador de DAG muestran los tiempos de análisis del DAG (haz clic para ampliar)
Si el tiempo total de análisis del DAG supera los 10 segundos, es posible que tus programadores estén sobrecargados con el análisis del DAG y no puedan ejecutar los DAG de manera eficaz.
Optimiza el código del DAG
Se recomienda evitar el código Python innecesario de "nivel superior" en tus DAG. Los DAGs con muchas importaciones, variables y funciones fuera del DAG generan tiempos de análisis más largos para el programador de Airflow. Esto reduce el rendimiento y la escalabilidad de Cloud Composer y Airflow. El exceso de lectura de variables de Airflow genera un tiempo de análisis prolongado y una carga alta de la base de datos. Si este código se encuentra en un archivo DAG, estas funciones se ejecutan en cada latido del programador, lo que puede ser lento.
Los campos de plantilla de Airflow te permiten incorporar valores de variables de Airflow y plantillas de Jinja en tus DAG. Esto evita la ejecución innecesaria de funciones durante los latidos del programador.
Para implementar el ejemplo de DAG de una mejor manera, evita usar variables de Airflow en el código de Python de nivel superior de los DAG. En cambio, pasa las variables de Airflow a los operadores existentes a través de una plantilla de Jinja, lo que retrasará la lectura del valor hasta la ejecución de la tarea.
Sube la nueva versión del DAG de ejemplo a tu entorno. En este instructivo, este DAG se llama dag_for_loop_airflow_variable_optimized
.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable_optimized',
default_args=default_args,
catchup=False,
schedule_interval='@daily'
)
for i in range(1000):
task = BashOperator(
task_id=f'bash_use_variable_good_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': '{{ var.value.get("example_var") }}'},
)
Inspecciona el nuevo tiempo de análisis del DAG:
Espera hasta que se complete la ejecución del DAG.
Vuelve a ejecutar el comando
dags report
para ver el tiempo de análisis de todos tus DAG:file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:37.000369 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /dag_for_loop_airfl | 0:00:01.109457 | 1 | 1000 | dag_for_loop_airflow ow_variable_optimiz | | | | _variable_optimized ed.py | | | | /airflow_monitoring | 0:00:00.040510 | 1 | 1 | airflow_monitoring .py | | | |
Vuelve a revisar los registros de
dag-processor-manager
y analiza la duración del análisis.Figura 14. Los registros del administrador de procesadores de DAG muestran los tiempos de análisis del DAG después de que se optimizó el código del DAG (haz clic para ampliar).
Al reemplazar las variables de entorno por plantillas de Airflow, simplificaste el código del DAG y redujiste la latencia de análisis en aproximadamente diez veces.
Optimiza la configuración del entorno de Airflow
El programador de Airflow intenta constantemente activar tareas nuevas y analiza todos los DAG en tu bucket de entorno. Si tus DAG tienen un tiempo de análisis prolongado y el programador consume muchos recursos, puedes optimizar la configuración del programador de Airflow para que este use los recursos de manera más eficiente.
En este instructivo, los archivos DAG tardan mucho en analizarse, y los ciclos de análisis comienzan a superponerse, lo que agota la capacidad del programador. En nuestro ejemplo, el primer DAG de ejemplo tarda más de 5 segundos en analizarse, por lo que configurarás el programador para que se ejecute con menos frecuencia y, así, usar los recursos de manera más eficiente. Anularás la opción de configuración de Airflow scheduler_heartbeat_sec
. Esta configuración define la frecuencia con la que se debe ejecutar el programador (en segundos). De forma predeterminada, el valor se establece en 5 segundos.
Puedes cambiar esta opción de configuración de Airflow anulándola.
Anula la opción de configuración de Airflow scheduler_heartbeat_sec
:
En la consola de Google Cloud , ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Anulaciones de configuración de Airflow.
Haz clic en Editar y, luego, en Agregar anulación de configuración de Airflow.
Anula la opción de configuración de Airflow:
Sección Clave Valor scheduler
scheduler_heartbeat_sec
10
Haz clic en Guardar y espera a que el entorno actualice su configuración.
Verifica las métricas del programador:
Ve a la pestaña Monitoring y selecciona Schedulers.
En el gráfico Latido del programador, haz clic en el botón Más opciones (tres puntos) y, luego, en Ver en el Explorador de métricas.

En el gráfico, verás que el programador se ejecuta dos veces con menos frecuencia después de que cambiaste la configuración predeterminada de 5 a 10 segundos. Si reduces la frecuencia de los latidos, te aseguras de que el programador no comience a ejecutarse mientras el ciclo de análisis anterior está en curso y la capacidad de recursos del programador no se agota.
Asigna más recursos al programador
En Cloud Composer 2, puedes asignar más recursos de CPU y memoria al programador. De esta manera, puedes aumentar el rendimiento de tu programador y acelerar el tiempo de análisis de tu DAG.
Asigna CPU y memoria adicionales al programador:
En la consola de Google Cloud , ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Configuración del entorno.
Busca la configuración de Recursos > Cargas de trabajo y haz clic en Editar.
En la sección Programador, en el campo Memoria, especifica el nuevo límite de memoria. En este instructivo, usa 4 GB.
En el campo CPU, especifica el nuevo límite de CPU. En este instructivo, usa 2 CPU virtuales.
Guarda los cambios y espera varios minutos para que se reinicien los programadores de Airflow.
Ve a la pestaña Registros y, luego, a Todos los registros > Administrador de procesadores de DAG.
Revisa los registros de
dag-processor-manager
y compara la duración del análisis de los DAG de ejemplo:Figura 16: Los registros del administrador del procesador de DAG muestran los tiempos de análisis del DAG después de que se asignaron más recursos al programador (haz clic para ampliar).
Al asignar más recursos al programador, aumentaste su capacidad y redujiste la latencia del análisis de manera significativa en comparación con las configuraciones predeterminadas del entorno. Con más recursos, el programador puede analizar los DAG más rápido. Sin embargo, también aumentarán los costos asociados con los recursos de Cloud Composer. Además, no es posible aumentar los recursos más allá de un cierto límite.
Te recomendamos que asignes recursos solo después de que se implementen las posibles optimizaciones del código del DAG y la configuración de Airflow.
Limpia
Para evitar que se apliquen cargos a tu Google Cloud cuenta por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.
Borra el proyecto
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Borra los recursos individuales
Si planeas explorar varios instructivos y guías de inicio rápido, la reutilización de proyectos puede ayudarte a evitar exceder los límites de las cuotas del proyecto.
Borra el entorno de Cloud Composer. También borrarás el bucket del entorno durante este procedimiento.