Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
En esta página, se describe cómo usar KubernetesPodOperator
para implementar
Pods de Kubernetes
de Cloud Composer a Google Kubernetes Engine
clúster que es parte de tu entorno de Cloud Composer y garantizar
de que tu entorno cuente con los recursos adecuados.
KubernetesPodOperator
inicios
Pods de Kubernetes
en el clúster de tu entorno. En comparación,
Los operadores de Google Kubernetes Engine ejecutan Pods de Kubernetes en un
que puede ser un clúster independiente que no esté relacionado con tu
en un entorno de nube. También puedes crear y borrar clústeres con
operadores de Google Kubernetes Engine.
KubernetesPodOperator
es una buena opción si necesitas lo siguiente:
- Dependencias de Python personalizadas que no están disponibles a través del repositorio público de PyPI.
- Dependencias binarias que no están disponibles en stock Imagen del trabajador de Cloud Composer.
En esta página, se explica un ejemplo de DAG de Airflow que incluye lo siguiente
Parámetros de configuración de KubernetesPodOperator
:
- Configuración mínima: establece solo los parámetros requeridos.
- Configuración de plantilla: utiliza parámetros que puedes agregar a una plantilla con Jinja.
Configuración de variables secretas: pasa un objeto secreto de Kubernetes al pod.
En Cloud Composer 3, la configuración de afinidad de Pods no está disponible. En cambio, usar los operadores de GKE para iniciar Pods en un clúster.
Configuración completa: incluye todas las configuraciones.
Antes de comenzar
En Cloud Composer 3, el clúster de tu entorno escala automáticamente. Cargas de trabajo adicionales que ejecutas con
KubernetesPodOperator
escalan de forma independiente de tu entorno. Tu entorno no se ve afectado por el aumento en la demanda de recursos, pero el clúster de tu entorno escala verticalmente según el recurso demanda. El precio de las cargas de trabajo adicionales que ejecutas en tu clúster sigue el Modelo de precios de Cloud Composer 3 y sus usos SKU de Cloud Composer 3.En Cloud Composer 3, el clúster de tu entorno se encuentra en el proyecto de usuario. Sin embargo, KubernetesPodOperator funciona de la misma manera, sin tener que realizar cambios en el código en comparación con Cloud Composer 2. Los pods se ejecutan en el clúster del entorno, en un espacio de nombres aislado, pero con acceso a tu red de VPC (si está habilitada).
Cloud Composer 3 usa clústeres de GKE con Federación de identidades para cargas de trabajo para GKE. De forma predeterminada, los Pods que se ejecutan en un espacio de nombres recién creado o en
composer-user-workloads
no pueden acceder a los recursos de Google Cloud. Cuando se usa Workload Identity Federation para GKE, las cuentas de servicio de Kubernetes asociadas con espacios de nombres deben asignarse a cuentas de servicio de Google Cloud para habilitar la autorización de identidad de servicio para las solicitudes a las APIs de Google y otros servicios.Debido a esto, si ejecutas Pods en el espacio de nombres
composer-user-workloads
o un espacio de nombres recién creado en el clúster de tu entorno, se debe Vinculaciones de IAM entre Kubernetes y Google Cloud no se crean cuentas de servicio y estos Pods no pueden acceder a los recursos de tu proyecto de Google Cloud.Si deseas que los Pods tengan acceso a los recursos de Google Cloud, Luego, usa el espacio de nombres
composer-user-workloads
o crea uno propio el espacio de nombres, tal como se describe más adelante.Para proporcionar acceso a los recursos de tu proyecto, sigue las instrucciones de la Federación de identidades para cargas de trabajo para GKE y configura las vinculaciones:
- Crea un espacio de nombres independiente en el clúster de tu entorno.
- Crea una vinculación entre la
cuenta de servicio de Kubernetes
composer-user-workloads/<namespace_name>
y la cuenta de servicio de tu entorno. - Agrega la anotación de la cuenta de servicio de tu entorno a la biblioteca de Kubernetes cuenta de servicio.
- Cuando uses
KubernetesPodOperator
, especifica el espacio de nombres cuenta de servicio de Kubernetes ennamespace
y Parámetrosservice_account_name
.
Cloud Composer 3 usa clústeres de GKE con Workload Identity. El servidor de metadatos de GKE toma unos segundos en comenzar a aceptar solicitudes en un Pod recién creado. Por lo tanto, los intentos de se autentican con Workload Identity en los primeros segundos de La vida del Pod podría fallar. Para obtener más información sobre esta limitación, consulta Restricciones de Workload Identity.
Cloud Composer 3 usa clústeres de Autopilot que presentan la noción de clases de procesamiento:
De forma predeterminada, si no se selecciona ninguna clase, la clase
general-purpose
se que se da por sentado cuando creas Pods conKubernetesPodOperator
.Cada clase se asocia con propiedades y límites de recursos específicos. Puedes leer sobre ellas en Documentación de Autopilot. Por ejemplo, los Pods que se ejecutan dentro de la clase
general-purpose
pueden usar hasta 110 GiB de memoria.
Configuración de KubernetesPodOperator
Para continuar con este ejemplo, coloca todo el archivo kubernetes_pod_operator.py
en la carpeta dags/
de tu entorno o agrega el código relevante KubernetesPodOperator
a un DAG.
Las siguientes secciones explican cada configuración de KubernetesPodOperator
en el ejemplo. Para obtener información sobre cada variable de configuración, consulta la referencia de Airflow.
Configuración mínima
Para crear un KubernetesPodOperator
, solo se requieren el name
del Pod, el namespace
donde ejecutarlo, el image
que se usará y task_id
.
Cuando colocas el siguiente fragmento de código en un DAG, la configuración utiliza los valores predeterminados en /home/airflow/composer_kube_config
. No es necesario modificar el código para que la tarea pod-ex-minimum
se realice correctamente.
Configuración de la plantilla
Airflow admite el uso de plantillas de Jinja.
Debes declarar las variables obligatorias (task_id
, name
, namespace
y image
) con el operador. Como se muestra en el siguiente ejemplo, puedes crear plantillas de todos los demás parámetros con Jinja, incluidos cmds
, arguments
, env_vars
y config_file
.
Sin cambiar el DAG ni tu entorno, la tarea ex-kube-templates
falla debido a dos errores. Los registros muestran que esta tarea está fallando porque la
no existe la variable correspondiente (my_value
). El segundo error, que
después de corregir el primer error, muestra que la tarea falla porque
core/kube_config
no se encuentra en config
.
Para corregir ambos errores, sigue los pasos que se describen a continuación.
Para configurar my_value
con gcloud
o la IU de Airflow:
IU de Airflow
En la IU de Airflow 2, haz lo siguiente:
Ve a la IU de Airflow.
En la barra de herramientas, selecciona Administrador > Variables.
En la página Variable de lista, haz clic en Agregar un registro nuevo.
En la página Agregar variable, ingresa la siguiente información:
- Key:
my_value
- Val:
example_value
- Key:
Haz clic en Guardar.
gcloud
Para Airflow 2, ingresa el siguiente comando:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Reemplaza lo siguiente:
ENVIRONMENT
por el nombre del entorno.LOCATION
por la región en la que se encuentra el entorno
Para hacer referencia a un config_file
personalizado (un archivo de configuración de Kubernetes), anula la opción de configuración de Airflow kube_config
a una configuración de Kubernetes válida:
Sección | Clave | Valor |
---|---|---|
core |
kube_config |
/home/airflow/composer_kube_config |
Espera unos minutos a que se actualice tu entorno. Luego, vuelve a ejecutar la tarea ex-kube-templates
y verifica que la tarea ex-kube-templates
se complete con éxito.
Configuración completa
En este ejemplo, se muestran todas las variables que puedes configurar en KubernetesPodOperator
. No es necesario modificar el código para que la tarea ex-all-configs
se realice correctamente.
Para obtener detalles sobre cada variable, consulta la referencia KubernetesPodOperator
de Airflow.
Información sobre el proveedor de Kubernetes para CNCF
GKEStartPodOperator y KubernetesPodOperator se implementan dentro del proveedor apache-airflow-providers-cncf-kubernetes
.
Para conocer las notas de la versión fallidas del proveedor de Kubernetes para CNCF, consulta el sitio web del proveedor de Kubernetes para CNCF.
Versión 6.0.0
En la versión 6.0.0 del paquete del proveedor de Kubernetes de CNCF, la conexión kubernetes_default
se usa de forma predeterminada en KubernetesPodOperator
.
Si especificaste una conexión personalizada en la versión 5.0.0, el operador seguirá usándola. Para volver a usar kubernetes_default
en la conexión, es posible que desees
ajustar tus DAG según corresponda.
Versión 5.0.0
Esta versión incorpora algunos cambios incompatibles con versiones anteriores
en comparación con la versión 4.4.0. Las más importantes se relacionan con
la conexión kubernetes_default
, que no se usa en la versión 5.0.0.
- Se debe modificar la conexión
kubernetes_default
. Ruta de acceso de la configuración de Kube Debe establecerse en/home/airflow/composer_kube_config
(como se muestra en la Figura 1). Como alternativa, se debe agregarconfig_file
a la configuración deKubernetesPodOperator
(como se muestra en el siguiente código) ejemplo).
- Modifica el código de una tarea con KubernetesPodOperator de la siguiente manera:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Para obtener más información sobre la versión 5.0.0, consulta las Notas de la versión del proveedor de Kubernetes de CNCF.
Soluciona problemas
Sugerencias para solucionar problemas de error de pod
Además de verificar los registros de tareas en la IU de Airflow, revisa también los siguientes registros:
El resultado del programador y los trabajadores de Airflow:
En la consola de Google Cloud, ve a la página Entornos.
Sigue el vínculo de los DAG de tu entorno.
En el bucket de tu entorno, sube un nivel.
Revisa los registros en la carpeta
logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>
.
Registros detallados de Pods en la consola de Google Cloud en las cargas de trabajo de GKE. Estos registros incluyen el archivo YAML de definición de pod, los eventos de los pods y sus detalles.
Códigos de retorno distintos de cero cuando también se usa el GKEStartPodOperator
Cuando se usa KubernetesPodOperator
y GKEStartPodOperator
, el código de retorno del punto de entrada del contenedor determina si la tarea se considera exitosa o no. Los códigos de retorno distintos de cero indican un error.
Un patrón común cuando se utiliza KubernetesPodOperator
y GKEStartPodOperator
es ejecutar una secuencia de comandos de shell como punto de entrada de contenedor para agrupar varias operaciones dentro de este.
Si escribes una secuencia de comandos de este tipo, recomendamos que incluyas el comando set -e
en la parte superior de la secuencia de comandos para que sus comandos con error finalicen la secuencia y propaguen el error a la instancia de tarea de Airflow.
Tiempos de espera de los pods
El tiempo de espera predeterminado de KubernetesPodOperator
es de 120 segundos, lo que puede provocar que el tiempo de espera se agote antes de que se descarguen las imágenes más grandes. Para aumentar el tiempo de espera, puedes modificar el parámetro startup_timeout_seconds
cuando creas el KubernetesPodOperator
.
Cuando se agota el tiempo de espera de un Pod, el registro específico de la tarea está disponible en la IU de Airflow. Por ejemplo:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Los tiempos de espera de Pods también pueden ocurrir Cuenta de servicio de Cloud Composer carece de los permisos de IAM necesarios para realizar la tarea en mano. Para verificar esto, observa los errores a nivel del Pod con el Paneles de GKE para ver los registros de tu carga de trabajo específica o usa Cloud Logging.
No se pudo establecer una conexión nueva
La actualización automática está habilitada de forma predeterminada en los clústeres de GKE. Si un grupo de nodos está en un clúster que se está actualizando, es posible que veas el siguiente error:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Para verificar si tu clúster se está actualizando, en la consola de Google Cloud, ve a Clústeres de Kubernetes y busca el ícono de carga junto nombre del clúster del entorno.