再識別と開示リスクの評価

リスク分析とは、機密データを分析し、対象が特定されるリスクを増大させるおそれのある特性を見つけ出すプロセスです。リスク分析の手法を使用して、匿名化を行う前に効果的な匿名化方式を決定したり、匿名化を行った後で変更や異常値をモニタリングしたりできます。

Cloud Data Loss Prevention(DLP)では、k-匿名性、l-多様性、k-マップ、δ-存在性の 4 つのリスク指標を計算できます。リスク分析やこれらの指標に関する知識がない場合は、まずリスク分析の概念のトピックをご覧ください。

このトピックでは、Cloud DLP でこれらの指標を使って構造化データのリスク分析を行う方法の概要を説明します。

Cloud DLP を使用したリスク分析

Cloud DLP では、BigQuery テーブルに保存されている構造化データを分析して、次のプライバシー指標を計算できます。

Cloud DLP によるリスク分析は、リスク分析ジョブの実行時に行われます。そのため、最初にリスク分析ジョブを作成する必要があります。リスク分析ジョブを作成するには、次の URL にリクエストを送信します。

POST https://dlp.googleapis.com/v2/{parent=projects/*}/dlpJobs

このリクエストには、次の項目で構成される RiskAnalysisJobConfig オブジェクトが含まれます。

Cloud DLP を使った k-匿名性の計算

k-匿名性は、データセットのレコードの再識別可能性を示すプロパティです。データセット内の各人物の準識別子が、データセット内の少なくとも k - 1 人の他の人物と同一である場合、そのデータセットは k-匿名性を持っています。

k-匿名性の値は 1 つ以上の列(フィールド)に基づいて計算できます。まず、上記の Cloud DLP を使ったリスク分析の手順を行ってください。

KAnonymityConfig オブジェクト内で、次の項目を指定します。

  • quasiIds[]: k-匿名性を計算するためにスキャンして使用する、1 つ以上の順識別子(FieldId オブジェクト)。複数の準識別子を指定すると、単一の複合キーとみなされます。構造体と繰り返しデータ型はサポートされていませんが、ネストされたフィールドは、それ自体が構造体ではなく、繰り返しフィールド内でネストされている限り、サポートされます。
  • entityId: 任意指定の識別子値。これを設定した場合、個々の entityId に対応する各行をすべてグループにまとめて k-匿名性を計算することを示します。通常、entityId はお客様 ID やユーザー ID などの一意のユーザーを表す列になります。それぞれ異なる準識別子の値を持つ entityId が複数の行に存在する場合、それらの行を結合して 1 つのマルチセットが作成され、そのエンティティの準識別子として使用されます。エンティティ ID の詳細については、リスク分析の概念のトピックにあるエンティティ ID と k-匿名性の計算をご覧ください。

コードの例

次に、Cloud DLP を使用して k-匿名性の値を計算する方法を示すサンプルコードをいくつかの言語で示します。

Java

/**
 * Calculate k-anonymity for quasi-identifiers in a BigQuery table using the DLP API.
 *
 * @param projectId The Google Cloud Platform project ID to run the API call under.
 * @param datasetId The BigQuery dataset to analyze.
 * @param tableId The BigQuery table to analyze.
 * @param quasiIds The names of columns that form a composite key ('quasi-identifiers').
 * @param topicId The name of the Pub/Sub topic to notify once the job completes
 * @param subscriptionId The name of the Pub/Sub subscription to use when listening for job
 *     completion status.
 */
private static void calculateKAnonymity(
    String projectId,
    String datasetId,
    String tableId,
    List<String> quasiIds,
    String topicId,
    String subscriptionId)
    throws Exception {
  // Instantiates a client
  try (DlpServiceClient dlpServiceClient = DlpServiceClient.create()) {

    List<FieldId> quasiIdFields =
        quasiIds
            .stream()
            .map(columnName -> FieldId.newBuilder().setName(columnName).build())
            .collect(Collectors.toList());

    KAnonymityConfig kanonymityConfig =
        KAnonymityConfig.newBuilder().addAllQuasiIds(quasiIdFields).build();

    BigQueryTable bigQueryTable =
        BigQueryTable.newBuilder()
            .setProjectId(projectId)
            .setDatasetId(datasetId)
            .setTableId(tableId)
            .build();

    PrivacyMetric privacyMetric =
        PrivacyMetric.newBuilder().setKAnonymityConfig(kanonymityConfig).build();

    String topicName = String.format("projects/%s/topics/%s", projectId, topicId);

    PublishToPubSub publishToPubSub = PublishToPubSub.newBuilder().setTopic(topicName).build();

    // Create action to publish job status notifications over Google Cloud Pub/Sub
    Action action = Action.newBuilder().setPubSub(publishToPubSub).build();

    RiskAnalysisJobConfig riskAnalysisJobConfig =
        RiskAnalysisJobConfig.newBuilder()
            .setSourceTable(bigQueryTable)
            .setPrivacyMetric(privacyMetric)
            .addActions(action)
            .build();

    CreateDlpJobRequest createDlpJobRequest =
        CreateDlpJobRequest.newBuilder()
            .setParent(ProjectName.of(projectId).toString())
            .setRiskJob(riskAnalysisJobConfig)
            .build();

    DlpJob dlpJob = dlpServiceClient.createDlpJob(createDlpJobRequest);
    String dlpJobName = dlpJob.getName();

    final SettableApiFuture<Boolean> done = SettableApiFuture.create();

    // Set up a Pub/Sub subscriber to listen on the job completion status
    Subscriber subscriber =
        Subscriber.newBuilder(
                ProjectSubscriptionName.newBuilder()
                    .setProject(projectId)
                    .setSubscription(subscriptionId)
                    .build(),
          (pubsubMessage, ackReplyConsumer) -> {
            if (pubsubMessage.getAttributesCount() > 0
                && pubsubMessage.getAttributesMap().get("DlpJobName").equals(dlpJobName)) {
              // notify job completion
              done.set(true);
              ackReplyConsumer.ack();
            }
          })
            .build();
    subscriber.startAsync();

    // Wait for job completion semi-synchronously
    // For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
    try {
      done.get(1, TimeUnit.MINUTES);
      Thread.sleep(500); // Wait for the job to become available
    } catch (TimeoutException e) {
      System.out.println("Unable to verify job completion.");
    }

    // Retrieve completed job status
    DlpJob completedJob =
        dlpServiceClient.getDlpJob(GetDlpJobRequest.newBuilder().setName(dlpJobName).build());

    System.out.println("Job status: " + completedJob.getState());
    AnalyzeDataSourceRiskDetails riskDetails = completedJob.getRiskDetails();

    KAnonymityResult kanonymityResult = riskDetails.getKAnonymityResult();
    for (KAnonymityHistogramBucket result :
        kanonymityResult.getEquivalenceClassHistogramBucketsList()) {
      System.out.printf(
          "Bucket size range: [%d, %d]\n",
          result.getEquivalenceClassSizeLowerBound(), result.getEquivalenceClassSizeUpperBound());

      for (KAnonymityEquivalenceClass bucket : result.getBucketValuesList()) {
        List<String> quasiIdValues =
            bucket
                .getQuasiIdsValuesList()
                .stream()
                .map(v -> v.toString())
                .collect(Collectors.toList());

        System.out.println("\tQuasi-ID values: " + String.join(", ", quasiIdValues));
        System.out.println("\tClass size: " + bucket.getEquivalenceClassSize());
      }
    }
  } catch (Exception e) {
    System.out.println("Error in calculateKAnonymity: " + e.getMessage());
  }
}

Node.js

// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');

// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();

// The project ID to run the API call under
// const callingProjectId = process.env.GCLOUD_PROJECT;

// The project ID the table is stored under
// This may or (for public datasets) may not equal the calling project ID
// const tableProjectId = process.env.GCLOUD_PROJECT;

// The ID of the dataset to inspect, e.g. 'my_dataset'
// const datasetId = 'my_dataset';

// The ID of the table to inspect, e.g. 'my_table'
// const tableId = 'my_table';

// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'

// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'

// A set of columns that form a composite key ('quasi-identifiers')
// const quasiIds = [{ name: 'age' }, { name: 'city' }];

const sourceTable = {
  projectId: tableProjectId,
  datasetId: datasetId,
  tableId: tableId,
};

// Construct request for creating a risk analysis job
const request = {
  parent: dlp.projectPath(callingProjectId),
  riskJob: {
    privacyMetric: {
      kAnonymityConfig: {
        quasiIds: quasiIds,
      },
    },
    sourceTable: sourceTable,
    actions: [
      {
        pubSub: {
          topic: `projects/${callingProjectId}/topics/${topicId}`,
        },
      },
    ],
  },
};

// Create helper function for unpacking values
const getValue = obj => obj[Object.keys(obj)[0]];

try {
  // Run risk analysis job
  const [topicResponse] = await pubsub.topic(topicId).get();
  const subscription = await topicResponse.subscription(subscriptionId);
  const [jobsResponse] = await dlp.createDlpJob(request);
  const jobName = jobsResponse.name;
  // Watch the Pub/Sub topic until the DLP job finishes
  await new Promise((resolve, reject) => {
    const messageHandler = message => {
      if (message.attributes && message.attributes.DlpJobName === jobName) {
        message.ack();
        subscription.removeListener('message', messageHandler);
        subscription.removeListener('error', errorHandler);
        resolve(jobName);
      } else {
        message.nack();
      }
    };

    const errorHandler = err => {
      subscription.removeListener('message', messageHandler);
      subscription.removeListener('error', errorHandler);
      reject(err);
    };

    subscription.on('message', messageHandler);
    subscription.on('error', errorHandler);
  });
  setTimeout(() => {
    console.log(` Waiting for DLP job to fully complete`);
  }, 500);
  const [job] = await dlp.getDlpJob({name: jobName});
  const histogramBuckets =
    job.riskDetails.kAnonymityResult.equivalenceClassHistogramBuckets;

  histogramBuckets.forEach((histogramBucket, histogramBucketIdx) => {
    console.log(`Bucket ${histogramBucketIdx}:`);
    console.log(
      `  Bucket size range: [${
        histogramBucket.equivalenceClassSizeLowerBound
      }, ${histogramBucket.equivalenceClassSizeUpperBound}]`
    );

    histogramBucket.bucketValues.forEach(valueBucket => {
      const quasiIdValues = valueBucket.quasiIdsValues
        .map(getValue)
        .join(', ');
      console.log(`  Quasi-ID values: {${quasiIdValues}}`);
      console.log(`  Class size: ${valueBucket.equivalenceClassSize}`);
    });
  });
} catch (err) {
  console.log(`Error in kAnonymityAnalysis: ${err.message || err}`);
}

Python

def k_anonymity_analysis(project, table_project_id, dataset_id, table_id,
                         topic_id, subscription_id, quasi_ids, timeout=300):
    """Uses the Data Loss Prevention API to compute the k-anonymity of a
        column set in a Google BigQuery table.
    Args:
        project: The Google Cloud project id to use as a parent resource.
        table_project_id: The Google Cloud project id where the BigQuery table
            is stored.
        dataset_id: The id of the dataset to inspect.
        table_id: The id of the table to inspect.
        topic_id: The name of the Pub/Sub topic to notify once the job
            completes.
        subscription_id: The name of the Pub/Sub subscription to use when
            listening for job completion notifications.
        quasi_ids: A set of columns that form a composite key.
        timeout: The number of seconds to wait for a response from the API.

    Returns:
        None; the response from the API is printed to the terminal.
    """

    # Import the client library.
    import google.cloud.dlp

    # This sample additionally uses Cloud Pub/Sub to receive results from
    # potentially long-running operations.
    import google.cloud.pubsub

    # Create helper function for unpacking values
    def get_values(obj):
        return int(obj.integer_value)

    def callback(message):
        if (message.attributes['DlpJobName'] == operation.name):
            # This is the message we're looking for, so acknowledge it.
            message.ack()

            # Now that the job is done, fetch the results and print them.
            job = dlp.get_dlp_job(operation.name)
            histogram_buckets = (job.risk_details
                                    .k_anonymity_result
                                    .equivalence_class_histogram_buckets)
            # Print bucket stats
            for i, bucket in enumerate(histogram_buckets):
                print('Bucket {}:'.format(i))
                if bucket.equivalence_class_size_lower_bound:
                    print('   Bucket size range: [{}, {}]'.format(
                        bucket.equivalence_class_size_lower_bound,
                        bucket.equivalence_class_size_upper_bound))
                    for value_bucket in bucket.bucket_values:
                        print('   Quasi-ID values: {}'.format(
                            map(get_values, value_bucket.quasi_ids_values)
                        ))
                        print('   Class size: {}'.format(
                            value_bucket.equivalence_class_size))
            subscription.set_result(None)
        else:
            # This is not the message we're looking for.
            message.drop()

    # Instantiate a client.
    dlp = google.cloud.dlp.DlpServiceClient()

    # Convert the project id into a full resource id.
    parent = dlp.project_path(project)

    # Location info of the BigQuery table.
    source_table = {
        'project_id': table_project_id,
        'dataset_id': dataset_id,
        'table_id': table_id
    }

    # Convert quasi id list to Protobuf type
    def map_fields(field):
        return {'name': field}

    quasi_ids = map(map_fields, quasi_ids)

    # Tell the API where to send a notification when the job is complete.
    actions = [{
        'pub_sub': {'topic': '{}/topics/{}'.format(parent, topic_id)}
    }]

    # Configure risk analysis job
    # Give the name of the numeric column to compute risk metrics for
    risk_job = {
        'privacy_metric': {
            'k_anonymity_config': {
                'quasi_ids': quasi_ids
            }
        },
        'source_table': source_table,
        'actions': actions
    }

    # Create a Pub/Sub client and find the subscription. The subscription is
    # expected to already be listening to the topic.
    subscriber = google.cloud.pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_id)
    subscription = subscriber.subscribe(subscription_path, callback)

    # Call API to start risk analysis job
    operation = dlp.create_dlp_job(parent, risk_job=risk_job)

    try:
        subscription.result(timeout=timeout)
    except TimeoutError:
        print('No event received before the timeout. Please verify that the '
              'subscription provided is subscribed to the topic provided.')
        subscription.close()

Go

// riskKAnonymity computes the risk of the given columns using K Anonymity.
func riskKAnonymity(w io.Writer, client *dlp.Client, project, dataProject, pubSubTopic, pubSubSub, datasetID, tableID string, columnNames ...string) {
	ctx := context.Background()

	// Create a PubSub Client used to listen for when the inspect job finishes.
	pClient, err := pubsub.NewClient(ctx, project)
	if err != nil {
		log.Fatalf("Error creating PubSub client: %v", err)
	}
	defer pClient.Close()

	// Create a PubSub subscription we can use to listen for messages.
	s, err := setupPubSub(ctx, pClient, project, pubSubTopic, pubSubSub)
	if err != nil {
		log.Fatalf("Error setting up PubSub: %v\n", err)
	}

	// topic is the PubSub topic string where messages should be sent.
	topic := "projects/" + project + "/topics/" + pubSubTopic

	// Build the QuasiID slice.
	var q []*dlppb.FieldId
	for _, c := range columnNames {
		q = append(q, &dlppb.FieldId{Name: c})
	}

	// Create a configured request.
	req := &dlppb.CreateDlpJobRequest{
		Parent: "projects/" + project,
		Job: &dlppb.CreateDlpJobRequest_RiskJob{
			RiskJob: &dlppb.RiskAnalysisJobConfig{
				// PrivacyMetric configures what to compute.
				PrivacyMetric: &dlppb.PrivacyMetric{
					Type: &dlppb.PrivacyMetric_KAnonymityConfig_{
						KAnonymityConfig: &dlppb.PrivacyMetric_KAnonymityConfig{
							QuasiIds: q,
						},
					},
				},
				// SourceTable describes where to find the data.
				SourceTable: &dlppb.BigQueryTable{
					ProjectId: dataProject,
					DatasetId: datasetID,
					TableId:   tableID,
				},
				// Send a message to PubSub using Actions.
				Actions: []*dlppb.Action{
					{
						Action: &dlppb.Action_PubSub{
							PubSub: &dlppb.Action_PublishToPubSub{
								Topic: topic,
							},
						},
					},
				},
			},
		},
	}
	// Create the risk job.
	j, err := client.CreateDlpJob(context.Background(), req)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Fprintf(w, "Created job: %v\n", j.GetName())

	// Wait for the risk job to finish by waiting for a PubSub message.
	ctx, cancel := context.WithCancel(ctx)
	err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// If this is the wrong job, do not process the result.
		if msg.Attributes["DlpJobName"] != j.GetName() {
			msg.Nack()
			return
		}
		msg.Ack()
		time.Sleep(500 * time.Millisecond)
		j, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
			Name: j.GetName(),
		})
		if err != nil {
			log.Fatalf("Error getting completed job: %v\n", err)
		}
		h := j.GetRiskDetails().GetKAnonymityResult().GetEquivalenceClassHistogramBuckets()
		for i, b := range h {
			fmt.Fprintf(w, "Histogram bucket %v\n", i)
			fmt.Fprintf(w, "  Size range: [%v,%v]\n", b.GetEquivalenceClassSizeLowerBound(), b.GetEquivalenceClassSizeUpperBound())
			fmt.Fprintf(w, "  %v unique values total\n", b.GetBucketSize())
			for _, v := range b.GetBucketValues() {
				var qvs []string
				for _, qv := range v.GetQuasiIdsValues() {
					qvs = append(qvs, qv.String())
				}
				fmt.Fprintf(w, "    QuasiID values: %s\n", strings.Join(qvs, ", "))
				fmt.Fprintf(w, "    Class size: %v\n", v.GetEquivalenceClassSize())
			}
		}
		// Stop listening for more messages.
		cancel()
	})
	if err != nil {
		log.Fatalf("Error receiving from PubSub: %v\n", err)
	}
}

PHP

use Google\Cloud\Dlp\V2\DlpServiceClient;
use Google\Cloud\Dlp\V2\RiskAnalysisJobConfig;
use Google\Cloud\Dlp\V2\BigQueryTable;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\PrivacyMetric\KAnonymityConfig;
use Google\Cloud\Dlp\V2\PrivacyMetric;
use Google\Cloud\Dlp\V2\FieldId;
use Google\Cloud\PubSub\PubSubClient;

/**
 * Computes the k-anonymity of a column set in a Google BigQuery table.
 *
 * @param string $callingProjectId The project ID to run the API call under
 * @param string $dataProjectId The project ID containing the target Datastore
 * @param string $topicId The name of the Pub/Sub topic to notify once the job completes
 * @param string $subscriptionId The name of the Pub/Sub subscription to use when listening for job
 * @param string $datasetId The ID of the dataset to inspect
 * @param string $tableId The ID of the table to inspect
 * @param array $quasiIdNames A set of columns that form a composite key ('quasi-identifiers')
 */
function k_anonymity(
    $callingProjectId,
    $dataProjectId,
    $topicId,
    $subscriptionId,
    $datasetId,
    $tableId,
    $quasiIdNames
) {
    // Instantiate a client.
    $dlp = new DlpServiceClient([
        'projectId' => $callingProjectId,
    ]);
    $pubsub = new PubSubClient([
        'projectId' => $callingProjectId,
    ]);
    $topic = $pubsub->topic($topicId);

    // Construct risk analysis config
    $quasiIds = array_map(
        function ($id) {
            return (new FieldId())->setName($id);
        },
        $quasiIdNames
    );

    $statsConfig = (new KAnonymityConfig())
        ->setQuasiIds($quasiIds);

    $privacyMetric = (new PrivacyMetric())
        ->setKAnonymityConfig($statsConfig);

    // Construct items to be analyzed
    $bigqueryTable = (new BigQueryTable())
        ->setProjectId($dataProjectId)
        ->setDatasetId($datasetId)
        ->setTableId($tableId);

    // Construct the action to run when job completes
    $pubSubAction = (new PublishToPubSub())
        ->setTopic($topic->name());

    $action = (new Action())
        ->setPubSub($pubSubAction);

    // Construct risk analysis job config to run
    $riskJob = (new RiskAnalysisJobConfig())
        ->setPrivacyMetric($privacyMetric)
        ->setSourceTable($bigqueryTable)
        ->setActions([$action]);

    // Listen for job notifications via an existing topic/subscription.
    $subscription = $topic->subscription($subscriptionId);

    // Submit request
    $parent = $dlp->projectName($callingProjectId);
    $job = $dlp->createDlpJob($parent, [
        'riskJob' => $riskJob
    ]);

    // Poll via Pub/Sub until job finishes
    while (true) {
        foreach ($subscription->pull() as $message) {
            if (isset($message->attributes()['DlpJobName']) &&
                $message->attributes()['DlpJobName'] === $job->getName()) {
                $subscription->acknowledge($message);
                break 2;
            }
        }
    }

    // Sleep for one second to avoid race condition with the job's status.
    usleep(1000000);

    // Get the updated job
    $job = $dlp->getDlpJob($job->getName());

    // Helper function to convert Protobuf values to strings
    $value_to_string = function ($value) {
        $json = json_decode($value->serializeToJsonString(), true);
        return array_shift($json);
    };

    // Print finding counts
    printf('Job %s status: %s' . PHP_EOL, $job->getName(), $job->getState());
    switch ($job->getState()) {
        case JobState::DONE:
            $histBuckets = $job->getRiskDetails()->getKAnonymityResult()->getEquivalenceClassHistogramBuckets();

            foreach ($histBuckets as $bucketIndex => $histBucket) {
                // Print bucket stats
                printf('Bucket %s:' . PHP_EOL, $bucketIndex);
                printf(
                    '  Bucket size range: [%s, %s]' . PHP_EOL,
                    $histBucket->getEquivalenceClassSizeLowerBound(),
                    $histBucket->getEquivalenceClassSizeUpperBound()
                );

                // Print bucket values
                foreach ($histBucket->getBucketValues() as $percent => $valueBucket) {
                    // Pretty-print quasi-ID values
                    print('  Quasi-ID values: {');
                    foreach ($valueBucket->getQuasiIdsValues() as $index => $value) {
                        print(($index !== 0 ? ', ' : '') . $value_to_string($value));
                    }
                    print('}' . PHP_EOL);
                    printf(
                        '  Class size: %s' . PHP_EOL,
                        $valueBucket->getEquivalenceClassSize()
                    );
                }
            }

            break;
        case JobState::FAILED:
            printf('Job %s had errors:' . PHP_EOL, $job->getName());
            $errors = $job->getErrors();
            foreach ($errors as $error) {
                var_dump($error->getDetails());
            }
            break;
        default:
            printf('Unexpected job state. Most likely, the job is either running or has not yet started.');
    }
}

C#

public static object KAnonymity(
    string callingProjectId,
    string tableProjectId,
    string datasetId,
    string tableId,
    string topicId,
    string subscriptionId,
    IEnumerable<FieldId> quasiIds)
{
    DlpServiceClient dlp = DlpServiceClient.Create();

    // Construct + submit the job
    var KAnonymityConfig = new KAnonymityConfig
    {
        QuasiIds = { quasiIds }
    };

    var config = new RiskAnalysisJobConfig
    {
        PrivacyMetric = new PrivacyMetric
        {
            KAnonymityConfig = KAnonymityConfig
        },
        SourceTable = new BigQueryTable
        {
            ProjectId = tableProjectId,
            DatasetId = datasetId,
            TableId = tableId
        },
        Actions =
        {
            new Google.Cloud.Dlp.V2.Action
            {
                PubSub = new PublishToPubSub
                {
                    Topic = $"projects/{callingProjectId}/topics/{topicId}"
                }
            }
        }
    };

    var submittedJob = dlp.CreateDlpJob(
        new CreateDlpJobRequest
        {
            ParentAsProjectName = new ProjectName(callingProjectId),
            RiskJob = config
        });

    // Listen to pub/sub for the job
    var subscriptionName = new SubscriptionName(callingProjectId, subscriptionId);
    SubscriberClient subscriber = SubscriberClient.CreateAsync(
        subscriptionName).Result;

    // SimpleSubscriber runs your message handle function on multiple
    // threads to maximize throughput.
    var done = new ManualResetEventSlim(false);
    subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
    {
        if (message.Attributes["DlpJobName"] == submittedJob.Name)
        {
            Thread.Sleep(500); // Wait for DLP API results to become consistent
            done.Set();
            return Task.FromResult(SubscriberClient.Reply.Ack);
        }
        else
        {
            return Task.FromResult(SubscriberClient.Reply.Nack);
        }
    });

    done.Wait(TimeSpan.FromMinutes(10)); // 10 minute timeout; may not work for large jobs
    subscriber.StopAsync(CancellationToken.None).Wait();

    // Process results
    var resultJob = dlp.GetDlpJob(new GetDlpJobRequest
    {
        DlpJobName = DlpJobName.Parse(submittedJob.Name)
    });

    var result = resultJob.RiskDetails.KAnonymityResult;

    for (int bucketIdx = 0; bucketIdx < result.EquivalenceClassHistogramBuckets.Count; bucketIdx++)
    {
        var bucket = result.EquivalenceClassHistogramBuckets[bucketIdx];
        Console.WriteLine($"Bucket {bucketIdx}");
        Console.WriteLine($"  Bucket size range: [{bucket.EquivalenceClassSizeLowerBound}, {bucket.EquivalenceClassSizeUpperBound}].");
        Console.WriteLine($"  {bucket.BucketSize} unique value(s) total.");

        foreach (var bucketValue in bucket.BucketValues)
        {
            // 'UnpackValue(x)' is a prettier version of 'x.toString()'
            Console.WriteLine($"    Quasi-ID values: [{String.Join(',', bucketValue.QuasiIdsValues.Select(x => DlpSamplesUtils.UnpackValue(x)))}]");
            Console.WriteLine($"    Class size: {bucketValue.EquivalenceClassSize}");
        }
    }

    return 0;
}

Cloud DLP を使った l-多様性の計算

l-多様性k-匿名性を拡張したもので、機密値が発生する各列の多様性を追加的に測定します。同一の準識別子を持つ行のすべての集合で、各機密属性について少なくとも l 個の異なる値がある場合、そのデータセットは l-多様性を持っています。

1 つ以上の列(フィールド)の l-多様性の値を計算できます。まず、上記の Cloud DLP を使ったリスク分析の手順を行ってください。

LDiversityConfig オブジェクト内で、次の項目を指定します。

  • quasiIds[]: l-多様性の計算のために等価クラスがどのように定義されるかを示す 1 組の準識別子(FieldId オブジェクト)。KAnonymityConfig と同じように、複数のフィールドを指定すると、単一の複合キーとみなされます。
  • sensitiveAttribute: l-多様性の値を計算する機密フィールド(FieldId オブジェクト)。

コードの例

次に、Cloud DLP を使用して l-多様性の値を計算する方法を示すサンプルコードをいくつかの言語で示します。

Java

/**
 * Calculate l-diversity for an attribute relative to quasi-identifiers in a BigQuery table.
 *
 * @param projectId The Google Cloud Platform project ID to run the API call under.
 * @param datasetId The BigQuery dataset to analyze.
 * @param tableId The BigQuery table to analyze.
 * @param sensitiveAttribute The name of the attribute to compare the quasi-ID against
 * @param quasiIds A set of column names that form a composite key ('quasi-identifiers').
 * @param topicId The name of the Pub/Sub topic to notify once the job completes
 * @param subscriptionId The name of the Pub/Sub subscription to use when listening for job
 *     completion status.
 */
private static void calculateLDiversity(
    String projectId,
    String datasetId,
    String tableId,
    String sensitiveAttribute,
    List<String> quasiIds,
    String topicId,
    String subscriptionId)
    throws Exception {

  // Instantiates a client
  try (DlpServiceClient dlpServiceClient = DlpServiceClient.create()) {

    FieldId sensitiveAttributeField = FieldId.newBuilder().setName(sensitiveAttribute).build();

    List<FieldId> quasiIdFields =
        quasiIds
            .stream()
            .map(columnName -> FieldId.newBuilder().setName(columnName).build())
            .collect(Collectors.toList());

    LDiversityConfig ldiversityConfig =
        LDiversityConfig.newBuilder()
            .addAllQuasiIds(quasiIdFields)
            .setSensitiveAttribute(sensitiveAttributeField)
            .build();

    BigQueryTable bigQueryTable =
        BigQueryTable.newBuilder()
            .setProjectId(projectId)
            .setDatasetId(datasetId)
            .setTableId(tableId)
            .build();

    PrivacyMetric privacyMetric =
        PrivacyMetric.newBuilder().setLDiversityConfig(ldiversityConfig).build();

    String topicName = String.format("projects/%s/topics/%s", projectId, topicId);

    PublishToPubSub publishToPubSub = PublishToPubSub.newBuilder().setTopic(topicName).build();

    // Create action to publish job status notifications over Google Cloud Pub/Sub
    Action action = Action.newBuilder().setPubSub(publishToPubSub).build();

    RiskAnalysisJobConfig riskAnalysisJobConfig =
        RiskAnalysisJobConfig.newBuilder()
            .setSourceTable(bigQueryTable)
            .setPrivacyMetric(privacyMetric)
            .addActions(action)
            .build();

    CreateDlpJobRequest createDlpJobRequest =
        CreateDlpJobRequest.newBuilder()
            .setParent(ProjectName.of(projectId).toString())
            .setRiskJob(riskAnalysisJobConfig)
            .build();

    DlpJob dlpJob = dlpServiceClient.createDlpJob(createDlpJobRequest);
    String dlpJobName = dlpJob.getName();

    final SettableApiFuture<Boolean> done = SettableApiFuture.create();

    // Set up a Pub/Sub subscriber to listen on the job completion status
    Subscriber subscriber =
        Subscriber.newBuilder(
                ProjectSubscriptionName.newBuilder()
                    .setProject(projectId)
                    .setSubscription(subscriptionId)
                    .build(),
          (pubsubMessage, ackReplyConsumer) -> {
            if (pubsubMessage.getAttributesCount() > 0
                && pubsubMessage.getAttributesMap().get("DlpJobName").equals(dlpJobName)) {
              // notify job completion
              done.set(true);
              ackReplyConsumer.ack();
            }
          })
            .build();
    subscriber.startAsync();

    // Wait for job completion semi-synchronously
    // For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
    try {
      done.get(1, TimeUnit.MINUTES);
      Thread.sleep(500); // Wait for the job to become available
    } catch (TimeoutException e) {
      System.out.println("Unable to verify job completion.");
    }

    // retrieve completed job status
    DlpJob completedJob =
        dlpServiceClient.getDlpJob(GetDlpJobRequest.newBuilder().setName(dlpJobName).build());

    System.out.println("Job status: " + completedJob.getState());
    AnalyzeDataSourceRiskDetails riskDetails = completedJob.getRiskDetails();

    LDiversityResult ldiversityResult = riskDetails.getLDiversityResult();
    for (LDiversityHistogramBucket result :
        ldiversityResult.getSensitiveValueFrequencyHistogramBucketsList()) {
      for (LDiversityEquivalenceClass bucket : result.getBucketValuesList()) {
        List<String> quasiIdValues =
            bucket
                .getQuasiIdsValuesList()
                .stream()
                .map(Value::toString)
                .collect(Collectors.toList());

        System.out.println("\tQuasi-ID values: " + String.join(", ", quasiIdValues));
        System.out.println("\tClass size: " + bucket.getEquivalenceClassSize());

        for (ValueFrequency valueFrequency : bucket.getTopSensitiveValuesList()) {
          System.out.printf(
              "\t\tSensitive value %s occurs %d time(s).\n",
              valueFrequency.getValue().toString(), valueFrequency.getCount());
        }
      }
    }
  } catch (Exception e) {
    System.out.println("Error in calculateLDiversity: " + e.getMessage());
  }
}

Node.js

// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');

// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();

// The project ID to run the API call under
// const callingProjectId = process.env.GCLOUD_PROJECT;

// The project ID the table is stored under
// This may or (for public datasets) may not equal the calling project ID
// const tableProjectId = process.env.GCLOUD_PROJECT;

// The ID of the dataset to inspect, e.g. 'my_dataset'
// const datasetId = 'my_dataset';

// The ID of the table to inspect, e.g. 'my_table'
// const tableId = 'my_table';

// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'

// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'

// The column to measure l-diversity relative to, e.g. 'firstName'
// const sensitiveAttribute = 'name';

// A set of columns that form a composite key ('quasi-identifiers')
// const quasiIds = [{ name: 'age' }, { name: 'city' }];

const sourceTable = {
  projectId: tableProjectId,
  datasetId: datasetId,
  tableId: tableId,
};

// Construct request for creating a risk analysis job
const request = {
  parent: dlp.projectPath(callingProjectId),
  riskJob: {
    privacyMetric: {
      lDiversityConfig: {
        quasiIds: quasiIds,
        sensitiveAttribute: {
          name: sensitiveAttribute,
        },
      },
    },
    sourceTable: sourceTable,
    actions: [
      {
        pubSub: {
          topic: `projects/${callingProjectId}/topics/${topicId}`,
        },
      },
    ],
  },
};

// Create helper function for unpacking values
const getValue = obj => obj[Object.keys(obj)[0]];

try {
  // Run risk analysis job
  const [topicResponse] = await pubsub.topic(topicId).get();
  const subscription = await topicResponse.subscription(subscriptionId);
  const [jobsResponse] = await dlp.createDlpJob(request);
  const jobName = jobsResponse.name;
  // Watch the Pub/Sub topic until the DLP job finishes
  await new Promise((resolve, reject) => {
    const messageHandler = message => {
      if (message.attributes && message.attributes.DlpJobName === jobName) {
        message.ack();
        subscription.removeListener('message', messageHandler);
        subscription.removeListener('error', errorHandler);
        resolve(jobName);
      } else {
        message.nack();
      }
    };

    const errorHandler = err => {
      subscription.removeListener('message', messageHandler);
      subscription.removeListener('error', errorHandler);
      reject(err);
    };

    subscription.on('message', messageHandler);
    subscription.on('error', errorHandler);
  });
  setTimeout(() => {
    console.log(` Waiting for DLP job to fully complete`);
  }, 500);
  const [job] = await dlp.getDlpJob({name: jobName});
  const histogramBuckets =
    job.riskDetails.lDiversityResult.sensitiveValueFrequencyHistogramBuckets;

  histogramBuckets.forEach((histogramBucket, histogramBucketIdx) => {
    console.log(`Bucket ${histogramBucketIdx}:`);

    console.log(
      `Bucket size range: [${
        histogramBucket.sensitiveValueFrequencyLowerBound
      }, ${histogramBucket.sensitiveValueFrequencyUpperBound}]`
    );
    histogramBucket.bucketValues.forEach(valueBucket => {
      const quasiIdValues = valueBucket.quasiIdsValues
        .map(getValue)
        .join(', ');
      console.log(`  Quasi-ID values: {${quasiIdValues}}`);
      console.log(`  Class size: ${valueBucket.equivalenceClassSize}`);
      valueBucket.topSensitiveValues.forEach(valueObj => {
        console.log(
          `    Sensitive value ${getValue(valueObj.value)} occurs ${
            valueObj.count
          } time(s).`
        );
      });
    });
  });
} catch (err) {
  console.log(`Error in lDiversityAnalysis: ${err.message || err}`);
}

Python

def l_diversity_analysis(project, table_project_id, dataset_id, table_id,
                         topic_id, subscription_id, sensitive_attribute,
                         quasi_ids, timeout=300):
    """Uses the Data Loss Prevention API to compute the l-diversity of a
        column set in a Google BigQuery table.
    Args:
        project: The Google Cloud project id to use as a parent resource.
        table_project_id: The Google Cloud project id where the BigQuery table
            is stored.
        dataset_id: The id of the dataset to inspect.
        table_id: The id of the table to inspect.
        topic_id: The name of the Pub/Sub topic to notify once the job
            completes.
        subscription_id: The name of the Pub/Sub subscription to use when
            listening for job completion notifications.
        sensitive_attribute: The column to measure l-diversity relative to.
        quasi_ids: A set of columns that form a composite key.
        timeout: The number of seconds to wait for a response from the API.

    Returns:
        None; the response from the API is printed to the terminal.
    """

    # Import the client library.
    import google.cloud.dlp

    # This sample additionally uses Cloud Pub/Sub to receive results from
    # potentially long-running operations.
    import google.cloud.pubsub

    # Create helper function for unpacking values
    def get_values(obj):
        return int(obj.integer_value)

    def callback(message):
        if (message.attributes['DlpJobName'] == operation.name):
            # This is the message we're looking for, so acknowledge it.
            message.ack()

            # Now that the job is done, fetch the results and print them.
            job = dlp.get_dlp_job(operation.name)
            histogram_buckets = (
                job.risk_details
                   .l_diversity_result
                   .sensitive_value_frequency_histogram_buckets)
            # Print bucket stats
            for i, bucket in enumerate(histogram_buckets):
                print('Bucket {}:'.format(i))
                print('   Bucket size range: [{}, {}]'.format(
                    bucket.sensitive_value_frequency_lower_bound,
                    bucket.sensitive_value_frequency_upper_bound))
                for value_bucket in bucket.bucket_values:
                    print('   Quasi-ID values: {}'.format(
                        map(get_values, value_bucket.quasi_ids_values)))
                    print('   Class size: {}'.format(
                        value_bucket.equivalence_class_size))
                    for value in value_bucket.top_sensitive_values:
                        print(('   Sensitive value {} occurs {} time(s)'
                               .format(value.value, value.count)))
            subscription.set_result(None)
        else:
            # This is not the message we're looking for.
            message.drop()

    # Instantiate a client.
    dlp = google.cloud.dlp.DlpServiceClient()

    # Convert the project id into a full resource id.
    parent = dlp.project_path(project)

    # Location info of the BigQuery table.
    source_table = {
        'project_id': table_project_id,
        'dataset_id': dataset_id,
        'table_id': table_id
    }

    # Convert quasi id list to Protobuf type
    def map_fields(field):
        return {'name': field}

    quasi_ids = map(map_fields, quasi_ids)

    # Tell the API where to send a notification when the job is complete.
    actions = [{
        'pub_sub': {'topic': '{}/topics/{}'.format(parent, topic_id)}
    }]

    # Configure risk analysis job
    # Give the name of the numeric column to compute risk metrics for
    risk_job = {
        'privacy_metric': {
            'l_diversity_config': {
                'quasi_ids': quasi_ids,
                'sensitive_attribute': {
                    'name': sensitive_attribute
                }
            }
        },
        'source_table': source_table,
        'actions': actions
    }

    # Create a Pub/Sub client and find the subscription. The subscription is
    # expected to already be listening to the topic.
    subscriber = google.cloud.pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_id)
    subscription = subscriber.subscribe(subscription_path, callback)

    # Call API to start risk analysis job
    operation = dlp.create_dlp_job(parent, risk_job=risk_job)

    try:
        subscription.result(timeout=timeout)
    except TimeoutError:
        print('No event received before the timeout. Please verify that the '
              'subscription provided is subscribed to the topic provided.')
        subscription.close()

Go

// riskLDiversity computes the L Diversity of the given columns.
func riskLDiversity(w io.Writer, client *dlp.Client, project, dataProject, pubSubTopic, pubSubSub, datasetID, tableID, sensitiveAttribute string, columnNames ...string) {
	ctx := context.Background()

	// Create a PubSub Client used to listen for when the inspect job finishes.
	pClient, err := pubsub.NewClient(ctx, project)
	if err != nil {
		log.Fatalf("Error creating PubSub client: %v", err)
	}
	defer pClient.Close()

	// Create a PubSub subscription we can use to listen for messages.
	s, err := setupPubSub(ctx, pClient, project, pubSubTopic, pubSubSub)
	if err != nil {
		log.Fatalf("Error setting up PubSub: %v\n", err)
	}

	// topic is the PubSub topic string where messages should be sent.
	topic := "projects/" + project + "/topics/" + pubSubTopic

	// Build the QuasiID slice.
	var q []*dlppb.FieldId
	for _, c := range columnNames {
		q = append(q, &dlppb.FieldId{Name: c})
	}

	// Create a configured request.
	req := &dlppb.CreateDlpJobRequest{
		Parent: "projects/" + project,
		Job: &dlppb.CreateDlpJobRequest_RiskJob{
			RiskJob: &dlppb.RiskAnalysisJobConfig{
				// PrivacyMetric configures what to compute.
				PrivacyMetric: &dlppb.PrivacyMetric{
					Type: &dlppb.PrivacyMetric_LDiversityConfig_{
						LDiversityConfig: &dlppb.PrivacyMetric_LDiversityConfig{
							QuasiIds: q,
							SensitiveAttribute: &dlppb.FieldId{
								Name: sensitiveAttribute,
							},
						},
					},
				},
				// SourceTable describes where to find the data.
				SourceTable: &dlppb.BigQueryTable{
					ProjectId: dataProject,
					DatasetId: datasetID,
					TableId:   tableID,
				},
				// Send a message to PubSub using Actions.
				Actions: []*dlppb.Action{
					{
						Action: &dlppb.Action_PubSub{
							PubSub: &dlppb.Action_PublishToPubSub{
								Topic: topic,
							},
						},
					},
				},
			},
		},
	}
	// Create the risk job.
	j, err := client.CreateDlpJob(context.Background(), req)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Fprintf(w, "Created job: %v\n", j.GetName())

	// Wait for the risk job to finish by waiting for a PubSub message.
	ctx, cancel := context.WithCancel(ctx)
	err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// If this is the wrong job, do not process the result.
		if msg.Attributes["DlpJobName"] != j.GetName() {
			msg.Nack()
			return
		}
		msg.Ack()
		time.Sleep(500 * time.Millisecond)
		j, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
			Name: j.GetName(),
		})
		if err != nil {
			log.Fatalf("Error getting completed job: %v\n", err)
		}
		h := j.GetRiskDetails().GetLDiversityResult().GetSensitiveValueFrequencyHistogramBuckets()
		for i, b := range h {
			fmt.Fprintf(w, "Histogram bucket %v\n", i)
			fmt.Fprintf(w, "  Size range: [%v,%v]\n", b.GetSensitiveValueFrequencyLowerBound(), b.GetSensitiveValueFrequencyUpperBound())
			fmt.Fprintf(w, "  %v unique values total\n", b.GetBucketSize())
			for _, v := range b.GetBucketValues() {
				var qvs []string
				for _, qv := range v.GetQuasiIdsValues() {
					qvs = append(qvs, qv.String())
				}
				fmt.Fprintf(w, "    QuasiID values: %s\n", strings.Join(qvs, ", "))
				fmt.Fprintf(w, "    Class size: %v\n", v.GetEquivalenceClassSize())
				for _, sv := range v.GetTopSensitiveValues() {
					fmt.Fprintf(w, "    Sensitive value %v occurs %v times\n", sv.GetValue(), sv.GetCount())
				}
			}
		}
		// Stop listening for more messages.
		cancel()
	})
	if err != nil {
		log.Fatalf("Error receiving from PubSub: %v\n", err)
	}
}

PHP

use Google\Cloud\Dlp\V2\DlpServiceClient;
use Google\Cloud\Dlp\V2\RiskAnalysisJobConfig;
use Google\Cloud\Dlp\V2\BigQueryTable;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\PrivacyMetric\LDiversityConfig;
use Google\Cloud\Dlp\V2\PrivacyMetric;
use Google\Cloud\Dlp\V2\FieldId;
use Google\Cloud\PubSub\PubSubClient;

/**
 * Computes the l-diversity of a column set in a Google BigQuery table.
 *
 * @param string $callingProjectId The project ID to run the API call under
 * @param string $dataProjectId The project ID containing the target Datastore
 * @param string $topicId The name of the Pub/Sub topic to notify once the job completes
 * @param string $subscriptionId The name of the Pub/Sub subscription to use when listening for job
 * @param string $datasetId The ID of the dataset to inspect
 * @param string $tableId The ID of the table to inspect
 * @param string $sensitiveAttribute The column to measure l-diversity relative to, e.g. 'firstName'
 * @param array $quasiIdNames A set of columns that form a composite key ('quasi-identifiers')

 */
function l_diversity(
    $callingProjectId,
    $dataProjectId,
    $topicId,
    $subscriptionId,
    $datasetId,
    $tableId,
    $sensitiveAttribute,
    $quasiIdNames
) {
    // Instantiate a client.
    $dlp = new DlpServiceClient([
        'projectId' => $callingProjectId,
    ]);
    $pubsub = new PubSubClient([
        'projectId' => $callingProjectId,
    ]);
    $topic = $pubsub->topic($topicId);

    // Construct risk analysis config
    $quasiIds = array_map(
        function ($id) {
            return (new FieldId())->setName($id);
        },
        $quasiIdNames
    );

    $sensitiveField = (new FieldId())
        ->setName($sensitiveAttribute);

    $statsConfig = (new LDiversityConfig())
        ->setQuasiIds($quasiIds)
        ->setSensitiveAttribute($sensitiveField);

    $privacyMetric = (new PrivacyMetric())
        ->setLDiversityConfig($statsConfig);

    // Construct items to be analyzed
    $bigqueryTable = (new BigQueryTable())
        ->setProjectId($dataProjectId)
        ->setDatasetId($datasetId)
        ->setTableId($tableId);

    // Construct the action to run when job completes
    $pubSubAction = (new PublishToPubSub())
        ->setTopic($topic->name());

    $action = (new Action())
        ->setPubSub($pubSubAction);

    // Construct risk analysis job config to run
    $riskJob = (new RiskAnalysisJobConfig())
        ->setPrivacyMetric($privacyMetric)
        ->setSourceTable($bigqueryTable)
        ->setActions([$action]);

    // Listen for job notifications via an existing topic/subscription.
    $subscription = $topic->subscription($subscriptionId);

    // Submit request
    $parent = $dlp->projectName($callingProjectId);
    $job = $dlp->createDlpJob($parent, [
        'riskJob' => $riskJob
    ]);

    // Poll via Pub/Sub until job finishes
    while (true) {
        foreach ($subscription->pull() as $message) {
            if (isset($message->attributes()['DlpJobName']) &&
                $message->attributes()['DlpJobName'] === $job->getName()) {
                $subscription->acknowledge($message);
                break 2;
            }
        }
    }

    // Sleep for one second to avoid race condition with the job's status.
    usleep(1000000);

    // Get the updated job
    $job = $dlp->getDlpJob($job->getName());

    // Helper function to convert Protobuf values to strings
    $value_to_string = function ($value) {
        $json = json_decode($value->serializeToJsonString(), true);
        return array_shift($json);
    };

    // Print finding counts
    printf('Job %s status: %s' . PHP_EOL, $job->getName(), $job->getState());
    switch ($job->getState()) {
        case JobState::DONE:
            $histBuckets = $job->getRiskDetails()->getLDiversityResult()->getSensitiveValueFrequencyHistogramBuckets();

            foreach ($histBuckets as $bucketIndex => $histBucket) {
                // Print bucket stats
                printf('Bucket %s:' . PHP_EOL, $bucketIndex);
                printf(
                    '  Bucket size range: [%s, %s]' . PHP_EOL,
                    $histBucket->getSensitiveValueFrequencyLowerBound(),
                    $histBucket->getSensitiveValueFrequencyUpperBound()
                );

                // Print bucket values
                foreach ($histBucket->getBucketValues() as $percent => $valueBucket) {
                    printf(
                        '  Class size: %s' . PHP_EOL,
                        $valueBucket->getEquivalenceClassSize()
                    );

                    // Pretty-print quasi-ID values
                    print('  Quasi-ID values: {');
                    foreach ($valueBucket->getQuasiIdsValues() as $index => $value) {
                        print(($index !== 0 ? ', ' : '') . $value_to_string($value));
                    }
                    print('}' . PHP_EOL);

                    // Pretty-print sensitive values
                    $topValues = $valueBucket->getTopSensitiveValues();
                    foreach ($topValues as $topValue) {
                        printf(
                            '  Sensitive value %s occurs %s time(s).' . PHP_EOL,
                            $value_to_string($topValue->getValue()),
                            $topValue->getCount()
                        );
                    }
                }
            }
            break;
        case JobState::FAILED:
            printf('Job %s had errors:' . PHP_EOL, $job->getName());
            $errors = $job->getErrors();
            foreach ($errors as $error) {
                var_dump($error->getDetails());
            }
            break;
        default:
            printf('Unexpected job state. Most likely, the job is either running or has not yet started.');
    }
}

C#

public static object LDiversity(
    string callingProjectId,
    string tableProjectId,
    string datasetId,
    string tableId,
    string topicId,
    string subscriptionId,
    IEnumerable<FieldId> quasiIds,
    string sensitiveAttribute)
{
    DlpServiceClient dlp = DlpServiceClient.Create();

    // Construct + submit the job
    var ldiversityConfig = new LDiversityConfig
    {
        SensitiveAttribute = new FieldId { Name = sensitiveAttribute },
        QuasiIds = { quasiIds }
    };

    var config = new RiskAnalysisJobConfig
    {
        PrivacyMetric = new PrivacyMetric
        {
            LDiversityConfig = ldiversityConfig
        },
        SourceTable = new BigQueryTable
        {
            ProjectId = tableProjectId,
            DatasetId = datasetId,
            TableId = tableId
        },
        Actions = {
            new Google.Cloud.Dlp.V2.Action
            {
                PubSub = new PublishToPubSub
                {
                    Topic = $"projects/{callingProjectId}/topics/{topicId}"
                }
            }
        }
    };

    var submittedJob = dlp.CreateDlpJob(
        new CreateDlpJobRequest
        {
            ParentAsProjectName = new ProjectName(callingProjectId),
            RiskJob = config
        });

    // Listen to pub/sub for the job
    var subscriptionName = new SubscriptionName(callingProjectId, subscriptionId);
    SubscriberClient subscriber = SubscriberClient.CreateAsync(
        subscriptionName).Result;

    // SimpleSubscriber runs your message handle function on multiple
    // threads to maximize throughput.
    var done = new ManualResetEventSlim(false);
    subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
    {
        if (message.Attributes["DlpJobName"] == submittedJob.Name)
        {
            Thread.Sleep(500); // Wait for DLP API results to become consistent
            done.Set();
            return Task.FromResult(SubscriberClient.Reply.Ack);
        }
        else
        {
            return Task.FromResult(SubscriberClient.Reply.Nack);
        }
    });

    done.Wait(TimeSpan.FromMinutes(10)); // 10 minute timeout; may not work for large jobs
    subscriber.StopAsync(CancellationToken.None).Wait();

    // Process results
    var resultJob = dlp.GetDlpJob(
        new GetDlpJobRequest
        {
            DlpJobName = DlpJobName.Parse(submittedJob.Name)
        });

    var result = resultJob.RiskDetails.LDiversityResult;

    for (int bucketIdx = 0; bucketIdx < result.SensitiveValueFrequencyHistogramBuckets.Count; bucketIdx++)
    {
        var bucket = result.SensitiveValueFrequencyHistogramBuckets[bucketIdx];
        Console.WriteLine($"Bucket {bucketIdx}");
        Console.WriteLine($"  Bucket size range: [{bucket.SensitiveValueFrequencyLowerBound}, {bucket.SensitiveValueFrequencyUpperBound}].");
        Console.WriteLine($"  {bucket.BucketSize} unique value(s) total.");

        foreach (var bucketValue in bucket.BucketValues)
        {
            // 'UnpackValue(x)' is a prettier version of 'x.toString()'
            Console.WriteLine($"    Quasi-ID values: [{String.Join(',', bucketValue.QuasiIdsValues.Select(x => DlpSamplesUtils.UnpackValue(x)))}]");
            Console.WriteLine($"    Class size: {bucketValue.EquivalenceClassSize}");

            foreach (var topValue in bucketValue.TopSensitiveValues)
            {
                Console.WriteLine($"    Sensitive value {DlpSamplesUtils.UnpackValue(topValue.Value)} occurs {topValue.Count} time(s).");
            }
        }
    }

    return 0;
}

Cloud DLP を使った k-マップ推定値の計算

Cloud DLP を使用して k-マップの値を推定できます。Cloud DLP は、統計モデルを使用して再識別データセットを推定します。これは、攻撃データセットが明示的に知られている他のリスク分析手法とは対照的です。Cloud DLP は、データの種類に応じて、一般に公開されているデータセット(米国国勢調査のデータセットなど)またはカスタム統計モデル(ユーザーが指定する 1 つ以上の BigQuery テーブルなど)を使用するか、ユーザーが入力したデータセット内の値の分布から推定します。

Cloud DLP を使用して k-マップ推定値を計算するには、まず、上記の Cloud DLP を使ったリスク分析の手順を行ってください。

KMapEstimationConfig オブジェクト内で、次の項目を指定します。

  • quasiIds[]: 必須。準識別子とみなされるフィールド(TaggedField オブジェクト)です。k-マップを計算するためにスキャンして使用されます。2 つの列に同じタグを付けることはできません。次のいずれかを指定できます。

    • infoType: この場合 Cloud DLP は、関連する一般公開データセットを母集団の統計モデルとして使用します(米国の郵便番号、地域コード、年齢、性別など)。
    • カスタム infoType: この列の有効な値に関する統計情報を含む補助テーブル(AuxiliaryTable オブジェクト)を示すカスタムタグ。
    • inferred タグ: 意味のあるタグが示されない場合は、inferred を指定します。Cloud DLP は入力データの値の分布から統計モデルを推定します。
  • regionCode: Cloud DLP が統計モデルで使用する ISO 3166-1 alpha-2 地域コード。この値は、列が地域固有の infoType(米国の郵便番号など)または地域コードでタグ付けされていない場合に必要です。

  • auxiliaryTables[]: 分析で使用する補助テーブル(AuxiliaryTable オブジェクト)。準識別子(quasiIds[])の列に付けるカスタムタグは、それぞれ 1 つの補助テーブルの 1 つの列でのみ使用する必要があります。

コードの例

次に、Cloud DLP を使用して k-マップの値を計算する方法を示すサンプルコードをいくつかの言語で示します。

Java

/**
 * Calculate k-map risk estimation for an attribute relative to quasi-identifiers in a BigQuery
 * table.
 *
 * @param projectId The Google Cloud Platform project ID to run the API call under.
 * @param datasetId The BigQuery dataset to analyze.
 * @param tableId The BigQuery table to analyze.
 * @param quasiIds A set of column names that form a composite key ('quasi-identifiers').
 * @param infoTypes The infoTypes corresponding to each quasi-id column
 * @param regionCode An ISO-3166-1 region code specifying the k-map distribution region
 * @param topicId The name of the Pub/Sub topic to notify once the job completes
 * @param subscriptionId The name of the Pub/Sub subscription to use when listening for job
 *     completion status.
 */
private static void calculateKMap(
    String projectId,
    String datasetId,
    String tableId,
    List<String> quasiIds,
    List<InfoType> infoTypes,
    String regionCode,
    String topicId,
    String subscriptionId)
    throws Exception {

  // Instantiates a client
  try (DlpServiceClient dlpServiceClient = DlpServiceClient.create()) {

    Iterator<String> quasiIdsIterator = quasiIds.iterator();
    Iterator<InfoType> infoTypesIterator = infoTypes.iterator();

    if (quasiIds.size() != infoTypes.size()) {
      throw new IllegalArgumentException("The numbers of quasi-IDs and infoTypes must be equal!");
    }

    ArrayList<TaggedField> taggedFields = new ArrayList();

    while (quasiIdsIterator.hasNext() || infoTypesIterator.hasNext()) {
      taggedFields.add(
          TaggedField.newBuilder()
              .setField(FieldId.newBuilder().setName(quasiIdsIterator.next()).build())
              .setInfoType(infoTypesIterator.next())
              .build());
    }

    KMapEstimationConfig kmapConfig =
        KMapEstimationConfig.newBuilder()
            .addAllQuasiIds(taggedFields)
            .setRegionCode(regionCode)
            .build();

    BigQueryTable bigQueryTable =
        BigQueryTable.newBuilder()
            .setProjectId(projectId)
            .setDatasetId(datasetId)
            .setTableId(tableId)
            .build();

    PrivacyMetric privacyMetric =
        PrivacyMetric.newBuilder().setKMapEstimationConfig(kmapConfig).build();

    String topicName = String.format("projects/%s/topics/%s", projectId, topicId);

    PublishToPubSub publishToPubSub = PublishToPubSub.newBuilder().setTopic(topicName).build();

    // Create action to publish job status notifications over Google Cloud Pub/Sub
    Action action = Action.newBuilder().setPubSub(publishToPubSub).build();

    RiskAnalysisJobConfig riskAnalysisJobConfig =
        RiskAnalysisJobConfig.newBuilder()
            .setSourceTable(bigQueryTable)
            .setPrivacyMetric(privacyMetric)
            .addActions(action)
            .build();

    CreateDlpJobRequest createDlpJobRequest =
        CreateDlpJobRequest.newBuilder()
            .setParent(ProjectName.of(projectId).toString())
            .setRiskJob(riskAnalysisJobConfig)
            .build();

    DlpJob dlpJob = dlpServiceClient.createDlpJob(createDlpJobRequest);
    String dlpJobName = dlpJob.getName();

    final SettableApiFuture<Boolean> done = SettableApiFuture.create();

    // Set up a Pub/Sub subscriber to listen on the job completion status
    Subscriber subscriber =
        Subscriber.newBuilder(
                ProjectSubscriptionName.newBuilder()
                    .setProject(projectId)
                    .setSubscription(subscriptionId)
                    .build(),
          (pubsubMessage, ackReplyConsumer) -> {
            if (pubsubMessage.getAttributesCount() > 0
                && pubsubMessage.getAttributesMap().get("DlpJobName").equals(dlpJobName)) {
              // notify job completion
              done.set(true);
              ackReplyConsumer.ack();
            }
          })
            .build();
    subscriber.startAsync();

    // Wait for job completion semi-synchronously
    // For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
    try {
      done.get(1, TimeUnit.MINUTES);
      Thread.sleep(500); // Wait for the job to become available
    } catch (TimeoutException e) {
      System.out.println("Unable to verify job completion.");
    }

    // retrieve completed job status
    DlpJob completedJob =
        dlpServiceClient.getDlpJob(GetDlpJobRequest.newBuilder().setName(dlpJobName).build());

    System.out.println("Job status: " + completedJob.getState());
    AnalyzeDataSourceRiskDetails riskDetails = completedJob.getRiskDetails();

    KMapEstimationResult kmapResult = riskDetails.getKMapEstimationResult();
    for (KMapEstimationHistogramBucket result : kmapResult.getKMapEstimationHistogramList()) {

      System.out.printf(
          "\tAnonymity range: [%d, %d]\n", result.getMinAnonymity(), result.getMaxAnonymity());
      System.out.printf("\tSize: %d\n", result.getBucketSize());

      for (KMapEstimationQuasiIdValues valueBucket : result.getBucketValuesList()) {
        String quasiIdValues =
            valueBucket
                .getQuasiIdsValuesList()
                .stream()
                .map(
                    v -> {
                      String s = v.toString();
                      return s.substring(s.indexOf(':') + 1).trim();
                    })
                .collect(Collectors.joining(", "));

        System.out.printf("\tValues: {%s}\n", quasiIdValues);
        System.out.printf(
            "\tEstimated k-map anonymity: %d\n", valueBucket.getEstimatedAnonymity());
      }
    }
  } catch (Exception e) {
    System.out.println("Error in calculateKMap: " + e.getMessage());
  }
}

Node.js

// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');

// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();

// The project ID to run the API call under
// const callingProjectId = process.env.GCLOUD_PROJECT;

// The project ID the table is stored under
// This may or (for public datasets) may not equal the calling project ID
// const tableProjectId = process.env.GCLOUD_PROJECT;

// The ID of the dataset to inspect, e.g. 'my_dataset'
// const datasetId = 'my_dataset';

// The ID of the table to inspect, e.g. 'my_table'
// const tableId = 'my_table';

// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'

// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'

// The ISO 3166-1 region code that the data is representative of
// Can be omitted if using a region-specific infoType (such as US_ZIP_5)
// const regionCode = 'USA';

// A set of columns that form a composite key ('quasi-identifiers'), and
// optionally their reidentification distributions
// const quasiIds = [{ field: { name: 'age' }, infoType: { name: 'AGE' }}];

const sourceTable = {
  projectId: tableProjectId,
  datasetId: datasetId,
  tableId: tableId,
};

// Construct request for creating a risk analysis job
const request = {
  parent: dlp.projectPath(process.env.GCLOUD_PROJECT),
  riskJob: {
    privacyMetric: {
      kMapEstimationConfig: {
        quasiIds: quasiIds,
        regionCode: regionCode,
      },
    },
    sourceTable: sourceTable,
    actions: [
      {
        pubSub: {
          topic: `projects/${callingProjectId}/topics/${topicId}`,
        },
      },
    ],
  },
};

// Create helper function for unpacking values
const getValue = obj => obj[Object.keys(obj)[0]];

try {
  // Run risk analysis job
  const [topicResponse] = await pubsub.topic(topicId).get();
  const subscription = await topicResponse.subscription(subscriptionId);
  const [jobsResponse] = await dlp.createDlpJob(request);
  const jobName = jobsResponse.name;
  // Watch the Pub/Sub topic until the DLP job finishes
  await new Promise((resolve, reject) => {
    const messageHandler = message => {
      if (message.attributes && message.attributes.DlpJobName === jobName) {
        message.ack();
        subscription.removeListener('message', messageHandler);
        subscription.removeListener('error', errorHandler);
        resolve(jobName);
      } else {
        message.nack();
      }
    };

    const errorHandler = err => {
      subscription.removeListener('message', messageHandler);
      subscription.removeListener('error', errorHandler);
      reject(err);
    };

    subscription.on('message', messageHandler);
    subscription.on('error', errorHandler);
  });
  setTimeout(() => {
    console.log(` Waiting for DLP job to fully complete`);
  }, 500);
  const [job] = await dlp.getDlpJob({name: jobName});
  const histogramBuckets =
    job.riskDetails.kMapEstimationResult.kMapEstimationHistogram;

  histogramBuckets.forEach((histogramBucket, histogramBucketIdx) => {
    console.log(`Bucket ${histogramBucketIdx}:`);
    console.log(
      `  Anonymity range: [${histogramBucket.minAnonymity}, ${
        histogramBucket.maxAnonymity
      }]`
    );
    console.log(`  Size: ${histogramBucket.bucketSize}`);
    histogramBucket.bucketValues.forEach(valueBucket => {
      const values = valueBucket.quasiIdsValues.map(value => getValue(value));
      console.log(`    Values: ${values.join(' ')}`);
      console.log(
        `    Estimated k-map anonymity: ${valueBucket.estimatedAnonymity}`
      );
    });
  });
} catch (err) {
  console.log(`Error in kMapEstimationAnalysis: ${err.message || err}`);
}

Python

def k_map_estimate_analysis(project, table_project_id, dataset_id, table_id,
                            topic_id, subscription_id, quasi_ids, info_types,
                            region_code='US', timeout=300):
    """Uses the Data Loss Prevention API to compute the k-map risk estimation
        of a column set in a Google BigQuery table.
    Args:
        project: The Google Cloud project id to use as a parent resource.
        table_project_id: The Google Cloud project id where the BigQuery table
            is stored.
        dataset_id: The id of the dataset to inspect.
        table_id: The id of the table to inspect.
        column_name: The name of the column to compute risk metrics for.
        topic_id: The name of the Pub/Sub topic to notify once the job
            completes.
        subscription_id: The name of the Pub/Sub subscription to use when
            listening for job completion notifications.
        quasi_ids: A set of columns that form a composite key and optionally
            their reidentification distributions.
        info_types: Type of information of the quasi_id in order to provide a
            statistical model of population.
        region_code: The ISO 3166-1 region code that the data is representative
            of. Can be omitted if using a region-specific infoType (such as
            US_ZIP_5)
        timeout: The number of seconds to wait for a response from the API.

    Returns:
        None; the response from the API is printed to the terminal.
    """

    # Import the client library.
    import google.cloud.dlp

    # This sample additionally uses Cloud Pub/Sub to receive results from
    # potentially long-running operations.
    import google.cloud.pubsub

    # Create helper function for unpacking values
    def get_values(obj):
        return int(obj.integer_value)

    def callback(message):
        if (message.attributes['DlpJobName'] == operation.name):
            # This is the message we're looking for, so acknowledge it.
            message.ack()

            # Now that the job is done, fetch the results and print them.
            job = dlp.get_dlp_job(operation.name)
            histogram_buckets = (job.risk_details
                                    .k_map_estimation_result
                                    .k_map_estimation_histogram)
            # Print bucket stats
            for i, bucket in enumerate(histogram_buckets):
                print('Bucket {}:'.format(i))
                print('   Anonymity range: [{}, {}]'.format(
                    bucket.min_anonymity, bucket.max_anonymity))
                print('   Size: {}'.format(bucket.bucket_size))
                for value_bucket in bucket.bucket_values:
                    print('   Values: {}'.format(
                        map(get_values, value_bucket.quasi_ids_values)))
                    print('   Estimated k-map anonymity: {}'.format(
                        value_bucket.estimated_anonymity))
            subscription.set_result(None)
        else:
            # This is not the message we're looking for.
            message.drop()

    # Instantiate a client.
    dlp = google.cloud.dlp.DlpServiceClient()

    # Convert the project id into a full resource id.
    parent = dlp.project_path(project)

    # Location info of the BigQuery table.
    source_table = {
        'project_id': table_project_id,
        'dataset_id': dataset_id,
        'table_id': table_id
    }

    # Check that numbers of quasi-ids and info types are equal
    if len(quasi_ids) != len(info_types):
        raise ValueError("""Number of infoTypes and number of quasi-identifiers
                            must be equal!""")

    # Convert quasi id list to Protobuf type
    def map_fields(quasi_id, info_type):
        return {'field': {'name': quasi_id}, 'info_type': {'name': info_type}}

    quasi_ids = map(map_fields, quasi_ids, info_types)

    # Tell the API where to send a notification when the job is complete.
    actions = [{
        'pub_sub': {'topic': '{}/topics/{}'.format(parent, topic_id)}
    }]

    # Configure risk analysis job
    # Give the name of the numeric column to compute risk metrics for
    risk_job = {
        'privacy_metric': {
            'k_map_estimation_config': {
                'quasi_ids': quasi_ids,
                'region_code': region_code
            }
        },
        'source_table': source_table,
        'actions': actions
    }

    # Create a Pub/Sub client and find the subscription. The subscription is
    # expected to already be listening to the topic.
    subscriber = google.cloud.pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_id)
    subscription = subscriber.subscribe(subscription_path, callback)

    # Call API to start risk analysis job
    operation = dlp.create_dlp_job(parent, risk_job=risk_job)

    try:
        subscription.result(timeout=timeout)
    except TimeoutError:
        print('No event received before the timeout. Please verify that the '
              'subscription provided is subscribed to the topic provided.')
        subscription.close()

Go

// riskKMap runs K Map on the given data.
func riskKMap(w io.Writer, client *dlp.Client, project, dataProject, pubSubTopic, pubSubSub, datasetID, tableID, region string, columnNames ...string) {
	ctx := context.Background()

	// Create a PubSub Client used to listen for when the inspect job finishes.
	pClient, err := pubsub.NewClient(ctx, project)
	if err != nil {
		log.Fatalf("Error creating PubSub client: %v", err)
	}
	defer pClient.Close()

	// Create a PubSub subscription we can use to listen for messages.
	s, err := setupPubSub(ctx, pClient, project, pubSubTopic, pubSubSub)
	if err != nil {
		log.Fatalf("Error setting up PubSub: %v\n", err)
	}

	// topic is the PubSub topic string where messages should be sent.
	topic := "projects/" + project + "/topics/" + pubSubTopic

	// Build the QuasiID slice.
	var q []*dlppb.PrivacyMetric_KMapEstimationConfig_TaggedField
	for _, c := range columnNames {
		q = append(q, &dlppb.PrivacyMetric_KMapEstimationConfig_TaggedField{
			Field: &dlppb.FieldId{
				Name: c,
			},
			Tag: &dlppb.PrivacyMetric_KMapEstimationConfig_TaggedField_Inferred{
				Inferred: &empty.Empty{},
			},
		})
	}

	// Create a configured request.
	req := &dlppb.CreateDlpJobRequest{
		Parent: "projects/" + project,
		Job: &dlppb.CreateDlpJobRequest_RiskJob{
			RiskJob: &dlppb.RiskAnalysisJobConfig{
				// PrivacyMetric configures what to compute.
				PrivacyMetric: &dlppb.PrivacyMetric{
					Type: &dlppb.PrivacyMetric_KMapEstimationConfig_{
						KMapEstimationConfig: &dlppb.PrivacyMetric_KMapEstimationConfig{
							QuasiIds:   q,
							RegionCode: region,
						},
					},
				},
				// SourceTable describes where to find the data.
				SourceTable: &dlppb.BigQueryTable{
					ProjectId: dataProject,
					DatasetId: datasetID,
					TableId:   tableID,
				},
				// Send a message to PubSub using Actions.
				Actions: []*dlppb.Action{
					{
						Action: &dlppb.Action_PubSub{
							PubSub: &dlppb.Action_PublishToPubSub{
								Topic: topic,
							},
						},
					},
				},
			},
		},
	}
	// Create the risk job.
	j, err := client.CreateDlpJob(context.Background(), req)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Fprintf(w, "Created job: %v\n", j.GetName())

	// Wait for the risk job to finish by waiting for a PubSub message.
	ctx, cancel := context.WithCancel(ctx)
	err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// If this is the wrong job, do not process the result.
		if msg.Attributes["DlpJobName"] != j.GetName() {
			msg.Nack()
			return
		}
		msg.Ack()
		time.Sleep(500 * time.Millisecond)
		j, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
			Name: j.GetName(),
		})
		if err != nil {
			log.Fatalf("Error getting completed job: %v\n", err)
		}
		h := j.GetRiskDetails().GetKMapEstimationResult().GetKMapEstimationHistogram()
		for i, b := range h {
			fmt.Fprintf(w, "Histogram bucket %v\n", i)
			fmt.Fprintf(w, "  Anonymity range: [%v,%v]\n", b.GetMaxAnonymity(), b.GetMaxAnonymity())
			fmt.Fprintf(w, "  %v unique values total\n", b.GetBucketSize())
			for _, v := range b.GetBucketValues() {
				var qvs []string
				for _, qv := range v.GetQuasiIdsValues() {
					qvs = append(qvs, qv.String())
				}
				fmt.Fprintf(w, "    QuasiID values: %s\n", strings.Join(qvs, ", "))
				fmt.Fprintf(w, "    Estimated anonymity: %v\n", v.GetEstimatedAnonymity())
			}
		}
		// Stop listening for more messages.
		cancel()
	})
	if err != nil {
		log.Fatalf("Error receiving from PubSub: %v\n", err)
	}
}

PHP

use Google\Cloud\Dlp\V2\DlpServiceClient;
use Google\Cloud\Dlp\V2\InfoType;
use Google\Cloud\Dlp\V2\RiskAnalysisJobConfig;
use Google\Cloud\Dlp\V2\BigQueryTable;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\PrivacyMetric\KMapEstimationConfig;
use Google\Cloud\Dlp\V2\PrivacyMetric\KMapEstimationConfig\TaggedField;
use Google\Cloud\Dlp\V2\PrivacyMetric;
use Google\Cloud\Dlp\V2\FieldId;
use Google\Cloud\PubSub\PubSubClient;

/**
 * Computes the k-map risk estimation of a column set in a Google BigQuery table.
 *
 * @param string $callingProjectId The project ID to run the API call under
 * @param string $dataProjectId The project ID containing the target Datastore
 * @param string $topicId The name of the Pub/Sub topic to notify once the job completes
 * @param string $subscriptionId The name of the Pub/Sub subscription to use when listening for job
 * @param string $datasetId The ID of the dataset to inspect
 * @param string $tableId The ID of the table to inspect
 * @param string $regionCode The ISO 3166-1 region code that the data is representative of
 * @param array $quasiIdNames A set of columns that form a composite key ('quasi-identifiers'),
 *        and optionally their reidentification distributions
 * @param array $infoTypes The infoTypes corresponding to the chosen quasi-identifiers

 */
function k_map(
  $callingProjectId,
  $dataProjectId,
  $topicId,
  $subscriptionId,
  $datasetId,
  $tableId,
  $regionCode,
  $quasiIdNames,
  $infoTypes
) {
    // Instantiate a client.
    $dlp = new DlpServiceClient([
        'projectId' => $callingProjectId,
    ]);
    $pubsub = new PubSubClient([
        'projectId' => $callingProjectId,
    ]);
    $topic = $pubsub->topic($topicId);

    // Verify input
    if (count($infoTypes) != count($quasiIdNames)) {
        throw new Exception('Number of infoTypes and number of quasi-identifiers must be equal!');
    }

    // Map infoTypes to quasi-ids
    $quasiIdObjects = array_map(function ($quasiId, $infoType) {
        $quasiIdField = (new FieldId())
            ->setName($quasiId);

        $quasiIdType = (new InfoType())
            ->setName($infoType);

        $quasiIdObject = (new TaggedField())
            ->setInfoType($quasiIdType)
            ->setField($quasiIdField);

        return $quasiIdObject;
    }, $quasiIdNames, $infoTypes);

    // Construct analysis config
    $statsConfig = (new KMapEstimationConfig())
        ->setQuasiIds($quasiIdObjects)
        ->setRegionCode($regionCode);

    $privacyMetric = (new PrivacyMetric())
        ->setKMapEstimationConfig($statsConfig);

    // Construct items to be analyzed
    $bigqueryTable = (new BigQueryTable())
        ->setProjectId($dataProjectId)
        ->setDatasetId($datasetId)
        ->setTableId($tableId);

    // Construct the action to run when job completes
    $pubSubAction = (new PublishToPubSub())
        ->setTopic($topic->name());

    $action = (new Action())
        ->setPubSub($pubSubAction);

    // Construct risk analysis job config to run
    $riskJob = (new RiskAnalysisJobConfig())
        ->setPrivacyMetric($privacyMetric)
        ->setSourceTable($bigqueryTable)
        ->setActions([$action]);

    // Listen for job notifications via an existing topic/subscription.
    $subscription = $topic->subscription($subscriptionId);

    // Submit request
    $parent = $dlp->projectName($callingProjectId);
    $job = $dlp->createDlpJob($parent, [
        'riskJob' => $riskJob
    ]);

    // Poll via Pub/Sub until job finishes
    while (true) {
        foreach ($subscription->pull() as $message) {
            if (isset($message->attributes()['DlpJobName']) &&
                $message->attributes()['DlpJobName'] === $job->getName()) {
                $subscription->acknowledge($message);
                break 2;
            }
        }
    }

    // Sleep for one second to avoid race condition with the job's status.
    usleep(1000000);

    // Get the updated job
    $job = $dlp->getDlpJob($job->getName());

    // Helper function to convert Protobuf values to strings
    $value_to_string = function ($value) {
        $json = json_decode($value->serializeToJsonString(), true);
        return array_shift($json);
    };

    // Print finding counts
    printf('Job %s status: %s' . PHP_EOL, $job->getName(), $job->getState());
    switch ($job->getState()) {
        case JobState::DONE:
            $histBuckets = $job->getRiskDetails()->getKMapEstimationResult()->getKMapEstimationHistogram();

            foreach ($histBuckets as $bucketIndex => $histBucket) {
                // Print bucket stats
                printf('Bucket %s:' . PHP_EOL, $bucketIndex);
                printf(
                    '  Anonymity range: [%s, %s]' . PHP_EOL,
                    $histBucket->getMinAnonymity(),
                    $histBucket->getMaxAnonymity()
                );
                printf('  Size: %s' . PHP_EOL, $histBucket->getBucketSize());

                // Print bucket values
                foreach ($histBucket->getBucketValues() as $percent => $valueBucket) {
                    printf(
                        '  Estimated k-map anonymity: %s' . PHP_EOL,
                        $valueBucket->getEstimatedAnonymity()
                    );

                    // Pretty-print quasi-ID values
                    print('  Values: {');
                    foreach ($valueBucket->getQuasiIdsValues() as $index => $value) {
                        print(($index !== 0 ? ', ' : '') . $value_to_string($value));
                    }
                    print('}' . PHP_EOL);
                }
            }
            break;
        case JobState::FAILED:
            printf('Job %s had errors:' . PHP_EOL, $job->getName());
            $errors = $job->getErrors();
            foreach ($errors as $error) {
                var_dump($error->getDetails());
            }
            break;
        default:
            print('Unexpected job state. Most likely, the job is either running or has not yet started.');
    }
}

C#

public static object KMap(
    string callingProjectId,
    string tableProjectId,
    string datasetId,
    string tableId,
    string topicId,
    string subscriptionId,
    IEnumerable<FieldId> quasiIds,
    IEnumerable<InfoType> infoTypes,
    string regionCode)
{
    DlpServiceClient dlp = DlpServiceClient.Create();

    // Construct + submit the job
    var kmapEstimationConfig = new KMapEstimationConfig
    {
        QuasiIds =
        {
            quasiIds.Zip(
                infoTypes,
                (Field, InfoType) => new TaggedField
                {
                    Field = Field,
                    InfoType = InfoType
                }
            )
        },
        RegionCode = regionCode
    };

    var config = new RiskAnalysisJobConfig()
    {
        PrivacyMetric = new PrivacyMetric
        {
            KMapEstimationConfig = kmapEstimationConfig
        },
        SourceTable = new BigQueryTable
        {
            ProjectId = tableProjectId,
            DatasetId = datasetId,
            TableId = tableId
        },
        Actions =
        {
            new Google.Cloud.Dlp.V2.Action
            {
                PubSub = new PublishToPubSub
                {
                    Topic = $"projects/{callingProjectId}/topics/{topicId}"
                }
            }
        }
    };

    var submittedJob = dlp.CreateDlpJob(
        new CreateDlpJobRequest
        {
            ParentAsProjectName = new ProjectName(callingProjectId),
            RiskJob = config
        });

    // Listen to pub/sub for the job
    var subscriptionName = new SubscriptionName(
        callingProjectId,
        subscriptionId);
    SubscriberClient subscriber = SubscriberClient.CreateAsync(
        subscriptionName).Result;

    // SimpleSubscriber runs your message handle function on multiple
    // threads to maximize throughput.
    var done = new ManualResetEventSlim(false);
    subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
    {
        if (message.Attributes["DlpJobName"] == submittedJob.Name)
        {
            Thread.Sleep(500); // Wait for DLP API results to become consistent
            done.Set();
            return Task.FromResult(SubscriberClient.Reply.Ack);
        }
        else
        {
            return Task.FromResult(SubscriberClient.Reply.Nack);
        }
    });

    done.Wait(TimeSpan.FromMinutes(10)); // 10 minute timeout; may not work for large jobs
    subscriber.StopAsync(CancellationToken.None).Wait();

    // Process results
    var resultJob = dlp.GetDlpJob(new GetDlpJobRequest
    {
        DlpJobName = DlpJobName.Parse(submittedJob.Name)
    });

    var result = resultJob.RiskDetails.KMapEstimationResult;

    for (int histogramIdx = 0; histogramIdx < result.KMapEstimationHistogram.Count; histogramIdx++)
    {
        var histogramValue = result.KMapEstimationHistogram[histogramIdx];
        Console.WriteLine($"Bucket {histogramIdx}");
        Console.WriteLine($"  Anonymity range: [{histogramValue.MinAnonymity}, {histogramValue.MaxAnonymity}].");
        Console.WriteLine($"  Size: {histogramValue.BucketSize}");

        foreach (var datapoint in histogramValue.BucketValues)
        {
            // 'UnpackValue(x)' is a prettier version of 'x.toString()'
            Console.WriteLine($"    Values: [{String.Join(',', datapoint.QuasiIdsValues.Select(x => DlpSamplesUtils.UnpackValue(x)))}]");
            Console.WriteLine($"    Estimated k-map anonymity: {datapoint.EstimatedAnonymity}");
        }
    }

    return 0;
}

Cloud DLP を使った δ-存在性推定値の計算

デルタ存在性(δ-存在性)は、個人が分析対象のデータセットに含まれている確率を数値化する指標です。k-マップと同様に、δ-存在性の値も Cloud DLP を使用して推定できます。Cloud DLP は統計モデルを使用して攻撃データセットを推定します。これは、攻撃データセットが明示的に知られている他のリスク分析手法とは対照的です。Cloud DLP は、データの種類に応じて、一般に公開されているデータセット(米国国勢調査のデータセットなど)またはカスタム統計モデル(ユーザーが指定する 1 つ以上の BigQuery テーブルなど)を使用するか、ユーザーが入力したデータセット内の値の分布から推定します。

Cloud DLP を使用して δ-存在性の値を計算するには、まず上記の Cloud DLP を使ったリスク分析の手順を行ってください。

DeltaPresenceEstimationConfig オブジェクト内で、次の項目を指定します。

  • quasiIds[]: 必須。準識別子とみなされるフィールド(QuasiId オブジェクト)です。δ-存在性を計算するためにスキャンして使用されます。2 つの列に同じタグを付けることはできません。次のいずれかを指定できます。

    • infoType: この場合 Cloud DLP は、関連する一般公開データセットを母集団の統計モデルとして使用します(米国の郵便番号、地域コード、年齢、性別など)。
    • カスタム infoType: この列の有効な値に関する統計情報を含む補助テーブル(AuxiliaryTable オブジェクト)を示すカスタムタグ。
    • inferred タグ: 意味のあるタグが示されない場合は、inferred を指定します。Cloud DLP は入力データの値の分布から統計モデルを推定します。
  • regionCode: Cloud DLP が統計モデルで使用する ISO 3166-1 alpha-2 地域コード。この値は、列が地域固有の infoType(米国の郵便番号など)または地域コードでタグ付けされていない場合に必要です。

  • auxiliaryTables[]: 分析で使用する補助テーブル(StatisticalTable オブジェクト)。準識別子(quasiIds[])の列に付けるカスタムタグは、それぞれ 1 つの補助テーブルの 1 つの列でのみ使用する必要があります。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

Cloud Data Loss Prevention