Utiliser KubernetesPodOperator

Cette page explique comment utiliser KubernetesPodOperator pour lancer les pods Kubernetes à partir de Cloud Composer dans le cluster Google Kubernetes Engine appartenant à votre environnement Cloud Composer et comment vous assurer que votre environnement dispose des ressources appropriées.

Ressources de l'environnement Cloud Composer dans le projet locataire et le projet client avec une flèche indiquant que les pods lancés seront dans le même cluster Kubernetes Engine que les nœuds de calcul Airflow, Redis, le programmeur Airflow et le proxy Cloud SQL
Emplacement de lancement des pods Kubernetes dans Cloud Composer (cliquez sur l'image pour l'agrandir)

L'utilisation de KubernetesPodOperator est recommandée si vous avez besoin :

  • de dépendances Python personnalisées qui ne sont pas disponibles via le dépôt PyPI public ;
  • de dépendances binaires qui ne sont pas disponibles dans l'image de nœud de calcul Cloud Composer issue de la banque d'images.

Cette page présente un exemple de DAG incluant les configurations KubernetesPodOperator suivantes :

Avant de commencer

Garantir des ressources adaptées à votre environnement

Lorsque vous créez un environnement Cloud Composer, vous spécifiez la puissance de calcul pour l'environnement, et une certaine portion de cette puissance est allouée au cluster GKE. Le lancement de pods Kubernetes dans l'environnement peut entraîner une concurrence entre les programmes pour accéder à certaines ressources, comme le processeur ou la mémoire. Dans la mesure où le programmeur et les nœuds de calcul Airflow appartiennent au même cluster GKE, ils ne fonctionneront pas correctement si cette concurrence entraîne un épuisement des ressources.

Pour éviter une pénurie de ressources, prenez l'une ou plusieurs des mesures suivantes :

Créer un pool de nœuds

La méthode privilégiée pour éviter l'épuisement des ressources dans l'environnement Cloud Composer est de créer un pool de nœuds et de configurer les pods Kubernetes pour qu'ils s'exécutent en exploitant exclusivement les ressources de ce pool.

Pour créer un pool de nœuds dans un cluster existant, procédez comme suit :

Console

  1. Dans Cloud Console, accédez au menu de GKE.

    Accéder au menu de GKE

  2. Sélectionnez le cluster souhaité.

  3. Cliquez sur Modifier.

  4. Dans Pools de nœuds, cliquez sur Ajouter un pool de nœuds.

  5. Configurez votre pool de nœuds.

  6. (Facultatif) Activez les options avancées, telles que les mises à jour automatiques et l'autoscaling.

  7. Cliquez sur Save.

gcloud

Saisissez la commande suivante :

gcloud container node-pools create POOL_NAME \
    --cluster CLUSTER_NAME \
    --project PROJECT_ID \
    --zone ZONE \
    ADDITIONAL_FLAGS 

Où :

  • POOL_NAME correspond au nom souhaité du pool de nœuds.
  • CLUSTER correspond au nom du cluster dans lequel vous souhaitez créer le pool de nœuds.
  • PROJECT_ID correspond au nom du projet Cloud Composer.
  • ZONE correspond à la zone où se trouve le cluster GKE.

    Pour obtenir la liste des options, consultez la documentation sur la commande gcloud container node-pools create.

    Si l'opération aboutit, la requête node-pools create renvoie les informations relatives au pool de nœuds :

    Creating node pool example-pool...done.
    Created [https://container.googleapis.com/v1/projects/kubernetes-engine-docs/zones/us-central1-f/clusters/example-cluster/nodePools/example-pool].
    NAME          MACHINE_TYPE   DISK_SIZE_GB  NODE_VERSION
    example-pool  n1-standard-1  100           1.2.4

Augmenter le nombre de nœuds dans votre environnement

L'augmentation du nombre de nœuds dans votre environnement Cloud Composer augmente la puissance de calcul disponible pour vos nœuds de calcul. Cette augmentation ne fournit pas de ressources supplémentaires pour les tâches qui nécessitent plus de processeur ou de RAM que le type de machine spécifié.

Pour augmenter le nombre de nœuds, mettez à jour votre environnement.

Spécifier le type de machine approprié

Lors de la création de l'environnement Cloud Composer, vous pouvez spécifier un type de machine. Pour garantir la disponibilité des ressources, spécifiez le type de machine idéal pour le type de calcul qui se produit dans votre environnement Cloud Composer.

Configuration de KubernetesPodOperator

Pour suivre cet exemple, placez l'intégralité du fichier kubernetes_pod_operator.py dans le dossier/DAG de votre environnement ou ajoutez le code KubernetesPodOperator pertinent à un DAG.

Les sections suivantes décrivent chacune des configurations de KubernetesPodOperator de l'exemple. Pour plus d'informations sur chaque variable de configuration, consultez la documentation de référence Airflow.

import datetime

from airflow import models
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type='env',
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target='SQL_CONN',
    # Name of the Kubernetes Secret
    secret='airflow-secrets',
    # Key of a secret stored in this Secret object
    key='sql_alchemy_conn')
secret_volume = secret.Secret(
    'volume',
    # Path where we mount the secret as volume
    '/var/secrets/google',
    # Name of Kubernetes Secret
    'service-account',
    # Key in the form of service account file name
    'service-account.json')

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
        dag_id='composer_sample_kubernetes_pod',
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config if
    # no `config_file` parameter is specified. By default it will contain the
    # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
    # created upon environment creation.

    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id='pod-ex-minimum',
        # Name of task you want to run, used to generate Pod ID.
        name='pod-ex-minimum',
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. There is the potential for the resource starvation of
        # Airflow workers and scheduler within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources.
        namespace='default',
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image='gcr.io/gcp-runtimes/ubuntu_18_0_4')
    kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-templates',
        name='ex-kube-templates',
        namespace='default',
        image='bash',
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/code.html#default-variables

        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=['{{ ds }}'],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}")
    kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-secrets',
        name='ex-kube-secrets',
        namespace='default',
        image='ubuntu',
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            'EXAMPLE_VAR': '/example/value',
            'GOOGLE_APPLICATION_CREDENTIALS': '/var/secrets/google/service-account.json'})
    kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-pod-affinity',
        name='ex-pod-affinity',
        namespace='default',
        image='perl',
        cmds=['perl'],
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            'nodeAffinity': {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                'requiredDuringSchedulingIgnoredDuringExecution': {
                    'nodeSelectorTerms': [{
                        'matchExpressions': [{
                            # When nodepools are created in Google Kubernetes
                            # Engine, the nodes inside of that nodepool are
                            # automatically assigned the label
                            # 'cloud.google.com/gke-nodepool' with the value of
                            # the nodepool's name.
                            'key': 'cloud.google.com/gke-nodepool',
                            'operator': 'In',
                            # The label key's value that pods can be scheduled
                            # on.
                            'values': [
                                'pool-0',
                                'pool-1',
                            ]
                        }]
                    }]
                }
            }
        })
    kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-all-configs',
        name='pi',
        namespace='default',
        image='perl',
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['perl'],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={'pod-label': 'label-name'},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={'EXAMPLE_VAR': '/example/value'},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy='Always',
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={'key1': 'value1'},
        # Resource specifications for Pod, this will allow you to set both cpu
        # and memory limits and requirements.
        # Prior to Airflow 1.10.4, resource specifications were
        # passed as a Pod Resources Class object,
        # If using this example on a version of Airflow prior to 1.10.4,
        # import the "pod" package from airflow.contrib.kubernetes and use
        # resources = pod.Resources() instead passing a dict
        # For more info see:
        # https://github.com/apache/airflow/pull/4551
        resources={'limit_memory': 1, 'limit_cpu': 1},
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        config_file='/home/airflow/composer_kube_config',
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={})

Configuration minimale

Pour créer un opérateur KubernetesPodOperator, seules les variables name, namespace, image et task_id sont obligatoires.

Lorsque vous placez l'extrait de code suivant dans un DAG, la configuration utilise les valeurs par défaut dans /home/airflow/composer_kube_config. Il n'est pas nécessaire de modifier le code pour que la tâche pod-ex-minimum réussisse.

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id='pod-ex-minimum',
    # Name of task you want to run, used to generate Pod ID.
    name='pod-ex-minimum',
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['echo'],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace='default',
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image='gcr.io/gcp-runtimes/ubuntu_18_0_4')

Configuration du modèle

Airflow est compatible avec la modélisation Jinja. Vous devez déclarer les variables requises (task_id, name, namespace, image) avec l'opérateur. Comme le montre l'exemple suivant, vous pouvez modéliser tous les autres paramètres avec Jinja, y compris cmds, arguments, env_vars et config_file.

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-kube-templates',
    name='ex-kube-templates',
    namespace='default',
    image='bash',
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/code.html#default-variables

    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['echo'],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=['{{ ds }}'],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}")

Sans modification du DAG ou de votre environnement, la tâche ex-kube-templates échoue en raison de deux erreurs. Voyons maintenant comment déboguer la tâche et résoudre les erreurs.

  1. Vérifiez que la tâche ex-kube-templates échoue.
  2. Vérifiez les journaux de la tâche ex-kube-templates.

    Les journaux indiquent que cette tâche échoue car la variable appropriée n'existe pas (my_value).

  3. Pour définir my_value avec gcloud ou l'interface utilisateur d'Airflow, procédez comme suit :

    gcloud

    Saisissez la commande suivante :

    gcloud composer environments run ENVIRONMENT \
        --location LOCATION \
        variables -- \
        --set my_value example_value 

    Où :

    • ENVIRONMENT correspond au nom de l'environnement Cloud Composer.
    • LOCATION correspond à la région où se trouve l'environnement Cloud Composer

    Interface utilisateur d'Airflow

    1. Dans la barre d'outils, cliquez sur Admin > Variables (Administration > Variables).
    2. Cliquez sur Create (Créer).
    3. Saisissez les informations suivantes :
      • Key (Clé) : my_value
      • Val (Valeur) : example_value
    4. Cliquez sur Save.
  4. Exécutez à nouveau la tâche ex-kube-templates.

  5. Vérifiez l'état de la tâche ex-kube-templates.

    La tâche ex-kube-templates continue d'échouer. Si vous consultez les journaux, la tâche échoue désormais car core/kubeconfig est introuvable dans config. Pour faire référence à un config_file personnalisé (fichier de configuration Kubernetes), vous devez définir la variable kube_config du fichier airflow.cfg sur une configuration Kubernetes valide.

  6. Pour définir la variable kube_config, entrez la commande suivante :

    gcloud composer environments update ENVIRONMENT \
        --location LOCATION \
        --update-airflow-configs=core-kube_config=/home/airflow/composer_kube_config

    Où :

    • ENVIRONMENT correspond au nom de l'environnement Cloud Composer.
    • LOCATION correspond à la région où se trouve l'environnement Cloud Composer.
  7. Patientez quelques minutes pendant la mise à jour de votre environnement.

  8. Exécutez à nouveau la tâche ex-kube-templates.

  9. Vérifiez que la tâche ex-kube-templates réussit.

Configuration des variables "secret"

Un secret Kubernetes est un objet qui contient une petite quantité de données sensibles. Vous pouvez transmettre des secrets aux pods Kubernetes à l'aide de KubernetesPodOperator. Les secrets doivent être définis dans Kubernetes, faute de quoi le lancement du pod est impossible.

Dans cet exemple, nous déployons le secret Kubernetes airflow-secrets, dans une variable d'environnement Kubernetes nommée SQL_CONN (par opposition à une variable d'environnement Airflow ou Cloud Composer).

Voici à quoi ressemble ce secret :

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type='env',
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target='SQL_CONN',
    # Name of the Kubernetes Secret
    secret='airflow-secrets',
    # Key of a secret stored in this Secret object
    key='sql_alchemy_conn')
secret_volume = secret.Secret(
    'volume',
    # Path where we mount the secret as volume
    '/var/secrets/google',
    # Name of Kubernetes Secret
    'service-account',
    # Key in the form of service account file name
    'service-account.json')

Le nom du secret Kubernetes est défini dans la variable secret. Ce secret particulier est nommé airflow-secrets. Il est exposé en tant que variable d'environnement, comme indiqué par deploy_type. La variable d'environnement sur laquelle il est défini, deploy_target, est SQL_CONN. Enfin, la clé (key) du secret que nous stockons dans deploy_target est sql_alchemy_conn.

Voici à quoi ressemble la configuration de l'opérateur :

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-kube-secrets',
    name='ex-kube-secrets',
    namespace='default',
    image='ubuntu',
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        'EXAMPLE_VAR': '/example/value',
        'GOOGLE_APPLICATION_CREDENTIALS': '/var/secrets/google/service-account.json'})

Sans modification du DAG ou de votre environnement, la tâche ex-kube-secrets échoue. Voyons maintenant comment déboguer la tâche et résoudre les erreurs.

  1. Vérifiez que la tâche ex-kube-secrets échoue.
  2. Vérifiez les journaux de la tâche ex-kube-secrets.

    En consultant les journaux, vous pourrez constater que la tâche échoue en raison d'une erreur Pod took too long to start. Cette erreur se produit car Airflow ne trouve pas le secret spécifié dans la configuration, secret_env.

  3. Pour définir le secret à l'aide de gcloud, procédez comme suit :

    1. Pour afficher les détails de votre environnement Cloud Composer, exécutez la commande suivante :

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"

      où :

      • ENVIRONMENT correspond au nom de l'environnement Cloud Composer.
      • LOCATION correspond à la région où se trouve l'environnement Cloud Composer

        La sortie doit être semblable à ceci :projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>

    2. Pour obtenir l'ID du cluster GKE, copiez et collez la sortie après /clusters/ (se termine par -gke) dans un endroit où vous pourrez la récupérer plus tard. Cette sortie correspond à votre identifiant de cluster.

    3. Pour obtenir la zone, copiez et collez la sortie après /zones/ dans un endroit où vous pourrez la récupérer plus tard.

    4. Connectez-vous à votre cluster GKE en exécutant la commande suivante :

      gcloud container clusters get-credentials CLUSTER_ID \
      --zone ZONE \
      --project PROJECT
      

      Où :

      • CLUSTER_ID correspond à votre ID de cluster GKE.
      • ZONE correspond à la zone où se trouve votre GKE.
      • PROJECT est l'ID de votre projet Google Cloud.
    5. Créez un secret Kubernetes qui définit la valeur de sql_alchemy_conn sur test_value en exécutant la commande suivante :

      kubectl create secret generic airflow-secrets \
      --from-literal sql_alchemy_conn=test_value
      
  4. Après avoir défini le secret, exécutez à nouveau la tâche ex-kube-secrets.

  5. Vérifiez que la tâche ex-kube-secrets réussit.

Configuration de l'affinité du pod

Lorsque vous configurez le paramètre affinity dans KubernetesPodOperator, vous contrôlez les nœuds sur lesquels les pods sont programmés, par exemple les nœuds d'un pool de nœuds spécifique. Dans cet exemple, l'opérateur ne s'exécute que sur les pools de nœuds pool-0 et pool-1.

Ressources de l'environnement Cloud Composer dans le projet locataire et le projet client avec une flèche indiquant que les pods lancés seront dans le même cluster Kubernetes Engine que les nœuds de calcul Airflow, Redis, le programmeur Airflow et le proxy Cloud SQL, mais dans des pools particuliers, pool-0 et pool-1, qui apparaissent sous forme de zones distinctes à l'intérieur du cluster Kubernetes Engine.
Emplacement de lancement des pods Kubernetes de Cloud Composer avec affinité de pod (cliquez sur l'image pour l'agrandir)


kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-pod-affinity',
    name='ex-pod-affinity',
    namespace='default',
    image='perl',
    cmds=['perl'],
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        'nodeAffinity': {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            'requiredDuringSchedulingIgnoredDuringExecution': {
                'nodeSelectorTerms': [{
                    'matchExpressions': [{
                        # When nodepools are created in Google Kubernetes
                        # Engine, the nodes inside of that nodepool are
                        # automatically assigned the label
                        # 'cloud.google.com/gke-nodepool' with the value of
                        # the nodepool's name.
                        'key': 'cloud.google.com/gke-nodepool',
                        'operator': 'In',
                        # The label key's value that pods can be scheduled
                        # on.
                        'values': [
                            'pool-0',
                            'pool-1',
                        ]
                    }]
                }]
            }
        }
    })

Tel que l'exemple est actuellement configuré, la tâche échoue. Voyons maintenant comment déboguer la tâche et résoudre les erreurs.

  1. Vérifiez que la tâche ex-pod-affinity échoue.
  2. Vérifiez les journaux de la tâche ex-pod-affinity.

    En consultant les journaux, vous pourrez constater que la tâche échoue car les pools de nœuds pool-0 et pool-1 n'existent pas.

  3. Pour vérifier que les pools de nœuds existent dans values, effectuez l'une des modifications de configuration suivantes :

    • Si vous avez créé un pool de nœuds précédemment, remplacez pool-0 et pool-1 par les noms de vos pools de nœuds, puis importez à nouveau votre DAG.
    • Créez un pool de nœuds nommé pool-0 ou pool-1. Vous pouvez créer les deux, mais la tâche n'a besoin que d'un pool pour réussir.
    • Remplacez pool-0 et pool-1 par default-pool, qui est le pool par défaut utilisé par Airflow. Ensuite, importez à nouveau votre DAG. Remarque : Par défaut, les pods Kubernetes sont programmés dans default-pool. Si vous ajoutez des pools ultérieurement, ceux-ci seront limités à default-pool.
  4. Patientez quelques minutes pendant la mise à jour de votre environnement.

  5. Exécutez à nouveau la tâche ex-pod-affinity.

  6. Vérifiez que la tâche ex-pod-affinity réussit.

Configuration complète

Cet exemple présente toutes les variables que vous pouvez configurer dans KubernetesPodOperator. Il n'est pas nécessaire de modifier le code pour que la tâche ex-all-configs réussisse.

Pour en savoir plus sur chaque variable, consultez la documentation de référence Airflow au sujet de KubernetesPodOperator.

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-all-configs',
    name='pi',
    namespace='default',
    image='perl',
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['perl'],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={'pod-label': 'label-name'},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={'EXAMPLE_VAR': '/example/value'},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy='Always',
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={'key1': 'value1'},
    # Resource specifications for Pod, this will allow you to set both cpu
    # and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={'limit_memory': 1, 'limit_cpu': 1},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file='/home/airflow/composer_kube_config',
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={})

Gérer les DAG

Afficher l'état d'une tâche

  1. Accédez à l'interface Web d'Airflow.
  2. Sur la page DAG, cliquez sur le nom du DAG, par exemple composer_sample_kubernetes_pod.
  3. Sur la page des informations relatives au DAG, cliquez sur Graph View (Vue graphique).
  4. Vérifiez l'état :

    • Échec : la tâche apparaît dans un encadré rouge, comme pour ex-kube-templates ci-dessous. Vous pouvez également placer le pointeur sur la tâche pour voir s'afficher la mention State: Failed (État : Échec).

    • Réussite : la tâche apparaît dans un encadré vert, comme pour pod-ex-minimum ci-dessous. Vous pouvez également placer le pointeur sur la tâche pour voir s'afficher la mention State: Success (État : Réussite).

Vérifier les journaux d'une tâche

  1. Dans l'interface utilisateur d'Airflow, affichez l'état de la tâche.
  2. Dans la vue graphique du DAG, cliquez sur le nom de la tâche.
  3. Dans le menu contextuel des instances de la tâche, cliquez sur View Log (Afficher le journal).

Ré-exécuter une tâche

  1. Pour revenir au DAG :
    1. Dans la barre d'outils de l'interface utilisateur d'Airflow, cliquez sur DAG.
    2. Cliquez sur le nom du DAG, par exemple composer_samples_kubernetes_pod.
  2. Pour exécuter la tâche à nouveau, procédez comme suit :
    1. Cliquez sur le nom de la tâche.
    2. Cliquez sur Clear (Effacer), puis sur OK. La tâche s'exécute à nouveau automatiquement.

Dépannage

Conseils pour résoudre les problèmes liés aux pods

En plus de vérifier les journaux des tâches, vous devez également consulter les journaux suivants :

  • Sortie du programmeur et des nœuds de calcul Airflow :

    1. Accédez au bucket Cloud Storage de l'environnement Cloud Composer. Il s'agit du bucket dans lequel se trouvent les DAG.
    2. Examinez les journaux sous logs/DAG_NAME/TASK_ID/EXECUTION_DATE.
  • Journaux détaillés des pods dans Cloud Console, dans les charges de travail GKE. Ces journaux incluent le fichier YAML de définition des pods, les événements des pods et les détails des pods.

Codes de retour non nuls si GKEPodOperator est également utilisé

Lorsque vous utilisez KubernetesPodOperator et GKEPodOperator, le code de retour du point d'entrée du conteneur détermine si la tâche est considérée comme réussie ou non. Les codes de retour non nuls indiquent un échec.

Un schéma courant lors de l'utilisation de KubernetesPodOperator et de GKEPodOperator consiste à exécuter un script shell comme point d'entrée du conteneur pour regrouper plusieurs opérations dans le conteneur.

Si vous écrivez un script de ce type, nous vous recommandons d'inclure la commande set -e en haut du script afin que les commandes ayant échoué dans le script arrêtent le script et propagent l'échec à l'instance de tâche Airflow.

La tâche échoue malgré la réussite du pod

Dans les environnements Cloud Composer qui exécutent composer-1.4.1-airflow-* ou une version antérieure, procédez comme suit :

Si une tâche Airflow s'exécute pendant une heure et que les journaux de la tâche se terminent par kubernetes.client.rest.ApiException: (401) et Reason: Unauthorized, la tâche Kubernetes sous-jacente continue de s'exécuter après ce point et peut même réussir. Cependant, Airflow signale que la tâche a échoué.

Pour résoudre ce problème, ajoutez une dépendance explicite du package PyPI sur kubernetes>=8.0.1.

Délais d'inactivité des pods

Le délai d'inactivité par défaut de KubernetesPodOperator est de 120 secondes, ce qui signifie que ce délai peut être dépassé avant que des images volumineuses ne puissent être téléchargées. Vous pouvez augmenter le délai d'inactivité en modifiant le paramètre startup_timeout_seconds lorsque vous créez l'opérateur KubernetesPodOperator.

Lorsque le délai d'un pod expire, le journal spécifique à la tâche est disponible dans l'interface utilisateur Web d'Airflow. Exemple :

Executing  on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in 
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Un dépassement du délai d'inactivité d'un pod peut également se produire lorsque le compte de service Composer ne dispose pas des autorisations IAM nécessaires pour effectuer la tâche en question. Pour vérifier cela, examinez les erreurs au niveau du pod à l'aide des tableaux de bord GKE, afin de consulter les journaux de votre charge de travail en question, ou utilisez Stackdriver Logging.

L'établissement d'une nouvelle connexion a échoué

La mise à niveau automatique est activée par défaut dans les clusters GKE. Si un pool de nœuds se trouve dans un cluster en cours de mise à niveau, le message d'erreur suivant peut s'afficher :

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new connection: [Errno 111] Connection refuse

Pour vérifier si votre cluster effectue une mise à niveau, accédez au menu GKE dans Cloud Console et recherchez l'icône de chargement à côté du nom du cluster de votre environnement.

Ressources associées