Optimize Cloud Composer via Better Airflow DAGs
Strategic Cloud Engineer
Try Google Cloud
Start building on Google Cloud with $300 in free credits and 20+ always free products.Free trial
Hosting, orchestrating, and managing data pipelines is a complex process for any business. Google Cloud offers Cloud Composer - a fully managed workflow orchestration service - enabling businesses to create, schedule, monitor, and manage workflows that span across clouds and on-premises data centers. Cloud Composer is built on the popular Apache Airflow open source project and operates using the Python programming language. Apache Airflow allows users to create directed acyclic graphs (DAGs) of tasks, which can be scheduled to run at specific intervals or triggered by external events.
This guide contains a generalized checklist of activities when authoring Apache Airflow DAGs. These items follow best practices determined by Google Cloud and the open source community. A collection of performant DAGs will enable Cloud Composer to work optimally and standardized authoring will help developers manage hundreds or even thousands of DAGs. Each item will benefit your Cloud Composer environment and your development process.
1. Standardize file names. Help other developers browse your collection of DAG files.
a. ex) team_project_workflow_version.py
2. DAGs should be deterministic.
a. A given input will always produce the same output.
3. DAGs should be idempotent.
a. Triggering the DAG multiple times has the same effect/outcome.
4. Tasks should be atomic and idempotent.
a. Each task should be responsible for one operation that can be re-run independently of the others. In an atomized task, a success in part of the task means a success of the entire task.
5. Simplify DAGs as much as possible.
a. Simpler DAGs with fewer dependencies between tasks tend to have better scheduling performance because they have less overhead. A linear structure (e.g. A -> B -> C) is generally more efficient than a deeply nested tree structure with many dependencies.
Standardize DAG Creation
6. Add an owner to your default_args.
a. Determine whether you’d prefer the email address / id of a developer, or a distribution list / team name.
with DAG() as dag: instead of dag
= DAG()a. Prevent the need to pass the dag object to every operator or task group.
8. Set a version in the DAG ID.
a. Update the version after any code change in the DAG.
b. This prevents deleted Task logs from vanishing from the UI, no-status tasks generated for old dag runs, and general confusion of when DAGs have changed.
c. Airflow open-source has plans to implement versioning in the future.
9. Add tags to your DAGs.
a. Help developers navigate the Airflow UI via tag filtering.
b. Group DAGs by organization, team, project, application, etc.
10. Add a DAG description.
a. Help other developers understand your DAG.
11. Pause your DAGs on creation.
a. This will help avoid accidental DAG runs that add load to the Cloud Composer environment.
catchup=False to avoid automatic catch ups overloading your Cloud Composer Environment.
13. Set a
dagrun_timeout to avoid dags not finishing, and holding Cloud Composer Environment resources or introducing collisions on retries.
14. Set SLAs at the DAG level to receive alerts for long-running DAGs.
a. Airflow SLAs are always defined relative to the start time of the DAG, not to individual tasks.
b. Ensure that sla_miss_timeout is less than the dagrun_timeout.
c. Example: If your DAG usually takes 5 minutes to successfully finish, set the
sla_miss_timeout to 7 minutes and the
dagrun_timeout to 10 minutes. Determine these thresholds based on the priority of your DAGs.
15. Ensure all tasks have the same
start_date by default by passing arg to DAG during instantiation
16. Use a static
start_date with your DAGs.
a. A dynamic start_date is misleading, and can cause failures when clearing out failed task instances and missing DAG runs.
retries as a
default_arg applied at the DAG level and get more granular for specific tasks only where necessary.
a. A good range is 1–4 retries. Too many retries will add unnecessary load to the Cloud Composer environment.
Example putting all the above together:
18. Define what should occur for each callback function. (send an email, log a context, message slack channel, etc.). Depending on the DAG you may be comfortable doing nothing.
19. Use Task Groups to organize Tasks.
Reduce the Load on Your Composer Environment
20. Use Jinja Templating / Macros instead of python functions.
a. Airflow's template fields allow you to incorporate values from environment variables and jinja templates into your DAGs. This helps make your DAGs idempotent (meaning multiple invocations do not change the result) and prevents unnecessary function execution during Scheduler heartbeats.
b. The Airflow engine passes a few variables by default that are accessible in all templates.
Contrary to best practices, the following example defines variables based on datetime Python functions:
If this code is in a DAG file, these functions execute on every Scheduler heartbeat, which may not be performant. Even more importantly, this doesn't produce an idempotent DAG. You can't rerun a previously failed DAG run for a past date because datetime.today() is relative to the current date, not the DAG execution date.
A better way of implementing this is by using an Airflow Variable as such:
21. Avoid creating your own additional Airflow Variables.
a. The metadata database stores these variables and requires database connections to retrieve them. This can affect the performance of the Cloud Composer Environment. Use Environment Variables or Google Cloud Secrets instead.
22. Avoid running all DAGs on the exact same schedules (disperse workload as much as possible).
a. Prefer to use cron expressions for schedule intervals compared to airflow macros or time_deltas. This allows a more rigid schedule and it’s easier to spread out workloads throughout the day, making it easier on your Cloud Composer environment.
b. Crontab.guru can help with generating specific cron expression schedules. Check out the examples here.
23. Avoid XComs except for small amounts of data.
a. These add storage and introduce more connections to the database.
b. Use JSON dicts as values if absolutely necessary. (one connection for many values inside dict)
24. Avoid adding unnecessary objects in the dags/ Google Cloud Storage path.
a. If you must, add an .airflowignore file to GCS paths that the Airflow Scheduler does not need to parse. (sql, plug-ins, etc.)
25. Set execution timeouts for tasks.
26. Use Deferrable Operators over Sensors when possible.
a. A deferrable operator can suspend itself and free up the worker when it knows it has to wait, and hand off the job of resuming it to a Trigger. As a result, while it suspends (defers), it is not taking up a worker slot and your cluster will have fewer/lesser resources wasted on idle Operators or Sensors.
27. When using Sensors, always define
mode, poke_interval, and
a. Sensors require Airflow workers to run.
b. Sensor checking every n seconds (i.e. poke_interval < 60)? Use
mode=poke. A sensor in
mode=poke will continuously poll every n seconds and hold Airflow worker resources.
c. Sensor checking every n minutes (i.e. poke_interval >= 60)? Use
mode=reschedule. A sensor in
mode=reschedule will free up Airflow worker resources between poke intervals.
28. Offload processing to external services (BigQuery, Dataproc, Cloud Functions, etc.) to minimize load on the Cloud Composer environment.
a. These services usually have their own Airflow Operators for you to utilize.
29. Do not use sub-DAGs.
a. Sub-DAGs were a feature in older versions of Airflow that allowed users to create reusable groups of tasks within DAGs. However, Airflow 2.0 deprecated sub-DAGs because they caused performance and functional issues.
30. Use Pub/Sub for DAG-to-DAG dependencies.
a. Here is an example for multi-cluster / dag-to-dag dependencies.
31. Make DAGs load faster.
a. Avoid unnecessary “Top-level” Python code. DAGs with many imports, variables, functions outside of the DAG will introduce greater parse times for the Airflow Scheduler and in turn reduce the performance and scalability of Cloud Composer / Airflow.
b. Moving imports and functions within the DAG can reduce parse time (in the order of seconds).
c. Ensure that developed DAGs do not increase DAG parse times too much.
Improve Development and Testing
32. Implement “self-checks” (via Sensors or Deferrable Operators).
a. To ensure that tasks are functioning as expected, you can add checks to your DAG. For example, if a task pushes data to a BigQuery partition, you can add a check in the next task to verify that the partition generates and that the data is correct.
33. Look for opportunities to dynamically generate similar tasks/task groups/DAGs via Python code.
a. This can simplify and standardize the development process for DAGs.
34. Implement unit-testing for your DAGs
35. Perform local development via the Composer Local Development CLI Tool.
a. Composer Local Development CLI tool streamlines Apache Airflow DAG development for Cloud Composer 2 by running an Airflow environment locally. This local Airflow environment uses an image of a specific Cloud Composer version.
36. If possible, keep a staging Cloud Composer Environment to fully test the complete DAG run before deploying in the production.
a. Parameterize your DAG to change the variables, e.g., the output path of Google Cloud Storage operation or the database used to read the configuration. Do not hard code values inside the DAG and then change them manually according to the environment.
In summary, this blog provides a comprehensive checklist of best practices for developing Airflow DAGs for use in Google Cloud Composer. By following these best practices, developers can help ensure that Cloud Composer is working optimally and that their DAGs are well-organized and easy to manage.
For more information about Cloud Composer, check out the following related blog posts and documentation pages: