Before deploying DAGs to production, you can execute Airflow CLI sub-commands to parse DAG code in the same context under which the DAG is executed.
Testing during DAG creation
You can run a single task instance locally and view the log output. Viewing the output enables you to check for syntax and task errors. Testing locally does not check dependencies or communicate status to the database.
We recommend that you put the DAGs in a data/test folder in your test environment.
Create a test directory
In your environment's bucket, create a test directory and copy your DAGs to it.
gcloud storage cp gs://BUCKET_NAME/dags \
gs://BUCKET_NAME/data/test --recursive
Replace the following:
BUCKET_NAME: the name of the bucket associated with your Cloud Composer environment.
Example:
gcloud storage cp gs://us-central1-example-environment-a12bc345-bucket/dags \
gs://us-central1-example-environment-a12bc345-bucket/data/test --recursive
For more information about uploading DAGs, see Add and update DAGs.
Check for syntax errors
To check for syntax errors in DAGs that you uploaded to the /data/test folder, enter the following gcloud command:
Airflow 2
gcloud composer environments run \
ENVIRONMENT_NAME \
--location ENVIRONMENT_LOCATION \
dags list -- --subdir /home/airflow/gcs/data/test
Airflow 1
gcloud composer environments run \
ENVIRONMENT_NAME \
--location ENVIRONMENT_LOCATION \
list_dags -- -sd /home/airflow/gcs/data/test
Replace the following:
ENVIRONMENT_NAME: the name of the environment.
ENVIRONMENT_LOCATION: the region where the environment is located.
Check for task errors
To check for task-specific errors in DAGs that you uploaded to the /data/test folder, run the following gcloud command:
Airflow 2
gcloud composer environments run \
ENVIRONMENT_NAME \
--location ENVIRONMENT_LOCATION \
tasks test -- --subdir /home/airflow/gcs/data/test \
DAG_ID TASK_ID \
DAG_EXECUTION_DATE
Airflow 1
gcloud composer environments run \
ENVIRONMENT_NAME \
--location ENVIRONMENT_LOCATION \
test -- -sd /home/airflow/gcs/data/test DAG_ID \
TASK_ID DAG_EXECUTION_DATE
Replace the following:
ENVIRONMENT_NAME: the name of the environment.
ENVIRONMENT_LOCATION: the region where the environment is located.
DAG_ID: the ID of the DAG.
TASK_ID: the ID of the task.
DAG_EXECUTION_DATE: the execution date of the DAG. This date is used for templating purposes. Regardless of the date you specify here, the DAG runs immediately.
Example:
Airflow 2
gcloud composer environments run \
example-environment \
--location us-central1 \
tasks test -- --subdir /home/airflow/gcs/data/test \
hello_world print_date 2021-04-22
Airflow 1
gcloud composer environments run example-environment \
--location us-central1 \
test -- -sd /home/airflow/gcs/data/test \
hello_world print_date 2021-04-22
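The two Airflow 2 checks above can be chained in a small wrapper script. The following is a minimal sketch, not an official tool: the `run`, `check_syntax`, and `check_task` function names and the `DRY_RUN` switch are hypothetical, and the default values are the example values used on this page. With `DRY_RUN=1`, each function prints its command instead of running it.

```shell
#!/usr/bin/env bash
# Sketch only: function names and DRY_RUN are illustrative assumptions.
ENVIRONMENT_NAME="${ENVIRONMENT_NAME:-example-environment}"
ENVIRONMENT_LOCATION="${ENVIRONMENT_LOCATION:-us-central1}"
TEST_DIR="${TEST_DIR:-/home/airflow/gcs/data/test}"

# Print the command when DRY_RUN=1; otherwise execute it.
run() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# Parse every DAG in TEST_DIR (Airflow 2 syntax from this page).
check_syntax() {
  run gcloud composer environments run "$ENVIRONMENT_NAME" \
    --location "$ENVIRONMENT_LOCATION" \
    dags list -- --subdir "$TEST_DIR"
}

# Run one task instance: check_task DAG_ID TASK_ID DAG_EXECUTION_DATE
check_task() {
  run gcloud composer environments run "$ENVIRONMENT_NAME" \
    --location "$ENVIRONMENT_LOCATION" \
    tasks test -- --subdir "$TEST_DIR" "$1" "$2" "$3"
}
```

A possible invocation is `check_syntax && check_task hello_world print_date 2021-04-22`. Read the command output in either case, because this sketch does not assume that a parse error always produces a non-zero exit status.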
Updating and testing a deployed DAG
To test updates to your DAGs in your test environment:
- Copy the deployed DAG that you want to update to data/test.
- Update the DAG.
- Test the DAG.
- Make sure the DAG runs successfully.
- Turn off the DAG in your test environment.
  - Go to the Airflow UI > DAGs page.
  - If the DAG you're modifying runs constantly, turn off the DAG.
  - To expedite outstanding tasks, click the task and Mark Success.
- Deploy the DAG to your production environment.
  - Turn off the DAG in your production environment.
  - Upload the updated DAG to the dags/ folder in your production environment.
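Some of the steps above can also be done from the command line. The following is a rough sketch under stated assumptions: the function names and the `DRY_RUN` switch are hypothetical, `dags pause` is the Airflow 2 CLI counterpart of turning off a DAG in the UI, and `gcloud composer environments storage dags import` is used here as one way to upload a file to the dags/ folder.

```shell
#!/usr/bin/env bash
# Sketch only: function names and DRY_RUN are illustrative assumptions.

# Print the command when DRY_RUN=1; otherwise execute it.
run() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# Copy one deployed DAG file into data/test: copy_to_test BUCKET_NAME DAG_FILE
copy_to_test() {
  run gcloud storage cp "gs://$1/dags/$2" "gs://$1/data/test/$2"
}

# Turn off (pause) a DAG, Airflow 2 syntax:
# pause_dag ENVIRONMENT_NAME ENVIRONMENT_LOCATION DAG_ID
pause_dag() {
  run gcloud composer environments run "$1" --location "$2" dags pause -- "$3"
}

# Upload the updated DAG file to the dags/ folder:
# upload_dag ENVIRONMENT_NAME ENVIRONMENT_LOCATION LOCAL_DAG_FILE
upload_dag() {
  run gcloud composer environments storage dags import \
    --environment "$1" --location "$2" --source "$3"
}
```

Running the script with `DRY_RUN=1` first shows the exact commands before anything touches the production environment.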
FAQs for testing DAGs
How do I isolate DAG runs in my production and test environments?
For example, Airflow has a global repository of source code in the dags/
folder that all DAG runs share. You want to update source code in production
or test without interfering with running DAGs.
Airflow does not provide strong DAG isolation. We recommend that you maintain separate production and test Cloud Composer environments to prevent your test DAGs from interfering with your production DAGs.
How do I avoid DAG interference when I run integration tests from different GitHub branches?
Use unique task names to prevent interference. For example, you can prefix your task IDs with the branch name.
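Branch names often contain characters, such as `/`, that Airflow does not accept in IDs (IDs may contain only alphanumeric characters, dashes, dots, and underscores). The following sketch shows one way to derive a safe prefix from a branch name. The `branch_prefix` helper is hypothetical; how the prefix reaches your DAG code (for example, through an environment variable) is left to your setup.

```shell
#!/usr/bin/env bash
# Sketch only: branch_prefix is an illustrative helper, not an Airflow or gcloud command.

# Replace every character outside A-Z, a-z, 0-9, "_", ".", and "-" with "_"
# so that the branch name is safe to use inside an Airflow ID.
branch_prefix() {
  printf '%s' "$1" | LC_ALL=C tr -c 'A-Za-z0-9_.-' '_'
}
```

For example, `branch_prefix "$(git rev-parse --abbrev-ref HEAD)"` turns a branch named `feature/new-load` into `feature_new-load`.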
What is a best practice for integration testing with Airflow?
We recommend that you use a dedicated environment for integration testing with Airflow. One way to signal the DAG run success is to write into a file in a Cloud Storage folder and then check the content in your own integration test cases.
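The marker-file check can be sketched as a small shell helper for an integration test. Everything specific here is an assumption: the marker object URI, the expected content, and the function names are hypothetical, and the DAG is assumed to write the marker in its final task.

```shell
#!/usr/bin/env bash
# Sketch only: the marker URI and its expected content are hypothetical conventions.

# Read the marker object that the final task of the DAG is assumed to write.
fetch_marker() {
  gcloud storage cat "$1"
}

# dag_run_succeeded MARKER_URI EXPECTED_CONTENT
# Returns 0 if the marker content equals the expected value,
# 1 if it differs, and 2 if the marker cannot be read.
dag_run_succeeded() {
  local marker_uri="$1" expected="$2" actual
  actual="$(fetch_marker "$marker_uri")" || return 2
  [ "$actual" = "$expected" ]
}
```

A test case could then call, for example, `dag_run_succeeded gs://BUCKET_NAME/data/markers/hello_world.txt SUCCESS` after triggering the DAG, where the object path is a placeholder of your choosing.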
How do I collaborate efficiently with other DAG contributors?
Each contributor can have a subdirectory in the data/ folder for development.
DAGs added to the data/ folder are not picked up automatically by the Airflow scheduler or web server.
DAG contributors can create manual DAG runs by using the gcloud composer environments run command and the test sub-command with the --subdir flag to specify the contributor's development directory.
For example:
Airflow 2
gcloud composer environments run test-environment-name \
tasks test -- dag-id task-id execution-date \
--subdir /home/airflow/gcs/data/alice_dev
Airflow 1
gcloud composer environments run test-environment-name \
test -- dag-id task-id execution-date \
--subdir /home/airflow/gcs/data/alice_dev
How do I keep my development and production environments in sync?
To manage access:
- For authentication, use service accounts.
- For access control, use Identity and Access Management and Cloud Composer roles and permissions.
To deploy from development to production:
- Ensure consistent configuration, such as environment variables and PyPI packages.
- Ensure consistent DAG arguments. To avoid hard-coding, we recommend that you use Airflow macros and variables.
For example:
Airflow 2
gcloud composer environments run test-environment-name \
variables set -- DATA_ENDPOINT_KEY DATA_ENDPOINT_VALUE
Airflow 1
gcloud composer environments run test-environment-name \
variables -- --set DATA_ENDPOINT_KEY DATA_ENDPOINT_VALUE
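To keep the same variable keys defined in every environment, one option is to keep a small KEY=VALUE file per environment and apply it with a loop. The following is a sketch under assumptions, not a Cloud Composer feature: the file format, the function names, and the `DRY_RUN` switch are hypothetical, and the command uses the Airflow 2 `variables set` syntax shown above with an explicit `--location` flag.

```shell
#!/usr/bin/env bash
# Sketch only: the KEY=VALUE file format and function names are illustrative assumptions.

# Print the command when DRY_RUN=1; otherwise execute it.
run() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# set_variable ENVIRONMENT_NAME ENVIRONMENT_LOCATION KEY VALUE
set_variable() {
  run gcloud composer environments run "$1" --location "$2" \
    variables set -- "$3" "$4"
}

# apply_variables ENVIRONMENT_NAME ENVIRONMENT_LOCATION FILE
# FILE holds one KEY=VALUE pair per line. It is read on file descriptor 3
# so that the gcloud command cannot consume the remaining lines from stdin.
apply_variables() {
  local env_name="$1" env_location="$2" file="$3" key value
  while IFS='=' read -r key value <&3; do
    [ -n "$key" ] || continue   # skip empty lines
    set_variable "$env_name" "$env_location" "$key" "$value"
  done 3< "$file"
}
```

With one file per environment, the keys stay identical while the values can differ, so the DAG code can read the same variable name in test and in production.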
What's next
- Troubleshooting DAGs
- Adding and Updating DAGs
- Test, synchronize, and deploy your DAGs using version control