Utiliser KubernetesPodOperator

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Cette page explique comment utiliser KubernetesPodOperator pour déployer Pods Kubernetes depuis Cloud Composer vers Google Kubernetes Engine cluster faisant partie de votre environnement Cloud Composer.

KubernetesPodOperator lance Pods Kubernetes dans le cluster de votre environnement. En comparaison, Les opérateurs Google Kubernetes Engine exécutent les pods Kubernetes dans un qui peut être un cluster distinct, sans lien avec votre environnement. Vous pouvez aussi créer et supprimer des clusters Opérateurs Google Kubernetes Engine.

KubernetesPodOperator est une bonne option si vous avez besoin des éléments suivants:

  • de dépendances Python personnalisées qui ne sont pas disponibles via le dépôt PyPI public ;
  • de dépendances binaires qui ne sont pas disponibles dans l'image de nœud de calcul Cloud Composer issue de la banque d'images.

Avant de commencer

Consultez la liste suivante des différences entre KubernetesPodOperator dans Cloud Composer 3 et Cloud Composer 2, et assurez-vous que vos DAG sont compatibles :

  • Il n'est pas possible de créer d'espaces de noms personnalisés dans Cloud Composer 3. Pods toujours s'exécuter dans l'espace de noms composer-user-workloads, même si une autre est spécifié. Les pods de cet espace de noms ont accès aux ressources du projet et au réseau VPC (si cette option est activée) configuration.

  • Les Secrets et les ConfigMaps de Kubernetes ne peuvent pas être créés à l'aide de l'API Kubernetes. À la place, Cloud Composer fournit des commandes Google Cloud CLI, Ressources Terraform et API Cloud Composer pour gérer les secrets Kubernetes et aux ConfigMaps. Pour en savoir plus, consultez la section Utiliser des secrets et des ConfigMaps Kubernetes.

  • Comme dans Cloud Composer 2, la configuration de l'affinité de pod n'est pas disponible. Si vous souhaitez utiliser l'affinité de pod, utilisez plutôt les opérateurs GKE pour lancer des pods dans un autre cluster.

À propos de KubernetesPodOperator dans Cloud Composer 3

Cette section décrit le fonctionnement de KubernetesPodOperator dans Cloud Composer 3.

Utilisation des ressources

Dans Cloud Composer 3, le cluster de votre environnement effectue un scaling automatique. Les charges de travail supplémentaires que vous exécutez à l'aide de KubernetesPodOperator, indépendamment de votre environnement. Votre environnement n'est pas affecté par l'augmentation de la demande de ressources, mais son cluster peut évoluer à la hausse en fonction de la demande de ressources.

Les tarifs des charges de travail supplémentaires que vous exécutez dans le cluster de votre environnement suit le modèle de tarification de Cloud Composer 3 et utilise SKU Cloud Composer 3.

Cloud Composer 3 utilise des clusters Autopilot, qui introduisent la notion des classes de calcul:

  • Cloud Composer n'est compatible qu'avec la classe de calcul general-purpose.

  • Par défaut, si aucune classe n'est sélectionnée, la classe general-purpose est défini lorsque vous créez des pods à l'aide de KubernetesPodOperator.

  • Chaque classe est associée à des propriétés et limites de ressources spécifiques, Pour en savoir plus à leur sujet, consultez Documentation Autopilot. Pour Par exemple, les pods qui s'exécutent dans la classe general-purpose peuvent utiliser Gio de mémoire.

Accès aux ressources du projet

Dans Cloud Composer 3, le cluster de votre environnement se trouve dans le projet locataire, les pods sont exécutés dans le dans un espace de noms isolé.

Dans Cloud Composer 3, les pods s'exécutent toujours dans l'espace de noms composer-user-workloads, même si un autre espace de noms est spécifié. Les pods de ce namespace peuvent accéder aux ressources Google Cloud de votre projet et à votre réseau VPC (s'il est activé) sans configuration supplémentaire. Le compte de service de votre environnement est utilisé pour accéder à ces ressources. Il n'est pas possible de spécifier un autre compte de service.

Configuration minimale

Pour créer un opérateur KubernetesPodOperator, seuls les paramètres name, image à utiliser et task_id du pod sont obligatoires. /home/airflow/composer_kube_config qui contient des identifiants permettant de s'authentifier auprès de GKE.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`. Always use the
    # `composer-user-workloads` namespace with Composer 3.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Configuration supplémentaire

Cet exemple présente des paramètres supplémentaires que vous pouvez configurer dans KubernetesPodOperator.

Pour en savoir plus sur les paramètres, consultez la Référence Airflow pour KubernetesPodOperator. Pour en savoir plus sur l'utilisation des secrets et des ConfigMaps Kubernetes, consultez Utiliser des secrets et des ConfigMaps Kubernetes. Pour sur l'utilisation de modèles Jinja avec KubernetesPodOperator, consultez Utilisez les modèles Jinja.

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The Docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container.
    # The env_vars parameter is templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Utiliser des modèles Jinja

Airflow est compatible avec les modèles Jinja dans les DAG.

Vous devez déclarer les paramètres Airflow requis (task_id, name et image) avec l'opérateur. Comme le montre l'exemple suivant, vous pouvez modéliser tous les autres paramètres avec Jinja, y compris cmds, arguments, env_vars et config_file.

Le paramètre env_vars de l'exemple est défini à partir d'une variable Airflow nommée my_value. Exemple de DAG Elle obtient sa valeur à partir de la variable de modèle vars dans Airflow. Airflow offre plus de fonctionnalités qui permettent d'accéder à différents types d'informations. Par exemple, vous pouvez utiliser la variable de modèle conf pour accéder aux valeurs des options de configuration d'Airflow. Pour en savoir plus et des variables disponibles dans Airflow, consultez Documentation de référence sur les modèles dans Airflow dans la documentation Google Cloud.

Sans modifier le DAG ni créer la variable env_vars, la tâche ex-kube-templates de l'exemple échoue, car la variable n'existe pas. Créez cette variable dans l'interface utilisateur d'Airflow ou avec Google Cloud CLI :

Interface utilisateur d'Airflow

  1. Accédez à l'interface utilisateur d'Airflow.

  2. Dans la barre d'outils, sélectionnez Admin > Variables.

  3. Sur la page List Variable (Variable de liste), cliquez sur Add a new record (Ajouter un enregistrement).

  4. Sur la page Add Variable (Ajouter une variable), saisissez les informations suivantes :

    • Key (Clé) : my_value
    • Val (Valeur) : example_value
  5. Cliquez sur Enregistrer.

gcloud

Saisissez la commande suivante :

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Remplacez :

  • ENVIRONMENT par le nom de l'environnement.
  • LOCATION par la région où se trouve l'environnement.

L'exemple suivant montre comment utiliser des modèles Jinja avec KubernetesPodOperator :

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below can be templated with Jinja. For more information
    # and the list of variables available in Airflow, see
    # the Airflow templates reference:
    # https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in Jinja is the execution date as YYYY-MM-DD, this Docker image
    # will echo the execution date. Arguments to the entrypoint. The Docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI. The env_vars parameter
    # is templated.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Specifies path to Kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Utiliser les secrets et les ConfigMaps Kubernetes

Un secret Kubernetes est un objet qui contient des données sensibles. Un cluster Kubernetes ConfigMap est un objet qui contient des données non confidentielles dans des paires clé-valeur.

Dans Cloud Composer 3, vous pouvez créer des secrets et des ConfigMaps à l'aide de la Google Cloud CLI, de l'API ou de Terraform, puis y accéder depuis KubernetesPodOperator :

  • Avec la Google Cloud CLI et l'API, vous devez fournir un fichier de configuration YAML.
  • Avec Terraform, vous définissez les secrets et les ConfigMaps en tant que ressources distinctes dans les fichiers de configuration Terraform.

À propos des fichiers de configuration YAML

Lorsque vous créez un Secret Kubernetes ou un ConfigMap à l'aide de Google Cloud CLI et vous devez fournir un fichier au format YAML. Ce fichier doit suivre le même utilisé par les Secrets et les ConfigMaps de Kubernetes. La documentation Kubernetes fournit de nombreux exemples de code de ConfigMaps et de secrets. Pour commencer, vous pouvez consultez les Distribuer les identifiants de manière sécurisée à l'aide de secrets et ConfigMaps.

Comme pour les secrets Kubernetes, utilisez l'en-tête base64 lorsque vous définissez des valeurs dans les secrets.

Pour encoder une valeur, vous pouvez utiliser la commande suivante (il s'agit de l'une des nombreuses façons d'obtenir une valeur encodée en base64) :

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

Sortie :

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Les deux exemples de fichiers YAML suivants sont utilisés dans les exemples présentés plus loin dans ce guide. Exemple de fichier de configuration YAML pour un secret Kubernetes:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Autre exemple montrant comment inclure des fichiers. Comme dans l'exemple précédent, commencez par encoder le contenu d'un fichier (cat ./key.json | base64), puis fournissez cette valeur dans le fichier YAML :

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

Exemple de fichier de configuration YAML pour un ConfigMap. Vous n'avez pas besoin d'utiliser le code représentation dans les ConfigMaps:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Gérer les secrets Kubernetes

gcloud

Créer un secret

Pour créer un secret Kubernetes, exécutez la commande suivante:

gcloud beta composer environments user-workloads-secrets create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

Remplacez les éléments suivants :

  • ENVIRONMENT_NAME : nom de votre environnement
  • LOCATION: région dans laquelle se trouve l'environnement.
  • SECRET_FILE : chemin d'accès à un fichier YAML local contenant la configuration du secret.

Exemple :

gcloud beta composer environments user-workloads-secrets create \
  --environment example-environment \
  --location us-central1 \
  --secret-file-path ./secrets/example-secret.yaml

Mettre à jour un secret

Pour mettre à jour un secret Kubernetes, exécutez la commande suivante. Le nom du secret sera extrait du fichier YAML spécifié et son contenu sera remplacé.

gcloud beta composer environments user-workloads-secrets update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

Remplacez les éléments suivants :

  • ENVIRONMENT_NAME : nom de votre environnement
  • LOCATION : région où se trouve l'environnement.
  • SECRET_FILE : chemin d'accès à un fichier YAML local contenant la configuration du secret. Indiquez le nom du secret dans le champ metadata > name dans ce fichier.

Répertorier les secrets

Pour obtenir la liste des secrets et de leurs champs pour un environnement, exécutez la commande suivante. Les valeurs clés de la sortie seront remplacées par les astérisques.

gcloud beta composer environments user-workloads-secrets list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Remplacez les éléments suivants :

  • ENVIRONMENT_NAME : nom de votre environnement
  • LOCATION : région où se trouve l'environnement.

Obtenir les détails du secret

Pour obtenir des informations détaillées sur un secret, exécutez la commande suivante. Les valeurs de clé dans la sortie seront remplacées par des astérisques.

gcloud beta composer environments user-workloads-secrets describe \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Remplacez les éléments suivants :

  • SECRET_NAME : nom du secret, tel qu'il a été défini dans le champ name metadata> du fichier YAML avec la configuration du secret.
  • ENVIRONMENT_NAME : nom de votre environnement
  • LOCATION: région dans laquelle se trouve l'environnement.

Supprimer un secret

Pour supprimer un secret, exécutez la commande suivante:

gcloud beta composer environments user-workloads-secrets delete \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • SECRET_NAME: nom du secret, tel qu'il a été défini dans metadata > name du fichier YAML avec le code secret configuration.
  • ENVIRONMENT_NAME : nom de votre environnement
  • LOCATION: région dans laquelle se trouve l'environnement.

API

Créer un secret

  1. Créez une requête API environments.userWorkloadsSecrets.create.

  2. Dans cette requête :

    1. Dans le corps de la requête, dans le champ name, spécifiez l'URI de la un nouveau secret.
    2. Dans le corps de la requête, dans le champ data, spécifiez les clés et les valeurs encodées en base64 pour le secret.

Exemple :

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo="
  }
}

Mettre à jour un secret

  1. Créez un API environments.userWorkloadsSecrets.update requête.

  2. Dans cette requête :

    1. Dans le corps de la requête, dans le champ name, spécifiez l'URI du secret.
    2. Dans le corps de la requête, dans le champ data, spécifiez des clés et encodées en base64 pour le secret. Les valeurs seront remplacées.

Exemple :

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo=",
    "another-example": "YW5vdGhlcl9leGFtcGxlX3ZhbHVlIC1uCg=="
  }
}

List Secrets

Créer une API environments.userWorkloadsSecrets.list requête. Les valeurs des clés dans le résultat seront remplacées par des astérisques. Il est possible d'utiliser la pagination avec cette requête, reportez-vous à la documentation de référence de la requête plus de détails.

Exemple :

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

Obtenir les détails du secret

Créer une API environments.userWorkloadsSecrets.get requête. Les valeurs de clé dans la sortie seront remplacées par des astérisques.

Exemple :

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Supprimer un secret

Créez un API environments.userWorkloadsSecrets.delete requête.

Exemple :

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Terraform

La ressource google_composer_user_workloads_secret définit un secret Kubernetes, avec des clés et des valeurs définies dans le bloc data.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "SECRET_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME : nom de la ressource de l'environnement, qui contient la définition de l'environnement dans Terraform. Le nom de l'environnement réel est également spécifié dans cette ressource.
  • LOCATION: région dans laquelle se trouve l'environnement.
  • SECRET_NAME: nom du secret.
  • KEY_NAME : une ou plusieurs clés pour ce secret.
  • KEY_VALUE : valeur de la clé encodée en base64. Vous pouvez utiliser base64encode pour encoder la valeur (voir l'exemple).

Les deux exemples de secrets Kubernetes suivants sont utilisés dans des exemples plus loin dans ce guide.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta

  name = "airflow-secrets"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    sql_alchemy_conn: base64encode("postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db")
  }
}

Autre exemple montrant comment inclure des fichiers. Vous pouvez utiliser la fonction file pour lire le contenu du fichier en tant que chaîne, puis l'encoder en base64 :

resource "google_composer_user_workloads_secret" "service_account_secret" {
  provider = google-beta

  name = "service-account"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "service-account.json": base64encode(file("./key.json"))
  }
}

Utiliser des secrets Kubernetes dans vos DAG

Cet exemple présente deux façons d'utiliser les secrets Kubernetes : en tant que variable d'environnement et en tant que volume installé par le pod.

Le premier secret, airflow-secrets, est défini à une variable d'environnement Kubernetes nommée SQL_CONN (par opposition à (variable d'environnement Airflow ou Cloud Composer).

Le second secret, service-account, installe service-account.json, un fichier contenant un jeton de compte de service, dans /var/secrets/google.

Voici à quoi ressemblent les objets Secret:

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Le nom du premier secret Kubernetes est défini dans la variable secret_env. Ce secret est nommé airflow-secrets. Le paramètre deploy_type spécifie qu'il doit être exposé en tant que variable d'environnement. La variable d'environnement est SQL_CONN, comme spécifié dans le paramètre deploy_target. Enfin, la valeur de la variable d'environnement SQL_CONN est définie sur la valeur de la clé sql_alchemy_conn.

Le nom du deuxième secret Kubernetes est défini dans le fichier secret_volume. . Ce secret est nommé service-account. Il est exposé en tant que volume, comme spécifié dans le paramètre deploy_type. Le chemin d'accès du fichier vers "mount", deploy_target, est /var/secrets/google. Enfin, le key du Le secret stocké dans deploy_target est service-account.json.

Voici à quoi ressemble la configuration de l'opérateur :

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. The env_vars parameter is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Gérer les ConfigMaps Kubernetes

gcloud

Créer un ConfigMap

Pour créer un ConfigMap, exécutez la commande suivante :

gcloud beta composer environments user-workloads-config-maps create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

Remplacez les éléments suivants :

  • ENVIRONMENT_NAME : nom de votre environnement
  • LOCATION : région où se trouve l'environnement.
  • CONFIG_MAP_FILE: chemin d'accès à un fichier YAML local contenant l'objet ConfigMap configuration.

Exemple :

gcloud beta composer environments user-workloads-config-maps create \
  --environment example-environment \
  --location us-central1 \
  --config-map-file-path ./configs/example-configmap.yaml

Mettre à jour un ConfigMap

Pour mettre à jour un ConfigMap, exécutez la commande suivante. Le nom des ConfigMaps sera extrait du fichier YAML spécifié et le contenu des ConfigMaps sera remplacé.

gcloud beta composer environments user-workloads-config-maps update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

Remplacez les éléments suivants :

  • ENVIRONMENT_NAME : nom de votre environnement
  • LOCATION : région où se trouve l'environnement.
  • CONFIG_MAP_FILE : chemin d'accès à un fichier YAML local contenant la configuration du ConfigMap. Spécifiez le nom du ConfigMap dans le fichier metadata > name dans ce fichier.

Lister les ConfigMaps

Pour obtenir la liste des ConfigMaps et de leurs champs pour un environnement, exécutez la commande suivante. Les valeurs de clé de la sortie sont affichées telles quelles.

gcloud beta composer environments user-workloads-config-maps list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Remplacez les éléments suivants :

  • ENVIRONMENT_NAME : nom de votre environnement
  • LOCATION: région dans laquelle se trouve l'environnement.

Obtenir les détails d'un ConfigMap

Pour obtenir des informations détaillées sur un ConfigMap, exécutez la commande suivante. Clé les valeurs de sortie seront affichées telles quelles.

gcloud beta composer environments user-workloads-config-maps describe \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Remplacez les éléments suivants :

  • CONFIG_MAP_NAME : nom du ConfigMap, tel qu'il a été défini dans le champ metadata > name du fichier YAML avec la configuration du ConfigMap.
  • ENVIRONMENT_NAME : nom de votre environnement
  • LOCATION: région dans laquelle se trouve l'environnement.

Supprimer un ConfigMap

Pour supprimer un ConfigMap, exécutez la commande suivante :

gcloud beta composer environments user-workloads-config-maps delete \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • CONFIG_MAP_NAME : nom du ConfigMap, tel qu'il a été défini dans le champ metadata > name du fichier YAML avec la configuration du ConfigMap.
  • ENVIRONMENT_NAME : nom de votre environnement
  • LOCATION: région dans laquelle se trouve l'environnement.

API

Créer un ConfigMap

  1. Créez une requête API environments.userWorkloadsConfigMaps.create.

  2. Dans cette requête :

    1. Dans le corps de la requête, dans le champ name, spécifiez l'URI du nouveau ConfigMap.
    2. Dans le corps de la requête, dans le champ data, spécifiez des clés et pour le ConfigMap.

Exemple :

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value"
  }
}

Mettre à jour un ConfigMap

  1. Créez un environments.userWorkloadsConfigMaps.update Requête API.

  2. Dans cette requête :

    1. Dans le corps de la requête, dans le champ name, spécifiez l'URI du ConfigMap.
    2. Dans le corps de la requête, dans le champ data, spécifiez des clés et pour le ConfigMap. Les valeurs seront remplacées.

Exemple :

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value",
    "another_key": "another_value"
  }
}

Lister les ConfigMaps

Créez une requête API environments.userWorkloadsConfigMaps.list. Les valeurs clés de la sortie seront affichées telles quelles. Il est possible d'utiliser la pagination avec cette requête, reportez-vous à la documentation de référence de la requête plus de détails.

Exemple :

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

Obtenir les détails d'un ConfigMap

Créez un API environments.userWorkloadsConfigMaps.get requête. Les valeurs de clé de la sortie sont affichées telles quelles.

Exemple :

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Supprimer un ConfigMap

Créez une requête API environments.userWorkloadsConfigMaps.delete.

Exemple :

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Terraform

La google_composer_user_workloads_config_map ressource définit un ConfigMap, avec des clés et des valeurs définies dans le Bloc data.

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "CONFIG_MAP_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME : nom de la ressource de l'environnement, qui contient la définition de l'environnement dans Terraform. Le nom de l'environnement réel est également spécifié dans cette ressource.
  • LOCATION: région dans laquelle se trouve l'environnement.
  • CONFIG_MAP_NAME : nom du ConfigMap.
  • KEY_NAME: une ou plusieurs clés pour ce ConfigMap.
  • KEY_VALUE : valeur de la clé.

Exemple :

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta

  name = "example-config-map"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "example_key": "example_value"
  }
}

Utiliser des ConfigMaps dans vos DAG

Cet exemple montre comment utiliser des ConfigMaps dans vos DAG.

Dans l'exemple suivant, un ConfigMap est transmis dans le paramètre configmaps. Toutes les clés de ce ConfigMap sont disponibles en tant que variables d'environnement:

import datetime

from airflow import models
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_env_vars',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'echo "Value: $example_key"',
    ],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

L'exemple suivant montre comment installer un ConfigMap en tant que volume:

import datetime

from airflow import models
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

volume_mount = k8s.V1VolumeMount(name='confmap-example',
  mount_path='/config',
  sub_path=None,
  read_only=False)

volume = k8s.V1Volume(name='confmap-example',
  config_map=k8s.V1ConfigMapVolumeSource(name='example-configmap'))

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_volume_mount',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'ls /config'
    ],
    volumes=[volume],
    volume_mounts=[volume_mount],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

Informations sur le fournisseur Kubernetes CNCF

KubernetesPodOperator est implémenté dans Fournisseur apache-airflow-providers-cncf-kubernetes.

Pour obtenir des notes de version détaillées sur le fournisseur Kubernetes CNCF, consultez le site Web du fournisseur Kubernetes CNCF.

Dépannage

Cette section fournit des conseils pour résoudre les problèmes courants liés à KubernetesPodOperator problèmes:

Voir les journaux

Lorsque vous résolvez des problèmes, vous pouvez consulter les journaux dans l'ordre suivant:

  1. Journaux de tâches Airflow :

    1. Dans la console Google Cloud, accédez à la page Environnements.

      Accéder à la page Environnements

    2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

    3. Accédez à l'onglet DAG.

    4. Cliquez sur le nom du DAG, puis sur l'exécution du DAG pour afficher les détails et les journaux.

  2. Journaux du programmeur Airflow :

    1. Accédez à la page Détails de l'environnement.

    2. Accédez à l'onglet Journaux.

    3. Inspectez les journaux du planificateur Airflow.

  3. Journaux des charges de travail de l'utilisateur :

    1. Accédez à la page Détails de l'environnement.

    2. Accédez à l'onglet Surveillance.

    3. Sélectionnez Charges de travail de l'utilisateur.

    4. Examinez la liste des charges de travail exécutées. Vous pouvez afficher les journaux des informations sur l'utilisation des ressources pour chaque charge de travail.

Codes de retour non nuls

Lorsque vous utilisez KubernetesPodOperator (et GKEStartPodOperator), le code de retour du point d'entrée du conteneur détermine si la tâche est considérée comme réussie ou non. Les codes de retour non nuls indiquent un échec.

Un schéma courant consiste à exécuter un script shell comme point d'entrée du conteneur pour regrouper plusieurs opérations dans le conteneur.

Si vous écrivez un script de ce type, nous vous recommandons d'inclure la commande set -e en haut du script afin que les commandes ayant échoué dans le script arrêtent le script et propagent l'échec à l'instance de tâche Airflow.

Délais d'inactivité des pods

Le délai avant expiration par défaut pour KubernetesPodOperator est de 120 secondes, ce qui peut entraîner des délais d'inactivité avant le téléchargement d'images plus volumineuses. Vous pouvez augmenter le délai avant expiration en modifiant le paramètre startup_timeout_seconds lorsque vous devez créer l'opérateur KubernetesPodOperator.

Lorsqu'un pod expire, le journal spécifique à la tâche est disponible dans l'interface utilisateur d'Airflow. Exemple :

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Les délais d'inactivité des pods peuvent également se produire Compte de service Cloud Composer ne dispose pas des autorisations IAM nécessaires pour effectuer la tâche à votre main. Pour vérifier cela, examinez les erreurs au niveau du pod à l'aide des tableaux de bord GKE, afin de consulter les journaux de votre charge de travail en question, ou utilisez Cloud Logging.

Étape suivante