Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta página, se describe cómo usar KubernetesPodOperator para implementar pods de Kubernetes desde Cloud Composer en el clúster de Google Kubernetes Engine que forma parte de tu entorno de Cloud Composer.
KubernetesPodOperator inicia Pods de Kubernetes en el clúster de tu entorno. En comparación, los operadores de Google Kubernetes Engine ejecutan Pods de Kubernetes en un clúster especificado, que puede ser un clúster separado que no está relacionado con tu entorno. También puedes crear y borrar clústeres con los operadores de Google Kubernetes Engine.
KubernetesPodOperator es una buena opción si necesitas lo siguiente:
- Dependencias de Python personalizadas que no están disponibles a través del repositorio público de PyPI.
- Dependencias binarias que no están disponibles en la imagen de archivo del trabajador de Cloud Composer.
Antes de comenzar
Si se usa la versión 5.0.0 del proveedor de Kubernetes de CNCF, sigue las instrucciones que se documentan en la sección del proveedor de Kubernetes de CNCF.
La configuración de afinidad de Pod no está disponible en Cloud Composer 2. Si quieres usar la afinidad del Pod, usa los operadores de GKE para iniciar Pods en un clúster diferente.
Acerca de KubernetesPodOperator en Cloud Composer 2
En esta sección, se describe cómo funciona KubernetesPodOperator en Cloud Composer 2.
Uso de recursos
En Cloud Composer 2, el clúster de tu entorno se escala automáticamente. Las cargas de trabajo adicionales que ejecutas con KubernetesPodOperator se ajustan de forma independiente de tu entorno.
Tu entorno no se ve afectado por el aumento de la demanda de recursos, pero el clúster de tu entorno aumenta y disminuye según la demanda de recursos.
Los precios de las cargas de trabajo adicionales que ejecutas en el clúster de tu entorno siguen el modelo de precios de Cloud Composer 2 y usan los SKU de Compute de Cloud Composer.
Cloud Composer 2 usa clústeres de Autopilot que introducen la noción de clases de procesamiento:
Cloud Composer solo admite la clase de procesamiento
general-purpose
.De forma predeterminada, si no se selecciona ninguna clase, se supone la clase
general-purpose
cuando creas Pods con KubernetesPodOperator.Cada clase se asocia con propiedades y límites de recursos específicos. Puedes obtener más información sobre ellas en la documentación de Autopilot. Por ejemplo, los Pods que se ejecutan dentro de la clase
general-purpose
pueden usar hasta 110 GiB de memoria.
Acceso a los recursos del proyecto
Cloud Composer 2 usa clústeres de GKE con la federación de identidades para cargas de trabajo para GKE. Los Pods que se ejecutan en el espacio de nombres composer-user-workloads
pueden acceder a los recursos de Google Cloud en tu proyecto sin configuración adicional. La cuenta de servicio de tu entorno se usa para acceder a estos recursos.
Si deseas usar un espacio de nombres personalizado, las cuentas de servicio de Kubernetes asociadas a este espacio de nombres deben asignarse a cuentas de servicio de Google Cloud para habilitar la autorización de identidad de servicio para las solicitudes a las APIs de Google y otros servicios. Si ejecutas Pods en un espacio de nombres personalizado en el clúster de tu entorno, no se crearán vinculaciones de IAM entre Kubernetes y las cuentas de servicio deGoogle Cloud , y estos Pods no podrán acceder a los recursos de tu proyecto de Google Cloud .
Si usas un espacio de nombres personalizado y quieres que tus Pods tengan acceso a los recursos deGoogle Cloud , sigue las instrucciones de Workload Identity Federation for GKE y configura las vinculaciones para un espacio de nombres personalizado:
- Crea un espacio de nombres independiente en el clúster de tu entorno.
- Crea una vinculación entre la cuenta de servicio de Kubernetes del espacio de nombres personalizado y la cuenta de servicio de tu entorno.
- Agrega la anotación de la cuenta de servicio de tu entorno a la cuenta de servicio de Kubernetes.
- Cuando uses KubernetesPodOperator, especifica el espacio de nombres y la cuenta de servicio de Kubernetes en los parámetros
namespace
yservice_account_name
.
Configuración mínima
Para crear un KubernetesPodOperator, solo se requieren los parámetros name
, image
y task_id
del Pod. El archivo /home/airflow/composer_kube_config
contiene credenciales para autenticarse en GKE.
Configuración adicional
En este ejemplo, se muestran parámetros adicionales que puedes configurar en KubernetesPodOperator.
Consulta los siguientes recursos para obtener más información:
Para obtener información sobre cómo usar los objetos Secret y ConfigMap de Kubernetes, consulta Usa objetos Secret y ConfigMap de Kubernetes.
Para obtener información sobre el uso de plantillas de Jinja con KubernetesPodOperator, consulta Usa plantillas de Jinja.
Para obtener información sobre los parámetros de KubernetesPodOperator, consulta la referencia del operador en la documentación de Airflow.
Usa plantillas de Jinja
Airflow admite plantillas de Jinja en los DAGs.
Debes declarar los parámetros obligatorios de Airflow (task_id
, name
y image
) con el operador. Como se muestra en el siguiente ejemplo, puedes crear plantillas de todos los demás parámetros con Jinja, incluidos cmds
, arguments
, env_vars
y config_file
.
El parámetro env_vars
del ejemplo se establece a partir de una variable de Airflow llamada my_value
. El DAG de ejemplo obtiene su valor de la variable de plantilla vars
en Airflow. Airflow tiene más variables que proporcionan acceso a diferentes tipos de información. Por ejemplo, puedes usar la variable de plantilla conf
para acceder a los valores de las opciones de configuración de Airflow. Para obtener más información y la lista de variables disponibles en Airflow, consulta Templates reference en la documentación de Airflow.
Sin cambiar el DAG ni crear la variable env_vars
, la tarea ex-kube-templates
del ejemplo falla porque la variable no existe. Crea esta variable en la IU de Airflow o con Google Cloud CLI:
IU de Airflow
Ve a la IU de Airflow.
En la barra de herramientas, selecciona Admin > Variables.
En la página Variable de lista, haz clic en Agregar un registro nuevo.
En la página Agregar variable, ingresa la siguiente información:
- Key:
my_value
- Val:
example_value
- Key:
Haz clic en Guardar.
gcloud
Ingresa el siguiente comando:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Reemplaza lo siguiente:
ENVIRONMENT
por el nombre del entorno.LOCATION
por la región en la que se encuentra el entorno.
En el siguiente ejemplo, se muestra cómo usar plantillas de Jinja con KubernetesPodOperator:
Usa Secrets y ConfigMaps de Kubernetes
Un Secret de Kubernetes es un objeto que contiene datos sensibles. Un ConfigMap de Kubernetes es un objeto que contiene datos no confidenciales en pares clave-valor.
En Cloud Composer 2, puedes crear Secrets y ConfigMaps con Google Cloud CLI, la API o Terraform, y, luego, acceder a ellos desde KubernetesPodOperator.
Acerca de los archivos de configuración YAML
Cuando creas un Secret o un ConfigMap de Kubernetes con la CLI y la API de Google Cloud, proporcionas un archivo en formato YAML. Este archivo debe seguir el mismo formato que usan los Secrets y ConfigMaps de Kubernetes. La documentación de Kubernetes proporciona muchas muestras de código de ConfigMaps y Secrets. Para comenzar, puedes consultar la página Distribute Credentials Securely Using Secrets y ConfigMaps.
Al igual que en los Secrets de Kubernetes, usa la representación en Base64 cuando definas valores en los Secrets.
Para codificar un valor, puedes usar el siguiente comando (esta es una de las muchas formas de obtener un valor codificado en base64):
echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64
Resultado:
cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
En los ejemplos que se incluyen más adelante en esta guía, se usan los siguientes dos ejemplos de archivos YAML. Ejemplo de archivo de configuración YAML para un secreto de Kubernetes:
apiVersion: v1
kind: Secret
metadata:
name: airflow-secrets
data:
sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
Otro ejemplo que muestra cómo incluir archivos. Al igual que en el ejemplo anterior, primero codifica el contenido de un archivo (cat ./key.json | base64
) y, luego, proporciona este valor en el archivo YAML:
apiVersion: v1
kind: Secret
metadata:
name: service-account
data:
service-account.json: |
ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K
Ejemplo de un archivo de configuración YAML para un ConfigMap. No es necesario que uses la representación en base64 en ConfigMaps:
apiVersion: v1
kind: ConfigMap
metadata:
name: example-configmap
data:
example_key: example_value
Administra los Secrets de Kubernetes
En Cloud Composer 2, creas secretos con Google Cloud CLI y kubectl
de la siguiente manera:
Obtén información sobre el clúster de tu entorno:
Ejecuta el siguiente comando:
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
Reemplaza lo siguiente:
ENVIRONMENT
por el nombre de tu entorno.LOCATION
es la región en la que se encuentra el entorno de Cloud Composer.
El resultado de este comando usa el siguiente formato:
projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>
.Para obtener el ID del clúster de GKE, copia el resultado después de
/clusters/
(finaliza en-gke
).
Conéctate a tu clúster de GKE con el siguiente comando:
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --region LOCATION
Reemplaza lo siguiente:
CLUSTER_ID
: Es el ID del clúster del entorno.PROJECT_ID
: Es el ID del proyecto.LOCATION
: Es la región en la que se encuentra el entorno.
Crea Secrets de Kubernetes:
En los siguientes comandos, se muestran dos enfoques diferentes para crear objetos Secret de Kubernetes. El enfoque de
--from-literal
usa pares clave-valor. El enfoque de--from-file
usa el contenido del archivo.Para crear un objeto Secret de Kubernetes proporcionando pares clave-valor, ejecuta el siguiente comando. En este ejemplo, se crea un objeto Secret llamado
airflow-secrets
que tiene un camposql_alchemy_conn
con el valor detest_value
.kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
Para crear un secreto de Kubernetes proporcionando el contenido del archivo, ejecuta el siguiente comando. En este ejemplo, se crea un Secret llamado
service-account
que tiene el camposervice-account.json
con el valor tomado del contenido de un archivo./key.json
local.kubectl create secret generic service-account \ --from-file service-account.json=./key.json -n composer-user-workloads
Usa Secrets de Kubernetes en tus DAGs
En este ejemplo, se muestran dos formas de usar los Secret de Kubernetes: como una variable de entorno y como un volumen activado por el Pod.
El primer secreto, airflow-secrets
, se establece en una variable de entorno de Kubernetes llamada SQL_CONN
(en lugar de en una variable de entorno de Airflow o Cloud Composer).
El segundo Secret, service-account
, activa service-account.json
, un archivo con un token de cuenta de servicio, en /var/secrets/google
.
Así se ven los objetos Secret:
El nombre del primer secreto de Kubernetes se define en la variable secret_env
.
Este Secret se llama airflow-secrets
. El parámetro deploy_type
especifica que se debe exponer como una variable de entorno. El nombre de la variable de entorno es SQL_CONN
, como se especifica en el parámetro deploy_target
. Por último, el valor de la variable de entorno SQL_CONN
se establece en el valor de la clave sql_alchemy_conn
.
El nombre del segundo secreto de Kubernetes se define en la variable secret_volume
. Este Secret se llama service-account
. Se expone como un volumen, según se especifica en el parámetro deploy_type
. La ruta del archivo que se activará, deploy_target
, es /var/secrets/google
. Por último, la clave (key
) del secreto que se almacena en deploy_target
es service-account.json
.
La configuración del operador tiene el siguiente aspecto:
Información sobre el proveedor de Kubernetes de la CNCF
KubernetesPodOperator se implementa en el proveedor apache-airflow-providers-cncf-kubernetes
.
Para obtener notas de la versión detalladas del proveedor de Kubernetes de la CNCF, consulta el sitio web del proveedor de Kubernetes de la CNCF.
Versión 6.0.0
En la versión 6.0.0 del paquete de CNCF Kubernetes Provider, la conexión kubernetes_default
se usa de forma predeterminada en KubernetesPodOperator.
Si especificaste una conexión personalizada en la versión 5.0.0, el operador seguirá usando esa conexión. Para volver a usar la conexión kubernetes_default
, es posible que debas ajustar tus DAGs en consecuencia.
Versión 5.0.0
Esta versión introduce algunos cambios incompatibles con versiones anteriores en comparación con la versión 4.4.0. Los más importantes se relacionan con la conexión kubernetes_default
, que no se usa en la versión 5.0.0.
- Se debe modificar la conexión de
kubernetes_default
. La ruta de acceso de configuración de Kubernetes debe establecerse en/home/airflow/composer_kube_config
(como se muestra en la siguiente figura). Como alternativa, se debe agregarconfig_file
a la configuración de KubernetesPodOperator (como se muestra en el siguiente ejemplo de código).

- Modifica el código de una tarea con KubernetesPodOperator de la siguiente manera:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Para obtener más información sobre la versión 5.0.0, consulta las notas de la versión del proveedor de Kubernetes de la CNCF.
Soluciona problemas
En esta sección, se proporcionan sugerencias para solucionar problemas comunes de KubernetesPodOperator:
Ver registros
Cuando soluciones problemas, puedes consultar los registros en el siguiente orden:
Registros de tareas de Airflow:
En la consola de Google Cloud , ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña DAGs.
Haz clic en el nombre del DAG y, luego, en la ejecución del DAG para ver los detalles y los registros.
Registros del programador de Airflow:
Ve a la página Detalles del entorno.
Ve a la pestaña Registros.
Inspecciona los registros del programador de Airflow.
Registros de pods en la consola de Google Cloud , en las cargas de trabajo de GKE. Estos registros incluyen el archivo YAML de definición del Pod, los eventos del Pod y los detalles del Pod.
Códigos de retorno distintos de cero
Cuando se usa KubernetesPodOperator (y GKEStartPodOperator), el código de retorno del punto de entrada del contenedor determina si la tarea se considera exitosa o no. Los códigos de retorno distintos de cero indican un error.
Un patrón común es ejecutar una secuencia de comandos de shell como punto de entrada del contenedor para agrupar varias operaciones dentro de este.
Si escribes una secuencia de comandos de este tipo, recomendamos que incluyas el comando set -e
en la parte superior de la secuencia de comandos para que sus comandos con error finalicen la secuencia y propaguen el error a la instancia de tarea de Airflow.
Tiempos de espera de los pods
El tiempo de espera predeterminado de KubernetesPodOperator es de 120 segundos, lo que puede provocar que el tiempo de espera se agote antes de que se descarguen las imágenes más grandes. Para aumentar el tiempo de espera, puedes modificar el parámetro startup_timeout_seconds
cuando creas el KubernetesPodOperator.
Cuando se agota el tiempo de espera de un Pod, el registro específico de la tarea está disponible en la IU de Airflow. Por ejemplo:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
También es posible que se agote el tiempo de espera de los pods cuando la cuenta de servicio de Cloud Composer carece de los permisos de IAM necesarios para realizar la tarea solicitada. Si deseas verificar esto, revisa los errores en el nivel del Pod con los Paneles de GKE para ver los registros de tu carga de trabajo específica o usa Cloud Logging.
No se pudo establecer una conexión nueva
La actualización automática está habilitada de forma predeterminada en los clústeres de GKE. Si un grupo de nodos está en un clúster que se está actualizando, es posible que veas el siguiente error:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Para comprobar si tu clúster se está actualizando, ve a la página Clústeres de Kubernetes y busca el ícono de carga junto al nombre del clúster de tu entorno en la consola de Google Cloud .