This page describes how to use Cloud Functions for event-based DAG triggers.
While Airflow is designed to run DAGs on a regular schedule, you can trigger DAGs in response to events. One way to accomplish this is to use Cloud Functions to trigger Cloud Composer DAGs when a specified event occurs. For example, you can create a function that triggers a DAG when an object changes in a Cloud Storage bucket, or when a message is pushed to a Pub/Sub topic.
The example in this guide runs a DAG every time a change occurs in a Cloud Storage bucket. A change to any object in the bucket triggers a function. This function makes a request to the Airflow REST API of your Cloud Composer environment. Airflow processes this request and runs a DAG. The DAG outputs information about the change.
Before you begin
Enable APIs for your project
Enable the Cloud Composer and Cloud Functions APIs.
Enable the Airflow web server REST API
In Airflow 1.10.11 and later versions, the API authentication feature is disabled by default, and the Airflow web server denies all requests that you make. Because you trigger DAGs with requests to the REST API, enable this feature.
To enable the API authentication feature, override the following Airflow configuration option:
Section | Key | Value | Notes
---|---|---|---
api | auth_backend | airflow.api.auth.backend.default | The default is airflow.api.auth.backend.deny_all
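For example, you can apply this override with gcloud. The following command is a sketch that assumes an environment named ENVIRONMENT_NAME in LOCATION:

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --update-airflow-configs=api-auth_backend=airflow.api.auth.backend.default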
Get the Airflow web server URL
Your function makes requests to the Airflow web server endpoint, so obtain the URL of the Airflow web server.
Console
To obtain the Airflow web server URL:
- Open the Environments page.
- Click the name of your environment.
- Under Environment configuration, see the Airflow web UI item.
gcloud
To obtain the Airflow web server URL, run the following command:
gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format='value(config.airflowUri)'
Replace:
- ENVIRONMENT_NAME with the name of the environment.
- LOCATION with the Compute Engine region where the environment is located.
Get the client_id of the IAM proxy
To make a request to the Airflow REST API endpoint, the function requires the client ID of the IAM proxy that protects the Airflow web server.
Cloud Composer does not provide this information directly. Instead, make an unauthenticated request to the Airflow web server and capture the client ID from the redirect URL:
curl -v AIRFLOW_URL 2>&1 >/dev/null | grep "location:"
Replace AIRFLOW_URL with the URL of the Airflow web interface.

In the output, search for the client_id value, which ends in apps.googleusercontent.com. For example:
location: https://accounts.google.com/o/oauth2/v2/auth?client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com&response_type= ...
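If you prefer to capture the client ID programmatically, the following Python sketch performs the same unauthenticated request and parses the client_id query parameter from the redirect URL. The get_client_id helper is illustrative, not part of any Cloud Composer API:

import urllib.parse

import requests


def get_client_id(airflow_url):
    """Returns the client ID of the IAM proxy that protects airflow_url."""
    # The unauthenticated request is redirected to the Google sign-in page;
    # the redirect URL carries the client_id query parameter.
    response = requests.get(airflow_url, allow_redirects=False)
    redirect_location = response.headers['location']
    query = urllib.parse.urlparse(redirect_location).query
    return urllib.parse.parse_qs(query)['client_id'][0]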
Create a Cloud Storage bucket
Because this example triggers a DAG in response to changes in a Cloud Storage bucket, create a new bucket to use for it.
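For example, assuming a bucket name of your choice (shown here as BUCKET_NAME), you can create the bucket with gsutil:

gsutil mb gs://BUCKET_NAME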
Trigger a DAG from Cloud Functions
Upload a DAG to your environment
Upload a DAG to your environment. The following example DAG outputs the received DAG run configuration. You trigger this DAG from a function that you create later in this guide.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

with DAG(
        dag_id='composer_sample_trigger_response_dag',
        start_date=days_ago(1),
        schedule_interval=None) as dag:

    # Print the received dag_run configuration.
    # The DAG run configuration contains information about the
    # Cloud Storage object change.
    t1 = BashOperator(
        task_id='print_gcs_info',
        bash_command='echo Triggered from GCF: {{ dag_run.conf }}',
        dag=dag)

    t1
Deploy a Cloud Function that triggers the DAG
Deploy a Python Cloud Function using the following configuration parameters and content. A sample gcloud deployment command that applies these parameters follows the list.
Specify Cloud Function configuration parameters
- Trigger. For this example, select a trigger that works when a new object is created in a bucket, or when an existing object gets overwritten.
  - Trigger Type. Cloud Storage.
  - Event Type. Finalize / Create.
  - Bucket. Select the bucket that must trigger this function.
- Retry on failure. We recommend disabling this option for the purposes of this example. If you use your own function in a production environment, enable this option to handle transient errors.
- Runtime service account. Use one of the following options, depending on your preferences:
  - Select Compute Engine default service account. With default IAM permissions, this account can run functions that access Cloud Composer environments.
  - Create a custom service account that has the Composer User role and specify it as a runtime service account for this function. This option follows the principle of least privilege.
- Runtime and entry point. When adding code for this example, select the Python 3.7 runtime and specify trigger_dag as the entry point.
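The console flow is one way to set these parameters. As a rough equivalent, the following gcloud sketch deploys a first-generation function with the same trigger, runtime, and entry point. FUNCTION_NAME and BUCKET_NAME are placeholders, and the command assumes that main.py and requirements.txt are in the current directory; add --service-account if you use a custom service account.

gcloud functions deploy FUNCTION_NAME \
    --runtime python37 \
    --entry-point trigger_dag \
    --trigger-resource BUCKET_NAME \
    --trigger-event google.storage.object.finalize \
    --source .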
Add requirements
Specify the dependencies in the requirements.txt file:
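The exact contents depend on your function code. Assuming the main.py sketch shown later in this guide, which uses the requests and google-auth libraries, the file can look like this (pin versions as needed):

google-auth
requests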
Add the function code
Put the following code in the main.py file and make the following replacements:
- Replace the value of the client_id variable with the client_id value obtained in a previous step.
- Replace the value of the webserver_id variable with the Airflow web interface URL obtained in a previous step.
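The original sample code is not reproduced here, so the following is a minimal sketch of what main.py can look like, not a definitive implementation. It assumes the Airflow 1 experimental REST API endpoint for creating DAG runs and authenticates through the IAM proxy with an OpenID Connect ID token that is fetched for the client_id and issued to the function's runtime service account. The placeholder values are assumptions that you replace as described above.

import google.auth.transport.requests
import google.oauth2.id_token
import requests


def trigger_dag(data, context=None):
    """Background Cloud Function that triggers the example DAG.

    Args:
        data (dict): The Cloud Storage event payload.
        context: The event metadata (unused in this sketch).
    """
    # Placeholder values: replace them as described above.
    client_id = 'YOUR-CLIENT-ID.apps.googleusercontent.com'
    webserver_id = 'https://YOUR-AIRFLOW-WEB-SERVER-URL'
    dag_name = 'composer_sample_trigger_response_dag'

    # Airflow 1 experimental REST API endpoint for creating a DAG run.
    endpoint = '{}/api/experimental/dags/{}/dag_runs'.format(webserver_id, dag_name)

    # Fetch an OpenID Connect ID token for the IAM proxy client ID.
    # In Cloud Functions, the token is issued to the runtime service
    # account through the metadata server.
    auth_request = google.auth.transport.requests.Request()
    token = google.oauth2.id_token.fetch_id_token(auth_request, client_id)

    # Pass the Cloud Storage event as the DAG run configuration so that
    # the DAG can print information about the object change.
    response = requests.post(
        endpoint,
        headers={'Authorization': 'Bearer {}'.format(token)},
        json={'conf': data},
        timeout=90,
    )
    response.raise_for_status()

If your environment runs Airflow 2, the stable REST API uses a different path (/api/v1/dags/DAG_NAME/dagRuns), so adjust the endpoint accordingly.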
Test your function
To check that your function and DAG work as intended:
- Wait until your function is deployed.
- Upload a file to your Cloud Storage bucket. As an alternative, you can trigger the function manually by selecting the Test function action for it in the Google Cloud Console.
- Check the DAG page in the Airflow web interface. The DAG should have one active or already completed DAG run.
- In the Airflow web interface, check the task logs for this run. You should see that the print_gcs_info task outputs the data received from the function to the logs:
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
{bucket: example-storage-for-gcf-triggers, contentType: text/plain,
crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
return code 0
What's next
- Airflow web interface
- Writing DAGs
- Writing Cloud Functions
- Google Cloud Storage Triggers
- Triggering Cloud Functions from DAGs