Triggering DAGs

This page describes how to use Cloud Functions for event-based DAG triggers.

While Airflow is designed to run DAGs on a regular schedule, you can also trigger DAGs in response to events, such as a change in a Cloud Storage bucket or a message pushed to Cloud Pub/Sub. To accomplish this, you can trigger Cloud Composer DAGs from Cloud Functions.

The example in this guide runs a DAG every time a change occurs in a Cloud Storage bucket. Object-change metadata are passed into the DAG's config.

Enabling APIs for your project

Enable the Cloud Functions API and the Cloud Identity and Access Management (IAM) API.

Granting blob signing permissions to the Cloud Functions Service Account

To authenticate to Cloud IAP, grant the Appspot Service Account (used by Cloud Functions) the Service Account Token Creator role on itself. To do this, execute the following command in the gcloud command-line tool or Cloud Shell:

gcloud iam service-accounts add-iam-policy-binding \
your-project-id@appspot.gserviceaccount.com \
--member=serviceAccount:your-project-id@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Getting the client ID

To construct a token to authenticate to Cloud IAP, the function requires the client ID of the proxy that protects the Airflow webserver. The Cloud Composer API does not provide this information directly. Instead, make an unauthenticated request to the Airflow webserver and capture the client ID from the redirect URL. The following Python code sample demonstrates how to get the client ID:

import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

# Replace these placeholder values with your own.
project_id = 'YOUR_PROJECT_ID'
location = 'us-central1'
composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
composer_response = authed_session.request('GET', environment_url)
environment_data = composer_response.json()
airflow_uri = environment_data['config']['airflowUri']

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers['location']

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string['client_id'][0])
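
The final parsing step can be isolated into a small helper, which makes it possible to check the logic without network access. The following is a minimal sketch rather than part of the sample above: the function name extract_client_id and the example redirect URL are hypothetical, and it assumes Python 3, so it uses the standard library's urllib.parse instead of six.

```python
from urllib.parse import parse_qs, urlparse


def extract_client_id(redirect_location):
    """Return the client_id query parameter from a redirect URL.

    Hypothetical helper for illustration. Raises ValueError if the
    parameter is absent, for example when the web server did not
    redirect to a sign-in page.
    """
    query = parse_qs(urlparse(redirect_location).query)
    values = query.get('client_id')
    if not values:
        raise ValueError('No client_id parameter in redirect URL')
    return values[0]


# Made-up redirect URL, used only to exercise the helper.
example_redirect = (
    'https://accounts.google.com/o/oauth2/v2/auth'
    '?client_id=example-id.apps.googleusercontent.com'
    '&response_type=code')
print(extract_client_id(example_redirect))
```

Raising an error when the parameter is missing gives a clearer failure than the KeyError that a direct dictionary lookup would produce.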

Creating your function

Create a function with the following index.js and package.json, filling in the first four constants (see Create a Function).

When you have finished, your function should resemble the following graphic:

index.js

'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} event The Cloud Functions event.
 * @param {!Function} callback The callback function.
 */
exports.triggerDag = function triggerDag (event, callback) {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {'conf': JSON.stringify(event.data)};

  // Make the request
  authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT)
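    .then(function iapAuthorizationCallback (iap) {
      // Return the promise so that the callback fires only after the request
      // completes and any request error reaches the catch handler.
      return makeIapPostRequest(WEBSERVER_URL, BODY, iap.idToken, USER_AGENT, iap.jwt);
    })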
    .then(_ => callback(null))
    .catch(callback);
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
function authorizeIap (clientId, projectId, userAgent) {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(JSON.stringify({alg: 'RS256', typ: 'JWT'}))
    .toString('base64');

  var jwt = '';
  var jwtClaimset = '';

  // Obtain an OAuth 2.0 access token for the appspot service account
  return fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'}
    })
    .then(res => res.json())
    .then(function obtainAccessTokenCallback (tokenResponse) {
      if (tokenResponse.error) {
        return Promise.reject(tokenResponse.error);
      }
      var accessToken = tokenResponse.access_token;
      var iat = Math.floor(new Date().getTime() / 1000);
      var claims = {
        iss: SERVICE_ACCOUNT,
        aud: 'https://www.googleapis.com/oauth2/v4/token',
        iat: iat,
        exp: iat + 60,
        target_audience: clientId
      };
      jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
      var toSign = [JWT_HEADER, jwtClaimset].join('.');

      return fetch(
        `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
        {
          method: 'POST',
          body: JSON.stringify({'bytesToSign': Buffer.from(toSign).toString('base64')}),
          headers: {
            'User-Agent': userAgent,
            'Authorization': `Bearer ${accessToken}`
          }
        });
    })
    .then(res => res.json())
    .then(function signJsonClaimCallback (body) {
      if (body.error) {
        return Promise.reject(body.error);
      }
      // Assemble the signed JWT from the header, claimset, and the signature
      // returned by signBlob, then exchange it for an ID token
      var jwtSignature = body.signature;
      jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
      var form = new FormData();
      form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
      form.append('assertion', jwt);
      return fetch(
        'https://www.googleapis.com/oauth2/v4/token', {
          method: 'POST',
          body: form
        });
    })
    .then(res => res.json())
    .then(function returnJwt (body) {
      if (body.error) {
        return Promise.reject(body.error);
      }
      return {
        jwt: jwt,
        idToken: body.id_token
      };
    });
}

/**
 * @param {string} url The url that the post request targets.
 * @param {!Object} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the IAP request.
 * @param {string} userAgent The user agent to identify the requester.
 * @param {string} jwt A JSON Web Token used to authenticate the request.
 */
function makeIapPostRequest (url, body, idToken, userAgent, jwt) {
  var form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  return fetch(
    url, {
      method: 'POST',
      body: form
    })
    .then(function makeIapPostRequestCallback () {
      return fetch(url, {
        method: 'POST',
        headers: {
          'User-Agent': userAgent,
          'Authorization': `Bearer ${idToken}`
        },
        body: JSON.stringify(body)
      });
    });
}
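
To make the token construction in authorizeIap easier to follow, the unsigned part of the JWT can be reproduced offline. The following Python sketch is an illustration under stated assumptions, not part of the function: build_unsigned_jwt is a hypothetical helper, and the fixed timestamp in the usage lines is made up. It mirrors the claim names and the standard Base64 encoding used in index.js, and stops before the signBlob call, which requires credentials.

```python
import base64
import json
import time


def build_unsigned_jwt(service_account, client_id, issued_at=None):
    """Return the 'header.claimset' string that index.js passes to signBlob.

    Hypothetical helper for illustration only.
    """
    if issued_at is None:
        issued_at = int(time.time())
    header = {'alg': 'RS256', 'typ': 'JWT'}
    claims = {
        'iss': service_account,
        'aud': 'https://www.googleapis.com/oauth2/v4/token',
        'iat': issued_at,
        # The assertion is valid for 60 seconds, as in index.js.
        'exp': issued_at + 60,
        # The IAP client ID is the audience of the ID token being requested.
        'target_audience': client_id,
    }

    def encode(obj):
        # Standard Base64, matching Buffer.toString('base64') in index.js.
        return base64.b64encode(json.dumps(obj).encode('utf-8')).decode('ascii')

    return '.'.join([encode(header), encode(claims)])


# Usage with placeholder values and a fixed, made-up timestamp.
unsigned = build_unsigned_jwt(
    'your-project-id@appspot.gserviceaccount.com',
    'your-iap-client-id',
    issued_at=1500000000)
header_b64, claims_b64 = unsigned.split('.')
print(json.loads(base64.b64decode(claims_b64)))
```

Decoding the claimset this way is a quick check that target_audience carries the client ID obtained in the previous section.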

Update your package.json with the following dependencies.

package.json

{
  "name": "nodejs-docs-samples-functions-composer-storage-trigger",
  "version": "0.0.1",
  "dependencies": {
    "form-data": "^2.3.2",
    "node-fetch": "^2.2.0"
  },
  "engines": {
    "node": ">=4.3.2"
  },
  "private": true,
  "license": "Apache-2.0",
  "author": "Google Inc.",
  "repository": {
    "type": "git",
    "url": "https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git"
  },
  "devDependencies": {
    "@google-cloud/nodejs-repo-tools": "^2.2.5",
    "ava": "0.25.0",
    "proxyquire": "2.0.0",
    "semistandard": "^12.0.1",
    "sinon": "4.4.2"
  },
  "scripts": {
    "lint": "repo-tools lint",
    "test": "ava -T 20s --verbose test/*.test.js"
  }
}

Setting up your DAG

The following DAG contains a BashOperator which prints object-change metadata. In order for it to run in response to Cloud Functions events, copy it into your environment's DAGs folder (see Manage DAGs).

gcs_response_dag.py

import datetime

import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

Testing your function

Upload a file to your Cloud Storage bucket. You should see that the DAG has been triggered and information about the Cloud Storage change has been logged by the BashOperator.
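
When checking the logged output, it can help to know what the function sends. The following Python sketch reproduces the URL and request body that index.js builds. It is an illustration only: build_trigger_request is a hypothetical helper, and the sample event data (a bucket and an object name) is made up and much smaller than a real object-change event.

```python
import json


def build_trigger_request(webserver_id, dag_name, event_data):
    """Return the (url, body) pair that index.js posts to the Airflow webserver.

    Hypothetical helper for illustration only.
    """
    url = 'https://{}.appspot.com/api/experimental/dags/{}/dag_runs'.format(
        webserver_id, dag_name)
    # As in index.js, the event data is serialized to a JSON string and sent
    # under the 'conf' key of a JSON request body.
    body = json.dumps({'conf': json.dumps(event_data)})
    return url, body


# Made-up event data, used only to exercise the helper.
sample_event_data = {'bucket': 'example-bucket', 'name': 'example.txt'}
url, body = build_trigger_request(
    'your-tenant-project-id',
    'composer_sample_trigger_response_dag',
    sample_event_data)
print(url)
print(body)
```

Because the conf value is itself a JSON string, it must be decoded a second time to recover the original event fields.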
