Adding and Updating DAGs (workflows)

This page describes how to determine the storage bucket for your environment and how to add, update, and delete DAGs in your environment.

Cloud Composer uses Cloud Storage to store Apache Airflow DAGs, also known as workflows. Each environment has an associated Cloud Storage bucket. Cloud Composer schedules only the DAGs in the Cloud Storage bucket.

Before you begin

  • The following permission is required to add and update DAGs in the Cloud Storage bucket for the Cloud Composer environment: storage.objectAdmin. For more information, see Cloud Composer Access Control. An example command for granting this role appears after this list.
  • Airflow picks up DAG changes within 3-5 minutes. You can see task status in the Airflow web interface.
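
For example, a project-level grant with the gcloud command-line tool might look like the following. This is a sketch rather than an official step from this page; PROJECT_ID and USER_EMAIL are placeholders, and your organization might grant the role on the environment's bucket instead:

 gcloud projects add-iam-policy-binding PROJECT_ID \
     --member "user:USER_EMAIL" \
     --role "roles/storage.objectAdmin"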

Determining the storage bucket name

To determine the name of the storage bucket associated with your environment:

Console

  1. Open the Environments page in the GCP Console.
  2. In the Name column, click the name of the environment to open the Environment details page.

    On the Configuration tab, the name of the Cloud Storage bucket is shown to the right of the DAGs folder.

  3. (Optional) To view the bucket in Cloud Storage, click the bucket name.

gcloud

Enter the following command:

 gcloud composer environments describe ENVIRONMENT_NAME \
     --location LOCATION \
     --format="get(config.dagGcsPrefix)"

where:

  • ENVIRONMENT_NAME is the name of the environment.
  • LOCATION is the Compute Engine region where the environment is located.
  • --format is an option that limits the output to the dagGcsPrefix property instead of all environment details.

The dagGcsPrefix property shows the bucket name:

gs://region-environment_name-random_id-bucket/

REST API

For information about credentials, see Cloud Composer Authentication.

  1. Call the projects.locations.environments.get method.
  2. Read the config.dagGcsPrefix from the Environment response.

RPC API

For information about credentials, see Cloud Composer Authentication.

  1. Call the Environments.GetEnvironment method.
  2. Read the config.dag_gcs_prefix from the Environment response.

Python

Use the google-auth library to get credentials and the requests library to call the REST API.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
response = authed_session.request('GET', environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data['config']['dagGcsPrefix'])
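
As an alternative to calling the REST API directly, the following is a minimal sketch that uses the google-cloud-orchestration-airflow client library. The library and the get_dag_gcs_prefix helper are assumptions, not something this page documents:

# Minimal sketch; assumes the google-cloud-orchestration-airflow
# package is installed (pip install google-cloud-orchestration-airflow).
from google.cloud.orchestration.airflow import service_v1

def get_dag_gcs_prefix(project_id, location, composer_environment):
    client = service_v1.EnvironmentsClient()
    name = 'projects/{}/locations/{}/environments/{}'.format(
        project_id, location, composer_environment)
    environment = client.get_environment(name=name)
    # The prefix has the form gs://BUCKET_NAME/dags.
    return environment.config.dag_gcs_prefix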

Adding or updating a DAG

To add or update a DAG, move the Python .py file for the DAG to the environment's dags folder in Cloud Storage.

Console

  1. Open the Environments page in the GCP Console.
  2. In the Name column, click the name of the environment to open the Environment details page.

    On the Configuration tab, the name of the Cloud Storage bucket is shown to the right of the DAGs folder.

  3. To view the bucket in Cloud Storage, click the bucket name.

    By default, the dags folder opens.

  4. Click Upload Files and select the local copy of the DAG you want to upload.
  5. To upload the file to the dags folder, click Open.

gcloud

To add or update a DAG:

 gcloud composer environments storage dags import \
     --environment ENVIRONMENT_NAME \
     --location LOCATION \
     --source LOCAL_FILE_TO_UPLOAD 

where:

  • ENVIRONMENT_NAME is the name of the environment.
  • LOCATION is the Compute Engine region where the environment is located.
  • LOCAL_FILE_TO_UPLOAD is the DAG to upload.
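
If you deploy DAGs from a script instead, the following is a minimal sketch that uploads a local DAG file with the google-cloud-storage client library. The upload_dag helper is hypothetical, and dag_gcs_prefix is assumed to be the config.dagGcsPrefix value determined earlier on this page:

import os

from google.cloud import storage

def upload_dag(dag_gcs_prefix, local_file_to_upload):
    # dag_gcs_prefix is assumed to look like gs://BUCKET_NAME/dags.
    bucket_name, _, prefix = dag_gcs_prefix[len('gs://'):].partition('/')
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    # Upload into the dags folder so that Cloud Composer schedules the DAG.
    blob_name = '{}/{}'.format(prefix.rstrip('/') or 'dags',
                               os.path.basename(local_file_to_upload))
    bucket.blob(blob_name).upload_from_filename(local_file_to_upload)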

Deleting a DAG

To delete a DAG, remove the Python .py file for the DAG from the environment's dags folder in Cloud Storage. Currently, the metadata for deleted DAGs remains visible in the Airflow web interface.

Console

  1. Go to the Environments page in the GCP Console.
  2. In the Name column, click the name of the environment to open its Environment details page.

    On the Configuration tab, the name of the Cloud Storage bucket is shown to the right of the DAGs folder.

  3. To view the bucket in Cloud Storage, click the bucket name.

    By default, the dags folder opens.

  4. Click the checkbox next to the DAG that you want to delete.
  5. At the top of the GCP Console, click Delete.
  6. In the dialog that appears, click OK.

gcloud

 gcloud composer environments storage dags delete \
     --environment ENVIRONMENT_NAME \
     --location LOCATION \
     DAG_NAME.py 

where:

  • ENVIRONMENT_NAME is the name of the environment.
  • LOCATION is the Compute Engine region where the environment is located.
  • DAG_NAME.py is the DAG to delete.

Note: Using this command does not remove the DAG from the Airflow web interface.
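
For scripted deletion, a similar sketch with the google-cloud-storage client library follows; delete_dag is a hypothetical helper, and, as with the gcloud command, deleting the file does not remove the DAG from the Airflow web interface:

from google.cloud import storage

def delete_dag(bucket_name, dag_name):
    # Removes dags/DAG_NAME.py from the environment's bucket.
    client = storage.Client()
    client.bucket(bucket_name).blob('dags/{}'.format(dag_name)).delete()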

