Recevoir et analyser des messages Pub/Sub sur les profils de données

Ce document fournit des exemples qui montrent comment recevoir et analyser des notifications sur les modifications apportées à vos profils de données. La protection des données sensibles envoie ces mises à jour sous la forme de messages Pub/Sub.

Présentation

Vous pouvez configurer la protection des données sensibles pour générer automatiquement des profils sur les données au sein d'une organisation, d'un dossier ou d'un projet. Les profils de données contiennent des métriques et des métadonnées sur vos données, et vous permettent de déterminer l'emplacement des données sensibles et à haut risque. La protection des données sensibles signale ces métriques à différents niveaux de détail. Pour en savoir plus sur les types de données que vous pouvez profiler, consultez la section Ressources compatibles.

Lorsque vous configurez le profileur de données, vous pouvez activer l'option permettant de publier des messages Pub/Sub chaque fois que des modifications importantes sont apportées à vos profils de données. Ces messages vous aident à prendre des mesures immédiates en réponse à ces modifications. Vous pouvez écouter les événements suivants:

  • Un élément de données est profilé pour la première fois.
  • Un profil est mis à jour.
  • Le score de risque ou de sensibilité d'un profil augmente.
  • Une nouvelle erreur est liée à vos profils de données.

Les messages Pub/Sub que le profileur de données publie contiennent un objet DataProfilePubSubMessage. Ces messages sont toujours envoyés au format binaire. Vous devez donc écrire du code qui les reçoit et les analyse.

Tarifs

Lorsque vous utilisez Pub/Sub, vous êtes facturé selon les tarifs de Pub/Sub.

Avant de commencer

Cette page suppose que vous avez déjà:

Avant de commencer à travailler sur les exemples, procédez comme suit:

  1. Créez un sujet Pub/Sub et ajoutez-y un abonnement. N'attribuez pas de schéma au sujet.

    Pour des raisons de simplicité, les exemples de cette page n'écoutent qu'un seul abonnement. Toutefois, en pratique, vous pouvez créer un sujet et un abonnement pour chaque événement compatible avec la protection des données sensibles.

  2. Si ce n'est pas déjà fait, configurez le profileur de données pour publier des messages Pub/Sub:

    1. Modifiez votre configuration d'analyse.

    2. Sur la page Modifier la configuration d'analyse, activez l'option Publier sur Pub/Sub, puis sélectionnez les événements que vous souhaitez écouter. Ensuite, configurez les paramètres de chaque événement.

    3. Enregistrez la configuration de l'analyse.

  3. Accordez à l'agent de service de protection des données sensibles un accès en publication sur le sujet Pub/Sub. Le rôle Diffuseur Pub/Sub (roles/pubsub.publisher) est un exemple de rôle disposant d'un accès en publication. L'agent de service de la protection des données sensibles est une adresse e-mail au format suivant:

    service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com
    

    Si vous utilisez une configuration d'analyse au niveau de l'organisation ou du dossier, PROJECT_NUMBER correspond à l'identifiant numérique du conteneur de l'agent de service. Si vous utilisez une configuration d'analyse au niveau du projet, PROJECT_NUMBER est l'identifiant numérique de votre projet.

  4. Installez et configurez la bibliothèque cliente de protection des données sensibles pour Java ou Python.

Examples

Les exemples suivants montrent comment recevoir et analyser les messages Pub/Sub publiés par le profileur de données. Vous pouvez réutiliser ces exemples et les déployer en tant que fonctions Cloud Run déclenchées par des événements Pub/Sub. Pour en savoir plus, consultez le tutoriel Pub/Sub (2e génération).

Dans les exemples suivants, remplacez les éléments suivants:

  • PROJECT_ID: ID du projet contenant l'abonnement Pub/Sub.
  • SUBSCRIPTION_ID: ID de l'abonnement Pub/Sub.

Java

import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.DataProfilePubSubMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DataProfilePubSubMessageParser {

  public static void main(String... args) throws Exception {
    String projectId = "PROJECT_ID";
    String subscriptionId = "SUBSCRIPTION_ID";
    int timeoutSeconds = 5;

    // The `ProjectSubscriptionName.of` method creates a fully qualified identifier
    // in the form `projects/{projectId}/subscriptions/{subscriptionId}`.
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    MessageReceiver receiver =
        (PubsubMessage pubsubMessage, AckReplyConsumer consumer) -> {
          try {
            DataProfilePubSubMessage message = DataProfilePubSubMessage.parseFrom(
                pubsubMessage.getData());
            System.out.println(
                "PubsubMessage with ID: " + pubsubMessage.getMessageId()
                    + "; message size: " + pubsubMessage.getData().size()
                    + "; event: " + message.getEvent()
                    + "; profile name: " + message.getProfile().getName()
                    + "; full resource: " + message.getProfile().getFullResource());
            consumer.ack();
          } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
          }
        };

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
    try {
      ApiService apiService = subscriber.startAsync();
      apiService.awaitRunning();
      System.out.printf("Listening for messages on %s for %d seconds.%n", subscriptionName,
          timeoutSeconds);
      subscriber.awaitTerminated(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException ignored) {
    } finally {
      subscriber.stopAsync();
    }
  }
}

Python

from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
from google.cloud import dlp_v2


project_id = "PROJECT_ID"
subscription_id = "SUBSCRIPTION_ID"
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message.data}.")
    dlp_msg = dlp_v2.DataProfilePubSubMessage()
    dlp_msg._pb.ParseFromString(message.data)
    print("Parsed message: ", dlp_msg)
    print("--------")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path} for {timeout} seconds...")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
        print("Done waiting.")

Étape suivante