Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本页介绍了如何在 Cloud Composer 中启用 CeleryKubernetesExecutor,以及如何在 DAG 中使用 KubernetesExecutor。
CeleryKubernetesExecutor 简介
CeleryKubernetesExecutor 是一种执行器,可同时使用 CeleryExecutor 和 KubernetesExecutor。Airflow 会根据您为任务定义的队列选择执行器。在一个 DAG 中,您可以使用 CeleryExecutor 运行一些任务,并使用 KubernetesExecutor 运行其他任务:
- CeleryExecutor 经过优化,可快速且可伸缩地执行任务。
- KubernetesExecutor 专用于执行资源密集型任务和隔离运行任务。
Cloud Composer 中的 CeleryKubernetesExecutor
Cloud Composer 中的 CeleryKubernetesExecutor 可让您为任务使用 KubernetesExecutor。无法在 Cloud Composer 中单独使用 KubernetesExecutor 和 CeleryKubernetesExecutor。
Cloud Composer 会在环境集群中运行您使用 KubernetesExecutor 执行的任务,并与 Airflow 工作器位于同一命名空间中。此类任务与 Airflow 工作器具有相同的绑定,并且可以访问项目中的资源。
由于包含这些任务的 Pod 在环境的集群中运行,因此您使用 KubernetesExecutor 执行的任务会采用 Cloud Composer 价格模式。Cloud Composer 计算 SKU(适用于 CPU、内存和存储)适用于这些 pod。
在以下情况下,我们建议使用 CeleryExecutor 运行任务:
- 任务启动时间很重要。
- 任务不需要运行时隔离,也不会占用大量资源。
在以下情况下,我们建议您使用 KubernetesExecutor 运行任务:
- 任务需要运行时隔离。例如,这样一来,任务便不会争用内存和 CPU,因为它们在各自的 Pod 中运行。
- 任务非常耗资源,并且您希望控制可用的 CPU 和内存资源。
KubernetesExecutor 与 KubernetesPodOperator 的比较
使用 KubernetesExecutor 运行任务与使用 KubernetesPodOperator 运行任务类似。任务在 Pod 中执行,从而实现 Pod 级任务隔离和更好的资源管理。
不过,二者间也有一些重要的区别:
- KubernetesExecutor 仅在您环境的版本化 Cloud Composer 命名空间中运行任务。您无法在 Cloud Composer 中更改此命名空间。您可以指定 KubernetesPodOperator 运行 pod 任务的命名空间。
- KubernetesExecutor 可以使用任何内置 Airflow 运算符。KubernetesPodOperator 仅执行由容器入口点定义的提供的脚本。
- KubernetesExecutor 使用默认的 Cloud Composer Docker 映像,其中包含与 Cloud Composer 环境中定义的 Python、Airflow 配置选项替换项、环境变量和 PyPI 软件包相同的配置。
Docker 映像简介
默认情况下,KubernetesExecutor 使用 Cloud Composer 为 Celery 工作器使用的 Docker 映像启动任务。这是您的环境的 Cloud Composer 映像,其中包含您为环境指定的所有更改,例如自定义 PyPI 软件包或环境变量。
准备工作
您可以在 Cloud Composer 3 中使用 CeleryKubernetesExecutor。
在 Cloud Composer 3 中,无法使用除 CeleryKubernetesExecutor 以外的任何执行器。这意味着,您可以在一个 DAG 中使用 CeleryExecutor、KubernetesExecutor 或二者结合来运行任务,但无法将环境配置为仅使用 KubernetesExecutor 或 CeleryExecutor。
配置 CeleryKubernetesExecutor
您可能需要替换与 KubernetesExecutor 相关的现有 Airflow 配置选项:
[kubernetes]worker_pods_creation_batch_size
此选项用于定义每个调度程序循环中 Kubernetes 工作器 Pod 创建调用的次数。默认值为
1
,因此每个调度程序心跳只会启动一个 pod。如果您大量使用 KubernetesExecutor,我们建议您增加此值。[kubernetes]worker_pods_pending_timeout
此选项定义了工作器在
Pending
状态(正在创建 Pod)下可以保持多长时间(以秒为单位),之后才会被视为失败。默认值为 5 分钟。
使用 KubernetesExecutor 或 CeleryExecutor 运行任务
您可以在一个 DAG 中使用 CeleryExecutor 和/或 KubernetesExecutor 运行任务:
- 如需使用 KubernetesExecutor 运行任务,请在任务的
queue
参数中指定kubernetes
值。 - 如需使用 CeleryExecutor 运行任务,请省略
queue
参数。
以下示例使用 KubernetesExecutor 运行 task-kubernetes
任务,使用 CeleryExecutor 运行 task-celery
任务:
import datetime
import airflow
from airflow.operators.python_operator import PythonOperator
with airflow.DAG(
"composer_sample_celery_kubernetes",
start_date=datetime.datetime(2022, 1, 1),
schedule_interval="@daily") as dag:
def kubernetes_example():
print("This task runs using KubernetesExecutor")
def celery_example():
print("This task runs using CeleryExecutor")
# To run with KubernetesExecutor, set queue to kubernetes
task_kubernetes = PythonOperator(
task_id='task-kubernetes',
python_callable=kubernetes_example,
dag=dag,
queue='kubernetes')
# To run with CeleryExecutor, omit the queue argument
task_celery = PythonOperator(
task_id='task-celery',
python_callable=celery_example,
dag=dag)
task_kubernetes >> task_celery
运行与 KubernetesExecutor 相关的 Airflow CLI 命令
您可以使用 gcloud
运行多个与 KubernetesExecutor 相关的 Airflow CLI 命令。
自定义工作器 pod 规格
您可以通过在任务的 executor_config
参数中传递工作器 Pod 规范来自定义工作器 Pod 规范。您可以使用此属性来定义自定义 CPU 和内存要求。
您可以替换用于运行任务的整个工作器 pod 规范。如需检索 KubernetesExecutor 使用的任务的 pod 规范,您可以运行 kubernetes generate-dag-yaml
Airflow CLI 命令。
如需详细了解如何自定义工作器 pod 规范,请参阅 Airflow 文档。
Cloud Composer 3 支持以下资源要求值:
资源 | 最小值 | 最大值 | 步骤 |
---|---|---|---|
CPU | 0.25 | 32 | 步长值:0.25、0.5、1、2、4、6、8、10、...、32。系统会将请求的值向上舍入到最接近的支持的步长值(例如,5 舍入为 6)。 |
内存 | 2G (GB) | 128G (GB) | 步长值:2、3、4、5、...、128。系统会将请求的值向上舍入到最接近的支持的步长值(例如,3.5G 到 4G)。 |
存储 | - | 100G (GB) | 任意值。如果请求的存储空间超过 100 GB,则系统只会提供 100 GB。 |
如需详细了解 Kubernetes 中的资源单位,请参阅 Kubernetes 中的资源单位。
以下示例演示了使用自定义工作器 pod 规范的任务:
PythonOperator(
task_id='custom-spec-example',
python_callable=f,
dag=dag,
queue='kubernetes',
executor_config={
'pod_override': k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name='base',
resources=k8s.V1ResourceRequirements(requests={
'cpu': '0.5',
'memory': '2G',
})
),
],
),
)
},
)
查看任务日志
由 KubernetesExecutor 执行的任务的日志可在 Logs 标签页中找到,其中还包含由 CeleryExecutor 运行的任务的日志:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
前往日志标签页。
依次前往所有日志 > Airflow 日志 > 工作器。
名为
airflow-k8s-worker
的工作器会执行 KubernetesExecutor 任务。如需查找特定任务的日志,您可以在搜索中使用 DAG ID 或任务 ID 作为关键字。