Usar KubernetesPodOperator

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página, se describe cómo usar KubernetesPodOperator para implementar pods de Kubernetes desde Cloud Composer en el clúster de Google Kubernetes Engine que forma parte de tu entorno de Cloud Composer.

KubernetesPodOperator inicia pods de Kubernetes en el clúster de tu entorno. En comparación, los operadores de Google Kubernetes Engine ejecutan pods de Kubernetes en un clúster especificado, que puede ser un clúster independiente que no está relacionado con tu entorno. También puedes crear y borrar clústeres con los operadores de Google Kubernetes Engine.

KubernetesPodOperator es una buena opción si necesitas lo siguiente:

  • Dependencias de Python personalizadas que no están disponibles a través del repositorio público de PyPI.
  • Dependencias binarias que no están disponibles en la imagen de archivo del trabajador de Cloud Composer.

Antes de comenzar

Consulta la siguiente lista de diferencias entre KubernetesPodOperator en Cloud Composer 3 y Cloud Composer 2, y asegúrate de que tus DAG sean compatibles:

  • No es posible crear espacios de nombres personalizados en Cloud Composer 3. Los pods siempre se ejecutan en el espacio de nombres composer-user-workloads, incluso si se especifica un espacio de nombres diferente. Los pods de este espacio de nombres tienen acceso a los recursos de tu proyecto y a la red de VPC (si está habilitada) sin configuración adicional.

  • No se pueden crear Secrets ni ConfigMaps de Kubernetes con la API de Kubernetes. En su lugar, Cloud Composer proporciona comandos de Google Cloud CLI, recursos de Terraform y la API de Cloud Composer para administrar secretos y ConfigMaps de Kubernetes. Para obtener más información, consulta Cómo usar secretos y ConfigMaps de Kubernetes.

  • No es posible implementar cargas de trabajo personalizadas en Cloud Composer 3. Solo se pueden modificar los secretos y los ConfigMaps de Kubernetes, pero no se pueden realizar otros cambios de configuración.

  • Los requisitos de recursos (CPU, memoria y almacenamiento) se deben especificar con los valores admitidos.

  • Al igual que en Cloud Composer 2, la configuración de afinidad de pod no está disponible. Si quieres usar la afinidad de Pods, usa los operadores de GKE para iniciar Pods en un clúster diferente.

Acerca de KubernetesPodOperator en Cloud Composer 3

En esta sección, se describe cómo funciona KubernetesPodOperator en Cloud Composer 3.

Uso de recursos

En Cloud Composer 3, el clúster de tu entorno se escala automáticamente. Las cargas de trabajo adicionales que ejecutas con KubernetesPodOperator se escalan de forma independiente de tu entorno. Tu entorno no se ve afectado por el aumento de la demanda de recursos, pero el clúster de tu entorno aumenta y disminuye según la demanda de recursos.

Los precios de las cargas de trabajo adicionales que ejecutas en el clúster de tu entorno siguen el modelo de precios de Cloud Composer 3 y usan los SKU de Cloud Composer 3.

Cloud Composer 3 usa clústeres de Autopilot que presentan el concepto de clases de procesamiento:

  • Cloud Composer solo admite la clase de procesamiento general-purpose.

  • De forma predeterminada, si no se selecciona ninguna clase, se asume la clase general-purpose cuando creas pods con KubernetesPodOperator.

  • Cada clase está asociada con propiedades y límites de recursos específicos. Puedes obtener información sobre ellos en la documentación de Autopilot. Por ejemplo, los Pods que se ejecutan dentro de la clase general-purpose pueden usar hasta 110 GiB de memoria.

Acceso a los recursos del proyecto

En Cloud Composer 3, el clúster de tu entorno se encuentra en el proyecto del inquilino. Los pods se ejecutan en el clúster del entorno, en un espacio de nombres aislado.

En Cloud Composer 3, los pods siempre se ejecutan en el espacio de nombres composer-user-workloads, incluso si se especifica un espacio de nombres diferente. Los pods de este espacio de nombres pueden acceder a los Google Cloud recursos de tu proyecto y a tu red de VPC (si está abilitada) sin configuración adicional. La cuenta de servicio de tu entorno se usa para acceder a estos recursos. No es posible especificar otra cuenta de servicio.

Configuración mínima

Para crear un KubernetesPodOperator, solo se requieren los parámetros name, image y task_id del pod. /home/airflow/composer_kube_config contiene credenciales para autenticarse en GKE.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`. Always use the
    # `composer-user-workloads` namespace with Composer 3.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Configuración adicional

En este ejemplo, se muestran los parámetros adicionales que puedes configurar en KubernetesPodOperator.

Consulta los siguientes recursos para obtener más información:

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The Docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container.
    # The env_vars parameter is templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Usa plantillas de Jinja

Airflow admite plantillas de Jinja en DAG.

Debes declarar los parámetros de Airflow obligatorios (task_id, name y image) con el operador. Como se muestra en el siguiente ejemplo, puedes crear plantillas de todos los demás parámetros con Jinja, incluidos cmds, arguments, env_vars y config_file.

El parámetro env_vars del ejemplo se establece desde una variable de Airflow llamada my_value. El DAG de ejemplo obtiene su valor de la variable de plantilla vars en Airflow. Airflow tiene más variables que proporcionan acceso a diferentes tipos de información. Por ejemplo, puedes usar la variable de plantilla conf para acceder a los valores de las opciones de configuración de Airflow. Para obtener más información y la lista de variables disponibles en Airflow, consulta la Referencia de plantillas en la documentación de Airflow.

Sin cambiar el DAG ni crear la variable env_vars, la tarea ex-kube-templates del ejemplo falla porque la variable no existe. Crea esta variable en la IU de Airflow o con Google Cloud CLI:

IU de Airflow

  1. Ve a la IU de Airflow.

  2. En la barra de herramientas, selecciona Administrador > Variables.

  3. En la página Variable de lista, haz clic en Agregar un registro nuevo.

  4. En la página Agregar variable, ingresa la siguiente información:

    • Key: my_value
    • Val: example_value
  5. Haz clic en Guardar.

gcloud

Ingresa el siguiente comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Reemplaza lo siguiente:

  • ENVIRONMENT por el nombre del entorno.
  • LOCATION por la región en la que se encuentra el entorno.

En el siguiente ejemplo, se muestra cómo usar plantillas de Jinja con KubernetesPodOperator:

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below can be templated with Jinja. For more information
    # and the list of variables available in Airflow, see
    # the Airflow templates reference:
    # https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in Jinja is the execution date as YYYY-MM-DD, this Docker image
    # will echo the execution date. Arguments to the entrypoint. The Docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI. The env_vars parameter
    # is templated.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Specifies path to Kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Usa Secrets y ConfigMaps de Kubernetes

Un Secreto de Kubernetes es un objeto que contiene datos sensibles. Un ConfigMap de Kubernetes es un objeto que contiene datos no confidenciales en pares clave-valor.

En Cloud Composer 3, puedes crear secretos y ConfigMaps con Google Cloud CLI, la API o Terraform y, luego, acceder a ellos desde KubernetesPodOperator:

  • Con la CLI y la API de Google Cloud, proporcionas un archivo de configuración YAML.
  • Con Terraform, defines Secrets y ConfigMaps como recursos independientes en los archivos de configuración de Terraform.

Información acerca de los archivos de configuración YAML

Cuando creas un Secret o un ConfigMap de Kubernetes con Google Cloud CLI y la API, proporcionas un archivo en formato YAML. Este archivo debe seguir el mismo formato que usan los Secrets y ConfigMaps de Kubernetes. La documentación de Kubernetes proporciona muchas muestras de código de ConfigMaps y Secrets. Para comenzar, puedes ver la página Cómo distribuir credenciales de forma segura con Secrets y ConfigMaps.

Al igual que en los secretos de Kubernetes, usa la representación en Base64 cuando definas valores en Secrets.

Para codificar un valor, puedes usar el siguiente comando (esta es una de las muchas formas de obtener un valor codificado en base64):

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

Resultado:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Los siguientes dos ejemplos de archivos YAML se usan en muestras más adelante en esta guía. Ejemplo de archivo de configuración YAML para un secreto de Kubernetes:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Otro ejemplo que demuestra cómo incluir archivos. Al igual que en el ejemplo anterior, primero codifica el contenido de un archivo (cat ./key.json | base64) y, luego, proporciona este valor en el archivo YAML:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

Ejemplo de archivo de configuración YAML para un ConfigMap. No es necesario que uses la representación base64 en los ConfigMaps:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Administra los Secrets de Kubernetes

gcloud

Crea un Secret

Para crear un secreto de Kubernetes, ejecuta el siguiente comando:

gcloud beta composer environments user-workloads-secrets create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • SECRET_FILE: Es la ruta de acceso a un archivo YAML local que contiene la configuración del secreto.

Ejemplo:

gcloud beta composer environments user-workloads-secrets create \
  --environment example-environment \
  --location us-central1 \
  --secret-file-path ./secrets/example-secret.yaml

Cómo actualizar un Secret

Para actualizar un Secret de Kubernetes, ejecuta el siguiente comando. El nombre del Secret se tomará del archivo YAML especificado y se reemplazará su contenido.

gcloud beta composer environments user-workloads-secrets update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • SECRET_FILE: Es la ruta de acceso a un archivo YAML local que contiene la configuración del secreto. Especifica el nombre del Secret en el campo name metadata > de este archivo.

List Secrets

Para obtener una lista de Secrets y sus campos para un entorno, ejecuta el siguiente comando. Los valores clave en el resultado se reemplazarán por asteriscos.

gcloud beta composer environments user-workloads-secrets list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.

Obtén los detalles del Secret

Para obtener información detallada sobre un Secret, ejecuta el siguiente comando. Los valores clave en el resultado se reemplazarán por asteriscos.

gcloud beta composer environments user-workloads-secrets describe \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Reemplaza lo siguiente:

  • SECRET_NAME: Es el nombre del Secret, tal como se definió en el campo name metadata > del archivo YAML con la configuración del Secret.
  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.

Cómo borrar un secreto

Para borrar un Secret, ejecuta el siguiente comando:

gcloud beta composer environments user-workloads-secrets delete \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • SECRET_NAME: Es el nombre del Secret, tal como se definió en el campo name metadata > del archivo YAML con la configuración del Secret.
  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.

API

Crea un Secret

  1. Crea una solicitud a la API de environments.userWorkloadsSecrets.create.

  2. En esta solicitud, realiza lo siguiente:

    1. En el cuerpo de la solicitud, en el campo name, especifica el URI del secreto nuevo.
    2. En el cuerpo de la solicitud, en el campo data, especifica las claves y los valores codificados en Base64 para el Secret.

Ejemplo:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo="
  }
}

Cómo actualizar un Secret

  1. Crea una solicitud a la API de environments.userWorkloadsSecrets.update.

  2. En esta solicitud, realiza lo siguiente:

    1. En el cuerpo de la solicitud, en el campo name, especifica el URI del secreto.
    2. En el cuerpo de la solicitud, en el campo data, especifica las claves y los valores codificados en Base64 para el Secret. Se reemplazarán los valores.

Ejemplo:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo=",
    "another-example": "YW5vdGhlcl9leGFtcGxlX3ZhbHVlIC1uCg=="
  }
}

List Secrets

Crea una solicitud a la API de environments.userWorkloadsSecrets.list. Los valores clave en el resultado se reemplazarán por asteriscos. Es posible usar la paginación con esta solicitud. Consulta la referencia de la solicitud para obtener más detalles.

Ejemplo:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

Obtén los detalles del Secret

Crea una solicitud a la API de environments.userWorkloadsSecrets.get. Los valores clave en el resultado se reemplazarán por asteriscos.

Ejemplo:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Cómo borrar un secreto

Crea una solicitud a la API de environments.userWorkloadsSecrets.delete.

Ejemplo:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Terraform

El recurso google_composer_user_workloads_secret define un Secret de Kubernetes, con claves y valores definidos en el bloque data.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "SECRET_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: Es el nombre del recurso del entorno, que contiene la definición del entorno en Terraform. El nombre del entorno real también se especifica en este recurso.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • SECRET_NAME: Es el nombre del Secret.
  • KEY_NAME: Una o más claves para este Secret.
  • KEY_VALUE: Es el valor codificado en Base64 de la clave. Puedes usar la función base64encode para codificar el valor (consulta el ejemplo).

Los siguientes dos ejemplos de Secrets de Kubernetes se usan en muestras más adelante en esta guía.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta

  name = "airflow-secrets"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    sql_alchemy_conn: base64encode("postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db")
  }
}

Otro ejemplo que demuestra cómo incluir archivos. Puedes usar la función file para leer el contenido del archivo como una cadena y, luego, codificarlo en Base64:

resource "google_composer_user_workloads_secret" "service_account_secret" {
  provider = google-beta

  name = "service-account"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "service-account.json": base64encode(file("./key.json"))
  }
}

Usa Secrets de Kubernetes en tus DAG

En este ejemplo, se muestran dos formas de usar los Secret de Kubernetes: como una variable de entorno y como un volumen activado por el Pod.

El primer secreto, airflow-secrets, se establece en una variable de entorno de Kubernetes llamada SQL_CONN (en lugar de en una variable de entorno de Airflow o Cloud Composer).

El segundo Secret, service-account, activa service-account.json, un archivo con un token de cuenta de servicio, en /var/secrets/google.

Los objetos Secret se ven de la siguiente forma:

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

El nombre del primer Secret de Kubernetes se define en la variable secret_env. Este Secret se llama airflow-secrets. El parámetro deploy_type especifica que se debe exponer como una variable de entorno. El nombre de la variable de entorno es SQL_CONN, como se especifica en el parámetro deploy_target. Por último, el valor de la variable de entorno SQL_CONN se establece en el valor de la clave sql_alchemy_conn.

El nombre del segundo Secret de Kubernetes se define en la variable secret_volume. Este Secret se llama service-account. Se expone como un volumen, como se especifica en el parámetro deploy_type. La ruta del archivo que se activará, deploy_target, es /var/secrets/google. Por último, el key del secreto que se almacena en deploy_target es service-account.json.

La configuración del operador tiene el siguiente aspecto:

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. The env_vars parameter is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Administra ConfigMaps de Kubernetes

gcloud

Cómo crear un ConfigMap

Para crear un ConfigMap, ejecuta el siguiente comando:

gcloud beta composer environments user-workloads-config-maps create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • CONFIG_MAP_FILE: Es la ruta de acceso a un archivo YAML local que contiene la configuración de ConfigMap.

Ejemplo:

gcloud beta composer environments user-workloads-config-maps create \
  --environment example-environment \
  --location us-central1 \
  --config-map-file-path ./configs/example-configmap.yaml

Cómo actualizar un ConfigMap

Para actualizar un ConfigMap, ejecuta el siguiente comando. El nombre de ConfigMap se tomará del archivo YAML especificado y se reemplazará su contenido.

gcloud beta composer environments user-workloads-config-maps update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • CONFIG_MAP_FILE: Es la ruta de acceso a un archivo YAML local que contiene la configuración de ConfigMap. Especifica el nombre del ConfigMap en el campo name metadata > de este archivo.

Cómo enumerar ConfigMaps

Para obtener una lista de ConfigMaps y sus campos para un entorno, ejecuta el siguiente comando: Los valores de clave en el resultado se mostrarán tal como están.

gcloud beta composer environments user-workloads-config-maps list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.

Obtén los detalles del ConfigMap

Para obtener información detallada sobre un ConfigMap, ejecuta el siguiente comando. Los valores clave en el resultado se mostrarán tal como están.

gcloud beta composer environments user-workloads-config-maps describe \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Reemplaza lo siguiente:

  • CONFIG_MAP_NAME: Es el nombre del ConfigMap, tal como se definió en el campo name > metadata del archivo YAML con la configuración del ConfigMap.
  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.

Cómo borrar un ConfigMap

Para borrar un ConfigMap, ejecuta el siguiente comando:

gcloud beta composer environments user-workloads-config-maps delete \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • CONFIG_MAP_NAME: Es el nombre del ConfigMap, tal como se definió en el campo name > metadata del archivo YAML con la configuración del ConfigMap.
  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.

API

Cómo crear un ConfigMap

  1. Crea una solicitud a la API de environments.userWorkloadsConfigMaps.create.

  2. En esta solicitud, realiza lo siguiente:

    1. En el cuerpo de la solicitud, en el campo name, especifica el URI del nuevo ConfigMap.
    2. En el cuerpo de la solicitud, en el campo data, especifica las claves y los valores del ConfigMap.

Ejemplo:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value"
  }
}

Cómo actualizar un ConfigMap

  1. Crea una solicitud a la API de environments.userWorkloadsConfigMaps.update.

  2. En esta solicitud, realiza lo siguiente:

    1. En el cuerpo de la solicitud, en el campo name, especifica el URI del ConfigMap.
    2. En el cuerpo de la solicitud, en el campo data, especifica las claves y los valores del ConfigMap. Se reemplazarán los valores.

Ejemplo:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value",
    "another_key": "another_value"
  }
}

Cómo enumerar ConfigMaps

Crea una solicitud a la API de environments.userWorkloadsConfigMaps.list. Los valores clave en el resultado se mostrarán tal como están. Es posible usar la paginación con esta solicitud. Consulta la referencia de la solicitud para obtener más detalles.

Ejemplo:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

Obtén los detalles del ConfigMap

Crea una solicitud a la API de environments.userWorkloadsConfigMaps.get. Los valores de clave en el resultado se mostrarán tal como están.

Ejemplo:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Cómo borrar un ConfigMap

Crea una solicitud a la API de environments.userWorkloadsConfigMaps.delete.

Ejemplo:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Terraform

El recurso google_composer_user_workloads_config_map define un ConfigMap, con claves y valores definidos en el bloque data.

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "CONFIG_MAP_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: Es el nombre del recurso del entorno, que contiene la definición del entorno en Terraform. El nombre del entorno real también se especifica en este recurso.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • CONFIG_MAP_NAME: Es el nombre del ConfigMap.
  • KEY_NAME: Una o más claves para este ConfigMap.
  • KEY_VALUE: Es el valor de la clave.

Ejemplo:

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta

  name = "example-config-map"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "example_key": "example_value"
  }
}

Usa ConfigMaps en tus DAG

En este ejemplo, se muestra cómo usar ConfigMaps en tus DAG.

En el siguiente ejemplo, se pasa un ConfigMap en el parámetro configmaps. Todas las claves de este ConfigMap están disponibles como variables de entorno:

import datetime

from airflow import models
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_env_vars',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'echo "Value: $example_key"',
    ],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

En el siguiente ejemplo, se muestra cómo activar un ConfigMap como volumen:

import datetime

from airflow import models
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

volume_mount = k8s.V1VolumeMount(name='confmap-example',
  mount_path='/config',
  sub_path=None,
  read_only=False)

volume = k8s.V1Volume(name='confmap-example',
  config_map=k8s.V1ConfigMapVolumeSource(name='example-configmap'))

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_volume_mount',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'ls /config'
    ],
    volumes=[volume],
    volume_mounts=[volume_mount],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

Información sobre el proveedor de Kubernetes de CNCF

KubernetesPodOperator se implementa en el proveedor apache-airflow-providers-cncf-kubernetes.

Para obtener notas de la versión detalladas del proveedor de Kubernetes de CNCF, consulta el sitio web del proveedor de Kubernetes de CNCF.

Requisitos de los recursos

Cloud Composer 3 admite los siguientes valores para los requisitos de recursos. Para ver un ejemplo del uso de requisitos de recursos, consulta Configuración adicional.

Recurso Mínimo Máximo Paso
CPU 0.25 32 Valores de paso: 0.25, 0.5, 1, 2, 4, 6, 8, 10, …, 32. Los valores solicitados se redondean al valor de paso admitido más cercano (por ejemplo, de 5 a 6).
Memoria 2G (GB) 128 G (GB) Valores de paso: 2, 3, 4, 5, …, 128. Los valores solicitados se redondean al valor de paso compatible más cercano (por ejemplo, de 3.5G a 4G).
Almacenamiento - 100G (GB) Cualquier valor. Si se solicitan más de 100 GB, solo se proporcionan 100 GB.

Para obtener más información sobre las unidades de recursos en Kubernetes, consulta Unidades de recursos en Kubernetes.

Soluciona problemas

En esta sección, se proporcionan sugerencias para solucionar problemas habituales de KubernetesPodOperator:

Ver registros

Cuando soluciones problemas, puedes revisar los registros en el siguiente orden:

  1. Registros de tareas de Airflow:

    1. En la consola de Google Cloud, ve a la página Entornos.

      Ir a Entornos

    2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

    3. Ve a la pestaña DAG.

    4. Haz clic en el nombre del DAG y, luego, en la ejecución del DAG para ver los detalles y los registros.

  2. Registros del programador de Airflow:

    1. Ve a la página Detalles del entorno.

    2. Ve a la pestaña Registros.

    3. Inspecciona los registros del programador de Airflow.

  3. Registros de cargas de trabajo del usuario:

    1. Ve a la página Detalles del entorno.

    2. Ve a la pestaña Monitoring.

    3. Selecciona Cargas de trabajo de los usuarios.

    4. Inspecciona la lista de cargas de trabajo ejecutadas. Puedes ver los registros y la información de uso de recursos de cada carga de trabajo.

Códigos de retorno distintos de cero

Cuando se usa KubernetesPodOperator (y GKEStartPodOperator), el código de retorno del punto de entrada del contenedor determina si la tarea se considera exitosa o no. Los códigos de retorno distintos de cero indican un error.

Un patrón común es ejecutar una secuencia de comandos de shell como punto de entrada de contenedor para agrupar varias operaciones dentro de este.

Si escribes una secuencia de comandos de este tipo, recomendamos que incluyas el comando set -e en la parte superior de la secuencia de comandos para que sus comandos con error finalicen la secuencia y propaguen el error a la instancia de tarea de Airflow.

Tiempos de espera de los pods

El tiempo de espera predeterminado de KubernetesPodOperator es de 120 segundos, lo que puede provocar que el tiempo de espera se agote antes de que se descarguen las imágenes más grandes. Para aumentar el tiempo de espera, puedes modificar el parámetro startup_timeout_seconds cuando creas el KubernetesPodOperator.

Cuando se agota el tiempo de espera de un pod, el registro específico de la tarea está disponible en la IU de Airflow. Por ejemplo:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

También es posible que se agote el tiempo de espera de los pods cuando la cuenta de servicio de Cloud Composer carece de los permisos de IAM necesarios para realizar la tarea solicitada. Si deseas verificar esto, revisa los errores en el nivel del pod mediante los Paneles de GKE para ver los registros de tu carga de trabajo específica o usa Cloud Logging.

Las tareas de KubernetesPodOperator fallan cuando se ejecuta una gran cantidad de tareas.

Cuando tu entorno ejecuta una gran cantidad de tareas de KubernetesPodOperator o KubernetesExecutor al mismo tiempo, Cloud Composer 3 no acepta tareas nuevas hasta que se terminan algunas de las tareas existentes.

Para obtener más información sobre cómo solucionar este problema, consulta Cómo solucionar problemas de tareas de KubernetesExecutor.

¿Qué sigue?