使用 CeleryKubernetesExecutor

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本页介绍了如何在 Cloud Composer 中启用 CeleryKubernetesExecutor,以及如何在 DAG 中使用 KubernetesExecutor。

CeleryKubernetesExecutor 简介

CeleryKubernetesExecutor 是一种执行器,可同时使用 CeleryExecutor 和 KubernetesExecutor。Airflow 会根据您为任务定义的队列选择执行器。在一个 DAG 中,您可以使用 CeleryExecutor 运行一些任务,并使用 KubernetesExecutor 运行其他任务:

  • CeleryExecutor 经过优化,可快速且可伸缩地执行任务。
  • KubernetesExecutor 专用于执行资源密集型任务和隔离运行任务。

Cloud Composer 中的 CeleryKubernetesExecutor

Cloud Composer 中的 CeleryKubernetesExecutor 可让您为任务使用 KubernetesExecutor。无法在 Cloud Composer 中单独使用 KubernetesExecutor 和 CeleryKubernetesExecutor。

Cloud Composer 会在环境集群中运行您使用 KubernetesExecutor 执行的任务,并与 Airflow 工作器位于同一命名空间中。此类任务与 Airflow 工作器具有相同的绑定,并且可以访问项目中的资源。

由于包含这些任务的 Pod 在环境的集群中运行,因此您使用 KubernetesExecutor 执行的任务会采用 Cloud Composer 价格模式。Cloud Composer 计算 SKU(适用于 CPU、内存和存储)适用于这些 pod。

在以下情况下,我们建议使用 CeleryExecutor 运行任务:

  • 任务启动时间很重要。
  • 任务不需要运行时隔离,也不会占用大量资源。

在以下情况下,我们建议您使用 KubernetesExecutor 运行任务:

  • 任务需要运行时隔离。例如,这样一来,任务便不会争用内存和 CPU,因为它们在各自的 Pod 中运行。
  • 任务非常耗资源,并且您希望控制可用的 CPU 和内存资源。

KubernetesExecutor 与 KubernetesPodOperator 的比较

使用 KubernetesExecutor 运行任务与使用 KubernetesPodOperator 运行任务类似。任务在 Pod 中执行,从而实现 Pod 级任务隔离和更好的资源管理。

不过,二者间也有一些重要的区别:

  • KubernetesExecutor 仅在您环境的版本化 Cloud Composer 命名空间中运行任务。您无法在 Cloud Composer 中更改此命名空间。您可以指定 KubernetesPodOperator 运行 pod 任务的命名空间。
  • KubernetesExecutor 可以使用任何内置 Airflow 运算符。KubernetesPodOperator 仅执行由容器入口点定义的提供的脚本。
  • KubernetesExecutor 使用默认的 Cloud Composer Docker 映像,其中包含与 Cloud Composer 环境中定义的 Python、Airflow 配置选项替换项、环境变量和 PyPI 软件包相同的配置。

Docker 映像简介

默认情况下,KubernetesExecutor 使用 Cloud Composer 为 Celery 工作器使用的 Docker 映像启动任务。这是您的环境的 Cloud Composer 映像,其中包含您为环境指定的所有更改,例如自定义 PyPI 软件包或环境变量。

准备工作

  • 您可以在 Cloud Composer 3 中使用 CeleryKubernetesExecutor。

  • 在 Cloud Composer 3 中,无法使用除 CeleryKubernetesExecutor 以外的任何执行器。这意味着,您可以在一个 DAG 中使用 CeleryExecutor、KubernetesExecutor 或二者结合来运行任务,但无法将环境配置为仅使用 KubernetesExecutor 或 CeleryExecutor。

配置 CeleryKubernetesExecutor

您可能需要替换与 KubernetesExecutor 相关的现有 Airflow 配置选项:

  • [kubernetes]worker_pods_creation_batch_size

    此选项用于定义每个调度程序循环中 Kubernetes 工作器 Pod 创建调用的次数。默认值为 1,因此每个调度程序心跳只会启动一个 pod。如果您大量使用 KubernetesExecutor,我们建议您增加此值。

  • [kubernetes]worker_pods_pending_timeout

    此选项定义了工作器在 Pending 状态(正在创建 Pod)下可以保持多长时间(以秒为单位),之后才会被视为失败。默认值为 5 分钟。

使用 KubernetesExecutor 或 CeleryExecutor 运行任务

您可以在一个 DAG 中使用 CeleryExecutor 和/或 KubernetesExecutor 运行任务:

  • 如需使用 KubernetesExecutor 运行任务,请在任务的 queue 参数中指定 kubernetes 值。
  • 如需使用 CeleryExecutor 运行任务,请省略 queue 参数。

以下示例使用 KubernetesExecutor 运行 task-kubernetes 任务,使用 CeleryExecutor 运行 task-celery 任务:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

运行与 KubernetesExecutor 相关的 Airflow CLI 命令

您可以使用 gcloud 运行多个与 KubernetesExecutor 相关的 Airflow CLI 命令

自定义工作器 pod 规格

您可以通过在任务的 executor_config 参数中传递工作器 Pod 规范来自定义工作器 Pod 规范。您可以使用此属性来定义自定义 CPU 和内存要求。

您可以替换用于运行任务的整个工作器 pod 规范。如需检索 KubernetesExecutor 使用的任务的 pod 规范,您可以运行 kubernetes generate-dag-yaml Airflow CLI 命令。

如需详细了解如何自定义工作器 pod 规范,请参阅 Airflow 文档

Cloud Composer 3 支持以下资源要求值:

资源 最小值 最大值 步骤
CPU 0.25 32 步长值:0.25、0.5、1、2、4、6、8、10、...、32。系统会将请求的值向上舍入到最接近的支持的步长值(例如,5 舍入为 6)。
内存 2G (GB) 128G (GB) 步长值:2、3、4、5、...、128。系统会将请求的值向上舍入到最接近的支持的步长值(例如,3.5G 到 4G)。
存储 - 100G (GB) 任意值。如果请求的存储空间超过 100 GB,则系统只会提供 100 GB。

如需详细了解 Kubernetes 中的资源单位,请参阅 Kubernetes 中的资源单位

以下示例演示了使用自定义工作器 pod 规范的任务:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '0.5',
                            'memory': '2G',
                        })
                    ),
                ],
            ),
        )
    },
)

查看任务日志

由 KubernetesExecutor 执行的任务的日志可在 Logs 标签页中找到,其中还包含由 CeleryExecutor 运行的任务的日志:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 前往日志标签页。

  4. 依次前往所有日志 > Airflow 日志 > 工作器

  5. 名为 airflow-k8s-worker 的工作器会执行 KubernetesExecutor 任务。如需查找特定任务的日志,您可以在搜索中使用 DAG ID 或任务 ID 作为关键字。

后续步骤