Trigger Cloud Composer DAGs with Cloud Run functions and Airflow REST API

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

This page describes how to use Cloud Run functions to trigger Cloud Composer DAGs in response to events.

Apache Airflow is designed to run DAGs on a regular schedule, but you can also trigger DAGs in response to events. One way to do this is to use Cloud Run functions to trigger Cloud Composer DAGs when a specified event occurs.

The example in this guide runs a DAG every time a change occurs in a Cloud Storage bucket. Changes to any object in a bucket trigger a function. This function makes a request to Airflow REST API of your Cloud Composer environment. Airflow processes this request and runs a DAG. The DAG outputs information about the change.

Before you begin

Check your environment's networking configuration

This solution does not work in Private IP and VPC Service Controls configurations because it is not possible to configure connectivity from Cloud Run functions to the Airflow web server in these configurations.

In Cloud Composer 2, you can use another approach: Trigger DAGs using Cloud Run functions and Pub/Sub Messages

Enable APIs for your project

Console

Enable the Cloud Composer and Cloud Run functions APIs.

Enable the APIs

gcloud

Enable the Cloud Composer and Cloud Run functions APIs:

gcloud services enable cloudfunctions.googleapis.com composer.googleapis.com

Enable the Airflow REST API

For Airflow 2, the stable REST API is already enabled by default. If your environment has the stable API disabled, then enable the stable REST API.

Allow API calls to Airflow REST API using Webserver Access Control

Cloud Run functions can reach out to Airflow REST API either using IPv4 or IPv6 address.

If you are not sure what will be the calling IP range then use a default configuration option in Webserver Access Control which is All IP addresses have access (default) to not accidentally block your Cloud Run functions.

Create a Cloud Storage bucket

This example triggers a DAG in response to changes in a Cloud Storage bucket. create a new bucket to use in this example.

Get the Airflow web server URL

This example makes REST API requests to the Airflow web server endpoint. You use the URL of the Airflow web server in your Cloud Function code.

Console

  1. In the Google Cloud console, go to the Environments page.

    Go to Environments

  2. Click the name of your environment.

  3. On the Environment details page, go to the Environment configuration tab.

  4. The URL of the Airflow web server is listed in the Airflow web UI item.

gcloud

Run the following command:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format='value(config.airflowUri)'

Replace:

  • ENVIRONMENT_NAME with the name of the environment.
  • LOCATION with the region where the environment is located.

Upload a DAG to your environment

Upload a DAG to your environment. The following example DAG outputs the received DAG run configuration. You trigger this DAG from a function, which you create later in this guide.

import datetime

import airflow
from airflow.operators.bash import BashOperator


with airflow.DAG(
    "composer_sample_trigger_response_dag",
    start_date=datetime.datetime(2021, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
) as dag:
    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = BashOperator(
        task_id="print_gcs_info", bash_command="echo {{ dag_run.conf }}"
    )

Deploy a Cloud Function that triggers the DAG

You can deploy a Cloud Function using your preferred language supported by Cloud Run functions or Cloud Run. This tutorial demonstrates a Cloud Function implemented in Python and Java.

Specify Cloud Function configuration parameters

  • Trigger. For this example, select a trigger that works when a new object is created in a bucket, or an existing object gets overwritten.

    • Trigger Type. Cloud Storage.

    • Event Type. Finalize / Create.

    • Bucket. Select a bucket that must trigger this function.

    • Retry on failure. We recommend to disable this option for the purposes of this example. If you use your own function in a production environment, enable this option to handle transient errors.

  • Runtime service account, in the Runtime, build, connections and security settings section. Use one of the following options, depending on your preferences:

    • Select Compute Engine default service account. With default IAM permissions, this account can run functions that access Cloud Composer environments.

    • Create a custom service account that has the Composer User role and specify it as a runtime service account for this function. This option follows the minimum privilege principle.

  • Runtime and entry point, on the Code step:

    • (Python) When adding code for this example, select the Python 3.7 or later runtime and specify trigger_dag_gcf as the entry point.

    • (Java) When adding code for this example, select the Java 17 runtime and specify com.example.Example as the entry point.

Add requirements

Python

Specify the dependencies in the requirements.txt file:

google-auth==2.19.1
requests==2.32.2

Java

Add the following dependencies to dependencies section in the pom.xml generated by Google Cloud Functions UI.

    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-docs</artifactId>
      <version>v1-rev20210707-1.32.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.api-client</groupId>
      <artifactId>google-api-client</artifactId>
      <version>1.32.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.auth</groupId>
      <artifactId>google-auth-library-credentials</artifactId>
      <version>1.14.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.auth</groupId>
      <artifactId>google-auth-library-oauth2-http</artifactId>
      <version>1.14.0</version>
    </dependency>

Python

Add the code for triggering DAGs using Airflow REST API. Create a file named composer2_airflow_rest_api.py and put the code for making Airflow REST API calls into this file.

Do not change any variables. The Cloud Function imports this file from the main.py file.

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text

Put the following code to the main.py file. Replace the value of the web_server_url variable with the Airflow web server address that you obtained earlier.

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Trigger a DAG in a Cloud Composer 2 environment in response to an event,
using Cloud Functions.
"""

from typing import Any

import composer2_airflow_rest_api

def trigger_dag_gcf(data, context=None):
    """
    Trigger a DAG and pass event data.

    Args:
      data: A dictionary containing the data for the event. Its format depends
      on the event.
      context: The context object for the event.

    For more information about the arguments, see:
    https://cloud.google.com/functions/docs/writing/background#function_parameters
    """

    # TODO(developer): replace with your values
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )
    # Replace with the ID of the DAG that you want to run.
    dag_id = 'composer_sample_trigger_response_dag'

    composer2_airflow_rest_api.trigger_dag(web_server_url, dag_id, data)

Java

Put the following code to the Example.java file. Replace the value of the webServerUrl variable with the Airflow web server address that you obtained earlier.


// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.example;

import com.example.Example.GcsEvent;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpContent;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.http.json.JsonHttpContent;
import com.google.api.client.json.gson.GsonFactory;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

/**
 * Cloud Function that triggers an Airflow DAG in response to an event (in
 * this case a Cloud Storage event).
 */
public class Example implements BackgroundFunction<GcsEvent> {
  private static final Logger logger = Logger.getLogger(Example.class.getName());

  // TODO(developer): replace with your values
  // Replace webServerUrl with the Airflow web server address. To obtain this
  // URL, run the following command for your environment:
  // gcloud composer environments describe example-environment \
  //  --location=your-composer-region \
  //  --format="value(config.airflowUri)"
  @Override
  public void accept(GcsEvent event, Context context) throws Exception {
    String webServerUrl = "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com";
    String dagName = "composer_sample_trigger_response_dag";
    String url = String.format("%s/api/v1/dags/%s/dagRuns", webServerUrl, dagName);

    logger.info(String.format("Triggering DAG %s as a result of an event on the object %s.",
      dagName, event.name));
    logger.info(String.format("Triggering DAG via the following URL: %s", url));

    GoogleCredentials googleCredentials = GoogleCredentials.getApplicationDefault()
        .createScoped("https://www.googleapis.com/auth/cloud-platform");
    HttpCredentialsAdapter credentialsAdapter = new HttpCredentialsAdapter(googleCredentials);
    HttpRequestFactory requestFactory =
      new NetHttpTransport().createRequestFactory(credentialsAdapter);

    Map<String, Map<String, String>> json = new HashMap<String, Map<String, String>>();
    Map<String, String> conf = new HashMap<String, String>();
    conf.put("bucket", event.bucket);
    conf.put("name", event.name);
    conf.put("generation", event.generation);
    conf.put("operation", context.eventType());
    json.put("conf", conf);
    HttpContent content = new JsonHttpContent(new GsonFactory(), json);
    HttpRequest request = requestFactory.buildPostRequest(new GenericUrl(url), content);
    request.getHeaders().setContentType("application/json");
    HttpResponse response;
    try {
      response = request.execute();
      int statusCode = response.getStatusCode();
      logger.info("Response code: " + statusCode);
      logger.info(response.parseAsString());
    } catch (HttpResponseException e) {
      // https://cloud.google.com/java/docs/reference/google-http-client/latest/com.google.api.client.http.HttpResponseException
      logger.info("Received HTTP exception");
      logger.info(e.getLocalizedMessage());
      logger.info("- 400 error: wrong arguments passed to Airflow API");
      logger.info("- 401 error: check if service account has Composer User role");
      logger.info("- 403 error: check Airflow RBAC roles assigned to service account");
      logger.info("- 404 error: check Web Server URL");
    } catch (Exception e) {
      logger.info("Received exception");
      logger.info(e.getLocalizedMessage());
    }
  }

  /** Details of the storage event. */
  public static class GcsEvent {
    /** Bucket name. */
    String bucket;
    /** Object name. */
    String name;
    /** Object version. */
    String generation;
  }
}

Test your function

To check that your function and DAG work as intended:

  1. Wait until your function deploys.
  2. Upload a file to your Cloud Storage bucket. As an alternative, you can trigger the function manually by selecting the Test the function action for it in Google Cloud console.
  3. Check the DAG page in the Airflow web interface. The DAG should have one active or already completed DAG run.
  4. In the Airflow UI, check task logs for this run. You should see that the print_gcs_info task outputs the data received from the function to the logs:

Python

[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
    {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
    crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
    ... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
    return code 0h

Java

[2023-02-08, 08:00:09 UTC] {subprocess.py:86} INFO - Output:
[2023-02-08, 08:00:09 UTC] {subprocess.py:93} INFO - {bucket: example-storage-for-gcf-triggers, generation: 1675843189006715, name: file.txt, operation: google.storage.object.create}
[2023-02-08, 08:00:09 UTC] {subprocess.py:97} INFO - Command exited with return code 0

What's next