Adding and Updating DAGs (workflows)

This page describes how to determine the storage bucket for your environment and how to add, update, and delete a DAG from your environment.

Cloud Composer uses Cloud Storage to store Apache Airflow DAGs, also known as workflows. Each environment has an associated Cloud Storage bucket. Cloud Composer schedules only the DAGs in the Cloud Storage bucket.

Determining the storage bucket name

To determine the name of the storage bucket associated with your environment:

Console

  1. Open the Environments page in the GCP Console.
  2. In the Name column, click the name of the environment to open the Environment details page.

    On the Configuration tab, the name of the Cloud Storage bucket is shown to the right of the DAGs folder.

  3. (Optional) To view the bucket in Cloud Storage, click the bucket name.

gcloud

Enter the following command, replacing the VARIABLES with appropriate values:

 gcloud beta composer environments describe ENVIRONMENT_NAME \
 --location LOCATION \
 --format="csv[no-heading](config.dagGcsPrefix)"
 

The dagGcsPrefix property in the output shows the bucket name:

gs://region-environment_name-random_id-bucket/
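If you retrieve the dagGcsPrefix programmatically, the bucket name is the first path component of the `gs://` URI. A minimal sketch (the helper name is hypothetical, and it assumes the prefix has the `gs://BUCKET/` or `gs://BUCKET/dags` form shown above):

```python
def bucket_from_dag_prefix(dag_gcs_prefix):
    """Extract the bucket name from a dagGcsPrefix value.

    Assumes a prefix of the form 'gs://BUCKET', 'gs://BUCKET/', or
    'gs://BUCKET/dags'. Helper name and format assumption are ours,
    not part of the Cloud Composer API.
    """
    path = dag_gcs_prefix
    if path.startswith('gs://'):
        path = path[len('gs://'):]
    # The bucket name is everything up to the first slash.
    return path.strip('/').split('/')[0]

print(bucket_from_dag_prefix('gs://us-central1-env-a1b2c3-bucket/dags'))
# us-central1-env-a1b2c3-bucket
```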

REST API

For information about credentials, see Cloud Composer Authentication.

  1. Call the projects.locations.environments.get method.
  2. Read the config.dagGcsPrefix from the Environment response.

RPC API

For information about credentials, see Cloud Composer Authentication.

  1. Call the Environments.GetEnvironment method.
  2. Read the config.dag_gcs_prefix from the Environment response.

Python

Use the google-auth library to get credentials, and use its requests transport module to call the REST API.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

project_id = 'YOUR_PROJECT_ID'
location = 'us-central1'
composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
response = authed_session.request('GET', environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data['config']['dagGcsPrefix'])

Adding or updating a DAG

To add or update a DAG, move the Python .py file for the DAG to the environment's dags folder in Cloud Storage.

Console

  1. Open the Environments page in the GCP Console.
  2. In the Name column, click the name of the environment to open the Environment details page.

    On the Configuration tab, the name of the Cloud Storage bucket is shown to the right of the DAGs folder.

  3. To view the bucket in Cloud Storage, click the bucket name.

    By default, the dags folder opens.

  4. Click Upload Files and select the local copy of the DAG you want to upload.
  5. To upload the file to the dags folder, click Open.

gcloud

Enter the following command, replacing the VARIABLES with appropriate values:

 gcloud beta composer environments storage dags import \
 --environment ENVIRONMENT_NAME \
 --location LOCATION \
 --source LOCAL_FILE_TO_UPLOAD
 

Cloud Composer adds the DAG to Airflow and schedules it automatically. DAG changes take effect within 3-5 minutes. You can check task status in the Airflow web interface.
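To script uploads, you can wrap the gcloud command shown above. A minimal sketch (the helper name and example values are placeholders, and it assumes the Cloud SDK is installed and authenticated):

```python
import subprocess

def import_dag_command(environment, location, source):
    """Build the argv for the documented `gcloud ... dags import` command."""
    return [
        'gcloud', 'beta', 'composer', 'environments', 'storage', 'dags', 'import',
        '--environment', environment,
        '--location', location,
        '--source', source,
    ]

# Example: upload a local DAG file. The actual run is commented out
# because it requires gcloud credentials.
cmd = import_dag_command('my-environment', 'us-central1', 'dags/my_dag.py')
# subprocess.run(cmd, check=True)
print(' '.join(cmd))
```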

Deleting a DAG

To delete a DAG, remove the Python .py file for the DAG from the environment's dags folder in Cloud Storage.

Console

  1. Go to the Environments page in the GCP Console.
  2. In the Name column, click the name of the environment to open its Environment details page.

    On the Configuration tab, the name of the Cloud Storage bucket is shown to the right of the DAGs folder.

  3. To view the bucket in Cloud Storage, click the bucket name.

    By default, the dags folder opens.

  4. Click the checkbox next to the DAG you want to delete.
  5. At the top of the GCP Console, click Delete.
  6. In the dialog that appears, click OK.

gcloud

Enter the following command, replacing the VARIABLES with appropriate values:

 gcloud beta composer environments storage dags delete \
 --environment ENVIRONMENT_NAME \
 --location LOCATION \
 DAG_NAME.py

Note: Using this command does not remove the DAG from the Airflow web interface.

Cloud Composer removes the DAG from Cloud Storage and from the Airflow scheduler. DAG changes take effect within 3-5 minutes.
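As with uploads, deletion can be scripted around the gcloud command. A minimal sketch (helper name and example values are placeholders); note that the DAG file name is a positional argument after the flags:

```python
def delete_dag_command(environment, location, dag_file):
    """Build the argv for the documented `gcloud ... dags delete` command.

    dag_file is the DAG's file name in the dags folder, e.g. 'my_dag.py',
    passed as a positional argument after the flags.
    """
    return [
        'gcloud', 'beta', 'composer', 'environments', 'storage', 'dags', 'delete',
        '--environment', environment,
        '--location', location,
        dag_file,
    ]

# The actual run is omitted because it requires gcloud credentials.
print(' '.join(delete_dag_command('my-environment', 'us-central1', 'my_dag.py')))
```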
