Usar KubernetesPodOperator

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

En esta página, se describe cómo usar KubernetesPodOperator para implementar pods de Kubernetes desde Cloud Composer en el clúster de Google Kubernetes Engine que forma parte de tu entorno de Cloud Composer.

KubernetesPodOperator inicia Pods de Kubernetes en el clúster de tu entorno. En comparación, los operadores de Google Kubernetes Engine ejecutan pods de Kubernetes en un clúster especificado, que puede ser un clúster independiente que no está relacionado con tu entorno. También puedes crear y borrar clústeres con operadores de Google Kubernetes Engine.

KubernetesPodOperator es una buena opción si necesitas lo siguiente:

  • Dependencias de Python personalizadas que no están disponibles a través del repositorio público de PyPI.
  • Dependencias binarias que no están disponibles en stock Imagen del trabajador de Cloud Composer.

Antes de comenzar

  • Te recomendamos que uses la versión más reciente de Cloud Composer. Como mínimo, esta versión debe ser compatible como parte de la política de baja y asistencia.
  • Asegúrate de que tu entorno tenga recursos suficientes. Iniciar pods en un entorno con escasez de recursos puede causar errores en el trabajador y el programador de Airflow.

Configura los recursos de tu entorno de Cloud Composer

Cuando creas un entorno de Cloud Composer, especificas sus parámetros de rendimiento, incluidos los parámetros de rendimiento del clúster del entorno. Iniciar Pods de Kubernetes en el clúster de entorno puede causar por los recursos del clúster, como la CPU o la memoria. Debido a que el programador y los trabajadores de Airflow están en el mismo clúster de GKE, los programadores y los trabajadores no funcionarán correctamente si la competencia provoca escasez de recursos.

Para evitar escasez de recursos, realiza una o más de las siguientes acciones:

Crea un grupo de nodos

La forma preferida de evitar la escasez de recursos en el entorno de Cloud Composer es crear un nuevo grupo de nodos y configurar los pods de Kubernetes para que se ejecuten solo con recursos de ese grupo.

Console

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. Haz clic en el nombre de tu entorno.

  3. En la página Detalles del entorno, ve a la pestaña Configuración del entorno.

  4. En Recursos > Clúster de GKE sigue el vínculo Ver detalles del clúster.

  5. Crea un grupo de nodos como se describe en Cómo agregar un grupo de nodos.

gcloud

  1. Determina el nombre del clúster de tu entorno:

    gcloud composer environments describe ENVIRONMENT_NAME \
      --location LOCATION \
      --format="value(config.gkeCluster)"
    

    Reemplaza lo siguiente:

    • ENVIRONMENT_NAME por el nombre del entorno.
    • LOCATION por la región en la que se encuentra el entorno
  2. El resultado contiene el nombre del clúster de tu entorno. Por ejemplo: puede ser europe-west3-example-enviro-af810e25-gke.

  3. Crea un grupo de nodos como se describe en Agrega un grupo de nodos.

Aumenta la cantidad de nodos en tu entorno

Si aumentas la cantidad de nodos de tu entorno de Cloud Composer, aumentará la capacidad de procesamiento disponible para tus trabajadores. Este aumento no proporciona recursos adicionales para las tareas que requieren más CPU o RAM de las que proporciona el tipo de máquina especificado.

Para aumentar el recuento de nodos, actualiza tu entorno.

Especifica el tipo de máquina adecuado

Durante la creación de entornos de Cloud Composer, puedes especificar un tipo de máquina. A fin de garantizar que haya recursos disponibles, especifica el tipo de máquina para el tipo de procesamiento que se produce en tu entorno de Cloud Composer.

Configuración mínima

Para crear un KubernetesPodOperator, solo se requieren los parámetros name, image y task_id del pod. /home/airflow/composer_kube_config contiene credenciales para autenticarse en GKE.

Airflow 2

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. In Composer 1 there is the potential for
    # the resource starvation of Airflow workers and scheduler
    # within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources,
    # and using Composer 2 will mean the environment will autoscale.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Airflow 1

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Configuración de afinidad de Pods

Si configuras el parámetro affinity en KubernetesPodOperator, puedes controlar en qué nodos se programan los pods (por ejemplo, puedes especificar un grupo de nodos en particular). En este ejemplo, el operador solo se ejecuta en grupos de nodos llamados pool-0 y pool-1. Los nodos de tu entorno de Cloud Composer 1 se encuentran en el default-pool, de modo que los Pods no se ejecuten en los nodos de tu entorno.

Airflow 2

# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Airflow 1

kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Con la configuración actual de este ejemplo, la tarea falla. Si observas los registros, verás que la tarea falla porque los grupos de nodos pool-0 y pool-1 no existen.

Para asegurarte de que los grupos de nodos de values existan, realiza cualquiera de los siguientes cambios de configuración:

  • Si creaste un grupo de nodos anteriormente, reemplaza pool-0 y pool-1 con los nombres de tus grupos de nodos y vuelve a subir el DAG.

  • Crea un grupo de nodos con el nombre pool-0 o pool-1. Puedes crear ambos, pero la tarea necesita solo uno para tener éxito.

  • Reemplaza pool-0 y pool-1 con default-pool, que es el grupo predeterminado que utiliza Airflow. A continuación, vuelve a subir el DAG.

Después de realizar los cambios, espera unos minutos a que se actualice tu entorno. Luego, vuelve a ejecutar la tarea ex-pod-affinity y verifica que el ex-pod-affinity tarea se realice de forma correcta.

Configuración adicional

En este ejemplo, se muestran parámetros adicionales que puedes configurar en el KubernetesPodOperator

Para obtener más información sobre los parámetros, consulta la referencia de Airflow para KubernetesPodOperator. Para obtener información sobre cómo usar Secrets de Kubernetes y ConfigMaps, consulta Cómo usar Secrets y ConfigMaps de Kubernetes. Para para obtener más información sobre el uso de las plantillas de Jinja con KubernetesPodOperator, consulta Usa las plantillas de Jinja.

Airflow 2

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        limits={"memory": "250M", "cpu": "100m"},
    ),
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Airflow 1

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={"limit_memory": "250M", "limit_cpu": "100m"},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={},
)

Utiliza las plantillas de Jinja

Airflow admite plantillas de Jinja en DAG.

Debes declarar los parámetros de Airflow obligatorios (task_id, name y image) con el operador. Como se muestra en el siguiente ejemplo, puedes crear plantillas para todos los demás parámetros con Jinja, incluidos cmds, arguments, env_vars y config_file.

El parámetro env_vars del ejemplo se establece desde un Variable de Airflow llamada my_value. El DAG de ejemplo obtiene su valor de la variable de plantilla vars en Airflow. Airflow dispone de más variables que brindan acceso a distintos tipos de información. Por ejemplo, puedes usar la variable de plantilla conf para acceder a los valores de las opciones de configuración de Airflow. Para obtener más información de variables disponibles en Airflow, consulta Referencia de plantillas en Airflow en la documentación de Google Cloud.

Sin cambiar el DAG ni crear la variable env_vars, la tarea ex-kube-templates del ejemplo falla porque la variable no existe. Crea esta variable en la IU de Airflow o con Google Cloud CLI:

IU de Airflow

  1. Ve a la IU de Airflow.

  2. En la barra de herramientas, selecciona Administrador > Variables.

  3. En la página Variable de lista, haz clic en Agregar un registro nuevo.

  4. En la página Agregar variable, ingresa la siguiente información:

    • Key: my_value
    • Val: example_value
  5. Haz clic en Guardar.

Si tu entorno usa Airflow 1, ejecuta el siguiente comando en su lugar:

  1. Ve a la IU de Airflow.

  2. En la barra de herramientas, selecciona Administrador > Variables.

  3. En la página Variables, haz clic en la pestaña Crear.

  4. En la página Variable, ingresa la siguiente información:

    • Key: my_value
    • Val: example_value
  5. Haz clic en Guardar.

gcloud

Ingresa el siguiente comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Si tu entorno usa Airflow 1, ejecuta el siguiente comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set my_value example_value

Reemplaza lo siguiente:

  • ENVIRONMENT por el nombre del entorno.
  • LOCATION por la región en la que se encuentra el entorno

En el siguiente ejemplo, se muestra cómo usar plantillas de Jinja con KubernetesPodOperator:

Airflow 2

kubenetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Airflow 1

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Use Secrets y ConfigMaps de Kubernetes

Un Secret de Kubernetes es un objeto que contiene datos sensibles. Un ConfigMap de Kubernetes es un objeto que contiene datos no confidenciales en pares clave-valor.

En Cloud Composer 2, puede crear Secrets y ConfigMaps con Google Cloud CLI, API o Terraform y, luego, accede a ellas desde KubernetesPodOperator

Acerca de los archivos de configuración YAML

Cuando creas un Secret de Kubernetes o un ConfigMap con Google Cloud CLI y en la API, debes proporcionar un archivo en formato YAML. Este archivo debe seguir el mismo formato que usan los Secrets y ConfigMaps de Kubernetes. Documentación de Kubernetes proporciona muchas muestras de código de ConfigMaps y Secrets. Para comenzar, puedes ver la página Cómo distribuir credenciales de forma segura con objetos secretos y ConfigMaps.

Al igual que en los secretos de Kubernetes, usa la representación en Base64 cuando definas valores en Secrets.

Para codificar un valor, puedes usar el siguiente comando (esta es una de muchas formas para obtener un valor codificado en base64):

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

Resultado:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Los siguientes dos ejemplos de archivos YAML se usan en muestras más adelante en esta guía. Ejemplo de archivo de configuración YAML para un Secret de Kubernetes:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Otro ejemplo que demuestra cómo incluir archivos. Al igual que en el ejemplo anterior, primero codifica el contenido de un archivo (cat ./key.json | base64) y, luego, proporciona este valor en el archivo YAML:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

Ejemplo de archivo de configuración YAML para un ConfigMap. No es necesario que uses la representación base64 en los ConfigMaps:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Administra Secrets de Kubernetes

En Cloud Composer 2, creas secretos con Google Cloud CLI y kubectl:

  1. Obtén información sobre el clúster de tu entorno:

    1. Ejecuta el siguiente comando:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      Reemplaza lo siguiente:

      • ENVIRONMENT por el nombre de tu entorno
      • LOCATION es la región en la que se encuentra el entorno de Cloud Composer.

      El resultado de este comando usa el siguiente formato: projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>

    2. Para obtener el ID del clúster de GKE, copia el resultado después de /clusters/ (finaliza en -gke).

    3. Para obtener la zona, copia el resultado después de /zones/.

  2. Conéctate a tu clúster de GKE con el siguiente comando:

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --zone ZONE
    

    Reemplaza lo siguiente:

    • CLUSTER_ID: Es el ID de clúster del entorno.
    • PROJECT_ID: El ID del proyecto
    • ZONE por la zona en la que se encuentra el clúster del entorno
  3. Crea secretos de Kubernetes:

    En los siguientes comandos, se demuestran dos enfoques diferentes para crear secretos de Kubernetes. El enfoque de --from-literal usa pares clave-valor. El enfoque --from-file usa el contenido del archivo.

    • Para crear un Secret de Kubernetes proporcionando pares clave-valor, ejecuta el siguiente comando: siguiente comando. En este ejemplo, se crea un Secret llamado airflow-secrets que tiene un campo sql_alchemy_conn con el valor test_value.

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value
      
    • Para crear un Secret de Kubernetes proporcionando contenido del archivo, ejecuta el comando siguiente comando. En este ejemplo, se crea un Secret llamado service-account que tiene el campo service-account.json con el elemento valor tomado del contenido de un archivo ./key.json local.

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json
      

Usa Secrets de Kubernetes en tus DAG

En este ejemplo, se muestran dos formas de usar Secrets de Kubernetes: como entorno variable y como un volumen activado por el Pod.

Se configura el primer Secret, airflow-secrets a una variable de entorno de Kubernetes llamada SQL_CONN (en lugar de una Airflow o variable de entorno de Cloud Composer).

El segundo Secret, service-account, activa service-account.json, un archivo con un token de cuenta de servicio, a /var/secrets/google.

A continuación, se muestra cómo se ven los objetos Secret:

Airflow 2

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Airflow 1

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

El nombre del primer Secret de Kubernetes se define en la variable secret_env. Este Secret se llama airflow-secrets. El parámetro deploy_type especifica que debe exponerse como una variable de entorno. La variable de entorno el nombre es SQL_CONN, como se especifica en el parámetro deploy_target. Por último, la de la variable de entorno SQL_CONN se establece en el valor de Tecla sql_alchemy_conn.

El nombre del segundo Secret de Kubernetes se define en la variable secret_volume. Este Secret se llama service-account. Se expone como un Volume, como se especifica en el parámetro deploy_type. La ruta del archivo a la activación, deploy_target, es /var/secrets/google. Por último, el key de la El secreto que se almacena en deploy_target es service-account.json.

La configuración del operador tiene el siguiente aspecto:

Airflow 2

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Airflow 1

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Información sobre el proveedor de Kubernetes para CNCF

KubernetesPodOperator se implementa en apache-airflow-providers-cncf-kubernetes proveedor.

Para obtener notas de la versión detalladas del proveedor de Kubernetes para CNCF, consulta Sitio web del proveedor de Kubernetes de CNCF.

Versión 6.0.0

En la versión 6.0.0 del paquete del proveedor de Kubernetes para CNCF, Se usa la conexión kubernetes_default de forma predeterminada en KubernetesPodOperator.

Si especificaste una conexión personalizada en la versión 5.0.0, esta todavía lo usa el operador. Para volver a usar kubernetes_default por lo que recomendamos que ajustes tus DAG según corresponda.

Versión 5.0.0

Esta versión incorpora algunos cambios incompatibles con versiones anteriores en comparación con la versión 4.4.0. Los más importantes se relacionan con la conexión kubernetes_default, que no se usa en la versión 5.0.0.

  • Se debe modificar la conexión kubernetes_default. Configuración de Kubernetes la ruta de acceso se debe establecer en /home/airflow/composer_kube_config (como se muestra en la siguiente imagen). Como alternativa, config_file debe se agregará a la configuración de KubernetesPodOperator (como se muestra en siguiente ejemplo de código).
Campo de ruta de configuración de Kube en la IU de Airflow
Figura 1. IU de Airflow, modificando la conexión kubernetes_default (haz clic para ampliar)
  • Modifica el código de una tarea con KubernetesPodOperator de la siguiente manera:
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

Para obtener más información sobre la versión 5.0.0, consulta las notas de la versión del proveedor de CNCF Kubernetes.

Soluciona problemas

En esta sección, se proporcionan sugerencias para solucionar problemas habituales de KubernetesPodOperator:

Visualiza registros

Cuando solucionas problemas, puedes verificar los registros en el siguiente orden:

  1. Registros de tareas de Airflow:

    1. En la consola de Google Cloud, ve a la página Entornos.

      Ir a Entornos

    2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

    3. Ve a la pestaña DAG.

    4. Haz clic en el nombre del DAG y, luego, en la ejecución del DAG para ver los detalles y los registros.

  2. Registros del programador de Airflow:

    1. Ve a la página Detalles del entorno.

    2. Ve a la pestaña Registros.

    3. Inspecciona los registros del programador de Airflow.

  3. Registros del Pod en la consola de Google Cloud, en GKE de las cargas de trabajo. Estos registros incluyen el archivo YAML de definición de Pod, los eventos de Pods y Detalles del Pod.

Códigos de retorno distintos de cero

Cuando se usa KubernetesPodOperator (y GKEStartPodOperator), el código de retorno del punto de entrada del contenedor determina si la tarea se considera exitosa o no. Los códigos de retorno distintos de cero indican un error.

Un patrón común es ejecutar una secuencia de comandos de shell como punto de entrada del contenedor para varias operaciones en el contenedor.

Si estás escribiendo un script de este tipo, te recomendamos que incluyas el Comando set -e en la parte superior de la secuencia de comandos para que haya comandos con errores en ella finalizar la secuencia de comandos y propagar la falla a la instancia de tarea de Airflow.

Tiempos de espera de los pods

El tiempo de espera predeterminado para KubernetesPodOperator es de 120 segundos, puede provocar tiempos de espera antes de que se descarguen las imágenes más grandes. Para aumentar el tiempo de espera, puedes modificar el parámetro startup_timeout_seconds cuando creas el KubernetesPodOperator.

Cuando se agota el tiempo de espera de un Pod, el registro específico de la tarea está disponible en la IU de Airflow. Por ejemplo:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

También es posible que se agote el tiempo de espera de los pods cuando la cuenta de servicio de Cloud Composer carece de los permisos de IAM necesarios para realizar la tarea solicitada. Para verificarlo, observa los errores a nivel del Pod con el Paneles de GKE para ver los registros de tu carga de trabajo específica o usa Cloud Logging.

No se pudo establecer una conexión nueva

La actualización automática está habilitada de forma predeterminada en los clústeres de GKE. Si un grupo de nodos está en un clúster que se está actualizando, es posible que veas el siguiente error:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

Para verificar si tu clúster se está actualizando, en la consola de Google Cloud, ve a Clústeres de Kubernetes y busca el ícono de carga junto nombre del clúster del entorno.

¿Qué sigue?