Recibe y analiza mensajes de Pub/Sub sobre los perfiles de datos

En este documento, se proporcionan ejemplos que demuestran cómo recibir y analizar notificaciones sobre cambios en tus perfiles de datos. La protección de datos sensibles envía estas actualizaciones en forma de mensajes de Pub/Sub.

Descripción general

Puedes configurar la protección de datos sensibles para que genere automáticamente perfiles sobre los datos en una organización, una carpeta o un proyecto. Los perfiles de datos contienen métricas y metadatos sobre tus datos y te ayudan a determinar dónde residen los datos sensibles y de alto riesgo. La protección de datos sensibles informa estas métricas en varios niveles de detalle. Para obtener información sobre los tipos de datos de los que puedes generar perfiles, consulta Recursos admitidos.

Cuando configuras el generador de perfiles de datos, puedes activar la opción para publicar mensajes de Pub/Sub cada vez que se produzcan cambios significativos en tus perfiles de datos. Los mensajes te ayudan a tomar medidas inmediatas en respuesta a esos cambios. Estos son los eventos que puedes escuchar:

  • Se crea un perfil de una tabla por primera vez.
  • Se actualiza un perfil.
  • El riesgo o la puntuación de sensibilidad de un perfil aumenta.
  • Hay un nuevo error relacionado con tus perfiles de datos.

Los mensajes de Pub/Sub que publica el generador de perfiles de datos contienen un objeto DataProfilePubSubMessage. Estos mensajes siempre se envían en formato binario, por lo que debes escribir un código que los reciba y los analice.

Precios

Cuando usas Pub/Sub, se te factura según los precios de Pub/Sub.

Antes de comenzar

En esta página, se supone lo siguiente:

Antes de empezar a trabajar con los ejemplos, sigue estos pasos:

  1. Crea un tema de Pub/Sub y agrégale una suscripción. No asignes un esquema al tema.

    Para simplificar, en los ejemplos de esta página, solo se escucha una suscripción. Sin embargo, en la práctica, puedes crear un tema y una suscripción para cada evento que admita la protección de datos sensibles.

  2. Si aún no lo hiciste, configura el generador de perfiles de datos para publicar mensajes de Pub/Sub:

    1. Edita la configuración de tu análisis.

    2. En la página Editar configuración de análisis, activa la opción Publicar en Pub/Sub y selecciona los eventos que deseas escuchar. Luego, establece la configuración de cada evento.

    3. Guarda la configuración del análisis.

  3. Otorga acceso de publicación al agente de servicio de protección de datos sensibles en el tema de Pub/Sub. Un ejemplo de una función que tiene acceso de publicación es la función de publicador de Pub/Sub (roles/pubsub.publisher). El agente de servicio de protección de datos sensibles es una dirección de correo electrónico con el siguiente formato:

    service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com
    

    Si trabajas con una configuración de análisis a nivel de organización o carpeta, PROJECT_NUMBER es el identificador numérico del contenedor del agente de servicio. Si trabajas con una configuración de análisis a nivel de proyecto, PROJECT_NUMBER es el identificador numérico de tu proyecto.

  4. Instala y configura la biblioteca cliente de protección de datos sensibles para Java o Python.

Ejemplos

En los siguientes ejemplos, se muestra cómo recibir y analizar mensajes de Pub/Sub que publica el generador de perfiles de datos. Puedes reutilizar estos ejemplos y, luego, implementarlos como Cloud Functions que se activan mediante eventos de Pub/Sub. Para obtener más información, consulta el instructivo de Pub/Sub (2ª gen..).

En los siguientes ejemplos, reemplaza lo siguiente:

  • PROJECT_ID: Es el ID del proyecto que contiene la suscripción a Pub/Sub.
  • SUBSCRIPTION_ID: Es el ID de la suscripción a Pub/Sub.

Java

import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.DataProfilePubSubMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DataProfilePubSubMessageParser {

  public static void main(String... args) throws Exception {
    String projectId = "PROJECT_ID";
    String subscriptionId = "SUBSCRIPTION_ID";
    int timeoutSeconds = 5;

    // The `ProjectSubscriptionName.of` method creates a fully qualified identifier
    // in the form `projects/{projectId}/subscriptions/{subscriptionId}`.
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    MessageReceiver receiver =
        (PubsubMessage pubsubMessage, AckReplyConsumer consumer) -> {
          try {
            DataProfilePubSubMessage message = DataProfilePubSubMessage.parseFrom(
                pubsubMessage.getData());
            System.out.println(
                "PubsubMessage with ID: " + pubsubMessage.getMessageId()
                    + "; message size: " + pubsubMessage.getData().size()
                    + "; event: " + message.getEvent()
                    + "; profile name: " + message.getProfile().getName()
                    + "; full resource: " + message.getProfile().getFullResource());
            consumer.ack();
          } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
          }
        };

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
    try {
      ApiService apiService = subscriber.startAsync();
      apiService.awaitRunning();
      System.out.printf("Listening for messages on %s for %d seconds.%n", subscriptionName,
          timeoutSeconds);
      subscriber.awaitTerminated(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException ignored) {
    } finally {
      subscriber.stopAsync();
    }
  }
}

Python

from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
from google.cloud import dlp_v2

project_id = "PROJECT_ID"
subscription_id = "SUBSCRIPTION_ID"
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message.data}.")
    dlp_msg = dlp_v2.DataProfilePubSubMessage()
    dlp_msg._pb.ParseFromString(message.data)
    print("Parsed message: ", dlp_msg)
    print("--------")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path} for {timeout} seconds...")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
        print("Done waiting.")

¿Qué sigue?