Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
En esta página, se describe cómo usar KubernetesPodOperator
para implementar
Pods de Kubernetes
de Cloud Composer a Google Kubernetes Engine
clúster que es parte de tu entorno de Cloud Composer y garantizar
de que tu entorno cuente con los recursos adecuados.
KubernetesPodOperator
inicios
Pods de Kubernetes
en el clúster de tu entorno. En comparación,
Los operadores de Google Kubernetes Engine ejecutan Pods de Kubernetes en un
que puede ser un clúster independiente que no esté relacionado con tu
en un entorno de nube. También puedes crear y borrar clústeres con
operadores de Google Kubernetes Engine.
KubernetesPodOperator
es una buena opción si necesitas lo siguiente:
- Dependencias de Python personalizadas que no están disponibles a través del repositorio público de PyPI.
- Dependencias binarias que no están disponibles en stock Imagen del trabajador de Cloud Composer.
En esta página, se explica un ejemplo de DAG de Airflow que incluye lo siguiente
Parámetros de configuración de KubernetesPodOperator
:
- Configuración mínima: establece solo los parámetros requeridos.
- Configuración de plantilla: utiliza parámetros que puedes agregar a una plantilla con Jinja.
Configuración de variables secretas: pasa un objeto secreto de Kubernetes al pod.
En Cloud Composer 2, la configuración de afinidad de Pods no está disponible. En cambio, usar los operadores de GKE para iniciar Pods en un clúster.
Configuración completa: incluye todas las configuraciones.
Antes de comenzar
En Cloud Composer 2, el clúster de tu entorno escala automáticamente. Cargas de trabajo adicionales que ejecutas con
KubernetesPodOperator
escalan de forma independiente de tu entorno. Tu entorno no se ve afectado por el aumento en la demanda de recursos, pero el clúster de tu entorno escala verticalmente según el recurso demanda. El precio de las cargas de trabajo adicionales que ejecutas en tu clúster sigue el Modelo de precios de Cloud Composer 2 y sus usos SKU de procesamiento de Cloud Composer.Cloud Composer 2 usa clústeres de GKE con Workload Identity De forma predeterminada, los Pods que se ejecutan en un espacio de nombres recién creado o en
composer-user-workloads
no pueden acceder a los recursos de Google Cloud. Al usar Workload Identity, las cuentas de servicio de Kubernetes asociadas espacios de nombres deben asignarse a cuentas de servicio de Google Cloud, para habilitar la autorización de identidad del servicio para las solicitudes a las APIs de Google y otras de Google Cloud.Debido a esto, si ejecutas Pods en el espacio de nombres
composer-user-workloads
o un espacio de nombres recién creado en el clúster de tu entorno, entonces Vinculaciones de IAM entre Kubernetes y Google Cloud no se crean cuentas de servicio y estos Pods no pueden acceder a los recursos de tu proyecto de Google Cloud.Si deseas que los Pods tengan acceso a los recursos de Google Cloud, Luego, usa el espacio de nombres
composer-user-workloads
o crea uno propio el espacio de nombres, tal como se describe más adelante.Para proporcionar acceso a los recursos de tu proyecto, sigue las instrucciones de Workload Identity y configura las vinculaciones:
- Crea un espacio de nombres separado en el clúster de tu entorno.
- Crea una vinculación entre
composer-user-workloads/<namespace_name>
cuenta de servicio de Kubernetes y la cuenta de servicio de tu entorno. - Agrega la anotación de la cuenta de servicio de tu entorno a la biblioteca de Kubernetes cuenta de servicio.
- Cuando uses
KubernetesPodOperator
, especifica el espacio de nombres cuenta de servicio de Kubernetes ennamespace
y Parámetrosservice_account_name
.
Cloud Composer 2 usa clústeres de GKE con carga de trabajo Identidad. El servidor de metadatos de GKE tarda unos segundos en comenzar a aceptar solicitudes en un Pod recién creado. Por lo tanto, los intentos de se autentican con Workload Identity en los primeros segundos de La vida del Pod podría fallar. Para obtener más información sobre esta limitación, consulta Restricciones de Workload Identity.
Cloud Composer 2 usa clústeres de Autopilot que introducen la noción de clases de procesamiento:
De forma predeterminada, si no se selecciona ninguna clase, la clase
general-purpose
se que se da por sentado cuando creas Pods conKubernetesPodOperator
.Cada clase se asocia con propiedades y límites de recursos específicos. Puedes leer sobre ellas en Documentación de Autopilot. Por ejemplo, los Pods que se ejecutan dentro de la clase
general-purpose
pueden usar hasta 110 GiB de memoria.
Si se usa la versión 5.0.0 del proveedor de Kubernetes para CNCF, sigue las instrucciones sección del proveedor de Kubernetes de CNCF documentada.
Configuración de KubernetesPodOperator
Para continuar con este ejemplo, coloca todo el archivo kubernetes_pod_operator.py
en la carpeta dags/
de tu entorno o agrega el código relevante KubernetesPodOperator
a un DAG.
Las siguientes secciones explican cada configuración de KubernetesPodOperator
en el ejemplo. Para obtener información sobre cada variable de configuración, consulta la referencia de Airflow.
Configuración mínima
Para crear un KubernetesPodOperator
, solo el name
del Pod, namespace
donde
ejecutar el Pod, image
para usar y task_id
son obligatorios.
Cuando colocas el siguiente fragmento de código en un DAG, la configuración utiliza los valores predeterminados en /home/airflow/composer_kube_config
. No es necesario modificar el código para que la tarea pod-ex-minimum
se realice correctamente.
Configuración de la plantilla
Airflow admite el uso de plantillas de Jinja.
Debes declarar las variables obligatorias (task_id
, name
, namespace
y image
) con el operador. Como se muestra en el siguiente ejemplo, puedes crear plantillas de todos los demás parámetros con Jinja, incluidos cmds
, arguments
, env_vars
y config_file
.
Sin cambiar el DAG ni tu entorno, la tarea ex-kube-templates
falla debido a dos errores. Los registros muestran que esta tarea está fallando porque la
no existe la variable correspondiente (my_value
). El segundo error, que
después de corregir el primer error, muestra que la tarea falla porque
core/kube_config
no se encuentra en config
.
Para corregir ambos errores, sigue los pasos detallados a continuación.
Para configurar my_value
con gcloud
o la IU de Airflow:
IU de Airflow
En la IU de Airflow 2, haz lo siguiente:
Ve a la IU de Airflow.
En la barra de herramientas, selecciona Administrador > Variables.
En la página Variable de lista, haz clic en Agregar un registro nuevo.
En la página Agregar variable, ingresa la siguiente información:
- Key:
my_value
- Val:
example_value
- Key:
Haz clic en Guardar.
gcloud
Para Airflow 2, ingresa el siguiente comando:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Reemplaza lo siguiente:
ENVIRONMENT
por el nombre del entorno.LOCATION
por la región en la que se encuentra el entorno
Para hacer referencia a un config_file
(un archivo de configuración de Kubernetes) personalizado, haz lo siguiente:
anular la opción de configuración kube_config
de Airflow para una
configuración válida de Kubernetes:
Sección | Clave | Valor |
---|---|---|
core |
kube_config |
/home/airflow/composer_kube_config |
Espera unos minutos a que se actualice tu entorno. Luego, vuelve a ejecutar la tarea ex-kube-templates
y verifica que la tarea ex-kube-templates
se complete con éxito.
Configuración de variables secretas
Un secreto de Kubernetes
es un objeto que contiene datos sensibles. Puedes pasar secretos a la
Pods de Kubernetes con KubernetesPodOperator
.
Los secretos deben estar definidos en Kubernetes o el pod no se iniciará.
En este ejemplo, se muestran dos formas de usar Secrets de Kubernetes: como entorno variable y como un volumen activado por el Pod.
El primer secreto, airflow-secrets
, se establece en una variable de entorno de Kubernetes llamada SQL_CONN
(en lugar de en una variable de entorno de Airflow o Cloud Composer).
El segundo secreto, service-account
, activa service-account.json
, un archivo con un token de cuenta de servicio, en /var/secrets/google
.
Los secretos se ven de la siguiente forma:
El nombre del primer secreto de Kubernetes se define en la variable secret
.
Este secreto específico se llama airflow-secrets
. Se expone como una variable de entorno, según lo determina el deploy_type
. Entorno
variable que establece, deploy_target
, es SQL_CONN
. Por último, el key
de la
que se almacena en deploy_target
es sql_alchemy_conn
.
El nombre del segundo secreto de Kubernetes se define en la variable secret
.
Este secreto específico se llama service-account
. Se expone como un volumen, según lo determina el deploy_type
. La ruta de acceso del archivo que se activará
deploy_target
, es /var/secrets/google
. Por último, el key
del secreto
que se almacena en deploy_target
es service-account.json
.
Así se ve la configuración del operador:
Sin hacer ningún cambio en el DAG ni en tu entorno
ex-kube-secrets
la tarea falla. Si observas los registros, la tarea falla debido a
un error Pod took too long to start
Este error se produce porque Airflow
no puede encontrar el secreto especificado en la configuración, secret_env
.
gcloud
Para configurar el secreto con gcloud
, haz lo siguiente:
Obtén información sobre el clúster de tu entorno de Cloud Composer.
Ejecuta el siguiente comando:
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
Reemplaza lo siguiente:
ENVIRONMENT
por el nombre de tu entornoLOCATION
es la región en la que se encuentra el entorno de Cloud Composer.
El resultado de este comando usa el siguiente formato:
projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>
Para obtener el ID del clúster de GKE, copia el resultado después de
/clusters/
(termina en-gke
)
Conéctate a tu clúster de GKE con el siguiente comando:
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --region LOCATION
Reemplaza lo siguiente:
CLUSTER_ID
por el ID del clúster de GKE.PROJECT
por el ID del proyecto de Google Cloud.LOCATION
es la región en la que se encuentra el entorno de Cloud Composer.
Crea secretos de Kubernetes
Ejecuta el siguiente comando para crear un secreto de Kubernetes que establezca el valor de
sql_alchemy_conn
entest_value
:kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
Ejecuta el siguiente comando para crear un secreto de Kubernetes que establezca el valor de
service-account.json
en una ruta local del archivo de claves de una cuenta de servicio llamadokey.json
:kubectl create secret generic service-account \ --from-file service-account.json=./key.json -n composer-user-workloads
Después de configurar los secretos, vuelve a ejecutar la tarea
ex-kube-secrets
en IU de Airflow.Verifica que la tarea
ex-kube-secrets
se realice correctamente.
Configuración completa
En este ejemplo, se muestran todas las variables que puedes configurar en KubernetesPodOperator
. No es necesario modificar el código para que la tarea ex-all-configs
se realice correctamente.
Para obtener detalles sobre cada variable, consulta la referencia KubernetesPodOperator
de Airflow.
Información sobre el proveedor de Kubernetes para CNCF
GKEStartPodOperator y KubernetesPodOperator se implementan en
apache-airflow-providers-cncf-kubernetes
proveedor.
Para conocer las notas de la versión fallidas del proveedor de Kubernetes para CNCF, consulta el sitio web del proveedor de Kubernetes para CNCF.
Versión 6.0.0
En la versión 6.0.0 del paquete del proveedor de Kubernetes para CNCF,
se usa la conexión kubernetes_default
de forma predeterminada en
KubernetesPodOperator
Si especificaste una conexión personalizada en la versión 5.0.0, esta
todavía lo usa el operador. Para volver a usar kubernetes_default
en la conexión, es posible que desees
ajustar tus DAG según corresponda.
Versión 5.0.0
Esta versión incorpora algunos cambios incompatibles con versiones anteriores
en comparación con la versión 4.4.0. Las más importantes se relacionan con
la conexión kubernetes_default
, que no se usa en la versión 5.0.0.
- Se debe modificar la conexión
kubernetes_default
. Ruta de acceso de la configuración de Kube Debe establecerse en/home/airflow/composer_kube_config
(como se muestra en la Figura 1). Como alternativa, se debe agregarconfig_file
a la configuración deKubernetesPodOperator
(como se muestra en el siguiente código) ejemplo).
- Modifica el código de una tarea con KubernetesPodOperator de la siguiente manera:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Para obtener más información sobre la versión 5.0.0, consulta las Notas de la versión del proveedor de Kubernetes de CNCF.
Soluciona problemas
Sugerencias para solucionar problemas de error de pod
Además de verificar los registros de tareas en la IU de Airflow, revisa también los siguientes registros:
El resultado del programador y los trabajadores de Airflow:
En la consola de Google Cloud, ve a la página Entornos.
Sigue el vínculo de los DAG de tu entorno.
En el bucket de tu entorno, sube un nivel.
Revisa los registros en
logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>
. carpeta.
Registros detallados de Pods en la consola de Google Cloud en las cargas de trabajo de GKE. Estos registros incluyen el archivo YAML de definición de pod, los eventos de los pods y sus detalles.
Códigos de retorno distintos de cero cuando también se usa el GKEStartPodOperator
Cuando se usa KubernetesPodOperator
y GKEStartPodOperator
, el código de retorno del punto de entrada del contenedor determina si la tarea se considera exitosa o no. Los códigos de retorno distintos de cero indican un error.
Un patrón común cuando se utiliza KubernetesPodOperator
y GKEStartPodOperator
es ejecutar una secuencia de comandos de shell como punto de entrada de contenedor para agrupar varias operaciones dentro de este.
Si escribes una secuencia de comandos de este tipo, recomendamos que incluyas el comando set -e
en la parte superior de la secuencia de comandos para que sus comandos con error finalicen la secuencia y propaguen el error a la instancia de tarea de Airflow.
Tiempos de espera de los pods
El tiempo de espera predeterminado de KubernetesPodOperator
es de 120 segundos, lo que puede provocar que el tiempo de espera se agote antes de que se descarguen las imágenes más grandes. Para aumentar el tiempo de espera, puedes modificar el parámetro startup_timeout_seconds
cuando creas el KubernetesPodOperator
.
Cuando se agota el tiempo de espera de un Pod, el registro específico de la tarea está disponible en la IU de Airflow. Por ejemplo:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Los tiempos de espera de Pods también pueden ocurrir Cuenta de servicio de Cloud Composer carece de los permisos de IAM necesarios para realizar la tarea en mano. Para verificar esto, observa los errores a nivel del Pod con el Paneles de GKE para ver los registros de tu carga de trabajo específica o usa Cloud Logging.
No se pudo establecer una conexión nueva
La actualización automática está habilitada de forma predeterminada en los clústeres de GKE. Si un grupo de nodos está en un clúster que se está actualizando, es posible que veas el siguiente error:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Para verificar si tu clúster se está actualizando, en la consola de Google Cloud, ve a Clústeres de Kubernetes y busca el ícono de carga junto nombre del clúster del entorno.