Escribe DAG (flujos de trabajo)

En esta guía, se muestra cómo escribir un grafo acíclico dirigido por Apache Airflow (DAG) que se ejecuta en un entorno de Cloud Composer.

Estructura un DAG

Un DAG de Airflow se define en un archivo de Python y está compuesto por los siguientes componentes: una definición de DAG, operadores y relaciones de operador. En los siguientes fragmentos de código, se muestran ejemplos de cada componente fuera de contexto:

  1. Una definición de DAG

    Airflow 2

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:

    Airflow 1

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:

  2. Operadores para describir el trabajo que se debe realizar. Una creación de instancias de un operador se denomina tarea.

    Airflow 2

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')

    Airflow 1

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')

  3. Relaciones de tareas para describir el orden en el que se debe completar el trabajo

    Airflow 2

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

    Airflow 1

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

El siguiente flujo de trabajo es un ejemplo de trabajo completo y consta de dos tareas: una tarea hello_python y una goodbye_bash:

Airflow 2

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow 1

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Consulta el instructivo de Airflow y los conceptos de Airflow para obtener más información sobre cómo definir los DAG de Airflow.

Operadores

En los siguientes ejemplos, se muestran algunos operadores populares de Airflow. Para obtener una referencia autorizada de operadores de Airflow, consulta laReferencia de la API de Apache Airflow o explora el código fuente denúcleo ,contribuir yproveedores operadores.

BashOperator

Usa BashOperator para ejecutar programas de línea de comandos.

Airflow 2

from airflow.operators import bash
    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f'bq ls {bq_dataset_name} || bq mk {bq_dataset_name}')

Airflow 1

from airflow.operators import bash_operator
    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command='bq ls {} || bq mk {}'.format(
            bq_dataset_name, bq_dataset_name))

Cloud Composer ejecuta los comandos proporcionados en una secuencia de comandos de Bash en un trabajador. El trabajador es un contenedor de Docker basado en Debian que incluye varios paquetes.

PythonOperator

Usa PythonOperator para ejecutar código de Python arbitrario.

Cloud Composer ejecuta el código de Python en un contenedor que incluye paquetes para la versión de imagen de Cloud Composer usada en tu entorno.

Para instalar paquetes adicionales de Python, consulta Instala dependencias de Python.

Operadores de Google Cloud

Usa los operadores de Airflow de Google Cloud para ejecutar tareas que usen productos de Google Cloud. Cloud Composer configura automáticamente una conexión de Airflow en el proyecto del entorno.

EmailOperator

Usa el EmailOperator para enviar correos electrónicos desde un DAG. Para enviar correos electrónicos desde un entorno de Cloud Composer, debes configurar tu entorno para usar SendGrid.

Airflow 2

from airflow.operators import email
    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='Sample BigQuery notify data ready',
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][0] }}'
            ),
            view_count=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][1] }}'
            ),
            export_location=output_file))

Airflow 1

from airflow.operators import email_operator
    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='Sample BigQuery notify data ready',
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][0] }}'
            ),
            view_count=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][1] }}'
            ),
            export_location=output_file))

Notificaciones

Configura email_on_failure como True para enviar una notificación por correo electrónico cuando un operador del DAG falle. Para enviar notificaciones por correo electrónico desde un entorno de Cloud Composer, debes configurar tu entorno para usar SendGrid.

Airflow 2

from airflow import models
default_dag_args = {
    'start_date': yesterday,
    # Email whenever an Operator in the DAG fails.
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': project_id
}

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

Airflow 1

from airflow import models
default_dag_args = {
    'start_date': yesterday,
    # Email whenever an Operator in the DAG fails.
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

Lineamientos

  1. Coloca cualquier biblioteca de Python personalizada en el archivo ZIP de un DAG en un directorio anidado. No coloques bibliotecas en el nivel superior del directorio de DAG.

    Cuando Airflow escanea la carpeta dags/, Airflow solo busca DAG en los módulos de Python que se encuentran en el nivel superior de la carpeta de DAG y en el nivel superior de un archivo ZIP ubicado también en la carpeta dags/ de nivel superior. Si Airflow encuentra un módulo de Python en un archivo ZIP que no contiene las substrings airflow y DAG, Airflow deja de procesar el archivo ZIP. Airflow solo muestra los DAG encontrados hasta ese momento.

  2. Para la tolerancia a errores, no definas varios objetos DAG en el mismo módulo de Python.

  3. No definas subDAG como objetos de nivel superior.

    En general, Airflow selecciona objetos DAG en el espacio de nombres global de un módulo en el directorio dags/ como DAG de nivel superior. Los subDags definidos como objetos de nivel superior se ejecutan en sus propias programaciones además de las programaciones de otros DAG que incorporan los subDags.

  4. Coloca los archivos que se requieren en el tiempo de análisis de DAG en el directorio dags/ que no está en el directorio data/. El directorio data/ no está activado en el servidor web.

Preguntas frecuentes para escribir DAG

¿Cómo puedo minimizar la repetición de código si quiero ejecutar las mismas tareas o tareas similares en varios DAG?

Sugerimos definir bibliotecas y wrappers para minimizar la repetición de código.

¿Cómo vuelvo a usar el código entre los archivos DAG?

Coloca las funciones de utilidad en una biblioteca local de Python y, luego, importa las funciones. Puedes hacer referencia a las funciones en cualquier DAG ubicado en la carpeta dags/ del bucket de tu entorno.

¿Cómo minimizo el riesgo de que surjan definiciones diferentes?

Por ejemplo, tienes dos equipos que desean agregar datos sin procesar en las métricas de ingresos. Los equipos escriben dos tareas ligeramente diferentes que logran lo mismo. Define las bibliotecas para trabajar con los datos de ingresos, de modo que los implementadores de DAG deben aclarar la definición de ingresos que se agrega.

¿Cómo configuro las dependencias entre los DAG?

Esto depende de cómo desees definir la dependencia.

Si tienes dos DAG (DAG A y DAG B) y quieres que el DAG B se active después del DAG A, puedes colocar un TriggerDagRunOperator al final del Dag A ,

Si el DAG B solo depende de un artefacto que genera el DAG A, como un mensaje de Pub/Sub, entonces un sensor podría funcionar mejor.

Si el DAG B está integrado estrechamente con el DAG A, es posible que puedas combinar los dos DAG en un DAG.

¿Cómo transfiero los ID de ejecución únicos a un DAG y sus tareas?

Por ejemplo, deseas pasar nombres de clústeres de Dataproc y rutas de archivos.

Puedes generar un ID único aleatorio si muestras str(uuid.uuid4()) en un PythonOperator. Esto coloca el ID en XComs para que puedas hacer referencia al ID en otros operadores a través de campos con plantillas.

Antes de generar un uuid, considera si un ID específico de DagRun sería más valioso. También puedes hacer referencia a estos ID en sustituciones de Jinja mediante macros.

¿Cómo puedo separar las tareas de un DAG?

Cada tarea debe ser una unidad de trabajo idempotente. Por lo tanto, debes evitar encapsular un flujo de trabajo de varios pasos en una sola tarea, como un programa complejo que se ejecuta en un PythonOperator.

El mecanismo nativo de Airflow para reutilizar el código de definición de flujo de trabajo es el SubDagOperator. Sin embargo, hay advertencias para el uso de este operador en Cloud Composer.

¿Debo definir varias tareas en un solo DAG para agregar datos de varias fuentes?

Por ejemplo, tienes varias tablas con datos sin procesar y deseas crear agregados diarios para cada una. Las tareas no dependen una de otra. ¿Debes crear una tarea y un DAG para cada tabla o crear un DAG general?

Si aceptas que cada tarea comparta las mismas propiedades a nivel del DAG, como schedule_interval, tiene sentido definir varias tareas en un solo DAG. De lo contrario, para minimizar la repetición de código, se pueden generar varios DAG desde un solo módulo de Python colocándolos en el globals() del módulo.

¿Cómo puedo limitar la cantidad de tareas simultáneas que se ejecutan en un DAG?

Por ejemplo, deseas evitar exceder las limitaciones o las cuotas de uso de la API, o evitar ejecutar demasiados procesos simultáneos.

Puedes definir grupos de Airflow en la IU web de Airflow y asociar tareas con grupos existentes en tus DAG.

Preguntas frecuentes sobre el uso de operadores

¿Debo usar DockerOperator?

No recomendamos usar DockerOperator, a menos que se use para iniciar contenedores en una instalación remota de Docker (no dentro del clúster de un entorno). En un entorno de Cloud Composer, el operador no tiene acceso a los daemons de Docker.

En su lugar, usa KubernetesPodOperator o GKEPodOperator. Estos operadores pueden iniciar Pods de Kubernetes en clústeres de Kubernetes o GKE, respectivamente. Tenga en cuenta que no recomendamos iniciar los Pods en el clúster de un entorno, ya que esto puede generar una competencia de recursos.

¿Debo usar SubDagOperator?

No recomendamos usar SubDagOperator.

Aunque SubDagOperator puede proporcionar encapsulamiento, las tareas de SubDag requieren una ranura de tareas. Si un trabajador de Airflow que ejecuta la tarea de SubDag falla, todas las tareas dentro de SubDag fallarán, lo que generará flujos de trabajo poco confiables.

¿Debo ejecutar el código de Python solo en PythonOperators para separar los operadores de Python por completo?

Según su objetivo, tiene algunas opciones.

Si tu única preocupación es mantener dependencias de Python independientes, puedes usar PythonVirtualenvOperator.

Considera usar la KubernetesPodOperator. que le permite definir Pods de Kubernetes y ejecutar los Pods en otros clústeres

¿Cómo utilizo KubernetesPodOperator fuera de Google Cloud?

Puedes activar un archivo de configuración que especifique cómo autenticar con el clúster de GKE y colocar el archivo en la carpeta /data en el bucket de tu entorno.

Esta carpeta está activada en el entorno de Cloud Composer.

¿Cómo agrego paquetes binarios personalizados o que no son de PyPI?

Puedes instalar paquetes alojados en repositorios de paquetes privados.

También puedes usar la KubernetesPodOperator para ejecutar un pod de Kubernetes con tu propia imagen compilada con paquetes personalizados.

¿Cómo transmito uniformemente los argumentos a un DAG y sus tareas?

Puedes usar la compatibilidad integrada de Airflow para la plantilla de Jinja a fin de pasar argumentos que se pueden usar en campos con plantillas.

¿Cuándo se realiza la sustitución de plantillas?

La sustitución de plantillas se produce en los trabajadores de Airflow justo antes de llamar a la función pre_execute de un operador. En la práctica, esto significa que las plantillas no se sustituyen hasta justo antes de que se ejecute una tarea.

¿Cómo puedo saber qué argumentos del operador admiten la sustitución de plantillas?

Los argumentos del operador que admiten la sustitución de plantillas de Jinja2 se marcan de manera explícita como tales.

Busca el campo template_fields en la definición del operador, que contiene una lista de nombres de argumentos que se someterán a la sustitución de plantillas.

Por ejemplo, consulta BashOperator, que admite plantillas para los argumentos bash_command y env.

¿Qué sigue?