Usar KubernetesPodOperator

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página, se describe cómo usar KubernetesPodOperator para implementar pods de Kubernetes desde Cloud Composer en el clúster de Google Kubernetes Engine que forma parte de tu entorno de Cloud Composer.

KubernetesPodOperator inicia Pods de Kubernetes en el clúster de tu entorno. En comparación, los operadores de Google Kubernetes Engine ejecutan Pods de Kubernetes en un clúster especificado, que puede ser un clúster separado que no está relacionado con tu entorno. También puedes crear y borrar clústeres con los operadores de Google Kubernetes Engine.

KubernetesPodOperator es una buena opción si necesitas lo siguiente:

  • Dependencias de Python personalizadas que no están disponibles a través del repositorio público de PyPI.
  • Dependencias binarias que no están disponibles en la imagen de archivo del trabajador de Cloud Composer.

Antes de comenzar

Consulta la siguiente lista de diferencias entre KubernetesPodOperator en Cloud Composer 3 y Cloud Composer 2, y asegúrate de que tus DAG sean compatibles:

  • No es posible crear espacios de nombres personalizados en Cloud Composer 3. Los Pods siempre se ejecutan en el espacio de nombres composer-user-workloads, incluso si se especifica un espacio de nombres diferente. Los Pods de este espacio de nombres tienen acceso a los recursos y a la red de VPC de tu proyecto (si está habilitada) sin configuración adicional.

  • No es posible ejecutar varios contenedores adicionales de archivos complementarios en Cloud Composer 3. Puedes ejecutar un solo contenedor secundario si se llama airflow-xcom-sidecar.

  • No se pueden crear objetos Secret y ConfigMap de Kubernetes con la API de Kubernetes. En cambio, Cloud Composer proporciona comandos de Google Cloud CLI, recursos de Terraform y la API de Cloud Composer para administrar los objetos Secret y ConfigMap de Kubernetes. Para obtener más información, consulta Usa Secrets y ConfigMaps de Kubernetes.

  • No es posible implementar cargas de trabajo personalizadas en Cloud Composer 3. Solo se pueden modificar los objetos Secret y ConfigMap de Kubernetes, pero no es posible realizar ningún otro cambio de configuración.

  • Los requisitos de recursos (CPU, memoria y almacenamiento) se deben especificar con valores admitidos.

  • Al igual que en Cloud Composer 2, la configuración de afinidad de Pod no está disponible. Si deseas usar la afinidad de Pods, usa los operadores de GKE para iniciar Pods en un clúster diferente.

Acerca de KubernetesPodOperator en Cloud Composer 3

En esta sección, se describe cómo funciona KubernetesPodOperator en Cloud Composer 3.

Uso de recursos

En Cloud Composer 3, el clúster de tu entorno se escala automáticamente. Las cargas de trabajo adicionales que ejecutas con KubernetesPodOperator se ajustan de forma independiente de tu entorno. Tu entorno no se ve afectado por el aumento de la demanda de recursos, pero el clúster de tu entorno aumenta y disminuye según la demanda de recursos.

Los precios de las cargas de trabajo adicionales que ejecutas en el clúster de tu entorno siguen el modelo de precios de Cloud Composer 3 y usan los SKU de Cloud Composer 3.

Cloud Composer 3 usa clústeres de Autopilot que introducen la noción de clases de procesamiento:

  • Cloud Composer solo admite la clase de procesamiento general-purpose.

  • De forma predeterminada, si no se selecciona ninguna clase, se supone la clase general-purpose cuando creas Pods con KubernetesPodOperator.

  • Cada clase se asocia con propiedades y límites de recursos específicos. Puedes obtener más información sobre ellas en la documentación de Autopilot. Por ejemplo, los Pods que se ejecutan dentro de la clase general-purpose pueden usar hasta 110 GiB de memoria.

Acceso a los recursos del proyecto

En Cloud Composer 3, el clúster de tu entorno se encuentra en el proyecto de inquilino y no es posible configurarlo. Los Pods se ejecutan en el clúster del entorno, en un espacio de nombres aislado.

En Cloud Composer 3, los Pods siempre se ejecutan en el espacio de nombres composer-user-workloads, incluso si se especifica uno diferente. Los Pods de este espacio de nombres pueden acceder a los recursos de tu proyecto y a tu red de VPC (si está habilitada) sin configuración adicional. Google Cloud La cuenta de servicio de tu entorno se usa para acceder a estos recursos. No es posible especificar otra cuenta de servicio.

Configuración mínima

Para crear un KubernetesPodOperator, solo se requieren los parámetros name, image y task_id del Pod. El archivo /home/airflow/composer_kube_config contiene credenciales para autenticarse en GKE.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`. Always use the
    # `composer-user-workloads` namespace with Composer 3.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Configuración adicional

En este ejemplo, se muestran parámetros adicionales que puedes configurar en KubernetesPodOperator.

Consulta los siguientes recursos para obtener más información:

  • Para obtener información sobre cómo usar los objetos Secret y ConfigMap de Kubernetes, consulta Usa objetos Secret y ConfigMap de Kubernetes.

  • Para obtener información sobre el uso de plantillas de Jinja con KubernetesPodOperator, consulta Usa plantillas de Jinja.

  • Para obtener información sobre los valores admitidos para los requisitos de recursos (CPU, memoria y almacenamiento), consulta Requisitos de recursos.

  • Para obtener información sobre los parámetros de KubernetesPodOperator, consulta la referencia del operador en la documentación de Airflow.

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The Docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container.
    # The env_vars parameter is templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Usa plantillas de Jinja

Airflow admite plantillas de Jinja en los DAGs.

Debes declarar los parámetros obligatorios de Airflow (task_id, name y image) con el operador. Como se muestra en el siguiente ejemplo, puedes crear plantillas de todos los demás parámetros con Jinja, incluidos cmds, arguments, env_vars y config_file.

El parámetro env_vars del ejemplo se establece a partir de una variable de Airflow llamada my_value. El DAG de ejemplo obtiene su valor de la variable de plantilla vars en Airflow. Airflow tiene más variables que proporcionan acceso a diferentes tipos de información. Por ejemplo, puedes usar la variable de plantilla conf para acceder a los valores de las opciones de configuración de Airflow. Para obtener más información y la lista de variables disponibles en Airflow, consulta Templates reference en la documentación de Airflow.

Sin cambiar el DAG ni crear la variable env_vars, la tarea ex-kube-templates del ejemplo falla porque la variable no existe. Crea esta variable en la IU de Airflow o con Google Cloud CLI:

IU de Airflow

  1. Ve a la IU de Airflow.

  2. En la barra de herramientas, selecciona Admin > Variables.

  3. En la página Variable de lista, haz clic en Agregar un registro nuevo.

  4. En la página Agregar variable, ingresa la siguiente información:

    • Key: my_value
    • Val: example_value
  5. Haz clic en Guardar.

gcloud

Ingresa el siguiente comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Reemplaza lo siguiente:

  • ENVIRONMENT por el nombre del entorno.
  • LOCATION por la región en la que se encuentra el entorno.

En el siguiente ejemplo, se muestra cómo usar plantillas de Jinja con KubernetesPodOperator:

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below can be templated with Jinja. For more information
    # and the list of variables available in Airflow, see
    # the Airflow templates reference:
    # https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in Jinja is the execution date as YYYY-MM-DD, this Docker image
    # will echo the execution date. Arguments to the entrypoint. The Docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI. The env_vars parameter
    # is templated.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Specifies path to Kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Usa Secrets y ConfigMaps de Kubernetes

Un Secret de Kubernetes es un objeto que contiene datos sensibles. Un ConfigMap de Kubernetes es un objeto que contiene datos no confidenciales en pares clave-valor.

En Cloud Composer 3, puedes crear Secrets y ConfigMaps con Google Cloud CLI, la API o Terraform, y, luego, acceder a ellos desde KubernetesPodOperator:

  • Con la CLI y la API de Google Cloud, proporcionas un archivo de configuración YAML.
  • Con Terraform, defines Secrets y ConfigMaps como recursos separados en los archivos de configuración de Terraform.

Acerca de los archivos de configuración YAML

Cuando creas un Secret o un ConfigMap de Kubernetes con la CLI y la API de Google Cloud, proporcionas un archivo en formato YAML. Este archivo debe seguir el mismo formato que usan los Secrets y ConfigMaps de Kubernetes. La documentación de Kubernetes proporciona muchas muestras de código de ConfigMaps y Secrets. Para comenzar, puedes consultar la página Distribute Credentials Securely Using Secrets y ConfigMaps.

Al igual que en los Secrets de Kubernetes, usa la representación en Base64 cuando definas valores en los Secrets.

Para codificar un valor, puedes usar el siguiente comando (esta es una de las muchas formas de obtener un valor codificado en base64):

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

Resultado:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

En los ejemplos que se incluyen más adelante en esta guía, se usan los siguientes dos ejemplos de archivos YAML. Ejemplo de archivo de configuración YAML para un secreto de Kubernetes:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Otro ejemplo que muestra cómo incluir archivos. Al igual que en el ejemplo anterior, primero codifica el contenido de un archivo (cat ./key.json | base64) y, luego, proporciona este valor en el archivo YAML:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

Ejemplo de un archivo de configuración YAML para un ConfigMap. No es necesario que uses la representación en base64 en ConfigMaps:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Administra los Secrets de Kubernetes

gcloud

Crea un Secret

Para crear un secreto de Kubernetes, ejecuta el siguiente comando:

gcloud beta composer environments user-workloads-secrets create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • SECRET_FILE: Es la ruta de acceso a un archivo YAML local que contiene la configuración del secreto.

Ejemplo:

gcloud beta composer environments user-workloads-secrets create \
  --environment example-environment \
  --location us-central1 \
  --secret-file-path ./secrets/example-secret.yaml

Actualiza un Secret

Para actualizar un secreto de Kubernetes, ejecuta el siguiente comando. El nombre del Secret se tomará del archivo YAML especificado y se reemplazará el contenido del Secret.

gcloud beta composer environments user-workloads-secrets update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • SECRET_FILE: Es la ruta de acceso a un archivo YAML local que contiene la configuración del secreto. Especifica el nombre del Secret en el campo metadata > name de este archivo.

Enumera los secretos

Para obtener una lista de los secretos y sus campos para un entorno, ejecuta el siguiente comando. Los valores de las claves en el resultado se reemplazarán por asteriscos.

gcloud beta composer environments user-workloads-secrets list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.

Obtener detalles del secreto

Para obtener información detallada sobre un Secret, ejecuta el siguiente comando. Los valores de las claves en el resultado se reemplazarán por asteriscos.

gcloud beta composer environments user-workloads-secrets describe \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Reemplaza lo siguiente:

  • SECRET_NAME: Es el nombre del Secret, tal como se definió en el campo metadata > name del archivo YAML con la configuración del Secret.
  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.

Cómo borrar un secreto

Para borrar un Secret, ejecuta el siguiente comando:

gcloud beta composer environments user-workloads-secrets delete \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • SECRET_NAME: Es el nombre del Secret, tal como se definió en el campo metadata > name del archivo YAML con la configuración del Secret.
  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.

API

Crea un Secret

  1. Crea una solicitud a la API de environments.userWorkloadsSecrets.create.

  2. En esta solicitud, realiza lo siguiente:

    1. En el cuerpo de la solicitud, en el campo name, especifica el URI del nuevo secreto.
    2. En el cuerpo de la solicitud, en el campo data, especifica claves y valores codificados en Base64 para el secreto.

Ejemplo:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo="
  }
}

Actualiza un Secret

  1. Crea una solicitud a la API de environments.userWorkloadsSecrets.update.

  2. En esta solicitud, realiza lo siguiente:

    1. En el cuerpo de la solicitud, en el campo name, especifica el URI del secreto.
    2. En el cuerpo de la solicitud, en el campo data, especifica claves y valores codificados en Base64 para el secreto. Se reemplazarán los valores.

Ejemplo:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo=",
    "another-example": "YW5vdGhlcl9leGFtcGxlX3ZhbHVlIC1uCg=="
  }
}

Enumera los secretos

Crea una solicitud a la API de environments.userWorkloadsSecrets.list. Los valores de las claves en el resultado se reemplazarán por asteriscos. Es posible usar la paginación con esta solicitud. Consulta la referencia de la solicitud para obtener más detalles.

Ejemplo:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

Obtener detalles del secreto

Crea una solicitud a la API de environments.userWorkloadsSecrets.get. Los valores de las claves en el resultado se reemplazarán por asteriscos.

Ejemplo:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Cómo borrar un secreto

Crea una solicitud a la API de environments.userWorkloadsSecrets.delete.

Ejemplo:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Terraform

El recurso google_composer_user_workloads_secret define un objeto Secret de Kubernetes, con claves y valores definidos en el bloque data.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "SECRET_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: Es el nombre del recurso del entorno, que contiene la definición del entorno en Terraform. El nombre del entorno real también se especifica en este recurso.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • SECRET_NAME: Es el nombre del Secret.
  • KEY_NAME: Una o más claves para este Secret.
  • KEY_VALUE: Es el valor codificado en Base64 de la clave. Puedes usar la función base64encode para codificar el valor (consulta el ejemplo).

En los siguientes dos ejemplos de Secrets de Kubernetes, se usan muestras más adelante en esta guía.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta

  name = "airflow-secrets"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    sql_alchemy_conn: base64encode("postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db")
  }
}

Otro ejemplo que muestra cómo incluir archivos. Puedes usar la función file para leer el contenido del archivo como una cadena y, luego, codificarlo en Base64:

resource "google_composer_user_workloads_secret" "service_account_secret" {
  provider = google-beta

  name = "service-account"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "service-account.json": base64encode(file("./key.json"))
  }
}

Usa Secrets de Kubernetes en tus DAGs

En este ejemplo, se muestran dos formas de usar los Secret de Kubernetes: como una variable de entorno y como un volumen activado por el Pod.

El primer secreto, airflow-secrets, se establece en una variable de entorno de Kubernetes llamada SQL_CONN (en lugar de en una variable de entorno de Airflow o Cloud Composer).

El segundo Secret, service-account, activa service-account.json, un archivo con un token de cuenta de servicio, en /var/secrets/google.

Así se ven los objetos Secret:

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

El nombre del primer secreto de Kubernetes se define en la variable secret_env. Este Secret se llama airflow-secrets. El parámetro deploy_type especifica que se debe exponer como una variable de entorno. El nombre de la variable de entorno es SQL_CONN, como se especifica en el parámetro deploy_target. Por último, el valor de la variable de entorno SQL_CONN se establece en el valor de la clave sql_alchemy_conn.

El nombre del segundo secreto de Kubernetes se define en la variable secret_volume. Este Secret se llama service-account. Se expone como un volumen, según se especifica en el parámetro deploy_type. La ruta del archivo que se activará, deploy_target, es /var/secrets/google. Por último, la clave (key) del secreto que se almacena en deploy_target es service-account.json.

La configuración del operador tiene el siguiente aspecto:

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. The env_vars parameter is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Administra ConfigMaps de Kubernetes

gcloud

Crea un ConfigMap

Para crear un ConfigMap, ejecuta el siguiente comando:

gcloud beta composer environments user-workloads-config-maps create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • CONFIG_MAP_FILE: Es la ruta de acceso a un archivo YAML local que contiene la configuración de ConfigMap.

Ejemplo:

gcloud beta composer environments user-workloads-config-maps create \
  --environment example-environment \
  --location us-central1 \
  --config-map-file-path ./configs/example-configmap.yaml

Actualiza un ConfigMap

Para actualizar un ConfigMap, ejecuta el siguiente comando. El nombre de ConfigMap se tomará del archivo YAML especificado y se reemplazará el contenido de ConfigMap.

gcloud beta composer environments user-workloads-config-maps update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • CONFIG_MAP_FILE: Es la ruta de acceso a un archivo YAML local que contiene la configuración de ConfigMap. Especifica el nombre del ConfigMap en el campo metadata > name de este archivo.

Enumera ConfigMaps

Para obtener una lista de los ConfigMaps y sus campos para un entorno, ejecuta el siguiente comando. Los valores de las claves en el resultado se mostrarán tal como están.

gcloud beta composer environments user-workloads-config-maps list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.

Obtén los detalles del ConfigMap

Para obtener información detallada sobre un ConfigMap, ejecuta el siguiente comando. Los valores de clave en el resultado se mostrarán tal como están.

gcloud beta composer environments user-workloads-config-maps describe \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Reemplaza lo siguiente:

  • CONFIG_MAP_NAME: Es el nombre del ConfigMap, tal como se definió en el campo metadata > name del archivo YAML con la configuración del ConfigMap.
  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.

Borra un ConfigMap

Para borrar un ConfigMap, ejecuta el siguiente comando:

gcloud beta composer environments user-workloads-config-maps delete \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • CONFIG_MAP_NAME: Es el nombre del ConfigMap, tal como se definió en el campo metadata > name del archivo YAML con la configuración del ConfigMap.
  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.

API

Crea un ConfigMap

  1. Crea una solicitud a la API de environments.userWorkloadsConfigMaps.create.

  2. En esta solicitud, realiza lo siguiente:

    1. En el cuerpo de la solicitud, en el campo name, especifica el URI del nuevo ConfigMap.
    2. En el cuerpo de la solicitud, en el campo data, especifica las claves y los valores del ConfigMap.

Ejemplo:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value"
  }
}

Actualiza un ConfigMap

  1. Crea una solicitud a la API de environments.userWorkloadsConfigMaps.update.

  2. En esta solicitud, realiza lo siguiente:

    1. En el cuerpo de la solicitud, en el campo name, especifica el URI de ConfigMap.
    2. En el cuerpo de la solicitud, en el campo data, especifica las claves y los valores del ConfigMap. Se reemplazarán los valores.

Ejemplo:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value",
    "another_key": "another_value"
  }
}

Enumera ConfigMaps

Crea una solicitud a la API de environments.userWorkloadsConfigMaps.list. Los valores de las claves en el resultado se mostrarán tal como están. Es posible usar la paginación con esta solicitud. Consulta la referencia de la solicitud para obtener más detalles.

Ejemplo:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

Obtén los detalles del ConfigMap

Crea una solicitud a la API de environments.userWorkloadsConfigMaps.get. Los valores de las claves en el resultado se mostrarán tal como están.

Ejemplo:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Borra un ConfigMap

Crea una solicitud a la API de environments.userWorkloadsConfigMaps.delete.

Ejemplo:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Terraform

El recurso google_composer_user_workloads_config_map define un ConfigMap, con claves y valores definidos en el bloque data.

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "CONFIG_MAP_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: Es el nombre del recurso del entorno, que contiene la definición del entorno en Terraform. El nombre del entorno real también se especifica en este recurso.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • CONFIG_MAP_NAME: Es el nombre del ConfigMap.
  • KEY_NAME: Son una o más claves para este ConfigMap.
  • KEY_VALUE: Es el valor de la clave.

Ejemplo:

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta

  name = "example-config-map"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "example_key": "example_value"
  }
}

Usa ConfigMaps en tus DAGs

En este ejemplo, se muestra cómo usar ConfigMaps en tus DAGs.

En el siguiente ejemplo, se pasa un ConfigMap en el parámetro configmaps. Todas las claves de este ConfigMap están disponibles como variables de entorno:

import datetime

from airflow import models
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_env_vars',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'echo "Value: $example_key"',
    ],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

En el siguiente ejemplo, se muestra cómo montar un ConfigMap como un volumen:

import datetime

from airflow import models
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

volume_mount = k8s.V1VolumeMount(name='confmap-example',
  mount_path='/config',
  sub_path=None,
  read_only=False)

volume = k8s.V1Volume(name='confmap-example',
  config_map=k8s.V1ConfigMapVolumeSource(name='example-configmap'))

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_volume_mount',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'ls /config'
    ],
    volumes=[volume],
    volume_mounts=[volume_mount],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

Información sobre el proveedor de Kubernetes de la CNCF

KubernetesPodOperator se implementa en el proveedor apache-airflow-providers-cncf-kubernetes.

Para obtener notas de la versión detalladas del proveedor de Kubernetes de la CNCF, consulta el sitio web del proveedor de Kubernetes de la CNCF.

Requisitos de los recursos

Cloud Composer 3 admite los siguientes valores para los requisitos de recursos. Para ver un ejemplo del uso de los requisitos de recursos, consulta Configuración adicional.

Recurso Mínimo Máximo Paso
CPU 0.25 32 Valores de distancia: 0.25, 0.5, 1, 2, 4, 6, 8, 10, …, 32. Los valores solicitados se redondean al valor de paso admitido más cercano (por ejemplo, de 5 a 6).
Memoria 2G (GB) 128 GB Valores de pasos: 2, 3, 4, 5, …, 128. Los valores solicitados se redondean al valor de paso admitido más cercano (por ejemplo, 3.5 G a 4 G).
Almacenamiento - 100 G (GB) Cualquier valor. Si se solicitan más de 100 GB, solo se proporcionarán 100 GB.

Para obtener más información sobre las unidades de recursos en Kubernetes, consulta Unidades de recursos en Kubernetes.

Soluciona problemas

En esta sección, se proporcionan sugerencias para solucionar problemas comunes de KubernetesPodOperator:

Ver registros

Cuando soluciones problemas, puedes consultar los registros en el siguiente orden:

  1. Registros de tareas de Airflow:

    1. En la consola de Google Cloud , ve a la página Entornos.

      Ir a Entornos

    2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

    3. Ve a la pestaña DAGs.

    4. Haz clic en el nombre del DAG y, luego, en la ejecución del DAG para ver los detalles y los registros.

  2. Registros del programador de Airflow:

    1. Ve a la página Detalles del entorno.

    2. Ve a la pestaña Registros.

    3. Inspecciona los registros del programador de Airflow.

  3. Registros de cargas de trabajo de usuarios:

    1. Ve a la página Detalles del entorno.

    2. Ve a la pestaña Monitoring.

    3. Selecciona Cargas de trabajo del usuario.

    4. Inspecciona la lista de cargas de trabajo ejecutadas. Puedes ver los registros y la información de uso de recursos de cada carga de trabajo.

Códigos de retorno distintos de cero

Cuando se usa KubernetesPodOperator (y GKEStartPodOperator), el código de retorno del punto de entrada del contenedor determina si la tarea se considera exitosa o no. Los códigos de retorno distintos de cero indican un error.

Un patrón común es ejecutar una secuencia de comandos de shell como punto de entrada del contenedor para agrupar varias operaciones dentro de este.

Si escribes una secuencia de comandos de este tipo, recomendamos que incluyas el comando set -e en la parte superior de la secuencia de comandos para que sus comandos con error finalicen la secuencia y propaguen el error a la instancia de tarea de Airflow.

Tiempos de espera de los pods

El tiempo de espera predeterminado de KubernetesPodOperator es de 120 segundos, lo que puede provocar que el tiempo de espera se agote antes de que se descarguen las imágenes más grandes. Para aumentar el tiempo de espera, puedes modificar el parámetro startup_timeout_seconds cuando creas el KubernetesPodOperator.

Cuando se agota el tiempo de espera de un Pod, el registro específico de la tarea está disponible en la IU de Airflow. Por ejemplo:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

También es posible que se agote el tiempo de espera de los pods cuando la cuenta de servicio de Cloud Composer carece de los permisos de IAM necesarios para realizar la tarea solicitada. Si deseas verificar esto, revisa los errores en el nivel del Pod con los Paneles de GKE para ver los registros de tu carga de trabajo específica o usa Cloud Logging.

Las tareas de KubernetesPodOperator fallan cuando se ejecuta una gran cantidad de tareas

Cuando tu entorno ejecuta una gran cantidad de tareas de KubernetesPodOperator o KubernetesExecutor al mismo tiempo, Cloud Composer 3 no acepta tareas nuevas hasta que finalicen algunas de las tareas existentes.

Para obtener más información sobre cómo solucionar este problema, consulta Soluciona problemas de tareas de KubernetesExecutor.

¿Qué sigue?