Recevoir et analyser des messages Pub/Sub concernant les profils de données

Ce document fournit des exemples qui montrent comment recevoir et analyser des notifications concernant les modifications apportées à vos profils de données. La protection des données sensibles envoie ces mises à jour sous la forme de messages Pub/Sub.

Présentation

Vous pouvez configurer la protection des données sensibles pour générer automatiquement des profils sur les données d'une organisation, d'un dossier ou d'un projet. Les profils de données contiennent des métriques et des métadonnées sur vos données, et vous aident à déterminer où résident les données sensibles et à haut risque. La protection des données sensibles enregistre ces métriques à différents niveaux de détail. Pour en savoir plus sur les types de données que vous pouvez profiler, consultez la page Ressources acceptées.

Lors de la configuration du profileur de données, vous pouvez activer l'option permettant de publier des messages Pub/Sub chaque fois que des modifications importantes se produisent dans vos profils de données. Ils vous aident à prendre des mesures immédiates en réponse à ces modifications. Voici les événements que vous pouvez écouter:

  • Une table est profilée pour la première fois.
  • Un profil est mis à jour.
  • Le score de risque ou de sensibilité d'un profil augmente.
  • Une nouvelle erreur est survenue concernant vos profils de données.

Les messages Pub/Sub publiés par le profileur de données contiennent un objet DataProfilePubSubMessage. Ces messages sont toujours envoyés au format binaire. Vous devez donc écrire le code qui les reçoit et les analyse.

Tarification

L'utilisation de Pub/Sub est facturée selon les tarifs de Pub/Sub.

Avant de commencer

Cette page suppose ce qui suit:

Avant de commencer à travailler sur les exemples, procédez comme suit:

  1. Créez un sujet Pub/Sub et ajoutez-lui un abonnement. Ne pas attribuer de schéma au sujet

    Pour plus de simplicité, les exemples de cette page n'écoutent qu'un seul abonnement. Toutefois, en pratique, vous pouvez créer un sujet et un abonnement pour chaque événement pris en charge par la protection des données sensibles.

  2. Si vous ne l'avez pas déjà fait, configurez le Profileur de données pour publier des messages Pub/Sub:

    1. Modifiez votre configuration d'analyse.

    2. Sur la page Modifier la configuration d'analyse, activez l'option Publier sur Pub/Sub, puis sélectionnez les événements que vous souhaitez écouter. Ensuite, configurez les paramètres de chaque événement.

    3. Enregistrez la configuration d'analyse.

  3. Accordez l'accès en publication à l'agent de service de protection des données sensibles pour le sujet Pub/Sub. Le rôle Diffuseur Pub/Sub (roles/pubsub.publisher) est un exemple de rôle disposant d'un accès en publication. L'agent de service de protection des données sensibles est une adresse e-mail au format suivant:

    service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com
    

    Si vous utilisez une configuration d'analyse au niveau de l'organisation ou d'un dossier, PROJECT_NUMBER est l'identifiant numérique du conteneur de l'agent de service. Si vous utilisez une configuration d'analyse au niveau du projet, PROJECT_NUMBER est l'identifiant numérique de votre projet.

  4. Installez et configurez la bibliothèque cliente de protection des données sensibles pour Java ou Python.

Examples

Les exemples suivants montrent comment recevoir et analyser les messages Pub/Sub publiés par le profileur de données. Vous pouvez réutiliser ces exemples et les déployer en tant que Cloud Functions déclenchées par des événements Pub/Sub. Pour en savoir plus, consultez le tutoriel Pub/Sub (2e génération).

Dans les exemples suivants, remplacez ce qui suit:

  • PROJECT_ID: ID du projet contenant l'abonnement Pub/Sub.
  • SUBSCRIPTION_ID: ID de l'abonnement Pub/Sub.

Java

import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.DataProfilePubSubMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DataProfilePubSubMessageParser {

  public static void main(String... args) throws Exception {
    String projectId = "PROJECT_ID";
    String subscriptionId = "SUBSCRIPTION_ID";
    int timeoutSeconds = 5;

    // The `ProjectSubscriptionName.of` method creates a fully qualified identifier
    // in the form `projects/{projectId}/subscriptions/{subscriptionId}`.
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    MessageReceiver receiver =
        (PubsubMessage pubsubMessage, AckReplyConsumer consumer) -> {
          try {
            DataProfilePubSubMessage message = DataProfilePubSubMessage.parseFrom(
                pubsubMessage.getData());
            System.out.println(
                "PubsubMessage with ID: " + pubsubMessage.getMessageId()
                    + "; message size: " + pubsubMessage.getData().size()
                    + "; event: " + message.getEvent()
                    + "; profile name: " + message.getProfile().getName()
                    + "; full resource: " + message.getProfile().getFullResource());
            consumer.ack();
          } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
          }
        };

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
    try {
      ApiService apiService = subscriber.startAsync();
      apiService.awaitRunning();
      System.out.printf("Listening for messages on %s for %d seconds.%n", subscriptionName,
          timeoutSeconds);
      subscriber.awaitTerminated(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException ignored) {
    } finally {
      subscriber.stopAsync();
    }
  }
}

Python

from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
from google.cloud import dlp_v2

project_id = "PROJECT_ID"
subscription_id = "SUBSCRIPTION_ID"
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message.data}.")
    dlp_msg = dlp_v2.DataProfilePubSubMessage()
    dlp_msg._pb.ParseFromString(message.data)
    print("Parsed message: ", dlp_msg)
    print("--------")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path} for {timeout} seconds...")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
        print("Done waiting.")

Étapes suivantes