Writing DAGs (workflows)

This guide shows you how to write an Apache Airflow directed acyclic graph (DAG) that runs in a Cloud Composer environment.

Structuring a DAG

An Airflow DAG is defined in a Python file and is composed of the following components: A DAG definition, operators, and operator relationships.

  1. A DAG definition.

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:

  2. Operators to describe the work to be done. An instantiation of an operator is called a task.

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')

  3. Operator relationships to describe the order in which the work should be completed.

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

For example, the following workflow is composed of two tasks: a hello_python task and a goodbye_bash task.

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator


default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

See the Airflow tutorial and Airflow concepts for more information on defining Airflow DAGs.

Operators

The following examples show a few popular Airflow operators. For an authoritative reference of Airflow operators, see the Apache Airflow API Reference or browse the source code of the core and contrib operators.

BashOperator

Use the BashOperator to run command-line programs.

from airflow.operators import bash_operator
    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command='bq ls {} || bq mk {}'.format(
            bq_dataset_name, bq_dataset_name))

Cloud Composer runs the provided commands in a Bash script on a worker. The worker is a Debian-based Docker container and includes several packages.

PythonOperator

Use the PythonOperator to run arbitrary Python code. Cloud Composer runs the Python code in a container that includes the following packages:

To install additional Python packages, see Installing Python Dependencies.

Google Cloud Platform Operators

Use the Google Cloud Platform Airflow operators to run tasks that use Google Cloud Platform products. Cloud Composer automatically configures an Airflow connection to the environment's project.

EmailOperator

Use the EmailOperator to send email from a DAG. To send email from a Cloud Composer environment, you must configure your environment to use SendGrid.

from airflow.operators import email_operator
    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='Sample BigQuery notify data ready',
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][0] }}'
            ),
            view_count=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][1] }}'
            ),
            export_location=output_file))

Notifications

Set email_on_failure to True to send an email notification when an operator in the DAG fails. To send email notifications from a Cloud Composer environment, you must configure your environment to use SendGrid.

from airflow import models
default_dag_args = {
    'start_date': yesterday,
    # Email whenever an Operator in the DAG fails.
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

What's next

Was this page helpful? Let us know how we did:

Send feedback about...

Cloud Composer