Usar KubernetesPodOperator

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

En esta página, se describe cómo usar KubernetesPodOperator para implementar Pods de Kubernetes de Cloud Composer a Google Kubernetes Engine clúster que es parte de tu entorno de Cloud Composer.

KubernetesPodOperator inicia pods de Kubernetes en el clúster de tu entorno. En comparación, Los operadores de Google Kubernetes Engine ejecutan Pods de Kubernetes en un que puede ser un clúster independiente que no esté relacionado con tu en un entorno de nube. También puedes crear y borrar clústeres con los operadores de Google Kubernetes Engine.

KubernetesPodOperator es una buena opción si necesitas lo siguiente:

  • Dependencias de Python personalizadas que no están disponibles a través del repositorio público de PyPI.
  • Dependencias binarias que no están disponibles en la imagen de archivo del trabajador de Cloud Composer.

Antes de comenzar

Consulta la siguiente lista de diferencias entre KubernetesPodOperator en Cloud Composer 3 y Cloud Composer 2, y asegúrate de que tus DAG compatible:

  • No es posible crear espacios de nombres personalizados en Cloud Composer 3. Grupos de anuncios se ejecuta siempre en el espacio de nombres composer-user-workloads, incluso si se trata de un un espacio de nombres. Los pods de este espacio de nombres tienen acceso a los recursos de tu proyecto y a la red de VPC (si está habilitada) sin configuración adicional.

  • Los Secrets de Kubernetes y los ConfigMaps no se pueden crear con la API de Kubernetes. En su lugar, Cloud Composer proporciona comandos de Google Cloud CLI, recursos de Terraform y la API de Cloud Composer para administrar secretos y ConfigMaps de Kubernetes. Para obtener más información, consulta Cómo usar Secrets y ConfigMaps de Kubernetes.

  • Al igual que en Cloud Composer 2, la configuración de afinidad de Pods no está disponible. Si quieres usar la afinidad de Pods, usa los operadores de GKE para iniciar Pods en un clúster diferente.

Acerca de KubernetesPodOperator en Cloud Composer 3

En esta sección, se describe cómo funciona KubernetesPodOperator en Cloud Composer 3.

Uso de recursos

En Cloud Composer 3, el clúster de tu entorno escala automáticamente. Cargas de trabajo adicionales que ejecutas con KubernetesPodOperator escala de forma independiente de tu entorno. Tu entorno no se ve afectado por el aumento de la demanda de recursos, pero el clúster de tu entorno aumenta y disminuye según la demanda de recursos.

Los precios de las cargas de trabajo adicionales que ejecutas en el clúster de tu entorno siguen el modelo de precios de Cloud Composer 3 y usan los SKU de Cloud Composer 3.

Cloud Composer 3 usa clústeres de Autopilot que presentan el concepto de clases de procesamiento:

  • Cloud Composer solo admite la clase de procesamiento general-purpose.

  • De forma predeterminada, si no se selecciona ninguna clase, se asume la clase general-purpose cuando creas pods con KubernetesPodOperator.

  • Cada clase está asociada con propiedades y límites de recursos específicos. Puedes obtener información sobre ellos en la documentación de Autopilot. Por ejemplo, los Pods que se ejecutan dentro de la clase general-purpose pueden usar hasta 110 GiB de memoria.

Acceso a los recursos del proyecto

En Cloud Composer 3, el clúster de tu entorno se encuentra en el proyecto del inquilino. Los pods se ejecutan en el clúster del entorno, en un espacio de nombres aislado.

En Cloud Composer 3, los Pods siempre se ejecutan en el espacio de nombres composer-user-workloads, incluso si se especifica un espacio de nombres diferente. Los pods de este espacio de nombres pueden acceder a los recursos de Google Cloud en tu proyecto y a tu red de VPC (si está habilitada) sin configuración adicional. La cuenta de servicio de tu entorno se usa para acceder a estos recursos. No es posible especificar otra cuenta de servicio.

Configuración mínima

Para crear un KubernetesPodOperator, solo se usarán los valores name y image del Pod Los parámetros task_id son obligatorios. /home/airflow/composer_kube_config contiene credenciales para autenticarse en GKE.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`. Always use the
    # `composer-user-workloads` namespace with Composer 3.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Configuración adicional

En este ejemplo, se muestran los parámetros adicionales que puedes configurar en KubernetesPodOperator.

Para obtener más información sobre los parámetros, consulta la Referencia de Airflow para KubernetesPodOperator Para obtener información sobre el uso de Secrets y ConfigMaps de Kubernetes, consulta Cómo usar Secrets y ConfigMaps de Kubernetes. Para para obtener más información sobre el uso de las plantillas de Jinja con KubernetesPodOperator, consulta Usa las plantillas de Jinja.

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The Docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container.
    # The env_vars parameter is templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Usa plantillas de Jinja

Airflow admite plantillas de Jinja en DAG.

Debes declarar los parámetros obligatorios de Airflow (task_id, name y image) con el operador. Como se muestra en el siguiente ejemplo, puedes crear plantillas para todos los demás parámetros con Jinja, incluidos cmds, arguments, env_vars y config_file.

El parámetro env_vars del ejemplo se establece desde un Variable de Airflow llamada my_value. El DAG de ejemplo obtiene su valor de la variable de plantilla vars en Airflow. Airflow tiene más variables que proporcionan acceso a diferentes tipos de información. Por ejemplo: puedes usar la variable de plantilla conf para acceder a los valores Opciones de configuración de Airflow. Para obtener más información y la lista de variables disponibles en Airflow, consulta la Referencia de plantillas en la documentación de Airflow.

Sin cambiar el DAG ni crear la variable env_vars, el La tarea ex-kube-templates del ejemplo falla porque la variable no existen. Crea esta variable en la IU de Airflow o con Google Cloud CLI:

IU de Airflow

  1. Ve a la IU de Airflow.

  2. En la barra de herramientas, selecciona Administrador > Variables.

  3. En la página Variable de lista, haz clic en Agregar un registro nuevo.

  4. En la página Agregar variable, ingresa la siguiente información:

    • Key: my_value
    • Val: example_value
  5. Haz clic en Guardar.

gcloud

Ingresa el siguiente comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Reemplaza lo siguiente:

  • ENVIRONMENT por el nombre del entorno.
  • LOCATION por la región en la que se encuentra el entorno

El siguiente ejemplo demuestra cómo usar las plantillas de Jinja con KubernetesPodOperator:

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below can be templated with Jinja. For more information
    # and the list of variables available in Airflow, see
    # the Airflow templates reference:
    # https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in Jinja is the execution date as YYYY-MM-DD, this Docker image
    # will echo the execution date. Arguments to the entrypoint. The Docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI. The env_vars parameter
    # is templated.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Specifies path to Kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Usa Secrets de Kubernetes y ConfigMaps

Un Secret de Kubernetes es un objeto que contiene datos sensibles. Un objeto Kubernetes ConfigMap es un objeto que contiene datos no confidenciales en pares clave-valor.

En Cloud Composer 3, puede crear Secrets y ConfigMaps con Google Cloud CLI, API o Terraform y, luego, accede a ellas desde KubernetesPodOperator:

  • Con la CLI y la API de Google Cloud, proporcionas un archivo de configuración YAML.
  • Con Terraform, defines Secrets y ConfigMaps como recursos independientes en los archivos de configuración de Terraform.

Acerca de los archivos de configuración YAML

Cuando creas un Secret de Kubernetes o un ConfigMap con Google Cloud CLI y en la API, debes proporcionar un archivo en formato YAML. Este archivo debe seguir el mismo que usan los Secrets y ConfigMaps de Kubernetes. La documentación de Kubernetes proporciona muchas muestras de código de ConfigMaps y Secrets. Para comenzar, puedes consulta la Distribuye credenciales de forma segura con secretos y ConfigMaps.

Al igual que en los secretos de Kubernetes, usa la representación en Base64 cuando definas valores en Secrets.

Para codificar un valor, puedes usar el siguiente comando (esta es una de muchas formas para obtener un valor codificado en base64):

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

Resultado:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Los siguientes dos ejemplos de archivos YAML se usan en muestras más adelante en esta guía. Ejemplo de archivo de configuración YAML para un Secret de Kubernetes:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Otro ejemplo que demuestra cómo incluir archivos. Igual que en el ejemplo anterior ejemplo, primero codifica el contenido de un archivo (cat ./key.json | base64) y, luego, proporciona este valor en el archivo YAML:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

Un archivo de configuración YAML de ejemplo para un ConfigMap. No es necesario que uses la representación base64 en los ConfigMaps:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Administra Secrets de Kubernetes

gcloud

Crea un Secret

Para crear un Secret de Kubernetes, ejecuta el siguiente comando:

gcloud beta composer environments user-workloads-secrets create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • SECRET_FILE: Es la ruta de acceso a un archivo YAML local que contiene la configuración del secreto.

Ejemplo:

gcloud beta composer environments user-workloads-secrets create \
  --environment example-environment \
  --location us-central1 \
  --secret-file-path ./secrets/example-secret.yaml

Cómo actualizar un Secret

Para actualizar un Secret de Kubernetes, ejecuta el siguiente comando. El nombre del Secret se tomará del archivo YAML especificado y se reemplazará su contenido.

gcloud beta composer environments user-workloads-secrets update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • SECRET_FILE: Es la ruta de acceso a un archivo YAML local que contiene la configuración del secreto. Especifica el nombre del Secret en el campo name metadata > de este archivo.

List Secrets

Para obtener una lista de Secrets y sus campos para un entorno, ejecuta el siguiente comando. Los valores clave del resultado se reemplazarán por asteriscos.

gcloud beta composer environments user-workloads-secrets list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.

Obtén detalles del Secret

Para obtener información detallada sobre un Secret, ejecuta el siguiente comando. Los valores clave en el resultado se reemplazarán por asteriscos.

gcloud beta composer environments user-workloads-secrets describe \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Reemplaza lo siguiente:

  • SECRET_NAME: Es el nombre del Secret, tal como se definió en el campo name metadata > del archivo YAML con la configuración del Secret.
  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.

Cómo borrar un secreto

Para borrar un Secret, ejecuta el siguiente comando:

gcloud beta composer environments user-workloads-secrets delete \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • SECRET_NAME: Es el nombre del Secret, tal como se definió en el campo name metadata > del archivo YAML con la configuración del Secret.
  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.

API

Crea un Secret

  1. Crea un API de environments.userWorkloadsSecrets.create para cada solicitud.

  2. En esta solicitud, realiza lo siguiente:

    1. En el cuerpo de la solicitud, en el campo name, especifica el URI del secreto nuevo.
    2. En el cuerpo de la solicitud, en el campo data, especifica las claves y valores codificados en base64 para el secreto.

Ejemplo:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo="
  }
}

Cómo actualizar un Secret

  1. Crea una solicitud a la API de environments.userWorkloadsSecrets.update.

  2. En esta solicitud, realiza lo siguiente:

    1. En el cuerpo de la solicitud, en el campo name, especifica el URI del Secreto.
    2. En el cuerpo de la solicitud, en el campo data, especifica las claves y valores codificados en base64 para el secreto. Se reemplazarán los valores.

Ejemplo:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo=",
    "another-example": "YW5vdGhlcl9leGFtcGxlX3ZhbHVlIC1uCg=="
  }
}

List Secrets

Crea una solicitud a la API de environments.userWorkloadsSecrets.list. Los valores clave en el resultado se reemplazarán por asteriscos. Es posible usar la paginación con esta solicitud, consulta la referencia de la solicitud para más detalles.

Ejemplo:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

Obtén detalles del Secret

Crea una solicitud a la API de environments.userWorkloadsSecrets.get. Los valores clave en el resultado se reemplazarán por asteriscos.

Ejemplo:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Cómo borrar un secreto

Crea un API de environments.userWorkloadsSecrets.delete para cada solicitud.

Ejemplo:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Terraform

La google_composer_user_workloads_secret recurso define un Secret de Kubernetes, con claves y valores definidos en el data.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "SECRET_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: Es el nombre del recurso del entorno, que contiene la definición del entorno en Terraform. El nombre del entorno real también se especifica en este recurso.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • SECRET_NAME: Es el nombre del Secreto.
  • KEY_NAME: Es una o más claves para este Secret.
  • KEY_VALUE: Es el valor codificado en Base64 de la clave. Puedes usar la base64encode para codificar el valor (consulta el ejemplo).

Los siguientes dos ejemplos de Secrets de Kubernetes se usan en muestras más adelante en en esta guía.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta

  name = "airflow-secrets"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    sql_alchemy_conn: base64encode("postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db")
  }
}

Otro ejemplo en el que se muestra cómo incluir archivos. Puedes usar la función file para leer el contenido del archivo como una cadena y, luego, codificarlo en Base64:

resource "google_composer_user_workloads_secret" "service_account_secret" {
  provider = google-beta

  name = "service-account"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "service-account.json": base64encode(file("./key.json"))
  }
}

Usa Secrets de Kubernetes en tus DAG

En este ejemplo, se muestran dos formas de usar Secrets de Kubernetes: como entorno variable y como un volumen activado por el Pod.

El primer secreto, airflow-secrets, se establece en una variable de entorno de Kubernetes llamada SQL_CONN (en lugar de en una variable de entorno de Airflow o Cloud Composer).

El segundo Secret, service-account, activa service-account.json, un archivo con un token de cuenta de servicio, en /var/secrets/google.

Los objetos Secret se ven de la siguiente manera:

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

El nombre del primer Secret de Kubernetes se define en la variable secret_env. Este Secret se llama airflow-secrets. El parámetro deploy_type especifica que debe exponerse como una variable de entorno. El nombre de la variable de entorno es SQL_CONN, como se especifica en el parámetro deploy_target. Por último, la de la variable de entorno SQL_CONN se establece en el valor de Tecla sql_alchemy_conn.

El nombre del segundo Secret de Kubernetes se define en el archivo secret_volume de salida. Este Secret se llama service-account. Se expone como un Volume, como se especifica en el parámetro deploy_type. La ruta del archivo que se activará, deploy_target, es /var/secrets/google. Por último, el key del secreto que se almacena en deploy_target es service-account.json.

La configuración del operador tiene el siguiente aspecto:

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. The env_vars parameter is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Administra ConfigMaps de Kubernetes

gcloud

Crea un ConfigMap

Para crear un ConfigMap, ejecuta el siguiente comando:

gcloud beta composer environments user-workloads-config-maps create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • CONFIG_MAP_FILE: Es la ruta de acceso a un archivo YAML local que contiene el ConfigMap. configuración.

Ejemplo:

gcloud beta composer environments user-workloads-config-maps create \
  --environment example-environment \
  --location us-central1 \
  --config-map-file-path ./configs/example-configmap.yaml

Cómo actualizar un ConfigMap

Para actualizar un ConfigMap, ejecuta el siguiente comando. El nombre de ConfigMaps del archivo YAML especificado, y el contenido del ConfigMap se y reemplazarse.

gcloud beta composer environments user-workloads-config-maps update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • CONFIG_MAP_FILE: Es la ruta de acceso a un archivo YAML local que contiene el ConfigMap. configuración. Especifica el nombre del ConfigMap en metadata > name campo de este archivo.

Cómo enumerar ConfigMaps

Para obtener una lista de ConfigMaps y sus campos para un entorno, ejecuta el comando siguiente comando. Los valores clave del resultado se mostrarán tal como están.

gcloud beta composer environments user-workloads-config-maps list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.

Obtén los detalles del ConfigMap

Para obtener información detallada sobre un ConfigMap, ejecuta el siguiente comando. Clave los valores del resultado se mostrarán tal cual.

gcloud beta composer environments user-workloads-config-maps describe \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Reemplaza lo siguiente:

  • CONFIG_MAP_NAME: Es el nombre del ConfigMap, tal como se definió en el campo name > metadata del archivo YAML con la configuración del ConfigMap.
  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.

Borra un ConfigMap

Para borrar un ConfigMap, ejecuta el siguiente comando:

gcloud beta composer environments user-workloads-config-maps delete \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • CONFIG_MAP_NAME: Es el nombre del ConfigMap, tal como se definió en el campo name > metadata del archivo YAML con la configuración del ConfigMap.
  • ENVIRONMENT_NAME: Es el nombre de tu entorno.
  • LOCATION: Es la región en la que se encuentra el entorno.

API

Cómo crear un ConfigMap

  1. Crea un environments.userWorkloadsConfigMaps.create solicitud a la API.

  2. En esta solicitud, realiza lo siguiente:

    1. En el cuerpo de la solicitud, en el campo name, especifica el URI del nuevo ConfigMap.
    2. En el cuerpo de la solicitud, en el campo data, especifica las claves y para el ConfigMap.

Ejemplo:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value"
  }
}

Actualiza un ConfigMap

  1. Crea un environments.userWorkloadsConfigMaps.update solicitud a la API.

  2. En esta solicitud, realiza lo siguiente:

    1. En el cuerpo de la solicitud, en el campo name, especifica el URI del ConfigMap.
    2. En el cuerpo de la solicitud, en el campo data, especifica las claves y los valores del ConfigMap. Se reemplazarán los valores.

Ejemplo:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value",
    "another_key": "another_value"
  }
}

Enumera ConfigMaps

Crea una solicitud a la API de environments.userWorkloadsConfigMaps.list. Los valores clave del resultado se mostrarán tal como están. Es posible usar la paginación con esta solicitud, consulta la referencia de la solicitud para más detalles.

Ejemplo:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

Obtén los detalles del ConfigMap

Crea un API de environments.userWorkloadsConfigMaps.get para cada solicitud. Los valores clave del resultado se mostrarán tal como están.

Ejemplo:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Borra un ConfigMap

Crea una solicitud a la API de environments.userWorkloadsConfigMaps.delete.

Ejemplo:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Terraform

El google_composer_user_workloads_config_map recurso define un ConfigMap, con las claves y los valores definidos en el data.

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "CONFIG_MAP_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: Es el nombre del recurso del entorno, que contiene la definición del entorno en Terraform. El estado el nombre del entorno también se especifica en este recurso.
  • LOCATION: Es la región en la que se encuentra el entorno.
  • CONFIG_MAP_NAME: Es el nombre del ConfigMap.
  • KEY_NAME: Es una o más claves para este ConfigMap.
  • KEY_VALUE: Es el valor de la clave.

Ejemplo:

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta

  name = "example-config-map"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "example_key": "example_value"
  }
}

Usa ConfigMaps en tus DAG

En este ejemplo, se muestra cómo usar ConfigMaps en tus DAG.

En el siguiente ejemplo, se pasa un ConfigMap en el parámetro configmaps. Todas las claves de este ConfigMap están disponibles como variables de entorno:

import datetime

from airflow import models
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_env_vars',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'echo "Value: $example_key"',
    ],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

En el siguiente ejemplo, se muestra cómo activar un ConfigMap como un volumen:

import datetime

from airflow import models
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

volume_mount = k8s.V1VolumeMount(name='confmap-example',
  mount_path='/config',
  sub_path=None,
  read_only=False)

volume = k8s.V1Volume(name='confmap-example',
  config_map=k8s.V1ConfigMapVolumeSource(name='example-configmap'))

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_volume_mount',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'ls /config'
    ],
    volumes=[volume],
    volume_mounts=[volume_mount],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

Información sobre el proveedor de Kubernetes de CNCF

KubernetesPodOperator se implementa en el proveedor apache-airflow-providers-cncf-kubernetes.

Para obtener notas de la versión detalladas del proveedor de Kubernetes para CNCF, consulta Sitio web del proveedor de Kubernetes de CNCF.

Soluciona problemas

En esta sección, se proporcionan sugerencias para solucionar problemas habituales de KubernetesPodOperator:

Visualiza registros

Cuando soluciones problemas, puedes revisar los registros en el siguiente orden:

  1. Registros de tareas de Airflow:

    1. En la consola de Google Cloud, ve a la página Entornos.

      Ir a Entornos

    2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

    3. Ve a la pestaña DAG.

    4. Haz clic en el nombre del DAG y, luego, en la ejecución del DAG para ver los detalles. y registros.

  2. Registros del programador de Airflow:

    1. Ve a la página Detalles del entorno.

    2. Ve a la pestaña Registros.

    3. Inspeccionar los registros del programador de Airflow

  3. Registros de cargas de trabajo del usuario:

    1. Ve a la página Detalles del entorno.

    2. Ve a la pestaña Monitoring.

    3. Selecciona Cargas de trabajo del usuario.

    4. Inspecciona la lista de cargas de trabajo ejecutadas. Puedes ver los registros y información de uso de recursos de cada carga de trabajo.

Códigos de retorno distintos de cero

Cuando se usa KubernetesPodOperator (y GKEStartPodOperator), el código de retorno del punto de entrada del contenedor determina si la tarea se considera exitosa o no. Los códigos de retorno distintos de cero indican un error.

Un patrón común es ejecutar una secuencia de comandos de shell como punto de entrada del contenedor para varias operaciones en el contenedor.

Si estás escribiendo un script de este tipo, te recomendamos que incluyas el Comando set -e en la parte superior de la secuencia de comandos para que haya comandos con errores en ella finalizar la secuencia de comandos y propagar la falla a la instancia de tarea de Airflow.

Tiempos de espera de los pods

El tiempo de espera predeterminado de KubernetesPodOperator es de 120 segundos, lo que puede provocar que el tiempo de espera se agote antes de que se descarguen las imágenes más grandes. Para aumentar el tiempo de espera, puedes modificar el parámetro startup_timeout_seconds cuando creas el KubernetesPodOperator.

Cuando se agota el tiempo de espera de un pod, el registro específico de la tarea está disponible en la IU de Airflow. Por ejemplo:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Los tiempos de espera de Pods también pueden ocurrir Cuenta de servicio de Cloud Composer carece de los permisos de IAM necesarios para realizar la tarea en mano. Para verificarlo, observa los errores a nivel del Pod con el Paneles de GKE para ver los registros de tu carga de trabajo específica o usa Cloud Logging.

¿Qué sigue?