Utiliser KubernetesPodOperator

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Cette page explique comment utiliser KubernetesPodOperator pour déployer des pods Kubernetes à partir de Cloud Composer dans le cluster Google Kubernetes Engine appartenant à votre environnement Cloud Composer.

KubernetesPodOperator lance les pods Kubernetes dans le cluster de votre environnement. En comparaison, Les opérateurs Google Kubernetes Engine exécutent les pods Kubernetes dans un qui peut être un cluster distinct, sans lien avec votre environnement. Vous pouvez également créer et supprimer des clusters à l'aide d'opérateurs Google Kubernetes Engine.

KubernetesPodOperator est une bonne option si vous avez besoin :

  • de dépendances Python personnalisées qui ne sont pas disponibles via le dépôt PyPI public ;
  • de dépendances binaires qui ne sont pas disponibles dans l'image de nœud de calcul Cloud Composer issue de la banque d'images.

Avant de commencer

Configurer les ressources de votre environnement Cloud Composer

Lorsque vous créez un environnement Cloud Composer, vous spécifiez ses paramètres de performance, y compris les paramètres de performance du cluster de l'environnement. Le lancement de pods Kubernetes dans le cluster d'environnement peut la concurrence pour les ressources du cluster, telles que le processeur ou la mémoire. Dans la mesure où le programmeur et les nœuds de calcul Airflow appartiennent au même cluster GKE, ils ne fonctionneront pas correctement si cette concurrence entraîne un épuisement des ressources.

Pour éviter une pénurie de ressources, prenez l'une ou plusieurs des mesures suivantes :

Créer un pool de nœuds

La méthode privilégiée pour éviter l'épuisement des ressources dans l'environnement Cloud Composer est de créer un pool de nœuds et de configurer les pods Kubernetes pour qu'ils s'exécutent en exploitant exclusivement les ressources de ce pool.

Console

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Cliquez sur le nom de votre environnement.

  3. Sur la page Détails de l'environnement, accédez à l'onglet Configuration de l'environnement.

  4. Accédez à Ressources > Cluster GKE. cliquez sur le lien Afficher les détails du cluster.

  5. Créez un pool de nœuds comme décrit dans la section Ajouter un pool de nœuds.

gcloud

  1. Déterminez le nom du cluster de votre environnement:

    gcloud composer environments describe ENVIRONMENT_NAME \
      --location LOCATION \
      --format="value(config.gkeCluster)"
    

    Remplacez :

    • ENVIRONMENT_NAME par le nom de l'environnement.
    • LOCATION par la région où se trouve l'environnement.
  2. La sortie contient le nom du cluster de votre environnement. Par exemple, europe-west3-example-enviro-af810e25-gke.

  3. Créez un pool de nœuds comme décrit dans la section Ajouter un pool de nœuds.

Augmenter le nombre de nœuds dans votre environnement

L'augmentation du nombre de nœuds dans votre environnement Cloud Composer augmente la puissance de calcul disponible pour vos charges de travail. Cette augmentation ne fournit pas de ressources supplémentaires pour les tâches qui nécessitent plus de processeur ou de RAM que le type de machine spécifié.

Pour augmenter le nombre de nœuds, mettez à jour votre environnement.

Spécifier le type de machine approprié

Lors de la création de l'environnement Cloud Composer, vous pouvez spécifier un type de machine. Pour garantir la disponibilité des ressources, spécifiez un type de machine pour le type de calcul qui se produit dans votre environnement Cloud Composer.

Configuration minimale

Pour créer un opérateur KubernetesPodOperator, seuls les paramètres name, image à utiliser et task_id du pod sont obligatoires. /home/airflow/composer_kube_config qui contient des identifiants permettant de s'authentifier auprès de GKE.

Airflow 2

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. In Composer 1 there is the potential for
    # the resource starvation of Airflow workers and scheduler
    # within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources,
    # and using Composer 2 will mean the environment will autoscale.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Airflow 1

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Configuration de l'affinité de pod

Lorsque vous configurez le paramètre affinity dans KubernetesPodOperator, vous contrôler les nœuds sur lesquels les pods doivent être programmés, par exemple les nœuds uniquement dans un d'un pool de nœuds. Dans cet exemple, l'opérateur ne s'exécute que sur les pools de nœuds nommés pool-0 et pool-1. Vos nœuds d'environnement Cloud Composer 1 se trouvant dans default-pool, vos pods ne s'exécutent pas sur les nœuds de votre environnement.

Airflow 2

# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Airflow 1

kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Comme l'exemple est configuré, la tâche échoue. Si vous examinez les journaux, la tâche échoue, car les pools de nœuds pool-0 et pool-1 n'existent pas.

Pour vérifier que les pools de nœuds existent dans values, effectuez l'une des modifications de configuration suivantes :

  • Si vous avez créé un pool de nœuds précédemment, remplacez pool-0 et pool-1 par les noms de vos pools de nœuds, puis importez à nouveau votre DAG.

  • Créez un pool de nœuds nommé pool-0 ou pool-1. Vous pouvez créer les deux, mais la tâche n'a besoin que d'un pool pour réussir.

  • Remplacez pool-0 et pool-1 par default-pool, qui est le pool par défaut utilisé par Airflow. Ensuite, importez à nouveau votre DAG.

Une fois les modifications effectuées, attendez quelques minutes que votre environnement soit mis à jour. Exécutez ensuite à nouveau la tâche ex-pod-affinity et vérifiez que la tâche ex-pod-affinity réussit.

Configuration supplémentaire

Cet exemple présente des paramètres supplémentaires que vous pouvez configurer dans KubernetesPodOperator.

Pour en savoir plus sur les paramètres, consultez la documentation de référence Airflow pour KubernetesPodOperator. Pour en savoir plus sur l'utilisation des secrets Kubernetes ConfigMaps, consultez la page Utiliser les secrets et les ConfigMaps de Kubernetes. Pour sur l'utilisation de modèles Jinja avec KubernetesPodOperator, consultez Utilisez les modèles Jinja.

Airflow 2

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        limits={"memory": "250M", "cpu": "100m"},
    ),
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Airflow 1

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={"limit_memory": "250M", "limit_cpu": "100m"},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={},
)

Utiliser des modèles Jinja

Airflow est compatible avec les modèles Jinja dans les DAG.

Vous devez déclarer les paramètres Airflow requis (task_id, name et image) avec l'opérateur. Comme le montre l'exemple suivant, Vous pouvez modéliser tous les autres paramètres avec Jinja, y compris cmds, arguments, env_vars et config_file.

Le paramètre env_vars de l'exemple est défini à partir d'une variable Airflow nommée my_value. L'exemple de DAG obtient sa valeur à partir de la variable de modèle vars dans Airflow. Airflow dispose de plus de variables qui permettent d'accéder à différents types d'informations. Par exemple, vous pouvez utiliser la variable de modèle conf pour accéder aux valeurs des options de configuration d'Airflow. Pour en savoir plus et des variables disponibles dans Airflow, consultez Documentation de référence sur les modèles dans Airflow dans la documentation Google Cloud.

Sans modifier le DAG ni créer la variable env_vars, Dans l'exemple, la tâche ex-kube-templates échoue, car la variable n'a pas existent. Créez cette variable dans l'interface utilisateur d'Airflow ou avec Google Cloud CLI :

Interface utilisateur d'Airflow

  1. Accédez à l'interface utilisateur d'Airflow.

  2. Dans la barre d'outils, sélectionnez Admin > Variables.

  3. Sur la page List Variable (Variable de liste), cliquez sur Add a new record (Ajouter un enregistrement).

  4. Sur la page Add Variable (Ajouter une variable), saisissez les informations suivantes :

    • Key (Clé) : my_value
    • Val (Valeur) : example_value
  5. Cliquez sur Enregistrer.

Si votre environnement utilise Airflow 1, exécutez plutôt la commande suivante :

  1. Accédez à l'interface utilisateur d'Airflow.

  2. Dans la barre d'outils, sélectionnez Admin > Variables (Administration > Variables).

  3. Sur la page Variables, cliquez sur l'onglet Create (Créer).

  4. Sur la page Variable, saisissez les informations suivantes :

    • Key (Clé) : my_value
    • Val (Valeur) : example_value
  5. Cliquez sur Enregistrer.

gcloud

Saisissez la commande suivante :

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Si votre environnement utilise Airflow 1, exécutez la commande suivante:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set my_value example_value

Remplacez :

  • ENVIRONMENT par le nom de l'environnement.
  • LOCATION par la région où se trouve l'environnement.

L'exemple suivant montre comment utiliser des modèles Jinja avec KubernetesPodOperator :

Airflow 2

kubenetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Airflow 1

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Utiliser les secrets et les ConfigMaps Kubernetes

Un secret Kubernetes est un objet qui contient des données sensibles. Un ConfigMap Kubernetes est un objet qui contient des données non confidentielles sous forme de paires clé-valeur.

Dans Cloud Composer 2, vous pouvez créer des Secrets et des ConfigMaps avec Google Cloud CLI, API ou Terraform, puis y accéder à partir KubernetesPodOperator.

À propos des fichiers de configuration YAML

Lorsque vous créez un Secret Kubernetes ou un ConfigMap à l'aide de Google Cloud CLI et vous devez fournir un fichier au format YAML. Ce fichier doit suivre le même utilisé par les Secrets et les ConfigMaps de Kubernetes. La documentation Kubernetes fournit de nombreux exemples de code de ConfigMaps et de secrets. Pour commencer, vous pouvez consultez les Distribuer les identifiants de manière sécurisée à l'aide de secrets et ConfigMaps.

Comme pour les secrets Kubernetes, utilisez l'en-tête base64 lorsque vous définissez des valeurs dans les secrets.

Pour encoder une valeur, vous pouvez utiliser la commande suivante (il s'agit de l'une des nombreuses façons pour obtenir une valeur encodée en base64):

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

Sortie :

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Les deux exemples de fichiers YAML suivants sont utilisés dans les exemples présentés plus loin dans ce guide. Exemple de fichier de configuration YAML pour un secret Kubernetes :

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Autre exemple montrant comment inclure des fichiers. Comme dans l'exemple précédent, commencez par encoder le contenu d'un fichier (cat ./key.json | base64), puis fournissez cette valeur dans le fichier YAML :

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

Exemple de fichier de configuration YAML pour un ConfigMap. Vous n'avez pas besoin d'utiliser la représentation base64 dans les ConfigMaps :

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Gérer les secrets Kubernetes

Dans Cloud Composer 2, vous créez des secrets à l'aide de Google Cloud CLI et de kubectl:

  1. Obtenez des informations sur le cluster de votre environnement:

    1. Exécutez la commande suivante :

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      Remplacez :

      • ENVIRONMENT par le nom de votre environnement.
      • LOCATION par la région où se trouve l'environnement Cloud Composer.

      Le résultat de cette commande utilise le format suivant : projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>.

    2. Pour obtenir l'ID du cluster GKE, copiez le résultat après /clusters/ (se termine par -gke).

    3. Pour obtenir la zone, copiez le résultat après /zones/.

  2. Connectez-vous à votre cluster GKE avec le code suivant : :

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --zone ZONE
    

    Remplacez :

    • CLUSTER_ID : ID de cluster de l'environnement.
    • PROJECT_ID : ID du projet.
    • ZONE par la zone où se trouve le cluster de l'environnement.
  3. Créez des secrets Kubernetes :

    Les commandes suivantes illustrent deux approches différentes pour créer Secrets Kubernetes. L'approche --from-literal utilise des paires clé-valeur. L'approche --from-file utilise le contenu des fichiers.

    • Pour créer un secret Kubernetes en fournissant des paires clé/valeur, exécutez la commande la commande suivante. Cet exemple crée un secret nommé airflow-secrets comportant un champ sql_alchemy_conn avec la valeur de test_value

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value
      
    • Pour créer un secret Kubernetes en fournissant le contenu du fichier, exécutez la commande suivante. Cet exemple crée un secret nommé service-account dont le champ service-account.json est associé au valeur extraite du contenu d'un fichier ./key.json local.

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json
      

Utiliser des secrets Kubernetes dans vos DAG

Cet exemple présente deux façons d'utiliser les secrets Kubernetes : en tant que variable d'environnement et en tant que volume installé par le pod.

Le premier secret, airflow-secrets, est défini à une variable d'environnement Kubernetes nommée SQL_CONN (par opposition à (variable d'environnement Airflow ou Cloud Composer).

Le second secret, service-account, installe service-account.json, un fichier contenant un jeton de compte de service, dans /var/secrets/google.

Voici à quoi ressemblent les objets Secret:

Airflow 2

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Airflow 1

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Le nom du premier secret Kubernetes est défini dans la variable secret_env. Ce secret est nommé airflow-secrets. Le paramètre deploy_type spécifie qu'il doit être exposé en tant que variable d'environnement. La variable d'environnement est SQL_CONN, comme spécifié dans le paramètre deploy_target. Enfin, la fonction la valeur de la variable d'environnement SQL_CONN est définie sur la valeur de la sql_alchemy_conn.

Le nom du deuxième secret Kubernetes est défini dans le fichier secret_volume. . Ce secret est nommé service-account. Il est exposé comme le volume, comme spécifié dans le paramètre deploy_type. Le chemin d'accès au fichier à installer, deploy_target, est /var/secrets/google. Enfin, la clé (key) du secret stocké dans deploy_target est service-account.json.

Voici à quoi ressemble la configuration de l'opérateur :

Airflow 2

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Airflow 1

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Informations sur le fournisseur CNCF Kubernetes

KubernetesPodOperator est implémenté dans Fournisseur apache-airflow-providers-cncf-kubernetes.

Pour obtenir des notes de version détaillées sur le fournisseur Kubernetes CNCF, consultez Site Web CNCF du fournisseur Kubernetes

Version 6.0.0

Dans la version 6.0.0 du package fournisseur Kubernetes CNCF, la connexion kubernetes_default est utilisée par défaut dans KubernetesPodOperator.

Si vous avez spécifié une connexion personnalisée dans la version 5.0.0, cette connexion personnalisée est toujours utilisée par l'opérateur. Pour revenir à l'utilisation de kubernetes_default vous pouvez ajuster vos DAG en conséquence.

Version 5.0.0

Cette version introduit quelques modifications incompatibles avec les versions antérieures par rapport à la version 4.4.0. Les plus importantes concernent la connexion kubernetes_default, qui n'est pas utilisée dans la version 5.0.0.

  • La connexion kubernetes_default doit être modifiée. Le chemin d'accès à la configuration Kubernetes doit être défini sur /home/airflow/composer_kube_config (comme illustré dans la figure suivante). À la place, config_file doit à la configuration de KubernetesPodOperator (comme indiqué dans l'exemple de code suivant).
Champ &quot;Chemin de configuration Kube&quot; dans l&#39;UI d&#39;Airflow
Figure 1. Interface utilisateur d'Airflow, modification de la connexion kubernetes_default (cliquez pour agrandir)
  • Modifiez le code d'une tâche à l'aide de KubernetesPodOperator comme suit :
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

Pour en savoir plus sur la version 5.0.0, consultez Notes de version du fournisseur Kubernetes CNCF

Dépannage

Cette section fournit des conseils pour résoudre les problèmes courants liés à KubernetesPodOperator problèmes:

Voir les journaux

Pour résoudre les problèmes, vous pouvez consulter les journaux dans l'ordre suivant :

  1. Journaux de tâches Airflow :

    1. Dans la console Google Cloud, accédez à la page Environnements.

      Accéder à la page Environnements

    2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

    3. Accédez à l'onglet DAG.

    4. Cliquez sur le nom du DAG, puis sur l'exécution du DAG pour afficher les détails et les journaux.

  2. Journaux du programmeur Airflow :

    1. Accédez à la page Détails de l'environnement.

    2. Accédez à l'onglet Journaux.

    3. Inspectez les journaux du planificateur Airflow.

  3. Journaux des pods dans la console Google Cloud, sous les charges de travail GKE. Ces journaux incluent le fichier YAML de définition des pods, les événements de pod et Détails du pod

Codes de retour non nuls

Lorsque vous utilisez KubernetesPodOperator (et GKEStartPodOperator), le code de retour du point d'entrée du conteneur détermine si la tâche est considérée comme réussie ou non. Les codes de retour non nuls indiquent un échec.

Un modèle courant consiste à exécuter un script shell comme point d'entrée du conteneur pour regrouper plusieurs opérations dans le conteneur.

Si vous écrivez un script de ce type, nous vous recommandons d'inclure la commande set -e en haut du script afin que les commandes ayant échoué dans le script arrêtent le script et propagent l'échec à l'instance de tâche Airflow.

Délais d'inactivité des pods

Le délai avant expiration par défaut pour KubernetesPodOperator est de 120 secondes, ce qui peut entraîner des délais d'inactivité avant le téléchargement d'images plus volumineuses. Vous pouvez augmenter le délai avant expiration en modifiant le paramètre startup_timeout_seconds lorsque vous devez créer l'opérateur KubernetesPodOperator.

Lorsqu'un pod expire, le journal spécifique à la tâche est disponible dans l'interface utilisateur d'Airflow. Exemple :

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Les délais d'inactivité des pods peuvent également se produire Compte de service Cloud Composer ne dispose pas des autorisations IAM nécessaires pour effectuer la tâche à votre main. Pour le vérifier, examinez les erreurs au niveau du pod à l'aide de la méthode Tableaux de bord GKE pour examiner les journaux de vos une charge de travail spécifique, ou encore utiliser Cloud Logging.

Échec de l'établissement d'une nouvelle connexion

La mise à niveau automatique est activée par défaut dans les clusters GKE. Si un pool de nœuds se trouve dans un cluster en cours de mise à niveau, le message d'erreur suivant peut s'afficher :

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

Pour vérifier si votre cluster effectue une mise à niveau, accédez à la page Clusters Kubernetes dans la console Google Cloud, puis recherchez l'icône de chargement à côté du nom du cluster de votre environnement.

Étape suivante