What is Cloud Composer?
Cloud Composer is a managed Apache Airflow service that helps you create, schedule, monitor and manage workflows. Cloud Composer automation helps you create Airflow environments quickly and use Airflow-native tools, such as the powerful Airflow web interface and command line tools, so you can focus on your workflows and not your infrastructure.
What version of Apache Airflow does Cloud Composer use?
The Cloud Composer Versions List contains information on the latest offered versions of Apache Airflow.
Is the Apache Airflow web interface provided?
Yes, the Apache Airflow web interface is provided in Cloud Composer environments. The Overview of Cloud Composer describes the Airflow web server deployment. For more information about accessing the web server, see Airflow Web Interface.
Is the Apache Airflow command line interface supported?
Yes. You can run Airflow command line interface sub-commands in your environments through the gcloud command line tool.
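For example, assuming a hypothetical environment named test-environment-name and an Airflow 1.x-era CLI, you could list the environment's DAGs with:

    gcloud composer environments run test-environment-name list_dags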
How do I minimize code repetition if I want to run the same or similar tasks in multiple DAGs?
We suggest defining libraries and wrappers to minimize code repetition.
How do I reuse code between DAG files?
Put the shared code in a Python module and import it from your DAG files. Airflow adds the dags/ folder to the Python path, so modules stored alongside your DAGs can be imported directly.
How do I minimize the risk of different definitions arising?
For example, I have two teams that want to aggregate raw data into revenue metrics. The teams write two slightly different tasks that accomplish the same thing.
Define libraries to work with the revenue data so that the DAG implementers must clarify the definition of revenue that’s being aggregated.
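As a minimal sketch, the shared definition could live in a module under the dags/ folder that both teams import. The module path, function name, and field name here are hypothetical:

    # dags/libs/revenue.py -- hypothetical shared library
    def aggregate_revenue(rows):
        # The single agreed-upon definition of revenue, used by every DAG.
        return sum(row["net_amount"] for row in rows)

    # dags/team_a_dag.py (and likewise dags/team_b_dag.py)
    from libs.revenue import aggregate_revenue

Because both teams call aggregate_revenue, any change to the definition of revenue happens in exactly one place.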
How do I set dependencies between DAGs?
This depends on how you want to define the dependency.
- If you have two DAGs (DAG A and DAG B) and you want DAG B to trigger after DAG A, you can put a TriggerDagRunOperator at the end of DAG A (see the sketch after this list).
- If DAG B depends only on an artifact that DAG A generates, such as a Pub/Sub message, then a sensor might work better.
- If DAG B is integrated closely with DAG A, you might be able to merge the two DAGs into one DAG.
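A minimal sketch of the first option, assuming DAGs with the hypothetical IDs dag_a and dag_b (the import path varies by Airflow version):

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.dagrun_operator import TriggerDagRunOperator

    with DAG("dag_a", schedule_interval="@daily",
             start_date=datetime(2021, 1, 1)) as dag:
        # ... DAG A's own tasks go here ...
        # Final task: kick off DAG B once DAG A's work is done.
        trigger_b = TriggerDagRunOperator(
            task_id="trigger_dag_b",
            trigger_dag_id="dag_b",  # must match DAG B's dag_id
        )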
How do I pass unique run IDs to a DAG and its tasks?
For example, I want to pass Dataproc cluster names and file paths.
You can generate a random unique ID by returning str(uuid.uuid4()) in a PythonOperator. The ID is placed into XCom so that you can refer to it in other operators via templated fields.
Before generating a uuid, consider whether a DagRun-specific ID would be more valuable. You can also reference these IDs in Jinja substitutions by using macros.
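A minimal sketch, assuming a hypothetical dag object already exists and 1.x-era import paths:

    import uuid
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import PythonOperator

    def generate_unique_id():
        # The return value is automatically pushed to XCom.
        return str(uuid.uuid4())

    generate_id = PythonOperator(
        task_id="generate_id", python_callable=generate_unique_id, dag=dag)

    # Downstream operators pull the ID from XCom in templated fields.
    create_cluster = BashOperator(
        task_id="create_cluster",
        bash_command="echo cluster-{{ ti.xcom_pull(task_ids='generate_id') }}",
        dag=dag)

    generate_id >> create_cluster

For a DagRun-specific ID, macros such as {{ ds_nodash }} or {{ run_id }} are available in templated fields without any extra task.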
Where do I draw the line between tasks in a DAG?
Each task should be an idempotent unit of work. Consequently, you should avoid encapsulating a multi-step workflow within a single task, such as a complex program running in a single operator.
Should I define multiple tasks in a single DAG to aggregate data from multiple sources?
For example, I have multiple tables with raw data and want to create daily aggregates for each table. The tasks are not dependent on each other. Should I create one task and DAG for each table or create one general DAG?
If you are okay with each task sharing the same DAG-level properties, such as schedule_interval, then it makes sense to define multiple tasks in a single DAG. Otherwise, to minimize code repetition, multiple DAGs can be generated from a single Python module by placing them into the module's globals (see the sketch below).
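A minimal sketch of the second approach, with a hypothetical list of raw tables:

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

    for table in ["orders", "payments", "refunds"]:  # hypothetical tables
        dag_id = "aggregate_{}".format(table)
        dag = DAG(dag_id, schedule_interval="@daily",
                  start_date=datetime(2021, 1, 1))
        BashOperator(
            task_id="aggregate",
            bash_command="echo aggregating {}".format(table),
            dag=dag)
        # The scheduler only discovers DAGs reachable from module globals.
        globals()[dag_id] = dag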
How do I limit the number of concurrent tasks running in a DAG?
For example, I want to avoid exceeding API usage limits/quotas or avoid running too many simultaneous processes.
You can define Airflow pools in the Airflow web UI and associate tasks with existing pools in your DAGs.
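For example, assuming a hypothetical pool named api_pool was created in the web UI with a limited number of slots, a task opts in through the pool argument:

    from airflow.operators.bash_operator import BashOperator

    call_api = BashOperator(
        task_id="call_api",
        bash_command="curl https://example.com/api",  # hypothetical endpoint
        pool="api_pool",  # at most the pool's slot count of these run at once
        dag=dag)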
Do you have an example of using templates to generate DAGs?
For an example, see the blog post, Airflow, Meta Data Engineering, and a Data Platform for the World’s Largest Democracy.
Should I use the DockerOperator?
We do not recommend using the DockerOperator unless it is used to launch containers on a remote Docker installation (that is, not within an environment's GKE cluster). Cloud Composer does not mount each GKE node's Docker daemon within each Airflow worker, so the operator will not have access to Docker daemons unless a user installs them manually (and they would not persist across pod restarts).
As a workaround, we recommend using the KubernetesPodOperator or the GKEPodOperator instead, which launch Kubernetes pods into Kubernetes or GKE clusters, respectively. Note that we don't recommend launching pods into an environment's GKE cluster, because this can lead to resource competition.
Should I use the SubDagOperator?
We do not recommend using the SubDagOperator. Although the SubDagOperator can provide encapsulation, SubDag tasks require their own task slots. If the Airflow worker running the SubDag task dies, all tasks within the SubDag fail, resulting in unreliable workflows.
Should I run Python code only in KubernetesPodOperators to fully separate Python operators?
Depending on your goal, you have a few options.
How do I use the KubernetesPodOperator outside Google Cloud?
You can mount a config file that specifies how to authenticate with the GKE cluster and place the file in the data/ folder of your environment's Cloud Storage bucket. This folder is mounted across the Cloud Composer environment.
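A minimal sketch, assuming the kubeconfig file was uploaded as kube_config to the data/ folder (mounted at /home/airflow/gcs/data on the workers) and that a dag object exists:

    from airflow.contrib.operators.kubernetes_pod_operator import (
        KubernetesPodOperator)

    external_pod = KubernetesPodOperator(
        task_id="external_pod",
        name="external-pod",
        namespace="default",
        image="ubuntu:16.04",
        cmds=["echo", "hello"],
        in_cluster=False,  # not running in the environment's own cluster
        config_file="/home/airflow/gcs/data/kube_config",  # mounted kubeconfig
        dag=dag)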
How do I add custom binary or non-PyPI packages?
You can install packages hosted in private package repositories.
You can also use the KubernetesPodOperator. This operator enables you to run a Kubernetes pod with your own image built with custom packages.
How do I uniformly pass arguments to a DAG and its tasks?
Airflow’s built-in support for Jinja templating enables users to pass arguments that can be used in templated fields.
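For example, an argument defined once in params can be referenced from any templated field instead of being hard coded into each task (run_report and the env value are hypothetical):

    from airflow.operators.bash_operator import BashOperator

    report = BashOperator(
        task_id="build_report",
        # {{ ds }} is a built-in macro; {{ params.env }} comes from params.
        bash_command="run_report --date {{ ds }} --env {{ params.env }}",
        params={"env": "staging"},
        dag=dag)

params can also be set at the DAG level so that every task inherits the same values.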
When does template substitution happen?
Template substitution occurs on the Airflow workers just before the pre_execute function of an operator is called. In practice, this means that templates are not substituted until just before a task runs.
How do I know which operator arguments support template substitution?
Operator arguments that support Jinja2 template substitution are explicitly marked as such in the Operator's source code.
Look for the template_fields field in the Operator definition, which contains a list of the argument names that undergo template substitution. For example, see the BashOperator, which supports templating for the bash_command and env arguments.
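As a sketch, a custom operator opts its arguments into templating the same way (the operator and its argument are hypothetical, and base-class constructor details vary by Airflow version):

    from airflow.models import BaseOperator

    class ExportOperator(BaseOperator):
        # Arguments named here are rendered with Jinja before execution.
        template_fields = ("destination_path",)

        def __init__(self, destination_path, **kwargs):
            super(ExportOperator, self).__init__(**kwargs)
            self.destination_path = destination_path

        def execute(self, context):
            # A value like "exports/{{ ds }}.csv" has already been rendered
            # to a concrete path by the time execute runs.
            self.log.info("Exporting to %s", self.destination_path)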
How do I isolate DAG runs in my production and test environments?
For example, Airflow has a global repository of source code in the dags/ folder that all DAG runs share. I want to update source code in production or test without interfering with running DAGs by changing the code midway.
Airflow does not provide strong DAG isolation. We recommend that you maintain separate production and test Cloud Composer environments to prevent your test DAGs from interfering with your production DAGs.
How do I avoid DAG interference when I run integration tests from different GitHub branches?
Use unique task names to prevent interference. For example, you can prefix your task IDs with the branch name.
What is a best practice for integration testing with Airflow?
We recommend that you use a dedicated environment for integration testing with Airflow. One way to signal the DAG run success is to write into a file in a Cloud Storage folder and check the content in your own integration test cases.
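One minimal sketch of the signalling task, assuming a hypothetical bucket name, a dag object, and the google.cloud.storage client library:

    from google.cloud import storage
    from airflow.operators.python_operator import PythonOperator

    def mark_success(**context):
        # Write a sentinel object that integration tests can poll for.
        client = storage.Client()
        bucket = client.bucket("my-integration-test-artifacts")  # hypothetical
        blob = bucket.blob("dag_runs/{}/SUCCESS".format(context["run_id"]))
        blob.upload_from_string("ok")

    mark = PythonOperator(
        task_id="mark_success",
        python_callable=mark_success,
        provide_context=True,  # needed on Airflow 1.x to receive the context
        dag=dag)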
How do I collaborate efficiently with other DAG contributors?
Each contributor can have a subdirectory in the data/ folder for development. DAGs added to the data/ folder won't be picked up automatically by the scheduler or web server.
DAG contributors can create manual DAG runs by using the gcloud composer environments run command and the test sub-command with the --subdir flag to specify the contributor's development directory. For example:
    gcloud composer environments run test-environment-name \
        test -- dag-id task-id execution-date --subdir /home/airflow/gcs/data/alice_dev
How do I keep my development and production environments in sync?
- To manage access:
- To deploy from development to production:
  - Ensure consistent configuration, such as environment variables and PyPI packages.
  - Ensure consistent DAG arguments. To avoid hard coding, we recommend that you use Airflow macros and variables. For example:
    gcloud composer environments run test-environment-name \
        variables -- --set DATA_ENDPOINT_KEY DATA_ENDPOINT_VALUE
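Once the variable is set, DAGs can read it through the built-in var macro instead of hard coding the endpoint:

    from airflow.operators.bash_operator import BashOperator

    fetch = BashOperator(
        task_id="fetch_data",
        # Resolves to whatever DATA_ENDPOINT_KEY is set to in each environment.
        bash_command="curl {{ var.value.DATA_ENDPOINT_KEY }}",
        dag=dag)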