Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3
本页面介绍了如何在 Cloud Composer 以及如何在 DAG 中使用 KubernetesExecutor。
CeleryKubernetesExecutor 简介
CeleryKubernetesExecutor 是一个 可以同时使用 CeleryExecutor 和 KubernetesExecutor 的 Executor 类型 。Airflow 会根据您为 任务。在一个 DAG 中,您可以使用 CeleryExecutor 运行一些任务,并使用 KubernetesExecutor 运行其他任务:
- CeleryExecutor 经过优化,可快速执行任务。
- KubernetesExecutor 专用于执行资源密集型任务和隔离运行任务。
Cloud Composer 中的 CeleryKubernetesExecutor
Cloud Composer 中的 CeleryKubernetesExecutor 可让您为任务使用 KubernetesExecutor。无法在 Cloud Composer 中将 KubernetesExecutor 与 CeleryKubernetesExecutor 分开使用。
Cloud Composer 运行您使用 KubernetesExecutor 执行的任务 在环境的集群中,与 Airflow 工作器位于同一命名空间中。此类 具有与 Airflow 相同的绑定 工作器,并且可以访问项目中的资源。
由于包含这些任务的 Pod 在环境的集群中运行,因此您使用 KubernetesExecutor 执行的任务会采用 Cloud Composer 价格模式。Cloud Composer 计算 SKU(适用于 CPU、内存和存储)适用于这些 pod。
在以下情况下,我们建议您使用 CeleryExecutor 运行任务:
- 任务启动时间很重要。
- 任务不需要运行时隔离,也不会占用大量资源。
在以下情况下,我们建议您使用 KubernetesExecutor 运行任务:
- 任务需要运行时隔离。例如,这样任务不会 因为它们在各自的 Pod 中运行
- 任务需要额外的系统库(或 PyPI 软件包)。
- 任务非常耗资源,并且您希望控制可用的 CPU 和内存资源。
KubernetesExecutor 与 KubernetesPodOperator 的比较
使用 KubernetesExecutor 运行任务与使用 KubernetesPodOperator 运行任务类似。任务在 Pod 中执行,从而实现 Pod 级任务隔离和更好的资源管理。
不过,二者间也有一些重要的区别:
- KubernetesExecutor 仅在具有版本控制的 Cloud Composer 中运行任务 命名空间中您无法在 Cloud Composer 中更改此命名空间。您可以指定 KubernetesPodOperator 在其中运行 Pod 任务的命名空间。
- KubernetesExecutor 可以使用任何内置 Airflow 操作器。 KubernetesPodOperator 仅执行由容器的入口点定义的提供的脚本。
- KubernetesExecutor 使用默认的 Cloud Composer Docker 映像,其中包含与 Cloud Composer 环境中定义的 Python、Airflow 配置选项替换项、环境变量和 PyPI 软件包相同的配置。
Docker 映像简介
默认情况下,KubernetesExecutor 使用 Cloud Composer 用于 Celery 工作器。这是 为您的环境安装 Cloud Composer 映像, 您为环境指定的所有更改,例如自定义 PyPI 软件包或环境变量
准备工作
您可以在 Cloud Composer 3 中使用 CeleryKubernetesExecutor。
无法使用除 CeleryKubernetesExecutor 之外的任何执行器 。这意味着,您可以在一个 DAG 中使用 CeleryExecutor、KubernetesExecutor 或二者结合来运行任务,但无法将环境配置为仅使用 KubernetesExecutor 或 CeleryExecutor。
配置 CeleryKubernetesExecutor
您可能需要替换与 KubernetesExecutor 相关的现有 Airflow 配置选项:
[kubernetes]worker_pods_creation_batch_size
此选项定义每个 Pod 的 Kubernetes 工作器 Pod 创建调用次数, 调度程序循环。默认值为
1
,因此仅启动一个 Pod 每个调度器检测信号。如果您大量使用 KubernetesExecutor 建议提高此值。[kubernetes]worker_pods_pending_timeout
此选项定义了工作器在
Pending
状态(正在创建 Pod)下可以保持多长时间(以秒为单位),之后才会被视为失败。默认值为 5 分钟。
使用 KubernetesExecutor 或 CeleryExecutor 运行任务
您可以在一个 DAG 中使用 CeleryExecutor 和/或 KubernetesExecutor 运行任务:
- 如需使用 KubernetesExecutor 运行任务,请在任务的
queue
参数中指定kubernetes
值。 - 如需使用 CeleryExecutor 运行任务,请省略
queue
参数。
以下示例使用以下代码运行 task-kubernetes
任务:
KubernetesExecutor 和使用 CeleryExecutor 的 task-celery
任务:
import datetime
import airflow
from airflow.operators.python_operator import PythonOperator
with airflow.DAG(
"composer_sample_celery_kubernetes",
start_date=datetime.datetime(2022, 1, 1),
schedule_interval="@daily") as dag:
def kubernetes_example():
print("This task runs using KubernetesExecutor")
def celery_example():
print("This task runs using CeleryExecutor")
# To run with KubernetesExecutor, set queue to kubernetes
task_kubernetes = PythonOperator(
task_id='task-kubernetes',
python_callable=kubernetes_example,
dag=dag,
queue='kubernetes')
# To run with CeleryExecutor, omit the queue argument
task_celery = PythonOperator(
task_id='task-celery',
python_callable=celery_example,
dag=dag)
task_kubernetes >> task_celery
运行与 KubernetesExecutor 相关的 Airflow CLI 命令
您可以使用 gcloud
运行多个与 KubernetesExecutor 相关的 Airflow CLI 命令。
自定义工作器 Pod 规范
您可以通过在任务的 executor_config
参数中传递工作器 Pod 规范来自定义工作器 Pod 规范。您可以用它来定义自定义 CPU 和内存
要求。
您可以替换用于运行任务的整个工作器 pod 规范。如需检索 KubernetesExecutor 使用的任务的 pod 规范,您可以运行 kubernetes generate-dag-yaml
Airflow CLI 命令。
如需详细了解如何自定义工作器 pod 规范,请参阅 Airflow 文档。
以下示例演示了使用自定义工作器 pod 规范的任务:
PythonOperator(
task_id='custom-spec-example',
python_callable=f,
dag=dag,
queue='kubernetes',
executor_config={
'pod_override': k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name='base',
resources=k8s.V1ResourceRequirements(requests={
'cpu': '500m',
'memory': '1000Mi',
})
),
],
),
)
},
)
查看任务日志
日志标签页中提供由 KubernetesExecutor 执行的任务的日志, 以及 CeleryExecutor 运行的任务日志:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
前往日志标签页。
依次前往所有日志 > Airflow 日志 > 工作器。
执行名为
airflow-k8s-worker
的工作器 KubernetesExecutor 任务。如需查找特定任务的日志,您可以在搜索中使用 DAG ID 或任务 ID 作为关键字。