Usar KubernetesPodOperador

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página se describe cómo usar KubernetesPodOperator para desplegar pods de Kubernetes desde Cloud Composer en el clúster de Google Kubernetes Engine que forma parte de tu entorno de Cloud Composer.

KubernetesPodOperator inicia pods de Kubernetes en el clúster de tu entorno. En comparación, los operadores de Google Kubernetes Engine ejecutan pods de Kubernetes en un clúster específico, que puede ser un clúster independiente que no esté relacionado con tu entorno. También puedes crear y eliminar clústeres con los operadores de Google Kubernetes Engine.

KubernetesPodOperator es una buena opción si necesitas lo siguiente:

  • Dependencias de Python personalizadas que no están disponibles en el repositorio público de PyPI.
  • Dependencias binarias que no están disponibles en la imagen de trabajador de Cloud Composer.

Antes de empezar

  • Te recomendamos que uses la versión más reciente de Cloud Composer. Como mínimo, esta versión debe admitirse como parte de la política de asistencia y retirada.
  • Asegúrate de que tu entorno tenga suficientes recursos. Si se lanzan pods en un entorno con pocos recursos, se pueden producir errores en el trabajador y el programador de Airflow.

Configurar los recursos del entorno de Cloud Composer

Cuando creas un entorno de Cloud Composer, especificas sus parámetros de rendimiento, incluidos los del clúster del entorno. Lanzar pods de Kubernetes en el clúster de entorno puede provocar una competencia por los recursos del clúster, como la CPU o la memoria. Como el programador y los trabajadores de Airflow están en el mismo clúster de GKE, no funcionarán correctamente si la competencia provoca una falta de recursos.

Para evitar que se agoten los recursos, lleva a cabo una o varias de las siguientes acciones:

Crear un grupo de nodos

La forma preferida de evitar la falta de recursos en el entorno de Cloud Composer es crear un grupo de nodos y configurar los pods de Kubernetes para que se ejecuten usando solo los recursos de ese grupo.

Consola

  1. En la Google Cloud consola, ve a la página Entornos.

    Ir a Entornos

  2. Haz clic en el nombre de tu entorno.

  3. En la página Detalles del entorno, vaya a la pestaña Configuración del entorno.

  4. En la sección Recursos > Clúster de GKE, siga el enlace Ver detalles del clúster.

  5. Crea un grupo de nodos como se describe en el artículo Añadir un grupo de nodos.

gcloud

  1. Determina el nombre del clúster de tu entorno:

    gcloud composer environments describe ENVIRONMENT_NAME \
      --location LOCATION \
      --format="value(config.gkeCluster)"
    

    Sustituye:

    • ENVIRONMENT_NAME con el nombre del entorno.
    • LOCATION con la región en la que se encuentra el entorno.
  2. El resultado contiene el nombre del clúster de tu entorno. Por ejemplo, puede ser europe-west3-example-enviro-af810e25-gke.

  3. Crea un grupo de nodos como se describe en el artículo Añadir un grupo de nodos.

Aumentar el número de nodos de tu entorno

Si aumentas el número de nodos de tu entorno de Cloud Composer, también aumentará la potencia de computación disponible para tus cargas de trabajo. Este aumento no proporciona recursos adicionales para las tareas que requieren más CPU o RAM de las que ofrece el tipo de máquina especificado.

Para aumentar el número de nodos, actualiza tu entorno.

Especifica el tipo de máquina adecuado

Durante la creación del entorno de Cloud Composer, puedes especificar un tipo de máquina. Para asegurarte de que haya recursos disponibles, especifica un tipo de máquina para el tipo de computación que se produce en tu entorno de Cloud Composer.

Configuración mínima

Para crear un KubernetesPodOperator, solo se necesitan los parámetros name, image y task_id del pod que se va a usar. El /home/airflow/composer_kube_config contiene las credenciales para autenticarte en GKE.

Airflow 2

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. In Composer 1 there is the potential for
    # the resource starvation of Airflow workers and scheduler
    # within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources,
    # and using Composer 2 will mean the environment will autoscale.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Airflow 1

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Configuración de afinidad de pods

Cuando configuras el parámetro affinity en KubernetesPodOperator, controlas en qué nodos se programan los pods, como los nodos de un grupo de nodos concreto. En este ejemplo, el operador solo se ejecuta en grupos de nodos llamados pool-0 y pool-1. Los nodos de tu entorno de Cloud Composer 1 están en default-pool, por lo que tus pods no se ejecutan en los nodos de tu entorno.

Airflow 2

# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Airflow 1

kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Como el ejemplo está configurado, la tarea falla. Si consultas los registros, la tarea falla porque no existen los grupos de nodos pool-0 y pool-1.

Para asegurarte de que los grupos de nodos de values existen, haz uno de los siguientes cambios en la configuración:

  • Si has creado un grupo de nodos anteriormente, sustituye pool-0 y pool-1 por los nombres de tus grupos de nodos y vuelve a subir tu DAG.

  • Crea un grupo de nodos llamado pool-0 o pool-1. Puedes crear ambos, pero la tarea solo necesita uno para completarse.

  • Sustituye pool-0 y pool-1 por default-pool, que es el grupo predeterminado que usa Airflow. A continuación, vuelve a subir el DAG.

Después de hacer los cambios, espera unos minutos a que se actualice tu entorno. A continuación, vuelve a ejecutar la tarea ex-pod-affinity y comprueba que se haya completado correctamente.ex-pod-affinity

Configuración adicional

En este ejemplo se muestran parámetros adicionales que puedes configurar en KubernetesPodOperator.

Consulta los siguientes recursos para obtener más información:

Airflow 2

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        limits={"memory": "250M", "cpu": "100m"},
    ),
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Airflow 1

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={"limit_memory": "250M", "limit_cpu": "100m"},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={},
)

Usar plantillas Jinja

Airflow admite plantillas Jinja en los DAGs.

Debe declarar los parámetros de Airflow obligatorios (task_id, name y image) con el operador. Como se muestra en el siguiente ejemplo, puedes usar Jinja para crear plantillas de todos los demás parámetros, incluidos cmds, arguments, env_vars y config_file.

El parámetro env_vars del ejemplo se define a partir de una variable de Airflow llamada my_value. El DAG de ejemplo obtiene su valor de la variable de plantilla vars de Airflow. Airflow tiene más variables que proporcionan acceso a diferentes tipos de información. Por ejemplo, puedes usar la variable de plantilla conf para acceder a los valores de las opciones de configuración de Airflow. Para obtener más información y la lista de variables disponibles en Airflow, consulta la referencia de plantillas en la documentación de Airflow.

Sin cambiar el DAG ni crear la variable env_vars, la tarea ex-kube-templates del ejemplo falla porque la variable no existe. Crea esta variable en la interfaz de usuario de Airflow o con Google Cloud CLI:

Interfaz de usuario de Airflow

  1. Ve a la interfaz de Airflow.

  2. En la barra de herramientas, seleccione Administrar > Variables.

  3. En la página List Variable (Variable de lista), haz clic en Add a new record (Añadir un nuevo registro).

  4. En la página Añadir variable, introduce la siguiente información:

    • Tecla:my_value
    • Valor: example_value
  5. Haz clic en Guardar.

Si tu entorno usa Airflow 1, ejecuta el siguiente comando:

  1. Ve a la interfaz de Airflow.

  2. En la barra de herramientas, seleccione Administrar > Variables.

  3. En la página Variables, haz clic en la pestaña Crear.

  4. En la página Variable, introduce la siguiente información:

    • Tecla:my_value
    • Valor: example_value
  5. Haz clic en Guardar.

gcloud

Introduce el siguiente comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Si tu entorno usa Airflow 1, ejecuta el siguiente comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set my_value example_value

Sustituye:

  • ENVIRONMENT con el nombre del entorno.
  • LOCATION con la región en la que se encuentra el entorno.

En el siguiente ejemplo se muestra cómo usar plantillas Jinja con KubernetesPodOperator:

Airflow 2

kubenetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Airflow 1

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Usar secretos y ConfigMaps de Kubernetes

Un secreto de Kubernetes es un objeto que contiene datos sensibles. Un ConfigMap de Kubernetes es un objeto que contiene datos no confidenciales en pares clave-valor.

En Cloud Composer 2, puedes crear secretos y ConfigMaps con la CLI de Google Cloud, la API o Terraform, y luego acceder a ellos desde KubernetesPodOperator.

Acerca de los archivos de configuración YAML

Cuando creas un Secret o un ConfigMap de Kubernetes con la CLI de Google Cloud y la API, proporcionas un archivo en formato YAML. Este archivo debe tener el mismo formato que los secretos y los ConfigMaps de Kubernetes. La documentación de Kubernetes proporciona muchos ejemplos de código de ConfigMaps y Secrets. Para empezar, puedes consultar la página Distribuir credenciales de forma segura mediante secretos y ConfigMaps.

Al igual que en los secretos de Kubernetes, usa la representación en base64 cuando definas valores en los secretos.

Para codificar un valor, puedes usar el siguiente comando (esta es una de las muchas formas de obtener un valor codificado en base64):

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

Resultado:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

En los ejemplos que se muestran más adelante en esta guía se utilizan los dos archivos YAML que se indican a continuación. Ejemplo de archivo de configuración YAML de un secreto de Kubernetes:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Otro ejemplo que muestra cómo incluir archivos. Al igual que en el ejemplo anterior, primero codifica el contenido de un archivo (cat ./key.json | base64) y, a continuación, proporciona este valor en el archivo YAML:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

Ejemplo de archivo de configuración YAML de un ConfigMap. No es necesario que utilices la representación base64 en ConfigMaps:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Gestionar secretos de Kubernetes

En Cloud Composer 2, puedes crear secretos con Google Cloud CLI y kubectl:

  1. Obtén información sobre el clúster de tu entorno:

    1. Ejecuta el siguiente comando:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      Sustituye:

      • ENVIRONMENT con el nombre de tu entorno.
      • LOCATION con la región en la que se encuentra el entorno de Cloud Composer.

      El resultado de este comando tiene el siguiente formato: projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>.

    2. Para obtener el ID del clúster de GKE, copia el resultado después de /clusters/ (termina en -gke).

    3. Para obtener la zona, copia el resultado después de /zones/.

  2. Conéctate a tu clúster de GKE con el siguiente comando:

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --zone ZONE
    

    Sustituye:

    • CLUSTER_ID: el ID del clúster del entorno.
    • PROJECT_ID: el ID de proyecto.
    • ZONE con la zona en la que se encuentra el clúster del entorno.
  3. Crea secretos de Kubernetes:

    En los siguientes comandos se muestran dos enfoques diferentes para crear secretos de Kubernetes. El enfoque --from-literal usa pares clave-valor. El enfoque --from-file usa el contenido de los archivos.

    • Para crear un secreto de Kubernetes proporcionando pares clave-valor, ejecuta el siguiente comando. En este ejemplo, se crea un secreto llamado airflow-secrets que tiene un campo sql_alchemy_conn con el valor test_value.

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value
      
    • Para crear un secreto de Kubernetes proporcionando el contenido de un archivo, ejecuta el siguiente comando. En este ejemplo se crea un secreto llamado service-account que tiene el campo service-account.json con el valor tomado del contenido de un archivo ./key.json local.

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json
      

Usar secretos de Kubernetes en tus DAGs

En este ejemplo se muestran dos formas de usar los secretos de Kubernetes: como variable de entorno y como volumen montado por el pod.

El primer secreto, airflow-secrets, se asigna a una variable de entorno de Kubernetes llamada SQL_CONN (en lugar de a una variable de entorno de Airflow o Cloud Composer).

El segundo secreto, service-account, monta service-account.json, un archivo con un token de cuenta de servicio, en /var/secrets/google.

Este es el aspecto de los objetos secretos:

Airflow 2

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Airflow 1

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

El nombre del primer secreto de Kubernetes se define en la variable secret_env. Este secreto se llama airflow-secrets. El parámetro deploy_type especifica que debe exponerse como una variable de entorno. El nombre de la variable de entorno es SQL_CONN, tal como se especifica en el parámetro deploy_target. Por último, el valor de la variable de entorno SQL_CONN se asigna al valor de la clave sql_alchemy_conn.

El nombre del segundo secreto de Kubernetes se define en la variable secret_volume. Este secreto se llama service-account. Se expone como un volumen, tal como se especifica en el parámetro deploy_type. La ruta del archivo que se va a montar, deploy_target, es /var/secrets/google. Por último, el key del secreto almacenado en deploy_target es service-account.json.

Este es el aspecto de la configuración del operador:

Airflow 2

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Airflow 1

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Información sobre el proveedor de Kubernetes de la CNCF

KubernetesPodOperator se implementa en el proveedor apache-airflow-providers-cncf-kubernetes.

Para consultar las notas de la versión detalladas del proveedor de Kubernetes de CNCF, visita el sitio web del proveedor de Kubernetes de CNCF.

Versión 6.0.0

En la versión 6.0.0 del paquete del proveedor de Kubernetes de CNCF, KubernetesPodOperator usa la conexión kubernetes_default de forma predeterminada.

Si has especificado una conexión personalizada en la versión 5.0.0, el operador seguirá usando esa conexión. Para volver a usar la kubernetes_default conexión, puede que tengas que ajustar tus DAGs en consecuencia.

Versión 5.0.0

Esta versión introduce algunos cambios incompatibles con versiones anteriores en comparación con la versión 4.4.0. Las más importantes están relacionadas con la kubernetes_defaultconexión, que no se usa en la versión 5.0.0.

  • Es necesario modificar la conexión kubernetes_default. La ruta de configuración de Kubernetes debe ser /home/airflow/composer_kube_config (como se muestra en la siguiente figura). Como alternativa, se debe añadir config_file a la configuración de KubernetesPodOperator (como se muestra en el siguiente ejemplo de código).
Campo de ruta de configuración de Kube en la interfaz de usuario de Airflow
Figura 1. Interfaz de usuario de Airflow, modificación de la conexión kubernetes_default (haz clic para ampliar)
  • Modifica el código de una tarea con KubernetesPodOperator de la siguiente manera:
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

Para obtener más información sobre la versión 5.0.0, consulta las notas de la versión del proveedor de Kubernetes de CNCF.

Solución de problemas

En esta sección se ofrecen consejos para solucionar problemas habituales de KubernetesPodOperator:

Ver registros

Cuando intentes solucionar problemas, puedes consultar los registros en el siguiente orden:

  1. Registros de tareas de Airflow:

    1. En la Google Cloud consola, ve a la página Entornos.

      Ir a Entornos

    2. En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Detalles del entorno.

    3. Ve a la pestaña DAGs.

    4. Haga clic en el nombre del DAG y, a continuación, en la ejecución del DAG para ver los detalles y los registros.

  2. Registros del programador de Airflow:

    1. Ve a la página Detalles del entorno.

    2. Ve a la pestaña Registros.

    3. Inspecciona los registros del programador de Airflow.

  3. Registros de pods en la consola, en cargas de trabajo de GKE. Google Cloud Estos registros incluyen el archivo YAML de definición de pod, los eventos de pod y los detalles del pod.

Códigos de retorno distintos de cero

Cuando se usa KubernetesPodOperator (y GKEStartPodOperator), el código de retorno del punto de entrada del contenedor determina si la tarea se considera completada correctamente o no. Los códigos de retorno distintos de cero indican un error.

Un patrón habitual es ejecutar un script de shell como punto de entrada del contenedor para agrupar varias operaciones en el contenedor.

Si vas a escribir una secuencia de comandos de este tipo, te recomendamos que incluyas el comando set -e en la parte superior para que, si se produce un error en un comando de la secuencia, esta se detenga y el error se propague a la instancia de tarea de Airflow.

Tiempos de espera de pods

El tiempo de espera predeterminado de KubernetesPodOperator es de 120 segundos, lo que puede provocar que se agote el tiempo de espera antes de que se descarguen imágenes más grandes. Puedes aumentar el tiempo de espera modificando el parámetro startup_timeout_seconds al crear el KubernetesPodOperator.

Cuando se agota el tiempo de espera de un pod, el registro específico de la tarea está disponible en la interfaz de usuario de Airflow. Por ejemplo:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Los tiempos de espera de los pods también pueden producirse cuando la cuenta de servicio de Cloud Composer no tiene los permisos de gestión de identidades y accesos necesarios para realizar la tarea en cuestión. Para verificarlo, consulta los errores a nivel de pod en los paneles de control de GKE para ver los registros de tu carga de trabajo específica o usa Cloud Logging.

No se ha podido establecer una nueva conexión

La actualización automática está habilitada de forma predeterminada en los clústeres de GKE. Si un grupo de nodos está en un clúster que se está actualizando, es posible que vea el siguiente error:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

Para comprobar si tu clúster se está actualizando, en la consola de Google Cloud , ve a la página Clústeres de Kubernetes y busca el icono de carga junto al nombre del clúster de tu entorno.

Siguientes pasos