Ricevere e analizzare i messaggi Pub/Sub relativi ai profili di dati

Questo documento fornisce esempi che mostrano come ricevere e analizzare le notifiche relative alle modifiche ai profili dei dati. Sensitive Data Protection invia questi aggiornamenti sotto forma di messaggi Pub/Sub.

Panoramica

Puoi configurare Sensitive Data Protection in modo che generi automaticamente profili dei dati in un'organizzazione, una cartella o un progetto. I profili di dati contengono metriche e metadati sui tuoi dati e ti aiutano a determinare dove si trovano i dati sensibili e ad alto rischio. Sensitive Data Protection genera report su queste metriche a vari livelli di dettaglio. Per informazioni sui tipi di dati che puoi profilare, consulta Risorse supportate.

Quando configuri il profiler dei dati, puoi attivare l'opzione per pubblicare messaggi Pub/Sub ogni volta che si verificano variazioni significative nei profili dei dati. I messaggi ti aiutano ad adottare immediatamente provvedimenti in risposta a queste modifiche. Di seguito sono riportati gli eventi che puoi ascoltare:

  • Viene creato il profilo di una risorsa di dati per la prima volta.
  • Un profilo viene aggiornato.
  • Il punteggio di rischio o sensibilità di un profilo aumenta.
  • Si è verificato un nuovo errore relativo ai tuoi profili di dati.

I messaggi Pub/Sub pubblicati dal profiler dei dati contengono un oggetto DataProfilePubSubMessage. Questi messaggi vengono sempre inviati in formato binario, quindi devi scrivere codice che li riceva e li analizzi.

Prezzi

Quando utilizzi Pub/Sub, la fatturazione avviene in base ai prezzi di Pub/Sub.

Prima di iniziare

Questa pagina presuppone quanto segue:

Prima di iniziare a lavorare sugli esempi, segui questi passaggi:

  1. Crea un argomento Pub/Sub e aggiungi una sottoscrizione. Non assegnare uno schema all'argomento.

    Per semplicità, gli esempi in questa pagina ascoltano un solo abbonamento. Tuttavia, in pratica, puoi creare un argomento e una sottoscrizione per ogni evento supportato da Sensitive Data Protection.

  2. Se non l'hai già fatto, configura il profiler dei dati per pubblicare messaggi Pub/Sub:

    1. Modifica la configurazione della scansione.

    2. Nella pagina Modifica configurazione scansione, attiva l'opzione Pubblica su Pub/Sub e seleziona gli eventi che vuoi ascoltare. Quindi, configura le impostazioni per ogni evento.

    3. Salva la configurazione della scansione.

  3. Concede all'agente di servizio Sensitive Data Protection l'accesso di pubblicazione all'argomento Pub/Sub. Un esempio di un ruolo con accesso alla pubblicazione è il ruolo Publisher Pub/Sub (roles/pubsub.publisher). L'agente di servizio Sensitive Data Protection è un indirizzo email nel formato:

    service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com
    

    Se utilizzi una configurazione di scansione a livello di organizzazione o di cartella, PROJECT_NUMBER è l'identificatore numerico del contenitore dell'agente di servizio. Se utilizzi una configurazione di scansione a livello di progetto,PROJECT_NUMBER è l'identificatore numerico del progetto.

  4. Installa e configura la libreria client Sensitive Data Protection per Java o Python.

Esempi

Gli esempi riportati di seguito mostrano come ricevere e analizzare i messaggi Pub/Sub pubblicati dal profiler dei dati. Puoi riutilizzare questi esempi ed eseguirne il deployment come funzioni Cloud Run attivate da eventi Pub/Sub. Per ulteriori informazioni, consulta il tutorial su Pub/Sub (2ª generazione.).

Negli esempi seguenti, sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto che contiene l'abbonamento Pub/Sub.
  • SUBSCRIPTION_ID: l'ID dell'abbonamento Pub/Sub.

Java

import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.DataProfilePubSubMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DataProfilePubSubMessageParser {

  public static void main(String... args) throws Exception {
    String projectId = "PROJECT_ID";
    String subscriptionId = "SUBSCRIPTION_ID";
    int timeoutSeconds = 5;

    // The `ProjectSubscriptionName.of` method creates a fully qualified identifier
    // in the form `projects/{projectId}/subscriptions/{subscriptionId}`.
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    MessageReceiver receiver =
        (PubsubMessage pubsubMessage, AckReplyConsumer consumer) -> {
          try {
            DataProfilePubSubMessage message = DataProfilePubSubMessage.parseFrom(
                pubsubMessage.getData());
            System.out.println(
                "PubsubMessage with ID: " + pubsubMessage.getMessageId()
                    + "; message size: " + pubsubMessage.getData().size()
                    + "; event: " + message.getEvent()
                    + "; profile name: " + message.getProfile().getName()
                    + "; full resource: " + message.getProfile().getFullResource());
            consumer.ack();
          } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
          }
        };

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
    try {
      ApiService apiService = subscriber.startAsync();
      apiService.awaitRunning();
      System.out.printf("Listening for messages on %s for %d seconds.%n", subscriptionName,
          timeoutSeconds);
      subscriber.awaitTerminated(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException ignored) {
    } finally {
      subscriber.stopAsync();
    }
  }
}

Python

from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
from google.cloud import dlp_v2


project_id = "PROJECT_ID"
subscription_id = "SUBSCRIPTION_ID"
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message.data}.")
    dlp_msg = dlp_v2.DataProfilePubSubMessage()
    dlp_msg._pb.ParseFromString(message.data)
    print("Parsed message: ", dlp_msg)
    print("--------")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path} for {timeout} seconds...")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
        print("Done waiting.")

Passaggi successivi