Utilizzo di KubernetesPodOperator

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Questa pagina descrive come utilizzare KubernetesPodOperator per eseguire il deployment di pod Kubernetes da Cloud Composer nel cluster Google Kubernetes Engine che fa parte del tuo ambiente Cloud Composer.

KubernetesPodOperator avvia pod Kubernetes nel cluster del tuo ambiente. In confronto, Gli operatori di Google Kubernetes Engine eseguono i pod Kubernetes in un che può essere un cluster separato non correlato al tuo completamente gestito di Google Cloud. Puoi anche creare ed eliminare cluster utilizzando di Google Kubernetes Engine.

KubernetesPodOperator è una buona opzione se hai bisogno di:

  • Dipendenze Python personalizzate che non sono disponibili tramite la piattaforma PyPI pubblica repository Git.
  • Dipendenze binarie non disponibili nell'immagine worker Cloud Composer di serie.

Prima di iniziare

  • Ti consigliamo di utilizzare la versione più recente di Cloud Composer. Questa versione deve essere supportata come parte di le norme relative al ritiro e all'assistenza.
  • Assicurati che i tuoi ha risorse sufficienti. L'avvio di pod in un ambiente con risorse limitate può causare errori dello scheduler e dei worker di Airflow.

Configura le risorse dell'ambiente Cloud Composer

Quando crei un ambiente Cloud Composer, ne specifichi la funzione inclusi quelli per le prestazioni dell'ambiente in un cluster Kubernetes. L'avvio di pod Kubernetes nel cluster dell'ambiente può causare competizione per le risorse del cluster, come CPU o memoria. Poiché il flusso di lavoro scheduler e worker si trovano nello stesso cluster GKE, gli scheduler e i worker non funzionano correttamente in caso di concorrenza carenza di risorse.

Per evitare la carenza di risorse, esegui una o più delle seguenti azioni:

Crea un pool di nodi

Il modo migliore per evitare la carenza di risorse nell'ambiente Cloud Composer è creare un nuovo pool di nodi e configurare i pod Kubernetes in modo che vengano eseguiti utilizzando solo le risorse di quel pool.

Console

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Fai clic sul nome dell'ambiente.

  3. Nella pagina Dettagli dell'ambiente, vai alla scheda Configurazione dell'ambiente.

  4. Nella sezione Risorse > Cluster GKE, segui il link Visualizza i dettagli del cluster.

  5. Crea un pool di nodi come descritto in Aggiunta di un pool di nodi.

gcloud

  1. Determina il nome del cluster del tuo ambiente:

    gcloud composer environments describe ENVIRONMENT_NAME \
      --location LOCATION \
      --format="value(config.gkeCluster)"
    

    Sostituisci:

    • ENVIRONMENT_NAME con il nome dell'ambiente.
    • LOCATION con la regione in cui si trova l'ambiente.
  2. L'output contiene il nome del cluster dell'ambiente. Ad esempio: può essere europe-west3-example-enviro-af810e25-gke.

  3. Crea un pool di nodi come descritto in Aggiunta di un pool di nodi.

Aumenta il numero di nodi nel tuo ambiente

Aumentare il numero di nodi nel tuo ambiente Cloud Composer aumenta la potenza di calcolo disponibile per i carichi di lavoro. Questo aumento non fornisce risorse aggiuntive per le attività che richiedono più CPU o RAM rispetto a quanto fornito dal tipo di macchina specificato.

Per aumentare il numero di nodi, aggiorna l'ambiente.

Specifica il tipo di macchina appropriato

Durante la creazione dell'ambiente Cloud Composer, puoi specificare un tipo di macchina. Per garantire la disponibilità delle risorse, specifica machine type per il tipo di elaborazione che si verificano nel tuo ambiente Cloud Composer.

Configurazione minima

Per creare un KubernetesPodOperator, sono obbligatori solo i parametri name, image da utilizzare e task_id del pod. /home/airflow/composer_kube_config contiene le credenziali per l'autenticazione su GKE.

Flusso d'aria 2

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. In Composer 1 there is the potential for
    # the resource starvation of Airflow workers and scheduler
    # within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources,
    # and using Composer 2 will mean the environment will autoscale.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Flusso d'aria 1

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Configurazione dell'affinità dei pod

Quando configuri il parametro affinity in KubernetesPodOperator, controlla su quali nodi pianificare i pod, ad esempio solo i nodi di un determinato pool di nodi. In questo esempio, l'operatore viene eseguito solo sui pool di nodi denominati pool-0 e pool-1. I nodi del tuo ambiente Cloud Composer 1 si trovano in default-pool, in modo che i pod non vengano eseguiti sui nodi del tuo ambiente.

Flusso d'aria 2

# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Airflow 1

kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

In base alla configurazione dell'esempio, l'attività non riesce. Se esamini i log, l'attività non riesce perché i pool di nodi pool-0 e pool-1 non esistono.

Per assicurarti che i pool di nodi in values esistano, apporta una delle seguenti modifiche alla configurazione:

  • Se hai creato un pool di nodi in precedenza, sostituisci pool-0 e pool-1 con dei tuoi pool di nodi e carica di nuovo il DAG.

  • Crea un pool di nodi denominato pool-0 o pool-1. Puoi creare entrambi, ma l'attività ne richiede solo uno per essere completata.

  • Sostituisci pool-0 e pool-1 con default-pool, che è il pool predefinito usate da Airflow. Quindi, carica di nuovo il DAG.

Dopo aver apportato le modifiche, attendi alcuni minuti per l'aggiornamento dell'ambiente. Poi esegui di nuovo l'attività ex-pod-affinity e verifica che ex-pod-affinity dell'attività.

Configurazione aggiuntiva

Questo esempio mostra parametri aggiuntivi che puoi configurare nel KubernetesPodOperator.

Per ulteriori informazioni sui parametri, consulta la documentazione di riferimento di Airflow per KubernetesPodOperator. Per informazioni sull'uso dei secret e per gli oggetti ConfigMap, consulta Utilizzare i secret e gli oggetti ConfigMap di Kubernetes. Per informazioni sull'utilizzo dei modelli Jinja con KubernetesPodOperator, consulta Utilizzare i modelli Jinja.

Airflow 2

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        limits={"memory": "250M", "cpu": "100m"},
    ),
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Flusso d'aria 1

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={"limit_memory": "250M", "limit_cpu": "100m"},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={},
)

Utilizzare i modelli Jinja

Airflow supporta i modelli Jinja nei DAG.

Devi dichiarare i parametri Airflow richiesti (task_id, name e image) con l'operatore. Come mostrato nell'esempio seguente, puoi creare un modello per tutti gli altri parametri con Jinja, tra cui cmds, arguments, env_vars e config_file.

Il parametro env_vars nell'esempio viene impostato da un Variabile Airflow denominata my_value. Il DAG di esempio ricava il proprio valore dalla variabile del modello vars in Airflow. Airflow ha più variabili che forniscono l'accesso a diversi tipi di informazioni. Ad esempio, puoi utilizzare la variabile del modello conf per accedere ai valori delle opzioni di configurazione di Airflow. Per ulteriori informazioni e per delle variabili disponibili in Airflow, consulta Riferimento ai modelli in Airflow documentazione.

Se non modifichi il DAG o non crei la variabile env_vars, l'attività ex-kube-templates nell'esempio non va a buon fine perché la variabile non esiste. Crea questa variabile nella UI di Airflow o con Google Cloud CLI:

Interfaccia utente di Airflow

  1. Vai all'interfaccia utente di Airflow.

  2. Nella barra degli strumenti, seleziona Amministrazione > Voci.

  3. Nella pagina List Variable (Variabile elenco), fai clic su Add a new record (Aggiungi un nuovo record).

  4. Nella pagina Aggiungi variabile, inserisci le seguenti informazioni:

    • Chiave:my_value
    • Val: example_value
  5. Fai clic su Salva.

Se il tuo ambiente utilizza Airflow 1, esegui invece il seguente comando:

  1. Vai all'interfaccia utente di Airflow.

  2. Nella barra degli strumenti, seleziona Amministrazione > Variabili.

  3. Nella pagina Variabili, fai clic sulla scheda Crea.

  4. Nella pagina Variabile, inserisci le seguenti informazioni:

    • Chiave:my_value
    • Val: example_value
  5. Fai clic su Salva.

gcloud

Inserisci questo comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Se il tuo ambiente utilizza Airflow 1, esegui invece il seguente comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set my_value example_value

Sostituisci:

  • ENVIRONMENT con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente.

L'esempio seguente mostra come utilizzare i modelli Jinja con KubernetesPodOperator:

Airflow 2

kubenetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Airflow 1

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Utilizzare secret e ConfigMap di Kubernetes

Un secret Kubernetes è un oggetto che contiene dati sensibili. Un cluster Kubernetes ConfigMap è un oggetto che contiene dati non riservati in coppie chiave/valore.

In Cloud Composer 2, puoi creare Secret e ConfigMap utilizzando Google Cloud CLI, API o Terraform e accedervi da KubernetesPodOperator.

Informazioni sui file di configurazione YAML

Quando crei un secret o un ConfigMap Kubernetes utilizzando l'API e Google Cloud CLI, fornisci un file in formato YAML. Questo file deve essere conforme allo stesso usato da Secret e ConfigMap di Kubernetes. documentazione di Kubernetes fornisce molti esempi di codice di ConfigMap e Secret. Per iniziare, consulta la pagina Distribuzione sicura delle credenziali tramite Secret e ConfigMaps.

Come per i secret di Kubernetes, utilizza la classe base64 quando definisci i valori nei secret.

Per codificare un valore, puoi utilizzare il comando seguente (uno dei molti modi per ottenere un valore con codifica Base64):

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

Output:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

I seguenti due esempi di file YAML vengono utilizzati negli esempi più avanti in questa guida. Esempio di file di configurazione YAML per un segreto Kubernetes:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Un altro esempio che mostra come includere i file. Come nell'esperienza precedente esempio, codifica prima i contenuti di un file (cat ./key.json | base64), quindi fornisci questo valore nel file YAML:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

Un file di configurazione YAML di esempio per un ConfigMap. Non è necessario usare il modello base64 in ConfigMap:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Gestire i secret di Kubernetes

In Cloud Composer 2, crei i secret utilizzando Google Cloud CLI e kubectl:

  1. Ottieni informazioni sul cluster del tuo ambiente:

    1. Esegui questo comando:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      Sostituisci:

      • ENVIRONMENT con il nome dell'ambiente.
      • LOCATION con la regione in cui si trova l'ambiente Cloud Composer.

      L'output di questo comando utilizza il seguente formato: projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>.

    2. Per ottenere l'ID cluster GKE, copia l'output dopo /clusters/ (termina con -gke).

    3. Per ottenere la zona, copia l'output dopo /zones/.

  2. Connettiti al tuo cluster GKE con quanto segue :

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --zone ZONE
    

    Sostituisci:

    • CLUSTER_ID: ID cluster dell'ambiente.
    • PROJECT_ID: l'ID progetto.
    • ZONE con la zona in cui si trova il cluster dell'ambiente.
  3. Crea i secret di Kubernetes:

    I seguenti comandi mostrano due diversi approcci alla creazione i secret di Kubernetes. L'approccio --from-literal utilizza coppie chiave-valore. L'approccio --from-file utilizza i contenuti dei file.

    • Per creare un secret di Kubernetes fornendo coppie chiave-valore, esegui il comando . Questo esempio crea un secret denominato airflow-secrets con un campo sql_alchemy_conn con il valore test_value.

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value
      
    • Per creare un secret di Kubernetes fornendo i contenuti del file, esegui . In questo esempio viene creato un secret denominato service-account che ha il campo service-account.json con preso dai contenuti di un file ./key.json locale.

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json
      

Utilizzare i secret di Kubernetes nei DAG

Questo esempio mostra due modi per utilizzare i secret di Kubernetes: come variabile di ambiente e come volume montato dal pod.

Il primo secret, airflow-secrets, è impostato a una variabile di ambiente Kubernetes denominata SQL_CONN (anziché una la variabile di ambiente Airflow o Cloud Composer).

Il secondo secret, service-account, monta service-account.json, un file con un token dell'account di servizio, su /var/secrets/google.

Ecco come sono gli oggetti Secret:

Flusso d'aria 2

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Flusso d'aria 1

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Il nome del primo secret di Kubernetes è definito nella variabile secret_env. Questo secret si chiama airflow-secrets. Il parametro deploy_type specifica deve essere esposto come variabile di ambiente. Il nome della variabile di ambiente è SQL_CONN, come specificato nel parametro deploy_target. Infine, del valore della variabile di ambiente SQL_CONN è impostato sul valore Chiave sql_alchemy_conn.

Il nome del secondo secret Kubernetes è definito nella variabile secret_volume. Questo secret è denominato service-account. Viene visualizzato come volume, come specificato nel parametro deploy_type. Il percorso del file in la montatura, deploy_target, è /var/secrets/google. Infine, il valore key del Il secret archiviato in deploy_target è service-account.json.

Ecco come si presenta la configurazione dell'operatore:

Flusso d'aria 2

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Flusso d'aria 1

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Informazioni sul provider Kubernetes CNCF

KubernetesPodOperator è implementato Provider apache-airflow-providers-cncf-kubernetes.

Per note di rilascio dettagliate per il provider Kubernetes CNCF, consulta Sito web del provider Kubernetes del CNC.

Versione 6.0.0

Nella versione 6.0.0 del pacchetto Provider Kubernetes CNCF, la connessione kubernetes_default viene utilizzata per impostazione predefinita in KubernetesPodOperator.

Se hai specificato una connessione personalizzata nella versione 5.0.0, è ancora utilizzato dall'operatore. Per tornare a utilizzare kubernetes_default potresti voler regolare i DAG di conseguenza.

Versione 5.0.0

Questa versione introduce alcune modifiche non compatibili con le versioni precedenti rispetto alla versione 4.4.0. I più importanti sono correlati la connessione kubernetes_default non utilizzata nella versione 5.0.0.

  • È necessario modificare la connessione kubernetes_default. Il percorso della configurazione Kubernetes deve essere impostato su /home/airflow/composer_kube_config (come mostrato nella figura seguente). In alternativa, config_file deve essere aggiunto alla configurazione di KubernetesPodOperator (come mostrato nell'esempio di codice seguente).
di Gemini Advanced.
Campo percorso configurazione kube nella UI di Airflow
Figura 1. UI di Airflow, modifica della connessione kubernetes_default (fai clic per ingrandire)
di Gemini Advanced.
  • Modifica il codice di un'attività utilizzando KubernetesPodOperator nel seguente modo:
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

Per ulteriori informazioni sulla versione 5.0.0, consulta le note di rilascio del provider Kubernetes CNCF.

Risoluzione dei problemi

Questa sezione fornisce consigli per la risoluzione dei problemi comuni di KubernetesPodOperator problemi:

Visualizza i log

Durante la risoluzione dei problemi, puoi controllare i log nel seguente ordine:

  1. Log delle attività Airflow:

    1. Nella console Google Cloud, vai alla pagina Ambienti.

      Vai ad Ambienti

    2. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.

    3. Vai alla scheda DAG.

    4. Fai clic sul nome del DAG, quindi sulla relativa esecuzione per visualizzarne i dettagli e i log.

  2. Log dello scheduler Airflow:

    1. Vai alla pagina Dettagli dell'ambiente.

    2. Vai alla scheda Log.

    3. Controlla i log dello scheduler di Airflow.

  3. Log dei pod nella console Google Cloud, nella sezione Carichi di lavoro GKE. Questi log includono il file YAML di definizione dei pod, gli eventi dei pod Dettagli pod.

Codici di reso diversi da zero

Quando utilizzi KubernetesPodOperator (e GKEStartPodOperator), il parametro del punto di ingresso del container determina se l'attività possono avere successo o meno. I codici di ritorno diversi da zero indicano un errore.

Un pattern comune è eseguire uno script shell come punto di ingresso del container raggruppare più operazioni all'interno del container.

Se stai scrivendo uno script di questo tipo, ti consigliamo di includere il comando set -e nella parte superiore dello script in modo che i comandi non riusciti nello script interrompano lo script e propaghino l'errore all'istanza dell'attività Airflow.

Timeout dei pod

Il timeout predefinito per KubernetesPodOperator è 120 secondi, può causare dei timeout prima del download di immagini di dimensioni maggiori. Puoi aumentare il timeout modificando il parametro startup_timeout_seconds quando crei KubernetesPodOperator.

Quando un pod scade, il log specifico dell'attività è disponibile nell'interfaccia utente di Airflow. Ad esempio:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

I timeout dei pod possono verificarsi anche Account di servizio Cloud Composer non disponga delle autorizzazioni IAM necessarie per eseguire l'attività mano. Per verificarlo, esamina gli errori a livello di pod utilizzando Dashboard di GKE per esaminare i log per particolare carico di lavoro o usare Cloud Logging.

Impossibile stabilire una nuova connessione

L'upgrade automatico è abilitato per impostazione predefinita nei cluster GKE. Se un pool di nodi si trova in un cluster in fase di upgrade, potresti vedere quanto segue errore:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

Per verificare se è in corso l'upgrade del cluster, nella console Google Cloud vai alla Cluster Kubernetes e cerca l'icona di caricamento accanto al tuo il nome del cluster dell'ambiente.

Passaggi successivi