Cómo recibir y analizar mensajes de Pub/Sub sobre perfiles de datos

En este documento, se proporcionan ejemplos que demuestran cómo recibir y analizar notificaciones sobre cambios en tus perfiles de datos. La Protección de datos sensibles envía estas actualizaciones en forma de mensajes de Pub/Sub.

Descripción general

Puedes configurar la protección de datos sensibles para generar automáticamente perfiles sobre los datos en una organización, una carpeta o un proyecto. Los perfiles de datos contienen métricas y metadatos sobre tus datos y te ayudan a determinar dónde residen los datos sensibles y de alto riesgo. La protección de datos sensibles informa estas métricas en varios niveles de detalle. Para obtener información sobre los tipos de datos que puedes perfilar, consulta Recursos compatibles.

Cuando configures el generador de perfiles de datos, puedes activar la opción para publicar mensajes de Pub/Sub cada vez que se produzcan cambios significativos en tus perfiles de datos. Los mensajes te ayudan a tomar medidas inmediatas en respuesta a esos cambios. Los siguientes son los eventos que puedes escuchar:

  • Se genera un perfil de un recurso de datos por primera vez.
  • Se actualiza un perfil.
  • Aumenta el riesgo o la puntuación de sensibilidad de un perfil.
  • Hay un nuevo error relacionado con tus perfiles de datos.

Los mensajes de Pub/Sub que publica el generador de perfiles de datos contienen un objeto DataProfilePubSubMessage. Estos mensajes siempre se envían en formato binario, por lo que debes escribir código que los reciba y analice.

Precios

Cuando usas Pub/Sub, se te factura según los precios de Pub/Sub.

Antes de comenzar

En esta página, se supone lo siguiente:

Antes de comenzar a trabajar en los ejemplos, sigue estos pasos:

  1. Crea un tema de Pub/Sub y agrégale una suscripción. No asignes un esquema al tema.

    Para simplificar, los ejemplos de esta página solo escuchan una suscripción. Sin embargo, en la práctica, puedes crear un tema y una suscripción para cada evento que admita la Protección de datos sensibles.

  2. Si aún no lo hiciste, configura el generador de perfiles de datos para que publique mensajes de Pub/Sub:

    1. Edita la configuración de análisis.

    2. En la página Editar configuración de análisis, activa la opción Publicar en Pub/Sub y selecciona los eventos que deseas escuchar. Luego, configura la configuración para cada evento.

    3. Guarda la configuración de análisis.

  3. Otorga acceso de publicación al agente de servicio de Protección de datos sensibles en el tema de Pub/Sub. Un ejemplo de un rol que tiene acceso de publicación es el rol de publicador de Pub/Sub (roles/pubsub.publisher). El agente de servicio de Protección de datos sensibles es una dirección de correo electrónico con el siguiente formato:

    service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com
    

    Si trabajas con una configuración de análisis a nivel de la organización o de la carpeta, PROJECT_NUMBER es el identificador numérico del contenedor del agente de servicio. Si trabajas con una configuración de análisis a nivel del proyecto, PROJECT_NUMBER es el identificador numérico de tu proyecto.

  4. Instala y configura la biblioteca cliente de Sensitive Data Protection para Java o Python.

Ejemplos

En los siguientes ejemplos, se muestra cómo recibir y analizar los mensajes de Pub/Sub que publica el generador de perfiles de datos. Puedes adaptar estos ejemplos y, luego, implementarlos como funciones de Cloud Run que se activan con eventos de Pub/Sub. Para obtener más información, consulta el instructivo de Pub/Sub (2ª gen.).

En los siguientes ejemplos, reemplaza lo siguiente:

  • PROJECT_ID: Es el ID del proyecto que contiene la suscripción a Pub/Sub.
  • SUBSCRIPTION_ID: Es el ID de la suscripción a Pub/Sub.

Java

import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.DataProfilePubSubMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DataProfilePubSubMessageParser {

  public static void main(String... args) throws Exception {
    String projectId = "PROJECT_ID";
    String subscriptionId = "SUBSCRIPTION_ID";
    int timeoutSeconds = 5;

    // The `ProjectSubscriptionName.of` method creates a fully qualified identifier
    // in the form `projects/{projectId}/subscriptions/{subscriptionId}`.
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    MessageReceiver receiver =
        (PubsubMessage pubsubMessage, AckReplyConsumer consumer) -> {
          try {
            DataProfilePubSubMessage message = DataProfilePubSubMessage.parseFrom(
                pubsubMessage.getData());
            System.out.println(
                "PubsubMessage with ID: " + pubsubMessage.getMessageId()
                    + "; message size: " + pubsubMessage.getData().size()
                    + "; event: " + message.getEvent()
                    + "; profile name: " + message.getProfile().getName()
                    + "; full resource: " + message.getProfile().getFullResource());
            consumer.ack();
          } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
          }
        };

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
    try {
      ApiService apiService = subscriber.startAsync();
      apiService.awaitRunning();
      System.out.printf("Listening for messages on %s for %d seconds.%n", subscriptionName,
          timeoutSeconds);
      subscriber.awaitTerminated(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException ignored) {
    } finally {
      subscriber.stopAsync();
    }
  }
}

Python

from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
from google.cloud import dlp_v2


project_id = "PROJECT_ID"
subscription_id = "SUBSCRIPTION_ID"
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message.data}.")
    dlp_msg = dlp_v2.DataProfilePubSubMessage()
    dlp_msg._pb.ParseFromString(message.data)
    print("Parsed message: ", dlp_msg)
    print("--------")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path} for {timeout} seconds...")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
        print("Done waiting.")

¿Qué sigue?