使用 CeleryKubernetesExecutor

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

本页面介绍了如何在 Cloud Composer 中启用 CeleryKubernetesExecutor 以及如何在 DAG 中使用 KubernetesExecutor。

关于 CeleryKubernetesExecutor

CeleryKubernetesExecutor 是一种可同时使用 CeleryExecutor 和 KubernetesExecutor 的执行器。Airflow 根据您为任务定义的队列选择执行器。在一个 DAG 中,您可以使用 CeleryExecutor 运行一些任务,使用 KubernetesExecutor 运行其他任务:

  • CeleryExecutor 经过优化,可快速执行任务。
  • KubernetesExecutor 旨在执行资源密集型任务和独立运行任务。

Cloud Composer 中的 CeleryKubernetesExecutor

Cloud Composer 中的 CeleryKubernetesExecutor 能够将 KubernetesExecutor 用于您的任务。在 Cloud Composer 中,KubernetesExecutor 无法与 CeleryKubernetesExecutor 分开使用。

Cloud Composer 会在环境集群(与 Airflow 工作器所在的命名空间)中运行您使用 KubernetesExecutor 执行的任务。此类任务与 Airflow 工作器具有相同的绑定,并且可以访问项目中的资源。

使用 KubernetesExecutor 执行的任务采用 Cloud Composer 价格模式,因为包含这些任务的 Pod 会在您环境的集群中运行。Cloud Composer 计算 SKU(针对 CPU、内存和存储空间)适用于这些 Pod。

在以下情况下,我们建议您使用 CeleryExecutor 运行任务:

  • 任务启动时间很重要。
  • 任务不需要运行时隔离,并且不需要占用大量资源。

在以下情况下,我们建议您使用 KubernetesExecutor 运行任务:

  • 任务需要运行时隔离。例如,这样任务就不会竞争内存和 CPU,因为它们在自己的 Pod 中运行。
  • 任务需要额外的系统库(或 PyPI 软件包)。
  • 任务会占用大量资源,因此您需要控制可用的 CPU 和内存资源。

KubernetesExecutor 与 KubernetesPodOperator 对比

使用 KubernetesExecutor 运行任务与使用 KubernetesPodOperator 运行任务类似。任务在 Pod 中执行,因此提供了 Pod 级别的任务隔离和更好的资源管理。

不过,它们也存在一些关键区别:

  • KubernetesExecutor 仅在您的环境的版本控制 Cloud Composer 命名空间中运行任务。您无法在 Cloud Composer 中更改此命名空间。您可以指定 KubernetesPodOperator 在其中运行 Pod 任务的命名空间。
  • KubernetesExecutor 可以使用任何内置 Airflow 操作器。 KubernetesPodOperator 仅执行由容器入口点定义的提供的脚本。
  • KubernetesExecutor 会使用默认的 Cloud Composer Docker 映像,其中包含您在 Cloud Composer 环境中定义的相同 Python、Airflow 配置选项替换项、环境变量和 PyPI 软件包。

Docker 映像简介

默认情况下,KubernetesExecutor 使用 Cloud Composer 用于 Celery 工作器的同一 Docker 映像来启动任务。这是您的环境的 Cloud Composer 映像,包含您为环境指定的所有更改,例如自定义 PyPI 软件包或环境变量。

准备工作

  • 您可以在 Cloud Composer 3 中使用 CeleryKubernetesExecutor。

  • 在 Cloud Composer 3 中,无法使用除 CeleryKubernetesExecutor 之外的任何执行器。这意味着您可以在一个 DAG 中使用 CeleryExecutor 和/或 KubernetesExecutor 运行任务,但无法将环境配置为仅使用 KubernetesExecutor 或 CeleryExecutor。

配置 CeleryKubernetesExecutor

您可能希望替换与 KubernetesExecutor 相关的现有 Airflow 配置选项:

  • [kubernetes]worker_pods_creation_batch_size

    此选项定义每个调度器循环的 Kubernetes 工作器 Pod 创建调用次数。默认值为 1,因此每个调度器检测信号只启动一个 Pod。如果您大量使用 KubernetesExecutor,我们建议您提高此值。

  • [kubernetes]worker_pods_pending_timeout

    此选项定义工作器在被视为失败之前可以保持 Pending 状态(正在创建 Pod)的时间(以秒为单位)。默认值为 5 分钟。

使用 KubernetesExecutor 或 CeleryExecutor 运行任务

您可以在一个 DAG 中使用 CeleryExecutor 和/或 KubernetesExecutor 运行任务:

  • 如需使用 KubernetesExecutor 运行任务,请在任务的 queue 参数中指定 kubernetes 值。
  • 如需使用 CeleryExecutor 运行任务,请省略 queue 参数。

以下示例使用 KubernetesExecutor 运行 task-kubernetes 任务,使用 CeleryExecutor 运行 task-celery 任务:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

运行与 KubernetesExecutor 相关的 Airflow CLI 命令

您可以使用 gcloud 运行多个与 KubernetesExecutor 相关的 Airflow CLI 命令

自定义工作器 Pod 规范

您可以自定义工作器 Pod 规范,只需将其传入任务的 executor_config 参数即可。您可以使用此参数定义自定义 CPU 和内存要求。

您可以替换用于运行任务的整个工作器 Pod 规范。如需检索 KubernetesExecutor 使用的任务的 Pod 规范,您可以运行 kubernetes generate-dag-yaml Airflow CLI 命令。

如需详细了解如何自定义工作器 Pod 规范,请参阅 Airflow 文档

以下示例演示了使用自定义工作器 Pod 规范的任务:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '500m',
                            'memory': '1000Mi',
                        })
                    ),
                ],
            ),
        )
    },
)

查看任务日志

Logs 标签页中提供 KubernetesExecutor 执行的任务的日志,以及 CeleryExecutor 运行的任务日志:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 前往日志标签页。

  4. 依次点击所有日志 > Airflow 日志 > 工作器

  5. 名为 airflow-k8s-worker 的工作器执行 KubernetesExecutor 任务。如需查找特定任务的日志,您可以在搜索中使用 DAG ID 或任务 ID 作为关键字。

后续步骤