使用 CeleryKubernetesExecutor

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本页面介绍了如何在 Cloud Composer 以及如何在 DAG 中使用 KubernetesExecutor。

关于 CeleryKubernetesExecutor

CeleryKubernetesExecutor 是一个 可以同时使用 CeleryExecutor 和 KubernetesExecutor 的 Executor 类型 。Airflow 会根据您为 任务。在一个 DAG 中,您可以使用 CeleryExecutor 运行一些任务以及其他任务 使用 KubernetesExecutor:

  • CeleryExecutor 经过优化,可快速执行任务。
  • KubernetesExecutor 旨在执行资源密集型任务和 隔离运行任务。

Cloud Composer 中的 CeleryKubernetesExecutor

Cloud Composer 中的 CeleryKubernetesExecutor 能够 使用 KubernetesExecutor 执行任务。无法使用 与 Cloud Composer 中的 KubernetesExecutor 分开 CeleryKubernetesExecutor。

Cloud Composer 运行您使用 KubernetesExecutor 执行的任务 在环境的集群中,与 Airflow 工作器位于同一命名空间中。此类 具有与 Airflow 相同的绑定 工作器,并且可以访问项目中的资源。

使用 KubernetesExecutor 执行的任务会使用 Cloud Composer 价格模式, 在环境的集群中运行的任务。Cloud Composer 计算 SKU (针对 CPU、内存和存储空间)适用于这些 Pod。

在以下情况下,我们建议您使用 CeleryExecutor 运行任务:

  • 任务启动时间很重要。
  • 任务不需要运行时隔离,并且不需要占用大量资源。

在以下情况下,我们建议您使用 KubernetesExecutor 运行任务:

  • 任务需要运行时隔离。例如,这样任务不会 因为它们在各自的 Pod 中运行
  • 任务需要额外的系统库(或 PyPI 软件包)。
  • 任务会占用大量资源,因此您需要控制 CPU 和内存资源。

KubernetesExecutor 与 KubernetesPodOperator 对比

使用 KubernetesExecutor 运行任务类似于 使用 KubernetesPodOperator 运行任务的方法。任务在 从而实现 Pod 级任务隔离,更好地管理资源。

不过,它们也存在一些关键区别:

  • KubernetesExecutor 仅在具有版本控制的 Cloud Composer 中运行任务 命名空间中无法更改此命名空间 Cloud Composer。您可以指定 KubernetesPodOperator 在其中运行 Pod 任务的命名空间。
  • KubernetesExecutor 可以使用任何内置 Airflow 操作器。 KubernetesPodOperator 仅执行 容器的入口点
  • KubernetesExecutor 使用默认 Cloud Composer Docker 映像 Airflow 配置选项替换、环境 变量和在命令行中指定的 PyPI 软件包 Cloud Composer 环境。

Docker 映像简介

默认情况下,KubernetesExecutor 使用 Cloud Composer 用于 Celery 工作器。这是 为您的环境安装 Cloud Composer 映像, 您为环境指定的所有更改,例如自定义 PyPI 软件包或环境变量

准备工作

  • 您可以在 Cloud Composer 3 中使用 CeleryKubernetesExecutor。

  • 无法使用除 CeleryKubernetesExecutor 之外的任何执行器 。这意味着你可以使用 CeleryExecutor 和/或 KubernetesExecutor 或两者均存在于一个 DAG 中,但并非 将您的环境配置为仅使用 KubernetesExecutor 或 CeleryExecutor。

配置 CeleryKubernetesExecutor

您可能需要覆盖现有 Airflow 配置 与 KubernetesExecutor 相关的选项:

  • [kubernetes]worker_pods_creation_batch_size

    此选项定义每个 Pod 的 Kubernetes 工作器 Pod 创建调用次数, 调度程序循环。默认值为 1,因此仅启动一个 Pod 每个调度器检测信号。如果您大量使用 KubernetesExecutor 建议提高此值。

  • [kubernetes]worker_pods_pending_timeout

    此选项定义 worker 可以在 Pending 中停留多长时间(以秒为单位) Pod 正在创建中)的状态,才会被视为失败。默认 值为 5 分钟。

使用 KubernetesExecutor 或 CeleryExecutor 运行任务

您可以在一个 DAG 中使用 CeleryExecutor 和/或 KubernetesExecutor 运行任务:

  • 如需使用 KubernetesExecutor 运行任务,请在kubernetes 任务的 queue 参数。
  • 如需使用 CeleryExecutor 运行任务,请省略 queue 参数。

以下示例使用以下代码运行 task-kubernetes 任务: KubernetesExecutor 和使用 CeleryExecutor 的 task-celery 任务:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

运行与 KubernetesExecutor 相关的 Airflow CLI 命令

你可以运行多个 与 KubernetesExecutor 相关的 Airflow CLI 命令 使用 gcloud

自定义工作器 Pod 规范

您可以自定义工作器 Pod 规范,只需将其传入 executor_config 即可 参数。您可以用它来定义自定义 CPU 和内存 要求。

您可以替换用于运行任务的整个工作器 Pod 规范。接收者 检索 KubernetesExecutor 使用的任务的 Pod 规范,您就可以 运行 kubernetes generate-dag-yaml Airflow CLI 命令。

如需详细了解如何自定义工作器 Pod 规范,请参阅 Airflow 文档

以下示例演示了使用自定义工作器 Pod 规范的任务:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '500m',
                            'memory': '1000Mi',
                        })
                    ),
                ],
            ),
        )
    },
)

查看任务日志

日志标签页中提供由 KubernetesExecutor 执行的任务的日志, 以及 CeleryExecutor 运行的任务日志:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 前往日志标签页。

  4. 依次点击所有日志 > Airflow 日志 > 工作器

  5. 执行名为 airflow-k8s-worker 的工作器 KubernetesExecutor 任务。如需查找特定任务的日志,您可以 在搜索中使用 DAG ID 或任务 ID 作为关键字。

后续步骤