Triggering DAGs (workflows)

This page describes how to use Cloud Functions for event-based DAG triggers.

While Airflow is designed to run DAGs on a regular schedule, you can trigger DAGs in response to events, such as a change in a Cloud Storage bucket or a message pushed to Cloud Pub/Sub. To accomplish this, Cloud Composer DAGs can be triggered by Cloud Functions.

The example in this guide runs a DAG every time a change occurs in a Cloud Storage bucket. Object-change metadata are passed into the DAG's config.

Enabling APIs for your project

Enable the Cloud Composer, Cloud Functions, and Cloud Identity and Access Management (Cloud IAM) APIs.

Enable the APIs

Granting blob signing permissions to the Cloud Functions Service Account

To authenticate to Cloud IAP, grant the Appspot Service Account (used by Cloud Functions) the Service Account Token Creator role on itself. To do this, execute the following command in the gcloud command-line tool or Cloud Shell:

gcloud iam service-accounts add-iam-policy-binding \ \ \

Getting the client ID

To construct a token to authenticate to Cloud IAP, the function requires the client ID of the proxy that protects the Airflow webserver. The Cloud Composer API does not provide this information directly. Instead, make an unauthenticated request to the Airflow webserver and capture the client ID from the redirect URL. The following Python code sample demonstrates how to get the client ID:

import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See:
credentials, _ = google.auth.default(
authed_session = google.auth.transport.requests.AuthorizedSession(

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    '/environments/{}').format(project_id, location, composer_environment)
composer_response = authed_session.request('GET', environment_url)
environment_data = composer_response.json()
airflow_uri = environment_data['config']['airflowUri']

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers['location']

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)

Creating your function

Create a function with the below index.js and package.json, filling in the first four constants (see Create a Function). Enable Retry on failure.

When you have finished, your function should resemble the following graphic:


'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

 * Triggered from a message on a Cloud Storage bucket.
 * IAP authorization based on:
 * and
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
  } catch (err) {
    throw new Error(err);

 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    aud: '',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('', {
    method: 'POST',
    body: form,
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,

 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    body: JSON.stringify(body),

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);

Update your package.json with the following dependencies.


  "name": "nodejs-docs-samples-functions-composer-storage-trigger",
  "version": "0.0.1",
  "dependencies": {
    "form-data": "^2.3.2",
    "node-fetch": "^2.2.0"
  "engines": {
    "node": ">=8.0.0"
  "private": true,
  "license": "Apache-2.0",
  "author": "Google Inc.",
  "repository": {
    "type": "git",
    "url": ""
  "devDependencies": {
    "@google-cloud/nodejs-repo-tools": "^3.3.0",
    "mocha": "^6.0.0",
    "proxyquire": "^2.1.0",
    "sinon": "^7.2.7"
  "scripts": {
    "test": "mocha test/*.test.js --timeout=20000"

Setting up your DAG

The following DAG contains a BashOperator which prints object-change metadata. In order for it to run in response to Cloud Functions events, copy it into your environment's DAGs folder (see Manage DAGs).

import datetime

import airflow
from airflow.operators import bash_operator

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),

with airflow.DAG(
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

Testing your function

Upload a file to your Cloud Storage bucket. You should see that the DAG has been triggered and information about the Cloud Storage change has been logged by the BashOperator.

What's next

Was this page helpful? Let us know how we did:

Send feedback about...

Cloud Composer