Usar KubernetesPodOperator

Cloud Composer 1 | Cloud Composer 2

En esta página, se describe cómo usar KubernetesPodOperator para implementar Pods de Kubernetes desde Cloud Composer en el clúster de Google Kubernetes Engine que forma parte de tu entorno de Cloud Composer y garantizar que este tenga los recursos adecuados.

KubernetesPodOperator inicia los Pods de Kubernetes en el clúster de tu entorno. En comparación, los operadores de Google Kubernetes Engine ejecutan Pods de Kubernetes en un clúster específico, que puede ser un clúster independiente que no está relacionado con tu entorno. También puedes crear y borrar clústeres con los operadores de Google Kubernetes Engine.

KubernetesPodOperator es una buena opción si necesitas lo siguiente:

  • Dependencias de Python personalizadas que no están disponibles a través del repositorio público de PyPI.
  • Dependencias binarias que no están disponibles en la imagen de archivo de trabajador de Cloud Composer.

En esta página, se explica un ejemplo de DAG de Airflow que incluye las siguientes configuraciones de KubernetesPodOperator:

Antes de comenzar

  • En Cloud Composer 2, el clúster de tu entorno escala automáticamente. Las cargas de trabajo adicionales que ejecutas con KubernetesPodOperator se escalan de manera independiente del entorno. Tu entorno no se ve afectado por el aumento de la demanda de recursos, pero el clúster de tu entorno aumenta o reduce la escala verticalmente según la demanda de recursos. Los precios de las cargas de trabajo adicionales que ejecutas en el clúster de tu entorno siguen el modelo de precios de Cloud Composer 2 y usan los SKU de Compute de Cloud Composer.

  • Los clústeres de Cloud Composer 2 usan Workload Identity. De forma predeterminada, los Pods que se ejecutan en espacios de nombres recién creados o en el espacio de nombres composer-user-workloads no pueden acceder a los recursos de Google Cloud. Cuando usas Workload Identity, las cuentas de servicio de Kubernetes asociadas con espacios de nombres deben asignarse a cuentas de servicio de Google Cloud a fin de habilitar la autorización de identidad de servicio para solicitudes a las API de Google y otros servicios.

    Debido a esto, si ejecutas Pods en el espacio de nombres composer-user-workloads o en un espacio de nombres recién creado en el clúster de tu entorno, no se crearán las vinculaciones de IAM adecuadas entre las cuentas de servicio de Kubernetes y Google Cloud, y estos Pods no podrán acceder a los recursos del proyecto de Google Cloud.

    Si deseas que tus Pods tengan acceso a los recursos de Google Cloud, usa el espacio de nombres composer-user-workloads o crea tu propio espacio de nombres como se describe más adelante.

    Para proporcionar acceso a los recursos de tu proyecto, sigue las instrucciones de Workload Identity y configura las vinculaciones:

    1. Crea un espacio de nombres separado en el clúster de tu entorno.
    2. Crea una vinculación entre la cuenta de servicio de Kubernetes composer-user-workloads/<namespace_name> y la cuenta de servicio de tu entorno.
    3. Agrega la anotación de la cuenta de servicio de tu entorno a la cuenta de servicio de Kubernetes.
    4. Cuando uses KubernetesPodOperator, especifica el espacio de nombres y la cuenta de servicio de Kubernetes en los parámetros namespace y service_account_name.
  • Si se usa la versión 5.0.0 del proveedor de Kubernetes de CNCF, sigue las instrucciones documentadas en la sección del proveedor de Kubernetes de CNCF.

  • Cloud Composer 2 usa clústeres de GKE con Workload Identity. El servidor de metadatos de GKE tarda unos segundos en comenzar a aceptar solicitudes en un Pod recién creado. Por lo tanto, los intentos de autenticación con Workload Identity en los primeros segundos de la vida de un Pod pueden fallar. Puedes obtener más información sobre esta limitación aquí.

  • Cloud Composer 2 usa clústeres de Autopilot que introducen la noción de clases de procesamiento. De forma predeterminada, si no se selecciona ninguna clase, se supone que es la clase general-purpose cuando creas Pods con KubernetesPodOperator.

    • Cada clase está asociada con propiedades y límites de recursos específicos. Puedes obtener más información al respecto en la documentación de Autopilot. Por ejemplo, los Pods que se ejecutan dentro de la clase general-purpose pueden usar hasta 110 GiB de memoria.

Configuración de KubernetesPodOperator

Para continuar con este ejemplo, coloca todo el archivo kubernetes_pod_operator.py en la carpeta dags/ de tu entorno o agrega el código relevante KubernetesPodOperator a un DAG.

Las siguientes secciones explican cada configuración de KubernetesPodOperator en el ejemplo. Para obtener información sobre cada variable de configuración, consulta la referencia de Airflow.

"""Example DAG using KubernetesPodOperator."""
import datetime

from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
    dag_id="composer_sample_kubernetes_pod",
    schedule_interval=datetime.timedelta(days=1),
    start_date=YESTERDAY,
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, the config file found at
    # `/home/airflow/composer_kube_config` contains credentials for
    # Cloud Composer's Google Kubernetes Engine cluster that is created
    # upon environment creation.
    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes. In Composer 2 environments
        # after December 2022, the default namespace is
        # `composer-user-workloads`.
        namespace="composer-user-workloads",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_template_ex = KubernetesPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        namespace="composer-user-workloads",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_secret_vars_ex = KubernetesPodOperator(
        task_id="ex-kube-secrets",
        name="ex-kube-secrets",
        namespace="composer-user-workloads",
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        cmds=["echo"],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
        },
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_full_pod = KubernetesPodOperator(
        task_id="ex-all-configs",
        name="pi",
        namespace="composer-user-workloads",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 600.
        startup_timeout_seconds=600,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
        # resources were passed as a dictionary. This change was made in
        # https://github.com/apache/airflow/pull/27197
        # Additionally, "memory" and "cpu" were previously named
        # "limit_memory" and "limit_cpu"
        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        container_resources=k8s_models.V1ResourceRequirements(
            requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
            limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        ),
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        # Pod affinity with the KubernetesPodOperator
        # is not supported with Composer 2
        # instead, create a cluster and use the GKEStartPodOperator
        # https://cloud.google.com/composer/docs/using-gke-operator
        affinity={},
    )

Configuración mínima

Para crear un KubernetesPodOperator, solo se requieren el name del Pod, el namespace en el que se ejecutará el Pod, el image para usarlo y el task_id.

Cuando colocas el siguiente fragmento de código en un DAG, la configuración utiliza los valores predeterminados en /home/airflow/composer_kube_config. No es necesario modificar el código para que la tarea pod-ex-minimum se realice correctamente.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Configuración de la plantilla

Airflow admite el uso de plantillas de Jinja. Debes declarar las variables obligatorias (task_id, name, namespace y image) con el operador. Como se muestra en el siguiente ejemplo, puedes crear plantillas de todos los demás parámetros con Jinja, incluidos cmds, arguments, env_vars y config_file.

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Sin cambiar el DAG ni tu entorno, la tarea ex-kube-templates falla debido a dos errores. Los registros muestran que esta tarea falla porque no existe la variable adecuada (my_value). El segundo error, que puedes obtener después de corregir el primer error, muestra que la tarea falla porque core/kube_config no se encuentra en config.

Para corregir ambos errores, sigue los pasos que se detallan en este artículo.

Para configurar my_value con gcloud o la IU de Airflow:

IU de Airflow

En la IU de Airflow 2, haz lo siguiente:

  1. Ve a la IU de Airflow.

  2. En la barra de herramientas, selecciona Administrador > Variables.

  3. En la página Variable de lista, haz clic en Agregar un registro nuevo.

  4. En la página Agregar variable, ingresa la siguiente información:

    • Key: my_value
    • Val: example_value
  5. Haz clic en Guardar.

gcloud

Para Airflow 2, ingresa el siguiente comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Reemplaza lo siguiente:

  • ENVIRONMENT por el nombre del entorno.
  • LOCATION por la región en la que se encuentra el entorno

Para hacer referencia a un config_file personalizado (un archivo de configuración de Kubernetes), anula la opción de configuración kube_config de Airflow por una configuración válida de Kubernetes:

Sección Clave Valor
core kube_config /home/airflow/composer_kube_config

Espera unos minutos a que se actualice tu entorno. Luego, vuelve a ejecutar la tarea ex-kube-templates y verifica que la tarea ex-kube-templates se complete con éxito.

Configuración de variables secretas

Un secreto de Kubernetes es un objeto que contiene datos sensibles. Puedes pasar secretos a los Pods de Kubernetes mediante KubernetesPodOperator. Los secretos deben estar definidos en Kubernetes o el pod no se iniciará.

En este ejemplo, se muestran dos formas de usar los Secrets de Kubernetes: como una variable de entorno y como un volumen activado por el Pod.

El primer secreto, airflow-secrets, se establece en una variable de entorno de Kubernetes llamada SQL_CONN (en lugar de en una variable de entorno de Airflow o Cloud Composer).

El segundo secreto, service-account, activa service-account.json, un archivo con un token de cuenta de servicio, en /var/secrets/google.

Los secretos se ven de la siguiente forma:

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

El nombre del primer secreto de Kubernetes se define en la variable secret. Este secreto específico se llama airflow-secrets. Se expone como una variable de entorno, según lo determina el deploy_type. La variable de entorno que establece, deploy_target, es SQL_CONN. Por último, el key del secreto que se almacena en deploy_target es sql_alchemy_conn.

El nombre del segundo secreto de Kubernetes se define en la variable secret. Este secreto específico se llama service-account. Se expone como un volumen, según lo determina el deploy_type. La ruta de acceso del archivo que deseas activar, deploy_target, es /var/secrets/google. Por último, el key del secreto almacenado en deploy_target es service-account.json.

Esta es la configuración del operador:

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Si no realizas ningún cambio en el DAG ni en tu entorno, la tarea ex-kube-secrets falla. Si observas los registros, la tarea falla debido a un error Pod took too long to start. Este error se produce porque Airflow no puede encontrar el secreto especificado en la configuración, secret_env.

gcloud

Para configurar el secreto con gcloud, haz lo siguiente:

  1. Obtén información sobre tu clúster de entorno de Cloud Composer.

    1. Ejecuta el siguiente comando:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      Reemplaza lo siguiente:

      • ENVIRONMENT por el nombre del entorno
      • LOCATION es la región en la que se encuentra el entorno de Cloud Composer.

      El resultado de este comando usa el siguiente formato: projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>.

    2. Para obtener el ID del clúster de GKE, copia el resultado después de /clusters/ (termina en -gke).

  2. Conéctate a tu clúster de GKE con el siguiente comando:

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --region LOCATION
    

    Reemplaza lo siguiente:

    • CLUSTER_ID por el ID del clúster de GKE
    • PROJECT por el ID del proyecto de Google Cloud.
    • LOCATION es la región en la que se encuentra el entorno de Cloud Composer.

  3. Crea secretos de Kubernetes

    1. Ejecuta el siguiente comando para crear un secreto de Kubernetes que establezca el valor de sql_alchemy_conn en test_value:

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
      

    2. Ejecuta el siguiente comando para crear un secreto de Kubernetes que establezca el valor de service-account.json en una ruta local del archivo de claves de una cuenta de servicio llamado key.json:

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json -n composer-user-workloads
      

  4. Después de configurar los secretos, vuelve a ejecutar la tarea ex-kube-secrets en la IU de Airflow.

  5. Verifica que la tarea ex-kube-secrets se realice correctamente.

Configuración completa

En este ejemplo, se muestran todas las variables que puedes configurar en KubernetesPodOperator. No es necesario modificar el código para que la tarea ex-all-configs se realice correctamente.

Para obtener detalles sobre cada variable, consulta la referencia KubernetesPodOperator de Airflow.

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Información sobre el proveedor de Kubernetes de CNCF

GKEStartPodOperator y KubernetesPodOperator se implementan dentro del proveedor de apache-airflow-providers-cncf-kubernetes.

Para ver las notas de la versión fallidas del proveedor de Kubernetes de CNCF, consulta el sitio web del proveedor de Kubernetes de CNCF.

Versión 6.0.0

En la versión 6.0.0 del paquete del proveedor de Kubernetes de CNCF, se usa la conexión kubernetes_default de forma predeterminada en KubernetesPodOperator.

Si especificaste una conexión personalizada en la versión 5.0.0, el operador aún la usará. Para volver a usar la conexión kubernetes_default, es posible que desees ajustar tus DAG según corresponda.

Versión 5.0.0

Esta versión presenta algunos cambios incompatibles con versiones anteriores en comparación con la versión 4.4.0. Las más importantes, que debes tener en cuenta, están relacionadas con la conexión kubernetes_default, que no se usa en la versión 5.0.0.

  • Se debe modificar la conexión kubernetes_default: la ruta de configuración de Kube debe configurarse como /home/airflow/composer_kube_config (consulta la Figura 1) o, como alternativa, se debe agregar config_file a la configuración de KubernetesPodOperator (como se presentó a continuación).
Campo de ruta de acceso de la configuración de Kube en la IU de Airflow
Figura 1: IU de Airflow, modificando la conexión kubernetes_default (haz clic para ampliar)
  • Modifica el código de una tarea con KubernetesPodOperator de la siguiente manera:
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

Para obtener más información sobre la versión 5.0.0, consulta las Notas de la versión del proveedor de Kubernetes de CNCF

Soluciona problemas

Sugerencias para solucionar problemas de error de pod

Además de verificar los registros de tareas en la IU de Airflow, también verifica los siguientes registros:

  • El resultado del programador y los trabajadores de Airflow:

    1. En la consola de Google Cloud, ve a la página Entornos.

      Ir a Entornos

    2. Sigue el vínculo de los DAG de tu entorno.

    3. En el bucket de tu entorno, sube un nivel.

    4. Revisa los registros de la carpeta logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>.

  • Registros detallados de Pods en la consola de Google Cloud, en las cargas de trabajo de GKE. Estos registros incluyen el archivo YAML de definición de pod, los eventos de los pods y sus detalles.

Códigos de retorno distintos de cero cuando también se usa el GKEStartPodOperator

Cuando se usa KubernetesPodOperator y GKEStartPodOperator, el código de retorno del punto de entrada del contenedor determina si la tarea se considera exitosa o no. Los códigos de retorno distintos de cero indican un error.

Un patrón común cuando se utiliza KubernetesPodOperator y GKEStartPodOperator es ejecutar una secuencia de comandos de shell como punto de entrada de contenedor para agrupar varias operaciones dentro de este.

Si escribes una secuencia de comandos de este tipo, recomendamos que incluyas el comando set -e en la parte superior de la secuencia de comandos para que sus comandos con error finalicen la secuencia y propaguen el error a la instancia de tarea de Airflow.

Tiempos de espera de los pods

El tiempo de espera predeterminado de KubernetesPodOperator es de 120 segundos, lo que puede provocar que el tiempo de espera se agote antes de que se descarguen las imágenes más grandes. Para aumentar el tiempo de espera, puedes modificar el parámetro startup_timeout_seconds cuando creas el KubernetesPodOperator.

Cuando se agota el tiempo de espera de un Pod, el registro específico de la tarea está disponible en la IU de Airflow. Por ejemplo:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

También es posible que se agote el tiempo de espera de los pods cuando la cuenta de servicio de Composer carece de los permisos de IAM necesarios para realizar la tarea solicitada. Si deseas verificar esto, revisa los errores en el nivel del pod mediante los Paneles de GKE para ver los registros de tu carga de trabajo específica o usa Cloud Logging.

No se pudo establecer una nueva conexión

La actualización automática está habilitada de forma predeterminada en los clústeres de GKE. Si un grupo de nodos está en un clúster que se está actualizando, es posible que veas el siguiente error:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

Para verificar si tu clúster se está actualizando, en la consola de Google Cloud, ve a la página Clústeres de Kubernetes y busca el ícono de carga junto al nombre del clúster de tu entorno.

¿Qué sigue?