CeleryKubernetesExecutor を使用する

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このページでは、Cloud Composer で CeleryKubernetesExecutor を有効にする方法と、DAG で KubernetesExecutor を使用する方法について説明します。

CeleryKubernetesExecutor について

CeleryKubernetesExecutor は、CeleryExecutor と KubernetesExecutor を同時に使用できるエグゼキュータの一種です。Airflow は、タスクに定義したキューに基づいてエグゼキュータを選択します。1 つの DAG で、一部のタスクを CeleryExecutor で実行し、他のタスクを KubernetesExecutor で実行できます。

  • CeleryExecutor は、タスクを高速かつスケーラブルに実行するように最適化されています。
  • KubernetesExecutor は、リソースを大量に消費するタスクの実行と、タスクの分離実行用に設計されています。

Cloud Composer の CeleryKubernetesExecutor

Cloud Composer の CeleryKubernetesExecutor を使用すると、タスクに KubernetesExecutor を使用できます。Cloud Composer で KubernetesExecutor を CeleryKubernetesExecutor とは別に使用することはできません。

Cloud Composer は、環境のクラスタ内の KubernetesExecutor を使用して実行するタスクを、Airflow ワーカーと同じ Namespace で実行します。このようなタスクは、Airflow ワーカーと同じバインディングを持ち、プロジェクト内のリソースにアクセスできます。

KubernetesExecutor で実行するタスクは、これらのタスクを含む Pod が環境のクラスタで実行されるため、Cloud Composer の料金モデルを使用します。これらの Pod には、Cloud Composer コンピューティング SKU(CPU、メモリ、ストレージ用)が適用されます。

次の場合は、CeleryExecutor でタスクを実行することをおすすめします。

  • タスクの起動時間が重要です。
  • タスクにはランタイムの分離は必要ありません。また、リソースを大量に消費することもありません。

次の場合は、KubernetesExecutor を使用してタスクを実行することをおすすめします。

  • タスクにはランタイムの分離が必要です。たとえば、タスクが独自の Pod で実行されるため、タスクがメモリや CPU と競合しないようにします。
  • タスクには追加のシステム ライブラリ(または PyPI パッケージ)が必要です。
  • タスクがリソースを大量に消費し、使用可能な CPU リソースとメモリリソースを制御する必要があります。

KubernetesExecutor と KubernetesPodOperator の比較

KubernetesExecutor でタスクを実行することは、KubernetesPodOperator を使用してタスクを実行することに似ています。タスクは Pod で実行されるため、Pod レベルでタスクを分離し、リソースをより適切に管理できます。

ただし、いくつか重要な違いがあります。

  • KubernetesExecutor は、環境のバージョニングされた Cloud Composer Namespace でのみタスクを実行します。Cloud Composer でこの Namespace を変更することはできません。KubernetesPodOperator が Pod タスクを実行する名前空間を指定できます。
  • KubernetesExecutor は、任意の組み込み Airflow 演算子を使用できます。KubernetesPodOperator は、コンテナのエントリポイントで定義された提供されているスクリプトのみを実行します。
  • KubernetesExecutor は、Cloud Composer 環境で定義されているのと同じ Python、Airflow 構成オプションのオーバーライド、環境変数、PyPI パッケージとともにデフォルトの Cloud Composer Docker イメージを使用します。

Docker イメージについて

デフォルトでは、KubernetesExecutor は、Cloud Composer が Celery ワーカーに使用する Docker イメージを使用してタスクを起動します。これは、環境の Cloud Composer イメージであり、環境に指定したすべての変更(カスタム PyPI パッケージや環境変数など)が含まれています。

始める前に

  • Cloud Composer 3 では CeleryKubernetesExecutor を使用できます。

  • Cloud Composer 3 では、CeleryKubernetesExecutor 以外のエグゼキュータを使用できません。つまり、1 つの DAG で CeleryExecutor、KubernetesExecutor、またはその両方を使用してタスクを実行できますが、KubernetesExecutor または CeleryExecutor のみを使用するように環境を構成することはできません。

CeleryKubernetesExecutor を構成する

KubernetesExecutor に関連する既存の Airflow 構成オプションをオーバーライドすることをおすすめします。

  • [kubernetes]worker_pods_creation_batch_size

    このオプションは、スケジューラ ループあたりの Kubernetes ワーカー Pod 作成呼び出しの数を定義します。デフォルト値は 1 であるため、スケジューラ ハートビートごとに 1 つの Pod のみが起動されます。KubernetesExecutor を頻繁に使用する場合は、この値を増やすことをおすすめします。

  • [kubernetes]worker_pods_pending_timeout

    このオプションは、ワーカーが Pending 状態(Pod の作成中)のままで、失敗と見なされるまでの時間を秒単位で定義します。デフォルト値は 5 分です。

KubernetesExecutor または CeleryExecutor を使用してタスクを実行する

1 つの DAG で CeleryExecutor、KubernetesExecutor、またはその両方を使用してタスクを実行できます。

  • KubernetesExecutor でタスクを実行するには、タスクの queue パラメータに kubernetes 値を指定します。
  • CeleryExecutor でタスクを実行するには、queue パラメータを省略します。

次の例では、KubernetesExecutor を使用して task-kubernetes タスクを実行し、CeleryExecutor を使用して task-celery タスクを実行します。

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

KubernetesExecutor に関連する Airflow CLI コマンドを実行する

gcloud を使用して、KubernetesExecutor に関連する Airflow CLI コマンドをいくつか実行できます。

ワーカー Pod 仕様をカスタマイズする

ワーカー Pod の仕様をカスタマイズするには、タスクの executor_config パラメータに渡します。これを使用して、カスタム CPU とメモリの要件を定義できます。

タスクの実行に使用されるワーカー Pod 仕様全体をオーバーライドできます。KubernetesExecutor で使用されるタスクの Pod 仕様を取得するには、kubernetes generate-dag-yaml Airflow CLI コマンドを実行します。

ワーカー Pod 仕様のカスタマイズの詳細については、Airflow のドキュメントをご覧ください。

次の例は、カスタム ワーカー Pod 仕様を使用するタスクを示しています。

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '500m',
                            'memory': '1000Mi',
                        })
                    ),
                ],
            ),
        )
    },
)

タスクログを表示する

KubernetesExecutor によって実行されたタスクのログは、[ログ] タブで、CeleryExecutor によって実行されたタスクのログとともに確認できます。

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

  3. [ログ] タブに移動します。

  4. [すべてのログ] > [Airflow ログ] > [ワーカー] に移動します。

  5. airflow-k8s-worker という名前のワーカーは、KubernetesExecutor タスクを実行します。特定のタスクのログを検索するには、検索のキーワードとして DAG ID またはタスク ID を使用します。

次のステップ