Como escrever DAGs (fluxos de trabalho)

Neste guia, você aprende a escrever um gráfico acíclico direcionado (DAG, na sigla em inglês) do Apache Airflow, que é executado em um ambiente do Cloud Composer.

Como estruturar um DAG

Um DAG do Airflow é definido em um arquivo Python e composto pelos seguintes elementos: definição, operadores e relações de operador. Nos snippets de código a seguir, você vê exemplos de cada elemento fora de contexto:

  1. Uma definição de DAG.

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:
  2. Os operadores, para descrever o trabalho a ser feito. A instanciação de um operador é chamada de tarefa.

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')
  3. As relações de operador, para descrever a ordem em que o trabalho será concluído.

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

O fluxo de trabalho a seguir é um exemplo completo de trabalho composto de duas tarefas, hello_python e goodbye_bash:

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Consulte o tutorial e os conceitos do Airflow para mais informações sobre a definição de DAGs.

Operadores

Nos exemplos a seguir, você vê alguns operadores conhecidos do Airflow. Para ter uma referência autoritativa deles, consulte a referência da API do Apache Airflow (em inglês) ou veja o código-fonte dos operadores core e contrib (em inglês).

BashOperator

Use o BashOperator para executar programas da linha de comando.

from airflow.operators import bash_operator
    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command='bq ls {} || bq mk {}'.format(
            bq_dataset_name, bq_dataset_name))

O Cloud Composer executa os comandos fornecidos em um script "Bash" em um worker. O worker é um contêiner do Docker baseado no Debian e inclui vários pacotes.

PythonOperator

Use o PythonOperator para executar o código arbitrário do Python. O Cloud Composer executa o código Python em um contêiner que inclui os seguintes pacotes:

Para instalar mais pacotes Python, consulte Como instalar dependências do Python.

Operadores do Google Cloud Platform

Use os operadores do Google Cloud Platform para Airflow para executar tarefas que utilizem produtos do Google Cloud Platform. O Cloud Composer configura automaticamente uma conexão do Airflow no projeto do ambiente.

EmailOperator

Use o EmailOperator para enviar e-mails de um DAG. Para enviar e-mails de um ambiente do Cloud Composer, você precisa configurar o ambiente para usar o SendGrid.

from airflow.operators import email_operator
    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='Sample BigQuery notify data ready',
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][0] }}'
            ),
            view_count=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][1] }}'
            ),
            export_location=output_file))

Notificações

Defina email_on_failure como True para enviar uma notificação por e-mail quando um operador no DAG falhar. Para enviar notificações por e-mail de um ambiente do Cloud Composer, você precisa configurar o ambiente para usar o SendGrid.

from airflow import models
default_dag_args = {
    'start_date': yesterday,
    # Email whenever an Operator in the DAG fails.
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

Perguntas frequentes

Como criar DAGs

Como usar operadores

A seguir

Esta página foi útil? Conte sua opinião sobre:

Enviar comentários sobre…