Use CeleryKubernetesExecutor


This page explains how to enable CeleryKubernetesExecutor in Cloud Composer and how to use KubernetesExecutor in your DAGs.

About CeleryKubernetesExecutor

CeleryKubernetesExecutor is a type of executor that can use CeleryExecutor and KubernetesExecutor at the same time. Airflow selects the executor based on the queue that you define for the task. In one DAG, you can run some tasks with CeleryExecutor, and other tasks with KubernetesExecutor:

  • CeleryExecutor is optimized for fast and scalable execution of tasks.
  • KubernetesExecutor is designed for execution of resource-intensive tasks and running tasks in isolation.

CeleryKubernetesExecutor in Cloud Composer

CeleryKubernetesExecutor in Cloud Composer provides the ability to use KubernetesExecutor for your tasks. It is not possible to use KubernetesExecutor in Cloud Composer separately from CeleryKubernetesExecutor.

Cloud Composer runs tasks that you execute with KubernetesExecutor in your environment's cluster, in the same namespace as Airflow workers. Such tasks have the same bindings as Airflow workers and can access resources in your project.

Tasks that you execute with KubernetesExecutor use the Cloud Composer pricing model, since pods with these tasks run in your environment's cluster. Cloud Composer Compute SKUs (for CPU, Memory, and Storage) apply to these pods.

We recommend running tasks with CeleryExecutor when:

  • Task start-up time is important.
  • Tasks do not require runtime isolation and are not resource-intensive.

We recommend running tasks with KubernetesExecutor when:

  • Tasks require runtime isolation. For example, because each task runs in its own pod, tasks do not compete for memory and CPU.
  • Tasks require additional system libraries (or PyPI packages).
  • Tasks are resource-intensive and you want to control the available CPU and memory resources.

KubernetesExecutor compared to KubernetesPodOperator

Running tasks with KubernetesExecutor is similar to running tasks using KubernetesPodOperator. Tasks are executed in pods, thus providing pod-level task isolation and better resource management.

However, there are some key differences:

  • KubernetesExecutor runs tasks only in the versioned Cloud Composer namespace of your environment. It is not possible to change this namespace in Cloud Composer. You can specify a namespace where KubernetesPodOperator runs pod tasks.
  • KubernetesExecutor can use any built-in Airflow operator. KubernetesPodOperator executes only the command that is defined by the entrypoint of the container.
  • KubernetesExecutor uses the default Cloud Composer Docker image with the same Python, Airflow configuration option overrides, environment variables, and PyPI packages that are defined in your Cloud Composer environment.
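
To make the contrast concrete, the following sketch places both approaches side by side in one DAG. The DAG id, namespace, and container image are illustrative placeholders, and the KubernetesPodOperator import path assumes a recent cncf.kubernetes provider (older provider versions expose the operator under airflow.providers.cncf.kubernetes.operators.kubernetes_pod):

import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with DAG(
  "composer_sample_executor_vs_pod_operator",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  # KubernetesExecutor: any built-in operator, the Cloud Composer image,
  # and the fixed, versioned namespace of your environment.
  task_executor = PythonOperator(
    task_id='task-kubernetes-executor',
    python_callable=lambda: print("Runs with KubernetesExecutor"),
    queue='kubernetes')

  # KubernetesPodOperator: you choose the image and the namespace, and the
  # container entrypoint (or cmds) defines what the task does.
  task_pod_operator = KubernetesPodOperator(
    task_id='task-pod-operator',
    name='task-pod-operator',
    namespace='example-namespace',  # hypothetical namespace
    image='python:3.11-slim',       # any container image you choose
    cmds=["python", "-c", "print('Runs with KubernetesPodOperator')"])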

About Docker images

By default, KubernetesExecutor launches tasks using the same Docker image that Cloud Composer uses for Celery workers. This is the Cloud Composer image for your environment, with all changes that you specified for your environment, such as custom PyPI packages or environment variables.

Before you begin

  • You can use CeleryKubernetesExecutor in Cloud Composer 3.

  • It is not possible to use any executor other than CeleryKubernetesExecutor in Cloud Composer 3. This means that you can run tasks using CeleryExecutor, KubernetesExecutor, or both in one DAG, but it is not possible to configure your environment to use only KubernetesExecutor or only CeleryExecutor.

Configure CeleryKubernetesExecutor

You might want to override existing Airflow configuration options that are related to KubernetesExecutor:

  • [kubernetes]worker_pods_creation_batch_size

    This option defines the number of Kubernetes Worker Pod creation calls per scheduler loop. The default value is 1, so only a single pod is launched per scheduler heartbeat. If you use KubernetesExecutor heavily, we recommend increasing this value.

  • [kubernetes]worker_pods_pending_timeout

    This option defines, in seconds, how long a worker can stay in the Pending state (the pod is being created) before it is considered failed. The default value is 300 seconds (5 minutes).
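
For example, you might set both overrides with the gcloud CLI as sketched below; the environment name, location, and values are placeholders, and you can also change Airflow configuration overrides in the Google Cloud console:

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --update-airflow-configs=kubernetes-worker_pods_creation_batch_size=16,kubernetes-worker_pods_pending_timeout=600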

Run tasks with KubernetesExecutor or CeleryExecutor

You can run tasks using CeleryExecutor, KubernetesExecutor, or both in one DAG:

  • To run a task with KubernetesExecutor, specify the kubernetes value in the queue parameter of a task.
  • To run a task with CeleryExecutor, omit the queue parameter.

The following example runs the task-kubernetes task using KubernetesExecutor and the task-celery task using CeleryExecutor:

import datetime
import airflow
from airflow.operators.python import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

Run Airflow CLI commands related to KubernetesExecutor

You can run several Airflow CLI commands related to KubernetesExecutor using gcloud.
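
For example, you might run the kubernetes cleanup-pods Airflow CLI command through gcloud as sketched below; the environment name and location are placeholders, and the exact way nested Airflow subcommands and their arguments are passed can differ between gcloud versions, so check the gcloud composer environments run reference:

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    kubernetes cleanup-pods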

Customize worker pod spec

You can customize the worker pod spec by passing it in the executor_config parameter of a task. You can use this to define custom CPU and memory requirements.

You can override the entire worker pod spec that is used to run a task. To retrieve the pod spec that KubernetesExecutor uses for a task, run the kubernetes generate-dag-yaml Airflow CLI command.

For more information about customizing worker pod spec, see Airflow documentation.

The following example demonstrates a task that uses custom worker pod spec:

# The pod override is built with kubernetes.client models; f is a Python
# callable and dag is the DAG object defined elsewhere in your DAG file.
from kubernetes.client import models as k8s

from airflow.operators.python import PythonOperator

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '500m',
                            'memory': '1000Mi',
                        })
                    ),
                ],
            ),
        )
    },
)

View task logs

Logs of tasks executed by KubernetesExecutor are available in the Logs tab, together with logs of tasks run by CeleryExecutor:

  1. In the Google Cloud console, go to the Environments page.

    Go to Environments

  2. In the list of environments, click the name of your environment. The Environment details page opens.

  3. Go to the Logs tab.

  4. Navigate to All logs > Airflow logs > Workers.

  5. Workers named airflow-k8s-worker execute KubernetesExecutor tasks. To find logs of a specific task, use the DAG ID or the task ID as a keyword in the search.

What's next