使用 CeleryKubernetesExecutor

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer„�

本頁說明如何在 Cloud Composer 中啟用 CeleryKubernetesExecutor,以及如何在 DAG 中使用 KubernetesExecutor。

關於 CeleryKubernetesExecutor

CeleryKubernetesExecutor 是一種執行器,可同時使用 CeleryExecutor 和 KubernetesExecutor。Airflow 會根據您為工作定義的佇列選取執行器。在一個 DAG 中,您可以使用 CeleryExecutor 執行部分工作,並使用 KubernetesExecutor 執行其他工作:

  • CeleryExecutor 經過最佳化調整,可快速且大規模地執行工作。
  • KubernetesExecutor 專為執行耗用大量資源的工作而設計,且可獨立執行工作。

Cloud Composer 中的 CeleryKubernetesExecutor

Cloud Composer 中的 CeleryKubernetesExecutor 可讓您使用 KubernetesExecutor 執行工作。在 Cloud Composer 中,無法將 KubernetesExecutor 與 CeleryKubernetesExecutor 分開使用。

Cloud Composer 會在環境的叢集中執行您透過 KubernetesExecutor 執行的工作,並與 Airflow 工作站位於同一個命名空間。這類工作與 Airflow 工作人員具有相同的繫結,且可存取專案中的資源。

使用 KubernetesExecutor 執行的工作會採用 Cloud Composer 計費模式,因為這些工作的 Pod 會在環境的叢集中執行。這些 Pod 適用於 Cloud Composer 運算 SKU (CPU、記憶體和儲存空間)。

建議在下列情況下使用 CeleryExecutor 執行工作:

  • 工作啟動時間非常重要。
  • 工作不需要執行階段隔離,也不會耗用大量資源。

建議您在下列情況下使用 KubernetesExecutor 執行工作:

  • 工作需要執行階段隔離。舉例來說,由於工作會在自己的 Pod 中執行,因此不會爭用記憶體和 CPU。
  • 工作耗用大量資源,您想控管可用的 CPU 和記憶體資源。

KubernetesExecutor 與 KubernetesPodOperator 的比較

使用 KubernetesExecutor 執行工作,與使用 KubernetesPodOperator 執行工作類似。工作會在 Pod 中執行,因此可提供 Pod 層級的工作隔離,並提升資源管理成效。

不過,兩者之間還是有一些重要差異:

  • KubernetesExecutor 只會在環境的 Cloud Composer 命名空間中執行工作。您無法在 Cloud Composer 中變更這個命名空間。您可以指定 KubernetesPodOperator 執行 Pod 工作的命名空間。
  • KubernetesExecutor 可以使用任何內建的 Airflow 運算子。KubernetesPodOperator 只會執行容器的進入點定義的指令碼。
  • KubernetesExecutor 會使用預設的 Cloud Composer Docker 映像檔,以及在 Cloud Composer 環境中定義的相同 Python、Airflow 設定選項覆寫、環境變數和 PyPI 套件。

關於 Docker 映像檔

根據預設,KubernetesExecutor 會使用與 Cloud Composer 用於 Celery 工作站的 Docker 映像檔,啟動工作。這是環境的 Cloud Composer 映像檔,其中包含您為環境指定的所有變更,例如自訂 PyPI 套件或環境變數。

事前準備

  • 您可以在 Cloud Composer 3 中使用 CeleryKubernetesExecutor。

  • 在 Cloud Composer 3 中,您只能使用 CeleryKubernetesExecutor,也就是說,您可以在一個 DAG 中使用 CeleryExecutor、KubernetesExecutor 或兩者,但無法將環境設為只使用 KubernetesExecutor 或 CeleryExecutor。

設定 CeleryKubernetesExecutor

您可能想覆寫與 KubernetesExecutor 相關的現有 Airflow 設定選項:

  • [kubernetes]worker_pods_creation_batch_size

    這個選項會定義每個排程器迴圈的 Kubernetes 工作站 Pod 建立呼叫次數。預設值為 1,因此每個排程器心跳只會啟動一個 Pod。如果大量使用 KubernetesExecutor,建議增加這個值。

  • [kubernetes]worker_pods_pending_timeout

    這個選項會以秒為單位定義工作人員可停留在 Pending 狀態 (正在建立 Pod) 的時間長度,超過這個時間就會視為失敗。預設值為 5 分鐘。

使用 KubernetesExecutor 或 CeleryExecutor 執行工作

您可以在一個 DAG 中使用 CeleryExecutor、KubernetesExecutor 或兩者執行工作:

  • 如要使用 KubernetesExecutor 執行工作,請在工作的 queue 參數中指定 kubernetes 值。
  • 如要使用 CeleryExecutor 執行工作,請省略 queue 參數。

下列範例會使用 KubernetesExecutor 執行 task-kubernetes 工作,並使用 CeleryExecutor 執行 task-celery 工作:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

執行與 KubernetesExecutor 相關的 Airflow CLI 指令

您可以使用 gcloud 執行多個與 KubernetesExecutor 相關的 Airflow CLI 指令

自訂工作站 Pod 規格

您可以透過工作中的 executor_config 參數傳遞工作站 Pod 規格,藉此自訂規格。您可以使用這項功能定義自訂 CPU 和記憶體需求。

您可以覆寫用於執行工作的整個工作站 Pod 規格。如要擷取 KubernetesExecutor 使用的工作的 Pod 規格,可以執行 kubernetes generate-dag-yaml Airflow CLI 指令。

如要進一步瞭解如何自訂工作站 Pod 規格,請參閱 Airflow 說明文件

Cloud Composer 3 支援下列資源需求值:

資源 下限 上限 步驟
CPU 0.25 32 間距值:0.25、0.5、1、2、4、6、8、10、...、32。系統會將要求的值無條件進位至最接近的支援步階值 (例如 5 進位至 6)。
記憶體 2G (GB) 128G (GB) 間距值:2、3、4、5、...、128。系統會將要求的值無條件進位至最接近的支援步驟值 (例如 3.5G 進位至 4G)。
儲存空間 - 100G (GB) 任何值。如果要求的容量超過 100 GB,系統只會提供 100 GB。

如要進一步瞭解 Kubernetes 中的資源單位,請參閱「Kubernetes 中的資源單位」。

以下範例顯示使用自訂工作站 Pod 規格的工作:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '0.5',
                            'memory': '2G',
                        })
                    ),
                ],
            ),
        )
    },
)

查看工作記錄

KubernetesExecutor 執行的工作記錄會顯示在「記錄」分頁中,與 CeleryExecutor 執行的工作記錄一起顯示:

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。

  3. 前往「記錄」分頁。

  4. 依序前往「所有記錄」>「Airflow 記錄」 >「工作人員」

  5. 名為 airflow-k8s-worker 的工作站會執行 KubernetesExecutor 工作。如要尋找特定工作的記錄,可以在搜尋時使用 DAG ID 或工作 ID 做為關鍵字。

後續步驟