Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta página se describe cómo usar KubernetesPodOperator para desplegar pods de Kubernetes desde Cloud Composer en el clúster de Google Kubernetes Engine que forma parte de tu entorno de Cloud Composer.
KubernetesPodOperator inicia pods de Kubernetes en el clúster de tu entorno. En comparación, los operadores de Google Kubernetes Engine ejecutan pods de Kubernetes en un clúster específico, que puede ser un clúster independiente que no esté relacionado con tu entorno. También puedes crear y eliminar clústeres con los operadores de Google Kubernetes Engine.
KubernetesPodOperator es una buena opción si necesitas lo siguiente:
- Dependencias de Python personalizadas que no están disponibles en el repositorio público de PyPI.
- Dependencias binarias que no están disponibles en la imagen de trabajador de Cloud Composer.
Antes de empezar
- Te recomendamos que uses la versión más reciente de Cloud Composer. Como mínimo, esta versión debe admitirse como parte de la política de asistencia y retirada.
- Asegúrate de que tu entorno tenga suficientes recursos. Si se lanzan pods en un entorno con pocos recursos, se pueden producir errores en el trabajador y el programador de Airflow.
Configurar los recursos del entorno de Cloud Composer
Cuando creas un entorno de Cloud Composer, especificas sus parámetros de rendimiento, incluidos los del clúster del entorno. Lanzar pods de Kubernetes en el clúster de entorno puede provocar una competencia por los recursos del clúster, como la CPU o la memoria. Como el programador y los trabajadores de Airflow están en el mismo clúster de GKE, no funcionarán correctamente si la competencia provoca una falta de recursos.
Para evitar que se agoten los recursos, lleva a cabo una o varias de las siguientes acciones:
- Crea un grupo de nodos (opción recomendada)
- Aumentar el número de nodos de tu entorno
- Especificar el tipo de máquina adecuado
Crear un grupo de nodos
La forma preferida de evitar la falta de recursos en el entorno de Cloud Composer es crear un grupo de nodos y configurar los pods de Kubernetes para que se ejecuten usando solo los recursos de ese grupo.
Consola
En la Google Cloud consola, ve a la página Entornos.
Haz clic en el nombre de tu entorno.
En la página Detalles del entorno, vaya a la pestaña Configuración del entorno.
En la sección Recursos > Clúster de GKE, siga el enlace Ver detalles del clúster.
Crea un grupo de nodos como se describe en el artículo Añadir un grupo de nodos.
gcloud
Determina el nombre del clúster de tu entorno:
gcloud composer environments describe ENVIRONMENT_NAME \ --location LOCATION \ --format="value(config.gkeCluster)"
Sustituye:
ENVIRONMENT_NAME
con el nombre del entorno.LOCATION
con la región en la que se encuentra el entorno.
El resultado contiene el nombre del clúster de tu entorno. Por ejemplo, puede ser
europe-west3-example-enviro-af810e25-gke
.Crea un grupo de nodos como se describe en el artículo Añadir un grupo de nodos.
Aumentar el número de nodos de tu entorno
Si aumentas el número de nodos de tu entorno de Cloud Composer, también aumentará la potencia de computación disponible para tus cargas de trabajo. Este aumento no proporciona recursos adicionales para las tareas que requieren más CPU o RAM de las que ofrece el tipo de máquina especificado.
Para aumentar el número de nodos, actualiza tu entorno.
Especifica el tipo de máquina adecuado
Durante la creación del entorno de Cloud Composer, puedes especificar un tipo de máquina. Para asegurarte de que haya recursos disponibles, especifica un tipo de máquina para el tipo de computación que se produce en tu entorno de Cloud Composer.
Configuración mínima
Para crear un KubernetesPodOperator, solo se necesitan los parámetros name
, image
y task_id
del pod que se va a usar. El /home/airflow/composer_kube_config
contiene las credenciales para autenticarte en GKE.
Airflow 2
Airflow 1
Configuración de afinidad de pods
Cuando configuras el parámetro affinity
en KubernetesPodOperator, controlas en qué nodos se programan los pods, como los nodos de un grupo de nodos concreto. En este ejemplo, el operador solo se ejecuta en grupos de nodos llamados pool-0
y pool-1
. Los nodos de tu entorno de Cloud Composer 1 están en default-pool
, por lo que tus pods no se ejecutan en los nodos de tu entorno.
Airflow 2
Airflow 1
Como el ejemplo está configurado, la tarea falla. Si consultas los registros, la tarea falla porque no existen los grupos de nodos pool-0
y pool-1
.
Para asegurarte de que los grupos de nodos de values
existen, haz uno de los siguientes cambios en la configuración:
Si has creado un grupo de nodos anteriormente, sustituye
pool-0
ypool-1
por los nombres de tus grupos de nodos y vuelve a subir tu DAG.Crea un grupo de nodos llamado
pool-0
opool-1
. Puedes crear ambos, pero la tarea solo necesita uno para completarse.Sustituye
pool-0
ypool-1
pordefault-pool
, que es el grupo predeterminado que usa Airflow. A continuación, vuelve a subir el DAG.
Después de hacer los cambios, espera unos minutos a que se actualice tu entorno.
A continuación, vuelve a ejecutar la tarea ex-pod-affinity
y comprueba que se haya completado correctamente.ex-pod-affinity
Configuración adicional
En este ejemplo se muestran parámetros adicionales que puedes configurar en KubernetesPodOperator.
Consulta los siguientes recursos para obtener más información:
Para obtener información sobre cómo usar los secretos y los ConfigMaps de Kubernetes, consulta Usar secretos y ConfigMaps de Kubernetes.
Para obtener información sobre cómo usar plantillas Jinja con KubernetesPodOperator, consulta Usar plantillas Jinja.
Para obtener información sobre los parámetros de KubernetesPodOperator, consulta la referencia del operador en la documentación de Airflow.
Airflow 2
Airflow 1
Usar plantillas Jinja
Airflow admite plantillas Jinja en los DAGs.
Debe declarar los parámetros de Airflow obligatorios (task_id
, name
y image
) con el operador. Como se muestra en el siguiente ejemplo, puedes usar Jinja para crear plantillas de todos los demás parámetros, incluidos cmds
, arguments
, env_vars
y config_file
.
El parámetro env_vars
del ejemplo se define a partir de una variable de Airflow llamada my_value
. El DAG de ejemplo obtiene su valor de la variable de plantilla vars
de Airflow. Airflow tiene más variables que proporcionan acceso a diferentes tipos de información. Por ejemplo, puedes usar la variable de plantilla conf
para acceder a los valores de las opciones de configuración de Airflow. Para obtener más información y la lista de variables disponibles en Airflow, consulta la referencia de plantillas en la documentación de Airflow.
Sin cambiar el DAG ni crear la variable env_vars
, la tarea ex-kube-templates
del ejemplo falla porque la variable no existe. Crea esta variable en la interfaz de usuario de Airflow o con Google Cloud CLI:
Interfaz de usuario de Airflow
Ve a la interfaz de Airflow.
En la barra de herramientas, seleccione Administrar > Variables.
En la página List Variable (Variable de lista), haz clic en Add a new record (Añadir un nuevo registro).
En la página Añadir variable, introduce la siguiente información:
- Tecla:
my_value
- Valor:
example_value
- Tecla:
Haz clic en Guardar.
Si tu entorno usa Airflow 1, ejecuta el siguiente comando:
Ve a la interfaz de Airflow.
En la barra de herramientas, seleccione Administrar > Variables.
En la página Variables, haz clic en la pestaña Crear.
En la página Variable, introduce la siguiente información:
- Tecla:
my_value
- Valor:
example_value
- Tecla:
Haz clic en Guardar.
gcloud
Introduce el siguiente comando:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Si tu entorno usa Airflow 1, ejecuta el siguiente comando:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables -- \
--set my_value example_value
Sustituye:
ENVIRONMENT
con el nombre del entorno.LOCATION
con la región en la que se encuentra el entorno.
En el siguiente ejemplo se muestra cómo usar plantillas Jinja con KubernetesPodOperator:
Airflow 2
Airflow 1
Usar secretos y ConfigMaps de Kubernetes
Un secreto de Kubernetes es un objeto que contiene datos sensibles. Un ConfigMap de Kubernetes es un objeto que contiene datos no confidenciales en pares clave-valor.
En Cloud Composer 2, puedes crear secretos y ConfigMaps con la CLI de Google Cloud, la API o Terraform, y luego acceder a ellos desde KubernetesPodOperator.
Acerca de los archivos de configuración YAML
Cuando creas un Secret o un ConfigMap de Kubernetes con la CLI de Google Cloud y la API, proporcionas un archivo en formato YAML. Este archivo debe tener el mismo formato que los secretos y los ConfigMaps de Kubernetes. La documentación de Kubernetes proporciona muchos ejemplos de código de ConfigMaps y Secrets. Para empezar, puedes consultar la página Distribuir credenciales de forma segura mediante secretos y ConfigMaps.
Al igual que en los secretos de Kubernetes, usa la representación en base64 cuando definas valores en los secretos.
Para codificar un valor, puedes usar el siguiente comando (esta es una de las muchas formas de obtener un valor codificado en base64):
echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64
Resultado:
cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
En los ejemplos que se muestran más adelante en esta guía se utilizan los dos archivos YAML que se indican a continuación. Ejemplo de archivo de configuración YAML de un secreto de Kubernetes:
apiVersion: v1
kind: Secret
metadata:
name: airflow-secrets
data:
sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
Otro ejemplo que muestra cómo incluir archivos. Al igual que en el ejemplo anterior, primero codifica el contenido de un archivo (cat ./key.json | base64
) y, a continuación, proporciona este valor en el archivo YAML:
apiVersion: v1
kind: Secret
metadata:
name: service-account
data:
service-account.json: |
ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K
Ejemplo de archivo de configuración YAML de un ConfigMap. No es necesario que utilices la representación base64 en ConfigMaps:
apiVersion: v1
kind: ConfigMap
metadata:
name: example-configmap
data:
example_key: example_value
Gestionar secretos de Kubernetes
En Cloud Composer 2, puedes crear secretos con Google Cloud CLI y kubectl
:
Obtén información sobre el clúster de tu entorno:
Ejecuta el siguiente comando:
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
Sustituye:
ENVIRONMENT
con el nombre de tu entorno.LOCATION
con la región en la que se encuentra el entorno de Cloud Composer.
El resultado de este comando tiene el siguiente formato:
projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>
.Para obtener el ID del clúster de GKE, copia el resultado después de
/clusters/
(termina en-gke
).Para obtener la zona, copia el resultado después de
/zones/
.
Conéctate a tu clúster de GKE con el siguiente comando:
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --zone ZONE
Sustituye:
CLUSTER_ID
: el ID del clúster del entorno.PROJECT_ID
: el ID de proyecto.ZONE
con la zona en la que se encuentra el clúster del entorno.
Crea secretos de Kubernetes:
En los siguientes comandos se muestran dos enfoques diferentes para crear secretos de Kubernetes. El enfoque
--from-literal
usa pares clave-valor. El enfoque--from-file
usa el contenido de los archivos.Para crear un secreto de Kubernetes proporcionando pares clave-valor, ejecuta el siguiente comando. En este ejemplo, se crea un secreto llamado
airflow-secrets
que tiene un camposql_alchemy_conn
con el valortest_value
.kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value
Para crear un secreto de Kubernetes proporcionando el contenido de un archivo, ejecuta el siguiente comando. En este ejemplo se crea un secreto llamado
service-account
que tiene el camposervice-account.json
con el valor tomado del contenido de un archivo./key.json
local.kubectl create secret generic service-account \ --from-file service-account.json=./key.json
Usar secretos de Kubernetes en tus DAGs
En este ejemplo se muestran dos formas de usar los secretos de Kubernetes: como variable de entorno y como volumen montado por el pod.
El primer secreto, airflow-secrets
, se asigna a una variable de entorno de Kubernetes llamada SQL_CONN
(en lugar de a una variable de entorno de Airflow o Cloud Composer).
El segundo secreto, service-account
, monta service-account.json
, un archivo
con un token de cuenta de servicio, en /var/secrets/google
.
Este es el aspecto de los objetos secretos:
Airflow 2
Airflow 1
El nombre del primer secreto de Kubernetes se define en la variable secret_env
.
Este secreto se llama airflow-secrets
. El parámetro deploy_type
especifica que debe exponerse como una variable de entorno. El nombre de la variable de entorno es SQL_CONN
, tal como se especifica en el parámetro deploy_target
. Por último, el valor de la variable de entorno SQL_CONN
se asigna al valor de la clave sql_alchemy_conn
.
El nombre del segundo secreto de Kubernetes se define en la variable secret_volume
. Este secreto se llama service-account
. Se expone como un volumen, tal como se especifica en el parámetro deploy_type
. La ruta del archivo que se va a montar, deploy_target
, es /var/secrets/google
. Por último, el key
del secreto almacenado en deploy_target
es service-account.json
.
Este es el aspecto de la configuración del operador:
Airflow 2
Airflow 1
Información sobre el proveedor de Kubernetes de la CNCF
KubernetesPodOperator se implementa en el proveedor apache-airflow-providers-cncf-kubernetes
.
Para consultar las notas de la versión detalladas del proveedor de Kubernetes de CNCF, visita el sitio web del proveedor de Kubernetes de CNCF.
Versión 6.0.0
En la versión 6.0.0 del paquete del proveedor de Kubernetes de CNCF, KubernetesPodOperator usa la conexión kubernetes_default
de forma predeterminada.
Si has especificado una conexión personalizada en la versión 5.0.0, el operador seguirá usando esa conexión. Para volver a usar la kubernetes_default
conexión, puede que tengas que ajustar tus DAGs en consecuencia.
Versión 5.0.0
Esta versión introduce algunos cambios incompatibles con versiones anteriores
en comparación con la versión 4.4.0. Las más importantes están relacionadas con la kubernetes_default
conexión, que no se usa en la versión 5.0.0.
- Es necesario modificar la conexión
kubernetes_default
. La ruta de configuración de Kubernetes debe ser/home/airflow/composer_kube_config
(como se muestra en la siguiente figura). Como alternativa, se debe añadirconfig_file
a la configuración de KubernetesPodOperator (como se muestra en el siguiente ejemplo de código).

- Modifica el código de una tarea con KubernetesPodOperator de la siguiente manera:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Para obtener más información sobre la versión 5.0.0, consulta las notas de la versión del proveedor de Kubernetes de CNCF.
Solución de problemas
En esta sección se ofrecen consejos para solucionar problemas habituales de KubernetesPodOperator:
Ver registros
Cuando intentes solucionar problemas, puedes consultar los registros en el siguiente orden:
Registros de tareas de Airflow:
En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña DAGs.
Haga clic en el nombre del DAG y, a continuación, en la ejecución del DAG para ver los detalles y los registros.
Registros del programador de Airflow:
Ve a la página Detalles del entorno.
Ve a la pestaña Registros.
Inspecciona los registros del programador de Airflow.
Registros de pods en la consola, en cargas de trabajo de GKE. Google Cloud Estos registros incluyen el archivo YAML de definición de pod, los eventos de pod y los detalles del pod.
Códigos de retorno distintos de cero
Cuando se usa KubernetesPodOperator (y GKEStartPodOperator), el código de retorno del punto de entrada del contenedor determina si la tarea se considera completada correctamente o no. Los códigos de retorno distintos de cero indican un error.
Un patrón habitual es ejecutar un script de shell como punto de entrada del contenedor para agrupar varias operaciones en el contenedor.
Si vas a escribir una secuencia de comandos de este tipo, te recomendamos que incluyas el comando set -e
en la parte superior para que, si se produce un error en un comando de la secuencia, esta se detenga y el error se propague a la instancia de tarea de Airflow.
Tiempos de espera de pods
El tiempo de espera predeterminado de KubernetesPodOperator es de 120 segundos, lo que puede provocar que se agote el tiempo de espera antes de que se descarguen imágenes más grandes. Puedes aumentar el tiempo de espera modificando el parámetro startup_timeout_seconds
al crear el KubernetesPodOperator.
Cuando se agota el tiempo de espera de un pod, el registro específico de la tarea está disponible en la interfaz de usuario de Airflow. Por ejemplo:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Los tiempos de espera de los pods también pueden producirse cuando la cuenta de servicio de Cloud Composer no tiene los permisos de gestión de identidades y accesos necesarios para realizar la tarea en cuestión. Para verificarlo, consulta los errores a nivel de pod en los paneles de control de GKE para ver los registros de tu carga de trabajo específica o usa Cloud Logging.
No se ha podido establecer una nueva conexión
La actualización automática está habilitada de forma predeterminada en los clústeres de GKE. Si un grupo de nodos está en un clúster que se está actualizando, es posible que vea el siguiente error:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Para comprobar si tu clúster se está actualizando, en la consola de Google Cloud , ve a la página Clústeres de Kubernetes y busca el icono de carga junto al nombre del clúster de tu entorno.