Before deploying DAGs to production, you can execute Airflow CLI sub-commands to parse DAG code in the same context under which the DAG is executed.
Testing during DAG creation
You can run a single task instance locally and view the log output. Viewing the output enables you to check for syntax and task errors. Testing locally does not check dependencies or communicate status to the database.
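For example, to run one task locally with the Airflow 1.x test sub-command, assuming the hello_world DAG and print_date task used in the examples later in this guide:

airflow test hello_world print_date 2018-09-03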
We recommend that you put the DAGs in a data/test folder in your test environment.
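For example, you might copy a DAG file into that folder with gsutil; the bucket and file names below are placeholders (copying an object under the data/test/ prefix also creates the folder if it does not yet exist):

gsutil cp hello_world.py gs://BUCKET_NAME/data/test/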
Checking for PyPI package errors
Because PyPI dependencies might cause conflicts with dependencies that Airflow requires, we recommend that you install your desired Python packages in an Airflow worker container and test them there.
Determine the Cloud Composer environment's GKE cluster.
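One way to find the cluster and configure kubectl is with gcloud; in this sketch, ENVIRONMENT_NAME, LOCATION, CLUSTER_NAME, and ZONE are placeholders for your own values:

gcloud composer environments describe ENVIRONMENT_NAME \
  --location LOCATION \
  --format="value(config.gkeCluster)"
gcloud container clusters get-credentials CLUSTER_NAME --zone ZONE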
View and choose an Airflow worker pod.
kubectl get pods --all-namespaces
Look for a pod with a name like airflow-worker-1a2b3c-x0yz.
Connect to a remote shell in an Airflow worker container.
kubectl -n composer-1-6-0-airflow-example-namespace \
  exec -it airflow-worker-1a2b3c-x0yz -c airflow-worker -- /bin/bash
While connected to the remote shell, your command prompt shows the name of the Airflow worker pod (for example, airflow-worker-1a2b3c-x0yz:).
Install the Python package in the Airflow worker container, using the version of Python that runs in your environment. For example:
sudo python2 -m pip install "[PACKAGE]"
Test for compatibility in the Airflow worker container.
- Check for syntax errors.
airflow list_dags
- Render the template.
airflow test --dry_run [DAG_ID] [TASK_ID] [EXECUTION_DATE]
- Check for task errors.
airflow test [DAG_ID] [TASK_ID] [EXECUTION_DATE]
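For instance, rendering the templates for the hello_world example DAG without executing the task might look like the following (the DAG ID, task ID, and date are the illustrative values used later in this guide):

airflow test --dry_run hello_world print_date 2018-09-03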
When you finish testing, uninstall the Python package from the Airflow worker container. For example:
sudo python2 -m pip uninstall "[PACKAGE]"
Checking for syntax errors
- In the Cloud Storage bucket for your environment, create a test directory and copy your DAGs to it.
To check for syntax errors, enter the following gcloud command:

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  list_dags -- -sd /home/airflow/gcs/data/test
where:
- ENVIRONMENT_NAME is the name of the environment.
- LOCATION is the Compute Engine region where the environment is located.
For example:
gcloud composer environments run \
  test-environment --location us-central1 \
  list_dags -- -sd /home/airflow/gcs/data/test
Checking for task errors
To check for task-specific errors, enter the following gcloud command:

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  test -- -sd /home/airflow/gcs/data/test DAG_ID \
  TASK_ID DAG_EXECUTION_DATE
where:
- ENVIRONMENT_NAME is the name of the environment.
- LOCATION is the Compute Engine region where the environment is located.
- DAG_ID is the ID of the DAG.
- TASK_ID is the ID of the task.
- DAG_EXECUTION_DATE is the execution date of the DAG. This date is used for templating purposes. Regardless of the date you specify here, the DAG runs immediately.
For example:
gcloud composer environments run test-environment \
  --location us-central1 \
  test -- -sd /home/airflow/gcs/data/test \
  hello_world print_date 2018-09-03
Updating and testing a deployed DAG
To test updates to your DAGs in your test environment:
- Copy the deployed DAG that you want to update to data/test.
- Update the DAG.
- Test the DAG.
- Make sure the DAG runs successfully.
- Turn off the DAG in your test environment.
  - Go to the Airflow UI > DAGs page.
  - If the DAG you're modifying runs constantly, turn off the DAG.
  - To expedite outstanding tasks, click the task and then click Mark Success.
- Deploy the DAG to your production environment.
  - Turn off the DAG in your production environment.
  - Upload the updated DAG to the dags/ folder in your production environment (a CLI sketch for these steps follows this list).
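If you prefer the command line for the last two steps, here's a minimal sketch. It assumes the test-environment example from earlier and a DAG file named hello_world.py, both illustrative; substitute your production environment's name and your own file. It pauses the DAG with the Airflow pause sub-command, then uploads the updated file with gcloud composer environments storage dags import:

gcloud composer environments run test-environment \
  --location us-central1 \
  pause -- hello_world
gcloud composer environments storage dags import \
  --environment test-environment \
  --location us-central1 \
  --source hello_world.py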