Usa CeleryKubernetesExecutor

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página, se explica cómo habilitar CeleryKubernetesExecutor en Cloud Composer y cómo usar KubernetesExecutor en tus DAG.

Información acerca de CeleryKubernetesExecutor

CeleryKubernetesExecutor es un tipo de ejecutor que puede usar CeleryExecutor y KubernetesExecutor al mismo tiempo. Airflow selecciona el ejecutor según la fila que definas para la tarea. En un DAG, puedes ejecutar algunas tareas con CeleryExecutor y otras tareas con KubernetesExecutor:

  • CeleryExecutor está optimizado para la ejecución rápida y escalable de tareas.
  • KubernetesExecutor está diseñado para ejecutar tareas intensivas en recursos y para ejecutar tareas de forma aislada.

CeleryKubernetesExecutor en Cloud Composer

CeleryKubernetesExecutor en Cloud Composer te permite usar KubernetesExecutor para tus tareas. No es posible usar KubernetesExecutor en Cloud Composer de forma independiente de CeleryKubernetesExecutor.

Cloud Composer ejecuta las tareas que ejecutas con KubernetesExecutor en el clúster de tu entorno, en el mismo espacio de nombres con los trabajadores de Airflow. Estas tareas tienen las mismas vinculaciones que los trabajadores de Airflow y pueden acceder a los recursos de tu proyecto.

Las tareas que ejecutas con KubernetesExecutor usan el modelo de precios de Cloud Composer, ya que los pods con estas tareas se ejecutan en el clúster de tu entorno. Los SKU de Compute de Cloud Composer (para CPU, memoria y almacenamiento) se aplican a estos pods.

Te recomendamos que ejecutes tareas con CeleryExecutor en los siguientes casos:

  • El tiempo de inicio de la tarea es importante.
  • Las tareas no requieren aislamiento del entorno de ejecución y no requieren muchos recursos.

Recomendamos que ejecutes tareas con KubernetesExecutor en los siguientes casos:

  • Las tareas requieren aislamiento del entorno de ejecución. Por ejemplo, para que las tareas no compitan por la memoria y la CPU, ya que se ejecutan en sus propios pods.
  • Las tareas requieren un uso intensivo de recursos y deseas controlar los recursos de CPU y memoria disponibles.

KubernetesExecutor en comparación con KubernetesPodOperator

Ejecutar tareas con KubernetesExecutor es similar a ejecutar tareas con KubernetesPodOperator. Las tareas se ejecutan en pods, lo que proporciona aislamiento de tareas a nivel del pod y una mejor administración de recursos.

Sin embargo, existen algunas diferencias clave:

  • KubernetesExecutor ejecuta tareas solo en el espacio de nombres de Cloud Composer con versión de tu entorno. No es posible cambiar este espacio de nombres en Cloud Composer. Puedes especificar un espacio de nombres en el que KubernetesPodOperator ejecute tareas de pod.
  • KubernetesExecutor puede usar cualquier operador integrado de Airflow. KubernetesPodOperator solo ejecuta una secuencia de comandos proporcionada que define el punto de entrada del contenedor.
  • KubernetesExecutor usa la imagen predeterminada de Docker de Cloud Composer con las mismas anulaciones de opciones de configuración de Python y Airflow, las variables de entorno y los paquetes de PyPI que se definen en tu entorno de Cloud Composer.

Acerca de las imágenes de Docker

De forma predeterminada, KubernetesExecutor inicia tareas con la misma imagen de Docker que usa Cloud Composer para los trabajadores de Celery. Esta es la imagen de Cloud Composer para tu entorno, con todos los cambios que especificaste para tu entorno, como paquetes de PyPI o variables de entorno personalizados.

Antes de comenzar

  • Puedes usar CeleryKubernetesExecutor en Cloud Composer 3.

  • No es posible usar ningún otro ejecutor que no sea CeleryKubernetesExecutor en Cloud Composer 3. Esto significa que puedes ejecutar tareas con CeleryExecutor, KubernetesExecutor o ambos en un DAG, pero no es posible configurar tu entorno para que solo use KubernetesExecutor o CeleryExecutor.

Configura CeleryKubernetesExecutor

Te recomendamos que anules las opciones de configuración existentes de Airflow que se relacionen con KubernetesExecutor:

  • [kubernetes]worker_pods_creation_batch_size

    Esta opción define la cantidad de llamadas de creación de Pods de trabajador de Kubernetes por bucle de programador. El valor predeterminado es 1, por lo que solo se inicia un solo pod por cada mensaje de estado del programador. Si usas KubernetesExecutor con frecuencia, te recomendamos que aumentes este valor.

  • [kubernetes]worker_pods_pending_timeout

    Esta opción define, en segundos, cuánto tiempo puede permanecer un trabajador en el estado Pending (se está creando el Pod) antes de que se considere que falló. El valor predeterminado es de 5 minutos.

Ejecuta tareas con KubernetesExecutor o CeleryExecutor

Puedes ejecutar tareas con CeleryExecutor, KubernetesExecutor o ambos en un DAG:

  • Para ejecutar una tarea con KubernetesExecutor, especifica el valor kubernetes en el parámetro queue de una tarea.
  • Para ejecutar una tarea con CeleryExecutor, omite el parámetro queue.

En el siguiente ejemplo, se ejecuta la tarea task-kubernetes con KubernetesExecutor y la tarea task-celery con CeleryExecutor:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

Ejecuta comandos de la CLI de Airflow relacionados con KubernetesExecutor

Puedes ejecutar varios comandos de la CLI de Airflow relacionados con KubernetesExecutor con gcloud.

Personaliza la especificación del grupo de trabajo

Puedes personalizar la especificación del pod de trabajador pasándola en el parámetro executor_config de una tarea. Puedes usarlo para definir requisitos personalizados de CPU y memoria.

Puedes anular toda la especificación del pod de trabajo que se usa para ejecutar una tarea. Para recuperar la especificación del pod de una tarea que usa KubernetesExecutor, puedes ejecutar el comando kubernetes generate-dag-yaml de la CLI de Airflow.

Para obtener más información sobre cómo personalizar las especificaciones del pod de trabajo, consulta la documentación de Airflow.

Cloud Composer 3 admite los siguientes valores para los requisitos de recursos:

Recurso Mínimo Máximo Paso
CPU 0.25 32 Valores de paso: 0.25, 0.5, 1, 2, 4, 6, 8, 10, …, 32. Los valores solicitados se redondean al valor de paso admitido más cercano (por ejemplo, de 5 a 6).
Memoria 2G (GB) 128 G (GB) Valores de paso: 2, 3, 4, 5, …, 128. Los valores solicitados se redondean al valor de paso compatible más cercano (por ejemplo, de 3.5G a 4G).
Almacenamiento - 100G (GB) Cualquier valor. Si se solicitan más de 100 GB, solo se proporcionan 100 GB.

Para obtener más información sobre las unidades de recursos en Kubernetes, consulta Unidades de recursos en Kubernetes.

En el siguiente ejemplo, se muestra una tarea que usa una especificación de pod de trabajador personalizada:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '0.5',
                            'memory': '2G',
                        })
                    ),
                ],
            ),
        )
    },
)

Cómo ver los registros de tareas

Los registros de las tareas que ejecuta KubernetesExecutor están disponibles en la pestaña Registros, junto con los registros de las tareas que ejecuta CeleryExecutor:

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

  3. Ve a la pestaña Registros.

  4. Navega a Todos los registros > Registros de Airflow > Trabajadores.

  5. Los trabajadores llamados airflow-k8s-worker ejecutan tareas de KubernetesExecutor. Para buscar registros de una tarea específica, puedes usar un ID de DAG o un ID de tarea como palabra clave en la búsqueda.

¿Qué sigue?