Alert-based event archiver with Cloud Monitoring and Pub/Sub

Author(s): @ptone ,   Published: 2019-03-15

Preston Holmes | Solution Architect | Google

Contributed by Google employees.

Pub/Sub is often used for large data flows and is capable of handling large throughput, but it is also often used for high-value event streams that have lower volumes or are more irregular in their arrival rates.

Dataflow templates allow you to easily deploy a Dataflow job to move events from Pub/Sub to Cloud Storage. This will run a streaming Dataflow job continuously and may be expensive to run when the event volume is low, or if you have multiple medium-volume topics to archive.

Pub/Sub retains unacknowledged messages in a subscription for 7 days by default before they are deleted. This means that lower-volume topics can be periodically archived as batches into Cloud Storage objects without running a continuous streaming job. Cloud Functions Pub/Sub triggers can't be used for this directly, since they are invoked once for each Pub/Sub message and you generally want to archive in batches.

This solution uses the built-in Cloud Monitoring metrics for Pub/Sub subscriptions to trigger an archiving task based on a combination of backlog size and age.


  • Create a notification webhook.
  • Create a Cloud Monitoring alerting policy with Pub/Sub conditions.
  • Create a pair of Cloud Functions that respond to the alert.
  • Verify that data gets archived from Pub/Sub to Cloud Storage.



There are several components to the architecture:

  • Data ingest Pub/Sub topic: This is where events of interest are received.
  • Cloud Monitoring alert, which consists of the following:
    • Alerting policy watching metrics related to a subscription associated with the data ingest topic
    • Webhook notification channel that points to a Cloud Function
  • Pub/Sub topic, which durably records the alert incident and relays it to the Archiver function
  • Archiver function, which uses streaming pull with Pub/Sub and streaming write to Cloud Storage to efficiently batch and move event data from Pub/Sub to Cloud Storage.

Before you begin

  1. Create a Google Cloud project for this tutorial to allow for easier cleanup.
  2. Create a new workspace in the Google Cloud project that you created above.
  3. Enable Cloud Functions:

    gcloud services enable cloudfunctions.googleapis.com

Setting up the automation

Commands in this tutorial assume that you are running from the tutorial folder:

git clone https://github.com/GoogleCloudPlatform/community.git
cd community/tutorials/cloud-pubsub-drainer/

Install components

You will use the command-line interface for Cloud Monitoring resources, which is provided by the gcloud alpha component:

gcloud components install alpha

Set environment variables

These environment variables are used throughout the project:

export FULL_PROJECT=$(gcloud config list project --format "value(core.project)")
export PROJECT_ID="$(echo $FULL_PROJECT | cut -f2 -d ':')"

Create a data archive bucket

This is the bucket into which we archive data from Pub/Sub:

gsutil mb gs://$PROJECT_ID-data-archive

Create Pub/Sub resources

gcloud pubsub topics create demo-data
gcloud pubsub subscriptions create bulk-drainer --topic demo-data
gcloud pubsub topics create drain-tasks

The demo-data topic is the main topic that receives the data we want to archive. It may have several subscriptions that react to events or process streaming data. In Pub/Sub, each subscription gets its own durable copy of the data.

The bulk-drainer subscription is created to act as the storage buffer dedicated to the archiver task.

The drain-tasks topic serves as a durable relay of the alert occurrence.

Deploy the archiving function

The Archiver function is awakened by a condition-based alert and drains any outstanding events in the bulk-drainer subscription into chunked objects in the data-archive bucket.

cd drainer-func
gcloud functions deploy Archiver \
    --runtime go111 \
    --trigger-topic drain-tasks \
    --update-env-vars BUCKET_NAME=$PROJECT_ID-data-archive,SUBSCRIPTION_NAME=bulk-drainer,AUTH_TOKEN=abcd

How much is consolidated per object is configured in the code; it can be set to a number of messages or a number of bytes.
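As a rough illustration of that kind of chunking, here is a minimal sketch that closes a batch when either a message count or a byte count is reached. The Batcher type and its MaxMessages and MaxBytes fields are hypothetical names chosen for this example; they are not taken from the tutorial's function code.

```go
package main

import "fmt"

// Batcher accumulates message payloads until a limit is reached.
// MaxMessages and MaxBytes are hypothetical knobs for this sketch.
type Batcher struct {
	MaxMessages int
	MaxBytes    int
	msgs        [][]byte
	size        int
}

// Add buffers one payload. It returns the completed batch once either
// limit is reached, and nil while the batch is still filling.
func (b *Batcher) Add(payload []byte) [][]byte {
	b.msgs = append(b.msgs, payload)
	b.size += len(payload)
	if len(b.msgs) >= b.MaxMessages || b.size >= b.MaxBytes {
		return b.Flush()
	}
	return nil
}

// Flush returns whatever is buffered and resets the batcher.
func (b *Batcher) Flush() [][]byte {
	out := b.msgs
	b.msgs, b.size = nil, 0
	return out
}

func main() {
	b := &Batcher{MaxMessages: 3, MaxBytes: 1 << 20}
	for i := 0; i < 7; i++ {
		if batch := b.Add([]byte(fmt.Sprintf("event-%d", i))); batch != nil {
			fmt.Printf("write object with %d messages\n", len(batch))
		}
	}
	// Whatever remains at the end becomes a final, smaller object.
	if rest := b.Flush(); len(rest) > 0 {
		fmt.Printf("write final object with %d messages\n", len(rest))
	}
}
```

In a real archiver, each returned batch would be written to one Cloud Storage object before the corresponding messages are acknowledged.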

Deploy webhook relay

Cloud Monitoring does not currently have a native Pub/Sub alert notification channel. We use a small HTTP Cloud Function to act as a simple relay. The AUTH_TOKEN environment variable sets an expected shared secret between the Cloud Monitoring notification channel and the webhook.

gcloud functions deploy StackDriverRelay --runtime go111 --trigger-http --update-env-vars AUTH_TOKEN=abcd
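The shared-secret check can be sketched roughly as follows. This is an illustration under assumptions, not the code from the tutorial repository: relayHandler, forward, and post are hypothetical names, and forward stands in for publishing the alert payload to the drain-tasks topic.

```go
package main

import (
	"crypto/subtle"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// relayHandler checks the "token" query parameter against the shared secret
// and, on a match, passes the alert payload to forward. In a deployed relay,
// forward would publish the payload to the drain-tasks topic.
func relayHandler(secret string, forward func(payload []byte) error) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		token := r.URL.Query().Get("token")
		// Constant-time comparison avoids leaking the secret through timing.
		if subtle.ConstantTimeCompare([]byte(token), []byte(secret)) != 1 {
			http.Error(w, "invalid token", http.StatusForbidden)
			return
		}
		payload, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "unreadable body", http.StatusBadRequest)
			return
		}
		if err := forward(payload); err != nil {
			http.Error(w, "relay failed", http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	}
}

// post sends one in-memory request to h and returns the HTTP status code.
func post(h http.HandlerFunc, token, body string) int {
	req := httptest.NewRequest(http.MethodPost, "/?token="+token, strings.NewReader(body))
	rec := httptest.NewRecorder()
	h(rec, req)
	return rec.Code
}

func main() {
	h := relayHandler("abcd", func(payload []byte) error {
		fmt.Printf("would publish %d bytes to drain-tasks\n", len(payload))
		return nil
	})
	fmt.Println(post(h, "abcd", `{"incident":{"state":"open"}}`)) // accepted
	fmt.Println(post(h, "wrong", `{}`))                           // rejected
}
```

Because the webhook must be publicly reachable, the token is the only thing separating alert notifications from arbitrary callers, so a value less guessable than abcd is advisable outside of a demo.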

Allow Cloud Monitoring to call the webhook

Note: This step applies only to users of the Cloud Functions IAM alpha.

Cloud Monitoring can only reach publicly accessible webhooks. If you are using a project with function authorization enabled, you need to make the function publicly reachable:

gcloud alpha functions add-iam-policy-binding StackDriverRelay --member "allUsers" --role "roles/cloudfunctions.invoker"

Create a Cloud Monitoring notification channel

In Cloud Monitoring, a notification channel is a resource that can be used with one or more alerting policies. These can be created in the Cloud Console or through an API. Using the URL from our deployed function, you create a new channel and then get the channel identifier as an environment variable:

cd ..
export URL=$(gcloud functions describe StackDriverRelay --format='value(httpsTrigger.url)')

gcloud alpha monitoring channels create --channel-content-from-file channel.json --channel-labels "url=$URL?token=abcd"

export CHANNEL=$(gcloud alpha monitoring channels list --filter='displayName="Archiver"' --format='value("name")')

Create a Cloud Monitoring alerting policy

Alerting policies describe the conditions under which an alert fires. Here, we want the alert to fire if messages in our bulk-drainer Pub/Sub subscription become either too numerous or too old. See the contents of policy.json in the tutorial repository for details. The duration and size thresholds are set low so that the solution can be tested and demonstrated quickly.

After the policy is created, its ID is retrieved and the policy is updated to use the notification channel created earlier.

gcloud alpha monitoring policies create --policy-from-file=policy.json

export POLICY=$(gcloud alpha monitoring policies list --filter='displayName="archive-pubsub"' --format='value("name")')

gcloud alpha monitoring policies update $POLICY --add-notification-channels=$CHANNEL

At this point, the solution is fully deployed. You can see and review the policy in the alerting section of the console. You can see notification channels in the workspace settings area of the console.

Testing the solution

Create test data

The loader script creates some synthetic test data:

cd ../loader
go get
# note you may see a warning about gopath if you are in Cloud Shell
go run main.go

You should see output that looks like:

bulking out
done bulking out
2019/03/05 16:55:33 Published Batch
2019/03/05 16:55:34 Published Batch
2019/03/05 16:55:35 Published Batch
2019/03/05 16:55:36 Published Batch
2019/03/05 16:55:37 Published Batch
2019/03/05 16:55:38 Published Batch
2019/03/05 16:55:39 Published Batch
2019/03/05 16:55:40 Published Batch
2019/03/05 16:55:41 Published Batch
2019/03/05 16:55:42 Published Batch

To test the age-based condition in the policy, let the loader run just for a moment, cancel by pressing Ctrl-C, and then wait a few minutes until the condition is triggered.

For a trigger based on backlog size, let the loader tool run for several minutes before canceling. (Do not let this script run indefinitely, since it will continue to generate billable volumes of data.)

Note that Cloud Monitoring metrics do not appear instantaneously; it takes some time for them to show in the alert policy charts. Also note that for the condition to fire, it has to be true for 1 minute (this duration is a configurable part of the policy).
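To make the "true for 1 minute" behavior concrete, here is a simplified model of a condition that fires only after a threshold has been breached continuously for a hold duration. This is not how Cloud Monitoring is implemented; the Sample and Policy types, the 30-second sample spacing, and the thresholds in demoPolicy are all assumptions made for this sketch and do not come from policy.json.

```go
package main

import (
	"fmt"
	"time"
)

// Sample is one observation of the subscription's backlog.
type Sample struct {
	At        time.Time
	Backlog   int64         // number of undelivered messages
	OldestAge time.Duration // age of the oldest unacknowledged message
}

// Policy fires when the backlog is too large or too old for at least Hold.
type Policy struct {
	MaxBacklog int64
	MaxAge     time.Duration
	Hold       time.Duration
}

// breached reports whether a single sample exceeds either threshold.
func (p Policy) breached(s Sample) bool {
	return s.Backlog > p.MaxBacklog || s.OldestAge > p.MaxAge
}

// Fires walks backward from the newest sample and reports true only if every
// sample is breached over a span of at least p.Hold. Samples must be in
// chronological order.
func (p Policy) Fires(samples []Sample) bool {
	if len(samples) == 0 {
		return false
	}
	last := samples[len(samples)-1]
	for i := len(samples) - 1; i >= 0; i-- {
		if !p.breached(samples[i]) {
			return false
		}
		if last.At.Sub(samples[i].At) >= p.Hold {
			return true
		}
	}
	return false // not enough history to cover the hold duration
}

// demoPolicy uses made-up thresholds for illustration only.
var demoPolicy = Policy{MaxBacklog: 100, MaxAge: 2 * time.Minute, Hold: time.Minute}

// samplesEvery30s builds samples spaced 30 seconds apart with the given backlogs.
func samplesEvery30s(backlogs ...int64) []Sample {
	start := time.Date(2019, time.March, 5, 16, 55, 0, 0, time.UTC)
	out := make([]Sample, len(backlogs))
	for i, b := range backlogs {
		out[i] = Sample{At: start.Add(time.Duration(i) * 30 * time.Second), Backlog: b}
	}
	return out
}

func main() {
	fmt.Println(demoPolicy.Fires(samplesEvery30s(150, 150, 150))) // breached for a full minute
	fmt.Println(demoPolicy.Fires(samplesEvery30s(50, 150, 150)))  // breached for only 30 seconds
}
```

The hold duration is what keeps a brief spike from waking the archiver, at the cost of a short delay before a real backlog is drained.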

While you wait for an alert to fire, you can check out the alerting policy overview.

Check the function logs

The Relay function logs should show that the function was called.

The Archiver function logs should show the archiving activity, including how many messages were archived.

Check the archive bucket

If you look in the data archive bucket in the Cloud Console storage browser, you will see a set of nested folders organized by year/month/day, containing objects named for the time the archive event occurred.

The archive size is set to 1 MB in this tutorial, though that is adjustable in the function code.
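A naming scheme along these lines could be sketched as below. The exact format the Archiver uses is not shown here, so objectName, its chunk suffix, and the HHMMSS time layout are assumptions for illustration only.

```go
package main

import (
	"fmt"
	"time"
)

// objectName builds a hypothetical object path: year/month/day folders,
// followed by the archive time (HHMMSS, UTC) and a chunk sequence number.
func objectName(t time.Time, chunk int) string {
	u := t.UTC()
	return fmt.Sprintf("%s/%s-%04d", u.Format("2006/01/02"), u.Format("150405"), chunk)
}

// sampleTime is a fixed timestamp so that the example is reproducible.
var sampleTime = time.Date(2019, time.March, 5, 16, 55, 33, 0, time.UTC)

func main() {
	fmt.Println(objectName(sampleTime, 0)) // prints 2019/03/05/165533-0000
}
```

Zero-padded date components keep the objects in chronological order when listed lexicographically, which makes it easy to fetch one day's archives by prefix.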

Limits of the pattern

This pattern is not appropriate for all cases. Notably, a function invocation can run for at most 9 minutes (the maximum configurable Cloud Functions timeout of 540 seconds). This means that the alert policy should trigger often enough that each archive task can finish within that limit.
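One way to stay inside the invocation limit is to give the drain loop its own time budget and stop cleanly when it runs out. The following is a minimal sketch under assumptions: drainWithin, fakeBacklog, and the functionBudget value are hypothetical, and next stands in for pulling a message from the bulk-drainer subscription.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// functionBudget is a hypothetical budget that leaves headroom below the
// platform's invocation limit.
const functionBudget = 8 * time.Minute

// drainWithin archives messages from next until the backlog is empty or the
// budget expires. It returns how many messages were archived and whether the
// backlog was fully drained.
func drainWithin(budget time.Duration, next func() ([]byte, bool), archive func([]byte)) (count int, drained bool) {
	ctx, cancel := context.WithTimeout(context.Background(), budget)
	defer cancel()
	for {
		// Check the deadline before pulling another message.
		select {
		case <-ctx.Done():
			return count, false
		default:
		}
		msg, ok := next()
		if !ok {
			return count, true
		}
		archive(msg)
		count++
	}
}

// fakeBacklog returns a next function that yields n messages and then
// reports that the backlog is empty.
func fakeBacklog(n int) func() ([]byte, bool) {
	i := 0
	return func() ([]byte, bool) {
		if i >= n {
			return nil, false
		}
		i++
		return []byte(fmt.Sprintf("event-%d", i)), true
	}
}

func main() {
	count, drained := drainWithin(functionBudget, fakeBacklog(5), func([]byte) {})
	fmt.Println(count, drained) // whole backlog fits in the budget

	count, drained = drainWithin(0, fakeBacklog(5), func([]byte) {})
	fmt.Println(count, drained) // budget already spent, nothing archived
}
```

Messages that are not pulled and acknowledged before the budget runs out stay in the subscription, so a later alert can pick them up.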

If the data volume into Pub/Sub is too much to be archived by such a periodic task, it is better handled by a proper streaming Dataflow job.

Cleaning up and next steps

gcloud functions delete Archiver
gcloud functions delete StackDriverRelay
gcloud pubsub subscriptions delete bulk-drainer
gcloud pubsub topics delete drain-tasks
gcloud pubsub topics delete demo-data

You can delete the notification channel and alerting policy in the console.

Next steps

There are several ways to extend this pattern:

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see our Site Policies. Java is a registered trademark of Oracle and/or its affiliates.