This document provides examples that demonstrate how to receive and parse notifications about changes to your data profiles. Sensitive Data Protection sends these updates in the form of Pub/Sub messages.
Overview
You can configure Sensitive Data Protection to automatically generate profiles about data across an organization, folder, or project. Data profiles contain metrics and metadata about your data and help you determine where sensitive and high-risk data reside. Sensitive Data Protection reports these metrics at various levels of detail. For information about the types of data you can profile, see Supported resources.
When configuring the data profiler, you can turn on the option to publish Pub/Sub messages whenever significant changes in your data profiles occur. The messages help you take immediate action in response to those changes. The following are the events that you can listen for:
- A data asset is profiled for the first time.
- A profile is updated.
- The risk or sensitivity score of a profile increases.
- There is a new error related to your data profiles.
The Pub/Sub messages that the data profiler publishes contain a DataProfilePubSubMessage object. These messages are always sent in binary format, so you need to write code that receives and parses them.
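Once a message is parsed, one way to act on the four events listed above is to route on the event type. The following is a minimal sketch, not part of the official samples. It assumes the event names match the DataProfileAction.EventType enum values in the DLP v2 API (NEW_PROFILE, CHANGED_PROFILE, SCORE_INCREASED, ERROR_CHANGED). The handler functions and the strings they return are hypothetical placeholders for your own logic.

```python
from typing import Callable, Dict


# Hypothetical handlers: replace the bodies with your own response logic.
def on_new_profile(profile_name: str) -> str:
    return f"catalog {profile_name}"


def on_changed_profile(profile_name: str) -> str:
    return f"refresh {profile_name}"


def on_score_increased(profile_name: str) -> str:
    return f"alert {profile_name}"


def on_error_changed(profile_name: str) -> str:
    return f"investigate {profile_name}"


# Keys are assumed to be the DataProfileAction.EventType enum names.
HANDLERS: Dict[str, Callable[[str], str]] = {
    "NEW_PROFILE": on_new_profile,            # asset profiled for the first time
    "CHANGED_PROFILE": on_changed_profile,    # profile updated
    "SCORE_INCREASED": on_score_increased,    # risk or sensitivity score increased
    "ERROR_CHANGED": on_error_changed,        # new error related to the profile
}


def route_event(event_name: str, profile_name: str) -> str:
    """Call the handler registered for event_name, or ignore unknown events."""
    handler = HANDLERS.get(event_name)
    if handler is None:
        return f"ignored {event_name}"
    return handler(profile_name)
```

With the Python client library, the event name would likely come from the parsed message's event field (for example, its enum name), and the profile name from the profile field. Treat that mapping as an assumption to verify against the client library reference.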
Pricing
When you use Pub/Sub, you are billed according to Pub/Sub pricing.
Before you begin
This page assumes the following:
- You are familiar with using Pub/Sub. For an introduction, see the quickstart Publish and receive messages in Pub/Sub by using the console.
- You already have a scan configuration at the organization, folder or project level.
- You are familiar with configuring Google Cloud client libraries.
Before you start working on the examples, follow these steps:
Create a Pub/Sub topic and add a subscription for it. Don't assign a schema to the topic.
For simplicity, the examples on this page listen to only one subscription. However, in practice, you can create a topic and subscription for each event that Sensitive Data Protection supports.
If you haven't already done so, configure the data profiler to publish Pub/Sub messages:
Edit your scan configuration.
On the Edit scan configuration page, turn on the Publish to Pub/Sub option and select the events that you want to listen for. Then, configure the settings for each event.
Save the scan configuration.
Grant the Sensitive Data Protection service agent publishing access on the Pub/Sub topic. An example of a role that has publishing access is the Pub/Sub Publisher role (roles/pubsub.publisher). The Sensitive Data Protection service agent is an email address in the following format: service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com
If you're working with an organization- or folder-level scan configuration, the PROJECT_NUMBER is the numerical identifier of the service agent container. If you're working with a project-level scan configuration, the PROJECT_NUMBER is the numerical identifier of your project.
Install and set up the Sensitive Data Protection client library for Java or Python.
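To make the service agent step concrete, the following sketch builds the service agent's IAM member string and a policy binding for the Pub/Sub Publisher role. It is an illustration only: the helper names are hypothetical, the project number 123456789012 is a made-up value, and the binding is a plain dictionary in the shape that IAM policies use, not a call to any API.

```python
from typing import Dict, List, Union


def service_agent_member(project_number: Union[int, str]) -> str:
    """Build the IAM member string for the Sensitive Data Protection service agent."""
    number = str(project_number)
    if not number.isdigit():
        # A project number is numeric; a project ID such as "my-project" is not valid here.
        raise ValueError(f"expected a numeric project number, got {project_number!r}")
    return f"serviceAccount:service-{number}@dlp-api.iam.gserviceaccount.com"


def publisher_binding(project_number: Union[int, str]) -> Dict[str, Union[str, List[str]]]:
    """Return an IAM binding that grants the service agent the Pub/Sub Publisher role."""
    return {
        "role": "roles/pubsub.publisher",
        "members": [service_agent_member(project_number)],
    }


# Made-up project number, for illustration only.
example_binding = publisher_binding(123456789012)
```

For an organization- or folder-level scan configuration, pass the project number of the service agent container rather than the number of the project that holds the topic.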
Examples
The following examples demonstrate how to receive and parse Pub/Sub messages that the data profiler publishes. You can repurpose these examples and deploy them as Cloud Run functions that are triggered by Pub/Sub events. For more information, see Pub/Sub tutorial (2nd gen).
In these examples, replace the following placeholders:
- PROJECT_ID: the ID of the project that contains the Pub/Sub subscription.
- SUBSCRIPTION_ID: the ID of the Pub/Sub subscription.
Java
import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.DataProfilePubSubMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DataProfilePubSubMessageParser {

  public static void main(String... args) throws Exception {
    String projectId = "PROJECT_ID";
    String subscriptionId = "SUBSCRIPTION_ID";
    int timeoutSeconds = 5;

    // The `ProjectSubscriptionName.of` method creates a fully qualified identifier
    // in the form `projects/{projectId}/subscriptions/{subscriptionId}`.
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    MessageReceiver receiver =
        (PubsubMessage pubsubMessage, AckReplyConsumer consumer) -> {
          try {
            DataProfilePubSubMessage message = DataProfilePubSubMessage.parseFrom(
                pubsubMessage.getData());
            System.out.println(
                "PubsubMessage with ID: " + pubsubMessage.getMessageId()
                    + "; message size: " + pubsubMessage.getData().size()
                    + "; event: " + message.getEvent()
                    + "; profile name: " + message.getProfile().getName()
                    + "; full resource: " + message.getProfile().getFullResource());
            consumer.ack();
          } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
          }
        };

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
    try {
      ApiService apiService = subscriber.startAsync();
      apiService.awaitRunning();
      System.out.printf("Listening for messages on %s for %d seconds.%n", subscriptionName,
          timeoutSeconds);
      subscriber.awaitTerminated(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException ignored) {
    } finally {
      subscriber.stopAsync();
    }
  }
}
Python
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
from google.cloud import dlp_v2

project_id = "PROJECT_ID"
subscription_id = "SUBSCRIPTION_ID"
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)


def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message.data}.")
    dlp_msg = dlp_v2.DataProfilePubSubMessage()
    dlp_msg._pb.ParseFromString(message.data)
    print("Parsed message: ", dlp_msg)
    print("--------")
    message.ack()


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path} for {timeout} seconds...")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

print("Done waiting.")
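If you repurpose the examples as a Cloud Run function that is triggered by Pub/Sub, the message typically arrives inside an event payload in which the data field is base64-encoded, so it needs to be decoded back to bytes before parsing. The following is a minimal sketch of that decoding step using only the standard library. The function name and the sample payload are hypothetical, and the bytes in the sample are arbitrary stand-ins, not a real DataProfilePubSubMessage.

```python
import base64
from typing import Any, Mapping


def extract_profile_payload(event_data: Mapping[str, Any]) -> bytes:
    """Return the raw message bytes from a Pub/Sub-triggered event payload.

    Assumes the payload has the shape {"message": {"data": "<base64 string>"}}.
    """
    encoded = event_data["message"]["data"]
    return base64.b64decode(encoded)


# Stand-in payload for illustration; the bytes are arbitrary, not a real profile message.
sample_event = {
    "message": {"data": base64.b64encode(b"\x00\x01binary").decode("ascii")}
}
raw_bytes = extract_profile_payload(sample_event)
```

The resulting bytes could then be passed to the same parsing call that the Python example uses on message.data.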
What's next
- Learn more about data profiles.
- Learn how to create a scan configuration at the organization, folder or project level.
- Work through a tutorial that demonstrates how to write, deploy, and trigger a simple event-driven Cloud Run function with a Pub/Sub trigger.