Utilizzo di KubernetesPodOperator

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

In questa pagina viene descritto come utilizzare KubernetesPodOperator per il deployment Pod Kubernetes da Cloud Composer a Google Kubernetes Engine che fa parte del tuo ambiente Cloud Composer e per garantire che il tuo ambiente disponga delle risorse appropriate.

KubernetesPodOperator di avvii Pod Kubernetes nel cluster del tuo ambiente. In confronto, Gli operatori di Google Kubernetes Engine eseguono i pod Kubernetes in un che può essere un cluster separato non correlato completamente gestito di Google Cloud. Puoi anche creare ed eliminare cluster utilizzando di Google Kubernetes Engine.

KubernetesPodOperator è una buona soluzione se hai bisogno di:

  • Dipendenze Python personalizzate che non sono disponibili tramite la piattaforma PyPI pubblica repository Git.
  • Dipendenze binarie che non sono disponibili nello stock Immagine worker di Cloud Composer.

Questa pagina illustra un esempio di DAG Airflow che include quanto segue KubernetesPodOperator configurazioni:

Prima di iniziare

  • In Cloud Composer 3, il cluster del tuo ambiente scala automaticamente. Carichi di lavoro aggiuntivi che esegui utilizzando la scalabilità di KubernetesPodOperator in modo indipendente dal tuo ambiente. Il tuo ambiente non è influenzato dall'aumento della richiesta di risorse, ma fa lo scale up e lo scale down del cluster del tuo ambiente a seconda della risorsa domanda. I prezzi per i carichi di lavoro aggiuntivi che esegui in che il cluster dell'ambiente segue Modello di prezzi di Cloud Composer 3 e utilizzi SKU di Cloud Composer 3.

  • In Cloud Composer 3, il cluster del tuo ambiente si trova nel progetto tenant. Tuttavia, KubernetesPodOperator esegue allo stesso modo, senza dover apportare modifiche al codice rispetto a Cloud Composer 2. I pod vengono eseguiti nel cluster dell'ambiente, in uno spazio dei nomi isolato, ma con accesso alla tua rete VPC (se abilitata).

  • Cloud Composer 3 utilizza i cluster GKE con Workload Identity. Per impostazione predefinita, i pod eseguiti in spazi dei nomi appena creati o nell'composer-user-workloads non può accedere alle risorse Google Cloud. Quando si utilizza Workload Identity, account di servizio Kubernetes associati a devono essere mappati agli account di servizio Google Cloud, abilitare l'autorizzazione delle identità dei servizi per le richieste alle API di Google e ad altre i servizi di machine learning.

    Per questo motivo, se esegui pod nello spazio dei nomi composer-user-workloads uno spazio dei nomi appena creato nel cluster dell'ambiente, Associazioni IAM tra Kubernetes e Google Cloud che non vengono creati e questi pod non possono accedere alle risorse del tuo progetto Google Cloud.

    Se vuoi che i tuoi pod abbiano accesso alle risorse Google Cloud, quindi utilizza lo spazio dei nomi composer-user-workloads o creane uno personalizzato come descritto in maggiore dettaglio.

    Per fornire l'accesso alle risorse del tuo progetto, segui le indicazioni di Workload Identity e imposta le associazioni:

    1. Crea uno spazio dei nomi separato nel cluster del tuo ambiente.
    2. Crea un'associazione tra composer-user-workloads/<namespace_name> account di servizio Kubernetes e l'account di servizio del tuo ambiente.
    3. Aggiungi l'annotazione dell'account di servizio del tuo ambiente a Kubernetes l'account di servizio.
    4. Quando utilizzi KubernetesPodOperator, specifica lo spazio dei nomi e l'account di servizio Kubernetes in namespace e Parametri service_account_name.
  • Cloud Composer 3 utilizza i cluster GKE con il carico di lavoro Identità. Il server di metadati GKE impiega alcuni secondi per iniziare ad accettare richieste su un pod appena creato. Di conseguenza, il tentativo di a eseguire l'autenticazione con Workload Identity entro i primi secondi di una Il ciclo di vita del pod potrebbe interrompersi. Per ulteriori informazioni su questa limitazione, consulta: Restrizioni di Workload Identity.

  • Cloud Composer 3 utilizza i cluster Autopilot che introducono la nozione di classi di computing:

    • Per impostazione predefinita, se non è selezionato alcun corso, il corso general-purpose sarà dei pod durante la creazione dei pod utilizzando KubernetesPodOperator.

    • Ogni classe è associata a proprietà e limiti di risorse specifici, Puoi leggere informazioni in merito in Documentazione di Autopilot. Ad esempio, i pod in esecuzione nella classe general-purpose possono utilizzare fino a 110 GiB di memoria.

Configurazione di KubernetesPodOperator

Per seguire questo esempio, inserisci l'intero kubernetes_pod_operator.py file nella cartella dags/ del tuo ambiente aggiungi il codice KubernetesPodOperator pertinente a un DAG.

Le sezioni seguenti spiegano ciascuna configurazione di KubernetesPodOperator dell'esempio. Per informazioni su ogni variabile di configurazione, consulta il riferimento di Airflow.

"""Example DAG using KubernetesPodOperator."""
import datetime

from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
    dag_id="composer_sample_kubernetes_pod",
    schedule_interval=datetime.timedelta(days=1),
    start_date=YESTERDAY,
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, the config file found at
    # `/home/airflow/composer_kube_config` contains credentials for
    # Cloud Composer's Google Kubernetes Engine cluster that is created
    # upon environment creation.
    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes. In Composer 2 environments
        # after December 2022, the default namespace is
        # `composer-user-workloads`.
        namespace="composer-user-workloads",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_template_ex = KubernetesPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        namespace="composer-user-workloads",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_secret_vars_ex = KubernetesPodOperator(
        task_id="ex-kube-secrets",
        name="ex-kube-secrets",
        namespace="composer-user-workloads",
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        cmds=["echo"],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
        },
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_full_pod = KubernetesPodOperator(
        task_id="ex-all-configs",
        name="pi",
        namespace="composer-user-workloads",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 600.
        startup_timeout_seconds=600,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
        # resources were passed as a dictionary. This change was made in
        # https://github.com/apache/airflow/pull/27197
        # Additionally, "memory" and "cpu" were previously named
        # "limit_memory" and "limit_cpu"
        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        container_resources=k8s_models.V1ResourceRequirements(
            requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
            limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        ),
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        # Pod affinity with the KubernetesPodOperator
        # is not supported with Composer 2
        # instead, create a cluster and use the GKEStartPodOperator
        # https://cloud.google.com/composer/docs/using-gke-operator
        affinity={},
    )

Configurazione minima

Per creare KubernetesPodOperator, solo i pod name, namespace, dove eseguire il pod, sono obbligatori image per l'uso e task_id.

Quando inserisci il seguente snippet di codice in un DAG, la configurazione utilizza valori predefiniti in /home/airflow/composer_kube_config. Non è necessario modificare codice per completare l'attività pod-ex-minimum.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Configurazione modello

Airflow supporta Modelli Jinja. Devi dichiarare le variabili richieste (task_id, name, namespace, e image) con l'operatore. Come mostrato nell'esempio seguente, modelli tutti gli altri parametri con Jinja, inclusi cmds, arguments, env_vars e config_file.

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Senza modificare il DAG o il tuo ambiente, l'attività ex-kube-templates non riesce a causa di due errori. I log mostrano che questa attività non è riuscita perché non esiste la variabile appropriata (my_value). Il secondo errore, che dopo aver corretto il primo errore, indica che l'attività non riesce perché Impossibile trovare core/kube_config in config.

Per correggere entrambi gli errori, segui i passaggi descritti più avanti.

Per impostare my_value con gcloud o con la UI di Airflow:

UI di Airflow

Nell'interfaccia utente di Airflow 2:

  1. Vai alla UI di Airflow.

  2. Nella barra degli strumenti, seleziona Amministrazione > Variabili.

  3. Nella pagina Elenca variabili, fai clic su Aggiungi un nuovo record.

  4. Nella pagina Aggiungi variabile, inserisci le seguenti informazioni:

    • Chiave:my_value
    • Val: example_value
  5. Fai clic su Salva.

gcloud

Per Airflow 2, inserisci il comando seguente:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Sostituisci:

  • ENVIRONMENT con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente.

Per fare riferimento a un config_file personalizzato (un file di configurazione di Kubernetes), sostituisci l'opzione di configurazione kube_config Airflow in modo che configurazione Kubernetes valida:

Sezione Chiave Valore
core kube_config /home/airflow/composer_kube_config

Attendi qualche minuto per l'aggiornamento dell'ambiente. Poi esegui di nuovo l'attività ex-kube-templates e verifica che ex-kube-templates attività riuscita.

Configurazione completa

Questo esempio mostra tutte le variabili che puoi configurare nel KubernetesPodOperator. Non è necessario modificare il codice per l'attività ex-all-configs per completare l'operazione.

Per maggiori dettagli su ogni variabile, consulta Riferimento KubernetesPodOperator Airflow.

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Informazioni sul provider Kubernetes CNCF

GKEStartPodOperator e KubernetesPodOperator sono implementati Provider apache-airflow-providers-cncf-kubernetes.

Per le note di rilascio non superate per il provider Kubernetes CNCF, fai riferimento al sito web del provider Kubernetes CNCF.

Versione 6.0.0

Nella versione 6.0.0 del pacchetto Provider Kubernetes CNCF, la connessione kubernetes_default viene utilizzata per impostazione predefinita KubernetesPodOperator.

Se hai specificato una connessione personalizzata nella versione 5.0.0, è ancora utilizzato dall'operatore. Per tornare a utilizzare kubernetes_default potresti voler regolare i DAG di conseguenza.

Versione 5.0.0

Questa versione introduce alcune modifiche non compatibili con le versioni precedenti rispetto alla versione 4.4.0. I più importanti sono correlati la connessione kubernetes_default non utilizzata nella versione 5.0.0.

  • La connessione kubernetes_default deve essere modificata. Percorso configurazione kube deve essere impostato su /home/airflow/composer_kube_config (come mostrato nella Figura 1) In alternativa, è necessario aggiungere config_file alla configurazione KubernetesPodOperator (come mostrato nel codice seguente esempio).
. Campo percorso configurazione kube nella UI di Airflow
Figura 1. UI di Airflow, modifica della connessione kubernetes_default (fai clic per ingrandire)
.
    .
  • Modifica il codice di un'attività utilizzando KubernetesPodOperator nel seguente modo:
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

Per ulteriori informazioni sulla versione 5.0.0, consulta le note di rilascio del provider Kubernetes CNC.

Risoluzione dei problemi

Suggerimenti per la risoluzione degli errori dei pod

Oltre a controllare i log delle attività nella UI di Airflow, controlla anche seguenti log:

  • Output dello scheduler e dei worker di Airflow:

    1. Nella console Google Cloud, vai alla pagina Ambienti.

      Vai ad Ambienti

    2. Segui il link DAG per il tuo ambiente.

    3. Nel bucket del tuo ambiente, sali di un livello.

    4. Esamina i log nel logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE> .

  • Log dettagliati dei pod nella console Google Cloud per i carichi di lavoro GKE. Questi log includono il pod il file YAML della definizione, gli eventi dei pod e i dettagli dei pod.

Codici di reso diversi da zero se utilizzi anche GKEStartPodOperator

Quando utilizzi KubernetesPodOperator e GKEStartPodOperator, il un codice restituito del punto di ingresso del container determina se l'attività possono avere successo o meno. I codici di reso diversi da zero indicano un errore.

Un pattern comune quando si utilizzano KubernetesPodOperator e GKEStartPodOperator deve eseguire uno script shell come container punto di ingresso per raggruppare più operazioni all'interno del container.

Se stai scrivendo uno script di questo tipo, ti consigliamo di includere Il comando set -e nella parte superiore dello script in modo che i comandi con errori nello script terminino lo script di propagare l'errore all'istanza dell'attività Airflow.

Timeout pod

Il timeout predefinito per KubernetesPodOperator è 120 secondi, che può causare dei timeout prima del download di immagini di dimensioni maggiori. Puoi aumenta il timeout modificando startup_timeout_seconds durante la creazione del parametro KubernetesPodOperator.

Quando si verifica il timeout di un pod, il log specifico dell'attività è disponibile in la UI di Airflow. Ad esempio:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

I timeout dei pod possono verificarsi anche Account di servizio Cloud Composer non abbia le autorizzazioni IAM necessarie per eseguire l'attività mano. Per verificarlo, esamina gli errori a livello di pod utilizzando Dashboard di GKE per esaminare i log particolare carico di lavoro o usare Cloud Logging.

Impossibile stabilire una nuova connessione

L'upgrade automatico è abilitato per impostazione predefinita nei cluster GKE. Se un pool di nodi si trova in un cluster in fase di upgrade, potresti vedere quanto segue errore:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

Per verificare se è in corso l'upgrade del cluster, nella console Google Cloud vai alla Cluster Kubernetes e cerca l'icona di caricamento accanto al tuo il nome del cluster dell'ambiente.

Passaggi successivi