Utiliser KubernetesPodOperator

Cloud Composer 1 | Cloud Composer 2

Cette page explique comment utiliser KubernetesPodOperator pour déployer des pods Kubernetes à partir de Cloud Composer dans le cluster Google Kubernetes Engine faisant partie de votre environnement Cloud Composer, et pour vous assurer que votre environnement dispose des ressources appropriées.

KubernetesPodOperator lance les pods Kubernetes dans le cluster de votre environnement. Les opérateurs Google Kubernetes Engine, quant à eux, exécutent des pods Kubernetes dans un cluster spécifié, qui peut être un cluster distinct et non lié à votre environnement. Vous pouvez également créer et supprimer des clusters à l'aide d'opérateurs Google Kubernetes Engine.

L'utilisation de KubernetesPodOperator est recommandée si vous avez besoin :

  • de dépendances Python personnalisées qui ne sont pas disponibles via le dépôt PyPI public ;
  • de dépendances binaires qui ne sont pas disponibles dans l'image de nœud de calcul Cloud Composer issue de la banque d'images.

Cette page présente un exemple de DAG Airflow incluant les configurations KubernetesPodOperator suivantes:

Avant de commencer

  • Dans Cloud Composer 2, le cluster de votre environnement évolue automatiquement. Les charges de travail supplémentaires que vous exécutez à l'aide de KubernetesPodOperator évoluent indépendamment de votre environnement. Votre environnement n'est pas affecté par l'augmentation de la demande de ressources, mais son cluster peut évoluer à la hausse en fonction de la demande de ressources. La tarification des charges de travail supplémentaires que vous exécutez dans le cluster de votre environnement suit le modèle de tarification de Cloud Composer 2 et utilise les codes SKU Cloud Composer Compute.

  • Les clusters Cloud Composer 2 utilisent Workload Identity. Par défaut, les pods qui s'exécutent dans un espace de noms nouvellement créé ou dans l'espace de noms composer-user-workloads ne peuvent pas accéder aux ressources Google Cloud. Lorsque vous utilisez Workload Identity, les comptes de service Kubernetes associés aux espaces de noms doivent être mappés à des comptes de service Google Cloud afin d'activer l'autorisation d'identité de service pour les requêtes adressées aux API Google et à d'autres services.

    De ce fait, si vous exécutez des pods dans l'espace de noms composer-user-workloads ou un nouvel espace de noms dans le cluster de votre environnement, les liaisons IAM appropriées entre les comptes de service Kubernetes et Google Cloud ne sont pas créées, et ces pods ne peuvent pas accéder aux ressources de votre projet Google Cloud.

    Si vous souhaitez que vos pods aient accès aux ressources Google Cloud, utilisez l'espace de noms composer-user-workloads ou créez votre propre espace de noms, comme décrit plus loin.

    Pour fournir un accès aux ressources de votre projet, suivez les instructions dans Workload Identity et configurez les liaisons:

    1. Créez un espace de noms distinct dans le cluster de votre environnement.
    2. Créez une liaison entre le compte de service Kubernetes composer-user-workloads/<namespace_name> et le compte de service de votre environnement.
    3. Ajoutez l'annotation de compte de service de votre environnement au compte de service Kubernetes.
    4. Lorsque vous utilisez KubernetesPodOperator, spécifiez l'espace de noms et le compte de service Kubernetes dans les paramètres namespace et service_account_name.
  • Si vous utilisez la version 5.0.0 du fournisseur CNCF Kubernetes, suivez les instructions de la section CNCF Kubernetes Provider.

  • Cloud Composer 2 utilise des clusters GKE avec Workload Identity. Le serveur de métadonnées GKE met quelques secondes à commencer à accepter les requêtes sur un pod nouvellement créé. Par conséquent, les tentatives d'authentification à l'aide de Workload Identity dans les premières secondes de la vie d'un pod peuvent échouer. Pour en savoir plus sur cette limitation, cliquez ici.

  • Cloud Composer 2 utilise des clusters Autopilot qui introduisent la notion de classes de calcul. Par défaut, si aucune classe n'est sélectionnée, la classe general-purpose est utilisée lorsque vous créez des pods à l'aide de KubernetesPodOperator.

    • Chaque classe est associée à des propriétés et des limites de ressources spécifiques. Pour en savoir plus, consultez la documentation Autopilot. Par exemple, les pods exécutés dans la classe general-purpose peuvent utiliser jusqu'à 110 Gio de mémoire.

Configuration de KubernetesPodOperator

Pour suivre cet exemple, placez l'intégralité du fichier kubernetes_pod_operator.py dans le dossier dags/ de votre environnement ou ajoutez le code pertinent KubernetesPodOperator à un DAG.

Les sections suivantes décrivent chacune des configurations de KubernetesPodOperator de l'exemple. Pour plus d'informations sur chaque variable de configuration, consultez la documentation de référence Airflow.

"""Example DAG using KubernetesPodOperator."""
import datetime

from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
    dag_id="composer_sample_kubernetes_pod",
    schedule_interval=datetime.timedelta(days=1),
    start_date=YESTERDAY,
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, the config file found at
    # `/home/airflow/composer_kube_config` contains credentials for
    # Cloud Composer's Google Kubernetes Engine cluster that is created
    # upon environment creation.
    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes. In Composer 2 environments
        # after December 2022, the default namespace is
        # `composer-user-workloads`.
        namespace="composer-user-workloads",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_template_ex = KubernetesPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        namespace="composer-user-workloads",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_secret_vars_ex = KubernetesPodOperator(
        task_id="ex-kube-secrets",
        name="ex-kube-secrets",
        namespace="composer-user-workloads",
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        cmds=["echo"],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
        },
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_full_pod = KubernetesPodOperator(
        task_id="ex-all-configs",
        name="pi",
        namespace="composer-user-workloads",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 600.
        startup_timeout_seconds=600,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
        # resources were passed as a dictionary. This change was made in
        # https://github.com/apache/airflow/pull/27197
        # Additionally, "memory" and "cpu" were previously named
        # "limit_memory" and "limit_cpu"
        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        container_resources=k8s_models.V1ResourceRequirements(
            requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
            limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        ),
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        # Pod affinity with the KubernetesPodOperator
        # is not supported with Composer 2
        # instead, create a cluster and use the GKEStartPodOperator
        # https://cloud.google.com/composer/docs/using-gke-operator
        affinity={},
    )

Configuration minimale

Pour créer un KubernetesPodOperator, seuls les éléments name du pod, namespace où exécuter le pod, image et task_id sont requis.

Lorsque vous placez l'extrait de code suivant dans un DAG, la configuration utilise les valeurs par défaut dans /home/airflow/composer_kube_config. Il n'est pas nécessaire de modifier le code pour que la tâche pod-ex-minimum réussisse.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Configuration du modèle

Airflow est compatible avec la modélisation Jinja. Vous devez déclarer les variables requises (task_id, name, namespace, image) avec l'opérateur. Comme le montre l'exemple suivant, vous pouvez modéliser tous les autres paramètres avec Jinja, y compris cmds, arguments, env_vars et config_file.

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Sans modification du DAG ou de votre environnement, la tâche ex-kube-templates échoue en raison de deux erreurs. Les journaux indiquent que cette tâche échoue car la variable appropriée n'existe pas (my_value). La deuxième erreur, que vous pouvez obtenir après avoir corrigé la première erreur, indique que la tâche échoue car core/kube_config est introuvable dans config.

Pour corriger ces deux erreurs, suivez les étapes décrites plus loin.

Pour définir my_value avec gcloud ou l'interface utilisateur d'Airflow, procédez comme suit :

Interface utilisateur d'Airflow

Dans l'interface utilisateur d'Airflow 2:

  1. Accédez à l'interface utilisateur d'Airflow.

  2. Dans la barre d'outils, sélectionnez Admin > Variables (Administration > Variables).

  3. Sur la page List Variable (Variable de liste), cliquez sur Add a new record (Ajouter un enregistrement).

  4. Sur la page Add Variable (Ajouter une variable), saisissez les informations suivantes :

    • Key (Clé) : my_value
    • Val (Valeur) : example_value
  5. Cliquez sur Enregistrer.

gcloud

Pour Airflow 2, saisissez la commande suivante :

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Remplacez :

  • ENVIRONMENT par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.

Pour faire référence à un config_file personnalisé (fichier de configuration Kubernetes), remplacez l'option de configuration Airflow kube_config par une configuration Kubernetes valide:

Section Clé Valeur
core kube_config /home/airflow/composer_kube_config

Patientez quelques minutes pendant la mise à jour de votre environnement. Exécutez ensuite à nouveau la tâche ex-kube-templates et vérifiez que la tâche ex-kube-templates réussit.

Configuration des variables de secret

Un secret Kubernetes est un objet qui contient des données sensibles. Vous pouvez transmettre des secrets aux pods Kubernetes à l'aide de KubernetesPodOperator. Les secrets doivent être définis dans Kubernetes, faute de quoi le lancement du pod est impossible.

Cet exemple présente deux façons d'utiliser les secrets Kubernetes: en tant que variable d'environnement et en tant que volume installé par le pod.

Le premier secret, airflow-secrets, est défini sur une variable d'environnement Kubernetes nommée SQL_CONN (par opposition à une variable d'environnement Airflow ou Cloud Composer).

Le second secret, service-account, installe service-account.json, un fichier contenant un jeton de compte de service, dans /var/secrets/google.

Voici à quoi ressemblent ces secrets :

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Le nom du premier secret Kubernetes est défini dans la variable secret. Ce secret particulier est nommé airflow-secrets. Il est exposé en tant que variable d'environnement, comme indiqué par deploy_type. La variable d'environnement sur laquelle il est défini, deploy_target, est SQL_CONN. Enfin, la clé (key) du secret stocké dans deploy_target est sql_alchemy_conn.

Le nom du second secret Kubernetes est défini dans la variable secret. Ce secret particulier est nommé service-account. Il est exposé en tant que volume, comme indiqué par deploy_type. Le chemin d'accès au fichier à installer, deploy_target, est /var/secrets/google. Enfin, la clé (key) du secret stocké dans deploy_target est service-account.json.

La configuration de l'opérateur se présente comme suit :

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Sans modification du DAG ou de votre environnement, la tâche ex-kube-secrets échoue. Si vous consultez les journaux, la tâche échoue en raison d'une erreur Pod took too long to start. Cette erreur se produit car Airflow ne trouve pas le secret spécifié dans la configuration, secret_env.

gcloud

Pour définir le secret à l'aide de gcloud, procédez comme suit :

  1. Obtenez des informations sur le cluster de votre environnement Cloud Composer.

    1. Exécutez la commande ci-dessous.

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      Remplacez :

      • ENVIRONMENT par le nom de votre environnement.
      • LOCATION par la région où se trouve l'environnement Cloud Composer.

      Le résultat de cette commande utilise le format suivant : projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>.

    2. Pour obtenir l'ID du cluster GKE, copiez le résultat après /clusters/ (se termine par -gke).

  2. Connectez-vous à votre cluster GKE en exécutant la commande suivante :

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --region LOCATION
    

    Remplacez :

    • CLUSTER_ID par votre ID de cluster GKE.
    • PROJECT par l'ID de votre projet Google Cloud.
    • LOCATION par la région où se trouve l'environnement Cloud Composer.

  3. Créer des secrets Kubernetes

    1. Créez un secret Kubernetes qui définit la valeur de sql_alchemy_conn sur test_value en exécutant la commande suivante :

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
      

    2. Créez un secret Kubernetes qui définit la valeur de service-account.json sur le chemin d'accès local d'un fichier de clé de compte de service appelé key.json en exécutant la commande suivante :

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json -n composer-user-workloads
      

  4. Après avoir défini les secrets, exécutez à nouveau la tâche ex-kube-secrets dans l'interface utilisateur d'Airflow.

  5. Vérifiez que la tâche ex-kube-secrets réussit.

Configuration complète

Cet exemple présente toutes les variables que vous pouvez configurer dans KubernetesPodOperator. Il n'est pas nécessaire de modifier le code pour que la tâche ex-all-configs réussisse.

Pour en savoir plus sur chaque variable, consultez la documentation de référence Airflow au sujet de KubernetesPodOperator.

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Informations sur le fournisseur CNCF Kubernetes

GKEStartPodOperator et KubernetesPodOperator sont mis en œuvre dans le fournisseur apache-airflow-providers-cncf-kubernetes.

Pour consulter les notes de version relatives aux échecs du fournisseur Kubernetes CNCF, reportez-vous au site Web du fournisseur CNCF Kubernetes.

Version 6.0.0

Dans la version 6.0.0 du package de fournisseur Kubernetes CNCF, la connexion kubernetes_default est utilisée par défaut dans le KubernetesPodOperator.

Si vous avez spécifié une connexion personnalisée dans la version 5.0.0, cette connexion personnalisée est toujours utilisée par l'opérateur. Pour revenir à l'utilisation de la connexion kubernetes_default, vous pouvez ajuster vos DAG en conséquence.

Version 5.0.0

Cette version introduit quelques modifications incompatibles avec les versions antérieures par rapport à la version 4.4.0. Les plus importants, que vous devez connaître, sont liés à la connexion kubernetes_default, qui n'est pas utilisée dans la version 5.0.0.

  • La connexion kubernetes_default doit être modifiée : le chemin de configuration Kube doit être défini sur /home/airflow/composer_kube_config (voir la figure 1) ou config_file doit être ajouté à la configuration KubernetesPodOperator (comme indiqué ci-dessous).
Champ du chemin d'accès à la configuration Kube dans l'interface utilisateur d'Airflow
Figure 1 : Interface utilisateur Airflow, modification de la connexion kubernetes_default (cliquez pour agrandir)
  • Modifiez le code d'une tâche à l'aide de KubernetesPodOperator comme suit :
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

Pour en savoir plus sur la version 5.0.0, consultez les notes de version du fournisseur Kubernetes CNCF.

Dépannage

Conseils pour résoudre les problèmes liés aux pods

En plus de consulter les journaux des tâches dans l'interface utilisateur d'Airflow, vous devez également consulter les journaux suivants:

  • Sortie du programmeur et des nœuds de calcul Airflow :

    1. Dans la console Google Cloud, accédez à la page Environnements.

      Accéder à la page Environnements

    2. Suivez le lien DAG pour votre environnement.

    3. Dans le bucket de votre environnement, augmentez un niveau.

    4. Examinez les journaux dans le dossier logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>.

  • Journaux détaillés des pods dans la console Google Cloud sous les charges de travail GKE. Ces journaux incluent le fichier YAML de définition des pods, les événements des pods et les détails des pods.

Codes de retour non nuls si GKEStartPodOperator est également utilisé

Lorsque vous utilisez KubernetesPodOperator et GKEStartPodOperator, le code de retour du point d'entrée du conteneur détermine si la tâche est considérée comme réussie ou non. Les codes de retour non nuls indiquent un échec.

Un schéma courant lors de l'utilisation de KubernetesPodOperator et de GKEStartPodOperator consiste à exécuter un script shell comme point d'entrée du conteneur pour regrouper plusieurs opérations dans le conteneur.

Si vous écrivez un script de ce type, nous vous recommandons d'inclure la commande set -e en haut du script afin que les commandes ayant échoué dans le script arrêtent le script et propagent l'échec à l'instance de tâche Airflow.

Délais d'inactivité des pods

Le délai d'inactivité par défaut de KubernetesPodOperator est de 120 secondes, ce qui signifie que ce délai peut être dépassé avant que des images volumineuses ne puissent être téléchargées. Vous pouvez augmenter le délai d'inactivité en modifiant le paramètre startup_timeout_seconds lorsque vous créez l'opérateur KubernetesPodOperator.

Lorsqu'un pod expire, le journal spécifique à la tâche est disponible dans l'interface utilisateur d'Airflow. Exemple :

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Un dépassement du délai d'inactivité d'un pod peut également se produire lorsque le compte de service Composer ne dispose pas des autorisations IAM nécessaires pour effectuer la tâche en question. Pour vérifier cela, examinez les erreurs au niveau du pod à l'aide des tableaux de bord GKE, afin de consulter les journaux de votre charge de travail en question, ou utilisez Cloud Logging.

Échec de l'établissement d'une nouvelle connexion

La mise à niveau automatique est activée par défaut dans les clusters GKE. Si un pool de nœuds se trouve dans un cluster en cours de mise à niveau, le message d'erreur suivant peut s'afficher :

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

Pour vérifier si votre cluster est en cours de mise à niveau, dans la console Google Cloud, accédez à la page Clusters Kubernetes, puis recherchez l'icône de chargement à côté du nom du cluster de votre environnement.

Étapes suivantes