Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
This page describes how to use Cloud Run functions to trigger Cloud Composer DAGs in response to events.
Apache Airflow is designed to run DAGs on a regular schedule, but you can also trigger DAGs in response to events. One way to do this is to use Cloud Run functions to trigger Cloud Composer DAGs when a specified event occurs.
The example in this guide runs a DAG every time a change occurs in a Cloud Storage bucket. Changes to any object in a bucket trigger a function. This function makes a request to Airflow REST API of your Cloud Composer environment. Airflow processes this request and runs a DAG. The DAG outputs information about the change.
Before you begin
Check your environment's networking configuration
This solution does not work in Private IP and VPC Service Controls configurations because it is not possible to configure connectivity from Cloud Run functions to the Airflow web server in these configurations.
In Cloud Composer 2, you can use another approach: Trigger DAGs using Cloud Run functions and Pub/Sub Messages
Enable APIs for your project
Console
Enable the Cloud Composer and Cloud Run functions APIs.
gcloud
Enable the Cloud Composer and Cloud Run functions APIs:
gcloud services enable cloudfunctions.googleapis.comcomposer.googleapis.com
Enable the Airflow REST API
Depending on your version of Airflow:
- For Airflow 2, the stable REST API is already enabled by default. If your environment has the stable API disabled, then enable the stable REST API.
- For Airflow 1, enable the experimental REST API.
Allow API calls to Airflow REST API using Webserver Access Control
Cloud Run functions can reach out to Airflow REST API either using IPv4 or IPv6 address.
If you are not sure what will be the calling IP range then use a default
configuration option in Webserver Access Control which is All IP addresses have access (default)
to not accidentally block your Cloud Run functions.
Create a Cloud Storage bucket
This example triggers a DAG in response to changes in a Cloud Storage bucket. create a new bucket to use in this example.
Get the Airflow web server URL
This example makes REST API requests to the Airflow web server endpoint.
You use the part of the Airflow web interface URL before .appspot.com
in your
Cloud Function code.
Console
In the Google Cloud console, go to the Environments page.
Click the name of your environment.
On the Environment details page, go to the Environment configuration tab.
The URL of the Airflow web server is listed in the Airflow web UI item.
gcloud
Run the following command:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format='value(config.airflowUri)'
Replace:
ENVIRONMENT_NAME
with the name of the environment.LOCATION
with the region where the environment is located.
Get the client_id of the IAM proxy
To make a request to the Airflow REST API endpoint, the function requires the client ID of the Identity and Access Management proxy that protects the Airflow web server.
Cloud Composer does not provide this information directly. Instead, make an unauthenticated request to the Airflow web server and capture the client ID from the redirect URL:
cURL
curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"
Replace AIRFLOW_URL
with the URL of the Airflow web interface.
In the output, search for the string following client_id
. For example:
client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com
Python
Save the following code in a file called get_client_id.py
. Fill in your
values for project_id
, location
, and composer_environment
, then run
the code in Cloud Shell or your local environment.
Upload a DAG to your environment
Upload a DAG to your environment. The following example DAG outputs the received DAG run configuration. You trigger this DAG from a function, which you create later in this guide.
Deploy a Cloud Function that triggers the DAG
You can deploy a Cloud Function using your preferred language supported by Cloud Run functions or Cloud Run. This tutorial demonstrates a Cloud Function implemented in Python and Java.
Specify Cloud Function configuration parameters
Trigger. For this example, select a trigger that works when a new object is created in a bucket, or an existing object gets overwritten.
Trigger Type. Cloud Storage.
Event Type. Finalize / Create.
Bucket. Select a bucket that must trigger this function.
Retry on failure. We recommend to disable this option for the purposes of this example. If you use your own function in a production environment, enable this option to handle transient errors.
Runtime service account, in the Runtime, build, connections and security settings section. Use one of the following options, depending on your preferences:
Select Compute Engine default service account. With default IAM permissions, this account can run functions that access Cloud Composer environments.
Create a custom service account that has the Composer User role and specify it as a runtime service account for this function. This option follows the minimum privilege principle.
Runtime and entry point, on the Code step. When adding code for this example, select the Python 3.7 or later runtime and specify
trigger_dag
as the entry point.
Add requirements
Specify the dependencies in the requirements.txt
file:
Put the following code to the main.py
file and make the following
replacements:
Replace the value of the
client_id
variable with theclient_id
value that you obtained earlier.Replace the value of the
webserver_id
variable with your tenant project ID, which is a part of the Airflow web interface URL before.appspot.com
. You obtained the Airflow web interface URL earlier.Specify the Airflow REST API version that you use:
- If you use the stable Airflow REST API, set the
USE_EXPERIMENTAL_API
variable toFalse
. - If you use the experimental Airflow REST API, no changes are needed. The
USE_EXPERIMENTAL_API
variable is already set toTrue
.
- If you use the stable Airflow REST API, set the
Test your function
To check that your function and DAG work as intended:
- Wait until your function deploys.
- Upload a file to your Cloud Storage bucket. As an alternative, you can trigger the function manually by selecting the Test the function action for it in Google Cloud console.
- Check the DAG page in the Airflow web interface. The DAG should have one active or already completed DAG run.
- In the Airflow UI, check task logs for this run. You should see
that the
print_gcs_info
task outputs the data received from the function to the logs:
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
{bucket: example-storage-for-gcf-triggers, contentType: text/plain,
crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
return code 0h
What's next
- Access Airflow UI
- Access Airflow REST API
- Write DAGs
- Write Cloud Run functions
- Cloud Storage triggers