Adding and Updating DAGs (workflows)

This page describes how to determine the storage bucket for your environment and how to add, update, and delete a DAG from your environment.

Cloud Composer uses Cloud Storage to store Apache Airflow DAGs, also known as workflows. Each environment has an associated Cloud Storage bucket. Cloud Composer schedules only the DAGs in the Cloud Storage bucket.

Before you begin

  • Because Apache Airflow does not provide strong DAG isolation, we recommend that you maintain separate production and test environments to prevent DAG interference. For more information, see Testing DAGs.
  • The following permissions are required to add and update DAGs in the Cloud Storage bucket for the Cloud Composer environment (for one way to grant them, see the sketch after this list):
    • storage.objectAdmin to upload files.
    • composer.environments.get to look up the DAG destination bucket. This permission is not required when using the Cloud Storage API or gsutil.
  • DAG changes occur within 3-5 minutes. You can see task status in the Airflow web interface.
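As an illustration only: one way to obtain storage.objectAdmin is a project-level IAM role binding, sketched below with the gcloud tool. PROJECT_ID and USER_EMAIL are placeholders, and your organization may prefer a narrower, bucket-level grant instead.

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="user:USER_EMAIL" \
    --role="roles/storage.objectAdmin"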

Determine the storage bucket name

To determine the name of the storage bucket associated with your environment:

Console

  1. Open the Environments page in the Cloud Console.
  2. In the Name column, click the name of the environment to open the Environment details page.

    On the Configuration tab, the name of the Cloud Storage bucket is shown to the right of the DAGs folder.

  3. (Optional) To view the bucket in Cloud Storage, click the bucket name.

gcloud

Enter the following command:

gcloud composer environments describe ENVIRONMENT_NAME \
  --location LOCATION \
  --format="get(config.dagGcsPrefix)"

where:

  • ENVIRONMENT_NAME is the name of the environment.
  • LOCATION is the Compute Engine region where the environment is located.
  • --format is an option to specify only the dagGcsPrefix property instead of all environment details.

The dagGcsPrefix property shows the bucket name:

gs://region-environment_name-random_id-bucket/
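For example, for a hypothetical environment named example-environment in us-central1, the command looks like this, and the output is a single gs:// path in the format shown above:

gcloud composer environments describe example-environment \
  --location us-central1 \
  --format="get(config.dagGcsPrefix)"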

REST

For information about credentials, see Cloud Composer Authentication.

  1. Call the projects.locations.environments.get method.
  2. Read the config.dagGcsPrefix from the Environment response.
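For example, a minimal sketch of the same call using curl and an access token from the gcloud tool; PROJECT_ID, LOCATION, and ENVIRONMENT_NAME are placeholders, and the bucket name appears in the config.dagGcsPrefix field of the JSON response:

curl -s \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  "https://composer.googleapis.com/v1beta1/projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME"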

RPC

For information about credentials, see Cloud Composer Authentication.

  1. Call the Environments.GetEnvironment method.
  2. Read the config.dag_gcs_prefix from the Environment response.

Python

Use the google-auth library to get credentials and use the requests library to call the REST API.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

# TODO(developer): Set these values before running the sample.
project_id = 'YOUR_PROJECT_ID'
location = 'us-central1'
composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
response = authed_session.request('GET', environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data['config']['dagGcsPrefix'])

Add or update a DAG

To add or update a DAG, move the Python .py file for the DAG to the environment's dags folder in Cloud Storage.
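If you prefer to work with the bucket directly, copying the file into the dags folder has the same effect. A minimal sketch using gsutil, where BUCKET_NAME is the bucket you determined in the previous section and dag_file.py is a hypothetical local DAG file:

gsutil cp dag_file.py gs://BUCKET_NAME/dags/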

Console

  1. Open the Environments page in the Cloud Console.
  2. In the Name column, click the name of the environment to open the Environment details page.

    On the Configuration tab, the name of the Cloud Storage bucket is shown to the right of the DAGs folder.

  3. To view the bucket in Cloud Storage, click the bucket name.

    By default, the dags folder opens.

  4. Click Upload Files and select the local copy of the DAG you want to upload.

  5. To upload the file to the dags folder, click Open.

gcloud

To add or update a DAG:

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source LOCAL_FILE_TO_UPLOAD

where:

  • ENVIRONMENT_NAME is the name of the environment.
  • LOCATION is the Compute Engine region where the environment is located.
  • LOCAL_FILE_TO_UPLOAD is the DAG to upload.
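For example, a hypothetical upload of a local file named quickstart.py to an environment named example-environment in us-central1:

gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source quickstart.py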

Update a DAG that has active DAG runs

If you update a DAG that has active DAG runs:

  • All currently executing tasks finish using the original DAG file.
  • All tasks that are scheduled but are not currently running use the updated DAG file.
  • All tasks that are no longer present in the updated DAG file are marked as removed.

Updating DAGs that run on a frequent schedule

After you upload a DAG file, it takes some time for Airflow to load this file and update the DAG. If your DAG runs on a frequent schedule, you might want to ensure that the DAG uses the updated version of the DAG file. To do so:

  1. Pause the DAG in the Airflow UI (or from the command line; see the sketch after this list).
  2. Upload an updated DAG file.
  3. Wait until you see the updates in the Airflow UI. This means that the DAG was correctly parsed by the scheduler and updated in the Airflow database.

    Even if the Airflow UI displays the updated DAG, this does not guarantee that Airflow workers already have the updated version of the DAG file, because DAG files are synced independently for schedulers and workers.

  4. You might want to extend the waiting time to make sure that the DAG file is synced with all workers in your environment. The synchronization happens several times per minute. In a healthy environment, waiting for about 20-30 seconds is enough for all workers to sync.

  5. (Optional) If you want to be completely sure that all workers have the new version of the DAG file, inspect logs for each individual worker. To do so:

    1. Open the Logs tab for your environment in the Console.
    2. Go to Composer logs > Infrastructure > Cloud Storage sync item and inspect logs for every worker in your environment. Look for the most recent Syncing dags directory log item that has a timestamp after you uploaded the new DAG file. If you see a Finished syncing item that follows it, then the DAGs are synchronized successfully on this worker.
  6. Un-pause the DAG.
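If you prefer to pause and un-pause the DAG from the command line rather than the Airflow UI, you can run the Airflow CLI through gcloud, following the same pattern used for removing DAG metadata later on this page. A sketch for the Airflow 1.10 CLI, with ENVIRONMENT_NAME, LOCATION, and DAG_NAME as placeholders (in the Airflow 2.0 CLI, the equivalent subcommands are dags pause and dags unpause):

# Pause the DAG before uploading the updated file (step 1).
gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    pause -- DAG_NAME

# Un-pause the DAG once the file is synced on all workers (step 6).
gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    unpause -- DAG_NAME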

Deleting a DAG

This section describes how to delete a DAG.

Deleting a DAG in your environment

To delete a DAG, remove the Python .py file for the DAG from the environment's dags folder in Cloud Storage. Deleting a DAG does not remove the DAG metadata from the Airflow web interface.

Console

  1. Go to the Environments page in the Cloud Console.
  2. In the Name column, click the name of the environment to open its Environment details page.

    On the Configuration tab, the name of the Cloud Storage bucket is shown to the right of the DAGs folder.

  3. To view the bucket in Cloud Storage, click the bucket name.

    By default, the dags folder opens.

  4. Click the checkbox next to the DAG you want to delete.

  5. At the top of the Cloud Console, click Delete.

  6. In the dialog that appears, click OK.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_NAME.py

where:

  • ENVIRONMENT_NAME is the name of the environment.
  • LOCATION is the Compute Engine region where the environment is located.
  • DAG_NAME.py is the DAG to delete.

Deleting the DAG file does not remove the DAG metadata from the Airflow web interface:

  • Airflow 1.9.0: The metadata for deleted DAGs remains visible in the Airflow web interface.
  • Airflow 1.10.0 or later: You can use the gcloud tool to remove the DAG metadata, as described in the next section.
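As a usage example, the following hypothetical command deletes a DAG file named quickstart.py from an environment named example-environment in us-central1:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    quickstart.py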

Removing a DAG from the Airflow web interface

To remove the metadata for a DAG from the Airflow web interface, enter:

Airflow 1.10 CLI

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    delete_dag -- DAG_NAME

Airflow 2.0 CLI

  gcloud beta composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

where:

  • ENVIRONMENT_NAME is the name of the environment.
  • LOCATION is the Compute Engine region where the environment is located.
  • DAG_NAME is the name of the DAG to delete.
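For example, with the Airflow 1.10 CLI, a hypothetical command that removes the metadata for a DAG named quickstart from an environment named example-environment in us-central1:

gcloud composer environments run example-environment \
    --location us-central1 \
    delete_dag -- quickstart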

What's next