Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta página, se describe cómo usar KubernetesPodOperator para implementar pods de Kubernetes desde Cloud Composer en el clúster de Google Kubernetes Engine que forma parte de tu entorno de Cloud Composer.
KubernetesPodOperator inicia pods de Kubernetes en el clúster de tu entorno. En comparación, los operadores de Google Kubernetes Engine ejecutan pods de Kubernetes en un clúster especificado, que puede ser un clúster independiente que no está relacionado con tu entorno. También puedes crear y borrar clústeres con los operadores de Google Kubernetes Engine.
KubernetesPodOperator es una buena opción si necesitas lo siguiente:
- Dependencias de Python personalizadas que no están disponibles a través del repositorio público de PyPI.
- Dependencias binarias que no están disponibles en la imagen de archivo del trabajador de Cloud Composer.
Antes de comenzar
- Te recomendamos que uses la versión más reciente de Cloud Composer. Como mínimo, esta versión debe ser compatible como parte de la política de baja y asistencia.
- Asegúrate de que tu entorno tenga recursos suficientes. Iniciar pods en un entorno con escasez de recursos puede causar errores en el trabajador y el programador de Airflow.
Configura los recursos de tu entorno de Cloud Composer
Cuando creas un entorno de Cloud Composer, especificas sus parámetros de rendimiento, incluidos los parámetros de rendimiento del clúster del entorno. Iniciar pods de Kubernetes en el clúster del entorno puede generar competencia por los recursos del clúster, como CPU o memoria. Debido a que el programador y los trabajadores de Airflow están en el mismo clúster de GKE, los programadores y los trabajadores no funcionarán correctamente si la competencia provoca escasez de recursos.
Para evitar escasez de recursos, realiza una o más de las siguientes acciones:
- (Recomendado) Crea un grupo de nodos
- Aumenta la cantidad de nodos de tu entorno
- Especifica el tipo de máquina adecuado
Crea un grupo de nodos
La forma preferida de evitar la escasez de recursos en el entorno de Cloud Composer es crear un nuevo grupo de nodos y configurar los pods de Kubernetes para que se ejecuten solo con recursos de ese grupo.
Console
En la consola de Google Cloud, ve a la página Entornos.
Haz clic en el nombre de tu entorno.
En la página Detalles del entorno, ve a la pestaña Configuración del entorno.
En la sección Recursos > Clúster de GKE, sigue el vínculo Ver detalles del clúster.
Crea un grupo de nodos como se describe en Cómo agregar un grupo de nodos.
gcloud
Determina el nombre del clúster de tu entorno:
gcloud composer environments describe ENVIRONMENT_NAME \ --location LOCATION \ --format="value(config.gkeCluster)"
Reemplaza lo siguiente:
ENVIRONMENT_NAME
por el nombre del entorno.LOCATION
por la región en la que se encuentra el entorno.
El resultado contiene el nombre del clúster de tu entorno. Por ejemplo, esto puede ser
europe-west3-example-enviro-af810e25-gke
.Crea un grupo de nodos como se describe en Cómo agregar un grupo de nodos.
Aumenta la cantidad de nodos de tu entorno
Si aumentas la cantidad de nodos de tu entorno de Cloud Composer, aumentará la capacidad de procesamiento disponible para tus trabajadores. Este aumento no proporciona recursos adicionales para las tareas que requieren más CPU o RAM de las que proporciona el tipo de máquina especificado.
Para aumentar el recuento de nodos, actualiza tu entorno.
Especifica el tipo de máquina adecuado
Durante la creación de entornos de Cloud Composer, puedes especificar un tipo de máquina. A fin de garantizar que haya recursos disponibles, especifica el tipo de máquina para el tipo de procesamiento que se produce en tu entorno de Cloud Composer.
Configuración mínima
Para crear un KubernetesPodOperator, solo se requieren los parámetros name
, image
y task_id
del pod. /home/airflow/composer_kube_config
contiene credenciales para autenticarse en GKE.
Airflow 2
Airflow 1
Configuración de afinidad de los pods
Si configuras el parámetro affinity
en KubernetesPodOperator, puedes controlar en qué nodos se programan los pods (por ejemplo, puedes especificar un grupo de nodos en particular). En este ejemplo, el operador solo se ejecuta en grupos de nodos llamados pool-0
y pool-1
. Los nodos de tu entorno de Cloud Composer 1 están en default-pool
, por lo que tus pods no se ejecutan en los nodos de tu entorno.
Airflow 2
Airflow 1
Con la configuración actual de este ejemplo, la tarea falla. Si observas los registros, verás que la tarea falla porque los grupos de nodos pool-0
y pool-1
no existen.
Para asegurarte de que los grupos de nodos de values
existan, realiza cualquiera de los siguientes cambios de configuración:
Si creaste un grupo de nodos anteriormente, reemplaza
pool-0
ypool-1
con los nombres de tus grupos de nodos y vuelve a subir el DAG.Crea un grupo de nodos con el nombre
pool-0
opool-1
. Puedes crear ambos, pero la tarea necesita solo uno para tener éxito.Reemplaza
pool-0
ypool-1
condefault-pool
, que es el grupo predeterminado que utiliza Airflow. A continuación, vuelve a subir el DAG.
Después de realizar los cambios, espera unos minutos a que se actualice tu entorno.
Luego, vuelve a ejecutar la tarea ex-pod-affinity
y verifica que la tarea ex-pod-affinity
se complete con éxito.
Configuración adicional
En este ejemplo, se muestran los parámetros adicionales que puedes configurar en KubernetesPodOperator.
Consulta los siguientes recursos para obtener más información:
Para obtener información sobre el uso de Secrets y ConfigMaps de Kubernetes, consulta Cómo usar Secrets y ConfigMaps de Kubernetes.
Para obtener información sobre el uso de plantillas de Jinja con KubernetesPodOperator, consulta Cómo usar plantillas de Jinja.
Para obtener información sobre los parámetros de KubernetesPodOperator, consulta la referencia del operador en la documentación de Airflow.
Airflow 2
Airflow 1
Usa plantillas de Jinja
Airflow admite plantillas de Jinja en DAG.
Debes declarar los parámetros de Airflow obligatorios (task_id
, name
y
image
) con el operador. Como se muestra en el siguiente ejemplo, puedes crear plantillas de todos los demás parámetros con Jinja, incluidos cmds
, arguments
, env_vars
y config_file
.
El parámetro env_vars
del ejemplo se establece desde una variable de Airflow llamada my_value
. El DAG de ejemplo obtiene su valor de la variable de plantilla vars
en Airflow. Airflow tiene más variables que proporcionan acceso a diferentes tipos de información. Por ejemplo, puedes usar la variable de plantilla conf
para acceder a los valores de las opciones de configuración de Airflow. Para obtener más información y la lista de variables disponibles en Airflow, consulta la Referencia de plantillas en la documentación de Airflow.
Sin cambiar el DAG ni crear la variable env_vars
, la tarea ex-kube-templates
del ejemplo falla porque la variable no existe. Crea esta variable en la IU de Airflow o con Google Cloud CLI:
IU de Airflow
Ve a la IU de Airflow.
En la barra de herramientas, selecciona Administrador > Variables.
En la página Variable de lista, haz clic en Agregar un registro nuevo.
En la página Agregar variable, ingresa la siguiente información:
- Key:
my_value
- Val:
example_value
- Key:
Haz clic en Guardar.
Si tu entorno usa Airflow 1, ejecuta el siguiente comando:
Ve a la IU de Airflow.
En la barra de herramientas, selecciona Administrador > Variables.
En la página Variables, haz clic en la pestaña Crear.
En la página Variable, ingresa la siguiente información:
- Key:
my_value
- Val:
example_value
- Key:
Haz clic en Guardar.
gcloud
Ingresa el siguiente comando:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Si tu entorno usa Airflow 1, ejecuta el siguiente comando:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables -- \
--set my_value example_value
Reemplaza lo siguiente:
ENVIRONMENT
por el nombre del entorno.LOCATION
por la región en la que se encuentra el entorno.
En el siguiente ejemplo, se muestra cómo usar plantillas de Jinja con KubernetesPodOperator:
Airflow 2
Airflow 1
Usa Secrets y ConfigMaps de Kubernetes
Un Secreto de Kubernetes es un objeto que contiene datos sensibles. Un ConfigMap de Kubernetes es un objeto que contiene datos no confidenciales en pares clave-valor.
En Cloud Composer 2, puedes crear secretos y ConfigMaps con Google Cloud CLI, la API o Terraform y, luego, acceder a ellos desde KubernetesPodOperator.
Información acerca de los archivos de configuración YAML
Cuando creas un Secret o un ConfigMap de Kubernetes con Google Cloud CLI y la API, proporcionas un archivo en formato YAML. Este archivo debe seguir el mismo formato que usan los Secrets y ConfigMaps de Kubernetes. La documentación de Kubernetes proporciona muchas muestras de código de ConfigMaps y Secrets. Para comenzar, puedes ver la página Cómo distribuir credenciales de forma segura con Secrets y ConfigMaps.
Al igual que en los secretos de Kubernetes, usa la representación en Base64 cuando definas valores en Secrets.
Para codificar un valor, puedes usar el siguiente comando (esta es una de las muchas formas de obtener un valor codificado en base64):
echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64
Resultado:
cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
Los siguientes dos ejemplos de archivos YAML se usan en muestras más adelante en esta guía. Ejemplo de archivo de configuración YAML para un secreto de Kubernetes:
apiVersion: v1
kind: Secret
metadata:
name: airflow-secrets
data:
sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
Otro ejemplo que demuestra cómo incluir archivos. Al igual que en el ejemplo anterior, primero codifica el contenido de un archivo (cat ./key.json | base64
) y, luego, proporciona este valor en el archivo YAML:
apiVersion: v1
kind: Secret
metadata:
name: service-account
data:
service-account.json: |
ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K
Ejemplo de archivo de configuración YAML para un ConfigMap. No es necesario que uses la representación base64 en los ConfigMaps:
apiVersion: v1
kind: ConfigMap
metadata:
name: example-configmap
data:
example_key: example_value
Administra los Secrets de Kubernetes
En Cloud Composer 2, creas secretos con Google Cloud CLI y kubectl
:
Obtén información sobre el clúster de tu entorno:
Ejecuta el siguiente comando:
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
Reemplaza lo siguiente:
ENVIRONMENT
por el nombre de tu entorno.LOCATION
es la región en la que se encuentra el entorno de Cloud Composer.
El resultado de este comando usa el siguiente formato:
projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>
.Para obtener el ID del clúster de GKE, copia el resultado después de
/clusters/
(finaliza en-gke
).Para obtener la zona, copia el resultado después de
/zones/
.
Conéctate a tu clúster de GKE con el siguiente comando:
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --zone ZONE
Reemplaza lo siguiente:
CLUSTER_ID
: Es el ID del clúster del entorno.PROJECT_ID
: El ID del proyecto.ZONE
con la zona en la que se encuentra el clúster del entorno.
Crea secretos de Kubernetes:
En los siguientes comandos, se muestran dos enfoques diferentes para crear secretos de Kubernetes. El enfoque de
--from-literal
usa pares clave-valor. El enfoque--from-file
usa el contenido del archivo.Para crear un Secret de Kubernetes proporcionando pares clave-valor, ejecuta el siguiente comando: En este ejemplo, se crea un Secret llamado
airflow-secrets
que tiene un camposql_alchemy_conn
con el valortest_value
.kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value
Para crear un Secret de Kubernetes proporcionando el contenido del archivo, ejecuta el siguiente comando: En este ejemplo, se crea un Secret con el nombre
service-account
que tiene el camposervice-account.json
con el valor tomado del contenido de un archivo./key.json
local.kubectl create secret generic service-account \ --from-file service-account.json=./key.json
Usa Secrets de Kubernetes en tus DAG
En este ejemplo, se muestran dos formas de usar los Secret de Kubernetes: como una variable de entorno y como un volumen activado por el Pod.
El primer secreto, airflow-secrets
, se establece en una variable de entorno de Kubernetes llamada SQL_CONN
(en lugar de en una variable de entorno de Airflow o Cloud Composer).
El segundo Secret, service-account
, activa service-account.json
, un archivo con un token de cuenta de servicio, en /var/secrets/google
.
Los objetos Secret se ven de la siguiente forma:
Airflow 2
Airflow 1
El nombre del primer Secret de Kubernetes se define en la variable secret_env
.
Este Secret se llama airflow-secrets
. El parámetro deploy_type
especifica que se debe exponer como una variable de entorno. El nombre de la variable de entorno es SQL_CONN
, como se especifica en el parámetro deploy_target
. Por último, el valor de la variable de entorno SQL_CONN
se establece en el valor de la clave sql_alchemy_conn
.
El nombre del segundo Secret de Kubernetes se define en la variable secret_volume
. Este Secret se llama service-account
. Se expone como un volumen, como se especifica en el parámetro deploy_type
. La ruta del archivo que se activará, deploy_target
, es /var/secrets/google
. Por último, el key
del secreto que se almacena en deploy_target
es service-account.json
.
La configuración del operador tiene el siguiente aspecto:
Airflow 2
Airflow 1
Información sobre el proveedor de Kubernetes de CNCF
KubernetesPodOperator se implementa en el proveedor apache-airflow-providers-cncf-kubernetes
.
Para obtener notas de la versión detalladas del proveedor de Kubernetes de CNCF, consulta el sitio web del proveedor de Kubernetes de CNCF.
Versión 6.0.0
En la versión 6.0.0 del paquete del proveedor de Kubernetes de CNCF,
la conexión kubernetes_default
se usa de forma predeterminada en KubernetesPodOperator.
Si especificaste una conexión personalizada en la versión 5.0.0, el operador seguirá usándola. Para volver a usar la conexión kubernetes_default
, te recomendamos que ajustes tus DAG según corresponda.
Versión 5.0.0
Esta versión introduce algunos cambios incompatibles con versiones anteriores en comparación con la versión 4.4.0. Los más importantes se relacionan con la conexión kubernetes_default
, que no se usa en la versión 5.0.0.
- Se debe modificar la conexión de
kubernetes_default
. La ruta de acceso de configuración de Kubernetes debe establecerse en/home/airflow/composer_kube_config
(como se muestra en la siguiente figura). Como alternativa, se debe agregarconfig_file
a la configuración de KubernetesPodOperator (como se muestra en el siguiente ejemplo de código).
![Campo de ruta de configuración de Kube en la IU de Airflow](https://cloud.google.com/static/composer/docs/images/airflowui_connection.png?authuser=19&hl=es)
- Modifica el código de una tarea con KubernetesPodOperator de la siguiente manera:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Para obtener más información sobre la versión 5.0.0, consulta las notas de la versión del proveedor de CNCF Kubernetes.
Soluciona problemas
En esta sección, se proporcionan sugerencias para solucionar problemas habituales de KubernetesPodOperator:
Ver registros
Cuando soluciones problemas, puedes revisar los registros en el siguiente orden:
Registros de tareas de Airflow:
En la consola de Google Cloud, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña DAG.
Haz clic en el nombre del DAG y, luego, en la ejecución del DAG para ver los detalles y los registros.
Registros del programador de Airflow:
Ve a la página Detalles del entorno.
Ve a la pestaña Registros.
Inspecciona los registros del programador de Airflow.
Los registros de pods en la consola de Google Cloud, en las cargas de trabajo de GKE. Estos registros incluyen el archivo YAML de definición de pod, los eventos de los pods y sus detalles.
Códigos de retorno distintos de cero
Cuando se usa KubernetesPodOperator (y GKEStartPodOperator), el código de retorno del punto de entrada del contenedor determina si la tarea se considera exitosa o no. Los códigos de retorno distintos de cero indican un error.
Un patrón común es ejecutar una secuencia de comandos de shell como punto de entrada de contenedor para agrupar varias operaciones dentro de este.
Si escribes una secuencia de comandos de este tipo, recomendamos que incluyas el comando set -e
en la parte superior de la secuencia de comandos para que sus comandos con error finalicen la secuencia y propaguen el error a la instancia de tarea de Airflow.
Tiempos de espera de los pods
El tiempo de espera predeterminado de KubernetesPodOperator es de 120 segundos, lo que puede provocar que el tiempo de espera se agote antes de que se descarguen las imágenes más grandes. Para aumentar el tiempo de espera, puedes modificar el parámetro startup_timeout_seconds
cuando creas el KubernetesPodOperator.
Cuando se agota el tiempo de espera de un pod, el registro específico de la tarea está disponible en la IU de Airflow. Por ejemplo:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
También es posible que se agote el tiempo de espera de los pods cuando la cuenta de servicio de Cloud Composer carece de los permisos de IAM necesarios para realizar la tarea solicitada. Si deseas verificar esto, revisa los errores en el nivel del pod mediante los Paneles de GKE para ver los registros de tu carga de trabajo específica o usa Cloud Logging.
No se pudo establecer una conexión nueva
La actualización automática está habilitada de forma predeterminada en los clústeres de GKE. Si un grupo de nodos está en un clúster que se está actualizando, es posible que veas el siguiente error:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Para comprobar si tu clúster se está actualizando, ve a la página Clústeres de Kubernetes y busca el ícono de carga junto al nombre del clúster de tu entorno en la consola de Google Cloud.