CeleryKubernetesExecutor を使用する

Cloud Composer 1 | Cloud Composer 2

このページでは、Cloud Composer で CeleryKubernetesExecutor を有効にする方法と、DAG で KubernetesExecutor を使用する方法について説明します。

CeleryKubernetesExecutor について

CeleryKubernetesExecutor は、CeleryExecutor と KubernetesExecutor を同時に使用できるエグゼキュータの一種です。Airflow は、タスクに定義したキューに基づいてエグゼキュータを選択します。1 つの DAG で、CeleryExecutor を使用して一部のタスクを実行し、他のタスクを KubernetesExecutor を使用して実行できます。

  • CeleryExecutor は、タスクを高速かつスケーラブルに実行するために最適化されています。
  • KubernetesExecutor は、リソースを大量に消費するタスクの実行と、タスクを分離して実行するために設計されています。

Cloud Composer の CeleryKubernetesExecutor

Cloud Composer の CeleryKubernetesExecutor によって、タスクで KubernetesExecutor を使用できます。Cloud Composer で CeleryKubernetesExecutor とは別に KubernetesExecutor を使用することはできません。

Cloud Composer は、環境のクラスタ内の KubernetesExecutor を使用して実行するタスクを、Airflow ワーカーと同じ Namespace で実行します。このようなタスクには Airflow ワーカーと同じバインディングがあり、プロジェクト内のリソースにアクセスできます。

KubernetesExecutor で実行するタスクは、Cloud Composer 料金モデルを使用します。これらのタスクを持つ Pod は環境のクラスタで実行されるためです。Cloud Composer コンピューティング SKU(CPU、メモリ、ストレージ向け)はこれらの Pod に適用されます。

次のような場合は、CeleryExecutor を使用してタスクを実行することをおすすめします。

  • タスクの起動時間は重要です。
  • タスクはランタイムの分離を必要とせず、リソースを大量に消費しません。

次のような場合は、KubernetesExecutor を使用してタスクを実行することをおすすめします。

  • タスクにはランタイムの分離が必要です。たとえば、タスクが独自の Pod で実行されるため、タスクがメモリや CPU と競合しないようにします。
  • タスクには追加のシステム ライブラリ(または PyPI パッケージ)が必要です。
  • タスクはリソースを大量に消費するため、使用可能な CPU リソースとメモリリソースを制御する必要があります。

KubernetesExecutor と KubernetesPodOperator の比較

KubernetesExecutor でのタスクの実行は、KubernetesPodOperator を使用したタスクの実行と似ています。タスクは Pod で実行されるため、Pod レベルでタスクを分離し、リソースをより適切に管理できます。

ただし、いくつか重要な違いがあります。

  • KubernetesExecutor は、環境のバージョニングされた Cloud Composer Namespace でのみタスクを実行します。Cloud Composer でこの Namespace を変更することはできません。KubernetesPodOperator が Pod タスクを実行する Namespace を指定できます。
  • KubernetesExecutor では、任意の組み込み Airflow 演算子を使用できます。KubernetesPodOperator は、コンテナのエントリポイントで定義された提供されているスクリプトのみを実行します。
  • KubernetesExecutor は、Cloud Composer 環境で定義されているのと同じ Python、Airflow 構成オプションのオーバーライド、環境変数、PyPI パッケージとともにデフォルトの Cloud Composer Docker イメージを使用します。

Docker イメージについて

デフォルトでは、KubernetesExecutor は、Cloud Composer が Celery ワーカーに使用するのと同じ Docker イメージを使用してタスクを起動します。これは環境の Cloud Composer イメージです。カスタム PyPI パッケージや環境変数など、環境に対して指定したすべての変更が含まれています。

始める前に

  • Cloud Composer 3 で CeleryKubernetesExecutor を使用できます。

  • Cloud Composer 3 では、CeleryKubernetesExecutor 以外のエグゼキュータを使用することはできません。つまり、1 つの DAG で CeleryExecutor または KubernetesExecutor のいずれか、または両方を使用してタスクを実行できますが、KubernetesExecutor または CeleryExecutor のみを使用するように環境を構成することはできません。

CeleryKubernetesExecutor を構成する

KubernetesExecutor に関連する既存の Airflow 構成オプションをオーバーライドすることをおすすめします。

  • [kubernetes]worker_pods_creation_batch_size

    このオプションでは、スケジューラ ループあたりの Kubernetes ワーカー Pod の作成呼び出し回数を定義します。デフォルト値は 1 であるため、スケジューラのハートビートごとに 1 つの Pod のみが起動されます。KubernetesExecutor を頻繁に使用する場合は、この値を増やすことをおすすめします。

  • [kubernetes]worker_pods_pending_timeout

    このオプションは、ワーカーが Pending 状態(Pod が作成されている)を維持できる時間を秒単位で定義します。この期間が経過すると失敗したと見なされます。デフォルト値は 5 分です。

KubernetesExecutor または CeleryExecutor を使用してタスクを実行する

CeleryExecutor と KubernetesExecutor のいずれか、または両方を使用して、1 つの DAG でタスクを実行できます。

  • KubernetesExecutor を使用してタスクを実行するには、タスクの queue パラメータに kubernetes 値を指定します。
  • CeleryExecutor を使用してタスクを実行するには、queue パラメータを省略します。

次の例では、KubernetesExecutor を使用して task-kubernetes タスクを実行し、CeleryExecutor を使用して task-celery タスクを実行します。

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

KubernetesExecutor に関連する Airflow CLI コマンドを実行する

gcloud を使用して、KubernetesExecutor に関連する Airflow CLI コマンドをいくつか実行できます。

ワーカー Pod 仕様をカスタマイズする

ワーカー Pod 仕様をカスタマイズするには、タスクの executor_config パラメータでその仕様を渡します。これを使用して、カスタム CPU とメモリの要件を定義できます。

タスクの実行に使用されるワーカー Pod 仕様全体をオーバーライドできます。KubernetesExecutor が使用するタスクの Pod 仕様を取得するには、kubernetes generate-dag-yaml Airflow CLI コマンドを実行します。

ワーカー Pod 仕様のカスタマイズの詳細については、Airflow のドキュメントをご覧ください。

次の例は、カスタム ワーカー Pod 仕様を使用するタスクを示しています。

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '500m',
                            'memory': '1000Mi',
                        })
                    ),
                ],
            ),
        )
    },
)

タスクログを表示する

KubernetesExecutor によって実行されたタスクのログは、CeleryExecutor によって実行されたタスクのログとともに、[Logs] タブで確認できます。

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

  3. [ログ] タブに移動します。

  4. [すべてのログ] > [Airflow ログ] > [ワーカー] に移動します。

  5. airflow-k8s-worker という名前のワーカーが、KubernetesExecutor のタスクを実行します。特定のタスクのログを検索するには、検索で DAG ID またはタスク ID をキーワードとして使用できます。

次のステップ