This page explains how to enable CeleryKubernetesExecutor in Cloud Composer and how to use KubernetesExecutor in your DAGs.
About CeleryKubernetesExecutor
CeleryKubernetesExecutor is a type of executor that can use CeleryExecutor and KubernetesExecutor at the same time. Airflow selects the executor based on the queue that you define for the task. In one DAG, you can run some tasks with CeleryExecutor, and other tasks with KubernetesExecutor:
- CeleryExecutor is optimized for fast and scalable execution of tasks.
- KubernetesExecutor is designed for execution of resource-intensive tasks and running tasks in isolation.
CeleryKubernetesExecutor in Cloud Composer
CeleryKubernetesExecutor in Cloud Composer provides the ability to use KubernetesExecutor for your tasks. It is not possible to use KubernetesExecutor in Cloud Composer separately from CeleryKubernetesExecutor.
Cloud Composer runs tasks that you execute with KubernetesExecutor in your environment's cluster, in the same namespace as Airflow workers. Such tasks have the same bindings as Airflow workers and can access resources in your project.
Tasks that you execute with KubernetesExecutor use the Cloud Composer pricing model, since pods with these tasks run in your environment's cluster. Cloud Composer Compute SKUs (for CPU, Memory, and Storage) apply to these pods.
We recommend running tasks with CeleryExecutor when:
- Task start-up time is important.
- Tasks do not require runtime isolation and are not resource-intensive.
We recommend running tasks with KubernetesExecutor when:
- Tasks require runtime isolation. For example, tasks run in their own pods, so they do not compete with each other for memory and CPU.
- Tasks are resource-intensive and you want to control the available CPU and memory resources.
KubernetesExecutor compared to KubernetesPodOperator
Running tasks with KubernetesExecutor is similar to running tasks using KubernetesPodOperator. Tasks are executed in pods, thus providing pod-level task isolation and better resource management.
However, there are some key differences:
- KubernetesExecutor runs tasks only in the versioned Cloud Composer namespace of your environment. It is not possible to change this namespace in Cloud Composer. You can specify a namespace where KubernetesPodOperator runs pod tasks.
- KubernetesExecutor can use any built-in Airflow operator. KubernetesPodOperator executes only a provided script defined by the entrypoint of the container.
- KubernetesExecutor uses the default Cloud Composer Docker image with the same Python, Airflow configuration option overrides, environment variables, and PyPI packages that are defined in your Cloud Composer environment.
About Docker images
By default, KubernetesExecutor launches tasks using the same Docker image that Cloud Composer uses for Celery workers. This is the Cloud Composer image for your environment, with all changes that you specified for your environment, such as custom PyPI packages or environment variables.
Before you begin
You can use CeleryKubernetesExecutor in Cloud Composer 3.
It is not possible to use any executor other than CeleryKubernetesExecutor in Cloud Composer 3. This means that you can run tasks using CeleryExecutor, KubernetesExecutor, or both in one DAG, but it is not possible to configure your environment to use only KubernetesExecutor or only CeleryExecutor.
Configure CeleryKubernetesExecutor
You might want to override existing Airflow configuration options that are related to KubernetesExecutor:
[kubernetes]worker_pods_creation_batch_size

This option defines the number of Kubernetes worker pod creation calls per scheduler loop. The default value is 1, so only a single pod is launched per scheduler heartbeat. If you use KubernetesExecutor heavily, we recommend increasing this value.

[kubernetes]worker_pods_pending_timeout

This option defines, in seconds, how long a worker can stay in the Pending state (the pod is being created) before it is considered failed. The default value is 5 minutes (300 seconds).
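You can override these options when you update your environment, for example with the Google Cloud CLI. The following command is a minimal sketch that assumes an environment named example-environment in the us-central1 region; the environment name, region, and values are placeholders. If your Airflow version uses the kubernetes_executor configuration section instead of kubernetes, adjust the keys accordingly.

# Override KubernetesExecutor-related Airflow configuration options.
# example-environment, us-central1, and the values are placeholders.
gcloud composer environments update example-environment \
    --location us-central1 \
    --update-airflow-configs=kubernetes-worker_pods_creation_batch_size=4,kubernetes-worker_pods_pending_timeout=600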
Run tasks with KubernetesExecutor or CeleryExecutor
You can run tasks using CeleryExecutor, KubernetesExecutor, or both in one DAG:
- To run a task with KubernetesExecutor, specify the kubernetes value in the queue parameter of a task.
- To run a task with CeleryExecutor, omit the queue parameter.

The following example runs the task-kubernetes task using KubernetesExecutor and the task-celery task using CeleryExecutor:
import datetime

import airflow
from airflow.operators.python import PythonOperator


with airflow.DAG(
    "composer_sample_celery_kubernetes",
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval="@daily") as dag:

    def kubernetes_example():
        print("This task runs using KubernetesExecutor")

    def celery_example():
        print("This task runs using CeleryExecutor")

    # To run with KubernetesExecutor, set queue to kubernetes
    task_kubernetes = PythonOperator(
        task_id='task-kubernetes',
        python_callable=kubernetes_example,
        dag=dag,
        queue='kubernetes')

    # To run with CeleryExecutor, omit the queue argument
    task_celery = PythonOperator(
        task_id='task-celery',
        python_callable=celery_example,
        dag=dag)

    task_kubernetes >> task_celery
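To try this example, you can save it to a file and upload it to your environment's DAGs folder, for example with the Google Cloud CLI. The following command is a sketch; the environment name, region, and file name are placeholders:

# Upload the example DAG to the environment's /dags folder.
# example-environment, us-central1, and the file name are placeholders.
gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source composer_sample_celery_kubernetes.py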
Run Airflow CLI commands related to KubernetesExecutor
You can run several Airflow CLI commands related to KubernetesExecutor using gcloud.
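For example, the following sketch runs the kubernetes generate-dag-yaml command for the sample DAG above. The environment name, region, and execution date are placeholders, and the exact arguments that this Airflow CLI command accepts depend on your Airflow version:

# Run the Airflow "kubernetes generate-dag-yaml" CLI command through gcloud.
# example-environment, us-central1, and the execution date are placeholders.
gcloud composer environments run example-environment \
    --location us-central1 \
    kubernetes generate-dag-yaml -- composer_sample_celery_kubernetes 2022-01-01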
Customize worker pod spec
You can customize the worker pod spec by passing it in the executor_config parameter of a task. You can use this to define custom CPU and memory requirements.

You can override the entire worker pod spec that is used to run a task. To retrieve the pod spec of a task used by KubernetesExecutor, you can run the kubernetes generate-dag-yaml Airflow CLI command.

For more information about customizing the worker pod spec, see the Airflow documentation.
Cloud Composer 3 supports the following values for resource requirements:
Resource | Minimum | Maximum | Step |
---|---|---|---|
CPU | 0.25 | 32 | Step values: 0.25, 0.5, 1, 2, 4, 6, 8, 10, ..., 32. Requested values are rounded up to the closest supported step value (for example, 5 to 6). |
Memory | 2G (GB) | 128G (GB) | Step values: 2, 3, 4, 5, ..., 128. Requested values are rounded up to the closest supported step value (for example, 3.5G to 4G). |
Storage | - | 100G (GB) | Any value. If more than 100 GB are requested, only 100 GB are provided. |
For more information about resource units in Kubernetes, see Resource units in Kubernetes.
The following example demonstrates a task that uses a custom worker pod spec:

from kubernetes.client import models as k8s

PythonOperator(
    task_id='custom-spec-example',
    # f is the task's Python callable, defined elsewhere in the DAG.
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '0.5',
                            'memory': '2G',
                        })
                    ),
                ],
            ),
        )
    },
)
View task logs
Logs of tasks executed by KubernetesExecutor are available in the Logs tab, together with logs of tasks run by CeleryExecutor:
- In the Google Cloud console, go to the Environments page.
- In the list of environments, click the name of your environment. The Environment details page opens.
- Go to the Logs tab.
- Navigate to All logs > Airflow logs > Workers.

Workers named airflow-k8s-worker execute KubernetesExecutor tasks. To look for logs of a specific task, you can use a DAG ID or a task ID as a keyword in the search.
What's next
- Troubleshooting KubernetesExecutor
- Using KubernetesPodOperator
- Using GKE operators
- Overriding Airflow configuration options