Utiliser CeleryKubernetesExecutor

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Cette page explique comment activer CeleryKubernetesExecutor dans Cloud Composer et comment utiliser KubernetesExecutor dans vos DAG.

À propos de CeleryKubernetesExecutor

CeleryKubernetesExecutor est un type d'exécutant qui peut utiliser CeleryExecutor et KubernetesExecutor en même temps. Airflow sélectionne l'exécuteur en fonction de la file d'attente que vous définissez pour la tâche. Dans un DAG, vous pouvez exécuter certaines tâches avec CeleryExecutor et d'autres avec KubernetesExecutor:

  • CeleryExecutor est optimisé pour l'exécution rapide et évolutive des tâches.
  • KubernetesExecutor est conçu pour exécuter des tâches gourmandes en ressources et exécuter des tâches de manière isolée.

CeleryKubernetesExecutor dans Cloud Composer

CeleryKubernetesExecutor dans Cloud Composer vous permet d'utiliser KubernetesExecutor pour vos tâches. Il n'est pas possible d'utiliser KubernetesExecutor dans Cloud Composer indépendamment de CeleryKubernetesExecutor.

Cloud Composer exécute les tâches que vous exécutez avec KubernetesExecutor dans le cluster de votre environnement, dans le même espace de noms que les nœuds de calcul Airflow. Ces tâches ont les mêmes liaisons que les nœuds de calcul Airflow et peuvent accéder aux ressources de votre projet.

Les tâches que vous exécutez avec KubernetesExecutor utilisent le modèle de tarification de Cloud Composer, car les pods contenant ces tâches s'exécutent dans le cluster de votre environnement. Les SKU Compute de Cloud Composer (pour le processeur, la mémoire et le stockage) s'appliquent à ces pods.

Nous vous recommandons d'exécuter des tâches avec CeleryExecutor lorsque:

  • Le temps de démarrage de la tâche est important.
  • Les tâches ne nécessitent pas d'isolation d'exécution et ne sont pas gourmandes en ressources.

Nous vous recommandons d'exécuter des tâches avec KubernetesExecutor lorsque:

  • Les tâches nécessitent une isolation d'exécution. Par exemple, pour que les tâches ne se disputent pas la mémoire et le processeur, car elles s'exécutent dans leurs propres pods.
  • Les tâches sont gourmandes en ressources, et vous souhaitez contrôler les ressources de processeur et de mémoire disponibles.

KubernetesExecutor par rapport à KubernetesPodOperator

Exécuter des tâches avec KubernetesExecutor est semblable à exécuter des tâches à l'aide de KubernetesPodOperator. Les tâches sont exécutées dans des pods, ce qui permet d'assurer une isolation des tâches au niveau des pods et une meilleure gestion des ressources.

Toutefois, il existe quelques différences importantes:

  • KubernetesExecutor n'exécute les tâches que dans l'espace de noms Cloud Composer versionné de votre environnement. Il n'est pas possible de modifier cet espace de noms dans Cloud Composer. Vous pouvez spécifier un espace de noms dans lequel KubernetesPodOperator exécute les tâches de pod.
  • KubernetesExecutor peut utiliser n'importe quel opérateur Airflow intégré. KubernetesPodOperator n'exécute qu'un script fourni défini par le point d'entrée du conteneur.
  • KubernetesExecutor utilise l'image Docker Cloud Composer par défaut avec les mêmes remplacements d'options de configuration Python et Airflow, les mêmes variables d'environnement et les mêmes packages PyPI que ceux définis dans votre environnement Cloud Composer.

À propos des images Docker

Par défaut, KubernetesExecutor lance des tâches à l'aide de la même image Docker que Cloud Composer utilise pour les nœuds de calcul Celery. Il s'agit de l'image Cloud Composer de votre environnement, avec toutes les modifications que vous avez spécifiées pour votre environnement, telles que les packages PyPI personnalisés ou les variables d'environnement.

Avant de commencer

  • Vous pouvez utiliser CeleryKubernetesExecutor dans Cloud Composer 3.

  • Il n'est pas possible d'utiliser un autre exécuteur que CeleryKubernetesExecutor dans Cloud Composer 3. Cela signifie que vous pouvez exécuter des tâches à l'aide de CeleryExecutor, de KubernetesExecutor ou des deux dans un même DAG, mais vous ne pouvez pas configurer votre environnement pour n'utiliser que KubernetesExecutor ou CeleryExecutor.

Configurer CeleryKubernetesExecutor

Vous pouvez ignorer les options de configuration Airflow existantes associées à KubernetesExecutor:

  • [kubernetes]worker_pods_creation_batch_size

    Cette option définit le nombre d'appels de création de pods de nœuds de calcul Kubernetes par boucle d'ordonnanceur. La valeur par défaut est 1. Par conséquent, un seul pod est lancé par battement de cœur de l'ordonnanceur. Si vous utilisez beaucoup KubernetesExecutor, nous vous recommandons d'augmenter cette valeur.

  • [kubernetes]worker_pods_pending_timeout

    Cette option définit, en secondes, la durée pendant laquelle un nœud de calcul peut rester dans l'état Pending (pod en cours de création) avant d'être considéré comme ayant échoué. La valeur par défaut est de cinq minutes.

Exécuter des tâches avec KubernetesExecutor ou CeleryExecutor

Vous pouvez exécuter des tâches à l'aide de CeleryExecutor, de KubernetesExecutor ou des deux dans un même DAG:

  • Pour exécuter une tâche avec KubernetesExecutor, spécifiez la valeur kubernetes dans le paramètre queue d'une tâche.
  • Pour exécuter une tâche avec CeleryExecutor, omettez le paramètre queue.

L'exemple suivant exécute la tâche task-kubernetes à l'aide de KubernetesExecutor et la tâche task-celery à l'aide de CeleryExecutor:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

Exécuter des commandes de CLI Airflow liées à KubernetesExecutor

Vous pouvez exécuter plusieurs commandes de CLI Airflow liées à KubernetesExecutor à l'aide de gcloud.

Personnaliser la spécification du pod de nœuds de calcul

Vous pouvez personnaliser la spécification du pod de nœud de calcul en la transmettant dans le paramètre executor_config d'une tâche. Vous pouvez l'utiliser pour définir des exigences personnalisées en termes de processeur et de mémoire.

Vous pouvez remplacer l'intégralité de la spécification du pod de nœuds de calcul utilisé pour exécuter une tâche. Pour récupérer la spécification de pod d'une tâche utilisée par KubernetesExecutor, vous pouvez exécuter la commande de CLI Airflow kubernetes generate-dag-yaml.

Pour en savoir plus sur la personnalisation de la spécification du pod de worker, consultez la documentation Airflow.

Cloud Composer 3 accepte les valeurs suivantes pour les exigences en termes de ressources:

Ressource Minimum Maximum Étape
Processeur 0,25 32 Valeurs d'incrément: 0,25, 0,5, 1, 2, 4, 6, 8, 10, etc., 32. Les valeurs demandées sont arrondies à la valeur d'étape compatible la plus proche (par exemple, 5 à 6).
Mémoire 2 Go (Go) 128 Go Valeurs d'incrément: 2, 3, 4, 5, etc., 128. Les valeurs demandées sont arrondies à la valeur d'étape compatible la plus proche (par exemple, 3,5 G à 4 G).
Stockage - 100 Go (Go) N'importe quelle valeur. Si vous demandez plus de 100 Go, seuls 100 Go vous sont fournis.

Pour en savoir plus sur les unités de ressources dans Kubernetes, consultez la section Unités de ressources dans Kubernetes.

L'exemple suivant illustre une tâche qui utilise une spécification de pod de nœuds de calcul personnalisée:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '0.5',
                            'memory': '2G',
                        })
                    ),
                ],
            ),
        )
    },
)

Afficher les journaux de tâches

Les journaux des tâches exécutées par KubernetesExecutor sont disponibles dans l'onglet Journaux, ainsi que les journaux des tâches exécutées par CeleryExecutor:

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  3. Accédez à l'onglet Journaux.

  4. Accédez à Tous les journaux > Journaux Airflow > Nœuds de calcul.

  5. Les nœuds de calcul nommés airflow-k8s-worker exécutent des tâches KubernetesExecutor. Pour rechercher les journaux d'une tâche spécifique, vous pouvez utiliser un ID de DAG ou un ID de tâche comme mot clé dans la recherche.

Étape suivante