Add and update DAGs

This page describes how to manage DAGs in your Cloud Composer environment.

Cloud Composer uses a Cloud Storage bucket to store DAGs of your Cloud Composer environment. Your environment synchronizes DAGs from this bucket to Airflow components such as Airflow workers and schedulers.

Before you begin

  • Because Apache Airflow does not provide strong DAG isolation, we recommend that you maintain separate production and test environments to prevent DAG interference. For more information, see Testing DAGs.
  • Make sure that your account has enough permissions to manage DAGs.
  • Changes to DAGs propagate to Airflow within 3-5 minutes. You can see task status in the Airflow web interface.

Access the bucket of your environment

To access the bucket associated with your environment:

Console

  1. In Google Cloud console, go to the Environments page.

    Go to Environments

  2. In the list of environments, find the row with the name of your environment and, in the DAGs folder column, click the DAGs link. The Bucket details page opens and shows the contents of the /dags folder in your environment's bucket.

gcloud

The gcloud CLI has separate commands for adding and deleting DAGs in your environment's bucket.

If you want to interact with your environment's bucket directly, you can also use the gsutil command-line tool. To get the address of your environment's bucket, run the following gcloud CLI command:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

Replace:

  • ENVIRONMENT_NAME with the name of the environment.
  • LOCATION with the region where the environment is located.

Example:

gcloud composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"
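
The returned address has the form gs://BUCKET_NAME/dags. For example, assuming the command returned the hypothetical address gs://example-bucket/dags, you can list the DAG files in the bucket with gsutil:

gsutil ls gs://example-bucket/dags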

API

Construct an environments.get API request. In the response, the dagGcsPrefix field of the EnvironmentConfig resource contains the address of your environment's bucket.

Example:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment
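
For example, assuming the gcloud CLI is installed and authenticated, you can send this request with curl by attaching an access token:

curl -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    "https://composer.googleapis.com/v1/projects/example-project/locations/us-central1/environments/example-environment"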

Python

Use the google-auth library to get credentials and the requests library to call the REST API.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# Set these values before running the sample:
project_id = "YOUR_PROJECT_ID"
location = "us-central1"
composer_environment = "YOUR_COMPOSER_ENVIRONMENT_NAME"

environment_url = (
    "https://composer.googleapis.com/v1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the address of the /dags folder in the environment's bucket,
# in the form gs://BUCKET_NAME/dags.
print(environment_data["config"]["dagGcsPrefix"])

Add or update a DAG

To add or update a DAG, upload the Python .py file for the DAG to the /dags folder in the environment's bucket.

Console

  1. In Google Cloud console, go to the Environments page.

    Go to Environments

  2. In the list of environments, find the row with the name of your environment and, in the DAGs folder column, click the DAGs link. The Bucket details page opens and shows the contents of the /dags folder in your environment's bucket.

  3. Click Upload files. Then select the Python .py file for the DAG using the browser's dialog and confirm.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

Replace:

  • ENVIRONMENT_NAME with the name of the environment.
  • LOCATION with the region where the environment is located.
  • LOCAL_FILE_TO_UPLOAD with the path to the local Python .py file for the DAG.

Example:

gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"
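
Alternatively, assuming your environment's bucket address is the hypothetical gs://example-bucket (see Access the bucket of your environment), you can copy the file into the /dags folder directly with gsutil:

gsutil cp example_dag.py gs://example-bucket/dags/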

Update a DAG that has active DAG runs

If you update a DAG that has active DAG runs:

  • All currently executing tasks finish using the original DAG file.
  • All tasks that are scheduled but are not currently running use the updated DAG file.
  • All tasks that are no longer present in the updated DAG file are marked as removed.

Update DAGs that run on a frequent schedule

After you upload a DAG file, it takes some time for Airflow to load this file and update the DAG. If your DAG runs on a frequent schedule, you might want to ensure that the DAG uses the updated version of the DAG file. To do so:

  1. Pause the DAG in the Airflow UI. You can also pause it from the command line, as shown in the sketch after these steps.
  2. Upload an updated DAG file.
  3. Wait until you see the updates in the Airflow UI. This means that the DAG was correctly parsed by the scheduler and updated in the Airflow database.

    Even if the Airflow UI displays the updated DAG, this does not guarantee that Airflow workers have the updated version of the DAG file, because DAG files are synced independently for schedulers and workers.

  4. You might want to extend the waiting time to make sure that the DAG file is synced with all workers in your environment. The synchronization happens several times per minute. In a healthy environment, waiting for about 20-30 seconds is enough for all workers to sync.

  5. (Optional) If you want to be completely sure that all workers have the new version of the DAG file, inspect logs for each individual worker. To do so:

    1. Open the Logs tab for your environment in Google Cloud console.

    2. Go to Composer logs > Infrastructure > Cloud Storage sync item and inspect logs for every worker in your environment. Look for the most recent Syncing dags directory log item that has a timestamp after you uploaded the new DAG file. If you see a Finished syncing item that follows it, then the DAGs are synchronized successfully on this worker.

  6. Unpause the DAG.
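
As a command-line alternative for steps 1 and 6, assuming an Airflow 2 environment, you can pause and unpause the DAG by running Airflow CLI commands through gcloud:

# Pause the DAG before uploading the updated file.
gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags pause -- DAG_NAME

# Unpause the DAG after the workers are synced.
gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags unpause -- DAG_NAME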

Delete a DAG

This section describes how to delete a DAG.

Delete a DAG in your environment

To delete a DAG, remove the Python .py file for the DAG from the /dags folder in your environment's bucket.

Console

  1. In Google Cloud console, go to the Environments page.

    Go to Environments

  2. In the list of environments, find the row with the name of your environment and, in the DAGs folder column, click the DAGs link. The Bucket details page opens and shows the contents of the /dags folder in your environment's bucket.

  3. Select the DAG file, click Delete, and confirm the operation.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

Replace:

  • ENVIRONMENT_NAME with the name of the environment.
  • LOCATION with the region where the environment is located.
  • DAG_FILE with the name of the Python .py file for the DAG.

Example:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py
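
Alternatively, assuming your environment's bucket address is the hypothetical gs://example-bucket, you can remove the file directly with gsutil:

gsutil rm gs://example-bucket/dags/example_dag.py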

Remove a DAG from the Airflow UI

You can remove a DAG from the Airflow UI in Airflow 1.10.0 or later versions.

To remove the metadata for a DAG from the Airflow web interface:

Airflow UI

  1. Go to the Airflow UI for your environment.
  2. For the DAG, click Delete DAG.

gcloud

In Airflow 1 versions earlier than 1.10.14, run the following gcloud CLI command:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    delete_dag -- DAG_NAME

In Airflow 2, and in Airflow 1.10.14 and later versions, run the following gcloud CLI command:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Replace:

  • ENVIRONMENT_NAME with the name of the environment.
  • LOCATION with the region where the environment is located.
  • DAG_NAME with the name of the DAG to delete.
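
Example, assuming an Airflow 2 environment that contains a DAG with the ID example_dag:

gcloud composer environments run example-environment \
    --location us-central1 \
    dags delete -- example_dag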

What's next