计算数据集的 k-匿名性

k-匿名性是数据集的一个属性,指示其记录的可重标识性。如果数据集中每个人的准标识符与该数据集中至少 k - 1 个其他人相同,则该数据集具有 k-匿名性。

您可以根据数据集的一个或多个列或字段计算 k-匿名性值。本主题演示了如何使用 Cloud Data Loss Prevention (DLP) 计算数据集的 k-匿名性值。如需从总体上详细了解 k-匿名性或风险分析,请参阅风险分析概念主题,然后再继续。

准备工作

在继续操作之前,请确保您已完成以下步骤:

  1. 登录您的 Google 帐号。
  2. 在 Google Cloud Console 的“项目选择器”页面上,选择或创建一个 Google Cloud 项目。
  3. 转到项目选择器
  4. 确保您的 Google Cloud 项目已启用结算功能。 了解如何确认您的项目已启用结算功能
  5. 启用 Cloud DLP。
  6. 启用 Cloud DLP

  7. 选择要分析的 BigQuery 数据集。Cloud DLP 通过扫描 BigQuery 表来计算 k-匿名性指标
  8. 在数据集中确定一个标识符(如果适用)和至少一个准标识符。如需了解详情,请参阅风险分析术语和技术

计算 k-匿名性

每当风险分析作业运行时,Cloud DLP 都会进行风险分析。您必须先创建作业,方法是使用 Cloud Console、发送 DLP API 请求或使用 Cloud DLP 客户端库。

控制台

  1. 在 Cloud Console 中,打开 Cloud DLP。

    转到 Cloud DLP

  2. 创建菜单上,将光标指向作业或作业触发器,然后选择重标识风险分析

  3. 新的风险分析作业 (New risk analysis job) 页面的选择输入数据部分中,首先通过输入包含表的项目的 ID、表的数据集 ID 以及所指定的表的名称来指定要扫描的 BigQuery 表。

  4. 要计算的隐私权指标下,选择 k-匿名性

  5. 作业 ID 部分中,您可以视需要为作业提供自定义标识符,然后选择 Cloud DLP 将在其中处理您的数据的资源位置。完成操作后,请点击继续

  6. 定义字段部分中,您可为 k-匿名性风险作业指定标识符和准标识符。Cloud DLP 会访问您在上一步中指定的 BigQuery 表的元数据,并尝试填充字段列表。

    1. 选择相应的复选框,将字段指定为标识符 (ID) 或准标识符 (QI)。您必须选择 0 个或 1 个标识符以及至少 1 个准标识符。
    2. 如果 Cloud DLP 无法填充字段,请点击输入字段名称以手动输入一个或多个字段,并将每个字段设置为标识符或准标识符。完成操作后,请点击继续
  7. 添加操作部分中,您可以添加可选操作,以便在风险作业完成时执行。可用的选项包括:

    • 保存到 BigQuery:将风险分析扫描的结果保存到 BigQuery 表中。
    • 发布到 Pub/Sub:将通知发布到 Pub/Sub 主题
    • 通过电子邮件发送通知:向您发送电子邮件,其中包含结果。完成操作后,请点击创建

k-匿名性风险分析作业会立即启动。

协议

如需运行新的风险分析作业以计算 k-匿名性,请向 projects.dlpJobs 资源发送一个请求,其中 PROJECT_ID 表示项目标识符

https://dlp.googleapis.com/v2/projects/PROJECT_ID/dlpJobs

该请求包含一个由以下项组成的 RiskAnalysisJobConfig 对象:

  • PrivacyMetric 对象。您可以在此处添加 KAnonymityConfig 对象来表明您正在计算 k-匿名性。

  • BigQueryTable 对象。通过包括以下所有项指定要扫描的 BigQuery 表格:

    • projectId:表格所属项目的 ID。
    • datasetId:表格的数据集 ID。
    • tableId:表格的名称。
  • 由一个或多个 Action 对象组成的对象集,这些对象表示在作业完成时要按给定顺序运行的操作。每个 Action 对象都可以包含以下操作之一:

    KAnonymityConfig 对象中,指定以下内容:

    • quasiIds[]:要扫描的一个或多个准标识符(FieldId 对象),用于计算 k-匿名性。当您指定多个准标识符时,它们会被视为单个复合键。结构体和重复数据类型不受支持,但只要嵌套字段本身不是结构体或嵌套在重复字段中,嵌套字段就受支持。
    • entityId:可选的标识符值;如果设置此项,则指示在计算 k-匿名性时应将与每个不同 entityId 对应的所有行组合在一起。通常,entityId 将是表示唯一用户身份的列,例如客户 ID 或用户 ID。 如果 entityId 出现在多个具有不同准标识符值的行中,这些行将连接形成一个多集,以用作该实体的准标识符。如需详细了解实体 ID,请参阅风险分析概念主题中的实体 ID 与 k-匿名性计算

向 DLP API 发送请求后,它将立即启动风险分析作业。

Java

如需了解如何安装和使用 Cloud DLP 客户端库,请参阅 Cloud DLP 客户端库


import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.Action.PublishToPubSub;
import com.google.privacy.dlp.v2.AnalyzeDataSourceRiskDetails.KAnonymityResult;
import com.google.privacy.dlp.v2.AnalyzeDataSourceRiskDetails.KAnonymityResult.KAnonymityEquivalenceClass;
import com.google.privacy.dlp.v2.AnalyzeDataSourceRiskDetails.KAnonymityResult.KAnonymityHistogramBucket;
import com.google.privacy.dlp.v2.BigQueryTable;
import com.google.privacy.dlp.v2.CreateDlpJobRequest;
import com.google.privacy.dlp.v2.DlpJob;
import com.google.privacy.dlp.v2.FieldId;
import com.google.privacy.dlp.v2.GetDlpJobRequest;
import com.google.privacy.dlp.v2.LocationName;
import com.google.privacy.dlp.v2.PrivacyMetric;
import com.google.privacy.dlp.v2.PrivacyMetric.KAnonymityConfig;
import com.google.privacy.dlp.v2.RiskAnalysisJobConfig;
import com.google.privacy.dlp.v2.Value;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

class RiskAnalysisKAnonymity {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String datasetId = "your-bigquery-dataset-id";
    String tableId = "your-bigquery-table-id";
    String topicId = "pub-sub-topic";
    String subscriptionId = "pub-sub-subscription";
    calculateKAnonymity(projectId, datasetId, tableId, topicId, subscriptionId);
  }

  public static void calculateKAnonymity(
      String projectId, String datasetId, String tableId, String topicId, String subscriptionId)
      throws ExecutionException, InterruptedException, IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (DlpServiceClient dlpServiceClient = DlpServiceClient.create()) {

      // Specify the BigQuery table to analyze
      BigQueryTable bigQueryTable =
          BigQueryTable.newBuilder()
              .setProjectId(projectId)
              .setDatasetId(datasetId)
              .setTableId(tableId)
              .build();

      // These values represent the column names of quasi-identifiers to analyze
      List<String> quasiIds = Arrays.asList("Age", "Mystery");

      // Configure the privacy metric for the job
      List<FieldId> quasiIdFields =
          quasiIds.stream()
              .map(columnName -> FieldId.newBuilder().setName(columnName).build())
              .collect(Collectors.toList());
      KAnonymityConfig kanonymityConfig =
          KAnonymityConfig.newBuilder().addAllQuasiIds(quasiIdFields).build();
      PrivacyMetric privacyMetric =
          PrivacyMetric.newBuilder().setKAnonymityConfig(kanonymityConfig).build();

      // Create action to publish job status notifications over Google Cloud Pub/Sub
      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      PublishToPubSub publishToPubSub =
          PublishToPubSub.newBuilder().setTopic(topicName.toString()).build();
      Action action = Action.newBuilder().setPubSub(publishToPubSub).build();

      // Configure the risk analysis job to perform
      RiskAnalysisJobConfig riskAnalysisJobConfig =
          RiskAnalysisJobConfig.newBuilder()
              .setSourceTable(bigQueryTable)
              .setPrivacyMetric(privacyMetric)
              .addActions(action)
              .build();

      // Build the request to be sent by the client
      CreateDlpJobRequest createDlpJobRequest =
          CreateDlpJobRequest.newBuilder()
              .setParent(LocationName.of(projectId, "global").toString())
              .setRiskJob(riskAnalysisJobConfig)
              .build();

      // Send the request to the API using the client
      DlpJob dlpJob = dlpServiceClient.createDlpJob(createDlpJobRequest);

      // Set up a Pub/Sub subscriber to listen on the job completion status
      final SettableApiFuture<Boolean> done = SettableApiFuture.create();

      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      MessageReceiver messageHandler =
          (PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
            handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
          };
      Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
      subscriber.startAsync();

      // Wait for job completion semi-synchronously
      // For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
      try {
        done.get(15, TimeUnit.MINUTES);
      } catch (TimeoutException e) {
        System.out.println("Job was not completed after 15 minutes.");
        return;
      } finally {
        subscriber.stopAsync();
        subscriber.awaitTerminated();
      }

      // Build a request to get the completed job
      GetDlpJobRequest getDlpJobRequest =
          GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();

      // Retrieve completed job status
      DlpJob completedJob = dlpServiceClient.getDlpJob(getDlpJobRequest);
      System.out.println("Job status: " + completedJob.getState());

      // Get the result and parse through and process the information
      KAnonymityResult kanonymityResult = completedJob.getRiskDetails().getKAnonymityResult();
      List<KAnonymityHistogramBucket> histogramBucketList =
          kanonymityResult.getEquivalenceClassHistogramBucketsList();
      for (KAnonymityHistogramBucket result : histogramBucketList) {
        System.out.printf(
            "Bucket size range: [%d, %d]\n",
            result.getEquivalenceClassSizeLowerBound(), result.getEquivalenceClassSizeUpperBound());

        for (KAnonymityEquivalenceClass bucket : result.getBucketValuesList()) {
          List<String> quasiIdValues =
              bucket.getQuasiIdsValuesList().stream()
                  .map(Value::toString)
                  .collect(Collectors.toList());

          System.out.println("\tQuasi-ID values: " + String.join(", ", quasiIdValues));
          System.out.println("\tClass size: " + bucket.getEquivalenceClassSize());
        }
      }
    }
  }

  // handleMessage injects the job and settableFuture into the message reciever interface
  private static void handleMessage(
      DlpJob job,
      SettableApiFuture<Boolean> done,
      PubsubMessage pubsubMessage,
      AckReplyConsumer ackReplyConsumer) {
    String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
    if (job.getName().equals(messageAttribute)) {
      done.set(true);
      ackReplyConsumer.ack();
    } else {
      ackReplyConsumer.nack();
    }
  }
}

Node.js

如需了解如何安装和使用 Cloud DLP 客户端库,请参阅 Cloud DLP 客户端库

// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');

// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();

// The project ID to run the API call under
// const projectId = 'my-project';

// The project ID the table is stored under
// This may or (for public datasets) may not equal the calling project ID
// const tableProjectId = 'my-project';

// The ID of the dataset to inspect, e.g. 'my_dataset'
// const datasetId = 'my_dataset';

// The ID of the table to inspect, e.g. 'my_table'
// const tableId = 'my_table';

// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'

// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'

// A set of columns that form a composite key ('quasi-identifiers')
// const quasiIds = [{ name: 'age' }, { name: 'city' }];
async function kAnonymityAnalysis() {
  const sourceTable = {
    projectId: tableProjectId,
    datasetId: datasetId,
    tableId: tableId,
  };
  // Construct request for creating a risk analysis job

  const request = {
    parent: `projects/${projectId}/locations/global`,
    riskJob: {
      privacyMetric: {
        kAnonymityConfig: {
          quasiIds: quasiIds,
        },
      },
      sourceTable: sourceTable,
      actions: [
        {
          pubSub: {
            topic: `projects/${projectId}/topics/${topicId}`,
          },
        },
      ],
    },
  };

  // Create helper function for unpacking values
  const getValue = obj => obj[Object.keys(obj)[0]];

  // Run risk analysis job
  const [topicResponse] = await pubsub.topic(topicId).get();
  const subscription = await topicResponse.subscription(subscriptionId);
  const [jobsResponse] = await dlp.createDlpJob(request);
  const jobName = jobsResponse.name;
  // Watch the Pub/Sub topic until the DLP job finishes
  await new Promise((resolve, reject) => {
    const messageHandler = message => {
      if (message.attributes && message.attributes.DlpJobName === jobName) {
        message.ack();
        subscription.removeListener('message', messageHandler);
        subscription.removeListener('error', errorHandler);
        resolve(jobName);
      } else {
        message.nack();
      }
    };

    const errorHandler = err => {
      subscription.removeListener('message', messageHandler);
      subscription.removeListener('error', errorHandler);
      reject(err);
    };

    subscription.on('message', messageHandler);
    subscription.on('error', errorHandler);
  });
  setTimeout(() => {
    console.log(' Waiting for DLP job to fully complete');
  }, 500);
  const [job] = await dlp.getDlpJob({name: jobName});
  const histogramBuckets =
    job.riskDetails.kAnonymityResult.equivalenceClassHistogramBuckets;

  histogramBuckets.forEach((histogramBucket, histogramBucketIdx) => {
    console.log(`Bucket ${histogramBucketIdx}:`);
    console.log(
      `  Bucket size range: [${histogramBucket.equivalenceClassSizeLowerBound}, ${histogramBucket.equivalenceClassSizeUpperBound}]`
    );

    histogramBucket.bucketValues.forEach(valueBucket => {
      const quasiIdValues = valueBucket.quasiIdsValues
        .map(getValue)
        .join(', ');
      console.log(`  Quasi-ID values: {${quasiIdValues}}`);
      console.log(`  Class size: ${valueBucket.equivalenceClassSize}`);
    });
  });
}
kAnonymityAnalysis();

Python

如需了解如何安装和使用 Cloud DLP 客户端库,请参阅 Cloud DLP 客户端库

def k_anonymity_analysis(
    project,
    table_project_id,
    dataset_id,
    table_id,
    topic_id,
    subscription_id,
    quasi_ids,
    timeout=300,
):
    """Uses the Data Loss Prevention API to compute the k-anonymity of a
        column set in a Google BigQuery table.
    Args:
        project: The Google Cloud project id to use as a parent resource.
        table_project_id: The Google Cloud project id where the BigQuery table
            is stored.
        dataset_id: The id of the dataset to inspect.
        table_id: The id of the table to inspect.
        topic_id: The name of the Pub/Sub topic to notify once the job
            completes.
        subscription_id: The name of the Pub/Sub subscription to use when
            listening for job completion notifications.
        quasi_ids: A set of columns that form a composite key.
        timeout: The number of seconds to wait for a response from the API.

    Returns:
        None; the response from the API is printed to the terminal.
    """
    import concurrent.futures

    # Import the client library.
    import google.cloud.dlp

    # This sample additionally uses Cloud Pub/Sub to receive results from
    # potentially long-running operations.
    import google.cloud.pubsub

    # Create helper function for unpacking values
    def get_values(obj):
        return int(obj.integer_value)

    # Instantiate a client.
    dlp = google.cloud.dlp_v2.DlpServiceClient()

    # Convert the project id into a full resource id.
    topic = google.cloud.pubsub.PublisherClient.topic_path(project, topic_id)
    parent = f"projects/{project}/locations/global"

    # Location info of the BigQuery table.
    source_table = {
        "project_id": table_project_id,
        "dataset_id": dataset_id,
        "table_id": table_id,
    }

    # Convert quasi id list to Protobuf type
    def map_fields(field):
        return {"name": field}

    quasi_ids = map(map_fields, quasi_ids)

    # Tell the API where to send a notification when the job is complete.
    actions = [{"pub_sub": {"topic": topic}}]

    # Configure risk analysis job
    # Give the name of the numeric column to compute risk metrics for
    risk_job = {
        "privacy_metric": {"k_anonymity_config": {"quasi_ids": quasi_ids}},
        "source_table": source_table,
        "actions": actions,
    }

    # Call API to start risk analysis job
    operation = dlp.create_dlp_job(request={"parent": parent, "risk_job": risk_job})

    def callback(message):
        if message.attributes["DlpJobName"] == operation.name:
            # This is the message we're looking for, so acknowledge it.
            message.ack()

            # Now that the job is done, fetch the results and print them.
            job = dlp.get_dlp_job(request={"name": operation.name})
            histogram_buckets = (
                job.risk_details.k_anonymity_result.equivalence_class_histogram_buckets
            )
            # Print bucket stats
            for i, bucket in enumerate(histogram_buckets):
                print("Bucket {}:".format(i))
                if bucket.equivalence_class_size_lower_bound:
                    print(
                        "   Bucket size range: [{}, {}]".format(
                            bucket.equivalence_class_size_lower_bound,
                            bucket.equivalence_class_size_upper_bound,
                        )
                    )
                    for value_bucket in bucket.bucket_values:
                        print(
                            "   Quasi-ID values: {}".format(
                                map(get_values, value_bucket.quasi_ids_values)
                            )
                        )
                        print(
                            "   Class size: {}".format(
                                value_bucket.equivalence_class_size
                            )
                        )
            subscription.set_result(None)
        else:
            # This is not the message we're looking for.
            message.drop()

    # Create a Pub/Sub client and find the subscription. The subscription is
    # expected to already be listening to the topic.
    subscriber = google.cloud.pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_id)
    subscription = subscriber.subscribe(subscription_path, callback)

    try:
        subscription.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        print(
            "No event received before the timeout. Please verify that the "
            "subscription provided is subscribed to the topic provided."
        )
        subscription.close()

Go

如需了解如何安装和使用 Cloud DLP 客户端库,请参阅 Cloud DLP 客户端库

import (
	"context"
	"fmt"
	"io"
	"strings"
	"time"

	dlp "cloud.google.com/go/dlp/apiv2"
	"cloud.google.com/go/pubsub"
	dlppb "google.golang.org/genproto/googleapis/privacy/dlp/v2"
)

// riskKAnonymity computes the risk of the given columns using K Anonymity.
func riskKAnonymity(w io.Writer, projectID, dataProject, pubSubTopic, pubSubSub, datasetID, tableID string, columnNames ...string) error {
	// projectID := "my-project-id"
	// dataProject := "bigquery-public-data"
	// pubSubTopic := "dlp-risk-sample-topic"
	// pubSubSub := "dlp-risk-sample-sub"
	// datasetID := "nhtsa_traffic_fatalities"
	// tableID := "accident_2015"
	// columnNames := "state_number" "county"
	ctx := context.Background()
	client, err := dlp.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("dlp.NewClient: %v", err)
	}

	// Create a PubSub Client used to listen for when the inspect job finishes.
	pubsubClient, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("Error creating PubSub client: %v", err)
	}
	defer pubsubClient.Close()

	// Create a PubSub subscription we can use to listen for messages.
	s, err := setupPubSub(projectID, pubSubTopic, pubSubSub)
	if err != nil {
		return fmt.Errorf("setupPubSub: %v", err)
	}

	// topic is the PubSub topic string where messages should be sent.
	topic := "projects/" + projectID + "/topics/" + pubSubTopic

	// Build the QuasiID slice.
	var q []*dlppb.FieldId
	for _, c := range columnNames {
		q = append(q, &dlppb.FieldId{Name: c})
	}

	// Create a configured request.
	req := &dlppb.CreateDlpJobRequest{
		Parent: fmt.Sprintf("projects/%s/locations/global", projectID),
		Job: &dlppb.CreateDlpJobRequest_RiskJob{
			RiskJob: &dlppb.RiskAnalysisJobConfig{
				// PrivacyMetric configures what to compute.
				PrivacyMetric: &dlppb.PrivacyMetric{
					Type: &dlppb.PrivacyMetric_KAnonymityConfig_{
						KAnonymityConfig: &dlppb.PrivacyMetric_KAnonymityConfig{
							QuasiIds: q,
						},
					},
				},
				// SourceTable describes where to find the data.
				SourceTable: &dlppb.BigQueryTable{
					ProjectId: dataProject,
					DatasetId: datasetID,
					TableId:   tableID,
				},
				// Send a message to PubSub using Actions.
				Actions: []*dlppb.Action{
					{
						Action: &dlppb.Action_PubSub{
							PubSub: &dlppb.Action_PublishToPubSub{
								Topic: topic,
							},
						},
					},
				},
			},
		},
	}
	// Create the risk job.
	j, err := client.CreateDlpJob(ctx, req)
	if err != nil {
		return fmt.Errorf("CreateDlpJob: %v", err)
	}
	fmt.Fprintf(w, "Created job: %v\n", j.GetName())

	// Wait for the risk job to finish by waiting for a PubSub message.
	// This only waits for 10 minutes. For long jobs, consider using a truly
	// asynchronous execution model such as Cloud Functions.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()
	err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// If this is the wrong job, do not process the result.
		if msg.Attributes["DlpJobName"] != j.GetName() {
			msg.Nack()
			return
		}
		msg.Ack()
		time.Sleep(500 * time.Millisecond)
		j, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
			Name: j.GetName(),
		})
		if err != nil {
			fmt.Fprintf(w, "GetDlpJob: %v", err)
			return
		}
		h := j.GetRiskDetails().GetKAnonymityResult().GetEquivalenceClassHistogramBuckets()
		for i, b := range h {
			fmt.Fprintf(w, "Histogram bucket %v\n", i)
			fmt.Fprintf(w, "  Size range: [%v,%v]\n", b.GetEquivalenceClassSizeLowerBound(), b.GetEquivalenceClassSizeUpperBound())
			fmt.Fprintf(w, "  %v unique values total\n", b.GetBucketSize())
			for _, v := range b.GetBucketValues() {
				var qvs []string
				for _, qv := range v.GetQuasiIdsValues() {
					qvs = append(qvs, qv.String())
				}
				fmt.Fprintf(w, "    QuasiID values: %s\n", strings.Join(qvs, ", "))
				fmt.Fprintf(w, "    Class size: %v\n", v.GetEquivalenceClassSize())
			}
		}
		// Stop listening for more messages.
		cancel()
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

PHP

如需了解如何安装和使用 Cloud DLP 客户端库,请参阅 Cloud DLP 客户端库

/**
 * Computes the k-anonymity of a column set in a Google BigQuery table.
 */
use Google\Cloud\Dlp\V2\DlpServiceClient;
use Google\Cloud\Dlp\V2\RiskAnalysisJobConfig;
use Google\Cloud\Dlp\V2\BigQueryTable;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\PrivacyMetric\KAnonymityConfig;
use Google\Cloud\Dlp\V2\PrivacyMetric;
use Google\Cloud\Dlp\V2\FieldId;
use Google\Cloud\PubSub\PubSubClient;

/** Uncomment and populate these variables in your code */
// $callingProjectId = 'The project ID to run the API call under';
// $dataProjectId = 'The project ID containing the target Datastore';
// $topicId = 'The name of the Pub/Sub topic to notify once the job completes';
// $subscriptionId = 'The name of the Pub/Sub subscription to use when listening for job';
// $datasetId = 'The ID of the dataset to inspect';
// $tableId = 'The ID of the table to inspect';
// $quasiIdNames = 'Comma-separated list of columns that form a composite key (quasi-identifiers)';

// Instantiate a client.
$dlp = new DlpServiceClient([
    'projectId' => $callingProjectId,
]);
$pubsub = new PubSubClient([
    'projectId' => $callingProjectId,
]);
$topic = $pubsub->topic($topicId);

// Construct risk analysis config
$quasiIds = array_map(
    function ($id) {
        return (new FieldId())->setName($id);
    },
    explode(',', $quasiIdNames)
);

$statsConfig = (new KAnonymityConfig())
    ->setQuasiIds($quasiIds);

$privacyMetric = (new PrivacyMetric())
    ->setKAnonymityConfig($statsConfig);

// Construct items to be analyzed
$bigqueryTable = (new BigQueryTable())
    ->setProjectId($dataProjectId)
    ->setDatasetId($datasetId)
    ->setTableId($tableId);

// Construct the action to run when job completes
$pubSubAction = (new PublishToPubSub())
    ->setTopic($topic->name());

$action = (new Action())
    ->setPubSub($pubSubAction);

// Construct risk analysis job config to run
$riskJob = (new RiskAnalysisJobConfig())
    ->setPrivacyMetric($privacyMetric)
    ->setSourceTable($bigqueryTable)
    ->setActions([$action]);

// Listen for job notifications via an existing topic/subscription.
$subscription = $topic->subscription($subscriptionId);

// Submit request
$parent = "projects/$callingProjectId/locations/global";
$job = $dlp->createDlpJob($parent, [
    'riskJob' => $riskJob
]);

// Poll Pub/Sub using exponential backoff until job finishes
// Consider using an asynchronous execution model such as Cloud Functions
$attempt = 1;
$startTime = time();
do {
    foreach ($subscription->pull() as $message) {
        if (isset($message->attributes()['DlpJobName']) &&
            $message->attributes()['DlpJobName'] === $job->getName()) {
            $subscription->acknowledge($message);
            // Get the updated job. Loop to avoid race condition with DLP API.
            do {
                $job = $dlp->getDlpJob($job->getName());
            } while ($job->getState() == JobState::RUNNING);
            break 2; // break from parent do while
        }
    }
    printf('Waiting for job to complete' . PHP_EOL);
    // Exponential backoff with max delay of 60 seconds
    sleep(min(60, pow(2, ++$attempt)));
} while (time() - $startTime < 600); // 10 minute timeout

// Print finding counts
printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
switch ($job->getState()) {
    case JobState::DONE:
        $histBuckets = $job->getRiskDetails()->getKAnonymityResult()->getEquivalenceClassHistogramBuckets();

        foreach ($histBuckets as $bucketIndex => $histBucket) {
            // Print bucket stats
            printf('Bucket %s:' . PHP_EOL, $bucketIndex);
            printf(
                '  Bucket size range: [%s, %s]' . PHP_EOL,
                $histBucket->getEquivalenceClassSizeLowerBound(),
                $histBucket->getEquivalenceClassSizeUpperBound()
            );

            // Print bucket values
            foreach ($histBucket->getBucketValues() as $percent => $valueBucket) {
                // Pretty-print quasi-ID values
                print('  Quasi-ID values:' . PHP_EOL);
                foreach ($valueBucket->getQuasiIdsValues() as $index => $value) {
                    print('    ' . $value->serializeToJsonString() . PHP_EOL);
                }
                printf(
                    '  Class size: %s' . PHP_EOL,
                    $valueBucket->getEquivalenceClassSize()
                );
            }
        }

        break;
    case JobState::FAILED:
        printf('Job %s had errors:' . PHP_EOL, $job->getName());
        $errors = $job->getErrors();
        foreach ($errors as $error) {
            var_dump($error->getDetails());
        }
        break;
    case JobState::PENDING:
        printf('Job has not completed. Consider a longer timeout or an asynchronous execution model' . PHP_EOL);
        break;
    default:
        printf('Unexpected job state. Most likely, the job is either running or has not yet started.');
}

C#

如需了解如何安装和使用 Cloud DLP 客户端库,请参阅 Cloud DLP 客户端库


using Google.Api.Gax.ResourceNames;
using Google.Cloud.Dlp.V2;
using Google.Cloud.PubSub.V1;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using static Google.Cloud.Dlp.V2.Action.Types;
using static Google.Cloud.Dlp.V2.PrivacyMetric.Types;

public class RiskAnalysisCreateKAnonymity
{
    public static AnalyzeDataSourceRiskDetails.Types.KAnonymityResult KAnonymity(
        string callingProjectId,
        string tableProjectId,
        string datasetId,
        string tableId,
        string topicId,
        string subscriptionId,
        IEnumerable<FieldId> quasiIds)
    {
        var dlp = DlpServiceClient.Create();

        // Construct + submit the job
        var KAnonymityConfig = new KAnonymityConfig
        {
            QuasiIds = { quasiIds }
        };

        var config = new RiskAnalysisJobConfig
        {
            PrivacyMetric = new PrivacyMetric
            {
                KAnonymityConfig = KAnonymityConfig
            },
            SourceTable = new BigQueryTable
            {
                ProjectId = tableProjectId,
                DatasetId = datasetId,
                TableId = tableId
            },
            Actions =
            {
                new Google.Cloud.Dlp.V2.Action
                {
                    PubSub = new PublishToPubSub
                    {
                        Topic = $"projects/{callingProjectId}/topics/{topicId}"
                    }
                }
            }
        };

        var submittedJob = dlp.CreateDlpJob(
            new CreateDlpJobRequest
            {
                ParentAsProjectName = new ProjectName(callingProjectId),
                RiskJob = config
            });

        // Listen to pub/sub for the job
        var subscriptionName = new SubscriptionName(callingProjectId, subscriptionId);
        var subscriber = SubscriberClient.CreateAsync(
            subscriptionName).Result;

        // SimpleSubscriber runs your message handle function on multiple
        // threads to maximize throughput.
        var done = new ManualResetEventSlim(false);
        subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            if (message.Attributes["DlpJobName"] == submittedJob.Name)
            {
                Thread.Sleep(500); // Wait for DLP API results to become consistent
                done.Set();
                return Task.FromResult(SubscriberClient.Reply.Ack);
            }
            else
            {
                return Task.FromResult(SubscriberClient.Reply.Nack);
            }
        });

        done.Wait(TimeSpan.FromMinutes(10)); // 10 minute timeout; may not work for large jobs
        subscriber.StopAsync(CancellationToken.None).Wait();

        // Process results
        var resultJob = dlp.GetDlpJob(new GetDlpJobRequest
        {
            DlpJobName = DlpJobName.Parse(submittedJob.Name)
        });

        var result = resultJob.RiskDetails.KAnonymityResult;

        for (var bucketIdx = 0; bucketIdx < result.EquivalenceClassHistogramBuckets.Count; bucketIdx++)
        {
            var bucket = result.EquivalenceClassHistogramBuckets[bucketIdx];
            Console.WriteLine($"Bucket {bucketIdx}");
            Console.WriteLine($"  Bucket size range: [{bucket.EquivalenceClassSizeLowerBound}, {bucket.EquivalenceClassSizeUpperBound}].");
            Console.WriteLine($"  {bucket.BucketSize} unique value(s) total.");

            foreach (var bucketValue in bucket.BucketValues)
            {
                // 'UnpackValue(x)' is a prettier version of 'x.toString()'
                Console.WriteLine($"    Quasi-ID values: [{String.Join(',', bucketValue.QuasiIdsValues.Select(x => UnpackValue(x)))}]");
                Console.WriteLine($"    Class size: {bucketValue.EquivalenceClassSize}");
            }
        }

        return result;
    }

    public static string UnpackValue(Value protoValue)
    {
        var jsonValue = JsonConvert.DeserializeObject<Dictionary<string, object>>(protoValue.ToString());
        return jsonValue.Values.ElementAt(0).ToString();
    }
}

列出已完成的风险分析作业

您可以查看当前项目中已运行的风险分析作业的列表。

控制台

如需在 Cloud Console 中列出正在运行和先前运行的风险分析作业,请执行以下操作:

  1. 在 Cloud Console 中,打开 Cloud DLP。

    转到 Cloud DLP

  2. 点击页面顶部的作业和作业触发器标签页。

  3. 点击风险作业标签页。

系统会显示风险列表。

协议

如需列出正在运行和先前运行的风险分析作业,请向 projects.dlpJobs 资源发送 GET 请求。添加作业类型过滤条件 (?type=RISK_ANALYSIS_JOB) 可将响应范围缩小至风险分析作业。

https://dlp.googleapis.com/v2/projects/PROJECT_ID/dlpJobs?type=RISK_ANALYSIS_JOB

您收到的响应将以 JSON 表示法包含所有当前和以前的风险分析作业。

查看 k-匿名性作业结果

Cloud Console 中的 Cloud DLP 包含已完成的 k-匿名性作业的内置可视化功能。按照上一部分中的说明进行操作后,请在风险分析作业列表中,选择您要查看其结果的作业。假设作业已成功运行,风险分析详情页面顶部将如下所示:

该页面顶部列出了有关 k-匿名性风险作业的信息,包括其作业 ID 以及其资源位置(在容器下)。

如需查看 k-匿名性计算的结果,请点击 K-匿名性标签页。如需查看风险分析作业的配置,请点击配置标签页。

K-匿名性标签页首先列出实体 ID(如果有)以及用于计算 k-匿名性的准标识符

风险图表

重标识风险图表在 y 轴上绘制唯一行和唯一准标识符组合的潜在数据泄露百分比,以在 x 轴上实现 k-匿名性值。。图表的颜色还表示潜在风险程度。较深的蓝色表示风险较高,而较浅的蓝色表示风险较低。

k-匿名性值越大,表示重标识的风险越小。但是,要实现较大的 k-匿名性值,您需要移除总行数的较大百分比以及较大的唯一准标识符组合,这可能会降低数据的效用。如需查看某个 k-匿名性值的特定潜在泄露百分比值,请将光标悬停在图表上。如屏幕截图所示,图表上会显示一个提示。

如需查看特定 k-匿名性值的更多详情,请点击相应的数据点。图表下方会显示详细的说明,并且页面下方显示了示例数据表。

风险示例数据表

风险作业结果页面的第二个组成部分是示例数据表。它会显示给定目标 k-匿名性值的准标识符组合。

表的第一列列出了 k-匿名性值。点击 k-匿名性值以查看为达到该值而需要丢弃的相应样本数据

第二列会显示不重复的行和准标识符组合的各自潜在的数据泄露,以及具有至少 k 条记录的群组数和记录总数。

最后一列会显示共用一个准标识符组合的群组的一个示例,以及该组合存在的记录数量。

使用 REST 检索作业详情

如需使用 REST API 检索 k-匿名性风险分析作业,请将以下 GET 请求发送到 projects.dlpJobs 资源。将 PROJECT_ID 替换为您的项目 ID,并将 JOB_ID 替换为您要获取其结果的作业的标识符。作业 ID 在启动作业时返回,也可通过列出所有作业来检索。

GET https://dlp.googleapis.com/v2/projects/PROJECT_ID/dlpJobs/JOB_ID

该请求会返回包含作业实例的 JSON 对象。分析的结果位于 AnalyzeDataSourceRiskDetails 对象的 "riskDetails" 键中。如需了解详情,请参阅 DlpJob 资源的 API 参考文档。

后续步骤

  • 了解如何计算数据集的 l-多样性
  • 了解如何计算数据集的 k-图值。
  • 了解如何计算数据集的 δ-存在性值。