Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
En esta guía, se muestra cómo escribir un grafo acíclico dirigido por Apache Airflow (DAG) que se ejecuta en un entorno de Cloud Composer.
Debido a que Apache Airflow no proporciona un DAG sólido ni un aislamiento de tareas, recomendamos que uses entornos de producción y de prueba separados para evitar la interferencia del DAG. Para obtener más información, consulta Probar DAG.
Estructura un DAG de Airflow
Un DAG de Airflow se define en un archivo de Python y se compone de lo siguiente componentes:
- Definición de DAG
- Operadores de Airflow
- Relaciones con los operadores
En los siguientes fragmentos de código, se muestran ejemplos de cada componente fuera de contexto.
Una definición de DAG
En el siguiente ejemplo, se muestra una definición de DAG de Airflow:
Operadores y tareas
Los operadores de Airflow describen el trabajo que se debe realizar. Una tarea es una instancia específica de un operador.
Relaciones de tareas
Las relaciones de tareas describen el orden en la que el trabajo debe completarse.
Ejemplo completo de flujo de trabajo de DAG en Python
El siguiente flujo de trabajo es una plantilla de DAG de trabajo completa que consta de
dos tareas: una tarea hello_python
y una goodbye_bash
:
Para obtener más información sobre la definición de DAG de Airflow, consulta el Instructivo de Airflow y Conceptos de Airflow.
Operadores de Airflow
En los siguientes ejemplos, se muestran algunos operadores populares de Airflow. Para obtener una referencia fidedigna de los operadores de Airflow, consulta la Referencia de operadores y hooks y el índice de proveedores.
BashOperator
Usa BashOperator para ejecutar programas de línea de comandos.
Cloud Composer ejecuta los comandos proporcionados en una secuencia de comandos de Bash en un Trabajador de Airflow. El trabajador es un contenedor de Docker basado en Debian que incluye varios paquetes.
- El comando
gcloud
, incluido el subcomandogcloud storage
para trabajar con buckets de Cloud Storage - Comando
bq
- Comando
kubectl
PythonOperator
Usa PythonOperator para ejecutar código de Python arbitrario.
Cloud Composer ejecuta el código de Python en un contenedor que incluye paquetes para la versión de imagen de Cloud Composer usada en tu entorno.
Para instalar paquetes adicionales de Python, consulta cómo instalar dependencias de Python.
Operadores de Google Cloud
Para ejecutar tareas que usan productos de Google Cloud, usa el operadores de Google Cloud Airflow. Por ejemplo: Operadores de BigQuery consultar y procesar datos en BigQuery.
Hay muchos más operadores de Airflow para Google Cloud y servicios individuales que proporciona Google Cloud. Consulta Operadores de Google Cloud para ver la lista completa.
EmailOperator
Usa el EmailOperator para enviar correos electrónicos desde un DAG. Para enviar correo electrónico desde un entorno de Cloud Composer, Configurar tu entorno para usar SendGrid.
Notificaciones sobre fallas del operador
Configura email_on_failure
como True
para enviar una notificación por correo electrónico cuando un operador del DAG falle. Para enviar notificaciones por correo electrónico desde un entorno de Cloud Composer, debes configurar tu entorno para usar SendGrid.
Lineamientos del flujo de trabajo de DAG
Coloca cualquier biblioteca de Python personalizada en el archivo ZIP de un DAG en un directorio anidado. No coloques bibliotecas en el nivel superior del directorio de DAG.
Cuando Airflow escanea la carpeta
dags/
, Airflow solo busca DAG en los módulos de Python que se encuentran en el nivel superior de la carpeta de DAG y en el nivel superior de un archivo ZIP ubicado también en la carpetadags/
de nivel superior. Si Airflow encuentra un módulo de Python en un archivo ZIP que no contiene las substringsairflow
yDAG
, Airflow deja de procesar el archivo ZIP. Airflow solo muestra los DAG encontrados hasta ese momento.Para la tolerancia a errores, no definas varios objetos DAG en el mismo módulo de Python.
No uses subDAG. En cambio, Agrupar tareas dentro de los DAG.
Coloca los archivos que se requieren en el tiempo de análisis de DAG en la carpeta
dags/
, no en la carpetadata/
.Prueba DAG desarrollados o modificados según se recomienda en instrucciones para probar los DAG.
La herramienta de la CLI de desarrollo local de Composer optimiza Desarrollo del DAG de Apache Airflow para Cloud Composer 2 a través de la ejecución de un Airflow de forma local. Este entorno local de Airflow usa una imagen de un una versión específica de Cloud Composer 2.
Verifica que los DAG desarrollados no aumenten Demasiados tiempos de análisis de DAG.
Las tareas de Airflow pueden fallar por varios motivos. Para evitar fallas de todas las ejecuciones de DAG, recomendamos habilitar los reintentos de tareas. Si estableces la cantidad máxima de reintentos en
0
, no se realizará ningún reintento.Le recomendamos que anule los la opción
default_task_retries
con un valor para reintentos de tarea diferentes a0
. Además, puedes establecer el parámetroretries
a nivel de la tarea.Si quieres usar GPU en tus tareas de Airflow, crea un clúster de GKE independiente basado en nodos que usen máquinas con GPUs. Usa GKEStartPodOperator para ejecutar tus tareas.
Evita ejecutar tareas que consumen mucha memoria y CPU en el grupo de nodos del clúster, donde se ejecuten otros componentes de Airflow (programadores, trabajadores, servidores web). En su lugar, usa KubernetesPodOperator o GKEStartPodOperator.
Cuando implementes DAG en un entorno, sube solo los archivos que son absolutamente necesarias para interpretar y ejecutar DAG en la carpeta
/dags
.Limita la cantidad de archivos DAG en la carpeta
/dags
.Airflow analiza continuamente los DAG en la carpeta
/dags
. El análisis es un que recorre en bucle la carpeta DAG y la cantidad de archivos que que deben cargarse (con sus dependencias) afecta el rendimiento del análisis del DAG y la programación de tareas. Es mucho más eficiente usar 100 archivos con 100 DAG cada uno que 10,000 archivos con 1 DAG cada uno, por lo que se recomienda esa optimización. Esta optimización es un equilibrio entre el tiempo de análisis y la eficiencia de la creación y administración de DAG.También puedes considerar, por ejemplo, implementar 10,000 archivos DAG que podrías crear 100 archivos ZIP, cada uno con 100 archivos DAG.
Además de las sugerencias anteriores, si tienes más de 10,000 archivos DAG, generar DAG de manera programática podría ser una buena opción. Por ejemplo: puede implementar un solo archivo DAG de Python que genere Objetos del DAG (por ejemplo, 20, 100 objetos DAG).
Preguntas frecuentes sobre la escritura de DAG
¿Cómo minimizo la repetición de código si quiero ejecutar las mismas tareas o tareas similares en varios DAG?
Sugerimos definir bibliotecas y wrappers para minimizar la repetición de código.
¿Cómo vuelvo a usar el código entre los archivos DAG?
Coloca tus funciones de utilidad en un
biblioteca local de Python
e importarás las funciones. Puedes hacer referencia a las funciones en cualquier DAG que se encuentre
en la carpeta dags/
del bucket de tu entorno.
¿Cómo minimizo el riesgo de que surjan definiciones diferentes?
Por ejemplo, tienes dos equipos que desean agregar datos sin procesar a las métricas de ingresos. Los equipos escriben dos tareas con pequeñas diferencias entre sí que logran lo mismo. Se deben definir bibliotecas para trabajar con los datos de ingresos, de modo que los implementadores de DAG aclaren la definición de ingresos que se agrega.
¿Cómo configuro las dependencias entre los DAG?
Esto dependerá de cómo deseas definir la dependencia.
Si tienes dos DAG (DAG A y DAG B) y quieres que el DAG B se active después del DAG A, puedes poner un TriggerDagRunOperator
al final del DAG A.
Si el DAG B solo depende de un artefacto generado por el DAG A, como un mensaje de Pub/Sub, es posible que un sensor funcione mejor.
Si el DAG B se integra perfectamente al DAG A, es posible que puedas combinar los dos DAG en uno.
¿Cómo transfiero los ID de ejecución únicos a un DAG y sus tareas?
Por ejemplo, deseas transmitir nombres de clústeres de Dataproc y rutas de archivos.
Puedes generar un ID único aleatorio si muestras str(uuid.uuid4())
en
un elemento PythonOperator
. Esto coloca el ID en XComs
para que puedas consultar el ID en otros operadores a través de campos de plantillas.
Antes de generar un uuid
, considera si un ID específico de DagRun sería
más valiosos. También puedes hacer referencia a estos IDs en sustituciones de Jinja con
con macros.
¿Cómo separo las tareas en un DAG?
Cada tarea debe ser una unidad de trabajo idempotente. Por lo tanto, debes evitar encapsular un flujo de trabajo de varios pasos en una sola tarea, como un programa complejo que se ejecute en un PythonOperator
.
¿Debo definir varias tareas en un solo DAG para agregar datos de varias fuentes?
Por ejemplo, tienes varias tablas con datos sin procesar y quieres crear agregados diarios para cada una. Las tareas no dependen una de otra. ¿Debes crear una tarea y un DAG para cada tabla o crear un DAG general?
Si estás de acuerdo con que cada tarea comparta las mismas propiedades a nivel del DAG, como schedule_interval
, entonces tiene sentido definir varias tareas en un solo DAG. De lo contrario, para minimizar la repetición de código, se pueden generar varios DAG a partir de un único módulo de Python colocándolos en globals()
del módulo.
¿Cómo puedo limitar la cantidad de tareas simultáneas que se ejecutan en un DAG?
Por ejemplo, quieres evitar superar las cuotas o los límites de uso de la API o evitar ejecutar demasiados procesos simultáneos.
Puedes definir Grupos de Airflow en la IU web de Airflow y tareas asociadas con grupos existentes en tus DAG.
Preguntas frecuentes sobre el uso de operadores
¿Debo usar DockerOperator
?
No recomendamos usar
el DockerOperator
, a menos que se use para iniciar
contenedores en una instalación remota de Docker (no en la
clúster). En un entorno de Cloud Composer, el operador no tiene
acceso a daemons de Docker.
En su lugar, usa KubernetesPodOperator
o
GKEStartPodOperator
Estos operadores inician pods de Kubernetes en clústeres de Kubernetes o de GKE, respectivamente. Ten en cuenta que no recomendamos el lanzamiento de pods en el clúster de un entorno, ya que esto puede generar competencia en los recursos.
¿Debo usar SubDagOperator
?
No recomendamos usar SubDagOperator
.
Usa las alternativas que se sugieren en Cómo agrupar tareas.
¿Debería ejecutar código de Python solo en PythonOperators
para separar completamente los operadores de Python?
Según tu objetivo, tienes algunas opciones.
Si tu única preocupación es mantener dependencias independientes de Python, puedes usar PythonVirtualenvOperator
.
Considera usar KubernetesPodOperator
. Este operador te permite
para definir Pods de Kubernetes y ejecutarlos en otros clústeres.
¿Cómo agrego paquetes binarios personalizados o que no son de PyPI?
Puedes instalar paquetes alojados en repositorios de paquetes privados.
¿Cómo transmito uniformemente los argumentos a un DAG y sus tareas?
Puedes usar la compatibilidad integrada de Airflow para lo siguiente: Plantillas de Jinja para pasar argumentos que se pueden usar en campos con plantillas.
¿Cuándo ocurre la sustitución de plantilla?
La sustitución de plantilla ocurre en los trabajadores de Airflow justo antes de que se llame a la función pre_execute
de un operador. En la práctica, esto significa que las plantillas no se sustituyen hasta justo antes de que se ejecute una tarea.
¿Cómo puedo saber qué argumentos del operador admiten la sustitución de plantillas?
Los argumentos del operador que admiten la sustitución de plantillas de Jinja2 se marcan explícitamente como tales.
Busca el campo template_fields
en la definición del operador.
que contiene una lista de nombres de argumentos que se someten a la sustitución de plantillas.
Por ejemplo, consulta BashOperator
, que admite plantillas para los argumentos bash_command
y env
.
¿Qué sigue?
- Soluciona problemas de los DAG
- Programador de solución de problemas
- Operadores de Google
- Operadores de Google Cloud
- Instructivo de Apache Airflow