This guide shows you how to write an Apache Airflow directed acyclic graph (DAG)
that runs in a Cloud Composer environment.
Structuring a DAG
An Airflow DAG is defined in a Python file and is composed of the following
components: a DAG definition, operators, and operator relationships. The
following code snippets show examples of each component out of context:
import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
Operators
Use operators to describe the work to be done. An instantiation of an operator
is called a task.
from airflow.operators import bash_operator
from airflow.operators import python_operator

    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')
Operator relationships
Operator relationships describe the order in which the work is completed.

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash
The following workflow is a complete working example and is composed of two tasks: a hello_python task and a goodbye_bash task:
from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash
The following examples show a few popular Airflow operators. For an
authoritative reference of Airflow operators, see the Apache Airflow API
Reference, or browse the source code of the core and contrib operators.
BashOperator
Use the BashOperator to run command-line programs.
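For example, a minimal sketch such as the following runs a shell command on an
Airflow worker. It assumes the task is defined inside the with models.DAG(...)
block shown earlier; the task_id and command are illustrative.

from airflow.operators import bash_operator

    # Runs a simple shell command on an Airflow worker. The task_id and the
    # command are illustrative placeholders.
    print_date = bash_operator.BashOperator(
        task_id='print_date',
        bash_command='date')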
EmailOperator
Use the EmailOperator to send email from a DAG. The following snippet assumes
that min_query_date, max_query_date, and output_file are defined earlier in
the DAG.

from airflow.operators import email_operator

    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='Sample BigQuery notify data ready',
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][0] }}'
            ),
            view_count=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][1] }}'
            ),
            export_location=output_file))
Notifications
Set email_on_failure to True to send an email notification when an operator
in the DAG fails. To send email notifications from a Cloud Composer
environment, you must
configure your environment to use SendGrid.
import datetime

from airflow import models

# 'yesterday' is assumed to be defined earlier in the module, for example:
# yesterday = datetime.datetime.now() - datetime.timedelta(days=1)

default_dag_args = {
    'start_date': yesterday,
    # Email whenever an Operator in the DAG fails.
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:
Guidelines
Place any custom Python libraries in a DAG's ZIP archive in a nested
directory. Do not place libraries at the top level of the DAGs directory.
When Airflow scans the dags/ folder, it only checks for DAGs in Python
modules that are in the top level of the DAGs folder and in the top level
of a ZIP archive that is also located in the top-level dags/ folder. If
Airflow encounters a Python module in a ZIP archive that does not contain
both the airflow and DAG substrings, Airflow stops processing the ZIP archive
and returns only the DAGs found up to that point.
For fault tolerance, do not define multiple DAG objects in the same Python
module.
Do not define SubDAGs as top-level objects.
In general, Airflow picks up DAG objects in the global namespace of a
module in the dags/ directory as top-level DAGs. Any SubDAGs defined as
top-level objects execute on their own schedules, in addition to the schedules
of the DAGs that embed them.
Place files that are required at DAG parse time in the dags/ directory, not
in the data/ directory. The data/ directory is not mounted in the Airflow
web server.
FAQs
Creating DAGs
How do I minimize code repetition if I want to run the
same or similar tasks in multiple DAGs?
We suggest defining libraries and wrappers to minimize code repetition.
How do I reuse code between DAG files?
Put your utility functions in a local Python library and import the functions. You
can reference the functions in any DAG in the dags/ folder.
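As a sketch, suppose shared helpers live in a hypothetical dependencies
package nested inside the dags/ folder; the module, function, and path names
below are illustrative.

# dags/dependencies/task_helpers.py (hypothetical helper module)
from airflow.operators import bash_operator


def make_cleanup_task(dag, path):
    # Builds a task that removes temporary files under the given path.
    return bash_operator.BashOperator(
        task_id='cleanup',
        bash_command='rm -rf {}/tmp'.format(path),
        dag=dag)


# In any DAG file under dags/, the helper can then be imported and reused:
# from dependencies import task_helpers
# cleanup = task_helpers.make_cleanup_task(dag, '/home/airflow/gcs/data')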
How do I minimize the risk of different definitions arising?
For example, I have two teams that want to aggregate raw data into revenue metrics. The teams write two slightly different tasks that accomplish the same thing.
Define libraries to work with the revenue data so that the DAG implementers must clarify the definition of revenue that’s being aggregated.
How do I set dependencies between DAGs?
This depends on how you want to define the dependency.
If you have two DAGs (DAG A and DAG B) and you want DAG B to trigger after DAG A, you can put a TriggerDagRunOperator at the end of DAG A.
If DAG B depends only on an artifact that DAG A generates, such as a Pub/Sub message, then a sensor might work better.
If DAG B is integrated closely with DAG A, you might be able to merge the two DAGs into one DAG.
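As a hedged sketch of the first option, a TriggerDagRunOperator placed at the
end of DAG A starts a run of DAG B. The DAG ID below is illustrative, and the
import path differs in newer Airflow versions.

from airflow.operators.dagrun_operator import TriggerDagRunOperator

    # Placed at the end of DAG A: when this task runs, it asks the scheduler
    # to start a new run of DAG B ('composer_sample_dag_b' is illustrative).
    trigger_dag_b = TriggerDagRunOperator(
        task_id='trigger_dag_b',
        trigger_dag_id='composer_sample_dag_b')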
How do I pass unique run IDs to a DAG and its tasks?
For example, I want to pass Dataproc cluster names and file paths.
You can generate a random unique ID by returning str(uuid.uuid4()) in a PythonOperator. This places the ID into XCom so that you can refer to it in other operators via templated fields.
Before generating a UUID, consider whether a DagRun-specific ID would be more valuable. You can also reference these IDs in Jinja substitutions by using macros.
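A minimal sketch, assuming the tasks are defined inside a DAG context and that
the task IDs are illustrative: the PythonOperator's return value is pushed to
XCom, and a downstream templated field pulls it at run time.

import uuid

from airflow.operators import bash_operator
from airflow.operators import python_operator

    def generate_run_id():
        # The return value is pushed to XCom under the key 'return_value'.
        return str(uuid.uuid4())

    generate_id = python_operator.PythonOperator(
        task_id='generate_id',
        python_callable=generate_run_id)

    # bash_command is a templated field, so the ID can be pulled with Jinja.
    # DagRun-specific macros such as {{ run_id }} or {{ ds }} are alternatives.
    use_id = bash_operator.BashOperator(
        task_id='use_id',
        bash_command='echo "run id: {{ ti.xcom_pull(task_ids=\'generate_id\') }}"')

    generate_id >> use_id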
Where do I draw the line between tasks in a DAG?
Each task should be an idempotent unit of work. Consequently, you should avoid encapsulating a multi-step workflow within a single task, such as a complex program running in a PythonOperator.
Should I define multiple tasks in a single DAG to aggregate data from multiple sources?
For example, I have multiple tables with raw data and want to create daily aggregates for each table. The tasks are not dependent on each other. Should I create one task and DAG for each table or create one general DAG?
If you are okay with each task sharing the same DAG-level properties, such as schedule_interval, then it makes sense to define multiple tasks in a single DAG. Otherwise, to minimize code repetition, multiple DAGs can be generated from a single Python module by placing them into the module’s globals.
How do I limit the number of concurrent tasks running in a DAG?
For example, I want to avoid exceeding API usage limits/quotas or avoid running too many simultaneous processes.
You can define Airflow pools in the
Airflow web UI and associate tasks with existing pools in your DAGs.
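A minimal sketch, assuming a pool named api_quota_pool has already been
created in the Airflow web UI (Admin > Pools) and that the task is defined
inside a DAG context; the names are illustrative.

from airflow.operators import bash_operator

    # Tasks assigned to a pool run only while the pool has free slots, which
    # caps how many of them execute concurrently.
    rate_limited_call = bash_operator.BashOperator(
        task_id='rate_limited_call',
        bash_command='echo "calling a rate-limited API"',
        pool='api_quota_pool')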
Do you have an example of using templates to generate DAGs?
One common pattern is to generate DAG objects from a parameterized helper and register each one in the module's globals, as mentioned in the earlier answer about aggregating data from multiple sources.
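A minimal sketch of that pattern, with illustrative table names and DAG IDs:

import datetime

from airflow import models
from airflow.operators import bash_operator


def create_dag(dag_id, table_name, default_args):
    # Builds one aggregation DAG for a single table.
    with models.DAG(
            dag_id,
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_args) as dag:
        bash_operator.BashOperator(
            task_id='aggregate_' + table_name,
            bash_command='echo "aggregating {}"'.format(table_name))
    return dag


default_dag_args = {'start_date': datetime.datetime(2018, 1, 1)}

# Register one generated DAG per table in the module's globals so that
# Airflow discovers each of them as a top-level DAG.
for table in ['sales', 'inventory']:
    dag_id = 'composer_sample_aggregate_' + table
    globals()[dag_id] = create_dag(dag_id, table, default_dag_args)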
Should I use the DockerOperator?
We do not recommend using the DockerOperator unless it is used to launch containers on a remote Docker installation (that is, not within an environment's GKE cluster). Cloud Composer does not mount each GKE node's Docker daemon within each Airflow worker, so the operator does not have access to a Docker daemon unless a user installs one manually (and it would not persist across pod restarts).
As a workaround, we recommend using the KubernetesPodOperator or the GKEPodOperator instead, which can launch Kubernetes pods into Kubernetes or GKE clusters, respectively. Note that we don't recommend launching pods into an environment's GKE cluster, because this can lead to resource competition.
Should I use the SubDagOperator?
We do not recommend using the SubDagOperator.
Although the SubDagOperator can provide encapsulation, SubDag tasks require
a task slot. If the worker running the SubDag task dies, all tasks within
the SubDag fail, resulting in unreliable workflows.
Should I run Python code only in DockerOperators
to fully separate Python operators?
Depending on your goal, you have a few options.
If your only concern is maintaining separate Python dependencies, you might be able to
use the PythonVirtualenvOperator.
Although you can use the DockerOperator, also consider using the KubernetesPodOperator,
which allows you to define Kubernetes Pods and run the pods in other clusters.
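A minimal PythonVirtualenvOperator sketch, assuming the task is defined inside
a DAG context; the pinned dependency is illustrative, and the import path
differs in newer Airflow versions.

from airflow.operators.python_operator import PythonVirtualenvOperator

    def use_isolated_pandas():
        # Imports must live inside the callable because it runs in a
        # separate virtualenv created for this task.
        import pandas as pd
        return pd.__version__

    isolated_python = PythonVirtualenvOperator(
        task_id='isolated_python',
        python_callable=use_isolated_pandas,
        requirements=['pandas==0.25.3'],
        system_site_packages=False)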
How do I use the KubernetesPodOperator outside Google Cloud?
You can mount a config file that specifies
how to authenticate with the GKE cluster and place the file
in the /data folder.
This folder is mounted across the Cloud Composer environment.
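A minimal sketch, assuming the kubeconfig file was uploaded to the data/
folder (mounted at /home/airflow/gcs/data/ on the workers) and that the task
is defined inside a DAG context; the file name, image, and task names are
illustrative, and the import path differs in newer Airflow versions.

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

    external_pod = KubernetesPodOperator(
        task_id='external_pod',
        name='external-pod',
        namespace='default',
        image='ubuntu:18.04',
        cmds=['echo'],
        arguments=['running outside Google Cloud'],
        # Path to a kubeconfig for the external cluster ('kube_config' is an
        # illustrative file name placed in the data/ folder).
        config_file='/home/airflow/gcs/data/kube_config',
        in_cluster=False)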
When does template substitution occur?
Template substitution occurs on the workers just before the pre_execute
function of an operator is called. In practice,
this means that templates are not substituted until just before a task runs.
How do I know which operator arguments support template substitution?
Operator arguments that support Jinja2 template
substitution are explicitly marked as such in the Operator's source code.
Look for the template_fields field in the Operator definition,
which contains a list of argument names that will undergo template
substitution. For example, see the
BashOperator,
which supports templating
for the bash_command and env arguments.
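For example, in the following sketch (assuming the BashOperator task is
defined inside a DAG context) the bash_command argument is listed in
BashOperator.template_fields, so the {{ ds }} macro is substituted at run
time; a custom operator opts its own arguments into substitution the same
way. The class and field names below are illustrative.

from airflow.models import BaseOperator
from airflow.operators import bash_operator

    # bash_command appears in BashOperator.template_fields, so {{ ds }} is
    # replaced with the execution date before the task runs.
    print_exec_date = bash_operator.BashOperator(
        task_id='print_exec_date',
        bash_command='echo "execution date: {{ ds }}"')


class MyTemplatedOperator(BaseOperator):
    # Arguments named here undergo Jinja substitution before execute() runs.
    template_fields = ('destination_path',)

    def __init__(self, destination_path, *args, **kwargs):
        super(MyTemplatedOperator, self).__init__(*args, **kwargs)
        self.destination_path = destination_path

    def execute(self, context):
        self.log.info('Writing to %s', self.destination_path)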