데이터 프로필에 대한 Pub/Sub 메시지 수신 및 파싱

이 문서에서는 데이터 프로필 변경사항에 대한 알림을 수신하고 파싱하는 방법을 보여주는 예시를 제공합니다. Sensitive Data Protection은 Pub/Sub 메시지 형식으로 이러한 업데이트를 전송합니다.

개요

Sensitive Data Protection을 구성하여 조직, 폴더 또는 프로젝트 전체에서 데이터에 대한 프로필을 자동으로 생성할 수 있습니다. 데이터 프로필은 데이터에 대한 측정항목과 메타데이터를 포함하며 민감한 정보와 고위험 데이터를 저장할 위치를 결정하는 데 도움이 됩니다. Sensitive Data Protection은 이러한 측정항목을 다양한 세부 수준에서 보고합니다. 프로파일링할 수 있는 데이터 유형에 대한 자세한 내용은 지원되는 리소스를 참조하세요.

데이터 프로파일러를 구성할 때 데이터 프로필에서 중요한 변경사항이 발생할 때마다 Pub/Sub 메시지를 게시하는 옵션을 사용 설정할 수 있습니다. 이 메시지를 통해 이러한 변경사항에 대한 조치를 즉각적으로 취할 수 있습니다. 리슨할 수 있는 이벤트는 다음과 같습니다.

  • 테이블이 처음으로 프로파일링됩니다.
  • 프로필이 업데이트됩니다.
  • 프로필의 위험 또는 민감도 점수가 증가합니다.
  • 데이터 프로필과 관련된 새로운 오류가 발생했습니다.

데이터 프로파일러가 게시하는 Pub/Sub 메시지에는 DataProfilePubSubMessage 객체가 포함됩니다. 이러한 메시지는 항상 바이너리 형식으로 전송되므로 메시지를 수신하고 파싱하는 코드를 작성해야 합니다.

가격 책정

Pub/Sub를 사용하는 경우 Pub/Sub 가격 책정에 따라 요금이 청구됩니다.

시작하기 전에

이 페이지의 필요 조건은 다음과 같습니다.

예시 작업을 시작하기 전에 다음 단계를 수행합니다.

  1. Pub/Sub 주제를 만들고 이에 대한 구독을 추가합니다. 주제에 스키마를 할당하지 마세요.

    간결함을 위해 이 페이지의 예시에서는 하나의 구독만 수신합니다. 그러나 실제로는 Sensitive Data Protection에서 지원하는 각 이벤트에 대해 주제 및 구독을 만들 수 있습니다.

  2. 아직 만들지 않았다면 Pub/Sub 메시지를 게시하도록 데이터 프로파일러를 구성합니다.

    1. 스캔 구성을 수정합니다.

    2. 스캔 구성 수정 페이지에서 Pub/Sub에 게시 옵션을 사용 설정하고 리슨할 이벤트를 선택합니다. 그런 다음 각 이벤트의 설정을 구성합니다.

    3. 스캔 구성을 저장합니다.

  3. Pub/Sub 주제에 대한 Sensitive Data Protection 서비스 에이전트 게시 액세스 권한을 부여합니다. 게시 액세스 권한이 있는 역할의 예시로는 Pub/Sub 게시자 역할(roles/pubsub.publisher)이 있습니다. Sensitive Data Protection 서비스 에이전트는 다음 형식의 이메일 주소입니다.

    service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com
    

    조직 또는 폴더 수준 스캔 구성을 사용하는 경우 PROJECT_NUMBER서비스 에이전트 컨테이너의 숫자 식별자입니다. 프로젝트 수준 스캔 구성을 사용하는 경우 PROJECT_NUMBER는 프로젝트의 숫자 식별자입니다.

  4. 자바 또는 Python을 위한 Sensitive Data Protection 클라이언트 라이브러리를 설치하고 설정합니다.

예시

다음 예시에서는 데이터 프로파일러가 게시하는 Pub/Sub 메시지를 수신하고 파싱하는 방법을 보여줍니다. 이러한 예시의 용도를 변경하고 Pub/Sub 이벤트에서 트리거하는 Cloud Functions로 배포할 수 있습니다. 자세한 내용은 Pub/Sub 튜토리얼(2세대)를 참조하세요.

다음 예시에서 다음을 바꿉니다.

  • PROJECT_ID: Pub/Sub 구독이 포함된 프로젝트의 ID입니다.
  • SUBSCRIPTION_ID: Pub/Sub 주제의 ID입니다.

자바

import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.DataProfilePubSubMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DataProfilePubSubMessageParser {

  public static void main(String... args) throws Exception {
    String projectId = "PROJECT_ID";
    String subscriptionId = "SUBSCRIPTION_ID";
    int timeoutSeconds = 5;

    // The `ProjectSubscriptionName.of` method creates a fully qualified identifier
    // in the form `projects/{projectId}/subscriptions/{subscriptionId}`.
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    MessageReceiver receiver =
        (PubsubMessage pubsubMessage, AckReplyConsumer consumer) -> {
          try {
            DataProfilePubSubMessage message = DataProfilePubSubMessage.parseFrom(
                pubsubMessage.getData());
            System.out.println(
                "PubsubMessage with ID: " + pubsubMessage.getMessageId()
                    + "; message size: " + pubsubMessage.getData().size()
                    + "; event: " + message.getEvent()
                    + "; profile name: " + message.getProfile().getName()
                    + "; full resource: " + message.getProfile().getFullResource());
            consumer.ack();
          } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
          }
        };

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
    try {
      ApiService apiService = subscriber.startAsync();
      apiService.awaitRunning();
      System.out.printf("Listening for messages on %s for %d seconds.%n", subscriptionName,
          timeoutSeconds);
      subscriber.awaitTerminated(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException ignored) {
    } finally {
      subscriber.stopAsync();
    }
  }
}

Python

from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
from google.cloud import dlp_v2

project_id = "PROJECT_ID"
subscription_id = "SUBSCRIPTION_ID"
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message.data}.")
    dlp_msg = dlp_v2.DataProfilePubSubMessage()
    dlp_msg._pb.ParseFromString(message.data)
    print("Parsed message: ", dlp_msg)
    print("--------")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path} for {timeout} seconds...")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
        print("Done waiting.")

다음 단계