Utiliser CeleryKubernetesExecutor

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Cette page explique comment activer CeleryKubernetesExecutor dans Cloud Composer et comment utiliser KubernetesExecutor dans vos DAG.

À propos de CeleryKubernetesExecutor

CeleryKubernetesExecutor est un type d'exécuteur pouvant utiliser CeleryExecutor et KubernetesExecutor en même temps en temps réel. Airflow sélectionne l'exécuteur en fonction de la file d'attente que vous définissez pour le tâche. Dans un DAG, vous pouvez exécuter certaines tâches avec CeleryExecutor et d'autres avec KubernetesExecutor:

  • CeleryExecutor est optimisé pour l'exécution rapide et évolutive des tâches.
  • KubernetesExecutor est conçu pour l'exécution de tâches gourmandes en ressources l'exécution de tâches de manière isolée.

CeleryKubernetesExecutor dans Cloud Composer

CeleryKubernetesExecutor dans Cloud Composer permet de : utilisez KubernetesExecutor pour vos tâches. Il n'est pas possible d'utiliser KubernetesExecutor dans Cloud Composer séparément du CeleryKubernetesExecutor

Cloud Composer exécute les tâches que vous exécutez avec KubernetesExecutor dans le cluster de votre environnement, dans le même espace de noms que les nœuds de calcul Airflow. Telles ont les mêmes liaisons qu'Airflow et peuvent accéder aux ressources de votre projet.

Les tâches que vous exécutez avec KubernetesExecutor utilisent Modèle de tarification Cloud Composer, étant donné que les pods comportant ces de tâches exécutées dans le cluster de votre environnement. SKU de calcul Cloud Composer (pour le processeur, la mémoire et le stockage) s'appliquent à ces pods.

Nous vous recommandons d'exécuter des tâches avec CeleryExecutor dans les cas suivants:

  • Le temps de démarrage des tâches est important.
  • Les tâches ne nécessitent pas d'isolation de l'environnement d'exécution et ne consomment pas beaucoup de ressources.

Nous vous recommandons d'exécuter des tâches avec KubernetesExecutor dans les cas suivants:

  • Les tâches nécessitent une isolation de l'environnement d'exécution. Par exemple, pour que les tâches n'entrent pas en concurrence pour la mémoire et le processeur, car ils s'exécutent dans leurs propres pods.
  • Les tâches nécessitent des bibliothèques système supplémentaires (ou des packages PyPI).
  • Les tâches sont gourmandes en ressources et vous souhaitez contrôler les ressources de processeur et de mémoire.

Comparaison entre KubernetesExecutor et KubernetesPodOperator

L'exécution de tâches avec KubernetesExecutor est semblable Exécution de tâches à l'aide de KubernetesPodOperator. Les tâches sont exécutées dans les pods, ce qui permet une isolation des tâches au niveau du pod et une meilleure gestion des ressources.

Il existe toutefois quelques différences clés:

  • KubernetesExecutor n'exécute les tâches que dans le Cloud Composer avec gestion des versions. de votre environnement. Impossible de modifier cet espace de noms dans Cloud Composer. Vous pouvez spécifier un espace de noms dans lequel KubernetesPodOperator exécute les tâches du pod.
  • KubernetesExecutor peut utiliser n'importe quel opérateur Airflow intégré. KubernetesPodOperator n'exécute qu'un script fourni défini par la le point d'entrée du conteneur.
  • KubernetesExecutor utilise l'image Docker Cloud Composer par défaut avec les mêmes valeurs de configuration Python, les remplacements d'options de configuration Airflow, variables et packages PyPI définis dans votre Cloud Composer.

À propos des images Docker

Par défaut, KubernetesExecutor lance les tâches à l'aide de la même image Docker que celle utilisée Cloud Composer utilise les nœuds de calcul Celery. Il s'agit de la Image Cloud Composer pour votre environnement, avec toutes les modifications que vous avez spécifiées pour votre environnement, comme des PyPI personnalisés ; des packages ou des variables d'environnement.

Avant de commencer

  • Vous pouvez utiliser CeleryKubernetesExecutor dans Cloud Composer 3.

  • Il n'est pas possible d'utiliser un autre exécuteur que CeleryKubernetesExecutor dans Cloud Composer 3. Cela signifie que vous pouvez exécuter des tâches en utilisant CeleryExecutor, KubernetesExecutor ou les deux dans un même DAG vous pouvez configurer votre environnement pour n'utiliser que KubernetesExecutor CeleryExecutor.

Configurer CeleryKubernetesExecutor

Vous voudrez peut-être remplacer la configuration Airflow existante liées à KubernetesExecutor:

  • [kubernetes]worker_pods_creation_batch_size

    Cette option définit le nombre d'appels de création de pods de nœuds de calcul Kubernetes par boucle du programmeur. Comme la valeur par défaut est 1, un seul pod est lancé. par pulsation du programmeur. Si vous utilisez beaucoup KubernetesExecutor, est recommandé d'augmenter cette valeur.

  • [kubernetes]worker_pods_pending_timeout

    Cette option définit, en secondes, la durée pendant laquelle un nœud de calcul peut rester dans le Pending. (le pod est en cours de création) avant d'être considéré comme en échec. La valeur par défaut est de 5 minutes.

Exécuter des tâches avec KubernetesExecutor ou CeleryExecutor

Vous pouvez exécuter des tâches à l'aide de CeleryExecutor et/ou de KubernetesExecutor dans un même DAG:

  • Pour exécuter une tâche avec KubernetesExecutor, spécifiez la valeur kubernetes dans le champ Paramètre queue d'une tâche.
  • Pour exécuter une tâche avec CeleryExecutor, omettez le paramètre queue.

L'exemple suivant exécute la tâche task-kubernetes à l'aide de KubernetesExecutor et la tâche task-celery à l'aide de CeleryExecutor:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

Exécuter les commandes de CLI Airflow associées à KubernetesExecutor

Vous pouvez exécuter plusieurs Commandes CLI Airflow associées à KubernetesExecutor avec gcloud.

Personnaliser la spécification du pod de nœud de calcul

Vous pouvez personnaliser la spécification du pod de nœud de calcul en la transmettant dans le executor_config. d'une tâche. Vous pouvez l'utiliser pour définir des ressources de processeur et de mémoire exigences.

Vous pouvez remplacer l'intégralité de la spécification du pod de nœuds de calcul utilisée pour exécuter une tâche. À récupérer la spécification de pod d'une tâche utilisée par KubernetesExecutor, vous pouvez exécutez la CLI Airflow kubernetes generate-dag-yaml. .

Pour en savoir plus sur la personnalisation de la spécification des pods de nœuds de calcul, consultez la section Documentation Airflow

L'exemple suivant illustre une tâche qui utilise la spécification de pod de nœuds de calcul personnalisés:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '500m',
                            'memory': '1000Mi',
                        })
                    ),
                ],
            ),
        )
    },
)

Afficher les journaux des tâches

Les journaux des tâches exécutées par KubernetesExecutor sont disponibles dans l'onglet Logs (Journaux). ainsi que les journaux des tâches exécutées par CeleryExecutor:

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  3. Accédez à l'onglet Journaux.

  4. Accédez à Tous les journaux > Journaux Airflow. > Nœuds de calcul.

  5. Les nœuds de calcul nommés airflow-k8s-worker s'exécutent KubernetesExecutor. Pour rechercher les journaux d'une tâche spécifique, vous pouvez utiliser un ID de DAG ou un ID de tâche comme mot clé dans la recherche.

Étape suivante