Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
En esta página, se describe cómo usar KubernetesPodOperator
para implementar pods de Kubernetes de Cloud Composer en el clúster de Google Kubernetes Engine que forma parte de tu entorno de Cloud Composer y para garantizar que tu entorno tenga los recursos adecuados.
KubernetesPodOperator
inicia Pods de Kubernetes
en el clúster de tu entorno. En comparación, los operadores de Google Kubernetes Engine ejecutan pods de Kubernetes en un clúster específico, que puede ser un clúster independiente que no está relacionado con tu entorno. También puedes crear y borrar clústeres con los operadores de Google Kubernetes Engine.
KubernetesPodOperator
es una buena opción si necesitas lo siguiente:
- Dependencias de Python personalizadas que no están disponibles a través del repositorio público de PyPI.
- Dependencias binarias que no están disponibles en la imagen de archivo del trabajador de Cloud Composer.
En esta página, se explica un ejemplo de DAG de Airflow que incluye las siguientes configuraciones de KubernetesPodOperator
:
- Configuración mínima: establece solo los parámetros requeridos.
- Configuración de plantilla: utiliza parámetros que puedes agregar a una plantilla con Jinja.
Configuración de variables secretas: pasa un objeto secreto de Kubernetes al pod.
En Cloud Composer 2, la configuración de afinidad de Pods no está disponible. En su lugar, usa los operadores de GKE para iniciar Pods en un clúster diferente.
Configuración completa: incluye todas las configuraciones.
Antes de comenzar
En Cloud Composer 2, el clúster de tu entorno escala de forma automática. Las cargas de trabajo adicionales que ejecutas con
KubernetesPodOperator
se escalan de forma independiente del entorno. Tu entorno no se ve afectado por una mayor demanda de recursos, pero el clúster de tu entorno aumenta o reduce la escala verticalmente según la demanda de recursos. El precio de las cargas de trabajo adicionales que ejecutas en el clúster de tu entorno sigue el modelo de precios de Cloud Composer 2 y usa los SKU de Compute de Cloud Composer.Cloud Composer 2 usa clústeres de GKE con Workload Identity. De forma predeterminada, los Pods que se ejecutan en espacios de nombres recién creados o el espacio de nombres
composer-user-workloads
no pueden acceder a los recursos de Google Cloud. Cuando usas Workload Identity, las cuentas de servicio de Kubernetes asociadas con los espacios de nombres deben asignarse a las cuentas de servicio de Google Cloud a fin de habilitar la autorización de identidad de servicio para las solicitudes a las API de Google y otros servicios.Debido a esto, si ejecutas Pods en el espacio de nombres
composer-user-workloads
o en uno recién creado en el clúster de tu entorno, no se crearán las vinculaciones de IAM adecuadas entre las cuentas de servicio de Kubernetes y Google Cloud, y estos Pods no podrán acceder a los recursos de tu proyecto de Google Cloud.Si deseas que los Pods tengan acceso a los recursos de Google Cloud, usa el espacio de nombres
composer-user-workloads
o crea tu propio espacio de nombres como se describe más adelante.Para proporcionar acceso a los recursos de tu proyecto, sigue las instrucciones de Workload Identity y configura las vinculaciones:
- Crea un espacio de nombres separado en el clúster de tu entorno.
- Crea una vinculación entre la cuenta de servicio de Kubernetes
composer-user-workloads/<namespace_name>
y la cuenta de servicio de tu entorno. - Agrega la anotación de la cuenta de servicio de tu entorno a la cuenta de servicio de Kubernetes.
- Cuando uses
KubernetesPodOperator
, especifica el espacio de nombres y la cuenta de servicio de Kubernetes en los parámetrosnamespace
yservice_account_name
.
Cloud Composer 2 usa clústeres de GKE con Workload Identity. El servidor de metadatos de GKE tarda unos segundos en comenzar a aceptar solicitudes en un Pod recién creado. Por lo tanto, los intentos de autenticación mediante Workload Identity durante los primeros segundos de la vida útil de un Pod pueden fallar. Para obtener más información sobre esta limitación, consulta Restricciones de Workload Identity.
Cloud Composer 2 usa clústeres de Autopilot que introducen la noción de clases de procesamiento:
De forma predeterminada, si no se selecciona ninguna clase, se supone la clase
general-purpose
cuando creas Pods conKubernetesPodOperator
.Cada clase está asociada con propiedades y límites de recursos específicos. Puedes obtener más información al respecto en la documentación de Autopilot. Por ejemplo, los Pods que se ejecutan dentro de la clase
general-purpose
pueden usar hasta 110 GiB de memoria.
Si se usa la versión 5.0.0 del proveedor de Kubernetes para CNCF, sigue las instrucciones documentadas en la sección del proveedor de Kubernetes para CNCF.
Configuración de KubernetesPodOperator
Para continuar con este ejemplo, coloca todo el archivo kubernetes_pod_operator.py
en la carpeta dags/
de tu entorno o agrega el código relevante KubernetesPodOperator
a un DAG.
Las siguientes secciones explican cada configuración de KubernetesPodOperator
en el ejemplo. Para obtener información sobre cada variable de configuración, consulta la referencia de Airflow.
Configuración mínima
Para crear un KubernetesPodOperator
, solo se requieren el name
del Pod, el namespace
en el que se ejecutará el Pod, el image
que se usará y el task_id
.
Cuando colocas el siguiente fragmento de código en un DAG, la configuración utiliza los valores predeterminados en /home/airflow/composer_kube_config
. No es necesario modificar el código para que la tarea pod-ex-minimum
se realice correctamente.
Configuración de la plantilla
Airflow admite el uso de plantillas de Jinja.
Debes declarar las variables obligatorias (task_id
, name
, namespace
y image
) con el operador. Como se muestra en el siguiente ejemplo, puedes crear plantillas de todos los demás parámetros con Jinja, incluidos cmds
, arguments
, env_vars
y config_file
.
Sin cambiar el DAG ni tu entorno, la tarea ex-kube-templates
falla debido a dos errores. Los registros muestran que esta tarea falla porque no existe la variable adecuada (my_value
). El segundo error, que puedes obtener después de corregir el primero, muestra que la tarea falla porque core/kube_config
no se encuentra en config
.
Para corregir ambos errores, sigue los pasos detallados a continuación.
Para configurar my_value
con gcloud
o la IU de Airflow:
IU de Airflow
En la IU de Airflow 2, haz lo siguiente:
Ve a la IU de Airflow.
En la barra de herramientas, selecciona Administrador > Variables.
En la página Variable de lista, haz clic en Agregar un registro nuevo.
En la página Agregar variable, ingresa la siguiente información:
- Key:
my_value
- Val:
example_value
- Key:
Haz clic en Guardar.
gcloud
Para Airflow 2, ingresa el siguiente comando:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Reemplaza lo siguiente:
ENVIRONMENT
por el nombre del entorno.LOCATION
por la región en la que se encuentra el entorno
Para hacer referencia a un config_file
personalizado (un archivo de configuración de Kubernetes), anula la opción de configuración kube_config
de Airflow a una configuración de Kubernetes válida:
Sección | Clave | Valor |
---|---|---|
core |
kube_config |
/home/airflow/composer_kube_config |
Espera unos minutos a que se actualice tu entorno. Luego, vuelve a ejecutar la tarea ex-kube-templates
y verifica que la tarea ex-kube-templates
se complete con éxito.
Configuración de variables secretas
Un secreto de Kubernetes es un objeto que contiene datos sensibles. Puedes pasar secretos a los pods de Kubernetes con KubernetesPodOperator
.
Los secretos deben estar definidos en Kubernetes o el pod no se iniciará.
En este ejemplo, se muestran dos formas de usar los secretos de Kubernetes: como una variable de entorno y como un volumen activado por el pod.
El primer secreto, airflow-secrets
, se establece en una variable de entorno de Kubernetes llamada SQL_CONN
(en lugar de en una variable de entorno de Airflow o Cloud Composer).
El segundo secreto, service-account
, activa service-account.json
, un archivo con un token de cuenta de servicio, en /var/secrets/google
.
Los secretos se ven de la siguiente forma:
El nombre del primer secreto de Kubernetes se define en la variable secret
.
Este secreto específico se llama airflow-secrets
. Se expone como una variable de entorno, según lo determina el deploy_type
. La variable de entorno que establece, deploy_target
, es SQL_CONN
. Por último, el key
del secreto que se almacena en deploy_target
es sql_alchemy_conn
.
El nombre del segundo secreto de Kubernetes se define en la variable secret
.
Este secreto específico se llama service-account
. Se expone como un volumen, según lo determina el deploy_type
. La ruta del archivo que se activará, deploy_target
, es /var/secrets/google
. Por último, el key
del secreto que se almacena en deploy_target
es service-account.json
.
A continuación, te mostramos cómo se ve la configuración del operador:
Si no realizas ningún cambio en el DAG o en tu entorno, la tarea ex-kube-secrets
falla. Si observas los registros, la tarea falla debido a un error Pod took too long to start
. Este error se produce porque Airflow no puede encontrar el secreto especificado en la configuración, secret_env
.
gcloud
Para configurar el secreto con gcloud
, haz lo siguiente:
Obtén información sobre el clúster de tu entorno de Cloud Composer.
Ejecuta el siguiente comando:
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
Reemplaza lo siguiente:
ENVIRONMENT
por el nombre de tu entornoLOCATION
es la región en la que se encuentra el entorno de Cloud Composer.
El resultado de este comando usa el siguiente formato:
projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>
.Para obtener el ID del clúster de GKE, copia el resultado después de
/clusters/
(termina en-gke
).
Conéctate a tu clúster de GKE con el siguiente comando:
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --region LOCATION
Reemplaza lo siguiente:
CLUSTER_ID
por el ID del clúster de GKE.PROJECT
por el ID del proyecto de Google Cloud.LOCATION
es la región en la que se encuentra el entorno de Cloud Composer.
Crea secretos de Kubernetes
Ejecuta el siguiente comando para crear un secreto de Kubernetes que establezca el valor de
sql_alchemy_conn
entest_value
:kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
Ejecuta el siguiente comando para crear un secreto de Kubernetes que establezca el valor de
service-account.json
en una ruta local del archivo de claves de una cuenta de servicio llamadokey.json
:kubectl create secret generic service-account \ --from-file service-account.json=./key.json -n composer-user-workloads
Después de configurar los secretos, vuelve a ejecutar la tarea
ex-kube-secrets
en la IU de Airflow.Verifica que la tarea
ex-kube-secrets
se realice correctamente.
Configuración completa
En este ejemplo, se muestran todas las variables que puedes configurar en KubernetesPodOperator
. No es necesario modificar el código para que la tarea ex-all-configs
se realice correctamente.
Para obtener detalles sobre cada variable, consulta la referencia KubernetesPodOperator
de Airflow.
Información sobre el proveedor de Kubernetes para CNCF
GKEStartPodOperator y KubernetesPodOperator se implementan en el proveedor apache-airflow-providers-cncf-kubernetes
.
Para conocer las notas de la versión fallidas del proveedor de Kubernetes para CNCF, consulta el sitio web del proveedor de Kubernetes para CNCF.
Versión 6.0.0
En la versión 6.0.0 del paquete del proveedor de Kubernetes de CNCF, la conexión kubernetes_default
se usa de forma predeterminada en KubernetesPodOperator
.
Si especificaste una conexión personalizada en la versión 5.0.0, el operador seguirá usando esta conexión personalizada. Para volver a usar la conexión kubernetes_default
, es posible que desees ajustar tus DAG según corresponda.
Versión 5.0.0
Esta versión presenta algunos cambios incompatibles con versiones anteriores en comparación con la versión 4.4.0. Las más importantes están relacionadas con la conexión kubernetes_default
, que no se usa en la versión 5.0.0.
- Se debe modificar la conexión
kubernetes_default
. La ruta de acceso a la configuración de Kube se debe establecer en/home/airflow/composer_kube_config
(como se muestra en la Figura 1). Como alternativa, se debe agregarconfig_file
a la configuraciónKubernetesPodOperator
(como se muestra en el siguiente ejemplo de código).
- Modifica el código de una tarea con KubernetesPodOperator de la siguiente manera:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Para obtener más información sobre la versión 5.0.0, consulta las Notas de la versión del proveedor de Kubernetes de CNCF.
Soluciona problemas
Sugerencias para solucionar problemas de error de pod
Además de verificar los registros de tareas en la IU de Airflow, comprueba los siguientes registros:
El resultado del programador y los trabajadores de Airflow:
En la consola de Google Cloud, ve a la página Entornos.
Sigue el vínculo de los DAG de tu entorno.
En el bucket de tu entorno, sube un nivel.
Revisa los registros en la carpeta
logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>
.
Registros detallados de Pods, en la consola de Cloud en cargas de trabajo de GKE. Estos registros incluyen el archivo YAML de definición de pod, los eventos de los pods y sus detalles.
Códigos de retorno distintos de cero cuando también se usa el GKEStartPodOperator
Cuando se usa KubernetesPodOperator
y GKEStartPodOperator
, el código de retorno del punto de entrada del contenedor determina si la tarea se considera exitosa o no. Los códigos de retorno distintos de cero indican un error.
Un patrón común cuando se utiliza KubernetesPodOperator
y GKEStartPodOperator
es ejecutar una secuencia de comandos de shell como punto de entrada de contenedor para agrupar varias operaciones dentro de este.
Si escribes una secuencia de comandos de este tipo, recomendamos que incluyas el comando set -e
en la parte superior de la secuencia de comandos para que sus comandos con error finalicen la secuencia y propaguen el error a la instancia de tarea de Airflow.
Tiempos de espera de los pods
El tiempo de espera predeterminado de KubernetesPodOperator
es de 120 segundos, lo que puede provocar que el tiempo de espera se agote antes de que se descarguen las imágenes más grandes. Para aumentar el tiempo de espera, puedes modificar el parámetro startup_timeout_seconds
cuando creas el KubernetesPodOperator
.
Cuando se agota el tiempo de espera de un Pod, el registro específico de la tarea está disponible en la IU de Airflow. Por ejemplo:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Los tiempos de espera de los Pods también pueden ocurrir cuando la cuenta de servicio de Cloud Composer no tiene los permisos de IAM necesarios para realizar la tarea a mano. Para verificar esto, observa los errores a nivel del Pod mediante los Paneles de GKE para ver los registros de tu carga de trabajo en particular o usa Cloud Logging.
No se pudo establecer una conexión nueva
La actualización automática está habilitada de forma predeterminada en los clústeres de GKE. Si un grupo de nodos está en un clúster que se está actualizando, es posible que veas el siguiente error:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Para verificar si tu clúster se está actualizando, en la consola de Google Cloud, ve a la página Clústeres de Kubernetes y busca el ícono de carga junto al nombre del clúster de tu entorno.