Computing numerical and categorical statistics

You can use the Cloud Data Loss Prevention (DLP) API to compute numerical and categorical statistics for individual columns in BigQuery tables. The DLP API can calculate the following:

  • The column's minimum value
  • The column's maximum value
  • Quantile values for the column
  • A histogram of value frequencies in the column

Compute numerical statistics

You can determine minimum, maximum, and quantile values for an individual BigQuery column. To calculate these values, you configure a DlpJob, setting the NumericalStatsConfig privacy metric to the name of the column to scan. When you run the job, the DLP API computes statistics for the given column, returning its results in the NumericalStatsResult object. The DLP API can compute statistics for the following number types:

  • integer
  • float
  • date
  • datetime
  • timestamp
  • time

The statistics that a scan run returns include the minimum value, the maximum value, and 99 quantile values that partition the set of field values into 100 equal-sized buckets.

Code examples

Following is sample code in several languages that demonstrates how to use the DLP API to calculate numerical statistics.

Running this code requires a client library to be installed. For more information about installing and creating a DLP API client, see DLP API client libraries.

Java

/**
 * Calculate numerical statistics for a column in a BigQuery table using the DLP API.
 *
 * <p>Submits a risk analysis job configured with a {@code NumericalStatsConfig}, waits up to one
 * minute for the job's Pub/Sub completion notification, then prints the job state, the value
 * range, and each distinct quantile value.
 *
 * @param projectId The Google Cloud Platform project ID to run the API call under.
 * @param datasetId The BigQuery dataset to analyze.
 * @param tableId The BigQuery table to analyze.
 * @param columnName The name of the column to analyze, which must contain only numerical data.
 * @param topicId The name of the Pub/Sub topic to notify once the job completes
 * @param subscriptionId The name of the Pub/Sub subscription to use when listening for job
 *     completion status.
 * @throws Exception kept in the signature for existing callers; errors raised inside the method
 *     body are caught and printed rather than propagated.
 */
private static void numericalStatsAnalysis(
    String projectId,
    String datasetId,
    String tableId,
    String columnName,
    String topicId,
    String subscriptionId)
    throws Exception {

  // Instantiates a client
  try (DlpServiceClient dlpServiceClient = DlpServiceClient.create()) {
    BigQueryTable bigQueryTable =
        BigQueryTable.newBuilder()
            .setTableId(tableId)
            .setDatasetId(datasetId)
            .setProjectId(projectId)
            .build();

    FieldId fieldId = FieldId.newBuilder().setName(columnName).build();

    NumericalStatsConfig numericalStatsConfig =
        NumericalStatsConfig.newBuilder().setField(fieldId).build();

    PrivacyMetric privacyMetric =
        PrivacyMetric.newBuilder().setNumericalStatsConfig(numericalStatsConfig).build();

    String topicName = String.format("projects/%s/topics/%s", projectId, topicId);

    PublishToPubSub publishToPubSub = PublishToPubSub.newBuilder().setTopic(topicName).build();

    // Create action to publish job status notifications over Google Cloud Pub/Sub
    Action action = Action.newBuilder().setPubSub(publishToPubSub).build();

    RiskAnalysisJobConfig riskAnalysisJobConfig =
        RiskAnalysisJobConfig.newBuilder()
            .setSourceTable(bigQueryTable)
            .setPrivacyMetric(privacyMetric)
            .addActions(action)
            .build();

    CreateDlpJobRequest createDlpJobRequest =
        CreateDlpJobRequest.newBuilder()
            .setParent(ProjectName.of(projectId).toString())
            .setRiskJob(riskAnalysisJobConfig)
            .build();

    DlpJob dlpJob = dlpServiceClient.createDlpJob(createDlpJobRequest);
    String dlpJobName = dlpJob.getName();

    final SettableApiFuture<Boolean> done = SettableApiFuture.create();

    // Set up a Pub/Sub subscriber to listen on the job completion status
    Subscriber subscriber =
        Subscriber.newBuilder(
                ProjectSubscriptionName.newBuilder()
                    .setProject(projectId)
                    .setSubscription(subscriptionId)
                    .build(),
                (pubsubMessage, ackReplyConsumer) -> {
                  // Compare from the known non-null side: a message that carries attributes but
                  // no "DlpJobName" key would otherwise cause a NullPointerException.
                  if (pubsubMessage.getAttributesCount() > 0
                      && dlpJobName.equals(
                          pubsubMessage.getAttributesMap().get("DlpJobName"))) {
                    // notify job completion
                    done.set(true);
                    ackReplyConsumer.ack();
                  }
                })
            .build();
    subscriber.startAsync();

    // Wait for job completion semi-synchronously
    // For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
    try {
      done.get(1, TimeUnit.MINUTES);
      Thread.sleep(500); // Wait for the job to become available
    } catch (TimeoutException e) {
      System.out.println("Unable to verify job completion.");
    } finally {
      // Stop pulling messages once the wait is over, whatever its outcome.
      subscriber.stopAsync();
    }

    // Retrieve completed job status
    DlpJob completedJob =
        dlpServiceClient.getDlpJob(GetDlpJobRequest.newBuilder().setName(dlpJobName).build());

    System.out.println("Job status: " + completedJob.getState());
    AnalyzeDataSourceRiskDetails riskDetails = completedJob.getRiskDetails();
    AnalyzeDataSourceRiskDetails.NumericalStatsResult result =
        riskDetails.getNumericalStatsResult();

    // NOTE(review): only the float representation of each value is read here; confirm this is
    // what the API populates for integer/date/time columns.
    System.out.printf(
        "Value range : [%.3f, %.3f]\n",
        result.getMinValue().getFloatValue(), result.getMaxValue().getFloatValue());

    // The i-th entry of the quantile list is the i-th percentile, so the label starts at 1 and
    // advances on every entry, including entries that are skipped as duplicates.
    int percent = 1;
    Double lastValue = null;
    for (Value quantileValue : result.getQuantileValuesList()) {
      Double currentValue = quantileValue.getFloatValue();
      // Only print a quantile when its value differs from the previous one
      if (lastValue == null || !lastValue.equals(currentValue)) {
        System.out.printf("Value at %s %% quantile : %.3f\n", percent, currentValue);
      }
      lastValue = currentValue;
      percent++;
    }
  } catch (Exception e) {
    System.out.println("Error in numericalStatsAnalysis: " + e.getMessage());
  }
}

Node.js

// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const Pubsub = require('@google-cloud/pubsub');

// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new Pubsub();

// The project ID to run the API call under
// const callingProjectId = process.env.GCLOUD_PROJECT;

// The project ID the table is stored under
// This may or (for public datasets) may not equal the calling project ID
// const tableProjectId = process.env.GCLOUD_PROJECT;

// The ID of the dataset to inspect, e.g. 'my_dataset'
// const datasetId = 'my_dataset';

// The ID of the table to inspect, e.g. 'my_table'
// const tableId = 'my_table';

// The name of the column to compute risk metrics for, e.g. 'age'
// Note that this column must be a numeric data type
// const columnName = 'age';

// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'

// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'

// BigQuery table holding the column to analyze
const sourceTable = {projectId: tableProjectId, datasetId, tableId};

// Risk analysis job request: compute numerical statistics for `columnName`
// and publish a Pub/Sub notification to the topic when the job finishes
const request = {
  parent: dlp.projectPath(callingProjectId),
  riskJob: {
    privacyMetric: {
      numericalStatsConfig: {field: {name: columnName}},
    },
    sourceTable,
    actions: [
      {pubSub: {topic: `projects/${callingProjectId}/topics/${topicId}`}},
    ],
  },
};

// Helper: return the value stored under the first own enumerable key of `obj`
const getValue = obj => {
  const [firstKey] = Object.keys(obj);
  return obj[firstKey];
};

// Run risk analysis job:
//   1. look up the topic and its existing subscription
//   2. create the DLP risk job
//   3. wait for the Pub/Sub notification that names this job
//   4. pause briefly, fetch the finished job, and print its statistics
let subscription;
pubsub
  .topic(topicId)
  .get()
  .then(topicResponse => {
    // Verify the Pub/Sub topic and listen for job notifications via an
    // existing subscription.
    return topicResponse[0].subscription(subscriptionId);
  })
  .then(subscriptionResponse => {
    subscription = subscriptionResponse;
    return dlp.createDlpJob(request);
  })
  .then(jobsResponse => {
    // Get the job's ID
    return jobsResponse[0].name;
  })
  .then(jobName => {
    // Watch the Pub/Sub topic until the DLP job finishes
    return new Promise((resolve, reject) => {
      const messageHandler = message => {
        if (message.attributes && message.attributes.DlpJobName === jobName) {
          message.ack();
          subscription.removeListener('message', messageHandler);
          subscription.removeListener('error', errorHandler);
          resolve(jobName);
        } else {
          // Notification for some other job: hand it back
          message.nack();
        }
      };

      const errorHandler = err => {
        subscription.removeListener('message', messageHandler);
        subscription.removeListener('error', errorHandler);
        reject(err);
      };

      subscription.on('message', messageHandler);
      subscription.on('error', errorHandler);
    });
  })
  .then(jobName => {
    // Wait for DLP job to fully complete.
    // setTimeout must be given a function; passing `resolve(jobName)` would
    // resolve immediately and skip the delay.
    return new Promise(resolve => setTimeout(() => resolve(jobName), 500));
  })
  .then(jobName => dlp.getDlpJob({name: jobName}))
  .then(wrappedJob => {
    const job = wrappedJob[0];
    const results = job.riskDetails.numericalStatsResult;

    console.log(
      `Value Range: [${getValue(results.minValue)}, ${getValue(
        results.maxValue
      )}]`
    );

    // Print unique quantile values
    let tempValue = null;
    results.quantileValues.forEach((result, index) => {
      const value = getValue(result);
      // Entry i (0-based) of the quantile list is the (i + 1)-th percentile
      const percent = index + 1;

      // Only print new values; `equals` is used when the previous value
      // exposes it, so that values which are objects are compared by content
      if (
        tempValue !== value &&
        !(tempValue && tempValue.equals && tempValue.equals(value))
      ) {
        console.log(`Value at ${percent}% quantile: ${value}`);
        tempValue = value;
      }
    });
  })
  .catch(err => {
    console.log(`Error in numericalRiskAnalysis: ${err.message || err}`);
  });

Python

def numerical_risk_analysis(project, table_project_id, dataset_id, table_id,
                            column_name, topic_id, subscription_id,
                            timeout=300):
    """Uses the Data Loss Prevention API to compute risk metrics of a column
       of numerical data in a Google BigQuery table.

    A risk analysis job is started, and a Pub/Sub callback waits for the
    notification that names that job. The callback then fetches the job and
    prints the value range and each distinct quantile value.

    Args:
        project: The Google Cloud project id to use as a parent resource.
        table_project_id: The Google Cloud project id where the BigQuery table
            is stored.
        dataset_id: The id of the dataset to inspect.
        table_id: The id of the table to inspect.
        column_name: The name of the column to compute risk metrics for.
        topic_id: The name of the Pub/Sub topic to notify once the job
            completes.
        subscription_id: The name of the Pub/Sub subscription to use when
            listening for job completion notifications.
        timeout: The number of seconds to wait for the job completion
            notification before giving up.

    Returns:
        None; the response from the API is printed to the terminal.
    """

    # Import the client library.
    import google.cloud.dlp

    # This sample additionally uses Cloud Pub/Sub to receive results from
    # potentially long-running operations.
    import google.cloud.pubsub

    # This sample also uses threading.Event() to wait for the job to finish.
    import threading

    # Instantiate a client.
    dlp = google.cloud.dlp.DlpServiceClient()

    # Convert the project id into a full resource id.
    parent = dlp.project_path(project)

    # Location info of the BigQuery table.
    source_table = {
        'project_id': table_project_id,
        'dataset_id': dataset_id,
        'table_id': table_id
    }

    # Tell the API where to send a notification when the job is complete.
    actions = [{
        'pub_sub': {'topic': '{}/topics/{}'.format(parent, topic_id)}
    }]

    # Configure risk analysis job
    # Give the name of the numeric column to compute risk metrics for
    risk_job = {
        'privacy_metric': {
            'numerical_stats_config': {
                'field': {
                    'name': column_name
                }
            }
        },
        'source_table': source_table,
        'actions': actions
    }

    # Call API to start risk analysis job
    operation = dlp.create_dlp_job(parent, risk_job=risk_job)

    # Create a Pub/Sub client and find the subscription. The subscription is
    # expected to already be listening to the topic.
    subscriber = google.cloud.pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_id)
    subscription = subscriber.subscribe(subscription_path)

    # Set up a callback to acknowledge a message. This closes around an event
    # so that it can signal that it is done and the main thread can continue.
    job_done = threading.Event()

    def callback(message):
        try:
            if (message.attributes['DlpJobName'] == operation.name):
                # This is the message we're looking for, so acknowledge it.
                message.ack()

                # Now that the job is done, fetch the results and print them.
                job = dlp.get_dlp_job(operation.name)
                results = job.risk_details.numerical_stats_result
                print('Value Range: [{}, {}]'.format(
                    results.min_value.integer_value,
                    results.max_value.integer_value))
                prev_value = None
                # Entry i (0-based) of the quantile list is the (i + 1)-th
                # percentile, so number the entries from 1.
                for percent, result in enumerate(
                        results.quantile_values, start=1):
                    value = result.integer_value
                    # Only print a quantile when its value changes.
                    if prev_value != value:
                        print('Value at {}% quantile: {}'.format(
                              percent, value))
                        prev_value = value
                # Signal to the main thread that we can exit.
                job_done.set()
            else:
                # This is not the message we're looking for.
                message.drop()
        except Exception as e:
            # Because this is executing in a thread, an exception won't be
            # noted unless we print it manually.
            print(e)
            raise

    # Register the callback and wait on the event.
    subscription.open(callback)
    finished = job_done.wait(timeout=timeout)
    if not finished:
        print('No event received before the timeout. Please verify that the '
              'subscription provided is subscribed to the topic provided.')

Go

// riskNumerical computes the numerical risk of the given column.
//
// It creates a DLP risk analysis job over dataProject.datasetID.tableID with a
// NumericalStatsConfig for columnName, waits on the pubSubSub subscription for
// the notification naming that job, and writes the value range and each
// distinct quantile value to w. Setup and API errors terminate the process via
// log.Fatal/log.Fatalf.
func riskNumerical(w io.Writer, client *dlp.Client, project, dataProject, pubSubTopic, pubSubSub, datasetID, tableID, columnName string) {
	ctx := context.Background()

	// Create a PubSub Client used to listen for when the risk job finishes.
	pClient, err := pubsub.NewClient(ctx, project)
	if err != nil {
		log.Fatalf("Error creating PubSub client: %v", err)
	}
	defer pClient.Close()

	// Create a PubSub subscription we can use to listen for messages.
	s, err := setupPubSub(ctx, pClient, project, pubSubTopic, pubSubSub)
	if err != nil {
		log.Fatalf("Error setting up PubSub: %v\n", err)
	}

	// topic is the PubSub topic string where messages should be sent.
	topic := "projects/" + project + "/topics/" + pubSubTopic

	// Create a configured request.
	req := &dlppb.CreateDlpJobRequest{
		Parent: "projects/" + project,
		Job: &dlppb.CreateDlpJobRequest_RiskJob{
			RiskJob: &dlppb.RiskAnalysisJobConfig{
				// PrivacyMetric configures what to compute.
				PrivacyMetric: &dlppb.PrivacyMetric{
					Type: &dlppb.PrivacyMetric_NumericalStatsConfig_{
						NumericalStatsConfig: &dlppb.PrivacyMetric_NumericalStatsConfig{
							Field: &dlppb.FieldId{
								Name: columnName,
							},
						},
					},
				},
				// SourceTable describes where to find the data.
				SourceTable: &dlppb.BigQueryTable{
					ProjectId: dataProject,
					DatasetId: datasetID,
					TableId:   tableID,
				},
				// Send a message to PubSub using Actions.
				Actions: []*dlppb.Action{
					{
						Action: &dlppb.Action_PubSub{
							PubSub: &dlppb.Action_PublishToPubSub{
								Topic: topic,
							},
						},
					},
				},
			},
		},
	}
	// Create the risk job.
	j, err := client.CreateDlpJob(ctx, req)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Fprintf(w, "Created job: %v\n", j.GetName())

	// Wait for the risk job to finish by waiting for a PubSub message.
	ctx, cancel := context.WithCancel(ctx)
	// Release the context's resources even when Receive returns without the
	// callback having cancelled it (calling cancel twice is harmless).
	defer cancel()
	err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// If this is the wrong job, do not process the result.
		if msg.Attributes["DlpJobName"] != j.GetName() {
			msg.Nack()
			return
		}
		msg.Ack()
		// Brief pause before fetching the job after its notification arrives.
		time.Sleep(500 * time.Millisecond)
		resp, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
			Name: j.GetName(),
		})
		if err != nil {
			log.Fatalf("Error getting completed job: %v\n", err)
		}
		n := resp.GetRiskDetails().GetNumericalStatsResult()
		fmt.Fprintf(w, "Value range: [%v, %v]\n", n.GetMinValue(), n.GetMaxValue())
		// Print a quantile only when its value differs from the previous one.
		var tmp string
		for p, v := range n.GetQuantileValues() {
			if v.String() != tmp {
				// Entry p (0-based) of the quantile list is the (p+1)-th percentile.
				fmt.Fprintf(w, "Value at %v quantile: %v\n", p+1, v)
				tmp = v.String()
			}
		}
		// Stop listening for more messages.
		cancel()
	})
	if err != nil {
		log.Fatalf("Error receiving from PubSub: %v\n", err)
	}
}

PHP

use Google\Cloud\Dlp\V2\DlpServiceClient;
use Google\Cloud\Dlp\V2\RiskAnalysisJobConfig;
use Google\Cloud\Dlp\V2\BigQueryTable;
use Google\Cloud\Dlp\V2\DlpJob_JobState;
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action_PublishToPubSub;
use Google\Cloud\Dlp\V2\PrivacyMetric_NumericalStatsConfig;
use Google\Cloud\Dlp\V2\PrivacyMetric;
use Google\Cloud\Dlp\V2\FieldId;
use Google\Cloud\PubSub\V1\PublisherClient;

/**
 * Computes risk metrics of a column of numbers in a Google BigQuery table.
 *
 * Submits a DLP risk analysis job, polls the Pub/Sub subscription until the
 * notification naming that job arrives, then prints the job state and, when
 * the job is done, the value range and each distinct quantile value.
 *
 * @param string $callingProjectId The project ID to run the API call under
 * @param string $dataProjectId The project ID containing the target BigQuery table
 * @param string $topicId The name of the Pub/Sub topic to notify once the job completes
 * @param string $subscriptionId The name of the Pub/Sub subscription to use when listening for job
 *        completion notifications
 * @param string $datasetId The ID of the dataset to inspect
 * @param string $tableId The ID of the table to inspect
 * @param string $columnName The name of the (number-type) column to compute risk metrics for, e.g. 'age'
 */
function numerical_stats(
    $callingProjectId,
    $dataProjectId,
    $topicId,
    $subscriptionId,
    $datasetId,
    $tableId,
    $columnName
) {
    // Instantiate a client.
    $dlp = new DlpServiceClient([
        'projectId' => $callingProjectId
    ]);
    $pubsub = new PubSubClient([
        'projectId' => $callingProjectId
    ]);

    // Construct risk analysis config
    $columnField = (new FieldId())
        ->setName($columnName);

    $statsConfig = (new PrivacyMetric_NumericalStatsConfig())
        ->setField($columnField);

    $privacyMetric = (new PrivacyMetric())
        ->setNumericalStatsConfig($statsConfig);

    // Construct items to be analyzed
    $bigqueryTable = (new BigQueryTable())
        ->setProjectId($dataProjectId)
        ->setDatasetId($datasetId)
        ->setTableId($tableId);

    // Construct the action to run when job completes
    $fullTopicId = PublisherClient::topicName($callingProjectId, $topicId);
    $pubSubAction = (new Action_PublishToPubSub())
        ->setTopic($fullTopicId);

    $action = (new Action())
        ->setPubSub($pubSubAction);

    // Construct risk analysis job config to run
    $riskJob = (new RiskAnalysisJobConfig())
        ->setPrivacyMetric($privacyMetric)
        ->setSourceTable($bigqueryTable)
        ->setActions([$action]);

    // Listen for job notifications via an existing topic/subscription.
    $topic = $pubsub->topic($topicId);
    $subscription = $topic->subscription($subscriptionId);

    // Submit request
    $parent = $dlp->projectName($callingProjectId);
    $job = $dlp->createDlpJob($parent, [
        'riskJob' => $riskJob
    ]);

    // Poll via Pub/Sub until job finishes.
    // NOTE: this loop has no timeout; it only exits once a message whose
    // DlpJobName attribute matches this job has been pulled.
    $polling = true;
    while ($polling) {
        foreach ($subscription->pull() as $message) {
            if (isset($message->attributes()['DlpJobName']) &&
                $message->attributes()['DlpJobName'] === $job->getName()) {
                $subscription->acknowledge($message);
                $polling = false;
            }
        }
    }

    // Sleep for half a second to avoid race condition with the job's status.
    usleep(500000);

    // Get the updated job
    $job = $dlp->getDlpJob($job->getName());

    // Helper function to convert Protobuf values to strings:
    // serializes the value to JSON and returns its first field's value.
    $value_to_string = function ($value) {
        $json = json_decode($value->serializeToJsonString(), true);
        return array_shift($json);
    };

    // Print the job status and, when available, the numerical statistics
    printf('Job %s status: %s' . PHP_EOL, $job->getName(), $job->getState());
    switch ($job->getState()) {
        case DlpJob_JobState::DONE:
            $results = $job->getRiskDetails()->getNumericalStatsResult();
            printf(
                'Value range: [%s, %s]' . PHP_EOL,
                $value_to_string($results->getMinValue()),
                $value_to_string($results->getMaxValue())
            );

            // Only print unique values
            $lastValue = null;
            foreach ($results->getQuantileValues() as $percent => $quantileValue) {
                $value = $value_to_string($quantileValue);
                if ($value != $lastValue) {
                    // Entry $percent (0-based) of the quantile list is the
                    // ($percent + 1)-th percentile.
                    printf('Value at %s quantile: %s' . PHP_EOL, $percent + 1, $value);
                    $lastValue = $value;
                }
            }

            break;
        case DlpJob_JobState::FAILED:
            printf('Job %s had errors:' . PHP_EOL, $job->getName());
            $errors = $job->getErrors();
            foreach ($errors as $error) {
                var_dump($error->getDetails());
            }
            break;
        default:
            print('Unknown job state. Most likely, the job is either running or has not yet started.');
    }
}

C#

/// <summary>
/// Computes numerical statistics for one column of a BigQuery table with a DLP
/// risk analysis job, waits for the job's Pub/Sub completion notification, and
/// writes the value range and each distinct quantile value to the console.
/// </summary>
/// <param name="callingProjectId">Project used as the job's parent and for the Pub/Sub topic and subscription.</param>
/// <param name="tableProjectId">Project that contains the BigQuery table.</param>
/// <param name="datasetId">BigQuery dataset that contains the table.</param>
/// <param name="tableId">BigQuery table to analyze.</param>
/// <param name="topicId">Pub/Sub topic the job publishes its completion notification to.</param>
/// <param name="subscriptionId">Pub/Sub subscription used to receive that notification.</param>
/// <param name="columnName">Name of the column to compute statistics for.</param>
/// <returns>Always <c>0</c>.</returns>
public static object NumericalStats(
    string callingProjectId,
    string tableProjectId,
    string datasetId,
    string tableId,
    string topicId,
    string subscriptionId,
    string columnName)
{
    DlpServiceClient dlp = DlpServiceClient.Create();

    // Construct + submit the job
    var config = new RiskAnalysisJobConfig
    {
        PrivacyMetric = new PrivacyMetric
        {
            NumericalStatsConfig = new NumericalStatsConfig
            {
                Field = new FieldId { Name = columnName }
            }
        },
        SourceTable = new BigQueryTable
        {
            ProjectId = tableProjectId,
            DatasetId = datasetId,
            TableId = tableId
        },
        Actions =
        {
            new Google.Cloud.Dlp.V2.Action
            {
                PubSub = new PublishToPubSub
                {
                    Topic = $"projects/{callingProjectId}/topics/{topicId}"
                }
            }
        }
    };

    var submittedJob = dlp.CreateDlpJob(
        new CreateDlpJobRequest
        {
            ParentAsProjectName = new Google.Cloud.Dlp.V2.ProjectName(callingProjectId),
            RiskJob = config
        });

    // Listen to pub/sub for the job
    var subscriptionName = new SubscriptionName(callingProjectId, subscriptionId);
    SubscriberClient subscriber = SubscriberClient.Create(
        subscriptionName, new[] { SubscriberServiceApiClient.Create() });

    // SubscriberClient may run the message handler on multiple threads, so
    // completion is signalled through a thread-safe event.
    using (var done = new ManualResetEventSlim(false))
    {
        subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            // Look the attribute up with TryGetValue: the map indexer throws
            // for a missing key, which would fault the handler on any message
            // that does not carry a DlpJobName attribute.
            string notifiedJobName;
            if (message.Attributes.TryGetValue("DlpJobName", out notifiedJobName)
                && notifiedJobName == submittedJob.Name)
            {
                Thread.Sleep(500); // Wait for DLP API results to become consistent
                done.Set();
                return Task.FromResult(SubscriberClient.Reply.Ack);
            }

            // Not the notification for this job: hand the message back.
            return Task.FromResult(SubscriberClient.Reply.Nack);
        });

        done.Wait(TimeSpan.FromMinutes(10)); // 10 minute timeout; may not work for large jobs
        subscriber.StopAsync(CancellationToken.None).Wait();
    }

    // Process results
    var resultJob = dlp.GetDlpJob(
        new GetDlpJobRequest
        {
            DlpJobName = DlpJobName.Parse(submittedJob.Name)
        });

    var result = resultJob.RiskDetails.NumericalStatsResult;

    // 'UnpackValue(x)' is a prettier version of 'x.toString()'
    Console.WriteLine($"Value Range: [{DlpSamplesUtils.UnpackValue(result.MinValue)}, {DlpSamplesUtils.UnpackValue(result.MaxValue)}]");

    // Print a quantile only when its value differs from the previous one.
    // Entry i (0-based) is labelled as the (i + 1)% quantile.
    string lastValue = string.Empty;
    for (int quantile = 0; quantile < result.QuantileValues.Count; quantile++)
    {
        string currentValue = DlpSamplesUtils.UnpackValue(result.QuantileValues[quantile]);
        if (lastValue != currentValue)
        {
            Console.WriteLine($"Value at {quantile + 1}% quantile: {currentValue}");
        }
        lastValue = currentValue;
    }

    return 0;
}

Compute categorical numerical statistics

You can compute categorical statistics for the individual histogram buckets within a BigQuery column, including:

  • Upper bound on value frequency within a given bucket
  • Lower bound on value frequency within a given bucket
  • Size of a given bucket
  • A sample of value frequencies within a given bucket (maximum 20)

To calculate these values, you configure a DlpJob, setting the CategoricalStatsConfig privacy metric to the name of the column to scan. When you run the job, the DLP API computes statistics for the given column, returning its results in the CategoricalStatsResult object.

Code examples

Following is sample code in several languages that demonstrates how to use the DLP API to calculate categorical statistics.

Running this code requires a client library to be installed. For more information about installing and creating a DLP API client, see DLP API client libraries.

Java

/**
 * Calculate categorical statistics for a column in a BigQuery table using the DLP API.
 *
 * <p>Submits a risk analysis job configured with a {@code CategoricalStatsConfig}, waits up to one
 * minute for the job's Pub/Sub completion notification, then prints the job state and, for each
 * histogram bucket, its frequency bounds and sampled value frequencies. Errors raised inside the
 * method body are caught and printed rather than propagated.
 *
 * @param projectId The Google Cloud Platform project ID to run the API call under.
 * @param datasetId The BigQuery dataset to analyze.
 * @param tableId The BigQuery table to analyze.
 * @param columnName The name of the column to analyze, which need not contain numerical data.
 * @param topicId The name of the Pub/Sub topic to notify once the job completes
 * @param subscriptionId The name of the Pub/Sub subscription to use when listening for job
 *     completion status.
 */
private static void categoricalStatsAnalysis(
    String projectId,
    String datasetId,
    String tableId,
    String columnName,
    String topicId,
    String subscriptionId) {

  // Instantiates a client
  try (DlpServiceClient dlpServiceClient = DlpServiceClient.create()) {

    FieldId fieldId = FieldId.newBuilder().setName(columnName).build();

    CategoricalStatsConfig categoricalStatsConfig =
        CategoricalStatsConfig.newBuilder().setField(fieldId).build();

    BigQueryTable bigQueryTable =
        BigQueryTable.newBuilder()
            .setProjectId(projectId)
            .setDatasetId(datasetId)
            .setTableId(tableId)
            .build();

    PrivacyMetric privacyMetric =
        PrivacyMetric.newBuilder().setCategoricalStatsConfig(categoricalStatsConfig).build();

    ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);

    PublishToPubSub publishToPubSub =
        PublishToPubSub.newBuilder().setTopic(topicName.toString()).build();

    // Create action to publish job status notifications over Google Cloud Pub/Sub
    Action action = Action.newBuilder().setPubSub(publishToPubSub).build();

    RiskAnalysisJobConfig riskAnalysisJobConfig =
        RiskAnalysisJobConfig.newBuilder()
            .setSourceTable(bigQueryTable)
            .setPrivacyMetric(privacyMetric)
            .addActions(action)
            .build();

    CreateDlpJobRequest createDlpJobRequest =
        CreateDlpJobRequest.newBuilder()
            .setParent(ProjectName.of(projectId).toString())
            .setRiskJob(riskAnalysisJobConfig)
            .build();

    DlpJob dlpJob = dlpServiceClient.createDlpJob(createDlpJobRequest);
    String dlpJobName = dlpJob.getName();

    final SettableApiFuture<Boolean> done = SettableApiFuture.create();

    // Set up a Pub/Sub subscriber to listen on the job completion status
    Subscriber subscriber =
        Subscriber.newBuilder(
                ProjectSubscriptionName.newBuilder()
                    .setProject(projectId)
                    .setSubscription(subscriptionId)
                    .build(),
                (pubsubMessage, ackReplyConsumer) -> {
                  // Compare from the known non-null side: a message that carries attributes but
                  // no "DlpJobName" key would otherwise cause a NullPointerException.
                  if (pubsubMessage.getAttributesCount() > 0
                      && dlpJobName.equals(
                          pubsubMessage.getAttributesMap().get("DlpJobName"))) {
                    // notify job completion
                    done.set(true);
                    ackReplyConsumer.ack();
                  }
                })
            .build();
    subscriber.startAsync();

    // Wait for job completion semi-synchronously
    // For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
    try {
      done.get(1, TimeUnit.MINUTES);
      Thread.sleep(500); // Wait for the job to become available
    } catch (TimeoutException e) {
      System.out.println("Unable to verify job completion.");
    } finally {
      // Stop pulling messages once the wait is over, whatever its outcome.
      subscriber.stopAsync();
    }

    // Retrieve completed job status
    DlpJob completedJob =
        dlpServiceClient.getDlpJob(GetDlpJobRequest.newBuilder().setName(dlpJobName).build());

    System.out.println("Job status: " + completedJob.getState());
    AnalyzeDataSourceRiskDetails riskDetails = completedJob.getRiskDetails();
    AnalyzeDataSourceRiskDetails.CategoricalStatsResult result =
        riskDetails.getCategoricalStatsResult();

    // For every histogram bucket, print its frequency bounds followed by the values sampled
    // from that bucket and how often each occurs.
    for (CategoricalStatsHistogramBucket bucket :
        result.getValueFrequencyHistogramBucketsList()) {
      System.out.printf(
          "Most common value occurs %d time(s).\n", bucket.getValueFrequencyUpperBound());
      System.out.printf(
          "Least common value occurs %d time(s).\n", bucket.getValueFrequencyLowerBound());
      for (ValueFrequency valueFrequency : bucket.getBucketValuesList()) {
        System.out.printf(
            "Value %s occurs %d time(s).\n",
            valueFrequency.getValue().toString(), valueFrequency.getCount());
      }
    }
  } catch (Exception e) {
    System.out.println("Error in categoricalStatsAnalysis: " + e.getMessage());
  }
}

Node.js

// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const Pubsub = require('@google-cloud/pubsub');

// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new Pubsub();

// The project ID to run the API call under
// const callingProjectId = process.env.GCLOUD_PROJECT;

// The project ID the table is stored under
// This may or (for public datasets) may not equal the calling project ID
// const tableProjectId = process.env.GCLOUD_PROJECT;

// The ID of the dataset to inspect, e.g. 'my_dataset'
// const datasetId = 'my_dataset';

// The ID of the table to inspect, e.g. 'my_table'
// const tableId = 'my_table';

// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'

// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'

// The name of the column to compute risk metrics for, e.g. 'firstName'
// const columnName = 'firstName';

// BigQuery table that holds the column to analyze
const sourceTable = {projectId: tableProjectId, datasetId, tableId};

// Privacy metric to compute: categorical statistics for a single column
const privacyMetric = {
  categoricalStatsConfig: {field: {name: columnName}},
};

// Action that publishes a notification to Pub/Sub once the job finishes
const notifyAction = {
  pubSub: {topic: `projects/${callingProjectId}/topics/${topicId}`},
};

// Request used to create the risk analysis job
const request = {
  parent: dlp.projectPath(callingProjectId),
  riskJob: {privacyMetric, sourceTable, actions: [notifyAction]},
};

// Helper that returns the value stored under an object's first key
const getValue = obj => {
  const [firstKey] = Object.keys(obj);
  return obj[firstKey];
};

// Run risk analysis job:
//   1. look up the existing Pub/Sub topic and subscription,
//   2. create the DLP risk job,
//   3. wait for the job-completion notification on the subscription,
//   4. fetch the finished job and print its value-frequency histogram.
let subscription;
pubsub
  .topic(topicId)
  .get()
  .then(topicResponse => {
    // Verify the Pub/Sub topic and listen for job notifications via an
    // existing subscription.
    return topicResponse[0].subscription(subscriptionId);
  })
  .then(subscriptionResponse => {
    subscription = subscriptionResponse;
    return dlp.createDlpJob(request);
  })
  .then(jobsResponse => {
    // Get the job's ID
    return jobsResponse[0].name;
  })
  .then(jobName => {
    // Watch the Pub/Sub topic until the DLP job finishes
    return new Promise((resolve, reject) => {
      const messageHandler = message => {
        if (message.attributes && message.attributes.DlpJobName === jobName) {
          // This notification is for our job: acknowledge it and stop
          // listening so the handlers do not outlive the job.
          message.ack();
          subscription.removeListener('message', messageHandler);
          subscription.removeListener('error', errorHandler);
          resolve(jobName);
        } else {
          // Notification for some other job: hand it back to Pub/Sub.
          message.nack();
        }
      };

      const errorHandler = err => {
        subscription.removeListener('message', messageHandler);
        subscription.removeListener('error', errorHandler);
        reject(err);
      };

      subscription.on('message', messageHandler);
      subscription.on('error', errorHandler);
    });
  })
  .then(jobName => {
    // Wait for DLP job to fully complete.
    // setTimeout must be given a callback; passing `resolve(jobName)` would
    // resolve immediately and skip the delay entirely.
    return new Promise(resolve => setTimeout(() => resolve(jobName), 500));
  })
  .then(jobName => dlp.getDlpJob({name: jobName}))
  .then(wrappedJob => {
    const job = wrappedJob[0];
    const histogramBuckets =
      job.riskDetails.categoricalStatsResult.valueFrequencyHistogramBuckets;
    histogramBuckets.forEach((histogramBucket, histogramBucketIdx) => {
      console.log(`Bucket ${histogramBucketIdx}:`);

      // Print bucket stats
      console.log(
        `  Most common value occurs ${
          histogramBucket.valueFrequencyUpperBound
        } time(s)`
      );
      console.log(
        `  Least common value occurs ${
          histogramBucket.valueFrequencyLowerBound
        } time(s)`
      );

      // Print bucket values
      console.log(`${histogramBucket.bucketSize} unique values total.`);
      histogramBucket.bucketValues.forEach(valueBucket => {
        console.log(
          `  Value ${getValue(valueBucket.value)} occurs ${
            valueBucket.count
          } time(s).`
        );
      });
    });
  })
  .catch(err => {
    console.log(`Error in categoricalRiskAnalysis: ${err.message || err}`);
  });

Python

def categorical_risk_analysis(project, table_project_id, dataset_id, table_id,
                              column_name, topic_id, subscription_id,
                              timeout=300):
    """Uses the Data Loss Prevention API to compute risk metrics of a column
       of categorical data in a Google BigQuery table.
    Args:
        project: The Google Cloud project id to use as a parent resource.
        table_project_id: The Google Cloud project id where the BigQuery table
            is stored.
        dataset_id: The id of the dataset to inspect.
        table_id: The id of the table to inspect.
        column_name: The name of the column to compute risk metrics for.
        topic_id: The name of the Pub/Sub topic to notify once the job
            completes.
        subscription_id: The name of the Pub/Sub subscription to use when
            listening for job completion notifications.
        timeout: The number of seconds to wait for the job completion
            notification before giving up.

    Returns:
        None; the response from the API is printed to the terminal.
    """

    # Import the client library.
    import google.cloud.dlp

    # This sample additionally uses Cloud Pub/Sub to receive results from
    # potentially long-running operations.
    import google.cloud.pubsub

    # This sample also uses threading.Event() to wait for the job to finish.
    import threading

    # Instantiate a client.
    dlp = google.cloud.dlp.DlpServiceClient()

    # Convert the project id into a full resource id.
    parent = dlp.project_path(project)

    # Location info of the BigQuery table.
    source_table = {
        'project_id': table_project_id,
        'dataset_id': dataset_id,
        'table_id': table_id
    }

    # Tell the API where to send a notification when the job is complete.
    actions = [{
        'pub_sub': {'topic': '{}/topics/{}'.format(parent, topic_id)}
    }]

    # Configure risk analysis job
    # Give the name of the categorical column to compute risk metrics for
    risk_job = {
        'privacy_metric': {
            'categorical_stats_config': {
                'field': {
                    'name': column_name
                }
            }
        },
        'source_table': source_table,
        'actions': actions
    }

    # Call API to start risk analysis job
    operation = dlp.create_dlp_job(parent, risk_job=risk_job)

    # Create a Pub/Sub client and find the subscription. The subscription is
    # expected to already be listening to the topic.
    subscriber = google.cloud.pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_id)
    subscription = subscriber.subscribe(subscription_path)

    # Set up a callback to acknowledge a message. This closes around an event
    # so that it can signal that it is done and the main thread can continue.
    job_done = threading.Event()

    def unpack_value(value):
        """Returns whichever field of a DLP Value message is populated.

        Categorical columns are frequently strings, so reading only
        integer_value would print 0 for every non-integer value.
        """
        field_name = value.WhichOneof('type')
        return getattr(value, field_name) if field_name else None

    def callback(message):
        try:
            if (message.attributes['DlpJobName'] == operation.name):
                # This is the message we're looking for, so acknowledge it.
                message.ack()

                # Now that the job is done, fetch the results and print them.
                job = dlp.get_dlp_job(operation.name)
                histogram_buckets = (job.risk_details
                                        .categorical_stats_result
                                        .value_frequency_histogram_buckets)
                # Print bucket stats
                for i, bucket in enumerate(histogram_buckets):
                    print('Bucket {}:'.format(i))
                    print('   Most common value occurs {} time(s)'.format(
                        bucket.value_frequency_upper_bound))
                    print('   Least common value occurs {} time(s)'.format(
                        bucket.value_frequency_lower_bound))
                    print('   {} unique values total.'.format(
                        bucket.bucket_size))
                    for value in bucket.bucket_values:
                        print('   Value {} occurs {} time(s)'.format(
                            unpack_value(value.value), value.count))
                # Signal to the main thread that we can exit.
                job_done.set()
            else:
                # This is not the message we're looking for.
                message.drop()
        except Exception as e:
            # Because this is executing in a thread, an exception won't be
            # noted unless we print it manually.
            print(e)
            raise

    # Register the callback and wait on the event.
    subscription.open(callback)
    finished = job_done.wait(timeout=timeout)
    if not finished:
        print('No event received before the timeout. Please verify that the '
              'subscription provided is subscribed to the topic provided.')

Go

// riskCategorical computes the categorical risk of the given data.
//
// It creates a DLP risk analysis job that computes categorical statistics for
// columnName in the BigQuery table dataProject.datasetID.tableID, waits for the
// job-completion notification on the pubSubSub subscription of pubSubTopic,
// and writes the resulting value-frequency histogram to w. The job itself and
// the Pub/Sub resources live in project. Any failure terminates the process
// through log.Fatal/log.Fatalf.
func riskCategorical(w io.Writer, client *dlp.Client, project, dataProject, pubSubTopic, pubSubSub, datasetID, tableID, columnName string) {
	ctx := context.Background()

	// Create a PubSub Client used to listen for when the risk job finishes.
	pClient, err := pubsub.NewClient(ctx, project)
	if err != nil {
		log.Fatalf("Error creating PubSub client: %v", err)
	}
	defer pClient.Close()

	// Create a PubSub subscription we can use to listen for messages.
	s, err := setupPubSub(ctx, pClient, project, pubSubTopic, pubSubSub)
	if err != nil {
		log.Fatalf("Error setting up PubSub: %v\n", err)
	}

	// topic is the PubSub topic string where messages should be sent.
	topic := "projects/" + project + "/topics/" + pubSubTopic

	// Create a configured request.
	req := &dlppb.CreateDlpJobRequest{
		Parent: "projects/" + project,
		Job: &dlppb.CreateDlpJobRequest_RiskJob{
			RiskJob: &dlppb.RiskAnalysisJobConfig{
				// PrivacyMetric configures what to compute.
				PrivacyMetric: &dlppb.PrivacyMetric{
					Type: &dlppb.PrivacyMetric_CategoricalStatsConfig_{
						CategoricalStatsConfig: &dlppb.PrivacyMetric_CategoricalStatsConfig{
							Field: &dlppb.FieldId{
								Name: columnName,
							},
						},
					},
				},
				// SourceTable describes where to find the data.
				SourceTable: &dlppb.BigQueryTable{
					ProjectId: dataProject,
					DatasetId: datasetID,
					TableId:   tableID,
				},
				// Send a message to PubSub using Actions.
				Actions: []*dlppb.Action{
					{
						Action: &dlppb.Action_PubSub{
							PubSub: &dlppb.Action_PublishToPubSub{
								Topic: topic,
							},
						},
					},
				},
			},
		},
	}
	// Create the risk job.
	j, err := client.CreateDlpJob(ctx, req)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Fprintf(w, "Created job: %v\n", j.GetName())

	// Wait for the risk job to finish by waiting for a PubSub message.
	ctx, cancel := context.WithCancel(ctx)
	// Release the context's resources even when Receive returns before the
	// callback had a chance to call cancel. Calling cancel twice is harmless.
	defer cancel()
	err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// If this is the wrong job, do not process the result.
		if msg.Attributes["DlpJobName"] != j.GetName() {
			msg.Nack()
			return
		}
		msg.Ack()
		// Give the job's results a moment to settle before fetching them.
		time.Sleep(500 * time.Millisecond)
		resp, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
			Name: j.GetName(),
		})
		if err != nil {
			log.Fatalf("Error getting completed job: %v\n", err)
		}
		h := resp.GetRiskDetails().GetCategoricalStatsResult().GetValueFrequencyHistogramBuckets()
		for i, b := range h {
			fmt.Fprintf(w, "Histogram bucket %v\n", i)
			fmt.Fprintf(w, "  Most common value occurs %v times\n", b.GetValueFrequencyUpperBound())
			fmt.Fprintf(w, "  Least common value occurs %v times\n", b.GetValueFrequencyLowerBound())
			fmt.Fprintf(w, "  %v unique values total\n", b.GetBucketSize())
			for _, v := range b.GetBucketValues() {
				fmt.Fprintf(w, "    Value %v occurs %v times\n", v.GetValue(), v.GetCount())
			}
		}
		// Stop listening for more messages.
		cancel()
	})
	if err != nil {
		log.Fatalf("Error receiving from PubSub: %v\n", err)
	}
}

PHP

use Google\Cloud\Dlp\V2\DlpServiceClient;
use Google\Cloud\Dlp\V2\RiskAnalysisJobConfig;
use Google\Cloud\Dlp\V2\BigQueryTable;
use Google\Cloud\Dlp\V2\DlpJob_JobState;
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action_PublishToPubSub;
use Google\Cloud\Dlp\V2\PrivacyMetric_CategoricalStatsConfig;
use Google\Cloud\Dlp\V2\PrivacyMetric;
use Google\Cloud\Dlp\V2\FieldId;
use Google\Cloud\PubSub\PubSubClient;

/**
 * Computes categorical statistics (a value-frequency histogram) for a column
 * of data in a Google BigQuery table and prints them.
 *
 * The function submits a DLP risk analysis job, blocks until a completion
 * notification for that job arrives on the given Pub/Sub subscription, then
 * fetches the job and prints either its histogram buckets or its errors.
 *
 * @param string $callingProjectId The project ID to run the API call under
 * @param string $dataProjectId The project ID containing the target BigQuery table
 * @param string $topicId The name of the Pub/Sub topic to notify once the job completes
 * @param string $subscriptionId The name of the Pub/Sub subscription to use when listening for job
 *        completion notifications
 * @param string $datasetId The ID of the dataset to inspect
 * @param string $tableId The ID of the table to inspect
 * @param string $columnName The name of the column to compute risk metrics for, e.g. 'age'
 */
function categorical_stats(
    $callingProjectId,
    $dataProjectId,
    $topicId,
    $subscriptionId,
    $datasetId,
    $tableId,
    $columnName
) {
    // Instantiate a client.
    $dlp = new DlpServiceClient([
        'projectId' => $callingProjectId,
    ]);
    $pubsub = new PubSubClient([
        'projectId' => $callingProjectId,
    ]);
    $topic = $pubsub->topic($topicId);

    // Construct risk analysis config
    $columnField = (new FieldId())
        ->setName($columnName);

    $statsConfig = (new PrivacyMetric_CategoricalStatsConfig())
        ->setField($columnField);

    $privacyMetric = (new PrivacyMetric())
        ->setCategoricalStatsConfig($statsConfig);

    // Construct items to be analyzed
    $bigqueryTable = (new BigQueryTable())
        ->setProjectId($dataProjectId)
        ->setDatasetId($datasetId)
        ->setTableId($tableId);

    // Construct the action to run when job completes
    $pubSubAction = (new Action_PublishToPubSub())
        ->setTopic($topic->name());

    $action = (new Action())
        ->setPubSub($pubSubAction);

    // Construct risk analysis job config to run
    $riskJob = (new RiskAnalysisJobConfig())
        ->setPrivacyMetric($privacyMetric)
        ->setSourceTable($bigqueryTable)
        ->setActions([$action]);

    // Submit request
    $parent = $dlp->projectName($callingProjectId);
    $job = $dlp->createDlpJob($parent, [
        'riskJob' => $riskJob
    ]);

    // Listen for job notifications via an existing topic/subscription.
    $subscription = $topic->subscription($subscriptionId);

    // Poll via Pub/Sub until job finishes
    while (true) {
        foreach ($subscription->pull() as $message) {
            if (isset($message->attributes()['DlpJobName']) &&
                $message->attributes()['DlpJobName'] === $job->getName()) {
                $subscription->acknowledge($message);
                // Leave both the foreach and the enclosing while loop.
                break 2;
            }
        }
    }

    // Sleep for half a second to avoid race condition with the job's status.
    usleep(500000);

    // Get the updated job
    $job = $dlp->getDlpJob($job->getName());

    // Helper function to convert Protobuf values to strings: serializes the
    // value to JSON and returns the first element of the decoded array.
    $value_to_string = function ($value) {
        $json = json_decode($value->serializeToJsonString(), true);
        return array_shift($json);
    };

    // Print the job status, then its results or errors
    printf('Job %s status: %s' . PHP_EOL, $job->getName(), $job->getState());
    switch ($job->getState()) {
        case DlpJob_JobState::DONE:
            $histBuckets = $job->getRiskDetails()->getCategoricalStatsResult()->getValueFrequencyHistogramBuckets();

            foreach ($histBuckets as $bucketIndex => $histBucket) {
                // Print bucket stats
                printf('Bucket %s:' . PHP_EOL, $bucketIndex);
                printf('  Most common value occurs %s time(s)' . PHP_EOL, $histBucket->getValueFrequencyUpperBound());
                printf('  Least common value occurs %s time(s)' . PHP_EOL, $histBucket->getValueFrequencyLowerBound());
                // Terminate the line so the first value does not run into it.
                printf('  %s unique value(s) total.' . PHP_EOL, $histBucket->getBucketSize());

                // Print bucket values
                foreach ($histBucket->getBucketValues() as $valueFrequency) {
                    printf(
                        '  Value %s occurs %s time(s).' . PHP_EOL,
                        $value_to_string($valueFrequency->getValue()),
                        $valueFrequency->getCount()
                    );
                }
            }

            break;
        case DlpJob_JobState::FAILED:
            $errors = $job->getErrors();
            printf('Job %s had errors:' . PHP_EOL, $job->getName());
            foreach ($errors as $error) {
                var_dump($error->getDetails());
            }
            break;
        default:
            printf('Unknown job state. Most likely, the job is either running or has not yet started.' . PHP_EOL);
    }
}

C#

/// <summary>
/// Computes categorical statistics (a value-frequency histogram) for one
/// column of a BigQuery table using a DLP risk analysis job, and prints the
/// histogram to the console.
/// </summary>
/// <param name="callingProjectId">Project the DLP job and the Pub/Sub topic/subscription belong to.</param>
/// <param name="tableProjectId">Project that contains the BigQuery table.</param>
/// <param name="datasetId">BigQuery dataset that contains the table.</param>
/// <param name="tableId">BigQuery table to analyze.</param>
/// <param name="topicId">Pub/Sub topic the job publishes its completion notification to.</param>
/// <param name="subscriptionId">Pub/Sub subscription used to listen for that notification.</param>
/// <param name="columnName">Name of the column to compute statistics for.</param>
/// <returns>0 when results were printed; 1 when the job produced no categorical stats results.</returns>
public static object CategoricalStats(
    string callingProjectId,
    string tableProjectId,
    string datasetId,
    string tableId,
    string topicId,
    string subscriptionId,
    string columnName)
{
    DlpServiceClient dlp = DlpServiceClient.Create();

    // Construct + submit the job
    RiskAnalysisJobConfig config = new RiskAnalysisJobConfig
    {
        PrivacyMetric = new PrivacyMetric
        {
            CategoricalStatsConfig = new CategoricalStatsConfig()
            {
                Field = new FieldId { Name = columnName }
            }
        },
        SourceTable = new BigQueryTable
        {
            ProjectId = tableProjectId,
            DatasetId = datasetId,
            TableId = tableId
        },
        Actions = {
            new Google.Cloud.Dlp.V2.Action
            {
                PubSub = new PublishToPubSub
                {
                    Topic = $"projects/{callingProjectId}/topics/{topicId}"
                }
            }
        }
    };

    var submittedJob = dlp.CreateDlpJob(new CreateDlpJobRequest
    {
        ParentAsProjectName = new Google.Cloud.Dlp.V2.ProjectName(callingProjectId),
        RiskJob = config
    });

    // Listen to pub/sub for the job
    var subscriptionName = new SubscriptionName(callingProjectId, subscriptionId);
    SubscriberClient subscriber = SubscriberClient.Create(
        subscriptionName, new[] { SubscriberServiceApiClient.Create() });

    // SubscriberClient runs your message handle function on multiple
    // threads to maximize throughput.
    var done = new ManualResetEventSlim(false);
    subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
    {
        // Messages without a DlpJobName attribute are not ours; TryGetValue
        // avoids the KeyNotFoundException the indexer would throw for them.
        string notifiedJobName;
        bool isOurJob =
            message.Attributes.TryGetValue("DlpJobName", out notifiedJobName)
            && notifiedJobName == submittedJob.Name;

        if (isOurJob)
        {
            Thread.Sleep(500); // Wait for DLP API results to become consistent
            done.Set();
            return Task.FromResult(SubscriberClient.Reply.Ack);
        }
        else
        {
            return Task.FromResult(SubscriberClient.Reply.Nack);
        }
    });

    // 10 minute timeout; may not work for large jobs
    bool notified = done.Wait(TimeSpan.FromMinutes(10));
    subscriber.StopAsync(CancellationToken.None).Wait();

    if (!notified)
    {
        Console.WriteLine(
            $"Timed out waiting for a completion notification for job {submittedJob.Name}; " +
            "the job may still be running.");
    }

    // Process results
    var resultJob = dlp.GetDlpJob(new GetDlpJobRequest
    {
        DlpJobName = DlpJobName.Parse(submittedJob.Name)
    });

    // RiskDetails is only populated once the job has produced results, so
    // guard against dereferencing it for an unfinished or failed job.
    var result = resultJob.RiskDetails?.CategoricalStatsResult;
    if (result == null)
    {
        Console.WriteLine(
            $"Job {submittedJob.Name} has no categorical stats results (state: {resultJob.State}).");
        return 1;
    }

    for (int bucketIdx = 0; bucketIdx < result.ValueFrequencyHistogramBuckets.Count; bucketIdx++)
    {
        var bucket = result.ValueFrequencyHistogramBuckets[bucketIdx];
        Console.WriteLine($"Bucket {bucketIdx}");
        Console.WriteLine($"  Most common value occurs {bucket.ValueFrequencyUpperBound} time(s).");
        Console.WriteLine($"  Least common value occurs {bucket.ValueFrequencyLowerBound} time(s).");
        Console.WriteLine($"  {bucket.BucketSize} unique value(s) total.");

        foreach (var bucketValue in bucket.BucketValues)
        {
            // 'UnpackValue(x)' is a prettier version of 'x.toString()'
            Console.WriteLine($"  Value {DlpSamplesUtils.UnpackValue(bucketValue.Value)} occurs {bucketValue.Count} time(s).");
        }
    }

    return 0;
}

Was this page helpful? Let us know how we did:

Send feedback about...

Data Loss Prevention API