Cloud Composer 1 | Cloud Composer 2

Questa pagina descrive come utilizzare KubernetesPodOperator per eseguire il deployment dei pod Kubernetes da Cloud Composer nel cluster Google Kubernetes Engine che fa parte del tuo ambiente Cloud Composer, nonché per garantire che l'ambiente disponga delle risorse appropriate.

KubernetesPodOperator avvia i pod Kubernetes nel cluster del tuo ambiente. In confronto, gli operatori di Google Kubernetes Engine eseguono i pod Kubernetes in un cluster specificato, che può essere un cluster separato non correlato al tuo ambiente. Puoi anche creare ed eliminare i cluster utilizzando gli operatori di Google Kubernetes Engine.

KubernetesPodOperator è una buona opzione se hai bisogno di:

  • Dipendenze Python personalizzate che non sono disponibili tramite il repository PyPI pubblico.
  • Dipendenze binarie non disponibili nell'immagine worker di Cloud Composer.

Questa pagina ti guida in un esempio di DAG Airflow che include le seguenti configurazioni di KubernetesPodOperator:

Prima di iniziare

Configura le risorse dell'ambiente Cloud Composer

Quando crei un ambiente Cloud Composer, ne specifichi i parametri delle prestazioni, inclusi i parametri delle prestazioni per il cluster dell'ambiente. Il lancio di pod Kubernetes nel cluster di ambiente può causare competizioni per le risorse del cluster, come CPU o memoria. Poiché lo scheduler e i worker di Airflow si trovano nello stesso cluster GKE, gli scheduler e i worker non funzioneranno correttamente se la concorrenza determina il blocco delle risorse.

Per evitare l'ingestione di risorse, esegui una o più delle seguenti azioni:

Crea un pool di nodi

Il modo migliore per evitare il blocco di risorse nell'ambiente Cloud Composer è creare un nuovo pool di nodi e configurare i pod Kubernetes per l'esecuzione utilizzando solo le risorse di quel pool.


  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai a Ambienti

  2. Fai clic sul nome dell'ambiente.

  3. Nella pagina Dettagli ambiente, vai alla scheda Configurazione dell'ambiente.

  4. Nella sezione Risorse > Cluster GKE, segui il link Visualizza i dettagli del cluster.

  5. Crea un pool di nodi come descritto in Aggiungere un pool di nodi.


  1. Determina il nome del cluster del tuo ambiente:

    gcloud composer environments describe ENVIRONMENT_NAME \
      --location LOCATION \


    • ENVIRONMENT_NAME con il nome dell'ambiente.
    • LOCATION con la regione in cui si trova l'ambiente.
  2. L'output contiene il nome del cluster del tuo ambiente. Ad esempio, può essere europe-west3-example-enviro-af810e25-gke.

  3. Crea un pool di nodi come descritto in Aggiungere un pool di nodi.

Aumenta il numero di nodi nel tuo ambiente

L'aumento del numero di nodi nel tuo ambiente Cloud Composer aumenta la potenza di calcolo disponibile per i tuoi carichi di lavoro. Questo aumento non fornisce risorse aggiuntive per le attività che richiedono più CPU o RAM rispetto a quella fornita dal tipo di macchina specificato.

Per aumentare il numero di nodi, aggiorna l'ambiente.

Specifica il tipo di macchina appropriato

Durante la creazione dell'ambiente di Cloud Composer, puoi specificare un tipo di macchina. Per garantire le risorse disponibili, specifica un tipo di macchina per il tipo di calcolo utilizzato nel tuo ambiente Cloud Composer.

Configurazione di KubernetesPodOperator

Per continuare con questo esempio, inserisci l'intero file kubernetes_pod_operator.py nella cartella dags/ del tuo ambiente o aggiungi il codice KubernetesPodOperator pertinente a un DAG.

Le sezioni seguenti spiegano ogni configurazione KubernetesPodOperator nell'esempio. Per informazioni su ogni variabile di configurazione, consulta la documentazione di riferimento di Airflow.

Airflow 2

import datetime

from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
from kubernetes.client import models as k8s_models

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.

secret_env = Secret(
    # Expose the secret as environment variable.
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    # Name of the Kubernetes Secret
    # Key of a secret stored in this Secret object
secret_volume = Secret(
    # Path where we mount the secret as volume
    # Name of Kubernetes Secret
    # Key in the form of service account file name
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config if
    # no `config_file` parameter is specified. By default it will contain the
    # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
    # created upon environment creation.

    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        # Name of task you want to run, used to generate Pod ID.
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        # The namespace to run within Kubernetes, default namespace is
        # `default`. In Composer 1 there is the potential for
        # the resource starvation of Airflow workers and scheduler
        # within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources,
        # and using Composer 2 will mean the environment will autoscale.
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
    kubenetes_template_ex = KubernetesPodOperator(
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
    kubernetes_secret_vars_ex = KubernetesPodOperator(
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    kubernetes_affinity_ex = KubernetesPodOperator(
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
        # nodes, it will fail to schedule.
            "nodeAffinity": {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                            "matchExpressions": [
                                    # When nodepools are created in Google Kubernetes
                                    # Engine, the nodes inside of that nodepool are
                                    # automatically assigned the label
                                    # 'cloud.google.com/gke-nodepool' with the value of
                                    # the nodepool's name.
                                    "key": "cloud.google.com/gke-nodepool",
                                    "operator": "In",
                                    # The label key's value that pods can be scheduled
                                    # on.
                                    "values": [
    kubernetes_full_pod = KubernetesPodOperator(
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 120.
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
        # resources were passed as a dictionary. This change was made in
        # https://github.com/apache/airflow/pull/27197
        # Additionally, "memory" and "cpu" were previously named
        # "limit_memory" and "limit_cpu"
        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
            limits={"memory": "250M", "cpu": "100m"},
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        # List of Volume objects to pass to the Pod.
        # List of VolumeMount objects to pass to the Pod.
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        # Pod affinity with the KubernetesPodOperator
        # is not supported with Composer 2
        # instead, create a cluster and use the GKEStartPodOperator
        # https://cloud.google.com/composer/docs/using-gke-operator

Airflow 1

import datetime

from airflow import models
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    # Name of the Kubernetes Secret
    # Key of a secret stored in this Secret object
secret_volume = secret.Secret(
    # Path where we mount the secret as volume
    # Name of Kubernetes Secret
    # Key in the form of service account file name

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config if
    # no `config_file` parameter is specified. By default it will contain the
    # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
    # created upon environment creation.

    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        # Name of task you want to run, used to generate Pod ID.
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        # The namespace to run within Kubernetes, default namespace is
        # `default`. There is the potential for the resource starvation of
        # Airflow workers and scheduler within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources.
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
    kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
    kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
        # nodes, it will fail to schedule.
            "nodeAffinity": {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                            "matchExpressions": [
                                    # When nodepools are created in Google Kubernetes
                                    # Engine, the nodes inside of that nodepool are
                                    # automatically assigned the label
                                    # 'cloud.google.com/gke-nodepool' with the value of
                                    # the nodepool's name.
                                    "key": "cloud.google.com/gke-nodepool",
                                    "operator": "In",
                                    # The label key's value that pods can be scheduled
                                    # on.
                                    "values": [
    kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 120.
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 1.10.4, resource specifications were
        # passed as a Pod Resources Class object,
        # If using this example on a version of Airflow prior to 1.10.4,
        # import the "pod" package from airflow.contrib.kubernetes and use
        # resources = pod.Resources() instead passing a dict
        # For more info see:
        # https://github.com/apache/airflow/pull/4551
        resources={"limit_memory": "250M", "limit_cpu": "100m"},
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        # List of Volume objects to pass to the Pod.
        # List of VolumeMount objects to pass to the Pod.
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/

Configurazione minima

Per creare un KubernetesPodOperator, sono richiesti solo i campi name, namespace del pod in cui eseguire il pod, image da utilizzare e task_id.

Quando inserisci il seguente snippet di codice in un DAG, la configurazione utilizza i valori predefiniti in /home/airflow/composer_kube_config. Non è necessario modificare il codice per completare l'attività pod-ex-minimum.

Airflow 2

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    # Name of task you want to run, used to generate Pod ID.
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    # The namespace to run within Kubernetes, default namespace is
    # `default`. In Composer 1 there is the potential for
    # the resource starvation of Airflow workers and scheduler
    # within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources,
    # and using Composer 2 will mean the environment will autoscale.
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)

Airflow 1

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    # Name of task you want to run, used to generate Pod ID.
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)

Configurazione modello

Airflow supporta l'utilizzo di Jinja Templating. Devi dichiarare le variabili richieste (task_id, name, namespace e image) con l'operatore. Come mostrato nell'esempio seguente, puoi modellare tutti gli altri parametri con Jinja, tra cui cmds, arguments, env_vars e config_file.

Airflow 2

kubenetes_template_ex = KubernetesPodOperator(
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",

Airflow 1

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",

Se non modifichi il DAG o l'ambiente, l'attività ex-kube-templates non va a buon fine a causa di due errori. I log mostrano che questa attività non è riuscita perché la variabile appropriata non esiste (my_value). Il secondo errore, ottenuto dopo la correzione del primo errore, indica che l'attività non riesce perché core/kube_config non è stato trovato in config.

Per correggere entrambi gli errori, segui la procedura descritta in dettaglio.

Per impostare my_value con gcloud o la UI di Airflow:

UI di Airflow

Nell'interfaccia utente di Airflow 2:

  1. Vai all'UI di Airflow.

  2. Nella barra degli strumenti, seleziona Amministrazione > Variabili.

  3. Nella pagina Variabile elenco, fai clic su Aggiungi un nuovo record.

  4. Nella pagina Aggiungi variabile, inserisci le seguenti informazioni:

    • Chiave:my_value
    • Valore: example_value
  5. Fai clic su Salva.

Nell'interfaccia utente di Airflow 1:

  1. Vai all'UI di Airflow.

  2. Nella barra degli strumenti, seleziona Amministrazione > Variabili.

  3. Nella pagina Variabili, fai clic sulla scheda Crea.

  4. Nella pagina Variabile, inserisci le seguenti informazioni:

    • Chiave:my_value
    • Valore: example_value
  5. Fai clic su Salva.


Per Airflow 2, inserisci il seguente comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Per Airflow 1, inserisci il seguente comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set my_value example_value


  • ENVIRONMENT con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente.

Per fare riferimento a un elemento config_file personalizzato (un file di configurazione di Kubernetes), esegui l'override dell'opzione di configurazione di Airflow da kube_config a una configurazione Kubernetes valida:

Sezione Chiave Valore
core kube_config /home/airflow/composer_kube_config

Attendi qualche minuto per l'aggiornamento dell'ambiente. Poi esegui di nuovo l'attività ex-kube-templates e verifica che l'attività ex-kube-templates sia riuscita.

Configurazione delle variabili secret

Un secret di Kubernetes è un oggetto che contiene dati sensibili. Puoi passare i secret ai pod Kubernetes utilizzando KubernetesPodOperator. I secret devono essere definiti in Kubernetes, altrimenti il pod non si avvia.

Questo esempio mostra due modi di utilizzare i secret di Kubernetes: come variabile di ambiente e come volume montato dal pod.

Il primo secret, airflow-secrets, è impostato su una variabile di ambiente Kubernetes denominata SQL_CONN (e non su una variabile di ambiente Airflow o Cloud Composer).

Il secondo secret, service-account, monta service-account.json, un file con un token dell'account di servizio, in /var/secrets/google.

Ecco come si presentano i segreti:

Airflow 2

secret_env = Secret(
    # Expose the secret as environment variable.
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    # Name of the Kubernetes Secret
    # Key of a secret stored in this Secret object
secret_volume = Secret(
    # Path where we mount the secret as volume
    # Name of Kubernetes Secret
    # Key in the form of service account file name

Airflow 1

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    # Name of the Kubernetes Secret
    # Key of a secret stored in this Secret object
secret_volume = secret.Secret(
    # Path where we mount the secret as volume
    # Name of Kubernetes Secret
    # Key in the form of service account file name

Il nome del primo secret di Kubernetes è definito nella variabile secret. Questo secret in particolare è denominato airflow-secrets. È esposto come variabile di ambiente, come indicato da deploy_type. La variabile di ambiente che imposta, deploy_target, è SQL_CONN. Infine, il key del secret archiviato nel deploy_target è sql_alchemy_conn.

Il nome del secondo secret di Kubernetes è definito nella variabile secret. Questo secret in particolare è denominato service-account. Viene esposto come volume, come previsto da deploy_type. Il percorso del file da montare, deploy_target, è /var/secrets/google. Infine, il key del secret archiviato nella deploy_target è service-account.json.

Ecco come si presenta la configurazione dell'operatore:

Airflow 2

kubernetes_secret_vars_ex = KubernetesPodOperator(
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",

Airflow 1

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",

Se non vengono apportate modifiche al DAG o al tuo ambiente, l'attività ex-kube-secrets non va a buon fine. Se esamini i log, l'attività non riesce a causa di un errore Pod took too long to start. Questo errore si verifica perché Airflow non riesce a trovare il secret specificato nella configurazione secret_env.


Per impostare il secret utilizzando gcloud:

  1. Recupera informazioni sul tuo cluster di ambiente Cloud Composer.

    1. Esegui questo comando:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \


      • ENVIRONMENT con il nome del tuo ambiente.
      • LOCATION con la regione in cui si trova l'ambiente Cloud Composer.

      L'output di questo comando utilizza il seguente formato: projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>.

    2. Per ottenere l'ID cluster GKE, copia l'output dopo /clusters/ (termina con -gke).

    3. Per ottenere la zona, copia l'output dopo /zones/.

  2. Esegui la connessione al tuo cluster GKE eseguendo questo comando:

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --zone ZONE


    • CLUSTER_ID con l'ID del tuo cluster GKE.
    • PROJECT con l'ID del tuo progetto Google Cloud.
    • ZONE con la zona in cui si trova GKE.
  3. Creare secret Kubernetes.

    1. Crea un secret Kubernetes che imposti il valore di sql_alchemy_conn su test_value eseguendo questo comando:

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value
    2. Crea un secret Kubernetes che imposti il valore di service-account.json su un percorso locale di un file di chiavi dell'account di servizio chiamato key.json eseguendo questo comando:

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json
  4. Dopo aver impostato i secret, esegui di nuovo l'attività ex-kube-secrets nell'interfaccia utente di Airflow.

  5. Verifica che l'attività ex-kube-secrets sia riuscita.

Configurazione di affinità pod

Quando configuri il parametro affinity in KubernetesPodOperator, puoi controllare su quali nodi pianificare i pod, ad esempio i nodi solo in un determinato pool di nodi. In questo esempio, l'operatore viene eseguito solo sui pool di nodi denominati pool-0 e pool-1. I nodi dell'ambiente di Cloud Composer 1 si trovano nell'default-pool, per cui i tuoi pod non vengono eseguiti sui nodi nel tuo ambiente.

Airflow 2

# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                        "matchExpressions": [
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [

Airflow 1

kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                        "matchExpressions": [
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [

Poiché l'esempio è attualmente configurato, l'attività non va a buon fine. Se esamini i log, l'attività non riesce perché i pool di nodi pool-0 e pool-1 non esistono.

Per assicurarti che i pool di nodi in values esistano, apporta una delle seguenti modifiche alla configurazione:

  • Se hai creato un pool di nodi in precedenza, sostituisci pool-0 e pool-1 con i nomi dei tuoi pool di nodi e carica di nuovo il DAG.

  • Crea un pool di nodi denominato pool-0 o pool-1. Puoi crearle entrambe, ma per avere successo è sufficiente una sola attività.

  • Sostituisci pool-0 e pool-1 con default-pool, che è il pool predefinito utilizzato da Airflow. Quindi, carica nuovamente il DAG.

Dopo aver apportato le modifiche, attendi qualche minuto per l'aggiornamento dell'ambiente. Quindi esegui di nuovo l'attività ex-pod-affinity e verifica che l'attività ex-pod-affinity sia riuscita.

Configurazione completa

Questo esempio mostra tutte le variabili che puoi configurare in KubernetesPodOperator. Non è necessario modificare il codice per completare l'attività ex-all-configs.

Per i dettagli su ogni variabile, consulta il riferimento Airflow KubernetesPodOperator.

Airflow 2

kubernetes_full_pod = KubernetesPodOperator(
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        limits={"memory": "250M", "cpu": "100m"},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    # List of Volume objects to pass to the Pod.
    # List of VolumeMount objects to pass to the Pod.
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator

Airflow 1

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={"limit_memory": "250M", "limit_cpu": "100m"},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    # List of Volume objects to pass to the Pod.
    # List of VolumeMount objects to pass to the Pod.
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/

Informazioni sul provider Kubernetes CNCF

GKEStartPodOperator e KubernetesPodOperator sono implementati all'interno del provider apache-airflow-providers-cncf-kubernetes.

Per le note di rilascio non riuscite per il provider Kubernetes del CNCF, fai riferimento al sito web del provider Kubernetes di CNCF.

Versione 6.0.0

Nella versione 6.0.0 del pacchetto provider Kubernetes del CNCF, la connessione kubernetes_default viene utilizzata per impostazione predefinita in KubernetesPodOperator.

Se hai specificato una connessione personalizzata nella versione 5.0.0, questa connessione personalizzata viene ancora utilizzata dall'operatore. Per tornare a utilizzare la connessione kubernetes_default, potresti voler modificare i DAG di conseguenza.

Versione 5.0.0

Questa versione introduce alcune modifiche incompatibili con le versioni precedenti rispetto alla versione 4.4.0. Quelli più importanti sono relativi alla connessione kubernetes_default, che non viene utilizzata nella versione 5.0.0.

  • La connessione kubernetes_default deve essere modificata. Il percorso di configurazione kube deve essere impostato su /home/airflow/composer_kube_config (come mostrato nella Figura 1). In alternativa, devi aggiungere config_file alla configurazione KubernetesPodOperator (come mostrato nel seguente esempio di codice).
Campo del percorso di configurazione kube nella UI di Airflow
Figura 1. UI di Airflow, modifica della connessione kubernetes_default (fai clic per ingrandire)
  • Modifica il codice di un'attività utilizzando KubernetesPodOperator nel seguente modo:
  # config_file parameter - can be skipped if connection contains this setting
  # definition of connection to be used by the operator

Per maggiori informazioni sulla versione 5.0.0, consulta le Note di rilascio del provider CNCF Kubernetes

Risoluzione dei problemi

Suggerimenti per la risoluzione degli errori dei pod

Oltre a controllare i log delle attività nella UI di Airflow, controlla anche i seguenti log:

  • Output dello scheduler e dei worker di Airflow:

    1. Nella console Google Cloud, vai alla pagina Ambienti.

      Vai a Ambienti

    2. Segui il link dei DAG per il tuo ambiente.

    3. Sali di un livello nel bucket del tuo ambiente.

    4. Esamina i log nella cartella logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>.

  • Log dettagliati dei pod nella console Google Cloud all'interno dei carichi di lavoro GKE. Questi log includono il file YAML di definizione dei pod, gli eventi dei pod e i dettagli dei pod.

Codici di reso diversi da zero quando utilizzi anche GKEStartPodOperator

Quando utilizzi KubernetesPodOperator e GKEStartPodOperator, il codice restituito del punto di ingresso del container determina se l'attività viene considerata riuscita o meno. I codici di ritorno diversi da zero indicano un errore.

Un pattern comune quando si utilizzano KubernetesPodOperator e GKEStartPodOperator è l'esecuzione di uno script shell come punto di ingresso del container per raggruppare più operazioni all'interno del container.

Se stai scrivendo uno script di questo tipo, ti consigliamo di includere il comando set -e nella parte superiore dello script in modo che i comandi non riusciti nello script terminino lo script e propaghino l'errore all'istanza dell'attività Airflow.

Timeout dei pod

Il timeout predefinito per KubernetesPodOperator è 120 secondi, che può comportare timeout prima del download di immagini più grandi. Puoi aumentare il timeout modificando il parametro startup_timeout_seconds quando crei KubernetesPodOperator.

Quando si verifica il timeout di un pod, il log specifico dell'attività è disponibile nella UI di Airflow. Ad esempio:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

I timeout dei pod possono verificarsi anche quando l'account di servizio Cloud Composer non dispone delle autorizzazioni IAM necessarie per eseguire l'attività. Per verificarlo, esamina gli errori a livello di pod utilizzando le dashboard GKE per esaminare i log per il tuo carico di lavoro specifico oppure usa Cloud Logging.

Impossibile stabilire una nuova connessione

L'upgrade automatico è abilitato per impostazione predefinita nei cluster GKE. Se un pool di nodi si trova in un cluster in fase di upgrade, potresti visualizzare il seguente errore:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

Per verificare se è in corso l'upgrade del cluster, nella console Google Cloud vai alla pagina Cluster Kubernetes e cerca l'icona di caricamento accanto al nome del cluster del tuo ambiente.

