Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta página se explica cómo habilitar CeleryKubernetesExecutor en Cloud Composer y cómo usar KubernetesExecutor en tus DAGs.
Acerca de CeleryKubernetesExecutor
CeleryKubernetesExecutor es un tipo de ejecutor que puede usar CeleryExecutor y KubernetesExecutor al mismo tiempo. Airflow selecciona el ejecutor en función de la cola que definas para la tarea. En un DAG, puedes ejecutar algunas tareas con CeleryExecutor y otras con KubernetesExecutor:
- CeleryExecutor está optimizado para la ejecución rápida y escalable de tareas.
- KubernetesExecutor se ha diseñado para ejecutar tareas que requieren muchos recursos y para ejecutar tareas de forma aislada.
CeleryKubernetesExecutor en Cloud Composer
CeleryKubernetesExecutor en Cloud Composer te permite usar KubernetesExecutor para tus tareas. No se puede usar KubernetesExecutor en Cloud Composer por separado de CeleryKubernetesExecutor.
Cloud Composer ejecuta las tareas que ejecutas con KubernetesExecutor en el clúster de tu entorno, en el mismo espacio de nombres que los trabajadores de Airflow. Estas tareas tienen los mismos enlaces que los workers de Airflow y pueden acceder a los recursos de tu proyecto.
Las tareas que ejecutas con KubernetesExecutor usan el modelo de precios de Cloud Composer, ya que los pods con estas tareas se ejecutan en el clúster de tu entorno. Los SKUs de Compute de Cloud Composer (para CPU, memoria y almacenamiento) se aplican a estos pods.
Te recomendamos que ejecutes tareas con CeleryExecutor en los siguientes casos:
- El tiempo de inicio de las tareas es importante.
- Las tareas no requieren aislamiento en el tiempo de ejecución y no consumen muchos recursos.
Te recomendamos que ejecutes tareas con KubernetesExecutor en los siguientes casos:
- Las tareas requieren aislamiento en el tiempo de ejecución. Por ejemplo, para que las tareas no compitan por la memoria y la CPU, ya que se ejecutan en sus propios pods.
- Las tareas requieren muchos recursos y quieres controlar los recursos de CPU y memoria disponibles.
Comparación entre KubernetesExecutor y KubernetesPodOperator
Ejecutar tareas con KubernetesExecutor es similar a ejecutar tareas con KubernetesPodOperator. Las tareas se ejecutan en pods, lo que proporciona aislamiento de tareas a nivel de pod y una mejor gestión de los recursos.
Sin embargo, hay algunas diferencias importantes:
- KubernetesExecutor solo ejecuta tareas en el espacio de nombres de Cloud Composer con versiones de tu entorno. No se puede cambiar este espacio de nombres en Cloud Composer. Puedes especificar un espacio de nombres en el que KubernetesPodOperator ejecute tareas de pods.
- KubernetesExecutor puede usar cualquier operador de Airflow integrado. KubernetesPodOperator solo ejecuta una secuencia de comandos proporcionada definida por el punto de entrada del contenedor.
- KubernetesExecutor usa la imagen Docker predeterminada de Cloud Composer con los mismos paquetes de Python, las mismas anulaciones de opciones de configuración de Airflow, las mismas variables de entorno y los mismos paquetes de PyPI que se definen en tu entorno de Cloud Composer.
Acerca de las imágenes de Docker
De forma predeterminada, KubernetesExecutor inicia tareas con la misma imagen Docker que usa Cloud Composer para los trabajadores de Celery. Esta es la imagen de Cloud Composer de tu entorno, con todos los cambios que hayas especificado, como paquetes PyPI personalizados o variables de entorno.
Antes de empezar
Puedes usar CeleryKubernetesExecutor en Cloud Composer 3.
En Cloud Composer 3, no se puede usar ningún otro ejecutor que no sea CeleryKubernetesExecutor. Esto significa que puedes ejecutar tareas con CeleryExecutor, KubernetesExecutor o ambos en un DAG, pero no puedes configurar tu entorno para que solo use KubernetesExecutor o CeleryExecutor.
Configurar CeleryKubernetesExecutor
Puede que quieras anular las opciones de configuración de Airflow que estén relacionadas con KubernetesExecutor:
[kubernetes]worker_pods_creation_batch_size
Esta opción define el número de llamadas de creación de pods de trabajador de Kubernetes por bucle de programador. El valor predeterminado es
1
, por lo que solo se inicia un pod por latido del programador. Si usas KubernetesExecutor con frecuencia, te recomendamos que aumentes este valor.[kubernetes]worker_pods_pending_timeout
Esta opción define, en segundos, cuánto tiempo puede permanecer un trabajador en el estado
Pending
(se está creando el pod) antes de que se considere que ha fallado. El valor predeterminado es de 5 minutos.
Ejecutar tareas con KubernetesExecutor o CeleryExecutor
Puedes ejecutar tareas con CeleryExecutor, KubernetesExecutor o ambos en un mismo DAG:
- Para ejecutar una tarea con KubernetesExecutor, especifica el valor
kubernetes
en el parámetroqueue
de una tarea. - Para ejecutar una tarea con CeleryExecutor, omite el parámetro
queue
.
En el siguiente ejemplo se ejecuta la tarea task-kubernetes
con KubernetesExecutor y la tarea task-celery
con CeleryExecutor:
import datetime
import airflow
from airflow.operators.python_operator import PythonOperator
with airflow.DAG(
"composer_sample_celery_kubernetes",
start_date=datetime.datetime(2022, 1, 1),
schedule_interval="@daily") as dag:
def kubernetes_example():
print("This task runs using KubernetesExecutor")
def celery_example():
print("This task runs using CeleryExecutor")
# To run with KubernetesExecutor, set queue to kubernetes
task_kubernetes = PythonOperator(
task_id='task-kubernetes',
python_callable=kubernetes_example,
dag=dag,
queue='kubernetes')
# To run with CeleryExecutor, omit the queue argument
task_celery = PythonOperator(
task_id='task-celery',
python_callable=celery_example,
dag=dag)
task_kubernetes >> task_celery
Ejecutar comandos de la CLI de Airflow relacionados con KubernetesExecutor
Puedes ejecutar varios comandos de la CLI de Airflow relacionados con KubernetesExecutor con gcloud
.
Personalizar la especificación del pod de trabajador
Puedes personalizar la especificación del pod de trabajador pasándola en el parámetro executor_config
de una tarea. Puedes usarlo para definir requisitos personalizados de CPU y memoria.
Puedes anular toda la especificación del pod de trabajador que se usa para ejecutar una tarea. Para obtener la especificación del pod de una tarea utilizada por KubernetesExecutor, puedes ejecutar el comando kubernetes generate-dag-yaml
de la CLI de Airflow.
Para obtener más información sobre cómo personalizar la especificación del pod de trabajador, consulta la documentación de Airflow.
Cloud Composer 3 admite los siguientes valores para los requisitos de recursos:
Recurso | Mínimo | Máximo | Paso |
---|---|---|---|
CPU | 0,25 | 32 | Valores de incremento: 0,25, 0,5, 1, 2, 4, 6, 8, 10, ..., 32. Los valores solicitados se redondean al valor de paso admitido más cercano (por ejemplo, de 5 a 6). |
Memoria | 2G (GB) | 128 G (GB) | Valores del incremento: 2, 3, 4, 5, ..., 128. Los valores solicitados se redondean al valor de paso admitido más cercano (por ejemplo, 3,5 G se redondea a 4 G). |
Almacenamiento | - | 100G (GB) | Cualquier valor. Si se solicitan más de 100 GB, solo se proporcionarán 100 GB. |
Para obtener más información sobre las unidades de recursos en Kubernetes, consulta Unidades de recursos en Kubernetes.
En el siguiente ejemplo se muestra una tarea que usa una especificación de pod de trabajador personalizada:
PythonOperator(
task_id='custom-spec-example',
python_callable=f,
dag=dag,
queue='kubernetes',
executor_config={
'pod_override': k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name='base',
resources=k8s.V1ResourceRequirements(requests={
'cpu': '0.5',
'memory': '2G',
})
),
],
),
)
},
)
Ver registros de tareas
Los registros de las tareas ejecutadas por KubernetesExecutor están disponibles en la pestaña Registros, junto con los registros de las tareas ejecutadas por CeleryExecutor:
En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Registros.
Ve a Todos los registros > Registros de Airflow > Workers.
Los trabajadores llamados
airflow-k8s-worker
ejecutan tareas de KubernetesExecutor. Para buscar los registros de una tarea específica, puede usar un ID de DAG o un ID de tarea como palabra clave en la búsqueda.
Siguientes pasos
- Solucionar problemas de KubernetesExecutor
- Usar KubernetesPodOperator
- Usar operadores de GKE
- Anular opciones de configuración de Airflow