Usa CeleryKubernetesExecutor

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Esta página explica cómo habilitar CeleryKubernetesExecutor en Cloud Composer y cómo usar KubernetesExecutor en tus DAG.

Acerca de CeleryKubernetesExecutor

CeleryKubernetesExecutor es un de ejecución que puede usar CeleryExecutor y KubernetesExecutor al mismo tiempo. Airflow selecciona el ejecutor en función de la cola que definas para la tarea. En un DAG, puedes ejecutar algunas tareas con CeleryExecutor y otras con KubernetesExecutor:

  • CeleryExecutor está optimizado para ejecutar tareas de forma rápida y escalable.
  • KubernetesExecutor está diseñado para ejecutar tareas que consumen muchos recursos y ejecutar tareas aisladas.

CeleryKubernetesExecutor en Cloud Composer

CeleryKubernetesExecutor en Cloud Composer permite realizar las siguientes acciones: usar KubernetesExecutor para tus tareas. No es posible utilizar KubernetesExecutor en Cloud Composer por separado de CeleryKubernetesExecutor

Cloud Composer ejecuta las tareas que ejecutas con KubernetesExecutor en el clúster de tu entorno, en el mismo espacio de nombres con los trabajadores de Airflow. Tales las tareas tienen las mismas vinculaciones que Airflow trabajadores y pueden acceder a los recursos de tu proyecto.

Las tareas que ejecutas con KubernetesExecutor usan el Modelo de precios de Cloud Composer, ya que los Pods con estos las tareas que se ejecutan en el clúster de tu entorno. SKU de Cloud Composer Compute (para CPU, memoria y almacenamiento) se aplican a estos Pods.

Recomendamos ejecutar tareas con CeleryExecutor en los siguientes casos:

  • El tiempo de inicio de la tarea es importante.
  • Las tareas no requieren aislamiento del entorno de ejecución y no consumen muchos recursos.

Recomendamos ejecutar tareas con KubernetesExecutor en los siguientes casos:

  • Las tareas requieren aislamiento del entorno de ejecución. Por ejemplo, para que las tareas no compitan para la memoria y la CPU, ya que se ejecutan en sus propios Pods.
  • Las tareas requieren bibliotecas de sistema adicionales (o paquetes de PyPI).
  • Las tareas consumen muchos recursos y quieres controlar las tareas recursos de CPU y memoria.

Comparación entre KubernetesExecutor y KubernetesPodOperator

Ejecutar tareas con KubernetesExecutor es similar a ejecutar tareas con KubernetesPodOperator. Las tareas se ejecutan en Pods, lo que proporciona aislamiento de las tareas a nivel de Pod y una mejor administración de los recursos.

Sin embargo, existen algunas diferencias clave:

  • KubernetesExecutor ejecuta tareas solo en la versión de Cloud Composer espacio de nombres de tu entorno. No es posible cambiar este espacio de nombres en Cloud Composer. Puedes especificar un espacio de nombres en el que KubernetesPodOperator ejecuta las tareas del Pod.
  • KubernetesExecutor puede usar cualquier operador de Airflow integrado. KubernetesPodOperator ejecuta solo una secuencia de comandos proporcionada que define el punto de entrada del contenedor.
  • KubernetesExecutor usa la imagen predeterminada de Docker de Cloud Composer con las mismas anulaciones de opciones de configuración de Python, Airflow, entorno variables y paquetes de PyPI que se definen en tu entorno de Cloud Composer.

Acerca de las imágenes de Docker

De forma predeterminada, KubernetesExecutor inicia tareas con la misma imagen de Docker que Cloud Composer usa para los trabajadores de Celery. Este es el imagen de Cloud Composer para tu entorno, con todos los cambios que especificaste para tu entorno, como PyPI personalizado. paquetes o variables de entorno.

Antes de comenzar

  • Puedes usar CeleryKubernetesExecutor en Cloud Composer 3.

  • No es posible usar ningún ejecutor que no sea CeleryKubernetesExecutor en Cloud Composer 3. Esto significa que puedes ejecutar tareas usando CeleryExecutor, KubernetesExecutor o ambos en un DAG, pero no es posible configurar tu entorno para que solo use KubernetesExecutor CeleryExecutor

Configura CeleryKubernetesExecutor

Es posible que quieras anular la configuración existente de Airflow. relacionadas con KubernetesExecutor:

  • [kubernetes]worker_pods_creation_batch_size

    Esta opción define la cantidad de llamadas de creación de Pods de trabajadores de Kubernetes por del programador. El valor predeterminado es 1, por lo que solo se inicia un solo Pod por señal de monitoreo de funcionamiento. Si usas mucho KubernetesExecutor, se recomienda aumentar este valor.

  • [kubernetes]worker_pods_pending_timeout

    Esta opción define, en segundos, cuánto tiempo puede permanecer un trabajador en el Pending (se está creando el Pod) antes de que se considere con errores. Predeterminado es 5 minutos.

Ejecuta tareas con KubernetesExecutor o CeleryExecutor

Puedes ejecutar tareas con CeleryExecutor, KubernetesExecutor o ambos en un DAG:

  • Para ejecutar una tarea con KubernetesExecutor, especifica el valor kubernetes en Parámetro queue de una tarea.
  • Para ejecutar una tarea con CeleryExecutor, omite el parámetro queue.

En el siguiente ejemplo, se ejecuta la tarea task-kubernetes con KubernetesExecutor y la tarea task-celery con CeleryExecutor:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

Ejecuta comandos de la CLI de Airflow relacionados con KubernetesExecutor

Puedes ejecutar varias Comandos de la CLI de Airflow relacionados con KubernetesExecutor usando gcloud.

Personaliza las especificaciones del Pod de trabajador

Para personalizar las especificaciones del Pod del trabajador, pásalo al archivo executor_config. parámetro de una tarea. Puedes usarlo para definir parámetros de configuración y los requisitos de cumplimiento.

Puedes anular toda la especificación del Pod de trabajador que se usa para ejecutar una tarea. Para recuperar la especificación de Pod de una tarea utilizada por KubernetesExecutor, puedes Ejecuta la CLI de Airflow de kubernetes generate-dag-yaml kubectl.

Para obtener más información sobre cómo personalizar las especificaciones del Pod de trabajador, consulta Documentación de Airflow.

En el siguiente ejemplo, se muestra una tarea que usa una especificación de pod de trabajador personalizado:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '500m',
                            'memory': '1000Mi',
                        })
                    ),
                ],
            ),
        )
    },
)

Ver registros de tareas

Los registros de tareas que ejecuta KubernetesExecutor están disponibles en la pestaña Registros. junto con los registros de tareas que ejecuta CeleryExecutor:

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

  3. Ve a la pestaña Registros.

  4. Navega a Todos los registros > Registros de Airflow. > Trabajadores.

  5. Se ejecutan los trabajadores llamados airflow-k8s-worker. KubernetesExecutor. Para buscar registros de una tarea específica, puedes usar un ID de DAG o de una tarea como palabra clave en la búsqueda.

¿Qué sigue?