Receive and parse Pub/Sub messages about data profiles

This document provides examples that demonstrate how to receive and parse notifications about changes to your data profiles. Cloud DLP sends these updates in the form of Pub/Sub messages.


You can configure Cloud DLP to automatically generate profiles about BigQuery data across an organization, folder, or project. Data profiles contain metrics and metadata about your tables and help you determine where sensitive and high-risk data reside. Cloud DLP reports these metrics at the project, table, and column levels. For more information, see Data profiles for BigQuery data.

When configuring the data profiler, you can turn on the option to publish Pub/Sub messages whenever significant changes in your data profiles occur. The messages help you take immediate action in response to those changes. The following are the events that you can listen for:

  • A table is profiled for the first time.
  • A profile is updated.
  • The risk or sensitivity score of a profile increases.
  • There is a new error related to your data profiles.

The Pub/Sub messages that the data profiler publishes contain a DataProfilePubSubMessage object. These messages are always sent in binary format, so you need to write code that receives and parses them.
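
Once a message is parsed, a subscriber usually needs to route it according to which of the four events listed above triggered it. The following is a minimal sketch of such a dispatch table. It assumes the event is available as one of the names NEW_PROFILE, CHANGED_PROFILE, SCORE_INCREASED, or ERROR_CHANGED (taken here to mirror the event type enum in the Cloud DLP API). The handler functions and their return strings are hypothetical and exist only for illustration.

```python
from typing import Callable, Dict


# Hypothetical handlers: one per event that the data profiler can publish.
def on_new_profile(resource: str) -> str:
    return f"New profile for {resource}"


def on_changed_profile(resource: str) -> str:
    return f"Profile updated for {resource}"


def on_score_increased(resource: str) -> str:
    return f"Risk or sensitivity score increased for {resource}"


def on_error_changed(resource: str) -> str:
    return f"New profiling error for {resource}"


# Assumption: these keys match the event names carried by the parsed message.
HANDLERS: Dict[str, Callable[[str], str]] = {
    "NEW_PROFILE": on_new_profile,
    "CHANGED_PROFILE": on_changed_profile,
    "SCORE_INCREASED": on_score_increased,
    "ERROR_CHANGED": on_error_changed,
}


def dispatch(event_name: str, resource: str) -> str:
    """Route an event to its handler, tolerating names this code doesn't know."""
    handler = HANDLERS.get(event_name)
    if handler is None:
        return f"Ignoring unrecognized event {event_name!r} for {resource}"
    return handler(resource)
```

Falling back on unrecognized names, rather than raising, keeps the subscriber running if an event type is added later.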

When you use Pub/Sub, you are billed according to Pub/Sub pricing.

Before you begin

Before you start working on the examples, follow these steps:

  1. Create a Pub/Sub topic and add a subscription for it. Don't assign a schema to the topic.

    For simplicity, the examples on this page listen to only one subscription. However, in practice, you can create a topic and subscription for each event that Cloud DLP supports.

  2. If you haven't already done so, configure the data profiler to publish Pub/Sub messages:

    1. Edit your scan configuration.

    2. On the Edit scan configuration page, turn on the Publish to Pub/Sub option and select the events that you want to listen for. Then, configure the settings for each event.

    3. Save the scan configuration.

  3. Grant the Cloud DLP service agent publishing access on the Pub/Sub topic. An example of a role that has publishing access is the Pub/Sub Publisher role (roles/pubsub.publisher). The Cloud DLP service agent is an email address in the format:

    service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com

    If you're working with an organization- or folder-level scan configuration, the PROJECT_NUMBER is the numerical identifier of the service agent container. If you're working with a project-level scan configuration, the PROJECT_NUMBER is the numerical identifier of your project.

  4. Install and set up the Cloud DLP client library for Java or Python.
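
The identifiers used in the setup steps above can be assembled programmatically, which helps when you script the configuration. The following is a rough sketch using only string formatting. It assumes the service agent address follows the pattern service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com and that Pub/Sub resources use the projects/{project}/topics/{topic} and projects/{project}/subscriptions/{subscription} naming forms. The helper names and the sample values are hypothetical.

```python
def service_agent_member(project_number: str) -> str:
    """Build the IAM member string for the Cloud DLP service agent.

    Assumption: the agent address is
    service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com.
    """
    return (
        "serviceAccount:"
        f"service-{project_number}@dlp-api.iam.gserviceaccount.com"
    )


def publisher_binding(project_number: str) -> dict:
    """Describe an IAM binding that grants publishing access on a topic."""
    return {
        "role": "roles/pubsub.publisher",
        "members": [service_agent_member(project_number)],
    }


def topic_path(project_id: str, topic_id: str) -> str:
    """Fully qualified topic name."""
    return f"projects/{project_id}/topics/{topic_id}"


def subscription_path(project_id: str, subscription_id: str) -> str:
    """Fully qualified subscription name."""
    return f"projects/{project_id}/subscriptions/{subscription_id}"


if __name__ == "__main__":
    # Hypothetical values, for illustration only.
    print(topic_path("my-project", "data-profile-events"))
    print(publisher_binding("123456789"))
```

The binding dictionary only describes the grant. Applying it still requires the console, the gcloud CLI, or the Pub/Sub IAM API.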


The following examples demonstrate how to receive and parse Pub/Sub messages that the data profiler publishes. You can repurpose these examples and deploy them as Cloud Functions that are triggered by Pub/Sub events. For more information, see Pub/Sub tutorial (2nd gen).

In the following examples, replace the following:

  • PROJECT_ID: the ID of the project that contains the Pub/Sub subscription.
  • SUBSCRIPTION_ID: the ID of the Pub/Sub subscription.


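Java

import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.DataProfilePubSubMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DataProfilePubSubMessageParser {

  public static void main(String... args) throws Exception {
    String projectId = "PROJECT_ID";
    String subscriptionId = "SUBSCRIPTION_ID";
    int timeoutSeconds = 5;

    // The `ProjectSubscriptionName.of` method creates a fully qualified identifier
    // in the form `projects/{projectId}/subscriptions/{subscriptionId}`.
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    MessageReceiver receiver =
        (PubsubMessage pubsubMessage, AckReplyConsumer consumer) -> {
          try {
            // Parse the binary payload into a DataProfilePubSubMessage.
            DataProfilePubSubMessage message = DataProfilePubSubMessage.parseFrom(
                pubsubMessage.getData());
            System.out.println(
                "PubsubMessage with ID: " + pubsubMessage.getMessageId()
                    + "; message size: " + pubsubMessage.getData().size()
                    + "; event: " + message.getEvent()
                    + "; profile name: " + message.getProfile().getName()
                    + "; full resource: " + message.getProfile().getFullResource());
            // Acknowledge the message so that Pub/Sub doesn't redeliver it.
            consumer.ack();
          } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
          }
        };

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
    try {
      ApiService apiService = subscriber.startAsync();
      apiService.awaitRunning();
      System.out.printf("Listening for messages on %s for %d seconds.%n", subscriptionName,
          timeoutSeconds);
      subscriber.awaitTerminated(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException ignored) {
      // The timeout elapsed; fall through and stop the subscriber.
    } finally {
      subscriber.stopAsync();
    }
  }
}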

Python

from concurrent.futures import TimeoutError

from google.cloud import dlp_v2
from google.cloud import pubsub_v1

project_id = "PROJECT_ID"
subscription_id = "SUBSCRIPTION_ID"
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message.data!r}.")
    # Parse the binary payload into a DataProfilePubSubMessage.
    dlp_msg = dlp_v2.DataProfilePubSubMessage()
    dlp_msg._pb.ParseFromString(message.data)
    print("Parsed message: ", dlp_msg)
    # Acknowledge the message so that Pub/Sub doesn't redeliver it.
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path} for {timeout} seconds...")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
        print("Done waiting.")
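
If you repurpose these examples as a function that is triggered by Pub/Sub events, the message typically arrives wrapped in an event payload instead of through a streaming pull. The following is a minimal sketch of the unwrapping step. It assumes the payload is a dictionary with the message bytes base64-encoded under message.data, as in the Pub/Sub push and event formats. The function name extract_payload and the sample payload are hypothetical.

```python
import base64


def extract_payload(event_data: dict) -> bytes:
    """Return the raw message bytes from a Pub/Sub-triggered event payload.

    Assumption: the payload looks like {"message": {"data": "<base64>"}}.
    """
    encoded = event_data["message"]["data"]
    return base64.b64decode(encoded)


if __name__ == "__main__":
    # Hypothetical payload; a real one carries a serialized
    # DataProfilePubSubMessage instead of these placeholder bytes.
    sample = {"message": {"data": base64.b64encode(b"placeholder").decode("ascii")}}
    print(extract_payload(sample))
```

The returned bytes can then be parsed the same way the examples parse the message data, for instance with ParseFromString in Python.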

What's next