使用 CeleryKubernetesExecutor

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本页面介绍了如何在 Cloud Composer 以及如何在 DAG 中使用 KubernetesExecutor。

CeleryKubernetesExecutor 简介

CeleryKubernetesExecutor 是一个 可以同时使用 CeleryExecutor 和 KubernetesExecutor 的 Executor 类型 。Airflow 会根据您为 任务。在一个 DAG 中,您可以使用 CeleryExecutor 运行一些任务,并使用 KubernetesExecutor 运行其他任务:

  • CeleryExecutor 经过优化,可快速执行任务。
  • KubernetesExecutor 专用于执行资源密集型任务和隔离运行任务。

Cloud Composer 中的 CeleryKubernetesExecutor

Cloud Composer 中的 CeleryKubernetesExecutor 可让您为任务使用 KubernetesExecutor。无法在 Cloud Composer 中将 KubernetesExecutor 与 CeleryKubernetesExecutor 分开使用。

Cloud Composer 运行您使用 KubernetesExecutor 执行的任务 在环境的集群中,与 Airflow 工作器位于同一命名空间中。此类 具有与 Airflow 相同的绑定 工作器,并且可以访问项目中的资源。

由于包含这些任务的 Pod 在环境的集群中运行,因此您使用 KubernetesExecutor 执行的任务会采用 Cloud Composer 价格模式。Cloud Composer 计算 SKU(适用于 CPU、内存和存储)适用于这些 pod。

在以下情况下,我们建议您使用 CeleryExecutor 运行任务:

  • 任务启动时间很重要。
  • 任务不需要运行时隔离,也不会占用大量资源。

在以下情况下,我们建议您使用 KubernetesExecutor 运行任务:

  • 任务需要运行时隔离。例如,这样任务不会 因为它们在各自的 Pod 中运行
  • 任务需要额外的系统库(或 PyPI 软件包)。
  • 任务非常耗资源,并且您希望控制可用的 CPU 和内存资源。

KubernetesExecutor 与 KubernetesPodOperator 的比较

使用 KubernetesExecutor 运行任务与使用 KubernetesPodOperator 运行任务类似。任务在 Pod 中执行,从而实现 Pod 级任务隔离和更好的资源管理。

不过,二者间也有一些重要的区别:

  • KubernetesExecutor 仅在具有版本控制的 Cloud Composer 中运行任务 命名空间中您无法在 Cloud Composer 中更改此命名空间。您可以指定 KubernetesPodOperator 在其中运行 Pod 任务的命名空间。
  • KubernetesExecutor 可以使用任何内置 Airflow 操作器。 KubernetesPodOperator 仅执行由容器的入口点定义的提供的脚本。
  • KubernetesExecutor 使用默认的 Cloud Composer Docker 映像,其中包含与 Cloud Composer 环境中定义的 Python、Airflow 配置选项替换项、环境变量和 PyPI 软件包相同的配置。

Docker 映像简介

默认情况下,KubernetesExecutor 使用 Cloud Composer 用于 Celery 工作器。这是 为您的环境安装 Cloud Composer 映像, 您为环境指定的所有更改,例如自定义 PyPI 软件包或环境变量

准备工作

  • 您可以在 Cloud Composer 3 中使用 CeleryKubernetesExecutor。

  • 无法使用除 CeleryKubernetesExecutor 之外的任何执行器 。这意味着,您可以在一个 DAG 中使用 CeleryExecutor、KubernetesExecutor 或二者结合来运行任务,但无法将环境配置为仅使用 KubernetesExecutor 或 CeleryExecutor。

配置 CeleryKubernetesExecutor

您可能需要替换与 KubernetesExecutor 相关的现有 Airflow 配置选项:

  • [kubernetes]worker_pods_creation_batch_size

    此选项定义每个 Pod 的 Kubernetes 工作器 Pod 创建调用次数, 调度程序循环。默认值为 1,因此仅启动一个 Pod 每个调度器检测信号。如果您大量使用 KubernetesExecutor 建议提高此值。

  • [kubernetes]worker_pods_pending_timeout

    此选项定义了工作器在 Pending 状态(正在创建 Pod)下可以保持多长时间(以秒为单位),之后才会被视为失败。默认值为 5 分钟。

使用 KubernetesExecutor 或 CeleryExecutor 运行任务

您可以在一个 DAG 中使用 CeleryExecutor 和/或 KubernetesExecutor 运行任务:

  • 如需使用 KubernetesExecutor 运行任务,请在任务的 queue 参数中指定 kubernetes 值。
  • 如需使用 CeleryExecutor 运行任务,请省略 queue 参数。

以下示例使用以下代码运行 task-kubernetes 任务: KubernetesExecutor 和使用 CeleryExecutor 的 task-celery 任务:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

运行与 KubernetesExecutor 相关的 Airflow CLI 命令

您可以使用 gcloud 运行多个与 KubernetesExecutor 相关的 Airflow CLI 命令

自定义工作器 Pod 规范

您可以通过在任务的 executor_config 参数中传递工作器 Pod 规范来自定义工作器 Pod 规范。您可以用它来定义自定义 CPU 和内存 要求。

您可以替换用于运行任务的整个工作器 pod 规范。如需检索 KubernetesExecutor 使用的任务的 pod 规范,您可以运行 kubernetes generate-dag-yaml Airflow CLI 命令。

如需详细了解如何自定义工作器 pod 规范,请参阅 Airflow 文档

以下示例演示了使用自定义工作器 pod 规范的任务:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '500m',
                            'memory': '1000Mi',
                        })
                    ),
                ],
            ),
        )
    },
)

查看任务日志

日志标签页中提供由 KubernetesExecutor 执行的任务的日志, 以及 CeleryExecutor 运行的任务日志:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 前往日志标签页。

  4. 依次前往所有日志 > Airflow 日志 > 工作器

  5. 执行名为 airflow-k8s-worker 的工作器 KubernetesExecutor 任务。如需查找特定任务的日志,您可以在搜索中使用 DAG ID 或任务 ID 作为关键字。

后续步骤