本文档提供了示例,演示了如何接收和解析有关数据配置文件更改的通知。敏感数据保护功能会以 Pub/Sub 消息的形式发送这些更新。
概览
您可以配置敏感数据保护,以自动生成有关组织、文件夹或项目中数据的配置文件。数据分析文件包含有关数据的指标和元数据,可帮助您确定敏感数据和高风险数据所在的位置。敏感数据保护会以不同详细级别报告这些指标。如需了解您可以分析的数据类型,请参阅支持的资源。
配置数据分析器时,您可以开启相应选项,以便在数据分析文件发生重大变化时发布 Pub/Sub 消息。这些消息可帮助您立即采取行动来应对这些更改。您可以监听以下事件:
- 首次分析数据资产。
- 更新个人资料。
- 配置文件的风险或敏感度得分增加。
- 您的数据配置文件出现了新错误。
数据分析器发布的 Pub/Sub 消息包含 DataProfilePubSubMessage
对象。这些消息始终以二进制格式发送,因此您需要编写用于接收和解析这些消息的代码。
价格
使用 Pub/Sub 时,您需要根据 Pub/Sub 价格付费。
准备工作
本页面假定您满足以下条件:
- 您熟悉如何使用 Pub/Sub。如需了解详情,请参阅快速入门使用控制台在 Pub/Sub 中发布和接收消息。
- 您已经在组织、文件夹或项目级层创建了扫描配置。
- 您熟悉如何配置 Google Cloud 客户端库。
在开始处理示例之前,请按以下步骤操作:
创建 Pub/Sub 主题并为其添加订阅。请勿为主题分配架构。
为简单起见,本页面上的示例仅监听一个订阅。不过,在实践中,您可以为 Sensitive Data Protection 支持的每种事件创建一个主题和订阅。
如果您尚未配置数据分析器以发布 Pub/Sub 消息,请执行以下操作:
向敏感数据保护服务代理授予对 Pub/Sub 主题的发布权限。具有发布权限的角色示例包括 Pub/Sub Publisher 角色 (
roles/pubsub.publisher
)。Sensitive Data Protection 服务代理是采用以下格式的电子邮件地址:service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com
如果您使用的是组织级或文件夹级扫描配置,PROJECT_NUMBER 是服务代理容器的数字标识符。如果您使用的是项目级扫描配置,PROJECT_NUMBER 是项目的数字标识符。
安装并设置适用于 Java 或 Python 的 Sensitive Data Protection 客户端库。
示例
以下示例演示了如何接收和解析数据分析器发布的 Pub/Sub 消息。您可以将这些示例重新用于其他用途,并将其部署为由 Pub/Sub 事件触发的 Cloud Run 函数。如需了解详情,请参阅 Pub/Sub 教程(第 2 代)。
在以下示例中,替换以下内容:
- PROJECT_ID:包含 Pub/Sub 订阅的项目的 ID。
- SUBSCRIPTION_ID:Pub/Sub 订阅的 ID。
Java
import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.DataProfilePubSubMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class DataProfilePubSubMessageParser {
public static void main(String... args) throws Exception {
String projectId = "PROJECT_ID";
String subscriptionId = "SUBSCRIPTION_ID";
int timeoutSeconds = 5;
// The `ProjectSubscriptionName.of` method creates a fully qualified identifier
// in the form `projects/{projectId}/subscriptions/{subscriptionId}`.
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
MessageReceiver receiver =
(PubsubMessage pubsubMessage, AckReplyConsumer consumer) -> {
try {
DataProfilePubSubMessage message = DataProfilePubSubMessage.parseFrom(
pubsubMessage.getData());
System.out.println(
"PubsubMessage with ID: " + pubsubMessage.getMessageId()
+ "; message size: " + pubsubMessage.getData().size()
+ "; event: " + message.getEvent()
+ "; profile name: " + message.getProfile().getName()
+ "; full resource: " + message.getProfile().getFullResource());
consumer.ack();
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
};
// Create subscriber client.
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
try {
ApiService apiService = subscriber.startAsync();
apiService.awaitRunning();
System.out.printf("Listening for messages on %s for %d seconds.%n", subscriptionName,
timeoutSeconds);
subscriber.awaitTerminated(timeoutSeconds, TimeUnit.SECONDS);
} catch (TimeoutException ignored) {
} finally {
subscriber.stopAsync();
}
}
}
Python
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
from google.cloud import dlp_v2
project_id = "PROJECT_ID"
subscription_id = "SUBSCRIPTION_ID"
timeout = 5.0
subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
print(f"Received {message.data}.")
dlp_msg = dlp_v2.DataProfilePubSubMessage()
dlp_msg._pb.ParseFromString(message.data)
print("Parsed message: ", dlp_msg)
print("--------")
message.ack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path} for {timeout} seconds...")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
print("Done waiting.")
后续步骤
- 详细了解数据分析文件。
- 了解如何在组织、文件夹或项目级创建扫描配置。
- 请参阅演示如何使用 Pub/Sub 触发器编写、部署和触发简单事件驱动型 Cloud Run 函数的教程。