Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Este instructivo te proporciona una guía para diagnosticar y solucionar problemas relacionados con la programación de tareas y analizar problemas que provocan el mal funcionamiento del programador, el análisis de errores y latencias y fallas en las tareas.
Introducción
El programador de Airflow se ve afectado principalmente por dos factores: la programación de tareas y el análisis de DAG. Los problemas en uno de esos factores pueden tener un impacto negativo en el y el rendimiento del entorno.
A veces, se programan demasiadas tareas de forma simultánea. En esta situación, la fila se llena y las tareas permanecen en el estado “programado” o se vuelven a programar después de estar en fila, lo que puede causar fallas en las tareas y latencia de rendimiento.
Otro problema común es analizar la latencia y los errores causados por la complejidad de un código de DAG. Por ejemplo, un código de DAG que contiene variables de Airflow en el nivel superior del código puede generar demoras en el análisis, sobrecarga de la base de datos, fallas de programación y tiempos de espera de DAG.
En este instructivo, diagnosticarás los DAG de ejemplo y aprenderás a solucionar problemas de programación y análisis, mejorar la programación del DAG y optimizar tu código DAG y las configuraciones del entorno para mejorar el rendimiento.
Objetivos
En esta sección, se enumeran los objetivos en los ejemplos de este instructivo.
Ejemplo: Falla en el programador y latencia causadas por la simultaneidad de tareas alta
Sube el DAG de muestra que se ejecuta varias veces de forma simultánea y diagnostica los problemas de latencia y mal funcionamiento del programador con Cloud Monitoring.
Para optimizar tu código DAG, consolida las tareas y evalúa el impacto en el rendimiento.
Distribuye las tareas de manera más uniforme en el tiempo y evalúa el impacto en el rendimiento.
Optimiza tus parámetros de configuración de Airflow y los parámetros de configuración del entorno y evaluar el impacto.
Ejemplo: Latencia y errores de análisis del DAG causados por un código complejo
Sube el DAG de muestra con las variables de Airflow y diagnostica los problemas de análisis con Cloud Monitoring.
Para optimizar el código DAG, evita las variables de Airflow en el nivel superior de el código y evaluar su impacto en el tiempo de análisis.
Optimizar los parámetros de configuración y los parámetros de configuración del entorno de Airflow y evaluar el impacto en el tiempo de análisis.
Costos
En este instructivo, se usan los siguientes componentes facturables de Google Cloud:
- Cloud Composer (consulta los costos adicionales)
- Cloud Monitoring
Cuando finalices este instructivo, puedes borrar los recursos creados para evitar que se te siga facturando. Para obtener más detalles, consulta Realiza una limpieza.
Antes de comenzar
En esta sección, se describen las acciones que debes realizar antes de comenzar el instructivo.
Crea y configura un proyecto
Para este instructivo, necesitas un proyecto de Google Cloud. Configura el proyecto de la siguiente manera:
En la consola de Google Cloud, selecciona o crea un proyecto:
Asegúrate de tener habilitada la facturación para tu proyecto. Obtén más información para verificar si la facturación está habilitada en un proyecto.
Asegúrate de que el usuario de tu proyecto de Google Cloud tenga los siguientes roles para crear los recursos necesarios:
- Administrador de objetos de almacenamiento y entorno
(
roles/composer.environmentAndStorageObjectAdmin
) - Administrador de Compute (
roles/compute.admin
)
- Administrador de objetos de almacenamiento y entorno
(
Habilita las API para tu proyecto.
Enable the Cloud Composer API.
Crea tu entorno de Cloud Composer
Crea un entorno de Cloud Composer 2.
Como parte de la creación del entorno, otorgas el rol Extensión del agente de servicio de la API de Cloud Composer v2 (roles/composer.ServiceAgentV2Ext
) a la cuenta del agente de servicio de Composer. Cloud Composer usa esta cuenta para realizar operaciones en tu proyecto de Google Cloud.
Ejemplo: El programador funciona mal y la tarea falla debido a problemas de programación
En este ejemplo, se muestra el mal funcionamiento del programador y la latencia causada por la depuración debido a la alta simultaneidad de tareas.
Sube el DAG de muestra a tu entorno
Sube el siguiente DAG de muestra al entorno
que creaste en los pasos anteriores. En este instructivo, este DAG se llama
dag_10_tasks_200_seconds_1
Este DAG tiene 200 tareas. Cada tarea espera 1 segundo y, luego, imprime "Complete!". El DAG se activa automáticamente una vez que se sube. Cloud Composer ejecuta este DAG 10 veces, y todas las ejecuciones de DAG se realizan en paralelo.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 200
seconds = 1
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2023, 11, 22, 20, 0),
end_date=datetime(2023, 11, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Diagnostica los problemas de funcionamiento incorrecto del programador y de fallas de tareas
Cuando se completen las ejecuciones del DAG, abre la IU de Airflow y haz clic en
DAG dag_10_tasks_200_seconds_1
. Verás que se ejecutaron 10 ejecuciones de DAG
correctamente, y cada uno tiene 200 tareas correctas.
Revisa los registros de tareas de Airflow:
En la consola de Google Cloud, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Registros y, luego, a Todos los registros > Registros de Airflow > Trabajadores > Ver en el Explorador de registros.
En el histograma de registros, puedes ver los errores y las advertencias indicados con colores rojo y naranja:
El DAG de ejemplo generó alrededor de 130 advertencias y 60 errores. Haz clic en cualquiera que contiene barras amarillas y rojas. Verás algunos de los siguientes advertencias y errores en los registros:
State of this instance has been externally set to success. Terminating
instance.
Received SIGTERM. Terminating subprocesses.
worker: Warm shutdown (MainProcess).
Estos registros pueden indicar que el uso de recursos superó los límites se reinició a sí mismo.
Si una tarea de Airflow se mantiene en la cola por mucho tiempo, el programador la marcará como con errores y up_for_retry, y lo reprogramará otra vez para ejecución. Una forma de observar los síntomas de esta situación gráfico con la cantidad de tareas en cola y si los picos de este gráfico no en alrededor de 10 minutos, entonces es probable que las tareas fallen (sin registros).
Revisa la información de supervisión:
Ve a la pestaña Monitoring y selecciona Descripción general.
Revisa el gráfico Tareas de Airflow.
En el gráfico de tareas de Airflow, hay un aumento repentino en las tareas en cola que dura más de 10 minutos, lo que podría significar que no hay suficientes recursos en tu entorno para procesar todas las tareas programadas.
Revisa el gráfico Trabajadores activos:
En el gráfico Trabajadores activos, se indica que el DAG activó el ajuste de escala automático al límite máximo permitido de tres trabajadores durante la ejecución del DAG.
Los gráficos de uso de recursos pueden indicar la falta de capacidad en los trabajadores de Airflow. para ejecutar tareas en cola. En la pestaña Supervisión, selecciona Trabajadores y revisa los gráficos Uso total de CPU de los trabajadores y Uso total de memoria de los trabajadores.
Los gráficos indican que la ejecución de demasiadas tareas de forma simultánea resultó en alcanzar el límite de la CPU. Los recursos se habían usado durante más de 30 minutos, lo que es incluso más largo que la duración total de 200 tareas en 10 ejecuciones de DAG que se ejecutan una por una.
Estos son los indicadores de que la cola se está llenando y de la falta de recursos para procesar todas las tareas programadas.
Consolida tus tareas
El código actual crea muchos DAG y tareas sin recursos suficientes para procesar todas las tareas en paralelo, lo que hace que la cola se llene. Mantener las tareas en la cola durante demasiado tiempo puede provocar que se reprogramen o fallen. En estos casos, debe optar por una cantidad menor de campañas más consolidadas tareas.
En el siguiente DAG de muestra, se cambia la cantidad de tareas del ejemplo inicial de 200 a 20 y se aumenta el tiempo de espera de 1 a 10 segundos para imitar tareas más consolidadas que realizan la misma cantidad de trabajo.
Sube el siguiente DAG de muestra al entorno
que creaste. En este instructivo, este DAG se llama
dag_10_tasks_20_seconds_10
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Evalúa el impacto de las tareas más consolidadas en los procesos de programación:
Espera a que se completen las ejecuciones de DAG.
En la IU de Airflow, en la página DAG, haz clic en
dag_10_tasks_20_seconds_10
DAG. Verás 10 ejecuciones de DAG, cada una con 20 tareas que tuvieron éxito.En la consola de Google Cloud, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Registros y, luego, a Todos los registros > Registros de Airflow > Workers > Ver en el Explorador de registros.
El segundo ejemplo con tareas más consolidadas dio como resultado aproximadamente 10 advertencias y 7 errores. En el histograma, puedes comparar la cantidad de errores y advertencias en el ejemplo inicial (valores anteriores) y en el segundo ejemplo (valores posteriores).
Si comparas el primer ejemplo con el más consolidado, puedes vea que hay muchos menos errores y advertencias en la segunda ejemplo. Sin embargo, los mismos errores relacionados con el cierre tibio siguen apareciendo en los registros debido a la sobrecarga de recursos.
En la pestaña Supervisión, selecciona Trabajadores y revisa los gráficos.
Cuando compares el gráfico Tareas de Airflow para el primer ejemplo (anteriormente, valores) con el gráfico del segundo ejemplo con tareas más consolidadas, puedes ver que el aumento repentino de las tareas en cola duró un período más corto en que las tareas estaban más consolidadas. Sin embargo, duró cerca de 10 minutos, lo cual sigue siendo subóptimo.
En el gráfico de trabajadores activos, puedes ver el primer ejemplo (a la izquierda). parte del gráfico) usaron recursos por un período de tiempo mucho más prolongado que el segundo, aunque ambos ejemplos imitan la misma cantidad de el trabajo.
Revisa los gráficos de consumo de recursos de los trabajadores. Aunque la diferencia entre los recursos que se usan en el ejemplo con tareas más consolidadas y el ejemplo inicial es bastante significativa, el uso de la CPU sigue aumentando hasta el 70% del límite.
Distribuye las tareas de manera más uniforme con el tiempo
Demasiadas tareas simultáneas hacen que la cola se llene, lo que genera que las tareas se bloqueen en la cola o se reprogramen. En los pasos anteriores, reduciste la cantidad de tareas consolidándolas. Sin embargo, los registros de salida y la supervisión indicaron que la cantidad de tareas simultáneas aún es poco óptima.
Puedes controlar la cantidad de ejecuciones simultáneas de tareas implementando un programa o establecer límites para la cantidad de tareas que se pueden ejecutar simultáneamente.
En este instructivo, distribuirás las tareas de manera más uniforme con el tiempo agregando parámetros a nivel del DAG al DAG dag_10_tasks_20_seconds_10
:
Agrega el argumento
max_active_runs=1
al administrador de contexto de DAG. Este argumento establece el límite de una sola instancia de ejecución de DAG en un momento determinado.Agrega el argumento
max_active_tasks=5
al administrador de contexto de DAG. Este argumento controla la cantidad máxima de instancias de tareas que pueden ejecutarse simultáneamente en cada DAG.
Sube el siguiente DAG de muestra al entorno
que creaste. En este instructivo, este DAG se llama
dag_10_tasks_20_seconds_10_scheduled.py
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
max_active_runs=active_runs,
max_active_tasks=active_tasks,
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Evalúa el impacto de la distribución de tareas a lo largo del tiempo en los procesos de programación:
Espera a que se completen las ejecuciones de DAG.
En la consola de Google Cloud, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Registros y, luego, a Todos los registros > Registros de Airflow > Workers > Ver en el Explorador de registros.
En el histograma, puedes ver que el tercer DAG con una cantidad limitada las tareas y ejecuciones activas no generaron advertencias ni errores, y el de registros se ve más uniforme en comparación con los valores anteriores.
Las tareas del ejemplo de dag_10_tasks_20_seconds_10_scheduled
que tienen un
la cantidad limitada de tareas y ejecuciones activas no generó presión sobre los recursos porque
las tareas se colocaron en cola de manera uniforme.
Después de realizar los pasos descritos, optimizaste el uso de recursos con el objetivo de consolidar tareas pequeñas y distribuirlas de manera más uniforme a lo largo del tiempo.
Optimiza los parámetros de configuración del entorno
Puedes ajustar la configuración de tu entorno para asegurarte de que siempre haya capacidad en los trabajadores de Airflow para ejecutar tareas en cola.
Cantidad de trabajadores y simultaneidad de trabajadores
Puedes ajustar la cantidad máxima de trabajadores. para que Cloud Composer escale automáticamente tu entorno en los límites establecidos.
El parámetro [celery]worker_concurrency
define la cantidad máxima de tareas que un solo trabajador puede recuperar de la lista de tareas en cola. Si cambias este parámetro, se ajusta la cantidad de tareas que un solo trabajador puede ejecutar al mismo tiempo.
Puedes cambiar esta opción de configuración de Airflow si haces lo siguiente:
y anularla. De forma predeterminada, la simultaneidad del trabajador se establece en un mínimo de lo siguiente: 32, 12 * worker_CPU, 8 * worker_memory
, lo que significa que depende de los límites de recursos del trabajador. Consulta
Optimiza entornos para obtener más información sobre la configuración
valores de simultaneidad de los trabajadores.
La cantidad de trabajadores y la simultaneidad de los trabajadores funcionan en combinación entre sí, y el rendimiento de tu entorno depende en gran medida de ambos parámetros. Puedes tener en cuenta las siguientes consideraciones para elegir la combinación correcta:
Varias tareas rápidas que se ejecutan en paralelo. Puedes aumentar la simultaneidad de los trabajadores cuando hay tareas en espera en la cola y tus trabajadores usan un porcentaje bajo de sus CPUs y memoria al mismo tiempo. Sin embargo, en ciertas circunstancias, es posible que la cola nunca se llene, lo que provocará que el ajuste de escala automático nunca se active. Si las tareas pequeñas terminan de ejecutarse cuando los trabajadores nuevos están listos, un trabajador existente puede retomar las tareas restantes y no habrá tareas para los trabajadores creados recientemente.
En estas situaciones, se recomienda Aumentar la cantidad mínima de trabajadores y la simultaneidad de trabajadores para evitar un escalamiento excesivo.
Varias tareas largas que se ejecutan en paralelo La simultaneidad alta de trabajadores impide que el sistema escale la cantidad de trabajadores. Si hay varias tareas consumen muchos recursos y tardan mucho tiempo en completarse, una alta demanda la simultaneidad puede hacer que la cola nunca se rellene y que todas las tareas se que solo un trabajador recoge, lo que genera problemas de rendimiento. En estas situaciones, se recomienda aumentar la cantidad máxima de trabajadores y disminuir la simultaneidad de los trabajadores.
La importancia del paralelismo
Los programadores de Airflow controlan la programación de las ejecuciones de DAG y las tareas individuales desde
DAG. La opción [core]parallelism
de configuración de Airflow controla cuántos
que el programador de Airflow puede poner en cola en la cola del ejecutor después de todo
las dependencias para estas tareas.
El paralelismo es un mecanismo de protección de Airflow que determina cuántas tareas pueden ejecutarse al mismo tiempo para cada programador, independientemente de la cantidad de trabajadores. El valor de paralelismo, multiplicado por la cantidad de programadores en tu clúster, es la cantidad máxima de instancias de tareas que tu entorno puede poner en cola.
Por lo general, [core]parallelism
se establece como un producto de una cantidad máxima de trabajadores y [celery]worker_concurrency
. También se ve afectado por el
grupo.
Puedes cambiar esta opción de configuración de Airflow si la anulas. Para obtener más información sobre cómo ajustar Airflow
relacionadas con el escalamiento, consulta
Escala la configuración de Airflow.
Encuentra la configuración óptima del entorno
La forma recomendada de solucionar los problemas de programación es consolidar las tareas pequeñas en tareas más grandes y distribuirlas de manera más uniforme con el tiempo. Además de optimizar el código de DAG, también puedes optimizar las configuraciones de entorno para tener una capacidad suficiente para ejecutar varias tareas de forma simultánea.
Por ejemplo, supongamos que consolidas tareas en tu DAG. tanto como sea posible, pero limitando las tareas activas para distribuirlas de manera más uniforme el tiempo no es la solución preferida para tu caso de uso específico.
Puedes ajustar el paralelismo, la cantidad de trabajadores y los parámetros de simultaneidad de los trabajadores para ejecutar el DAG de dag_10_tasks_20_seconds_10
sin limitar las tareas activas. En este ejemplo, DAG ejecuta 10 veces y cada ejecución contiene 20 tareas pequeñas.
Si quieres ejecutarlos todos de forma simultánea, haz lo siguiente:
Necesitará un tamaño de entorno más grande, ya que controla el rendimiento. parámetros de la infraestructura administrada de Cloud Composer de tu entorno.
Los trabajadores de Airflow deben poder ejecutar 20 tareas de forma simultánea, lo que significa que debes establecer la simultaneidad de los trabajadores en 20.
Los trabajadores necesitan suficiente CPU y memoria para controlar todas las tareas. Trabajador la simultaneidad se ve afectada por la CPU y la memoria del trabajador, por lo que deberás al menos
worker_concurrency / 12
en CPU y alleast worker_concurrency / 8
en la memoria.Deberás aumentar el paralelismo para que coincida con la mayor simultaneidad de trabajadores. Para que los trabajadores puedan recoger 20 tareas de la cola, el programador primero debes programar esas 20 tareas.
Ajusta la configuración de tu entorno de la siguiente manera:
En la consola de Google Cloud, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Configuración del entorno.
Busca la configuración de Recursos > Cargas de trabajo y haz clic en Editar.
En la sección Trabajador, en el campo Memoria, especifica el nuevo límite de memoria para los trabajadores de Airflow. En este instructivo, usa 4 GB.
En el campo CPU, especifica el nuevo límite de CPU para los trabajadores de Airflow. En este instructivo, usa 2 vCPU.
Guarda los cambios y espera unos minutos para que los trabajadores de Airflow se reinicien.
A continuación, anula las opciones de configuración de paralelismo y simultaneidad de trabajadores de Airflow:
Ve a la pestaña Anulaciones de configuración de Airflow.
Haz clic en Editar y, luego, en Agregar anulación de configuración de Airflow.
Anula la configuración de paralelismo:
Sección Clave Valor core
parallelism
20
Haz clic en Agregar anulación de configuración de Airflow y anula la configuración de simultaneidad del trabajador:
Sección Clave Valor celery
worker_concurrency
20
Haz clic en Guardar y espera hasta que el entorno actualice su configuración.
Vuelve a activar el mismo DAG de ejemplo con la configuración ajustada:
En la IU de Airflow, ve a la página DAG.
Busca el DAG de
dag_10_tasks_20_seconds_10
y bórralo.Una vez que se borra el DAG, Airflow verifica la carpeta de DAG en tu en el bucket de tu entorno y lo vuelve a ejecutar automáticamente.
Una vez que se completen las ejecuciones de DAG, vuelve a revisar el histograma de registros. En el diagrama,
Puedes ver que el ejemplo de dag_10_tasks_20_seconds_10
con más
las tareas consolidadas no generaron ningún error ni advertencia al ejecutarse
la configuración del entorno ajustado. Compara los resultados con los datos anteriores del diagrama, en los que el mismo ejemplo generó errores y advertencias cuando se ejecutaba con la configuración de entorno predeterminada.
Los parámetros de configuración del entorno y de Airflow tienen un rol esencial la programación de tareas, sin embargo, no es posible aumentar la configuración más allá de ciertos límites.
Recomendamos optimizar el código DAG, consolidar tareas y usar la programación para optimizar el rendimiento y la eficiencia.
Ejemplo: errores de análisis y latencia de DAG debido a un código de DAG complejo
En este ejemplo, investigarás la latencia de análisis de un DAG de muestra que imita un exceso de variables de Airflow.
Crea una nueva variable de Airflow
Antes de subir el código de muestra, crea una nueva variable de Airflow.
En la consola de Google Cloud, ve a la página Entornos.
En la columna Servidor web de Airflow, sigue el vínculo de Airflow para tu entorno.
Ve a Administrador > Variables > Agrega un registro nuevo.
Configura los siguientes valores:
- clave:
example_var
- val:
test_airflow_variable
- clave:
Sube el DAG de muestra a tu entorno
Sube el siguiente DAG de muestra al entorno
que creaste en los pasos anteriores. En este instructivo, este DAG se llama
dag_for_loop_airflow_variable
Este DAG contiene un bucle for que se ejecuta 1,000 veces y imita un exceso de variables de Airflow. Cada iteración lee la variable example_var
y
genera una tarea. Cada tarea contiene un comando que imprime el archivo
valor.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable',
default_args=default_args,
catchup=False,
schedule_interval="@daily"
)
for i in range(1000):
a = Variable.get('example_var', 'N/A')
task = BashOperator(
task_id=f'task_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': a}
)
Diagnostica los problemas de análisis
El tiempo de análisis del DAG es la cantidad de tiempo que tarda el programador de Airflow en leer un archivo DAG y analizarlo. Antes de que el programador de Airflow pueda programar cualquier tarea desde un DAG, el programador debe analizar el archivo del DAG para descubrir la estructura del DAG y las tareas definidas.
Si un DAG tarda mucho tiempo en analizarse, esto consume la capacidad del programador y podría reducir el rendimiento de las ejecuciones de DAG.
Para supervisar el tiempo de análisis del DAG, haz lo siguiente:
Ejecuta el comando de la CLI de Airflow
dags report
en gcloud CLI para ver el tiempo de análisis de todos tus DAG:gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ dags report
Reemplaza lo siguiente:
ENVIRONMENT_NAME
: Es el nombre de tu entorno.LOCATION
: Es la región en la que se encuentra el entorno.
En el resultado del comando, busca el valor de duración del DAG de
dag_for_loop_airflow_variables
. Un valor alto puede indicar que este DAG no se implementa de forma óptima. Si tienes varios DAG, en la tabla de resultados, puedes identificar qué DAG tienen un tiempo de análisis prolongado.Ejemplo:
file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:14.773594 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /airflow_monitoring | 0:00:00.003035 | 1 | 1 | airflow_monitoring .py
Inspecciona los tiempos de análisis del DAG en la consola de Google Cloud:
- En la consola de Google Cloud, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Registros y, luego, a Todos los registros > Administrador de procesadores DAG.
Revisa los registros de
dag-processor-manager
y, luego, identifica posibles problemas.
Si el tiempo total de análisis de DAG supera los 10 segundos, es posible que tus programadores estén sobrecargados con el análisis de DAG y no puedan ejecutarlos de manera eficaz.
Optimiza el código del DAG
Sí recomendado para evitar que los perfiles “de nivel superior” código de Python en tus DAG. Los DAG con muchas importaciones, variables y funciones fuera del DAG introducen tiempos de análisis más largos para el programador de Airflow. Esto reduce el rendimiento y la escalabilidad de Cloud Composer y Airflow. Exceso de lectura de variables de Airflow lleva a un tiempo de análisis prolongado y a una alta carga de la base de datos. Si este código está en un archivo DAG, estas funciones se ejecutan en cada indicador de actividad del programador, lo que puede ser lento.
Los campos de la plantilla de Airflow te permiten incorporar valores de Airflow variables y plantillas de Jinja en tus DAG. De este modo, se evita que la ejecución de la función durante las señales de monitoreo de funcionamiento del programador.
Para implementar el ejemplo de DAG de una mejor manera, evita usar variables de Airflow en el código de nivel superior de Python de los DAG. En su lugar, pasa las variables de Airflow operadores a través de una plantilla Jinja, lo que retrasará la lectura del valor hasta la ejecución de la tarea.
Sube la versión nueva del DAG de ejemplo a tu
entorno. En este instructivo, este DAG se llama dag_for_loop_airflow_variable_optimized
.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable_optimized',
default_args=default_args,
catchup=False,
schedule_interval='@daily'
)
for i in range(1000):
task = BashOperator(
task_id=f'bash_use_variable_good_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': '{{ var.value.get("example_var") }}'},
)
Inspecciona el nuevo tiempo de análisis del DAG:
Espera hasta que se complete la ejecución del DAG.
Ejecuta el comando
dags report
otra vez para ver la tiempo de análisis de todos tus DAG:file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:37.000369 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /dag_for_loop_airfl | 0:00:01.109457 | 1 | 1000 | dag_for_loop_airflow ow_variable_optimiz | | | | _variable_optimized ed.py | | | | /airflow_monitoring | 0:00:00.040510 | 1 | 1 | airflow_monitoring .py | | | |
Revisa los registros de
dag-processor-manager
nuevamente y analiza la duración del análisis.
Mediante el reemplazo de las variables de entorno por plantillas de Airflow, simplificaste el código DAG y redujo la latencia de análisis alrededor de diez veces.
Optimiza los parámetros de configuración del entorno de Airflow
El programador de Airflow intenta activar tareas nuevas y analizar todos los DAGs de tu bucket de entorno de forma constante. Si tus DAG tienen un tiempo de análisis largo y el programador consume muchos recursos, puedes optimizar las configuraciones del programador de Airflow para que use los recursos de manera más eficiente.
En este instructivo, los archivos DAG tardan mucho tiempo en analizarse y en analizar ciclos
empiezan a superponerse, lo que agota la capacidad del programador. En nuestro ejemplo,
el primer ejemplo de DAG tarda más de 5 segundos en analizarse, así que lo configurarás
que el programador se ejecute con menos frecuencia
y use los recursos de forma más eficiente. Anularás la opción de configuración de Airflow scheduler_heartbeat_sec
. Esta configuración define con qué frecuencia
El programador debería ejecutarse (en segundos). De forma predeterminada, el valor se establece en 5 segundos.
Puedes cambiar esta opción de configuración de Airflow si haces lo siguiente:
y anularla.
Anula la opción de configuración de Airflow scheduler_heartbeat_sec
:
En la consola de Google Cloud, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Airflow Configuration Overrides.
Haz clic en Editar y, luego, en Agregar anulación de configuración de Airflow.
Anula la opción de configuración de Airflow:
Sección Clave Valor scheduler
scheduler_heartbeat_sec
10
Haz clic en Guardar y espera hasta que el entorno actualice su configuración.
Verifica las métricas del programador:
Ve a la pestaña Monitoring y selecciona Programadores.
En el gráfico Scheduler heartbeat, haz clic en el botón Más opciones (tres puntos) y, luego, en Ver en el Explorador de métricas.
En el gráfico, verás que el programador se ejecuta con dos veces menos frecuencia después de cambió la configuración predeterminada de 5 a 10 segundos. Cuando reduces la frecuencia de los mensajes de estado, te aseguras de que el programador no comience a ejecutarse mientras el ciclo de análisis anterior está en curso y no se agote la capacidad de recursos del programador.
Asignar más recursos al programador
En Cloud Composer 2, puedes asignar más recursos de CPU y memoria a la scheduler. De esta manera, puedes aumentar el rendimiento del programador y y acelerar el tiempo de análisis de tu DAG.
Asignar CPU y memoria adicionales al programador:
En la consola de Google Cloud, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Configuración del entorno.
Busca la configuración de Recursos > Cargas de trabajo y haz clic en Editar.
En la sección Programador, en el campo Memoria, especifica la memoria nueva. límite. En este instructivo, usa 4 GB.
En el campo CPU, especifica el límite de CPU nuevo. En este instructivo, usa 2 vCPU.
Guarda los cambios y espera unos minutos para que los programadores de Airflow reiniciar.
Ve a la pestaña Registros y, luego, a Todos los registros > Administrador del procesador de DAG.
Revisa los registros de
dag-processor-manager
y compara la duración del análisis del DAG de ejemplo:
Cuando asignaste más recursos al programador, aumentaste su capacidad y redujiste la latencia de análisis de forma significativa en comparación con las configuraciones de entorno predeterminadas. Con más recursos, el programador puede analizar DAG es más rápido, pero los costos asociados con Cloud Composer recursos de nube también aumentarán. Además, no es posible aumentar el recursos más allá de un límite determinado.
Recomendamos asignar recursos solo después del código DAG posible y Se implementaron las optimizaciones de la configuración de Airflow.
Limpia
Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.
Borra el proyecto
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Borra los recursos individuales
Si planeas explorar varios instructivos y guías de inicio rápido, la reutilización de proyectos puede ayudarte a evitar exceder los límites de las cuotas del proyecto.
Borra el entorno de Cloud Composer. También borrarás el bucket del entorno durante este procedimiento.