Utiliser KubernetesPodOperator

Cloud Composer 1 | Cloud Composer 2

Cette page explique comment utiliser KubernetesPodOperator pour lancer des pods Kubernetes à partir de Cloud Composer dans le cluster Google Kubernetes Engine faisant partie de votre environnement Cloud Composer. }votre environnement dispose des ressources appropriées.

KubernetesPodOperator lance des pods Kubernetes dans le cluster de votre environnement. En comparaison, les opérateurs Google Kubernetes Engine exécutent les pods Kubernetes dans un cluster spécifié, qui peut être un cluster distinct sans lien avec votre environnement. Vous pouvez également créer et supprimer des clusters à l'aide des opérateurs Google Kubernetes Engine.

L'utilisation de KubernetesPodOperator est recommandée si vous avez besoin :

  • Dépendances Python personnalisées qui ne sont pas disponibles via le dépôt public PyPI
  • Dépendances binaires qui ne sont pas disponibles dans l'image du nœud de calcul Cloud Composer en stock

Cette page présente un exemple de DAG incluant les configurations KubernetesPodOperator suivantes:

Avant de commencer

S'assurer que votre environnement dispose de suffisamment de ressources

Lorsque vous créez un environnement Cloud Composer, vous spécifiez ses paramètres de performances, y compris les paramètres de performances du cluster de l'environnement. Le lancement de pods Kubernetes dans le cluster d'environnement peut favoriser la concurrence pour les ressources du cluster, telles que le processeur ou la mémoire. Étant donné que le programmeur et les nœuds de calcul Airflow se trouvent dans le même cluster GKE, ils ne fonctionneront pas correctement si la concurrence entraîne un manque de ressources.

Pour éviter une pénurie de ressources, prenez l'une ou plusieurs des mesures suivantes :

Créer un pool de nœuds

La meilleure façon d'éviter le manque de ressources dans l'environnement Cloud Composer consiste à créer un pool de nœuds et à configurer les pods Kubernetes pour qu'ils s'exécutent à l'aide de Uniquement les ressources de ce pool.

Console

  1. Dans Google Cloud Console, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Cliquez sur le nom de votre environnement.

  3. Sur la page Détails de l'environnement, accédez à l'onglet Configuration de l'environnement.

  4. Dans la section Ressources > Cluster GKE, suivez le lien afficher les détails du cluster.

  5. Créez un pool de nœuds, comme décrit dans la section Ajouter un pool de nœuds.

gcloud

  1. Déterminez le nom du cluster de votre environnement:

    gcloud composer environments describe ENVIRONMENT_NAME \
      --location LOCATION \
      --format="value(config.gkeCluster)"
    

    Remplacez :

    • ENVIRONMENT_NAME par le nom de l'environnement.
    • LOCATION par la région Compute Engine où se trouve l'environnement.
  2. Le résultat contient le nom du cluster de votre environnement. Par exemple, europe-west3-example-enviro-af810e25-gke.

  3. Créez un pool de nœuds, comme décrit dans la section Ajouter un pool de nœuds.

Augmenter le nombre de nœuds dans votre environnement

L'augmentation du nombre de nœuds de votre environnement Cloud Composer augmente la puissance de calcul disponible pour vos charges de travail. Cette augmentation ne fournit pas de ressources supplémentaires pour les tâches qui nécessitent plus de processeur ou de RAM que le type de machine spécifié.

Pour augmenter le nombre de nœuds, mettez à jour votre environnement.

Spécifier le type de machine approprié

Lors de la création de l'environnement Cloud Composer, vous pouvez spécifier un type de machine. Pour garantir les ressources disponibles, spécifiez un type de machine pour le type de calcul effectué dans votre environnement Cloud Composer.

Configuration de KubernetesPodOperator

Pour reprendre cet exemple, placez le codekubernetes_pod_operator.py dans votre environnementdags/ dossier ouajoutez lesKubernetesPodOperator code à un DAG.

Les sections suivantes décrivent chacune des configurations de KubernetesPodOperator de l'exemple. Pour plus d'informations sur chaque variable de configuration, consultez la documentation de référence Airflow.

Airflow 2

import datetime

from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type='env',
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target='SQL_CONN',
    # Name of the Kubernetes Secret
    secret='airflow-secrets',
    # Key of a secret stored in this Secret object
    key='sql_alchemy_conn')
secret_volume = Secret(
    deploy_type='volume',
    # Path where we mount the secret as volume
    deploy_target='/var/secrets/google',
    # Name of Kubernetes Secret
    secret='service-account',
    # Key in the form of service account file name
    key='service-account.json')

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
        dag_id='composer_sample_kubernetes_pod',
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config if
    # no `config_file` parameter is specified. By default it will contain the
    # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
    # created upon environment creation.

    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        task_id='pod-ex-minimum',
        # Name of task you want to run, used to generate Pod ID.
        name='pod-ex-minimum',
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. There is the potential for the resource starvation of
        # Airflow workers and scheduler within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources.
        namespace='default',
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image='gcr.io/gcp-runtimes/ubuntu_18_0_4')
    kubenetes_template_ex = KubernetesPodOperator(
        task_id='ex-kube-templates',
        name='ex-kube-templates',
        namespace='default',
        image='bash',
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html

        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=['{{ ds }}'],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}")
    kubernetes_secret_vars_ex = KubernetesPodOperator(
        task_id='ex-kube-secrets',
        name='ex-kube-secrets',
        namespace='default',
        image='ubuntu',
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            'EXAMPLE_VAR': '/example/value',
            'GOOGLE_APPLICATION_CREDENTIALS': '/var/secrets/google/service-account.json '})
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    kubernetes_affinity_ex = KubernetesPodOperator(
        task_id='ex-pod-affinity',
        name='ex-pod-affinity',
        namespace='default',
        image='perl',
        cmds=['perl'],
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            'nodeAffinity': {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                'requiredDuringSchedulingIgnoredDuringExecution': {
                    'nodeSelectorTerms': [{
                        'matchExpressions': [{
                            # When nodepools are created in Google Kubernetes
                            # Engine, the nodes inside of that nodepool are
                            # automatically assigned the label
                            # 'cloud.google.com/gke-nodepool' with the value of
                            # the nodepool's name.
                            'key': 'cloud.google.com/gke-nodepool',
                            'operator': 'In',
                            # The label key's value that pods can be scheduled
                            # on.
                            'values': [
                                'pool-0',
                                'pool-1',
                            ]
                        }]
                    }]
                }
            }
        })
    kubernetes_full_pod = KubernetesPodOperator(
        task_id='ex-all-configs',
        name='pi',
        namespace='default',
        image='perl',
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['perl'],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={'pod-label': 'label-name'},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={'EXAMPLE_VAR': '/example/value'},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy='Always',
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={'key1': 'value1'},
        # Resource specifications for Pod, this will allow you to set both cpu
        # and memory limits and requirements.
        # Prior to Airflow 1.10.4, resource specifications were
        # passed as a Pod Resources Class object,
        # If using this example on a version of Airflow prior to 1.10.4,
        # import the "pod" package from airflow.contrib.kubernetes and use
        # resources = pod.Resources() instead passing a dict
        # For more info see:
        # https://github.com/apache/airflow/pull/4551
        resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        config_file='/home/airflow/composer_kube_config',
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        # Pod affinity with the KubernetesPodOperator
        # is not supported with Composer 2
        # instead, create a cluster and use the GKEStartPodOperator
        # https://cloud.google.com/composer/docs/using-gke-operator
        affinity={})

Airflow 1

import datetime

from airflow import models
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type='env',
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target='SQL_CONN',
    # Name of the Kubernetes Secret
    secret='airflow-secrets',
    # Key of a secret stored in this Secret object
    key='sql_alchemy_conn')
secret_volume = secret.Secret(
    deploy_type='volume',
    # Path where we mount the secret as volume
    deploy_target='/var/secrets/google',
    # Name of Kubernetes Secret
    secret='service-account',
    # Key in the form of service account file name
    key='service-account.json')

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
        dag_id='composer_sample_kubernetes_pod',
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config if
    # no `config_file` parameter is specified. By default it will contain the
    # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
    # created upon environment creation.

    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id='pod-ex-minimum',
        # Name of task you want to run, used to generate Pod ID.
        name='pod-ex-minimum',
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. There is the potential for the resource starvation of
        # Airflow workers and scheduler within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources.
        namespace='default',
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image='gcr.io/gcp-runtimes/ubuntu_18_0_4')
    kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-templates',
        name='ex-kube-templates',
        namespace='default',
        image='bash',
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html

        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=['{{ ds }}'],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}")
    kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-secrets',
        name='ex-kube-secrets',
        namespace='default',
        image='ubuntu',
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            'EXAMPLE_VAR': '/example/value',
            'GOOGLE_APPLICATION_CREDENTIALS': '/var/secrets/google/service-account.json '})
    kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-pod-affinity',
        name='ex-pod-affinity',
        namespace='default',
        image='perl',
        cmds=['perl'],
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            'nodeAffinity': {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                'requiredDuringSchedulingIgnoredDuringExecution': {
                    'nodeSelectorTerms': [{
                        'matchExpressions': [{
                            # When nodepools are created in Google Kubernetes
                            # Engine, the nodes inside of that nodepool are
                            # automatically assigned the label
                            # 'cloud.google.com/gke-nodepool' with the value of
                            # the nodepool's name.
                            'key': 'cloud.google.com/gke-nodepool',
                            'operator': 'In',
                            # The label key's value that pods can be scheduled
                            # on.
                            'values': [
                                'pool-0',
                                'pool-1',
                            ]
                        }]
                    }]
                }
            }
        })
    kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-all-configs',
        name='pi',
        namespace='default',
        image='perl',
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['perl'],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={'pod-label': 'label-name'},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={'EXAMPLE_VAR': '/example/value'},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy='Always',
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={'key1': 'value1'},
        # Resource specifications for Pod, this will allow you to set both cpu
        # and memory limits and requirements.
        # Prior to Airflow 1.10.4, resource specifications were
        # passed as a Pod Resources Class object,
        # If using this example on a version of Airflow prior to 1.10.4,
        # import the "pod" package from airflow.contrib.kubernetes and use
        # resources = pod.Resources() instead passing a dict
        # For more info see:
        # https://github.com/apache/airflow/pull/4551
        resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        config_file='/home/airflow/composer_kube_config',
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={})

Configuration minimale

Pour créer un objet KubernetesPodOperator, seuls les éléments name, namespace, image et task_id sont requis.

Lorsque vous placez l'extrait de code suivant dans un DAG, la configuration utilise les valeurs par défaut dans /home/airflow/composer_kube_config. Vous n'avez pas besoin de modifier le code pour que la tâche pod-ex-minimum aboutisse.

Airflow 2

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id='pod-ex-minimum',
    # Name of task you want to run, used to generate Pod ID.
    name='pod-ex-minimum',
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['echo'],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace='default',
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image='gcr.io/gcp-runtimes/ubuntu_18_0_4')

Airflow 1

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id='pod-ex-minimum',
    # Name of task you want to run, used to generate Pod ID.
    name='pod-ex-minimum',
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['echo'],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace='default',
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image='gcr.io/gcp-runtimes/ubuntu_18_0_4')

Configuration du modèle

Airflow est compatible avec la modélisation Jinja. Vous devez déclarer les variables requises (task_id, name, namespace, image) avec l'opérateur. Comme le montre l'exemple suivant, vous pouvez modéliser tous les autres paramètres avec Jinja, y compris cmds, arguments, env_vars et config_file.

Airflow 2

kubenetes_template_ex = KubernetesPodOperator(
    task_id='ex-kube-templates',
    name='ex-kube-templates',
    namespace='default',
    image='bash',
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html

    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['echo'],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=['{{ ds }}'],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}")

Airflow 1

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-kube-templates',
    name='ex-kube-templates',
    namespace='default',
    image='bash',
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html

    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['echo'],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=['{{ ds }}'],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}")

Sans modification du DAG ou de votre environnement, la tâche ex-kube-templates échoue en raison de deux erreurs. Les journaux indiquent que cette tâche échoue, car la variable appropriée n'existe pas (my_value). La deuxième erreur, que vous pouvez obtenir après la correction de la première erreur, indique que la tâche échoue, car core/kube_config est introuvable dans config.

Pour corriger ces deux erreurs, suivez les étapes décrites plus en détail.

Pour définir my_value avec gcloud ou l'interface utilisateur d'Airflow, procédez comme suit :

Interface utilisateur d'Airflow

Dans l'interface utilisateur Airflow 2:

  1. Accédez à l'UI Airflow.

  2. Dans la barre d'outils, sélectionnez Admin > Variables (Administration > Variables).

  3. Sur la page List Variable (Variable de liste), cliquez sur Add a new record (Ajouter un enregistrement).

  4. Sur la page Add Variable (Ajouter une variable), saisissez les informations suivantes :

    • Key (Clé) : my_value
    • Val (Valeur) : example_value
  5. Cliquez sur Enregistrer.

Dans l'interface utilisateur Airflow 1:

  1. Accédez à l'UI Airflow.

  2. Dans la barre d'outils, sélectionnez Admin > Variables (Administration > Variables).

  3. Sur la page Variables, cliquez sur l'onglet Create (Créer).

  4. Sur la page Variable, saisissez les informations suivantes :

    • Key (Clé) : my_value
    • Val (Valeur) : example_value
  5. Cliquez sur Enregistrer.

gcloud

Pour Airflow 2, saisissez la commande suivante :

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Pour Airflow 1, saisissez la commande suivante :

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set my_value example_value

Remplacez :

  • ENVIRONMENT par le nom de l'environnement.
  • LOCATION par la région Compute Engine où se trouve l'environnement.

Pour faire référence à un config_file personnalisé (un fichier de configuration Kubernetes), remplacez l'option de configuration Airflow kube_config par une configuration Kubernetes valide:

Section Clé Valeur
core kube_config /home/airflow/composer_kube_config

Patientez quelques minutes pendant la mise à jour de votre environnement. Exécutez ensuite à nouveau la tâche ex-kube-templates et vérifiez que la tâche ex-kube-templates aboutit.

Configuration des variables "secret"

Un secret Kubernetes est un objet qui contient des données sensibles. Vous pouvez transmettre des secrets aux pods Kubernetes à l'aide de KubernetesPodOperator. Les secrets doivent être définis dans Kubernetes, faute de quoi le lancement du pod est impossible.

Cet exemple présente deux façons d'utiliser les secrets Kubernetes: en tant que variable d'environnement et en tant que volume installé par le pod.

Le premier secret, airflow-secrets, est défini sur une variable d'environnement Kubernetes nommée SQL_CONN (par opposition à une variable d'environnement Airflow ou Cloud Composer).

Le deuxième secret, service-account, installe service-account.json, un fichier contenant un jeton de compte de service, sur /var/secrets/google.

Voici à quoi ressemblent ces secrets :

Airflow 2

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type='env',
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target='SQL_CONN',
    # Name of the Kubernetes Secret
    secret='airflow-secrets',
    # Key of a secret stored in this Secret object
    key='sql_alchemy_conn')
secret_volume = Secret(
    deploy_type='volume',
    # Path where we mount the secret as volume
    deploy_target='/var/secrets/google',
    # Name of Kubernetes Secret
    secret='service-account',
    # Key in the form of service account file name
    key='service-account.json')

Airflow 1

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type='env',
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target='SQL_CONN',
    # Name of the Kubernetes Secret
    secret='airflow-secrets',
    # Key of a secret stored in this Secret object
    key='sql_alchemy_conn')
secret_volume = secret.Secret(
    deploy_type='volume',
    # Path where we mount the secret as volume
    deploy_target='/var/secrets/google',
    # Name of Kubernetes Secret
    secret='service-account',
    # Key in the form of service account file name
    key='service-account.json')

Le nom du premier secret Kubernetes est défini dans la variable secret. Ce secret particulier est nommé airflow-secrets. Il est exposé en tant que variable d'environnement, comme indiqué par deploy_type. La variable d'environnement sur laquelle il est défini, deploy_target, est SQL_CONN. Enfin, la valeur key du secret stocké dans le fichier deploy_target est sql_alchemy_conn.

Le nom du second secret Kubernetes est défini dans la variable secret. Ce secret particulier est nommé service-account. Il est exposé en tant que volume, comme indiqué par deploy_type. deploy_target est le chemin d'accès au fichier à installer, /var/secrets/google. Enfin, l'élément key du secret stocké dans le fichier deploy_target est service-account.json.

Voici à quoi ressemble la configuration de l'opérateur :

Airflow 2

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id='ex-kube-secrets',
    name='ex-kube-secrets',
    namespace='default',
    image='ubuntu',
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        'EXAMPLE_VAR': '/example/value',
        'GOOGLE_APPLICATION_CREDENTIALS': '/var/secrets/google/service-account.json '})

Airflow 1

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-kube-secrets',
    name='ex-kube-secrets',
    namespace='default',
    image='ubuntu',
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        'EXAMPLE_VAR': '/example/value',
        'GOOGLE_APPLICATION_CREDENTIALS': '/var/secrets/google/service-account.json '})

Sans apporter de modifications au DAG ou à votre environnement, la tâche ex-kube-secrets échoue. Si vous consultez les journaux, la tâche échoue en raison d'une erreur Pod took too long to start. Cette erreur se produit car Airflow ne trouve pas le secret spécifié dans la configuration, secret_env.

gcloud

Pour définir le secret à l'aide de gcloud, procédez comme suit :

  1. Obtenez des informations sur votre cluster d'environnement Cloud Composer.

    1. Exécutez la commande suivante :

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      Remplacez :

      • ENVIRONMENT par le nom de votre environnement.
      • LOCATION par la région dans laquelle se trouve l'environnement Cloud Composer.

      Le résultat de cette commande est au format suivant : projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>.

    2. Pour obtenir l'ID du cluster GKE, copiez le résultat après /clusters/ (se termine par -gke).

    3. Pour obtenir la zone, copiez le résultat après /zones/.

  2. Connectez-vous à votre cluster GKE en exécutant la commande suivante:

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --zone ZONE
    

    Remplacez :

    • CLUSTER_ID par l'ID de votre cluster GKE.
    • PROJECT par l'ID de votre projet Google Cloud.
    • ZONE par la zone où se trouve votre GKE.
  3. Créez des secrets Kubernetes

    1. Créez un secret Kubernetes qui définit la valeur de sql_alchemy_conn sur test_value en exécutant la commande suivante:

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value
      
    2. Créez un secret Kubernetes qui définit la valeur de service-account.json sur le chemin d'accès local d'un fichier de clé de compte de service appelé key.json en exécutant la commande suivante:

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json
      
  4. Après avoir défini les secrets, exécutez à nouveau la tâche ex-kube-secrets dans l'interface utilisateur d'Airflow.

  5. Vérifiez que la tâche ex-kube-secrets est réussie.

Configuration de l'affinité du pod

Lorsque vous configurez le paramètre affinity dans KubernetesPodOperator, vous contrôlez les nœuds sur lesquels les pods sont programmés, par exemple les nœuds d'un pool de nœuds spécifique. Dans cet exemple, l'opérateur ne s'exécute que sur les pools de nœuds nommés pool-0 et pool-1. Vos nœuds d'environnement Cloud Composer 1 se trouvent dans default-pool. Par conséquent, vos pods ne s'exécutent pas sur les nœuds de votre environnement.

Airflow 2

# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
    task_id='ex-pod-affinity',
    name='ex-pod-affinity',
    namespace='default',
    image='perl',
    cmds=['perl'],
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        'nodeAffinity': {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            'requiredDuringSchedulingIgnoredDuringExecution': {
                'nodeSelectorTerms': [{
                    'matchExpressions': [{
                        # When nodepools are created in Google Kubernetes
                        # Engine, the nodes inside of that nodepool are
                        # automatically assigned the label
                        # 'cloud.google.com/gke-nodepool' with the value of
                        # the nodepool's name.
                        'key': 'cloud.google.com/gke-nodepool',
                        'operator': 'In',
                        # The label key's value that pods can be scheduled
                        # on.
                        'values': [
                            'pool-0',
                            'pool-1',
                        ]
                    }]
                }]
            }
        }
    })

Airflow 1

kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-pod-affinity',
    name='ex-pod-affinity',
    namespace='default',
    image='perl',
    cmds=['perl'],
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        'nodeAffinity': {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            'requiredDuringSchedulingIgnoredDuringExecution': {
                'nodeSelectorTerms': [{
                    'matchExpressions': [{
                        # When nodepools are created in Google Kubernetes
                        # Engine, the nodes inside of that nodepool are
                        # automatically assigned the label
                        # 'cloud.google.com/gke-nodepool' with the value of
                        # the nodepool's name.
                        'key': 'cloud.google.com/gke-nodepool',
                        'operator': 'In',
                        # The label key's value that pods can be scheduled
                        # on.
                        'values': [
                            'pool-0',
                            'pool-1',
                        ]
                    }]
                }]
            }
        }
    })

Tel que l'exemple est actuellement configuré, la tâche échoue. Si vous examinez les journaux, la tâche échoue, car les pools de nœuds pool-0 et pool-1 n'existent pas.

Pour vous assurer que les pools de nœuds existent dans values, effectuez l'une des modifications de configuration suivantes:

  • Si vous avez créé un pool de nœuds précédemment, remplacez pool-0 et pool-1 par les noms de vos pools de nœuds, puis importez à nouveau votre DAG.

  • Créez un pool de nœuds nommé pool-0 ou pool-1. Vous pouvez créer les deux, mais la tâche n'en a besoin qu'une seule pour réussir.

  • Remplacez pool-0 et pool-1 par default-pool, qui est le pool par défaut utilisé par Airflow. Ensuite, importez à nouveau votre DAG.

Une fois les modifications effectuées, attendez quelques minutes que votre environnement se mette à jour. Exécutez ensuite à nouveau la tâche ex-pod-affinity et vérifiez que la tâche ex-pod-affinity aboutit.

Configuration complète

Cet exemple présente toutes les variables que vous pouvez configurer dans KubernetesPodOperator. Vous n'avez pas besoin de modifier le code pour que la tâche ex-all-configs aboutisse.

Pour en savoir plus sur chaque variable, consultez la documentation de référence sur Airflow KubernetesPodOperator.

Airflow 2

kubernetes_full_pod = KubernetesPodOperator(
    task_id='ex-all-configs',
    name='pi',
    namespace='default',
    image='perl',
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['perl'],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={'pod-label': 'label-name'},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={'EXAMPLE_VAR': '/example/value'},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy='Always',
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={'key1': 'value1'},
    # Resource specifications for Pod, this will allow you to set both cpu
    # and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file='/home/airflow/composer_kube_config',
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={})

Airflow 1

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-all-configs',
    name='pi',
    namespace='default',
    image='perl',
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['perl'],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={'pod-label': 'label-name'},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={'EXAMPLE_VAR': '/example/value'},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy='Always',
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={'key1': 'value1'},
    # Resource specifications for Pod, this will allow you to set both cpu
    # and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file='/home/airflow/composer_kube_config',
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={})

Dépannage

Conseils pour résoudre les problèmes liés aux pods

Outre la vérification des journaux de tâches dans l'interface utilisateur d'Airflow, vous pouvez également consulter les journaux suivants:

  • Sortie du programmeur et des nœuds de calcul Airflow :

    1. Dans Google Cloud Console, accédez à la page Environnements.

      Accéder à la page Environnements

    2. Cliquez sur le lien DAG correspondant à votre environnement.

    3. Dans le bucket de votre environnement, montez d'un niveau.

    4. Examinez les journaux dans le dossier logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>.

  • Journaux détaillés des pods dans Cloud Console, dans les charges de travail GKE. Ces journaux incluent le fichier YAML de définition des pods, les événements des pods et les détails des pods.

Codes de retour non nuls si GKEStartPodOperator est également utilisé

Lorsque vous utilisez KubernetesPodOperator et GKEStartPodOperator, le code de retour du point d'entrée du conteneur détermine si la tâche est considérée comme réussie ou non. Les codes de retour non nuls indiquent un échec.

Un modèle courant lors de l'utilisation de KubernetesPodOperator et de GKEStartPodOperator consiste à exécuter un script shell comme point d'entrée du conteneur pour regrouper plusieurs opérations dans le conteneur.

Si vous écrivez un script de ce type, nous vous recommandons d'inclure la commande set -e en haut du script afin que les commandes ayant échoué dans le script arrêtent le script et propagent l'échec à l'instance de tâche Airflow.

Délais d'inactivité des pods

Le délai d'inactivité par défaut de KubernetesPodOperator est de 120 secondes, ce qui signifie que ce délai peut être dépassé avant que des images volumineuses ne puissent être téléchargées. Vous pouvez augmenter le délai d'inactivité en modifiant le paramètre startup_timeout_seconds lorsque vous créez l'opérateur KubernetesPodOperator.

Lorsqu'un pod expire, le journal spécifique à la tâche est disponible dans l'interface utilisateur d'Airflow. Exemple :

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Les délais d'expiration de pods peuvent également se produire lorsque le compte de service Cloud Composer ne dispose pas des autorisations IAM nécessaires pour effectuer la tâche en question. Pour le vérifier, examinez les erreurs au niveau du pod à l'aide des tableaux de bord GKE, afin d'examiner les journaux associés à votre charge de travail, ou utilisez Cloud Logging.

L'établissement d'une nouvelle connexion a échoué

La mise à niveau automatique est activée par défaut dans les clusters GKE. Si un pool de nœuds se trouve dans un cluster en cours de mise à niveau, l'erreur suivante peut s'afficher:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

Pour vérifier si votre cluster est en cours de mise à niveau, accédez à la page Clusters Kubernetes dans Google Cloud Console et recherchez l'icône de chargement à côté du nom du cluster de votre environnement.

Étape suivante