Cómo depurar problemas en la programación de tareas

Cloud Composer 1 | Cloud Composer 2

En este instructivo, se explica cómo diagnosticar y solucionar problemas relacionados con la programación de tareas y el análisis de problemas que provocan el mal funcionamiento del programador, el análisis de errores y la latencia, y la falla de las tareas.

Introducción

El programador de Airflow se ve afectado principalmente por dos factores: la programación de tareas y el análisis de DAG. Los problemas en uno de esos factores pueden tener un impacto negativo en el estado y el rendimiento del entorno.

A veces, se programan demasiadas tareas simultáneamente. En esta situación, la cola se llena y las tareas permanecen en el estado “programado” o se vuelven a programar después de estar en cola, lo que puede causar fallas en las tareas y latencia de rendimiento.

Otro problema común es el análisis de la latencia y los errores causados por la complejidad de un código de DAG. Por ejemplo, un código DAG que contiene variables de Airflow en el nivel superior del código puede generar retrasos en el análisis, sobrecarga de la base de datos, fallas de programación y tiempos de espera de DAG.

En este instructivo, diagnosticarás los DAG de ejemplo y aprenderás a solucionar problemas de programación y análisis, mejorar la programación de DAG y optimizar el código de DAG y la configuración del entorno para mejorar el rendimiento.

Objetivos

En esta sección, se enumeran los objetivos de los ejemplos de este instructivo.

Ejemplo: Falla en el programador y latencia debido a la alta simultaneidad de tareas

  • Sube el DAG de muestra que se ejecuta varias veces de forma simultánea y diagnostica el mal funcionamiento del programador y los problemas de latencia con Cloud Monitoring.

  • Consolida las tareas y evalúa el impacto en el rendimiento para optimizar el código del DAG.

  • Distribuye las tareas de manera más uniforme a lo largo del tiempo y evalúa el impacto en el rendimiento.

  • Optimizar la configuración de Airflow y del entorno, y evaluar el impacto

Ejemplo: Errores de análisis y latencia de DAG causados por un código complejo

  • Subir el DAG de muestra con variables de Airflow y diagnosticar problemas de análisis con Cloud Monitoring

  • Para optimizar el código DAG, evita las variables de Airflow en el nivel superior del código y evalúa el impacto en el tiempo de análisis.

  • Optimizar la configuración de Airflow y del entorno, y evaluar el impacto en el tiempo de análisis

Costos

En este instructivo, se usan los siguientes componentes facturables de Google Cloud:

Cuando finalices este instructivo, puedes borrar los recursos creados para evitar que se te siga facturando. Para obtener más detalles, consulta Realiza una limpieza.

Antes de comenzar

En esta sección, se describen las acciones que se requieren antes de comenzar el instructivo.

Crea y configura un proyecto

Para este instructivo, necesitas un proyecto de Google Cloud. Configura el proyecto de la siguiente manera:

  1. En la consola de Google Cloud, selecciona o crea un proyecto:

    Ir al Selector de proyectos

  2. Asegúrate de tener habilitada la facturación para tu proyecto. Descubre cómo verificar si la facturación está habilitada en un proyecto.

  3. Asegúrate de que el usuario de tu proyecto de Google Cloud tenga los siguientes roles para crear los recursos necesarios:

    • Administrador de objetos de almacenamiento y entorno (roles/composer.environmentAndStorageObjectAdmin)
    • Administrador de Compute (roles/compute.admin)

Habilita las API para tu proyecto.

Habilita la API de Cloud Composer.

Habilita la API

Crea tu entorno de Cloud Composer

Crea un entorno de Cloud Composer 2.

Como parte de la creación del entorno, otorgas la función Extensión del agente de servicio de la API de Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) a la cuenta del agente de servicio de Composer. Cloud Composer usa esta cuenta para realizar operaciones en tu proyecto de Google Cloud.

Ejemplo: Falla en el programador y falla de las tareas debido a problemas de programación de tareas

En este ejemplo, se muestra la depuración del mal funcionamiento y la latencia del programador causada por la alta simultaneidad de tareas.

Sube el DAG de muestra a tu entorno

Sube el siguiente DAG de muestra al entorno que creaste en los pasos anteriores. En este instructivo, este DAG se llama dag_10_tasks_200_seconds_1.

Este DAG tiene 200 tareas. Cada tarea espera 1 segundo y muestra el mensaje “Complete!”. El DAG se activa automáticamente una vez que se sube. Cloud Composer ejecuta este DAG 10 veces, y todas las ejecuciones de DAG ocurren en paralelo.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Diagnostique el mal funcionamiento del programador y los problemas de falla de tareas

Una vez que se completen las ejecuciones del DAG, abre la IU de Airflow y haz clic en el DAG dag_10_tasks_200_seconds_1. Verás que 10 ejecuciones totales de DAG fueron exitosas y cada una tiene 200 tareas que se realizaron de forma correcta.

Revisa los registros de tareas de Airflow:

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

  3. Ve a la pestaña Registros y, luego, a Todos los registros > Registros de Airflow > Trabajadores > Ver en el Explorador de registros.

En el histograma de registros, puedes ver los errores y las advertencias indicados con los colores rojo y naranja:

El histograma de los registros de trabajadores de Airflow con errores y advertencias indicados con los colores rojo y naranja
Figura 1: Histograma de registros de trabajadores de Airflow (haz clic para ampliar)

El DAG de ejemplo generó alrededor de 130 advertencias y 60 errores. Haz clic en cualquier columna que contenga barras amarillas y rojas. Verás algunos de los siguientes errores y advertencias en los registros:

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

Estos registros pueden indicar que el uso de recursos superó los límites y que el trabajador se reinició.

Si una tarea de Airflow se mantiene en la cola durante demasiado tiempo, el programador la marca como con errores y up_for_retry y va a reprogramarla una vez más para su ejecución. Una forma de observar los síntomas de esta situación es observar el gráfico con la cantidad de tareas en cola. Si los aumentos repentinos en este gráfico no disminuyen en alrededor de 10 minutos, es probable que haya fallas en las tareas (sin registros).

Revisa la información de supervisión:

  1. Ve a la pestaña Supervisión y selecciona Descripción general.

  2. Revisa el gráfico Tareas de Airflow.

    El grafo de las tareas de Airflow a lo largo del tiempo, que muestra un aumento en la cantidad de tareas en cola
    Figura 2: Gráfico de tareas de Airflow (haz clic para ampliar)

    En el gráfico de tareas de Airflow, hay un aumento repentino en las tareas en cola que dura más de 10 minutos, lo que podría significar que no hay suficientes recursos en tu entorno para procesar todas las tareas programadas.

  3. Revisa el gráfico Trabajadores activos:

    En el gráfico de trabajadores de Airflow activos a lo largo del tiempo, se muestra que la
    cantidad de trabajadores activos se incrementó hasta el límite máximo.
    Figura 3: Gráfico de trabajadores activos (haz clic para ampliar)

    El gráfico Trabajadores activos indica que el DAG activó el ajuste de escala automático hasta el límite máximo permitido de tres trabajadores durante la ejecución del DAG.

  4. Los gráficos de uso de recursos pueden indicar la falta de capacidad en los trabajadores de Airflow para ejecutar tareas en cola. En la pestaña Supervisión, selecciona Trabajadores y revisa los gráficos Uso total de CPU de los trabajadores y Uso de la memoria total del trabajador.

    En el gráfico del uso de CPU por parte de los trabajadores de Airflow, se muestra que el uso de CPU
    aumenta hasta el límite máximo.
    Figura 4: Gráfico del uso de CPU del total de trabajadores (haz clic para ampliar)
    En el gráfico del uso de memoria por parte de los trabajadores de Airflow, se muestra que el uso de memoria
    aumenta, pero no alcanza el límite máximo.
    Figura 5: Gráfico del uso de memoria total de los trabajadores (haz clic para ampliar)

    Los gráficos indican que la ejecución de demasiadas tareas a la vez dio como resultado el límite de CPU. Los recursos se usaron durante más de 30 minutos, lo que es incluso más que la duración total de 200 tareas en 10 ejecuciones de DAG que se ejecutan una por una.

Estos son los indicadores de que la cola se está llenando y la falta de recursos para procesar todas las tareas programadas.

Consolida tus tareas

El código actual crea muchos DAG y tareas sin recursos suficientes para procesar todas las tareas en paralelo, lo que hace que se llene la cola. Si mantienes las tareas en la cola durante mucho tiempo, es posible que se reprogramen o fallen. En estas situaciones, debes optar por una cantidad menor de tareas más consolidadas.

En el siguiente DAG de muestra, se cambia la cantidad de tareas del ejemplo inicial de 200 a 20 y se aumenta el tiempo de espera de 1 a 10 segundos para imitar tareas más consolidadas que realizan la misma cantidad de trabajo.

Sube el siguiente DAG de muestra al entorno que creaste. En este instructivo, este DAG se llama dag_10_tasks_20_seconds_10.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Evalúa el impacto de las tareas más consolidadas en los procesos de programación:

  1. Espera hasta que se completen las ejecuciones del DAG.

  2. En la IU de Airflow, en la página DAG, haz clic en el DAG dag_10_tasks_20_seconds_10. Verás 10 ejecuciones de DAG, cada una con 20 tareas que se completaron correctamente.

  3. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  4. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

  5. Ve a la pestaña Registros y, luego, a Todos los registros > Registros de Airflow > Trabajadores > Ver en el Explorador de registros.

    El segundo ejemplo con tareas más consolidadas dio como resultado aproximadamente 10 advertencias y 7 errores. En el histograma, puedes comparar la cantidad de errores y advertencias en el ejemplo inicial (valores anteriores) y en el segundo ejemplo (valores posteriores).

    El histograma de registros de trabajadores de Airflow con errores y advertencias
    muestra la menor cantidad de errores y advertencias después de que se
    consolidaron las tareas
    Figura 6: Histograma de los registros de los trabajadores de Airflow una vez que se consolidaron las tareas (haz clic para ampliar)

    Cuando se compara el primer ejemplo con el más consolidado, puedes ver que hay muchos menos errores y advertencias en el segundo ejemplo. Sin embargo, los mismos errores relacionados con el cierre semicaliente continúan apareciendo en los registros debido a la sobrecarga de recursos.

  6. En la pestaña Monitoring, selecciona Workers y revisa los gráficos.

    Cuando comparas el gráfico de tareas de Airflow del primer ejemplo (valores anteriores) con el del segundo ejemplo con tareas más consolidadas, puedes ver que el pico de tareas en cola duró un período más corto cuando las tareas estaban más consolidadas. Sin embargo, duró cerca de 10 minutos, lo cual sigue siendo menos óptimo.

    En el gráfico de las tareas de Airflow en el tiempo, se muestra que el pico de tareas de Airflow duró más tiempo que antes.
    Figura 7: Gráfico de tareas de Airflow después de la consolidación de las tareas (haz clic para ampliar)

    En el gráfico de trabajadores activos, puedes ver que el primer ejemplo (en el lado izquierdo del gráfico) usó recursos por un período mucho más prolongado que el segundo, aunque ambos ejemplos imitan la misma cantidad de trabajo.

    En el gráfico de trabajadores de Airflow activos a lo largo del tiempo, se muestra que la
 cantidad de trabajadores activos se incrementó durante un período más corto
 que antes.
    Figura 8: Gráfico de trabajadores activos después de que se consolidaron las tareas (haz clic para ampliar)

    Revisa los gráficos de consumo de recursos de trabajadores. Aunque la diferencia entre los recursos usados en el ejemplo con tareas más consolidadas y el ejemplo inicial es bastante significativa, el uso de CPU alcanza el 70% del límite.

    En el gráfico del uso de CPU por parte de los trabajadores de Airflow, se muestra que el uso de CPU
 aumenta hasta el 70% del límite máximo.
    Figura 9: Gráfico del uso de CPU total de los trabajadores después de que se consolidaron las tareas (haz clic para ampliar)
    En el gráfico del uso de memoria por parte de los trabajadores de Airflow, se muestra que el uso de memoria aumenta, pero no alcanza el límite máximo.
    Figura 10: Gráfico del uso de memoria total de los trabajadores después de la consolidación de las tareas (haz clic para ampliar)

Distribuye las tareas de manera más uniforme a lo largo del tiempo

Si hay demasiadas tareas simultáneas, la cola se llena, lo que hace que las tareas queden atascadas en la cola o se reprogramen. En los pasos anteriores, disminuiste la cantidad de tareas mediante la consolidación. Sin embargo, los registros de salida y la supervisión indicaban que la cantidad de tareas simultáneas aún es inferior a la óptima.

Puedes controlar la cantidad de ejecuciones de tareas simultáneas mediante la implementación de un programa o la configuración de límites para la cantidad de tareas que se pueden ejecutar de forma simultánea.

En este instructivo, distribuyes las tareas de manera más uniforme a lo largo del tiempo mediante la adición de parámetros de nivel de DAG al DAG dag_10_tasks_20_seconds_10:

  1. Agrega el argumento max_active_runs=1 al administrador de contextos de DAG. Este argumento establece un límite de una sola instancia de un DAG que se ejecuta en un momento determinado.

  2. Agrega el argumento max_active_tasks=5 al administrador de contextos de DAG. Este argumento controla la cantidad máxima de instancias de tareas que pueden ejecutarse de forma simultánea en cada DAG.

Sube el siguiente DAG de muestra al entorno que creaste. En este instructivo, este DAG se llama dag_10_tasks_20_seconds_10_scheduled.py.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Evaluar el impacto de distribuir tareas a lo largo del tiempo en los procesos de programación:

  1. Espera hasta que se completen las ejecuciones del DAG.

  2. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  3. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

  4. Ve a la pestaña Registros y, luego, a Todos los registros > Registros de Airflow > Trabajadores > Ver en el Explorador de registros.

  5. En el histograma, puedes ver que el tercer DAG con una cantidad limitada de tareas y ejecuciones activas no generó ninguna advertencia ni error, y la distribución de registros se ve más uniforme en comparación con los valores anteriores.

    El histograma de registros de trabajadores de Airflow con errores y advertencias no muestra errores ni advertencias después de que las tareas se consolidaron y se distribuyeron a lo largo del tiempo.
    Figura 11: Histograma de registros de trabajadores de Airflow después de que las tareas se consolidaron y se distribuyeron a lo largo del tiempo (haz clic para ampliar)

Las tareas del ejemplo de dag_10_tasks_20_seconds_10_scheduled que tienen una cantidad limitada de tareas activas y ejecuciones no causaron presión de recursos porque las tareas se pusieron en cola de manera uniforme.

Después de realizar los pasos descritos, optimizaste el uso de recursos mediante la consolidación de tareas pequeñas y su distribución de manera más uniforme a lo largo del tiempo.

Optimiza la configuración del entorno

Puedes ajustar la configuración del entorno a fin de asegurarte de que siempre haya capacidad en los trabajadores de Airflow para ejecutar tareas en cola.

Cantidad de trabajadores y simultaneidad de trabajadores

Puedes ajustar la cantidad máxima de trabajadores para que Cloud Composer escale automáticamente tu entorno dentro de los límites establecidos.

El parámetro [celery]worker_concurrency define el número máximo de tareas que un solo trabajador puede recoger de la lista de tareas en cola. Si cambias este parámetro, se ajusta la cantidad de tareas que un solo trabajador puede ejecutar al mismo tiempo. Puedes cambiar esta opción de configuración de Airflow mediante la anulación. Según la configuración predeterminada, la simultaneidad de trabajadores se establece en un mínimo de las siguientes opciones: 32, 12 * worker_CPU, 8 * worker_memory, lo que significa que depende de los límites de recursos de trabajadores. Consulta Optimiza entornos para obtener más información sobre los valores predeterminados de simultaneidad de trabajadores.

La cantidad de trabajadores y la simultaneidad de trabajadores funcionan en combinación entre sí, y el rendimiento del entorno depende en gran medida de ambos parámetros. Puedes usar las siguientes consideraciones para elegir la combinación correcta:

  • Varias tareas rápidas que se ejecutan en paralelo. Puedes aumentar la simultaneidad de trabajadores cuando hay tareas en espera en la cola y cuando tus trabajadores usan un porcentaje bajo de sus CPU y memoria al mismo tiempo. Sin embargo, en determinadas circunstancias, es posible que la cola nunca se llene, lo que hace que el ajuste de escala automático nunca se active. Si las tareas pequeñas finalizan la ejecución cuando los trabajadores nuevos están listos, un trabajador existente puede recoger las tareas restantes, y no habrá tareas para los trabajadores nuevos.

    En estas situaciones, se recomienda aumentar la cantidad mínima de trabajadores y la simultaneidad de trabajadores para evitar el escalamiento excesivo.

  • Varias tareas largas que se ejecutan en paralelo. La simultaneidad alta de trabajadores evita que el sistema escale la cantidad de trabajadores. Si varias tareas requieren muchos recursos y tardan mucho tiempo en completarse, una simultaneidad de trabajadores alta puede hacer que la cola nunca se llene y que solo un trabajador recoja todas las tareas, lo que genera problemas de rendimiento. En estas situaciones, se recomienda aumentar la cantidad máxima de trabajadores y disminuir la simultaneidad de trabajadores.

La importancia del paralelismo

Los programadores de Airflow controlan la programación de las ejecuciones de DAG y las tareas individuales de los DAG. La opción de configuración [core]parallelism de Airflow controla cuántas tareas puede poner en cola el programador de Airflow en la cola del ejecutor después de que se cumplan todas las dependencias de estas tareas.

El paralelismo es un mecanismo de protección de Airflow que determina cuántas tareas se pueden ejecutar al mismo tiempo por cada programador, sin importar la cantidad de trabajadores. El valor de paralelismo, multiplicado por la cantidad de programadores del clúster, es la cantidad máxima de instancias de tareas que tu entorno puede poner en cola.

Por lo general, [core]parallelism se configura como un producto de una cantidad máxima de trabajadores y [celery]worker_concurrency. También se ve afectado por el grupo. Puedes cambiar esta opción de configuración de Airflow mediante la anulación. Si deseas obtener más información para ajustar la configuración de Airflow relacionada con el escalamiento, consulta Escala la configuración de Airflow.

Encuentra parámetros de configuración de entorno óptimos

La forma recomendada de solucionar problemas de programación es consolidar las tareas pequeñas en tareas más grandes y distribuirlas de manera más uniforme a lo largo del tiempo. Además de optimizar el código DAG, también puedes optimizar la configuración del entorno para tener una capacidad suficiente y ejecutar varias tareas de forma simultánea.

Por ejemplo, supongamos que consolidas las tareas en tu DAG tanto como sea posible, pero limitar las tareas activas a fin de que se distribuyan de manera más uniforme en el tiempo no es una solución preferida para tu caso de uso específico.

Puedes ajustar los parámetros de paralelismo, cantidad de trabajadores y de simultaneidad de trabajadores para ejecutar el DAG dag_10_tasks_20_seconds_10 sin limitar las tareas activas. En este ejemplo, el DAG se ejecuta 10 veces y cada ejecución contiene 20 tareas pequeñas. Si quieres ejecutarlos todos simultáneamente, haz lo siguiente:

  • Necesitarás un tamaño de entorno más grande, ya que controla los parámetros de rendimiento de la infraestructura administrada de Cloud Composer de tu entorno.

  • Los trabajadores de Airflow deben poder ejecutar 20 tareas de forma simultánea, lo que significa que debes establecer la simultaneidad de trabajadores en 20.

  • Los trabajadores necesitan suficiente CPU y memoria para manejar todas las tareas. La simultaneidad de los trabajadores se ve afectada por la CPU y la memoria del trabajador, por lo que necesitarás al menos worker_concurrency / 12 en la CPU y least worker_concurrency / 8 en la memoria.

  • Deberás aumentar el paralelismo para que coincida con la simultaneidad de trabajadores más alta. Para que los trabajadores puedan recoger 20 tareas de la cola, el programador deberá programar esas 20 tareas primero.

Ajusta la configuración del entorno de la siguiente manera:

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

  3. Ve a la pestaña Configuración del entorno.

  4. Busca la configuración Resources > Workloads y haz clic en Edit.

  5. En la sección Trabajador, en el campo Memoria, especifica el nuevo límite de memoria para los trabajadores de Airflow. En este instructivo, usa 4 GB.

  6. En el campo CPU, especifica el límite de CPU nuevo para los trabajadores de Airflow. En este instructivo, usa 2 CPU virtuales.

  7. Guarda los cambios y espera varios minutos para que se reinicien los trabajadores de Airflow.

A continuación, anula las opciones de configuración de Airflow para el paralelismo y la simultaneidad de trabajadores:

  1. Ve a la pestaña Anulaciones de configuración de Airflow.

  2. Haz clic en Editar y, luego, en Agregar anulación de configuración de Airflow.

  3. Anula la configuración de parralelismo:

    Sección Clave Valor
    core parallelism 20
  4. Haz clic en Agregar anulación de configuración de Airflow y anula la configuración de simultaneidad de trabajadores:

    Sección Clave Valor
    celery worker_concurrency 20
  5. Haz clic en Guardar y espera hasta que el entorno actualice su configuración.

Vuelve a activar el mismo DAG de ejemplo con la configuración ajustada:

  1. En la IU de Airflow, ve a la página DAG.

  2. Busca el DAG dag_10_tasks_20_seconds_10 y bórralo.

    Después de borrar el DAG, Airflow verifica la carpeta de DAG en el bucket de tu entorno y vuelve a ejecutarlo de forma automática.

Una vez que se completen las ejecuciones del DAG, vuelve a revisar el histograma de registros. En el diagrama, puedes ver que el ejemplo de dag_10_tasks_20_seconds_10 con tareas más consolidadas no generó ningún error ni advertencia cuando se ejecutaba con la configuración de entorno ajustada. Compara los resultados con los datos anteriores del diagrama, en los que el mismo ejemplo generó errores y advertencias cuando se ejecutaba con la configuración del entorno predeterminado tge.

El histograma de registros de trabajadores de Airflow con errores y advertencias
        no muestra errores ni advertencias después de que se ajustó la configuración
        del entorno
Figura 12: Histograma de los registros de los trabajadores de Airflow después de que se ajustó la configuración del entorno (haz clic para ampliar)

Los parámetros de configuración del entorno y de Airflow desempeñan una función fundamental en la programación de tareas. Sin embargo, no es posible aumentar la configuración más allá de ciertos límites.

Recomendamos optimizar el código DAG, consolidar tareas y usar la programación para optimizar el rendimiento y la eficiencia.

Ejemplo: Errores de análisis y latencia de DAG debido a un código de DAG complejo

En este ejemplo, debes investigar la latencia del análisis de un DAG de muestra que imita un exceso de variables de Airflow.

Crea una nueva variable de Airflow

Antes de subir el código de muestra, crea una nueva variable de Airflow.

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la columna Servidor web de Airflow, sigue el vínculo de Airflow para tu entorno.

  3. Ve a Administrador > Variables > Agregar un registro nuevo.

  4. Configura los siguientes valores:

    • clave: example_var
    • val: test_airflow_variable

Sube el DAG de muestra a tu entorno

Sube el siguiente DAG de muestra al entorno que creaste en los pasos anteriores. En este instructivo, este DAG se llama dag_for_loop_airflow_variable.

Este DAG contiene un bucle for que se ejecuta 1,000 veces y, además, imita un exceso de las variables de Airflow. Cada iteración lee la variable example_var y genera una tarea. Cada tarea contiene un comando que imprime el valor de la variable.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

Diagnostica los problemas de análisis

El tiempo de análisis del DAG es la cantidad de tiempo que tarda el programador de Airflow en leer un archivo de DAG y analizarlo. Antes de que el programador de Airflow pueda programar cualquier tarea desde un DAG, el programador debe analizar el archivo DAG para descubrir la estructura del DAG y las tareas definidas.

Si un DAG tarda mucho tiempo en analizarse, esto consume la capacidad del programador y podría reducir el rendimiento de las ejecuciones de DAG.

Para supervisar el tiempo de análisis del DAG, haz lo siguiente:

  1. Ejecuta el comando de CLI de Airflow dags report en gcloud CLI para ver el tiempo de análisis de todos tus DAG:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    Reemplaza lo siguiente:

    • ENVIRONMENT_NAME: Es el nombre de tu entorno.
    • LOCATION: Es la región en la que se encuentra el entorno.
  2. En el resultado del comando, busca el valor de duración del DAG dag_for_loop_airflow_variables. Un valor grande puede indicar que este DAG no se implementó de manera óptima. Si tienes varios DAG, en la tabla de salida, puedes identificar qué DAG tienen un tiempo de análisis prolongado.

    Ejemplo:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. Inspecciona los tiempos de análisis del DAG en la consola de Google Cloud:

    1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  4. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

  5. Ve a la pestaña Registros y, luego, a Todos los registros > Administrador del procesador de DAG.

  6. Revisa los registros de dag-processor-manager e identifica posibles problemas.

    Una entrada de registro para el DAG de muestra indica que el tiempo de análisis del DAG es de 46.3 segundos
    Figura 13: Los registros del administrador de procesadores de DAG muestran los tiempos de análisis de DAG (haz clic para ampliar)

Si el tiempo total de análisis del DAG supera los 10 segundos, es posible que los programadores estén sobrecargados con el análisis del DAG y no puedan ejecutarlos de manera eficaz.

Optimiza el código del DAG

Se recomienda evitar el código de Python de “nivel superior” innecesario en tus DAG. Los DAG con muchas importaciones, variables y funciones fuera del DAG presentan tiempos de análisis mayores para el programador de Airflow. Esto reduce el rendimiento y la escalabilidad de Cloud Composer y Airflow. El exceso de lectura de variables de Airflow genera un largo tiempo de análisis y una alta carga de la base de datos. Si este código está en un archivo DAG, estas funciones se ejecutarán en cada señal de monitoreo de funcionamiento del programador, que puede ser lenta.

Los campos de plantilla de Airflow te permiten incorporar valores de las variables de Airflow y las plantillas de Jinja en tus DAG. Esto evita la ejecución innecesaria de funciones durante las señales de monitoreo de funcionamiento del programador.

Para implementar el ejemplo de DAG de una mejor manera, evita usar variables de Airflow en el código de Python de nivel superior de los DAG. En su lugar, pasa las variables de Airflow a los operadores existentes a través de una plantilla de Jinja, lo que retrasará la lectura del valor hasta la ejecución de la tarea.

Sube la versión nueva del DAG de muestra a tu entorno. En este instructivo, este DAG se llama dag_for_loop_airflow_variable_optimized.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

Inspecciona el nuevo tiempo de análisis de DAG:

  1. Espera hasta que se complete la ejecución del DAG.

  2. Vuelve a ejecutar el comando dags report para ver el tiempo de análisis de todos tus DAG:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. Revisa los registros de dag-processor-manager nuevamente y analiza la duración del análisis.

    Una entrada de registro para el DAG de muestra indica que el tiempo de análisis del DAG es de 4.21
 segundos.
    Figura 14: Los registros del administrador de procesadores de DAG muestran los tiempos de análisis del DAG después de que se optimizó su código (haz clic para ampliar)

Mediante el reemplazo de las variables de entorno por plantillas de Airflow, simplificaste el código DAG y redujiste la latencia de análisis alrededor de diez veces.

Optimiza la configuración del entorno de Airflow

El programador de Airflow intenta de forma constante activar tareas nuevas y analiza todos los DAG en tu bucket de entorno. Si tus DAG tienen un largo tiempo de análisis y el programador consume muchos recursos, puedes optimizar la configuración del programador de Airflow para que el programador use los recursos de manera más eficiente.

En este instructivo, los archivos DAG tardan mucho tiempo en analizarse, y los ciclos de análisis comienzan a superponerse, lo que, luego, agota la capacidad del programador. En nuestro ejemplo, el primer DAG de ejemplo tarda más de 5 segundos en analizarse, por lo que configurarás el programador para que se ejecute con menos frecuencia a fin de usar los recursos de manera más eficiente. Anularás la opción de configuración scheduler_heartbeat_sec de Airflow. Esta configuración define la frecuencia con la que se debe ejecutar el programador (en segundos). De forma predeterminada, el valor se establece en 5 segundos. Puedes cambiar esta opción de configuración de Airflow mediante la anulación.

Anula la opción de configuración scheduler_heartbeat_sec de Airflow:

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

  3. Ve a la pestaña Anulaciones de configuración de Airflow.

  4. Haz clic en Editar y, luego, en Agregar anulación de configuración de Airflow.

  5. Anula la opción de configuración de Airflow:

    Sección Clave Valor
    scheduler scheduler_heartbeat_sec 10
  6. Haz clic en Guardar y espera hasta que el entorno actualice su configuración.

Verifica las métricas del programador:

  1. Ve a la pestaña Supervisión y selecciona Programadores.

  2. En el gráfico Señal de monitoreo de funcionamiento del programador, haz clic en el botón Más opciones (tres puntos) y, luego, en Ver en el Explorador de métricas.

El gráfico de latido del programador muestra que la frecuencia cardíaca ocurre con menos frecuencia
Figura 15: Gráfico de señal de monitoreo de funcionamiento del programador (haz clic para ampliar)

En el gráfico, verás que el programador se ejecuta dos veces con menos frecuencia después de cambiar la configuración predeterminada de 5 segundos a 10 segundos. Cuando reduces la frecuencia de las señales de monitoreo de funcionamiento, te aseguras de que el programador no comience a ejecutarse mientras el ciclo de análisis anterior esté en curso y no se agote la capacidad de los recursos del programador.

Asignar más recursos al programador

En Cloud Composer 2, puedes asignar más recursos de CPU y memoria al programador. De esta manera, puedes aumentar el rendimiento del programador y acelerar el tiempo de análisis del DAG.

Asigna CPU y memoria adicionales al programador:

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

  3. Ve a la pestaña Configuración del entorno.

  4. Busca la configuración Resources > Workloads y haz clic en Edit.

  5. En la sección Programador, en el campo Memoria (Memory), especifica el límite de memoria nuevo. En este instructivo, usa 4 GB.

  6. En el campo CPU, especifica el límite de CPU nuevo. En este instructivo, usa 2 CPU virtuales.

  7. Guarda los cambios y espera varios minutos para que se reinicien los programadores de Airflow.

  8. Ve a la pestaña Registros y, luego, a Todos los registros > Administrador del procesador de DAG.

  9. Revisa los registros de dag-processor-manager y compara la duración del análisis de los DAG de ejemplo:

    Una entrada de registro para el DAG de muestra indica que el tiempo de análisis del DAG para el DAG optimizado es de 1.5 segundos. Para el DAG no optimizado, el tiempo de análisis es de 28.71 segundos.
    Figura 16: Los registros del administrador del procesador de DAG muestran los tiempos de análisis del DAG después de que se asignaron más recursos al programador (haz clic para ampliar)

Cuando asignaste más recursos al programador, aumentaste su capacidad y redujiste la latencia de análisis de forma significativa en comparación con la configuración predeterminada del entorno. Con más recursos, el programador puede analizar los DAG más rápido. Sin embargo, los costos asociados con los recursos de Cloud Composer también aumentarán. Además, no es posible aumentar los recursos más allá de un límite determinado.

Recomendamos asignar recursos solo después de que se hayan implementado el posible código DAG y las optimizaciones de configuración de Airflow.

Limpia

Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.

Borra el proyecto

  1. En la consola de Google Cloud, ve a la página Administrar recursos.

    Ir a Administrar recursos

  2. En la lista de proyectos, elige el proyecto que quieres borrar y haz clic en Borrar.
  3. En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.

Borra los recursos individuales

Si planeas explorar varios instructivos y guías de inicio rápido, la reutilización de proyectos puede ayudarte a evitar exceder los límites de las cuotas del proyecto.

Borra el entorno de Cloud Composer. También debes borrar el bucket del entorno durante este procedimiento.

¿Qué sigue?