Use KubernetesPodOperator

Cloud Composer 1 | Cloud Composer 2

This page describes how to use KubernetesPodOperator to deploy Kubernetes Pods from Cloud Composer into the Google Kubernetes Engine cluster that is part of your Cloud Composer environment, and how to ensure that your environment has the appropriate resources.

KubernetesPodOperator launches Kubernetes Pods in your environment's cluster. In comparison, the Google Kubernetes Engine operators run Kubernetes Pods in a specified cluster, which can be a separate cluster that is unrelated to your environment. You can also create and delete clusters with the Google Kubernetes Engine operators.

KubernetesPodOperator is a good option if you require the following:

  • Custom Python dependencies that are not available through the public PyPI repository.
  • Binary dependencies that are not available in the stock Cloud Composer worker image.

This page walks through an example Airflow DAG that includes several KubernetesPodOperator configurations, each of which is explained in its own section.

Before you begin

  • We recommend using the latest version of Cloud Composer. At a minimum, the version you use must be supported as part of the deprecation and support policy.
  • Make sure that your environment has sufficient resources. Launching Pods into a resource-starved environment can cause Airflow worker and Airflow scheduler errors.

Configure your Cloud Composer environment resources

When you create a Cloud Composer environment, you specify its performance parameters, including performance parameters for the environment's cluster. Launching Kubernetes Pods into the environment's cluster can cause competition for cluster resources, such as CPU or memory. Because the Airflow scheduler and workers run in the same GKE cluster, they do not work properly if that competition results in resource starvation.

To prevent resource starvation, take one or more of the following actions:

Create a node pool

The preferred way to prevent resource starvation in the Cloud Composer environment is to create a new node pool and configure the Kubernetes Pods to execute using only resources from that pool.

Console

  1. In the Google Cloud console, go to the Environments page.

    Go to Environments

  2. Click the name of your environment.

  3. On the Environment details page, go to the Environment configuration tab.

  4. In the Resources > GKE cluster section, follow the link to view cluster details.

  5. Create a node pool as described in Adding a node pool.

gcloud

  1. Determine the name of your environment's cluster:

    gcloud composer environments describe ENVIRONMENT_NAME \
      --location LOCATION \
      --format="value(config.gkeCluster)"
    

    Replace the following:

    • ENVIRONMENT_NAME: the name of the environment.
    • LOCATION: the region where the environment is located.

  2. The output contains the name of your environment's cluster. For example, it can be europe-west3-example-enviro-af810e25-gke.

  3. Create a node pool as described in Adding a node pool.
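
If you script the first gcloud step, the value of config.gkeCluster may arrive as a full resource path rather than a bare cluster name. The following is a minimal sketch, assuming a path of the form projects/PROJECT/zones/ZONE/clusters/CLUSTER; the helper name cluster_name_from_resource and the sample project and zone are hypothetical and not part of any Google Cloud library. A bare name is returned unchanged.

```python
def cluster_name_from_resource(gke_cluster: str) -> str:
    """Return the short cluster name from a config.gkeCluster value."""
    # Command output usually ends with a newline, so strip whitespace first.
    value = gke_cluster.strip()
    # Keep only the text after the last "/". A bare name has no "/" and is
    # returned as-is.
    return value.rsplit("/", 1)[-1]


if __name__ == "__main__":
    # Hypothetical project and zone; the cluster name is the example above.
    sample = (
        "projects/example-project/zones/europe-west3-a/"
        "clusters/europe-west3-example-enviro-af810e25-gke"
    )
    print(cluster_name_from_resource(sample))
```

You can then pass the short name to the commands that create the node pool.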

Increase the number of nodes in your environment

Increasing the number of nodes in your Cloud Composer environment increases the computing power available to your workers. This increase does not provide additional resources for tasks that require more CPU or RAM than the specified machine type provides.

To increase the node count, update your environment.
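
The limitation above follows from the fact that a Pod runs on a single node and cannot span several. The following simplified sketch illustrates the point with hypothetical numbers; it ignores the capacity that system components and Airflow itself reserve on each node, and the names Resources, fits_on_one_node, and schedulable are illustrative only.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Resources:
    cpu_millicores: int
    memory_mib: int


def fits_on_one_node(pod: Resources, node: Resources) -> bool:
    """A Pod must fit entirely on a single node; it cannot span nodes."""
    return (
        pod.cpu_millicores <= node.cpu_millicores
        and pod.memory_mib <= node.memory_mib
    )


def schedulable(pod: Resources, node: Resources, node_count: int) -> bool:
    """Adding nodes only helps when one node can already hold the Pod."""
    return node_count > 0 and fits_on_one_node(pod, node)


# Hypothetical machine type: 2 vCPUs and 7.5 GiB of memory per node.
node = Resources(cpu_millicores=2000, memory_mib=7680)
big_pod = Resources(cpu_millicores=4000, memory_mib=4096)
small_pod = Resources(cpu_millicores=500, memory_mib=1024)

print(schedulable(big_pod, node, node_count=3))
print(schedulable(big_pod, node, node_count=30))
print(schedulable(small_pod, node, node_count=3))
```

In this sketch the large Pod stays unschedulable no matter how many nodes are added, which is the case where a larger machine type or a dedicated node pool is needed instead.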

Specify the appropriate machine type

During Cloud Composer environment creation, you can specify a machine type. To ensure that resources are available, specify a machine type that suits the kind of computing that occurs in your Cloud Composer environment.

KubernetesPodOperator configuration

To follow along with this example, put the entire kubernetes_pod_operator.py file in your environment's dags/ folder, or add the relevant KubernetesPodOperator code to a DAG.

The following sections explain each KubernetesPodOperator configuration in the example. For information about each configuration variable, see the Airflow reference.

Airflow 2

import datetime

from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
    dag_id="composer_sample_kubernetes_pod",
    schedule_interval=datetime.timedelta(days=1),
    start_date=YESTERDAY,
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config` if
    # no `config_file` parameter is specified. By default it will contain the
    # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
    # created upon environment creation.

    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. In Composer 1 there is the potential for
        # the resource starvation of Airflow workers and scheduler
        # within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources,
        # and using Composer 2 will mean the environment will autoscale.
        namespace="default",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
    )
    kubernetes_template_ex = KubernetesPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        namespace="default",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide valid credentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
    )
    kubernetes_secret_vars_ex = KubernetesPodOperator(
        task_id="ex-kube-secrets",
        name="ex-kube-secrets",
        namespace="default",
        image="ubuntu",
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
        },
    )
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    kubernetes_affinity_ex = KubernetesPodOperator(
        task_id="ex-pod-affinity",
        name="ex-pod-affinity",
        namespace="default",
        image="perl:5.34.0",
        cmds=["perl"],
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'pool-0' or 'pool-1' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            "nodeAffinity": {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    # When nodepools are created in Google Kubernetes
                                    # Engine, the nodes inside of that nodepool are
                                    # automatically assigned the label
                                    # 'cloud.google.com/gke-nodepool' with the value of
                                    # the nodepool's name.
                                    "key": "cloud.google.com/gke-nodepool",
                                    "operator": "In",
                                    # The label key's value that pods can be scheduled
                                    # on.
                                    "values": [
                                        "pool-0",
                                        "pool-1",
                                    ],
                                }
                            ]
                        }
                    ]
                }
            }
        },
    )
    kubernetes_full_pod = KubernetesPodOperator(
        task_id="ex-all-configs",
        name="pi",
        namespace="default",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
        # resources were passed as a dictionary. This change was made in
        # https://github.com/apache/airflow/pull/27197
        # Additionally, "memory" and "cpu" were previously named
        # "limit_memory" and "limit_cpu"
        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        container_resources=k8s_models.V1ResourceRequirements(
            limits={"memory": "250M", "cpu": "100m"},
        ),
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        # Pod affinity with the KubernetesPodOperator
        # is not supported with Composer 2
        # instead, create a cluster and use the GKEStartPodOperator
        # https://cloud.google.com/composer/docs/using-gke-operator
        affinity={},
    )

Airflow 1

import datetime

from airflow import models
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
    dag_id="composer_sample_kubernetes_pod",
    schedule_interval=datetime.timedelta(days=1),
    start_date=YESTERDAY,
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config` if
    # no `config_file` parameter is specified. By default it will contain the
    # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
    # created upon environment creation.

    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. There is the potential for the resource starvation of
        # Airflow workers and scheduler within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources.
        namespace="default",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
    )
    kubernetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        namespace="default",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide valid credentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
    )
    kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="ex-kube-secrets",
        name="ex-kube-secrets",
        namespace="default",
        image="ubuntu",
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
        },
    )
    kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="ex-pod-affinity",
        name="ex-pod-affinity",
        namespace="default",
        image="perl:5.34.0",
        cmds=["perl"],
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'pool-0' or 'pool-1' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            "nodeAffinity": {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    # When nodepools are created in Google Kubernetes
                                    # Engine, the nodes inside of that nodepool are
                                    # automatically assigned the label
                                    # 'cloud.google.com/gke-nodepool' with the value of
                                    # the nodepool's name.
                                    "key": "cloud.google.com/gke-nodepool",
                                    "operator": "In",
                                    # The label key's value that pods can be scheduled
                                    # on.
                                    "values": [
                                        "pool-0",
                                        "pool-1",
                                    ],
                                }
                            ]
                        }
                    ]
                }
            }
        },
    )
    kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="ex-all-configs",
        name="pi",
        namespace="default",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 1.10.4, resource specifications were
        # passed as a Pod Resources Class object,
        # If using this example on a version of Airflow prior to 1.10.4,
        # import the "pod" package from airflow.contrib.kubernetes and use
        # resources = pod.Resources() instead passing a dict
        # For more info see:
        # https://github.com/apache/airflow/pull/4551
        resources={"limit_memory": "250M", "limit_cpu": "100m"},
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={},
    )

Minimal configuration

To create a KubernetesPodOperator, only the Pod's name, the namespace in which to run the Pod, the image to use, and the task_id are required.

When you place the following code snippet in a DAG, the configuration uses the defaults in /home/airflow/composer_kube_config. You do not need to modify the code for the pod-ex-minimum task to succeed.

Airflow 2

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. In Composer 1 there is the potential for
    # the resource starvation of Airflow workers and scheduler
    # within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources,
    # and using Composer 2 will mean the environment will autoscale.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Airflow 1

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)
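
As a hedged illustration of the four required parameters named in this section, the following sketch checks a dictionary of operator arguments before it is used. It does not import Airflow; REQUIRED_PARAMS and missing_required are hypothetical names introduced for this example, and the values mirror the pod-ex-minimum task above.

```python
# The four parameters this section lists as required.
REQUIRED_PARAMS = ("task_id", "name", "namespace", "image")


def missing_required(pod_kwargs: dict) -> list:
    """Return the required parameter names that are absent or empty."""
    return [key for key in REQUIRED_PARAMS if not pod_kwargs.get(key)]


# Same values as the pod-ex-minimum task in the snippets above.
minimal_kwargs = {
    "task_id": "pod-ex-minimum",
    "name": "pod-ex-minimum",
    "namespace": "default",
    "image": "gcr.io/gcp-runtimes/ubuntu_18_0_4",
    "cmds": ["echo"],
}

print(missing_required(minimal_kwargs))
print(missing_required({"task_id": "pod-ex-minimum", "name": "pod-ex-minimum"}))
```

Inside a DAG file, a dictionary that passes this check can be unpacked into the operator, for example KubernetesPodOperator(**minimal_kwargs).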

Template configuration

Airflow supports using Jinja templating. You must declare the required variables (task_id, name, namespace, and image) with the operator. As the following example shows, you can template all other parameters with Jinja, including cmds, arguments, env_vars, and config_file.

Airflow 2

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide valid credentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Airflow 1

kubernetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide valid credentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Without changes to the DAG or your environment, the ex-kube-templates task fails because of two errors. The logs show that the task first fails because the required variable (my_value) does not exist. The second error, which appears after you fix the first one, shows that the task fails because core/kube_config is not found in config.
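
The first failure can be anticipated before the DAG runs. As a minimal sketch, the hypothetical helper below (referenced_variables is not part of Airflow) scans template strings for {{ var.value.NAME }} references so that you can compare them against the variables defined in your environment. It uses only a regular expression and does not render Jinja, so it will miss references written in other forms.

```python
import re

# Matches Jinja references of the form {{ var.value.NAME }}.
_VAR_REF = re.compile(r"\{\{\s*var\.value\.([A-Za-z_][A-Za-z0-9_]*)\s*\}\}")


def referenced_variables(templates):
    """Return the sorted Airflow variable names referenced in the given strings.

    Hypothetical helper for illustration; it is not an Airflow API.
    """
    names = set()
    for template in templates:
        names.update(_VAR_REF.findall(template))
    return sorted(names)


# Values taken from the ex-kube-templates task.
env_vars = {"MY_VALUE": "{{ var.value.my_value }}"}
arguments = ["{{ ds }}"]

# Only my_value is an Airflow variable; ds is a built-in macro.
print(referenced_variables(list(env_vars.values()) + arguments))
```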

To fix both errors, follow the steps below.

To set my_value with gcloud or the Airflow UI:

Airflow UI

In the Airflow 2 UI:

  1. Go to the Airflow UI.

  2. In the toolbar, select Admin > Variables.

  3. On the List Variable page, click Add a new record.

  4. On the Add Variable page, enter the following information:

    • Key: my_value
    • Val: example_value

  5. Click Save.

In the Airflow 1 UI:

  1. Go to the Airflow UI.

  2. In the toolbar, select Admin > Variables.

  3. On the Variables page, click the Create tab.

  4. On the Variable page, enter the following information:

    • Key: my_value
    • Val: example_value

  5. Click Save.

gcloud

For Airflow 2, enter the following command:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

For Airflow 1, enter the following command:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set my_value example_value

Replace the following:

  • ENVIRONMENT with the name of the environment.
  • LOCATION with the region where the environment is located.
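
The two command forms differ only in how the variable subcommand is spelled. As a minimal sketch, the hypothetical helper below (variables_set_command is an illustrative name, not part of gcloud or Airflow) builds the argument list for either Airflow version so that the difference is explicit. The environment name and region in the usage lines are placeholder values.

```python
def variables_set_command(environment, location, key, value, airflow_major):
    """Build the gcloud argument list that sets an Airflow variable.

    Mirrors the two command forms shown above; hypothetical helper.
    """
    base = [
        "gcloud", "composer", "environments", "run", environment,
        "--location", location,
    ]
    if airflow_major >= 2:
        # Airflow 2 CLI: `variables set -- KEY VALUE`
        return base + ["variables", "set", "--", key, value]
    # Airflow 1 CLI: `variables -- --set KEY VALUE`
    return base + ["variables", "--", "--set", key, value]


# Placeholder environment name and region, for illustration only.
for major in (2, 1):
    args = variables_set_command(
        "example-environment", "us-central1", "my_value", "example_value", major
    )
    print(" ".join(args))
```

Passing the list to subprocess.run(args, check=True) would execute the command, assuming gcloud is installed and authenticated.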

To refer to a custom config_file (a Kubernetes configuration file), override the kube_config Airflow configuration option with a valid Kubernetes configuration:

Section    Key            Value
core       kube_config    /home/airflow/composer_kube_config

Wait a few minutes for your environment to update. Then run the ex-kube-templates task again and verify that it succeeds.

Secret variables configuration

A Kubernetes Secret is an object that contains sensitive data. You can pass Secrets to Kubernetes Pods by using KubernetesPodOperator. Secrets must be defined in Kubernetes, or the Pod fails to launch.

This example shows two ways of using Kubernetes Secrets: as an environment variable, and as a volume mounted by the Pod.

The first Secret, airflow-secrets, is set to a Kubernetes environment variable named SQL_CONN (as opposed to an Airflow or Cloud Composer environment variable).

The second Secret, service-account, mounts service-account.json, a file with a service account token, to /var/secrets/google.

Here's what the Secrets look like:

Airflow 2

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Airflow 1

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

The name of the first Kubernetes Secret is defined in the secret parameter. This Secret is named airflow-secrets. It is exposed as an environment variable, as determined by deploy_type. The environment variable that it sets, deploy_target, is SQL_CONN. Finally, the key of the Secret whose value is stored in deploy_target is sql_alchemy_conn.

The name of the second Kubernetes Secret is also defined in the secret parameter. This Secret is named service-account. It is exposed as a volume, as determined by deploy_type. The path where the volume is mounted, deploy_target, is /var/secrets/google. Finally, the key of the Secret that is stored under deploy_target is service-account.json.
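
To make the two deployment types concrete, the sketch below models the four fields with a plain dataclass and reports what the container would see in each case. SecretSpec and describe are hypothetical names used only for illustration; they are not the Airflow Secret class, and the sketch assumes that a volume-type Secret exposes each key as a file under deploy_target, as described above.

```python
import posixpath
from dataclasses import dataclass


@dataclass(frozen=True)
class SecretSpec:
    """Stand-in for the four fields used in the examples above (not an Airflow class)."""
    deploy_type: str
    deploy_target: str
    secret: str
    key: str


def describe(spec):
    """Summarize where the Secret value becomes visible inside the container."""
    source = f"{spec.secret}[{spec.key}]"
    if spec.deploy_type == "env":
        # The value is exposed through an environment variable.
        return f"env {spec.deploy_target} <- {source}"
    if spec.deploy_type == "volume":
        # The key appears as a file under the mount path.
        return f"file {posixpath.join(spec.deploy_target, spec.key)} <- {source}"
    raise ValueError(f"unsupported deploy_type: {spec.deploy_type}")


secret_env = SecretSpec("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
secret_volume = SecretSpec(
    "volume", "/var/secrets/google", "service-account", "service-account.json"
)

for spec in (secret_env, secret_volume):
    print(describe(spec))
```

The file path reported for the volume-type Secret is the value that GOOGLE_APPLICATION_CREDENTIALS should point to in the operator configuration.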

Here is the operator configuration:

Airflow 2

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
)

Airflow 1

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
)

If you don't make any changes to the DAG or your environment, the ex-kube-secrets task fails. The logs show that the task fails with a Pod took too long to start error. This error occurs because Airflow cannot find the Secret specified in the configuration, secret_env.

gcloud

To set the Secret by using gcloud:

  1. Get information about your Cloud Composer environment cluster.

    1. Run the following command:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"

      Replace the following:

      • ENVIRONMENT with the name of your environment.
      • LOCATION with the region where the Cloud Composer environment is located.

      The output of this command uses the following format: projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>.

    2. To get the GKE cluster ID, copy the output after /clusters/ (it ends in -gke).

    3. To get the zone, copy the output after /zones/.

  2. Connect to your GKE cluster by running the following command:

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --zone ZONE

    Replace the following:

    • CLUSTER_ID with your GKE cluster ID.
    • PROJECT with the ID of your Google Cloud project.
    • ZONE with the zone where your GKE cluster is located.

  3. Create Kubernetes Secrets.

    1. Run the following command to create a Kubernetes Secret that sets the value of sql_alchemy_conn to test_value:

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value

    2. Run the following command to create a Kubernetes Secret that sets the value of service-account.json to the contents of a local service account key file named key.json:

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json

  4. After you set the Secrets, run the ex-kube-secrets task again in the Airflow UI.

  5. Verify that the ex-kube-secrets task succeeds.
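
The copy steps in step 1 can also be done programmatically. As a minimal sketch, the hypothetical helper below (parse_gke_cluster is an illustrative name) splits the config.gkeCluster value into the project, zone, and cluster ID needed by the get-credentials command. It assumes the zonal format shown above and rejects anything else. The resource name in the usage lines is a made-up example.

```python
def parse_gke_cluster(resource_name):
    """Split projects/<project>/zones/<zone>/clusters/<cluster> into its parts.

    Hypothetical helper; assumes the zonal format documented above.
    """
    parts = resource_name.strip().strip("/").split("/")
    expected_labels = ("projects", "zones", "clusters")
    if len(parts) != 6 or tuple(parts[0::2]) != expected_labels:
        raise ValueError(f"unexpected cluster resource name: {resource_name!r}")
    return {"project": parts[1], "zone": parts[3], "cluster_id": parts[5]}


# Made-up value in the documented format, for illustration only.
info = parse_gke_cluster(
    "projects/example-project/zones/us-central1-a/clusters/example-cluster-gke"
)
print(info["cluster_id"], info["project"], info["zone"])
```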

Pod affinity configuration

When you configure the affinity parameter in KubernetesPodOperator, you control which nodes Pods are scheduled on (for example, you can specify a particular node pool). In this example, the operator runs only on node pools named pool-0 and pool-1. Your Cloud Composer 1 environment nodes are in default-pool, so your Pods don't run on the nodes in your environment.

Airflow 2

# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value 'pool-0' or
    # 'pool-1' is not found on any nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Airflow 1

kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value 'pool-0' or
    # 'pool-1' is not found on any nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

With the current configuration of this example, the task fails. The logs show that the task fails because the node pools pool-0 and pool-1 do not exist.

To make sure that the node pools in values exist, make any of the following configuration changes:

  • If you created a node pool earlier, replace pool-0 and pool-1 with the names of your node pools, and then upload your DAG again.

  • Create a node pool named pool-0 or pool-1. You can create both, but the task needs only one to succeed.

  • Replace pool-0 and pool-1 with default-pool, which is the default pool that Airflow uses. Then upload your DAG again.

After you make the changes, wait a few minutes for your environment to update. Then run the ex-pod-affinity task again and verify that it succeeds.
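
The reason the task fails on default-pool nodes can be shown with a simplified model of node affinity matching. The sketch below is an illustration, not the Kubernetes scheduler: the hypothetical node_matches function supports only the In operator, treats selector terms as alternatives, and requires every expression within a term to match.

```python
NODEPOOL_LABEL = "cloud.google.com/gke-nodepool"

# Selector terms from the ex-pod-affinity example.
node_selector_terms = [
    {
        "matchExpressions": [
            {"key": NODEPOOL_LABEL, "operator": "In", "values": ["pool-0", "pool-1"]}
        ]
    }
]


def _expression_matches(node_labels, expression):
    """Evaluate one match expression; only the In operator is modeled here."""
    if expression["operator"] != "In":
        raise NotImplementedError(expression["operator"])
    return node_labels.get(expression["key"]) in expression["values"]


def node_matches(node_labels, terms):
    """Return True if any term has all of its expressions satisfied by the node."""
    for term in terms:
        expressions = term.get("matchExpressions", [])
        if expressions and all(_expression_matches(node_labels, e) for e in expressions):
            return True
    return False


for pool in ("default-pool", "pool-1"):
    eligible = node_matches({NODEPOOL_LABEL: pool}, node_selector_terms)
    print(pool, "eligible" if eligible else "not eligible")
```

Because GKE labels each node with the name of its node pool, a node in default-pool never satisfies the expression until default-pool is added to values or a matching pool is created.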

Full configuration

This example shows all the variables that you can configure in KubernetesPodOperator. You don't need to modify the code for the ex-all-configs task to succeed.

For details about each variable, see the Airflow KubernetesPodOperator reference.

Airflow 2

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        limits={"memory": "250M", "cpu": "100m"},
    ),
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Airflow 1

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={"limit_memory": "250M", "limit_cpu": "100m"},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={},
)
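
The two examples differ in how resource limits are passed: the Airflow 1 example uses a dictionary with limit_memory and limit_cpu keys, while the Airflow 2 example uses a limits mapping with memory and cpu keys. As a minimal sketch, the hypothetical helper below converts the older dictionary into the newer shape. Handling of request_ keys is an assumption that goes beyond what the examples show.

```python
def to_resource_requirements(legacy):
    """Convert {'limit_memory': ...} style keys into limits/requests mappings.

    Hypothetical helper; 'request_' handling is an assumption, since only
    'limit_' keys appear in the examples above.
    """
    sections = {"limit": "limits", "request": "requests"}
    converted = {"limits": {}, "requests": {}}
    for name, value in legacy.items():
        prefix, _, resource = name.partition("_")
        if prefix not in sections or not resource:
            raise ValueError(f"unrecognized resource key: {name!r}")
        converted[sections[prefix]][resource] = value
    # Drop empty sections so the result contains only what was specified.
    return {section: values for section, values in converted.items() if values}


legacy_resources = {"limit_memory": "250M", "limit_cpu": "100m"}
print(to_resource_requirements(legacy_resources))
```

In an Airflow 2 DAG, the resulting limits mapping corresponds to the limits argument passed to k8s_models.V1ResourceRequirements in the example above.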

About the CNCF Kubernetes provider

GKEStartPodOperator and KubernetesPodOperator are implemented in the apache-airflow-providers-cncf-kubernetes provider.

For detailed release notes for the CNCF Kubernetes provider, see the CNCF Kubernetes provider website.

Version 6.0.0

In version 6.0.0 of the CNCF Kubernetes provider package, the kubernetes_default connection is used by default in KubernetesPodOperator.

If you specified a custom connection in version 5.0.0, the operator still uses that custom connection. To switch back to the kubernetes_default connection, you might want to adjust your DAGs accordingly.

Version 5.0.0

This version introduces a few backward-incompatible changes compared to version 4.4.0. The most important ones are related to the kubernetes_default connection, which is not used in version 5.0.0.

  • The kubernetes_default connection must be modified: the Kube config path must be set to /home/airflow/composer_kube_config (see Figure 1). Alternatively, config_file must be added to the KubernetesPodOperator configuration, as shown in the code that follows.
Figure 1. Airflow UI, modifying the kubernetes_default connection (Kube config path field)
  • Modify the code of a task that uses KubernetesPodOperator in the following way:
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

For more information about version 5.0.0, see the CNCF Kubernetes Provider Release Notes.

Troubleshooting

Tips for troubleshooting Pod failures

In addition to checking the task logs in the Airflow UI, also check the following logs:

  • Output of the Airflow scheduler and workers:

    1. In the Google Cloud console, go to the Environments page.

    2. Follow the DAGs link for your environment.

    3. In the bucket of your environment, go up one level.

    4. Review the logs in the logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE> folder.

  • Detailed Pod logs in the Google Cloud console under GKE workloads. These logs include the Pod definition YAML file, Pod events, and Pod details.

Non-zero return codes when also using the GKEStartPodOperator

When you use KubernetesPodOperator and GKEStartPodOperator, the return code of the container's entrypoint determines whether the task is considered successful. Non-zero return codes indicate failure.

A common pattern when using KubernetesPodOperator and GKEStartPodOperator is to run a shell script as the container entrypoint to group multiple operations within the container.

If you write such a script, we recommend that you include the set -e command at the top of the script so that failed commands in the script terminate the script and propagate the failure to the Airflow task instance.
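
The effect of set -e on the return code can be observed locally. The sketch below assumes that a POSIX sh is available on the machine that runs it, and it uses a made-up two-line script body: a failing command followed by a command that succeeds. It runs the body with and without set -e and reports the exit code that a container entrypoint would return in each case.

```python
import subprocess

# Made-up script body: the first command fails, the second succeeds.
SCRIPT_BODY = "false\necho 'later step ran anyway'\n"


def exit_code(script):
    """Run the script text with sh and return the shell's exit code."""
    completed = subprocess.run(["sh", "-c", script], capture_output=True, text=True)
    return completed.returncode


without_set_e = exit_code(SCRIPT_BODY)
with_set_e = exit_code("set -e\n" + SCRIPT_BODY)

print("without set -e:", without_set_e)
print("with set -e:", with_set_e)
```

Without set -e, the shell reports the status of the last command, so the earlier failure is hidden and the task would be marked as successful. With set -e, the shell stops at the failing command and returns a non-zero code.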

Pod timeouts

The default timeout for KubernetesPodOperator is 120 seconds, which can result in timeouts that occur before larger images finish downloading. To increase the timeout, change the startup_timeout_seconds parameter when you create the KubernetesPodOperator.

When a Pod times out, the task-specific log is available in the Airflow UI. For example:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Pod timeouts can also occur when the Cloud Composer service account lacks the IAM permissions needed to perform the requested task. To verify this, look at Pod-level errors by using the GKE dashboards to view the logs for your particular workload, or use Cloud Logging.

Failed to establish a new connection

Auto-upgrade is enabled by default in GKE clusters. If a node pool is in a cluster that is upgrading, you might see the following error:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

To check whether your cluster is upgrading, in the Google Cloud console, go to the Kubernetes clusters page and look for the loading icon next to your environment's cluster name.

What's next