[[["易于理解","easyToUnderstand","thumb-up"],["解决了我的问题","solvedMyProblem","thumb-up"],["其他","otherUp","thumb-up"]],[["很难理解","hardToUnderstand","thumb-down"],["信息或示例代码不正确","incorrectInformationOrSampleCode","thumb-down"],["没有我需要的信息/示例","missingTheInformationSamplesINeed","thumb-down"],["翻译问题","translationIssue","thumb-down"],["其他","otherDown","thumb-down"]],["最后更新时间 (UTC):2025-08-29。"],[[["\u003cp\u003eCeleryKubernetesExecutor in Cloud Composer allows for running tasks with either CeleryExecutor or KubernetesExecutor, based on the designated queue, enabling resource-intensive and isolated tasks alongside fast, scalable ones within the same DAG.\u003c/p\u003e\n"],["\u003cp\u003eTasks run with KubernetesExecutor in Cloud Composer use the environment's cluster and share the same bindings as Airflow workers, applying Cloud Composer Compute SKUs for pricing and running in the same namespace.\u003c/p\u003e\n"],["\u003cp\u003eKubernetesExecutor tasks utilize the same Docker image as Celery workers by default, and custom images are not supported, limiting configuration changes to the environment.\u003c/p\u003e\n"],["\u003cp\u003eYou can run tasks with KubernetesExecutor by setting the \u003ccode\u003equeue\u003c/code\u003e parameter to \u003ccode\u003ekubernetes\u003c/code\u003e, and run them with CeleryExecutor by omitting it, offering flexibility in task execution within a DAG.\u003c/p\u003e\n"],["\u003cp\u003eCustomizing worker pod specifications, like CPU and memory requirements, is possible within specific parameters and ranges when using KubernetesExecutor, ensuring tasks have adequate resources.\u003c/p\u003e\n"]]],[],null,["\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\n**Cloud Composer 3** \\| Cloud Composer 2 \\| Cloud Composer 1\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\nThis page explains how to enable CeleryKubernetesExecutor in\nCloud Composer and how to use KubernetesExecutor in your DAGs.\n\nAbout CeleryKubernetesExecutor\n\n[CeleryKubernetesExecutor](https://airflow.apache.org/docs/apache-airflow/stable/executor/celery_kubernetes.html) is a\ntype of executor that can use CeleryExecutor and KubernetesExecutor at the same\ntime. Airflow selects the executor based on the queue that you define for the\ntask. In one DAG, you can run some tasks with CeleryExecutor, and other tasks\nwith KubernetesExecutor:\n\n- CeleryExecutor is optimized for fast and scalable execution of tasks.\n- KubernetesExecutor is designed for execution of resource-intensive tasks and running tasks in isolation.\n\nCeleryKubernetesExecutor in Cloud Composer\n\nCeleryKubernetesExecutor in Cloud Composer provides the ability to\nuse KubernetesExecutor for your tasks. It is not possible to use\nKubernetesExecutor in Cloud Composer separately from\nCeleryKubernetesExecutor.\n\nCloud Composer runs tasks that you execute with KubernetesExecutor\nin your environment's cluster, in the same namespace with Airflow workers. Such\ntasks have the same [bindings](/composer/docs/composer-3/access-control#composer-sa) as Airflow\nworkers and can access resources in your project.\n\nTasks that you execute with KubernetesExecutor use the\n[Cloud Composer pricing model](/composer/pricing), since pods with these\ntasks run in your environment's cluster. 
## About Docker images

By default, KubernetesExecutor launches tasks using the same Docker image that Cloud Composer uses for Celery workers. This is the [Cloud Composer image](/composer/docs/composer-versions) for your environment, with all changes that you specified for your environment, such as custom PyPI packages or environment variables.

> **Warning:** Cloud Composer **does not support using custom images** with KubernetesExecutor.

## Before you begin

- You can use CeleryKubernetesExecutor in Cloud Composer 3.
- It is not possible to use any executor other than CeleryKubernetesExecutor in Cloud Composer 3. This means that you can run tasks using CeleryExecutor, KubernetesExecutor, or both in one DAG, but it's not possible to configure your environment to use only KubernetesExecutor or only CeleryExecutor.

## Configure CeleryKubernetesExecutor

You might want to [override](/composer/docs/composer-3/override-airflow-configurations) existing Airflow configuration options that are related to KubernetesExecutor:

- `[kubernetes]worker_pods_creation_batch_size`

  This option defines the number of Kubernetes worker pod creation calls per scheduler loop. The default value is `1`, so only a single pod is launched per scheduler heartbeat. If you use KubernetesExecutor heavily, we recommend increasing this value.

- `[kubernetes]worker_pods_pending_timeout`

  This option defines, in seconds, how long a worker can stay in the `Pending` state (the pod is being created) before it is considered failed. The default value is 5 minutes.

## Run tasks with KubernetesExecutor or CeleryExecutor

You can run tasks using CeleryExecutor, KubernetesExecutor, or both in one DAG:

- To run a task with KubernetesExecutor, specify the `kubernetes` value in the `queue` parameter of a task.
- To run a task with CeleryExecutor, omit the `queue` parameter.

> **Note:** The default value for the `[celery_kubernetes_executor]kubernetes_queue` Airflow configuration option is `kubernetes`. You do not need to override this value.

The following example runs the `task-kubernetes` task using KubernetesExecutor and the `task-celery` task using CeleryExecutor:

```python
import datetime

import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
        "composer_sample_celery_kubernetes",
        start_date=datetime.datetime(2022, 1, 1),
        schedule_interval="@daily") as dag:

    def kubernetes_example():
        print("This task runs using KubernetesExecutor")

    def celery_example():
        print("This task runs using CeleryExecutor")

    # To run with KubernetesExecutor, set queue to kubernetes
    task_kubernetes = PythonOperator(
        task_id='task-kubernetes',
        python_callable=kubernetes_example,
        dag=dag,
        queue='kubernetes')

    # To run with CeleryExecutor, omit the queue argument
    task_celery = PythonOperator(
        task_id='task-celery',
        python_callable=celery_example,
        dag=dag)

    task_kubernetes >> task_celery
```
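The same queue-based routing also works with the TaskFlow API. The sketch below is an illustrative variant, not part of the original sample: the DAG id is hypothetical, and it relies on decorator keyword arguments such as `queue` being passed through to the underlying operator, as they are in Airflow 2.

```python
import datetime

import airflow
from airflow.decorators import task

with airflow.DAG(
    "composer_sample_celery_kubernetes_taskflow",  # hypothetical DAG id
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval="@daily",
) as dag:

    # Routed to KubernetesExecutor because of queue="kubernetes".
    @task(task_id="task-kubernetes", queue="kubernetes")
    def kubernetes_example():
        print("This task runs using KubernetesExecutor")

    # No queue argument, so this task runs with CeleryExecutor.
    @task(task_id="task-celery")
    def celery_example():
        print("This task runs using CeleryExecutor")

    kubernetes_example() >> celery_example()
```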
## Run Airflow CLI commands related to KubernetesExecutor

You can run several [Airflow CLI commands related to KubernetesExecutor](https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#kubernetes) using `gcloud`.

## Customize worker pod spec

You can customize the worker pod spec by passing it in the `executor_config` parameter of a task. You can use this to define custom CPU and memory requirements.

You can override the entire worker pod spec that is used to run a task. To retrieve the pod spec of a task used by KubernetesExecutor, you can [run the `kubernetes generate-dag-yaml`](#generate-dag-yaml) Airflow CLI command.

For more information about customizing the worker pod spec, see the [Airflow documentation](https://airflow.apache.org/docs/apache-airflow/stable/executor/kubernetes.html#pod-override).

> **Caution:** Always use `base` as the name of the container in the overridden pod spec.

Cloud Composer 3 supports the following values for resource requirements:

| Resource | Minimum | Maximum   | Step |
|----------|---------|-----------|------|
| CPU      | 0.25    | 32        | Step values: 0.25, 0.5, 1, 2, 4, 6, 8, 10, ..., 32. Requested values are rounded up to the closest supported step value (for example, 5 to 6). |
| Memory   | 2G (GB) | 128G (GB) | Step values: 2, 3, 4, 5, ..., 128. Requested values are rounded up to the closest supported step value (for example, 3.5G to 4G). |
| Storage  | -       | 100G (GB) | Any value. If more than 100 GB are requested, only 100 GB are provided. |

For more information about resource units in Kubernetes, see [Resource units in Kubernetes](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes).

The following example demonstrates a task that uses a custom worker pod spec:

```python
from kubernetes.client import models as k8s

# `f` is a Python callable defined elsewhere in the DAG file.
PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '0.5',
                            'memory': '2G',
                        })
                    ),
                ],
            ),
        )
    },
)
```
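If several tasks need similar resource overrides, you can build the `executor_config` dictionary in one place. The helper below is a sketch: the function name and values are illustrative, not part of the Cloud Composer API. It mirrors the `pod_override` structure shown above and notes how requested values are rounded up to the supported steps from the table.

```python
from kubernetes.client import models as k8s


def k8s_executor_resources(cpu: str, memory: str) -> dict:
    """Builds an executor_config dict that requests CPU and memory for a task.

    Illustrative helper. Requested values are rounded up by Cloud Composer to
    the closest supported step, so cpu="5" results in 6 CPUs and
    memory="3.5G" results in 4G.
    """
    return {
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        # The container must be named "base" for the override
                        # to apply to the worker container.
                        name="base",
                        resources=k8s.V1ResourceRequirements(
                            requests={"cpu": cpu, "memory": memory}
                        ),
                    )
                ]
            )
        )
    }
```

A task would then pass, for example, `executor_config=k8s_executor_resources("2", "8G")` together with `queue='kubernetes'`.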
## View task logs

Logs of tasks executed by KubernetesExecutor are available in the **Logs** tab, together with logs of tasks run by CeleryExecutor:

1. In the Google Cloud console, go to the **Environments** page.

   [Go to Environments](https://console.cloud.google.com/composer/environments)

2. In the list of environments, click the name of your environment. The **Environment details** page opens.

3. Go to the **Logs** tab.

4. Navigate to **All logs** > **Airflow logs** > **Workers**.

5. Workers named `airflow-k8s-worker` execute KubernetesExecutor tasks. To look for logs of a specific task, you can use a DAG ID or a task ID as a keyword in the search.

## What's next

- [Troubleshooting KubernetesExecutor](/composer/docs/composer-3/troubleshooting-kubernetes-executor)
- [Using KubernetesPodOperator](/composer/docs/composer-3/use-kubernetes-pod-operator)
- [Using GKE operators](/composer/docs/composer-3/use-gke-operator)
- [Overriding Airflow configuration options](/composer/docs/composer-3/override-airflow-configurations)