Como escrever DAGs (fluxos de trabalho)

Neste guia, você aprende a escrever um gráfico acíclico direcionado (DAG, na sigla em inglês) do Apache Airflow, que é executado em um ambiente do Cloud Composer.

Como estruturar um DAG

Um DAG do Airflow é definido em um arquivo Python e composto pelos seguintes elementos: definição, operadores e relações de operador. Nos snippets de código a seguir, você vê exemplos de cada elemento fora de contexto:

  1. Uma definição de DAG (em inglês).

    Airflow 2

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:

    Airflow 1

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:

  2. Os operadores, para descrever o trabalho a ser feito. A instanciação de um operador é chamada de tarefa (em inglês).

    Airflow 2

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')

    Airflow 1

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')

  3. Relações de tarefa para descrever a ordem em que o trabalho será concluído.

    Airflow 2

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

    Airflow 1

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

O fluxo de trabalho a seguir é um exemplo completo de trabalho e é composto por duas tarefas: uma tarefa hello_python e uma goodbye_bash:

Airflow 2

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow 1

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Consulte o tutorial e os conceitos do Airflow para mais informações sobre a definição de DAGs.

Operadores

Nos exemplos a seguir, você vê alguns operadores conhecidos do Airflow. Para ter uma referência autoritativa deles, consulte a referência da API Apache Airflow (em inglês) ou veja o código-fonte dos operadores core, contrib e providers.

BashOperator

Use o BashOperator para executar programas da linha de comando.

Airflow 2

from airflow.operators import bash
    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f'bq ls {bq_dataset_name} || bq mk {bq_dataset_name}')

Airflow 1

from airflow.operators import bash_operator
    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command='bq ls {} || bq mk {}'.format(
            bq_dataset_name, bq_dataset_name))

O Cloud Composer executa os comandos fornecidos em um script "Bash" em um worker. O worker é um contêiner do Docker baseado no Debian e inclui vários pacotes.

PythonOperator

Use o PythonOperator para executar o código arbitrário do Python.

O Cloud Composer executa o código do Python em um contêiner que inclui pacotes da versão de imagem do Cloud Composer usada no seu ambiente.

Para instalar mais pacotes Python, consulte Como instalar dependências do Python.

Operadores do Google Cloud

Use os operadores do Google Cloud Airflow para executar tarefas que usam produtos do Google Cloud. O Cloud Composer configura automaticamente uma conexão do Airflow no projeto do ambiente.

EmailOperator

Use o EmailOperator (em inglês) para enviar e-mails de um DAG. Para enviar e-mails de um ambiente do Cloud Composer, você precisa configurar o ambiente para usar o SendGrid.

Airflow 2

from airflow.operators import email
    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='Sample BigQuery notify data ready',
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][0] }}'
            ),
            view_count=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][1] }}'
            ),
            export_location=output_file))

Airflow 1

from airflow.operators import email_operator
    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='Sample BigQuery notify data ready',
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][0] }}'
            ),
            view_count=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][1] }}'
            ),
            export_location=output_file))

Notificações

Defina email_on_failure como True para enviar uma notificação por e-mail quando um operador no DAG falhar. Para enviar notificações por e-mail de um ambiente do Cloud Composer, você precisa configurar o ambiente para usar o SendGrid.

Airflow 2

from airflow import models
default_dag_args = {
    'start_date': yesterday,
    # Email whenever an Operator in the DAG fails.
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': project_id
}

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

Airflow 1

from airflow import models
default_dag_args = {
    'start_date': yesterday,
    # Email whenever an Operator in the DAG fails.
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

Diretrizes

  1. Coloque as bibliotecas Python personalizadas em um arquivo ZIP do DAG em um diretório aninhado. Não inclua bibliotecas no nível superior do diretório de DAGs.

    Quando o Airflow verifica a pasta dags/, o Airflow verifica somente DAGs nos módulos Python que estão no nível superior da pasta DAGs e no nível superior de um arquivo ZIP localizado também na pasta de nível superior dags/. Se o Airflow encontrar um módulo Python em um arquivo ZIP que não contenha substrings airflow e DAG, o Airflow interromperá o processamento do arquivo ZIP. O Airflow retorna apenas os DAGs encontrados até esse ponto.

  2. Para ter tolerância a falhas, não defina vários objetos de DAG no mesmo módulo do Python.

  3. Não defina subDAGs como objetos de nível superior.

    Em geral, o Airflow pega objetos DAG no namespace global de um módulo no diretório dags/ como DAGs de nível superior. Todos os subDAGs definidos como objetos de nível superior são executados de acordo com as próprias programações, além dos agendamentos de outros DAGs que incorporam os subDAGs.

  4. Coloque arquivos que são necessários no tempo de análise do DAG no diretório dags/ que não está no diretório data/. O diretório data/ não está montado no servidor da Web.

Perguntas frequentes sobre como escrever DAGs

Como diminuir a repetição de código para executar tarefas iguais ou semelhantes em vários DAGs?

É recomendável definir bibliotecas e wrappers para diminuir a repetição de código.

Como faço para reutilizar código entre arquivos de DAGs?

Coloque as funções utilitárias em uma biblioteca local do Python e as importe. É possível referenciar as funções em qualquer DAG localizado na pasta dags/ no bucket do ambiente.

Como diminuo o risco de surgirem diferentes definições?

Por exemplo, tenho duas equipes que querem agregar dados brutos em métricas de receita. Elas escrevem duas tarefas ligeiramente diferentes que fazem a mesma coisa. Defina bibliotecas para trabalhar com os dados de receita. Assim, os implementadores de DAGs precisam esclarecer a definição da receita que está sendo agregada.

Como faço para definir dependências entre DAGs?

Isso depende de como você quer definir a dependência.

Se você tem dois DAGs (DAG A e DAG B) e quer que o DAG B seja acionado após o DAG A, é possível colocar um TriggerDagRunOperator no final do DAG A.

Se o DAG B depender somente de um artefato gerado pelo DAG A, como uma mensagem do Pub/Sub, um sensor funcionará melhor.

Se o DAG B estiver muito integrado ao DAG A, talvez seja possível mesclar os dois em um único DAG.

Como transmito códigos exclusivos de execução para um DAG e às tarefas dele?

Por exemplo, quero transmitir caminhos de arquivo e nomes de cluster do Dataproc.

É possível retornar str(uuid.uuid4()) em PythonOperator para gerar um ID exclusivo aleatório. Isso coloca o ID em XComs para que você possa consultá-lo em outros operadores por meio de campos com modelo.

Antes de gerar um uuid, considere se um ID específico do DagRun seria mais importante. Também é possível referenciar esses IDs nas substituições por meio de macros.

Como separar tarefas em um DAG?

Cada tarefa precisa ser uma unidade de trabalho idempotente. Consequentemente, evite encapsular um fluxo de trabalho de várias etapas em uma única tarefa, como um programa complexo em execução em um PythonOperator.

O mecanismo nativo do Airflow para reutilizar o código de definição de fluxo de trabalho é SubDagOperator. No entanto, há ressalvas ao usar esse operador no Cloud Composer.

É recomendado definir várias tarefas em um único DAG para agregar dados de várias origens?

Por exemplo, tenho várias tabelas com dados brutos e quero criar agregações diárias para cada uma delas. As tarefas não são dependentes entre si. Preciso criar uma tarefa e um DAG para cada tabela ou gerar um DAG geral?

Se estiver tudo bem para você que cada tarefa tenha as mesmas propriedades no nível do DAG, como schedule_interval, defina várias tarefas em um único DAG. Caso contrário, para diminuir a repetição de código, é possível gerar muitos DAGs em um único módulo Python. Basta colocá-los nos globals() do módulo.

Como limitar o número de tarefas simultâneas em execução em um DAG?

Por exemplo, quero evitar exceder os limites/cotas de uso da API ou impedir a execução de muitos processos simultâneos.

É possível definir pools do Airflow na interface de usuário da Web do Airflow e associar tarefas a pools atuais nos DAGs.

Perguntas frequentes sobre o uso de operadores

Devo usar DockerOperator?

Não recomendamos o uso de DockerOperator, a menos que ele seja usado para iniciar contêineres em uma instalação remota do Docker (ou seja, em nenhum lugar do cluster de um ambiente). Em um ambiente do Cloud Composer, o operador não tem acesso aos daemons do Docker.

Em vez disso, use KubernetesPodOperator ou GKEPodOperator. Esses operadores podem iniciar pods do Kubernetes em clusters do Kubernetes ou do GKE, respectivamente. Não é recomendável iniciar pods no cluster de um ambiente, já que isso pode levar à concorrência de recursos.

Devo usar SubDagOperator?

Não recomendamos o uso de SubDagOperator.

Ainda que o SubDagOperator possa fornecer o encapsulamento, as tarefas SubDag exigem um slot para tarefas. Em caso de falha em um worker do Airflow que está executando uma tarefa SubDag, todas as tarefas no SubDag falham e resultam em fluxos de trabalho não confiáveis.

Posso executar o código do Python somente em PythonOperators para separar completamente os operadores Python?

Dependendo do seu objetivo, você tem algumas opções.

Se a única preocupação for manter dependências separadas do Python, use PythonVirtualenvOperator.

Considere usar KubernetesPodOperator. Esse operador permite definir pods do Kubernetes e executá-los em outros clusters.

Como usar o KubernetesPodOperator fora do Google Cloud?

É possível montar um arquivo de configuração que especifica como autenticar com o cluster do GKE e colocar o arquivo na pasta /data no bucket do ambiente.

Essa pasta é ativada no ambiente do Cloud Composer.

Como adiciono pacotes personalizados não PyPI ou binários?

É possível instalar pacotes hospedados em repositórios de pacotes particulares.

Também é possível usar o KubernetesPodOperator para executar um pod do Kubernetes com sua própria imagem criada com pacotes personalizados.

Como faço para transmitir argumentos uniformemente para um DAG e as tarefas dele?

Use o suporte integrado do Airflow para modelos Jinja para transmitir argumentos que podem ser usados em campos com modelo.

Quando ocorre a substituição do modelo?

A substituição de modelo ocorre nos workers do Airflow pouco antes da função pre_execute de um operador ser chamada. Na prática, isso significa que os modelos não são substituídos até um pouco antes de uma tarefa ser executada.

Como sei quais argumentos do operador são compatíveis com a substituição do modelo?

Os argumentos do operador que são compatíveis com a substituição do modelo Jinja2 são explicitamente marcados como tal.

Procure o campo template_fields na definição Operador, que contém uma lista de nomes de argumentos que passarão por substituição de modelo.

Por exemplo, consulte o BashOperator, que é compatível com modelos para os argumentos bash_command e env.

A seguir