Usa KubernetesPodOperator

En esta página, se describe cómo usar KubernetesPodOperator para iniciar pods de Kubernetes desde Cloud Composer en el clúster de Google Kubernetes Engine que forma parte de tu entorno de Cloud Composer y para garantizar que el entorno tenga los recursos adecuados.

Recursos del entorno de Cloud Composer en el proyecto del usuario y el proyecto del cliente con una flecha que muestra que los pods iniciados estarán en el mismo Kubernetes Engine que los trabajadores de Airflow, Redis, el programador de Airflow y el proxy de Cloud SQL
Ubicación del inicio del pod de Kubernetes de Cloud Composer (haz clic para ampliar)

KubernetesPodOperator es una buena opción si necesitas lo siguiente:

  • Dependencias de Python personalizadas que no están disponibles a través del repositorio público de PyPI.
  • Dependencias binarias que no están disponibles en la imagen de archivo del trabajador de Cloud Composer.

En esta página, se muestra un ejemplo de DAG que incluye las siguientes configuraciones de KubernetesPodOperator:

Antes de comenzar

Garantiza los recursos adecuados para tu entorno

Cuando creas un entorno de Cloud Composer, especificas la cantidad de potencia de procesamiento para el entorno y se asigna cierta cantidad al clúster de GKE. Iniciar pods de Kubernetes en el entorno puede hacer que los programas compitan entre ellos por recursos, como CPU o memoria. Debido a que el programador y los trabajadores de Airflow están en el mismo clúster de GKE, los programadores y los trabajadores no funcionarán correctamente si la competencia provoca escasez de recursos.

Para evitar escasez de recursos, realiza una o más de las siguientes acciones:

Crea un grupo de nodos

La forma preferida de evitar la escasez de recursos en el entorno de Cloud Composer es crear un nuevo grupo de nodos y configurar los pods de Kubernetes para que se ejecuten solo con recursos de ese grupo.

Para crear un grupo de nodos en un clúster existente, realiza los siguientes pasos:

Console

  1. En Cloud Console, ve al menú de GKE.

    Visitar el menú de GKE.

  2. Selecciona el clúster deseado.

  3. Haz clic en Editar.

  4. Desde Grupos de nodos, haz clic en Agregar grupo de nodos.

  5. Configura tu grupo de nodos.

  6. Si lo deseas, puedes habilitar las opciones avanzadas, como las actualizaciones automáticas y el ajuste de escala automático.

  7. Haga clic en Save.

gcloud

Ingresa el siguiente comando:

gcloud container node-pools create POOL_NAME \
    --cluster CLUSTER_NAME \
    --project PROJECT_ID \
    --zone ZONE \
    ADDITIONAL_FLAGS 

Donde:

  • POOL_NAME es el nombre deseado del grupo de nodos.
  • CLUSTER es el nombre del clúster en el que se creará el grupo de nodos.
  • PROJECT_ID es el nombre del proyecto de Cloud Composer.
  • ZONE es la zona en la que se encuentra el clúster de GKE.

    Para ver la lista de opciones, consulta la documentación de gcloud container node-pools create.

    Una solicitud node-pools create correcta muestra la información del grupo de nodos:

    Creating node pool example-pool...done.
    Created [https://container.googleapis.com/v1/projects/kubernetes-engine-docs/zones/us-central1-f/clusters/example-cluster/nodePools/example-pool].
    NAME          MACHINE_TYPE   DISK_SIZE_GB  NODE_VERSION
    example-pool  n1-standard-1  100           1.2.4

Aumenta la cantidad de nodos de tu entorno

Si aumentas la cantidad de nodos de tu entorno de Cloud Composer, aumentará la capacidad de procesamiento disponible para tus trabajadores. Este aumento no proporciona recursos adicionales para las tareas que requieren más CPU o RAM de las que proporciona el tipo de máquina especificado.

Para aumentar el recuento de nodos, actualiza tu entorno.

Especifica el tipo de máquina adecuado

Durante la creación de entornos de Cloud Composer, puedes especificar un tipo de máquina. A fin de garantizar que haya recursos disponibles, especifica el tipo de máquina ideal para el tipo de procesamiento que se produce en tu entorno de Cloud Composer.

Configuración de KubernetesPodOperator

Para continuar con este ejemplo, coloca todo el archivo kubernetes_pod_operator.py en la carpeta dags/ de tu entorno o agrega el código KubernetesPodOperator relevante a un DAG.

Las siguientes secciones explican cada configuración de KubernetesPodOperator en el ejemplo. Para obtener información sobre cada variable de configuración, consulta la referencia de Airflow.

import datetime

from airflow import models
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type='env',
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target='SQL_CONN',
    # Name of the Kubernetes Secret
    secret='airflow-secrets',
    # Key of a secret stored in this Secret object
    key='sql_alchemy_conn')
secret_volume = secret.Secret(
    'volume',
    # Path where we mount the secret as volume
    '/var/secrets/google',
    # Name of Kubernetes Secret
    'service-account',
    # Key in the form of service account file name
    'service-account.json')

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
        dag_id='composer_sample_kubernetes_pod',
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config if
    # no `config_file` parameter is specified. By default it will contain the
    # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
    # created upon environment creation.

    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id='pod-ex-minimum',
        # Name of task you want to run, used to generate Pod ID.
        name='pod-ex-minimum',
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. There is the potential for the resource starvation of
        # Airflow workers and scheduler within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources.
        namespace='default',
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image='gcr.io/gcp-runtimes/ubuntu_18_0_4')
    kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-templates',
        name='ex-kube-templates',
        namespace='default',
        image='bash',
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/code.html#default-variables

        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=['{{ ds }}'],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}")
    kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-secrets',
        name='ex-kube-secrets',
        namespace='default',
        image='ubuntu',
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            'EXAMPLE_VAR': '/example/value',
            'GOOGLE_APPLICATION_CREDENTIALS': '/var/secrets/google/service-account.json'})
    kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-pod-affinity',
        name='ex-pod-affinity',
        namespace='default',
        image='perl',
        cmds=['perl'],
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            'nodeAffinity': {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                'requiredDuringSchedulingIgnoredDuringExecution': {
                    'nodeSelectorTerms': [{
                        'matchExpressions': [{
                            # When nodepools are created in Google Kubernetes
                            # Engine, the nodes inside of that nodepool are
                            # automatically assigned the label
                            # 'cloud.google.com/gke-nodepool' with the value of
                            # the nodepool's name.
                            'key': 'cloud.google.com/gke-nodepool',
                            'operator': 'In',
                            # The label key's value that pods can be scheduled
                            # on.
                            'values': [
                                'pool-0',
                                'pool-1',
                            ]
                        }]
                    }]
                }
            }
        })
    kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-all-configs',
        name='pi',
        namespace='default',
        image='perl',
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['perl'],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={'pod-label': 'label-name'},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={'EXAMPLE_VAR': '/example/value'},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy='Always',
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={'key1': 'value1'},
        # Resource specifications for Pod, this will allow you to set both cpu
        # and memory limits and requirements.
        # Prior to Airflow 1.10.4, resource specifications were
        # passed as a Pod Resources Class object,
        # If using this example on a version of Airflow prior to 1.10.4,
        # import the "pod" package from airflow.contrib.kubernetes and use
        # resources = pod.Resources() instead passing a dict
        # For more info see:
        # https://github.com/apache/airflow/pull/4551
        resources={'limit_memory': 1, 'limit_cpu': 1},
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        config_file='/home/airflow/composer_kube_config',
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={})

Configuración mínima

Para crear un KubernetesPodOperator, solo se requieren name, namespace, image y task_id.

Cuando colocas el siguiente fragmento de código en un DAG, la configuración utiliza los valores predeterminados en /home/airflow/composer_kube_config. No es necesario modificar el código para que la tarea pod-ex-minimum se realice correctamente.

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id='pod-ex-minimum',
    # Name of task you want to run, used to generate Pod ID.
    name='pod-ex-minimum',
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['echo'],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace='default',
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image='gcr.io/gcp-runtimes/ubuntu_18_0_4')

Configuración de plantilla

Airflow admite el uso de plantillas de Jinja. Debes declarar las variables obligatorias (task_id, name, namespace, image) con el operador. Como se muestra en el siguiente ejemplo, puedes crear plantillas de todos los demás parámetros con Jinja, incluidos cmds, arguments, env_vars y config_file.

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-kube-templates',
    name='ex-kube-templates',
    namespace='default',
    image='bash',
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/code.html#default-variables

    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['echo'],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=['{{ ds }}'],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}")

Sin cambiar el DAG ni tu entorno, la tarea ex-kube-templates falla debido a dos errores. Veamos cómo depurar la tarea y resolver los errores.

  1. Verifica que la tarea ex-kube-templates falle.
  2. Comprueba los registros de la tarea ex-kube-templates.

    Los registros muestran que esta tarea presenta errores porque la variable adecuada no existe (my_value).

  3. Para configurar my_value con gcloud o la IU de Airflow:

    gcloud

    Ingresa el siguiente comando:

    gcloud composer environments run ENVIRONMENT \
        --location LOCATION \
        variables -- \
        --set my_value example_value 

    Donde:

    • ENVIRONMENT es el nombre del entorno de Cloud Composer.
    • LOCATION es la región en la que se encuentra el entorno de Cloud Composer.

    IU de Airflow

    1. En la barra de herramientas, haz clic en Admin > Variables.
    2. Haga clic en Crear.
    3. Ingresa la siguiente información:
      • Key: my_value
      • Val: example_value
    4. Haga clic en Save.
  4. Vuelve a ejecutar la tarea ex-kube-templates.

  5. Verifica el estado de la tarea ex-kube-templates.

    La tarea ex-kube-templates sigue fallando. Si observas los registros, la tarea ahora falla porque core/kubeconfig no se encuentra en config. Para hacer referencia a un config_file (un archivo de configuración de Kubernetes) personalizado, debes configurar la variable kube_config del archivo airflow.cfg en una configuración de Kubernetes válida.

  6. Para configurar la variable kube_config, ingresa el siguiente comando:

    gcloud composer environments update ENVIRONMENT \
        --location LOCATION \
        --update-airflow-configs=core-kube_config=/home/airflow/composer_kube_config

    Donde:

    • ENVIRONMENT es el nombre del entorno de Cloud Composer.
    • LOCATION es la región en la que se encuentra el entorno de Cloud Composer.
  7. Espera unos minutos a que se actualice tu entorno.

  8. Vuelve a ejecutar la tarea ex-kube-templates.

  9. Verifica que la tarea ex-kube-templates se realice correctamente.

Configuración de variables secretas

Un secreto de Kubernetes es un objeto que contiene una pequeña cantidad de datos sensibles. Puedes pasar secretos a los pods de Kubernetes con KubernetesPodOperator. Los secretos deben estar definidos en Kubernetes o el pod no se iniciará.

En este ejemplo, implementamos el secreto de Kubernetes, airflow-secrets, en una variable de entorno de Kubernetes llamada SQL_CONN (en lugar de una variable de entorno de Airflow o Cloud Composer).

El secreto tiene el siguiente aspecto:

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type='env',
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target='SQL_CONN',
    # Name of the Kubernetes Secret
    secret='airflow-secrets',
    # Key of a secret stored in this Secret object
    key='sql_alchemy_conn')
secret_volume = secret.Secret(
    'volume',
    # Path where we mount the secret as volume
    '/var/secrets/google',
    # Name of Kubernetes Secret
    'service-account',
    # Key in the form of service account file name
    'service-account.json')

El nombre del secreto de Kubernetes se define en la variable secret. Este secreto específico se llama airflow-secrets. Se expone como una variable de entorno, según lo determina el deploy_type. La variable de entorno que establece, deploy_target, es SQL_CONN. Por último, la clave (key) del secreto que almacenamos en deploy_target es sql_alchemy_conn.

La configuración del operador tiene el siguiente aspecto:

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-kube-secrets',
    name='ex-kube-secrets',
    namespace='default',
    image='ubuntu',
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        'EXAMPLE_VAR': '/example/value',
        'GOOGLE_APPLICATION_CREDENTIALS': '/var/secrets/google/service-account.json'})

Si no realizas ningún cambio en el DAG ni en tu entorno, la tarea ex-kube-secrets falla. Veamos cómo depurar la tarea y resolver los errores.

  1. Verifica que la tarea ex-kube-secrets falle.
  2. Comprueba los registros de la tarea ex-kube-secrets.

    Si observas los registros, verás que la tarea falla debido a un error Pod took too long to start. Este error se produce porque Airflow no puede encontrar el secreto especificado en la configuración, secret_env.

  3. Para configurar el secreto con gcloud, haz lo siguiente:

    1. Ejecuta el siguiente comando para ver los detalles de tu entorno de Cloud Composer:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"

      Donde:

      • ENVIRONMENT es el nombre del entorno de Cloud Composer.
      • LOCATION es la región en la que se encuentra el entorno de Cloud Composer.

        Se mostrará un resultado similar al siguiente: projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>

    2. Para obtener el ID del clúster de GKE, copia el resultado después de /clusters/ (finaliza en -gke) y pégalo en algún lugar donde puedas recuperarlo más tarde. Este resultado es el ID de tu clúster.

    3. Para obtener la zona, copia el resultado después de /zones/ y pégalo en algún lugar donde puedas recuperarlo más tarde.

    4. Conéctate a tu clúster de GKE con el siguiente comando:

      gcloud container clusters get-credentials CLUSTER_ID \
      --zone ZONE \
      --project PROJECT
      

      Donde:

      • CLUSTER_ID es tu ID de clúster de GKE.
      • ZONE es la zona en la que se ubica tu GKE.
      • PROJECT es el ID de tu proyecto de Google Cloud.
    5. Ejecuta el siguiente comando para crear un secreto de Kubernetes que establezca el valor de sql_alchemy_conn en test_value:

      kubectl create secret generic airflow-secrets \
      --from-literal sql_alchemy_conn=test_value
      
  4. Después de configurar el secreto, vuelve a ejecutar la tarea ex-kube-secrets.

  5. Verifica que la tarea ex-kube-secrets se realice correctamente.

Configuración de afinidad de los pods

Si configuras el parámetro affinity en el KubernetesPodOperator, puedes controlar en qué nodos se programan los pods (por ejemplo, puedes especificar un grupo de nodos en particular). En este ejemplo, el operador solo se ejecuta en grupos de nodos llamados pool-0 y pool-1.

Recursos del entorno de Cloud Composer en el proyecto del usuario y el proyecto del cliente con una flecha que muestra que los pods iniciados estarán en el mismo Kubernetes Engine que los trabajadores de Airflow, Redis, el programador de Airflow y el proxy de Cloud SQL, pero específicamente en los grupos, pool-0 y pool-1, que figuran como cuadros independientes dentro de Kubernetes Engine.
Ubicación del inicio del pod de Kubernetes de Cloud Composer con afinidad de pods (haz clic para ampliar)


kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-pod-affinity',
    name='ex-pod-affinity',
    namespace='default',
    image='perl',
    cmds=['perl'],
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        'nodeAffinity': {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            'requiredDuringSchedulingIgnoredDuringExecution': {
                'nodeSelectorTerms': [{
                    'matchExpressions': [{
                        # When nodepools are created in Google Kubernetes
                        # Engine, the nodes inside of that nodepool are
                        # automatically assigned the label
                        # 'cloud.google.com/gke-nodepool' with the value of
                        # the nodepool's name.
                        'key': 'cloud.google.com/gke-nodepool',
                        'operator': 'In',
                        # The label key's value that pods can be scheduled
                        # on.
                        'values': [
                            'pool-0',
                            'pool-1',
                        ]
                    }]
                }]
            }
        }
    })

Con la configuración actual de este ejemplo, la tarea falla. Veamos cómo depurar la tarea y resolver los errores.

  1. Verifica que la tarea ex-pod-affinity falle.
  2. Comprueba los registros de la tarea ex-pod-affinity.

    Si observas los registros, verás que la tarea falla porque los grupos de nodos pool-0 y pool-1 no existen.

  3. Para asegurarte de que los grupos de nodos de values existan, realiza cualquiera de los siguientes cambios de configuración:

    • Si creaste un grupo de nodos anteriormente, reemplaza pool-0 y pool-1 con los nombres de tus grupos de nodos y vuelve a subir el DAG.
    • Crea un grupo de nodos con el nombre pool-0 o pool-1. Puedes crear ambos, pero la tarea necesita solo uno para tener éxito.
    • Reemplaza pool-0 y pool-1 con default-pool, que es el grupo predeterminado que utiliza Airflow. A continuación, vuelve a subir el DAG. Nota: De forma predeterminada, los pods de Kubernetes se programan en default-pool. Si agregas grupos más adelante, estos se restringirán al default-pool.
  4. Espera unos minutos a que se actualice tu entorno.

  5. Vuelve a ejecutar la tarea ex-pod-affinity.

  6. Verifica que la tarea ex-pod-affinity se realice correctamente.

Configuración completa

En este ejemplo, se muestran todas las variables que puedes configurar en KubernetesPodOperator. No es necesario modificar el código para que la tarea ex-all-configs se realice correctamente.

Para obtener detalles sobre cada variable, consulta la referencia KubernetesPodOperator de Airflow.

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-all-configs',
    name='pi',
    namespace='default',
    image='perl',
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['perl'],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={'pod-label': 'label-name'},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={'EXAMPLE_VAR': '/example/value'},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy='Always',
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={'key1': 'value1'},
    # Resource specifications for Pod, this will allow you to set both cpu
    # and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={'limit_memory': 1, 'limit_cpu': 1},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file='/home/airflow/composer_kube_config',
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={})

Administra los DAG

Visualiza el estado de una tarea

  1. Ve a la interfaz web de Airflow.
  2. En la página de los DAG, haz clic en el nombre del DAG (p. ej., composer_sample_kubernetes_pod).
  3. En la página de detalles de los DAG, haz clic en Graph View.
  4. Verifica el estado:

    • Tarea con errores: la tarea estará encerrada en un cuadro rojo (como ex-kube-templates). También puedes mantener el puntero sobre la tarea y ver si aparece el mensaje State: Failed.

    • Tarea ejecutada correctamente: la tarea estará encerrada en un cuadro verde (como pod-ex-minimum). También puedes mantener el puntero sobre la tarea y ver si aparece el mensaje State: Success.

Comprueba registros de tareas

  1. En la IU de Airflow, consulta el estado de la tarea.
  2. En la vista de grafo del DAG, haz clic en el nombre de la tarea.
  3. En el menú contextual Task Instance, haz clic en View Log.

Vuelve a ejecutar una tarea

  1. Para volver al DAG:, haz lo siguiente
    1. En la barra de herramientas de la IU de Airflow, haz clic en DAGs.
    2. Haz clic en el nombre del DAG (por ejemplo, composer_samples_kubernetes_pod).
  2. Para volver a ejecutar la tarea:
    1. Haz clic en el nombre de la tarea.
    2. Haz clic en Clear y, luego, en OK. La tarea se ejecuta de nuevo automáticamente.

Soluciona problemas

Sugerencias para solucionar problemas de error de pod

Además de comprobar los registros de tareas, también puedes verificar los siguientes registros:

  • El resultado del programador y los trabajadores de Airflow:

    1. Ve al depósito de Cloud Storage del entorno de Cloud Composer. Este es el depósito en el que se encuentran los DAG.
    2. Revisa los registros ubicados en logs/DAG_NAME/TASK_ID/EXECUTION_DATE.
  • Los registros detallados de los pods en Cloud Console, en las cargas de trabajo de GKE. Estos registros incluyen el archivo YAML de definición de pod, los eventos de los pods y sus detalles.

Códigos de retorno distintos de cero cuando también se usa el GKEPodOperator

Cuando se usa KubernetesPodOperator y GKEPodOperator, el código de retorno del punto de entrada del contenedor determina si la tarea se considera exitosa o no. Los códigos de retorno distintos de cero indican un error.

Un patrón común cuando se utiliza KubernetesPodOperator y GKEPodOperator es ejecutar una secuencia de comandos de shell como punto de entrada de contenedor para agrupar varias operaciones dentro de este.

Si escribes una secuencia de comandos de este tipo, recomendamos que incluyas el comando set -e en la parte superior de la secuencia de comandos para que sus comandos con error finalicen la secuencia y propaguen el error a la instancia de tarea de Airflow.

La tarea falla, a pesar del éxito del pod

La siguiente información es válida para entornos de Cloud Composer que ejecutan composer-1.4.1-airflow-* o versiones anteriores:

Si una tarea de Airflow se ejecuta durante una hora y los registros de tareas finalizan con kubernetes.client.rest.ApiException: (401) y Reason: Unauthorized, el trabajo subyacente de Kubernetes continuará ejecutándose después de ese punto (incluso es posible que tenga éxito). Sin embargo, Airflow informa que la tarea falló.

Para solucionar este problema, agrega una dependencia explícita de paquete de PyPI en kubernetes>=8.0.1.

Tiempos de espera de los pods

El tiempo de espera predeterminado de KubernetesPodOperator es de 120 segundos, lo que puede provocar que el tiempo de espera se agote antes de que se descarguen las imágenes más grandes. Para aumentar el tiempo de espera, puedes modificar el parámetro startup_timeout_seconds cuando creas el KubernetesPodOperator.

Cuando se agota el tiempo de espera de un pod, el registro específico de la tarea estará disponible en la IU web de Airflow. Por ejemplo:

Executing  on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in 
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

También es posible que se agote el tiempo de espera de los pods cuando la cuenta de servicio de Composer carece de los permisos de IAM necesarios para realizar la tarea solicitada. Si deseas verificar esto, revisa los errores en el nivel del pod mediante los Paneles de GKE para ver los registros de tu carga de trabajo específica o usa Stackdriver Logging.

No se pudo establecer una conexión nueva

La actualización automática está habilitada de forma predeterminada en los clústeres de GKE. Si un grupo de nodos está en un clúster que se está actualizando, es posible que veas el siguiente error:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new connection: [Errno 111] Connection refuse

Para comprobar si tu clúster se está actualizando, ve al menú de GKE en Cloud Console y revisa si hay un ícono de carga junto al nombre del clúster de tu entorno.

Recursos relacionados