Reduce Airflow DAG parse times in Cloud Composer
Christian Yarros
Strategic Cloud Engineer, Google
This blog post presents approaches for monitoring, troubleshooting, and minimizing DAG parse times, which can lead to notable performance improvements in Cloud Composer / Airflow:
Increase environment scalability by efficiently handling larger workloads and accommodating more DAGs.
Improve environment stability by limiting the chance of task overlaps and resource contention.
Enhance productivity and overall efficiency for developers through faster feedback loops and reduced processing time.
A low DAG parse time serves as a reliable indicator of a healthy Cloud Composer / Airflow environment.
Getting started
What is an Airflow DAG?
An Airflow DAG (Directed Acyclic Graph) is a collection of tasks that are organized in a way that reflects their relationships and dependencies. DAGs are defined in Python scripts, and they are the core concept of Airflow.
A DAG defines four things:
The tasks that need to be run
The order in which the tasks need to be run
The dependencies between the tasks
The schedule for running the tasks
DAGs are a powerful way to define and manage complex workflows. They can be used to automate tasks, schedule tasks, and monitor the execution of tasks.
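As a minimal illustration (the DAG ID, tasks, and schedule below are hypothetical), a DAG file expressing those four elements might look like this:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# A minimal example DAG: two tasks, a dependency between them, and a daily schedule.
with DAG(
    dag_id="example_minimal_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extracting")
    load = BashOperator(task_id="load", bash_command="echo loading")

    # "extract" must complete before "load" runs.
    extract >> load
```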
What is the Airflow Scheduler?
The Airflow Scheduler monitors all tasks and DAGs, and triggers task instances once their dependencies are complete. By default, every 30 seconds the Scheduler collects DAG parsing results and checks whether any active tasks can be triggered.
What is the DAG Processor?
As of Airflow 2.3.0, the DAG Processor is separate from the Airflow Scheduler. For more information about this change, check out AIP-43 DAG Processor separation.
Monitoring and alerting
Monitoring DAG parse times
In the Google Cloud console, you can use the Monitoring page and the Logs tab to inspect DAG parse times.
On Cloud Composer environment
You can check DAG parse times directly on the Cloud Composer environment from the command line.
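One way to do this, assuming an Airflow 2 environment, is to run the Airflow dags report command through gcloud, substituting your environment name and location:

```
gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags report
```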
Locally using time command
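A lightweight approach, assuming a local Airflow installation, is to time how long the Python interpreter takes to process a single DAG file. The path below points at one of the bundled example DAGs; substitute the path to your own DAG file:

```
time python airflow/example_dags/example_python_operator.py
```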
Make sure to run it several times in succession to account for caching effects. Compare the results before and after an optimization (under the same conditions: the same machine, environment, etc.) to assess its impact.
Sample output:
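(Exact figures vary by machine; the values below are illustrative and consistent with the analysis that follows.)

```
real    0m0.699s
user    0m0.590s
sys     0m0.108s
```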
The important metric is the "real time", which tells you how long it took to process the DAG. Note that when loading the file this way, you are starting a new interpreter, so there is an initial loading time that is not present when Airflow parses the DAG. You can assess the interpreter initialization time by running:
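```
# Time an empty Python program to measure bare interpreter startup.
time python -c ''
```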
Result:
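(Again illustrative; the point is that bare interpreter startup accounts for roughly 0.07 s.)

```
real    0m0.073s
user    0m0.048s
sys     0m0.025s
```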
In this case the initial interpreter startup time is ~0.07 s, which is about 10% of the time needed to process example_python_operator.py above, so the actual parsing time is about 0.62 s for the example DAG.
What is an ideal parse time metric?
On the Monitoring dashboard, in the DAG Statistics section, observe graphs for the total DAG parse time. If the number exceeds about 10 seconds, your Schedulers might be overloaded with DAG parsing and cannot run DAGs effectively.
How can I receive alerts for long parse times?
You can create alerting policies to monitor the values of metrics and to notify you when those metrics violate a condition. This can also be done through the Composer Monitoring Dashboard.
DAG code optimization
Generalized DAG code improvements
Check out Optimize Cloud Composer via Better Airflow DAGs to view a generalized checklist of activities when authoring Apache Airflow DAGs. These items follow best practices determined by Google Cloud and the open source community. A collection of performant DAGs enables Cloud Composer to work optimally, and standardized authoring helps developers manage hundreds or thousands of DAGs. Each item will benefit your Cloud Composer environment and your development process. The two highest priorities should be limiting top-level code and avoiding the use of Variables and XComs in top-level code.
Limit top-level code
Follow established best practices. Avoid top-level code that is not necessary to create Operators and build DAG relations between them. This follows from the design of the Airflow Scheduler and the impact that top-level code has on parsing speed, and therefore on the performance and scalability of Airflow.
One important factor impacting DAG loading time, often overlooked by Python developers, is that top-level imports can take a surprisingly long time (on the order of seconds) and generate a lot of overhead. This can easily be avoided by converting them to local imports inside Python callables, for example.
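As a sketch of that pattern (the DAG ID, task, and the pandas import are purely illustrative), a heavy import can be moved from module level into the callable that actually needs it:

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

# Anti-pattern (illustrative): a heavy import at module level runs on every parse
# of this file, even though only one task needs it.
# import pandas as pd

with DAG(
    dag_id="example_local_imports",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
) as dag:

    @task
    def transform():
        # Better: import inside the callable, so the cost is paid only when the
        # task runs, not every time the DAG file is parsed.
        import pandas as pd

        return pd.DataFrame({"a": [1, 2, 3]}).sum().to_dict()

    transform()
```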
Avoid the use of Variables and Xcoms in top-level code
If you use Variable.get() in top-level code, then every time the .py file is parsed Airflow executes that Variable.get(), which opens a session to the database. This can dramatically slow down parse times.
If Variables are absolutely necessary, store related values together in a single JSON Variable or reference them through Jinja templates, so that one database connection retrieves many values (rather than one connection per value) and the lookup happens at run time instead of parse time.
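A sketch of the difference, with hypothetical Variable names and DAG ID: the commented-out pattern opens a database session on every parse, while the Jinja-template approach defers the lookup to runtime:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Anti-pattern: a top-level lookup such as
#   from airflow.models import Variable
#   project_id = Variable.get("gcp_project_id")
# opens a database session every time this file is parsed.

with DAG(
    dag_id="example_variable_usage",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
) as dag:
    # Better: reference the Variable through a Jinja template, which is only
    # resolved when the task runs, not at parse time.
    print_project = BashOperator(
        task_id="print_project",
        bash_command="echo {{ var.value.gcp_project_id }}",
    )

    # If several values are needed, store them together in one JSON Variable and
    # read individual keys in templates (e.g. {{ var.json.my_settings.bucket }}),
    # so a single lookup returns many values.
```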
DAG folder cleanup
Remove unused DAGs, unnecessary files from the DAGs folder
The Airflow Scheduler wastes time and resources parsing files in the DAGs folder that aren't used.
Use .airflowignore
An .airflowignore file specifies the directories or files in DAG_FOLDER or PLUGINS_FOLDER that Airflow should intentionally ignore. Airflow supports two syntax flavors for patterns in the file, as specified by the DAG_IGNORE_FILE_SYNTAX configuration parameter (added in Airflow 2.3): regexp and glob.
More files ignored = fewer files parsed by the Airflow Scheduler.
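For illustration, assuming DAG_IGNORE_FILE_SYNTAX is set to glob and that the DAGs folder contains hypothetical helper directories and test files, an .airflowignore placed at the root of the DAGs folder could look like this, keeping the Scheduler from parsing shared utility code and non-DAG artifacts:

```
common/
helpers/
*_test.py
*.md
```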
Review paused DAGs
Paused DAGs are still continuously parsed by the Airflow Scheduler. Determine why each DAG is paused and whether it should be removed, ignored, or unpaused.
Airflow configurations
min_file_process_interval
The Scheduler parses each DAG file no more often than once every min_file_process_interval seconds. Airflow starts using your updated DAG code only after this interval ends.
Consider increasing this interval when you have a high number of DAGs that do not change often, or when you observe high Scheduler load in general. Consider decreasing it if you need changes to your DAGs to be picked up faster, keeping in mind that a low value increases CPU usage.
For example, if you have more than 1,000 DAG files, raise min_file_process_interval to 600 (10 minutes), 6,000 (100 minutes), or a higher value.
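In Cloud Composer you don't edit airflow.cfg directly; instead, you set Airflow configuration overrides on the environment. A sketch using gcloud, with placeholder environment name and location (the same pattern applies to the other Airflow options discussed below):

```
gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --update-airflow-configs=scheduler-min_file_process_interval=600
```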
dag_dir_list_interval
dag_dir_list_interval determines how often (in seconds) Airflow scans the DAGs directory. A lower value means that new DAGs are discovered and processed faster, but this comes at the cost of CPU usage.
Increasing the DAG directory listing interval reduces the Scheduler load associated with discovery of new DAGs in the environment's bucket. Consider increasing this interval if you deploy new DAGs infrequently. Consider decreasing this interval if you want Airflow to react faster to newly deployed DAG files.
parsing_processes
The DAG Processor can run multiple processes in parallel to parse DAGs, and parsing_processes (formerly max_threads) determines how many of those processes can run in parallel. Increasing this value can help parse and serialize DAGs faster when you have a large number of them. By default, this is set to 2.
file_parsing_sort_mode
Evaluate the following file_parsing_sort_mode options if you are running more than one Airflow Scheduler. The Scheduler lists and sorts the DAG files to decide the parsing order.
modified_time (default): sort by the modified time of the files. This is useful at a large scale, to parse the most recently modified DAGs first.
random_seeded_by_host: sort randomly across multiple Schedulers, but with the same order on the same host. This is useful when running the Scheduler in HA mode, where each Scheduler can parse a different set of DAG files.
alphabetical: sort by filename.
When there are a lot (more than 1,000) of DAG files, you can prioritize parsing of new files by changing file_parsing_sort_mode to modified_time.
Cloud Composer upgrades
If you've gotten this far and still observe long DAG parse times, consider adding more resources to your Cloud Composer environment. Note: this will add to the overall cost of your Cloud Composer environment.
Change/Increase the number of Airflow Schedulers
Adjusting the number of Schedulers improves the Scheduler capacity and resilience of Airflow scheduling. Caution: Don't configure more than three Airflow Schedulers in your Cloud Composer environment without special consideration.
If you increase the number of Schedulers, this increases the traffic to and from the Airflow database. We recommend using two Airflow Schedulers in most scenarios.
Increase CPU/Memory of Airflow Schedulers
You can specify the amount of CPU, memory, and disk space used by your environment. In this way, you can increase the performance of your environment, in addition to the horizontal scaling provided by using multiple workers and Schedulers.
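In Cloud Composer 2, scheduler count and scheduler resources can be adjusted with an environment update. A sketch with placeholder values (flag names and units can differ across Composer versions, so check the gcloud reference for yours):

```
gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --scheduler-count 2 \
    --scheduler-cpu 2 \
    --scheduler-memory 4
```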
Conclusion
By following these steps, you can maximize the benefits of Cloud Composer / Airflow, enhance the performance of your environment, and create a smoother development experience.