Receive and parse Pub/Sub messages about data profiles

This document provides examples that demonstrate how to receive and parse notifications about changes to your data profiles. Sensitive Data Protection sends these updates in the form of Pub/Sub messages.

Overview

You can configure Sensitive Data Protection to automatically generate profiles about data across an organization, folder, or project. Data profiles contain metrics and metadata about your data and help you determine where sensitive and high-risk data reside. Sensitive Data Protection reports these metrics at various levels of detail. For information about the types of data you can profile, see Supported resources.

When configuring the data profiler, you can turn on the option to publish Pub/Sub messages whenever significant changes in your data profiles occur. The messages help you take immediate action in response to those changes. The following are the events that you can listen for:

  • A data asset is profiled for the first time.
  • A profile is updated.
  • The risk or sensitivity score of a profile increases.
  • There is a new error related to your data profiles.

Each Pub/Sub message that the data profiler publishes contains a DataProfilePubSubMessage object. The messages are always serialized in binary format, so you need to write code that receives and deserializes them.
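
As one way to act on these events, a subscriber can route each parsed message by its event type. The following minimal sketch assumes the event names `NEW_PROFILE`, `CHANGED_PROFILE`, `SCORE_INCREASED`, and `ERROR_CHANGED`, which are recalled from the `DataProfileAction.EventType` enum and should be verified against your client library version. The handler functions and `route_event` are hypothetical names used only for illustration.

```python
from typing import Callable, Dict


# Hypothetical handlers: each returns a description of the action it would take.
def on_new_profile(profile_name: str) -> str:
    return f"catalog new profile {profile_name}"


def on_changed_profile(profile_name: str) -> str:
    return f"refresh profile {profile_name}"


def on_score_increased(profile_name: str) -> str:
    return f"alert on higher risk or sensitivity for {profile_name}"


def on_error_changed(profile_name: str) -> str:
    return f"investigate profiling error for {profile_name}"


# Assumed event names; verify them against the EventType enum in your library.
HANDLERS: Dict[str, Callable[[str], str]] = {
    "NEW_PROFILE": on_new_profile,
    "CHANGED_PROFILE": on_changed_profile,
    "SCORE_INCREASED": on_score_increased,
    "ERROR_CHANGED": on_error_changed,
}


def route_event(event_name: str, profile_name: str) -> str:
    """Dispatch to the handler registered for event_name, if any."""
    handler = HANDLERS.get(event_name)
    if handler is None:
        return f"ignore unrecognized event {event_name}"
    return handler(profile_name)


print(route_event("SCORE_INCREASED", "example-profile"))
```

In a real subscriber, you might pass the parsed message's event name and profile name to `route_event` instead of the literal strings used here.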

Pricing

When you use Pub/Sub, you are billed according to Pub/Sub pricing.

Before you begin

This page assumes the following:

Before you start working on the examples, follow these steps:

  1. Create a Pub/Sub topic and add a subscription for it. Don't assign a schema to the topic.

    For simplicity, the examples on this page listen to only one subscription. However, in practice, you can create a topic and subscription for each event that Sensitive Data Protection supports.

  2. If you haven't already done so, configure the data profiler to publish Pub/Sub messages:

    1. Edit your scan configuration.

    2. On the Edit scan configuration page, turn on the Publish to Pub/Sub option and select the events that you want to listen for. Then, configure the settings for each event.

    3. Save the scan configuration.

  3. Grant the Sensitive Data Protection service agent publishing access on the Pub/Sub topic. An example of a role that has publishing access is the Pub/Sub Publisher role (roles/pubsub.publisher). The Sensitive Data Protection service agent is identified by an email address in the following format:

    service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com
    

    If you're working with an organization- or folder-level scan configuration, PROJECT_NUMBER is the numerical identifier of the service agent container. If you're working with a project-level scan configuration, PROJECT_NUMBER is the numerical identifier of your project.

  4. Install and set up the Sensitive Data Protection client library for Java or Python.
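
To make step 3 concrete, the following minimal sketch builds the service agent's email address and the corresponding IAM member string from a project number. The function names `service_agent_email` and `iam_member` are hypothetical, and the project number shown is a placeholder. The sketch only formats strings; it doesn't call any Google Cloud API.

```python
def service_agent_email(project_number: str) -> str:
    """Return the service agent email for the given project number."""
    # Project numbers are numeric; reject anything else early.
    if not (project_number.isascii() and project_number.isdigit()):
        raise ValueError(f"not a numeric project number: {project_number!r}")
    return f"service-{project_number}@dlp-api.iam.gserviceaccount.com"


def iam_member(project_number: str) -> str:
    """Return the IAM member string that identifies the service agent."""
    return f"serviceAccount:{service_agent_email(project_number)}"


# Placeholder project number, used only for illustration.
print(iam_member("123456789012"))
```

You can use the resulting member string when you grant roles/pubsub.publisher on the topic, for example in the IAM policy binding for the topic.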

Examples

The following examples demonstrate how to receive and parse Pub/Sub messages that the data profiler publishes. You can repurpose these examples and deploy them as Cloud Run functions that are triggered by Pub/Sub events. For more information, see Pub/Sub tutorial (2nd gen).

In these examples, replace the following values:

  • PROJECT_ID: the ID of the project that contains the Pub/Sub subscription.
  • SUBSCRIPTION_ID: the ID of the Pub/Sub subscription.

Java

import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.DataProfilePubSubMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DataProfilePubSubMessageParser {

  public static void main(String... args) throws Exception {
    String projectId = "PROJECT_ID";
    String subscriptionId = "SUBSCRIPTION_ID";
    int timeoutSeconds = 5;

    // The `ProjectSubscriptionName.of` method creates a fully qualified identifier
    // in the form `projects/{projectId}/subscriptions/{subscriptionId}`.
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    MessageReceiver receiver =
        (PubsubMessage pubsubMessage, AckReplyConsumer consumer) -> {
          try {
            DataProfilePubSubMessage message = DataProfilePubSubMessage.parseFrom(
                pubsubMessage.getData());
            System.out.println(
                "PubsubMessage with ID: " + pubsubMessage.getMessageId()
                    + "; message size: " + pubsubMessage.getData().size()
                    + "; event: " + message.getEvent()
                    + "; profile name: " + message.getProfile().getName()
                    + "; full resource: " + message.getProfile().getFullResource());
            consumer.ack();
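          } catch (InvalidProtocolBufferException e) {
            // The payload isn't a valid DataProfilePubSubMessage. The message is
            // left unacknowledged, so it remains eligible for redelivery.
            e.printStackTrace();
          }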
        };

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
    try {
      ApiService apiService = subscriber.startAsync();
      apiService.awaitRunning();
      System.out.printf("Listening for messages on %s for %d seconds.%n", subscriptionName,
          timeoutSeconds);
      subscriber.awaitTerminated(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException ignored) {
      // Expected when the listening window elapses; continue to shut down.
    } finally {
      subscriber.stopAsync();
    }
  }
}

Python

from concurrent.futures import TimeoutError

from google.cloud import dlp_v2
from google.cloud import pubsub_v1


project_id = "PROJECT_ID"
subscription_id = "SUBSCRIPTION_ID"
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message.data}.")
    # Parse the binary payload into a DataProfilePubSubMessage.
    dlp_msg = dlp_v2.DataProfilePubSubMessage()
    dlp_msg._pb.ParseFromString(message.data)
    print("Parsed message: ", dlp_msg)
    print("--------")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path} for {timeout} seconds...")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
        print("Done waiting.")
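
If you repurpose the Python example as a function that is triggered by Pub/Sub events, the payload typically doesn't arrive as raw bytes. The following minimal sketch assumes the usual Pub/Sub event body, in which the message data is a base64-encoded string under `message.data`. The function name `extract_payload` is hypothetical, and the sample bytes are arbitrary.

```python
import base64
from typing import Any, Mapping


def extract_payload(event_data: Mapping[str, Any]) -> bytes:
    """Return the raw message bytes from a Pub/Sub-triggered event body."""
    # Assumed layout: {"message": {"data": "<base64 string>", ...}, ...}.
    encoded = event_data["message"].get("data", "")
    return base64.b64decode(encoded)


# Simulate an event body that carries two arbitrary bytes.
sample_event = {"message": {"data": base64.b64encode(b"\x08\x01").decode("ascii")}}
print(extract_payload(sample_event))
```

You can then pass the decoded bytes to the same parsing step that the callback in the preceding example applies to `message.data`.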

What's next