Pub/Sub-Nachrichten zu Datenprofilen empfangen und analysieren

In diesem Dokument finden Sie Beispiele, die zeigen, wie Sie Benachrichtigungen zu Änderungen an Ihren Datenprofilen empfangen und analysieren. Der Dienst zum Schutz sensibler Daten sendet diese Aktualisierungen in Form von Pub/Sub-Nachrichten.

Übersicht

Sie können den Schutz sensibler Daten so konfigurieren, dass automatisch Profile für Daten in einer Organisation, einem Ordner oder einem Projekt generiert werden. Datenprofile enthalten Messwerte und Metadaten zu Ihren Daten und können ermitteln, wo sich sensible und risikoreiche Daten befinden. Der Schutz sensibler Daten meldet diese Messwerte auf verschiedenen Detailebenen. Informationen zu den Datentypen, die Sie profilieren können, finden Sie unter Unterstützte Ressourcen.

Beim Konfigurieren des Datenprofils können Sie die Option aktivieren, Pub/Sub-Nachrichten zu veröffentlichen, wenn sich Ihre Datenprofile erheblich ändern. Anhand der Nachrichten können Sie sofort auf diese Änderungen reagieren. Die folgenden Ereignisse können Sie beobachten:

  • Ein Daten-Asset wird zum ersten Mal profiliert.
  • Ein Profil wird aktualisiert.
  • Der Risiko- oder Vertraulichkeitsfaktor eines Profils steigt.
  • Es gibt einen neuen Fehler im Zusammenhang mit Ihren Datenprofilen.

Die vom Data Profiler veröffentlichten Pub/Sub-Nachrichten enthalten ein DataProfilePubSubMessage-Objekt. Diese Nachrichten werden immer im Binärformat gesendet. Sie müssen also Code schreiben, der sie empfängt und analysiert.

Preise

Wenn Sie Pub/Sub verwenden, werden Ihnen die Kosten gemäß den Pub/Sub-Preisen in Rechnung gestellt.

Hinweise

Auf dieser Seite wird Folgendes vorausgesetzt:

Führen Sie die folgenden Schritte aus, bevor Sie mit den Beispielen beginnen:

  1. Erstellen Sie ein Pub/Sub-Thema und fügen Sie ihm ein Abo hinzu. Weisen Sie dem Thema kein Schema zu.

    Der Einfachheit halber hören die Beispiele auf dieser Seite nur auf ein Abo. In der Praxis können Sie jedoch für jedes Ereignis, das vom Schutz sensibler Daten unterstützt wird, ein Thema und ein Abo erstellen.

  2. Konfigurieren Sie den Datenprofiler, falls noch nicht geschehen, so, dass Pub/Sub-Nachrichten veröffentlicht werden:

    1. Bearbeiten Sie die Scankonfiguration.

    2. Aktivieren Sie auf der Seite Scankonfiguration bearbeiten die Option In Pub/Sub veröffentlichen und wählen Sie die Ereignisse aus, auf die Sie hören möchten. Konfigurieren Sie dann die Einstellungen für jedes Ereignis.

    3. Speichern Sie die Scankonfiguration.

  3. Weisen Sie dem Dienst-Agent für den Schutz sensibler Daten Veröffentlichungszugriff auf das Pub/Sub-Thema zu. Ein Beispiel für eine Rolle mit Veröffentlichungszugriff ist die Rolle „Pub/Sub-Publisher“ (roles/pubsub.publisher). Der Dienst-Agent für den Schutz sensibler Daten ist eine E-Mail-Adresse im folgenden Format:

    service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com
    

    Wenn Sie mit einer Scankonfiguration auf Organisations- oder Ordnerebene arbeiten, ist PROJECT_NUMBER die numerische Kennung des Dienst-Agent-Containers. Wenn Sie eine Scankonfiguration auf Projektebene verwenden, ist PROJECT_NUMBER die numerische Kennung Ihres Projekts.

  4. Installieren und richten Sie die Clientbibliothek für den Schutz sensibler Daten für Java oder Python ein.

Beispiele

In den folgenden Beispielen wird gezeigt, wie Pub/Sub-Nachrichten empfangen und geparst werden, die vom Data Profiler veröffentlicht werden. Sie können diese Beispiele wiederverwenden und als Cloud Run-Funktionen bereitstellen, die durch Pub/Sub-Ereignisse ausgelöst werden. Weitere Informationen finden Sie in der Pub/Sub-Anleitung (2. Generation).

Ersetzen Sie in den folgenden Beispielen Folgendes:

  • PROJECT_ID: die ID des Projekts, das das Pub/Sub-Abo enthält.
  • SUBSCRIPTION_ID: die ID des Pub/Sub-Abos.

Java

import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.DataProfilePubSubMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DataProfilePubSubMessageParser {

  public static void main(String... args) throws Exception {
    String projectId = "PROJECT_ID";
    String subscriptionId = "SUBSCRIPTION_ID";
    int timeoutSeconds = 5;

    // The `ProjectSubscriptionName.of` method creates a fully qualified identifier
    // in the form `projects/{projectId}/subscriptions/{subscriptionId}`.
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    MessageReceiver receiver =
        (PubsubMessage pubsubMessage, AckReplyConsumer consumer) -> {
          try {
            DataProfilePubSubMessage message = DataProfilePubSubMessage.parseFrom(
                pubsubMessage.getData());
            System.out.println(
                "PubsubMessage with ID: " + pubsubMessage.getMessageId()
                    + "; message size: " + pubsubMessage.getData().size()
                    + "; event: " + message.getEvent()
                    + "; profile name: " + message.getProfile().getName()
                    + "; full resource: " + message.getProfile().getFullResource());
            consumer.ack();
          } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
          }
        };

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
    try {
      ApiService apiService = subscriber.startAsync();
      apiService.awaitRunning();
      System.out.printf("Listening for messages on %s for %d seconds.%n", subscriptionName,
          timeoutSeconds);
      subscriber.awaitTerminated(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException ignored) {
    } finally {
      subscriber.stopAsync();
    }
  }
}

Python

from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
from google.cloud import dlp_v2


project_id = "PROJECT_ID"
subscription_id = "SUBSCRIPTION_ID"
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message.data}.")
    dlp_msg = dlp_v2.DataProfilePubSubMessage()
    dlp_msg._pb.ParseFromString(message.data)
    print("Parsed message: ", dlp_msg)
    print("--------")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path} for {timeout} seconds...")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
        print("Done waiting.")

Nächste Schritte