接收和解析有关数据分析文件的 Pub/Sub 消息

本文档提供了一些示例,演示了如何接收和解析有关数据分析文件更改的通知。敏感数据保护以 Pub/Sub 消息的形式发送这些更新。

概览

您可以配置敏感数据保护,以自动生成有关组织、文件夹或项目中的数据的配置文件。数据分析文件包含有关数据的指标和元数据,可帮助您确定敏感数据和高风险数据所在的位置。敏感数据保护会以各种详细级别报告这些指标。如需了解您可以分析的数据类型,请参阅支持的资源

配置数据分析器时,您可以启用相关选项,以便在数据分析文件发生重大变化时发布 Pub/Sub 消息。这些消息可帮助您立即采取措施来响应这些更改。以下是您可以监听的事件:

  • 首次对表进行分析。
  • 配置文件已更新。
  • 个人资料的风险或敏感度分数会增加。
  • 有一个与您的数据分析文件相关的新错误。

数据分析器发布的 Pub/Sub 消息包含 DataProfilePubSubMessage 对象。这些消息始终以二进制格式发送,因此您需要编写用于接收和解析这些消息的代码。

价格

使用 Pub/Sub 时,您需要根据 Pub/Sub 价格付费。

准备工作

本页面假定您满足以下条件:

在开始处理示例之前,请按以下步骤操作:

  1. 创建 Pub/Sub 主题并为其添加订阅。请勿为该主题指定架构。

    为简单起见,本页中的示例仅监听一个订阅。但在实践中,您可以为敏感数据保护支持的每个事件创建主题和订阅。

  2. 配置数据分析器以发布 Pub/Sub 消息(如果您尚未这样做):

    1. 修改扫描配置。

    2. 修改扫描配置页面上,启用发布到 Pub/Sub 选项,然后选择要监听的事件。然后,为每个事件配置设置

    3. 保存扫描配置。

  3. 向敏感数据保护服务代理授予 Pub/Sub 主题的发布访问权限。例如,Pub/Sub Publisher 角色 (roles/pubsub.publisher) 就是具有发布访问权限的角色。敏感数据保护服务代理是一个电子邮件地址,格式如下:

    service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com
    

    如果您使用的是组织或文件夹级扫描配置,则 PROJECT_NUMBER服务代理容器的数字标识符。如果您使用的是项目级扫描配置,则 PROJECT_NUMBER 是项目的数字标识符。

  4. 安装并设置适用于 Java 或 Python 的敏感数据保护客户端库

示例

以下示例演示了如何接收和解析数据性能分析器发布的 Pub/Sub 消息。您可以调整这些示例的用途,并将其部署为由 Pub/Sub 事件触发的 Cloud Functions 函数。如需了解详情,请参阅 Pub/Sub 教程(第 2 代)

在以下示例中,替换以下内容:

  • PROJECT_ID:包含 Pub/Sub 订阅的项目的 ID。
  • SUBSCRIPTION_ID:Pub/Sub 订阅的 ID。

Java

import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.DataProfilePubSubMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DataProfilePubSubMessageParser {

  public static void main(String... args) throws Exception {
    String projectId = "PROJECT_ID";
    String subscriptionId = "SUBSCRIPTION_ID";
    int timeoutSeconds = 5;

    // The `ProjectSubscriptionName.of` method creates a fully qualified identifier
    // in the form `projects/{projectId}/subscriptions/{subscriptionId}`.
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    MessageReceiver receiver =
        (PubsubMessage pubsubMessage, AckReplyConsumer consumer) -> {
          try {
            DataProfilePubSubMessage message = DataProfilePubSubMessage.parseFrom(
                pubsubMessage.getData());
            System.out.println(
                "PubsubMessage with ID: " + pubsubMessage.getMessageId()
                    + "; message size: " + pubsubMessage.getData().size()
                    + "; event: " + message.getEvent()
                    + "; profile name: " + message.getProfile().getName()
                    + "; full resource: " + message.getProfile().getFullResource());
            consumer.ack();
          } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
          }
        };

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
    try {
      ApiService apiService = subscriber.startAsync();
      apiService.awaitRunning();
      System.out.printf("Listening for messages on %s for %d seconds.%n", subscriptionName,
          timeoutSeconds);
      subscriber.awaitTerminated(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException ignored) {
    } finally {
      subscriber.stopAsync();
    }
  }
}

Python

from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
from google.cloud import dlp_v2

project_id = "PROJECT_ID"
subscription_id = "SUBSCRIPTION_ID"
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message.data}.")
    dlp_msg = dlp_v2.DataProfilePubSubMessage()
    dlp_msg._pb.ParseFromString(message.data)
    print("Parsed message: ", dlp_msg)
    print("--------")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path} for {timeout} seconds...")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
        print("Done waiting.")

后续步骤