Trigger DAGs using Cloud Functions and Pub/Sub Messages

Cloud Composer 1 | Cloud Composer 2

This page guides you through creating an event-based push architecture by triggering Cloud Composer DAGs in response to messages published on a Pub/Sub topic. Examples in this tutorial demonstrate handling the full cycle of Pub/Sub management, including subscription management, as part of the DAG process. This approach suits common use cases where you need to trigger DAGs but don't want to set up extra access permissions.

For example, messages sent through Pub/Sub can be used as a solution if you don't want to provide direct access to a Cloud Composer environment for security reasons. You can configure a Cloud Function that creates Pub/Sub messages and publishes them on a Pub/Sub topic. You can then create a DAG that pulls Pub/Sub messages and then handles these messages.

In this specific example, you create a Cloud Function and deploy two DAGs. The first DAG pulls Pub/Sub messages and triggers the second DAG according to the Pub/Sub message content.
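The flow can be pictured with a toy, in-memory model. This is a sketch only: queue.Queue stands in for the Pub/Sub topic and its subscription, and the function names publish, pull, and run_trigger_dag are hypothetical. No Google Cloud or Airflow APIs are involved. It shows the one idea the real pipeline relies on: the message data is the ID of the DAG to trigger.

```python
import queue

# Hypothetical in-memory stand-in for the Pub/Sub topic and its subscription.
topic = queue.Queue()


def publish(dag_id: str) -> None:
    """Plays the role of the Cloud Function: publish the DAG ID as UTF-8 bytes."""
    topic.put(dag_id.encode("utf-8"))


def pull(max_messages: int = 50) -> list:
    """Plays the role of PubSubPullOperator: take (and implicitly ack) up to max_messages."""
    pulled = []
    while len(pulled) < max_messages:
        try:
            pulled.append(topic.get_nowait())
        except queue.Empty:
            break
    return pulled


def run_trigger_dag() -> list:
    """Plays the role of trigger_dag: decode each message into the ID of a DAG to trigger."""
    return [data.decode("utf-8") for data in pull()]


publish("target_dag")
print(run_trigger_dag())  # ['target_dag']
print(run_trigger_dag())  # [] (nothing left to pull, so nothing is triggered)
```

In the real setup, acknowledged messages are likewise not delivered again, which is why a second run with no new messages triggers nothing.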

This tutorial assumes you are familiar with Python and the Google Cloud console.

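Objectives

  • Create a Cloud Composer environment and a Pub/Sub topic.
  • Upload two DAGs: one that pulls Pub/Sub messages and one that it triggers.
  • Deploy a Cloud Function that publishes messages on the Pub/Sub topic.
  • Test that a message published by the function triggers the target DAG.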

Costs

This tutorial uses the following billable components of Google Cloud:

  • Cloud Composer
  • Pub/Sub
  • Cloud Functions
  • Cloud Storage

After you finish this tutorial, you can avoid continued billing by deleting the resources you created. See Clean up for more detail.

Before you begin

For this tutorial, you need a Google Cloud project. Configure the project in the following way:

  1. In the Google Cloud console, select or create a project:

    Go to Project Selector

  2. Make sure that billing is enabled for your project. Learn how to check if billing is enabled on a project.

  3. Make sure that your user account has the following roles in the project, which are required to create the necessary resources:

    • Service Account User (roles/iam.serviceAccountUser)
    • Pub/Sub Editor (roles/pubsub.editor)
    • Environment and Storage Object Administrator (roles/composer.environmentAndStorageObjectAdmin)
    • Cloud Functions Admin (roles/cloudfunctions.admin)
    • Logs Viewer (roles/logging.viewer)
  4. Make sure that the service account that runs your Cloud Function has sufficient permissions in your project to access Pub/Sub. By default, Cloud Functions use the App Engine default service account. This service account has the Editor role, which has sufficient permissions for this tutorial.

Enable APIs for your project

Console

Enable the Cloud Composer, Cloud Functions, and Pub/Sub APIs.

Enable the APIs

gcloud

Enable the Cloud Composer, Cloud Functions, and Pub/Sub APIs:

gcloud services enable composer.googleapis.com cloudfunctions.googleapis.com pubsub.googleapis.com

Terraform

Enable the Cloud Composer, Pub/Sub, and Cloud Functions APIs in your project by adding the following resource definitions to your Terraform script:

resource "google_project_service" "composer_api" {
  project = "<PROJECT_ID>"
  service = "composer.googleapis.com"
  // Disabling Cloud Composer API might irreversibly break all other
  // environments in your project.
  // This parameter prevents automatic disabling
  // of the API when the resource is destroyed.
  // We recommend disabling the API only after all environments are deleted.
  disable_on_destroy = false
}

resource "google_project_service" "pubsub_api" {
  project = "<PROJECT_ID>"
  service = "pubsub.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "functions_api" {
  project = "<PROJECT_ID>"
  service = "cloudfunctions.googleapis.com"
  disable_on_destroy = false
}

Replace <PROJECT_ID> with the ID of your project. For example, example-project.

Create your Cloud Composer environment

Create a Cloud Composer 2 environment.

As a part of this procedure, you grant the Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext) role to the Composer Service Agent account. Cloud Composer uses this account to perform operations in your Google Cloud project.

Create a Pub/Sub topic

This example triggers a DAG in response to a message pushed to a Pub/Sub topic. Create a Pub/Sub topic to use in this example:

Console

  1. In the Google Cloud console, go to the Pub/Sub Topics page.

    Go to Pub/Sub Topics

  2. Click Create Topic.

  3. In the Topic ID field, enter dag-topic-trigger as an ID for your topic.

  4. Leave other options at their defaults.

  5. Click Create Topic.

gcloud

To create a topic, run the gcloud pubsub topics create command in Google Cloud CLI:

gcloud pubsub topics create dag-topic-trigger

Terraform

Add the following resource definitions to your Terraform script:

resource "google_pubsub_topic" "trigger" {
  project                    = "<PROJECT_ID>"
  name                       = "dag-topic-trigger"
  message_retention_duration = "86400s"
}

Replace <PROJECT_ID> with the ID of your project. For example, example-project.

Upload your DAGs

Upload DAGs to your environment:

  1. Save the following DAG file on your local computer.
  2. Replace <PROJECT_ID> with the ID of your project. For example, example-project.
  3. Upload the edited DAG file to your environment.
from __future__ import annotations

from datetime import datetime
import time

from airflow import DAG
from airflow import XComArg
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubCreateSubscriptionOperator,
    PubSubPullOperator,
)

PROJECT_ID = "<PROJECT_ID>"
TOPIC_ID = "dag-topic-trigger"
SUBSCRIPTION = "trigger_dag_subscription"


def handle_messages(pulled_messages, context):
    dag_ids = list()
    for idx, m in enumerate(pulled_messages):
        data = m.message.data.decode("utf-8")
        print(f"message {idx} data is {data}")
        dag_ids.append(data)
    return dag_ids


# This DAG runs every minute and handles Pub/Sub messages by triggering the target DAG
with DAG(
    "trigger_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="* * * * *",
    max_active_runs=1,
    catchup=False,
) as trigger_dag:
    # If the subscription already exists, it's reused. Otherwise, a new one is created.
    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        subscription=SUBSCRIPTION,
    )

    subscription = subscribe_task.output

    # Process at most 50 messages in the handle_messages callback function.
    # Messages are acknowledged automatically here. To acknowledge them in downstream tasks, use PubSubHook.acknowledge:
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge
    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        project_id=PROJECT_ID,
        ack_messages=True,
        messages_callback=handle_messages,
        subscription=subscription,
        max_messages=50,
    )

    # Use dynamic task mapping to trigger one DAG run per DAG ID found in the messages
    # https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
    trigger_target_dag = TriggerDagRunOperator.partial(task_id="trigger_target").expand(
        trigger_dag_id=XComArg(pull_messages_operator)
    )

    (subscribe_task >> pull_messages_operator >> trigger_target_dag)


def _some_heavy_task():
    print("Do some operation...")
    time.sleep(1)
    print("Done!")


# Simple target DAG
with DAG(
    "target_dag",
    start_date=datetime(2022, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
    catchup=False,
) as target_dag:
    some_heavy_task = PythonOperator(
        task_id="some_heavy_task", python_callable=_some_heavy_task
    )

    (some_heavy_task)
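Because handle_messages only reads m.message.data from each pulled message, you can exercise it outside Airflow. The following sketch copies the callback from the DAG file and feeds it hypothetical stand-in objects built with types.SimpleNamespace instead of real Pub/Sub ReceivedMessage objects. The return value is the list that PubSubPullOperator pushes to XCom for the mapped trigger task.

```python
from types import SimpleNamespace


def handle_messages(pulled_messages, context):
    # Same logic as the callback in the DAG file above.
    dag_ids = []
    for idx, m in enumerate(pulled_messages):
        data = m.message.data.decode("utf-8")
        print(f"message {idx} data is {data}")
        dag_ids.append(data)
    return dag_ids


def fake_received_message(payload: str) -> SimpleNamespace:
    """Hypothetical stand-in exposing only the .message.data attribute the callback reads."""
    return SimpleNamespace(message=SimpleNamespace(data=payload.encode("utf-8")))


pulled = [fake_received_message("target_dag"), fake_received_message("target_dag")]
print(handle_messages(pulled, context={}))  # ['target_dag', 'target_dag']
```

Note that the callback doesn't deduplicate: two messages naming the same DAG yield that DAG ID twice.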

The sample code contains two DAGs: trigger_dag and target_dag.

The trigger_dag DAG subscribes to a Pub/Sub topic, pulls Pub/Sub messages, and triggers the DAG whose ID is given in the message data. In this example, trigger_dag triggers the target_dag DAG, which outputs messages to the task logs.

The trigger_dag DAG contains the following tasks:

  • subscribe_task: Subscribe to a Pub/Sub topic, creating the subscription if it doesn't exist yet.
  • pull_messages_operator: Read Pub/Sub message data with PubSubPullOperator.
  • trigger_target: Trigger another DAG (in this example, target_dag) for each DAG ID in the messages pulled from the Pub/Sub topic.
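Dynamic task mapping expands the trigger task at run time into one task instance per element of the list returned by pull_messages_operator. The following is a hypothetical plain-Python model of that behavior, not Airflow code. It assumes the mapping rules described in the Airflow documentation: each mapped instance gets its own map_index, and an empty input list causes the mapped task to be skipped.

```python
from __future__ import annotations


def expand_trigger_target(dag_ids: list[str]) -> tuple[str, list[dict[str, object]]]:
    """Hypothetical model of TriggerDagRunOperator.partial(...).expand(trigger_dag_id=...).

    Mirrors only the observable behavior of dynamic task mapping: one mapped
    task instance per list element, each identified by its map_index, and a
    skipped task when the upstream list is empty.
    """
    if not dag_ids:
        return "skipped", []
    instances = [
        {"task_id": "trigger_target", "map_index": i, "trigger_dag_id": dag_id}
        for i, dag_id in enumerate(dag_ids)
    ]
    return "expanded", instances


# Two messages naming the same DAG produce two mapped task instances.
print(expand_trigger_target(["target_dag", "target_dag"]))
print(expand_trigger_target([]))  # ('skipped', [])
```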

The target_dag DAG contains just one task: some_heavy_task. This task prints messages to the task log with a one-second delay between them.

Deploy a Cloud Function that publishes messages on a Pub/Sub topic

In this section, you deploy a Cloud Function that publishes messages on a Pub/Sub topic.

Create a Cloud Function and specify its configuration

Console

  1. In the Google Cloud console, go to the Cloud Functions page.

    Go to Cloud Functions

  2. Click Create function.

  3. In the Environment field, select 1st gen.

  4. In the Function name field, enter the name for your function: pubsub-publisher.

  5. In the Trigger type field, select HTTP.

  6. In the Authentication section, select Allow unauthenticated invocations. This option grants unauthenticated users the ability to invoke an HTTP function.

  7. Click Save.

  8. Click Next to move to the Code step.

Terraform

Consider using the Google Cloud console for this step, because there is no straightforward way to manage the function's source code from Terraform.

This example demonstrates how you can upload a Cloud Function from a local zip archive file by creating a Cloud Storage bucket, storing the file in this bucket, then using the file from the bucket as a source for the Cloud Function. If you use this approach, Terraform doesn't automatically update the source code of your function, even if you create a new archive file. To re-upload the function code, you can change the file name of the archive.
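One way to change the archive name whenever its content changes is to derive the name from a hash of the file. The following is a minimal sketch under that assumption; the helper names and the eight-character SHA-256 prefix are illustrative choices. If you use a versioned name, update both the name and source of the google_storage_bucket_object resource and the function's source_archive_object to match.

```python
import hashlib
from pathlib import Path


def versioned_archive_name(content: bytes, prefix: str = "pubsub_function") -> str:
    """Return a name such as 'pubsub_function-<8 hex chars>.zip' derived from the content."""
    digest = hashlib.sha256(content).hexdigest()[:8]
    return f"{prefix}-{digest}.zip"


def versioned_name_for_file(archive: Path) -> str:
    """Compute the versioned name for an existing archive, e.g. Path('pubsub_function.zip')."""
    return versioned_archive_name(archive.read_bytes())
```

Identical archives always map to the same name, so Terraform re-uploads the object only when the code actually changes.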

  1. Download the pubsub_publisher.py and the requirements.txt files.
  2. In the pubsub_publisher.py file, replace <PROJECT_ID> with the Project ID of your project. For example, example-project.
  3. Create a zip archive named pubsub_function.zip with the pubsub_publisher.py and the requirements.txt files. The Python runtime loads the function code from a file named main.py, so store pubsub_publisher.py in the archive as main.py.
  4. Save the zip archive to a directory where your Terraform script is stored.
  5. Add the following resource definitions to your Terraform script and replace <PROJECT_ID> with the Project ID of your project.
resource "google_storage_bucket" "cloud_function_bucket" {
  project        = "<PROJECT_ID>"
  name           = "<PROJECT_ID>-cloud-function-source-code"
  location       = "US"
  force_destroy  = true
  uniform_bucket_level_access = true
}

resource "google_storage_bucket_object" "cloud_function_source" {
  name   = "pubsub_function.zip"
  bucket = google_storage_bucket.cloud_function_bucket.name
  source = "./pubsub_function.zip"
}

resource "google_cloudfunctions_function" "pubsub_function" {
  project = "<PROJECT_ID>"
  name    = "pubsub-publisher"
  runtime = "python310"
  region  = "us-central1"

  available_memory_mb   = 128
  source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
  source_archive_object = google_storage_bucket_object.cloud_function_source.name
  timeout               = 60
  entry_point           = "pubsub_publisher"
  trigger_http          = true
}
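The archive for step 3 can also be built with a short script instead of a zip tool. This is a sketch that assumes pubsub_publisher.py and requirements.txt sit in the directory you pass in (for example, your Terraform directory); the helper names are hypothetical. It stores pubsub_publisher.py as main.py at the archive root, because the Python runtime loads the function from main.py.

```python
import io
import zipfile
from pathlib import Path


def build_function_archive(main_py: bytes, requirements_txt: bytes) -> bytes:
    """Return the bytes of a zip archive with main.py and requirements.txt at its root."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        # pubsub_publisher.py is stored under the name main.py.
        zf.writestr("main.py", main_py)
        zf.writestr("requirements.txt", requirements_txt)
    return buffer.getvalue()


def write_function_archive(directory: Path = Path(".")) -> Path:
    """Build pubsub_function.zip from the files in `directory` and return its path."""
    blob = build_function_archive(
        (directory / "pubsub_publisher.py").read_bytes(),
        (directory / "requirements.txt").read_bytes(),
    )
    archive = directory / "pubsub_function.zip"
    archive.write_bytes(blob)
    return archive
```

Calling write_function_archive() from the Terraform directory produces the pubsub_function.zip file that the google_storage_bucket_object resource uploads.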

Specify Cloud Function code parameters

Console

  1. In the Code step, in the Runtime field, select the language runtime your function uses. In this example, select Python 3.10.

  2. In the Entry point field, enter pubsub_publisher. This is the function that runs when your Cloud Function is invoked. The value must be the name of a function that exists in your source code.

Terraform

Skip this step. Cloud Function parameters are already defined in the google_cloudfunctions_function resource.

Upload your Cloud Function code

Console

In the Source code field, select the appropriate option for how you supply the function source code. In this tutorial, add your function code using the Cloud Functions Inline Editor. As an alternative, you can upload a ZIP file, or use Cloud Source Repositories.

  1. Put the following code example into the main.py file.
  2. Replace <PROJECT_ID> with the ID of your project. For example, example-project.
from google.cloud import pubsub_v1

project = "<PROJECT_ID>"
topic = "dag-topic-trigger"


def pubsub_publisher(request):
    """Publish the message from an HTTP request to the Pub/Sub topic.

    Args:
        request (flask.Request): HTTP request object.
    Returns:
        Response text that describes the message published to the Pub/Sub topic,
        or any value that can be converted to a Response object with
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    # silent=True returns None instead of raising an error when the body isn't JSON
    request_json = request.get_json(silent=True)
    print(request_json)
    if request.args and "message" in request.args:
        data_str = request.args.get("message")
    elif request_json and "message" in request_json:
        data_str = request_json["message"]
    else:
        return "Message content not found! Use 'message' key to specify"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_id}`
    topic_path = publisher.topic_path(project, topic)

    # Pub/Sub message data must be a bytestring
    data = data_str.encode("utf-8")
    # Attribute values must be strings, so the length is converted with str()
    message_length = len(data_str)
    # When you publish a message, the client returns a future;
    # result() blocks until the server returns the message ID
    future = publisher.publish(topic_path, data, message_length=str(message_length))
    print(future.result())

    return f"Message {data} with message_length {message_length} published to {topic_path}."
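The function reads the message from the query string first and falls back to the JSON body, then publishes the text as UTF-8 bytes with a message_length attribute. The sketch below isolates that logic in two hypothetical helpers, extract_message and build_payload, so it can run without Flask or the Pub/Sub client library. It also shows where a value such as message_length 10 for target_dag comes from.

```python
from __future__ import annotations


def extract_message(args: dict[str, str] | None, json_body: dict | None) -> str | None:
    """Mirror the lookup order in pubsub_publisher: query string first, then JSON body."""
    if args and "message" in args:
        return args["message"]
    if json_body and "message" in json_body:
        return json_body["message"]
    return None


def build_payload(data_str: str) -> tuple[bytes, dict[str, str]]:
    """Return the (data, attributes) pair that the function passes to publisher.publish().

    Message data must be bytes and attribute values must be strings.
    len() counts characters, which equals the byte count only for ASCII text.
    """
    return data_str.encode("utf-8"), {"message_length": str(len(data_str))}


data_str = extract_message(args={}, json_body={"message": "target_dag"})
print(build_payload(data_str))  # (b'target_dag', {'message_length': '10'})
```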

Terraform

Skip this step. The function code is already included in the pubsub_function.zip archive.

Specify your Cloud Function dependencies

Console

Specify the function dependencies in the requirements.txt metadata file:

requests-toolbelt==1.0.0
google-auth==2.19.1
google-cloud-pubsub==2.17.0

When you deploy your function, Cloud Functions downloads and installs dependencies declared in the requirements.txt file, one line per package. This file must be in the same directory as the main.py file that contains your function code. For more details, see Requirements Files in pip documentation.

Terraform

Skip this step. Cloud Function dependencies are defined in the requirements.txt file in the pubsub_function.zip archive.

Deploy your Cloud Function

Console

Click Deploy. When deployment finishes successfully, the function appears with a green check mark on the Cloud Functions page in the Google Cloud console.

Make sure that the service account that runs your Cloud Function has enough permissions in your project to access Pub/Sub.

Terraform

  1. Initialize Terraform:

    terraform init

  2. To check whether your configuration is valid, run the following command:

    terraform validate

  3. Review the configuration and verify that the resources that Terraform is going to create or update match your expectations:

    terraform plan

  4. Apply the Terraform configuration by running the following command and entering yes at the prompt:

    terraform apply
    

Wait until Terraform displays the "Apply complete!" message.

In the Google Cloud console, navigate to your resources in the UI to make sure that Terraform has created or updated them.

Test your Cloud Function

To check that your function publishes a message on a Pub/Sub topic and that the example DAGs work as intended:

  1. Check that the DAGs are active:

    1. In the Google Cloud console, go to the Environments page.

      Go to Environments

    2. In the list of environments, click the name of your environment. The Environment details page opens.

    3. Go to the DAGs tab.

    4. Check values in the State column for DAGs named trigger_dag and target_dag. Both DAGs must be in the Active state.

  2. Publish a test Pub/Sub message through your function. You can do this in Cloud Shell:

    1. In the Google Cloud console, go to the Functions page.

      Go to Cloud Functions

    2. Click the name of your function, pubsub-publisher.

    3. Go to the Testing tab.

    4. In the Configure triggering event section, enter the following JSON: {"message": "target_dag"}. Don't modify this value: the message content is the ID of the DAG that trigger_dag triggers later.

    5. In the Test Command section, click Test in Cloud Shell.

    6. In the Cloud Shell terminal, wait until a command appears automatically, then run it by pressing Enter.

    7. If the Authorize Cloud Shell message appears, click Authorize.

    8. Check that the function's response corresponds to the Pub/Sub message. In this example, the response must start with Message b'target_dag' with message_length 10 published to.

  3. Check that target_dag was triggered:

    1. Wait at least one minute, so that a new DAG run of trigger_dag completes.

    2. In the Google Cloud console, go to the Environments page.

      Go to Environments

    3. In the list of environments, click the name of your environment. The Environment details page opens.

    4. Go to the DAGs tab.

    5. Click trigger_dag to go to the DAG details page. On the Runs tab, a list of DAG runs for the trigger_dag DAG is displayed.

      This DAG runs every minute and processes all Pub/Sub messages sent from the function. If no messages were sent, the trigger_target task is marked as Skipped. If DAGs were triggered, the trigger_target task is marked as Success.

    6. Look through several recent DAG runs to locate a DAG run where all three tasks (subscribe_task, pull_messages_operator, and trigger_target) are in the Success state.

    7. Go back to the DAGs tab and check that the Successful runs column for the target_dag DAG lists one successful run.
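Instead of the Test in Cloud Shell button, you can call the function from any machine with Python 3. The following sketch uses only the standard library. It assumes the 1st gen HTTP trigger URL format https://REGION-PROJECT_ID.cloudfunctions.net/FUNCTION_NAME (copy the exact URL from the function details page); the helper names and the example URL are hypothetical. Passing an identity token, for example the output of gcloud auth print-identity-token, is only needed if the function doesn't allow unauthenticated invocations.

```python
from __future__ import annotations

import json
import urllib.request


def build_publish_request(function_url: str, dag_id: str, id_token: str | None = None) -> urllib.request.Request:
    """Build a POST request that asks pubsub-publisher to publish `dag_id`."""
    headers = {"Content-Type": "application/json"}
    if id_token:
        # Only needed when unauthenticated invocations aren't allowed.
        headers["Authorization"] = f"Bearer {id_token}"
    body = json.dumps({"message": dag_id}).encode("utf-8")
    return urllib.request.Request(function_url, data=body, headers=headers, method="POST")


def send(request: urllib.request.Request) -> str:
    """Send the request and return the function's response text (requires network access)."""
    with urllib.request.urlopen(request, timeout=70) as response:
        return response.read().decode("utf-8")


# Hypothetical URL for a function in us-central1 of example-project.
req = build_publish_request(
    "https://us-central1-example-project.cloudfunctions.net/pubsub-publisher", "target_dag"
)
# print(send(req))  # expected to start with: Message b'target_dag' with message_length 10 published to
```

After sending the request, check trigger_dag and target_dag in the same way as in step 3.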

Summary

In this tutorial, you learned how to use Cloud Functions to publish messages on a Pub/Sub topic, and how to deploy a DAG that subscribes to the topic, pulls its messages, and triggers the DAG whose ID is given in the message data.

There are also alternative ways of creating and managing Pub/Sub subscriptions and triggering DAGs that are outside of the scope of this tutorial. For example, you can use Cloud Functions to trigger Airflow DAGs when a specified event occurs. Have a look at our tutorials to try out the other Google Cloud features for yourself.

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources or keep the project and delete the individual resources.

Delete the project

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Delete individual resources

If you plan to explore multiple tutorials and quickstarts, reusing projects can help you avoid exceeding project quota limits.

Console

  1. Delete the Cloud Composer environment. You also delete the environment's bucket during this procedure.
  2. Delete the Pub/Sub topic, dag-topic-trigger.
  3. Delete the Cloud Function.

    1. In the Google Cloud console, go to Cloud Functions.

      Go to Cloud Functions

    2. Click the checkbox for the function that you want to delete, pubsub-publisher.

    3. Click Delete, and then follow the instructions.

Terraform

  1. Make sure that your Terraform script doesn't contain entries for resources that are still required by your project. For example, you might want to keep some APIs enabled and IAM permissions still assigned (if you added such definitions to your Terraform script).
  2. Run terraform destroy.
  3. Manually delete the environment's bucket. Cloud Composer doesn't delete it automatically. You can do it from the Google Cloud console or Google Cloud CLI.

What's next