使用 CeleryKubernetesExecutor

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本页面介绍了如何在 Cloud Composer 中启用 CeleryKubernetesExecutor,以及如何在 DAG 中使用 KubernetesExecutor。

关于 CeleryKubernetesExecutor

CeleryKubernetesExecutor 是一种可以同时使用 CeleryExecutor 和 KubernetesExecutor 的执行程序。Airflow 会根据您为任务定义的队列选择执行器。在一个 DAG 中,您可以运行一些使用 CeleryExecutor 的任务,以及一些使用 KubernetesExecutor 的任务:

  • CeleryExecutor 经过优化,可快速且可伸缩地执行任务。
  • KubernetesExecutor 旨在执行资源密集型任务并以隔离方式运行任务。

Cloud Composer 中的 CeleryKubernetesExecutor

Cloud Composer 中的 CeleryKubernetesExecutor 可让您为任务使用 KubernetesExecutor。在 Cloud Composer 中,无法单独使用 KubernetesExecutor,只能与 CeleryKubernetesExecutor 一起使用。

Cloud Composer 在环境的集群中运行您使用 KubernetesExecutor 执行的任务,这些任务与 Airflow 工作器位于同一命名空间中。此类任务与 Airflow 工作器具有相同的绑定,并且可以访问您项目中的资源。

使用 KubernetesExecutor 执行的任务会采用 Cloud Composer 价格模式,因为包含这些任务的 Pod 会在环境的集群中运行。Cloud Composer 计算 SKU(适用于 CPU、内存和存储)适用于这些 pod。

我们建议在以下情况下使用 CeleryExecutor 运行任务:

  • 任务启动时间非常重要。
  • 任务不需要运行时隔离,并且不占用大量资源。

我们建议在以下情况下使用 KubernetesExecutor 运行任务:

  • 任务需要运行时隔离。例如,由于任务在各自的 pod 中运行,因此不会争用内存和 CPU。
  • 任务属于资源密集型,并且您希望控制可用的 CPU 和内存资源。

KubernetesExecutor 与 KubernetesPodOperator 的比较

使用 KubernetesExecutor 运行任务与使用 KubernetesPodOperator 运行任务类似。任务在 pod 中执行,从而提供 pod 级任务隔离和更好的资源管理。

不过,这两者之间也存在一些重要的区别:

  • KubernetesExecutor 仅在您环境的已纳入版本的 Cloud Composer 命名空间中运行任务。在 Cloud Composer 中无法更改此命名空间。您可以指定 KubernetesPodOperator 运行 pod 任务的命名空间。
  • KubernetesExecutor 可以使用任何内置的 Airflow 运算符。KubernetesPodOperator 仅执行由容器的 entrypoint 定义的提供的脚本。
  • KubernetesExecutor 使用默认的 Cloud Composer Docker 映像,其中包含在 Cloud Composer 环境中定义的相同 Python、Airflow 配置选项替换、环境变量和 PyPI 软件包。

关于 Docker 映像

默认情况下,KubernetesExecutor 使用与 Cloud Composer 用于 Celery 工作器的 Docker 映像相同的映像来启动任务。这是您环境的 Cloud Composer 映像,其中包含您为环境指定的所有更改,例如自定义 PyPI 软件包或环境变量。

准备工作

  • 您可以在 Cloud Composer 3 中使用 CeleryKubernetesExecutor。

  • 在 Cloud Composer 3 中,无法使用 CeleryKubernetesExecutor 以外的任何执行器。这意味着,您可以在一个 DAG 中使用 CeleryExecutor、KubernetesExecutor 或同时使用两者来运行任务,但无法将环境配置为仅使用 KubernetesExecutor 或 CeleryExecutor。

配置 CeleryKubernetesExecutor

您可能需要替换与 KubernetesExecutor 相关的现有 Airflow 配置选项:

  • [kubernetes]worker_pods_creation_batch_size

    此选项用于定义每个调度程序循环的 Kubernetes Worker Pod 创建调用次数。默认值为 1,因此每个调度器心跳仅启动一个 pod。如果您大量使用 KubernetesExecutor,建议您增加此值。

  • [kubernetes]worker_pods_pending_timeout

    此选项用于定义工作器在 Pending 状态(正在创建 Pod)下可停留的时长(以秒为单位),超过此时长即视为失败。默认值为 5 分钟。

使用 KubernetesExecutor 或 CeleryExecutor 运行任务

您可以在一个 DAG 中使用 CeleryExecutor、KubernetesExecutor 或两者同时运行任务:

  • 如需使用 KubernetesExecutor 运行任务,请在任务的 queue 参数中指定 kubernetes 值。
  • 如需使用 CeleryExecutor 运行任务,请省略 queue 参数。

以下示例使用 KubernetesExecutor 运行 task-kubernetes 任务,并使用 CeleryExecutor 运行 task-celery 任务:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

运行与 KubernetesExecutor 相关的 Airflow CLI 命令

您可以使用 gcloud 运行多个与 KubernetesExecutor 相关的 Airflow CLI 命令

自定义工作器 pod 规范

您可以通过在任务的 executor_config 参数中传递工作器 pod 规范来自定义该规范。您可以使用此属性定义自定义 CPU 和内存要求。

您可以替换用于运行任务的整个工作器 pod 规范。如需检索 KubernetesExecutor 使用的任务的 pod 规范,您可以运行 kubernetes generate-dag-yaml Airflow CLI 命令。

如需详细了解如何自定义工作器 pod 规范,请参阅 Airflow 文档

Cloud Composer 3 支持以下资源要求值:

资源 最小值 最大值 步骤
CPU 0.25 32 步进值:0.25、0.5、1、2、4、6、8、10、...、32。请求的值会向上舍入为最接近的支持的步数值(例如,5 舍入为 6)。
内存 2G(GB) 128G (GB) 步进值:2、3、4、5、...、128。请求的值会向上舍入为最接近的支持的步进值(例如,3.5G 舍入为 4G)。
存储 - 100G(GB) 任意值。如果请求的内存超过 100 GB,则仅提供 100 GB。

如需详细了解 Kubernetes 中的资源单位,请参阅 Kubernetes 中的资源单位

以下示例演示了使用自定义工作器 Pod 规范的任务:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '0.5',
                            'memory': '2G',
                        })
                    ),
                ],
            ),
        )
    },
)

查看任务日志

由 KubernetesExecutor 执行的任务的日志可在日志标签页中找到,与由 CeleryExecutor 运行的任务的日志一起显示:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 前往日志标签页。

  4. 依次前往所有日志 > Airflow 日志 > 工作器

  5. 名为 airflow-k8s-worker 的工作器执行 KubernetesExecutor 任务。如需查找特定任务的日志,您可以在搜索中使用 DAG ID 或任务 ID 作为关键字。

后续步骤