Como calcular estatísticas numéricas e categóricas

É possível usar o DLP (Cloud Data Loss Prevention) para calcular estatísticas numéricas e categóricas numéricas para colunas individuais em tabelas do BigQuery. O Cloud DLP pode calcular:

  • o valor mínimo da coluna
  • o valor máximo da coluna
  • os valores de quantil da coluna
  • um histograma de frequências de valor na coluna

Calcular estatísticas numéricas

É possível determinar valores mínimos, máximos e de quantil para colunas individuais do BigQuery. Para calcular esses valores, configure um DlpJob, definindo a métrica de privacidade NumericalStatsConfig como o nome da coluna a ser verificada. Quando você executa o job, o Cloud DLP calcula estatísticas para a coluna especificada, retornando seus resultados no objeto NumericalStatsResult. O Cloud DLP pode calcular estatísticas para os seguintes tipos de números:

  • número inteiro
  • flutuante
  • data
  • data/hora
  • carimbo de data/hora
  • horário

As estatísticas apresentadas pela verificação incluem o valor mínimo, o valor máximo e 99 valores de quantil que particionam o conjunto de valores de campo em 100 buckets de tamanhos iguais.

Exemplos de código

O exemplo de código a seguir demonstra, em várias linguagens, como usar o Cloud DLP para calcular estatísticas numéricas.

Java


import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.Action.PublishToPubSub;
import com.google.privacy.dlp.v2.AnalyzeDataSourceRiskDetails.NumericalStatsResult;
import com.google.privacy.dlp.v2.BigQueryTable;
import com.google.privacy.dlp.v2.CreateDlpJobRequest;
import com.google.privacy.dlp.v2.DlpJob;
import com.google.privacy.dlp.v2.FieldId;
import com.google.privacy.dlp.v2.GetDlpJobRequest;
import com.google.privacy.dlp.v2.PrivacyMetric;
import com.google.privacy.dlp.v2.PrivacyMetric.NumericalStatsConfig;
import com.google.privacy.dlp.v2.ProjectName;
import com.google.privacy.dlp.v2.RiskAnalysisJobConfig;
import com.google.privacy.dlp.v2.Value;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class RiskAnalysisNumericalStats {

  public static void numericalStatsAnalysis() throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String datasetId = "your-bigquery-dataset-id";
    String tableId = "your-bigquery-table-id";
    String topicId = "pub-sub-topic";
    String subscriptionId = "pub-sub-subscription";
    numericalStatsAnalysis(projectId, datasetId, tableId, topicId, subscriptionId);
  }

  public static void numericalStatsAnalysis(
      String projectId, String datasetId, String tableId, String topicId, String subscriptionId)
      throws Exception {

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (DlpServiceClient dlpServiceClient = DlpServiceClient.create()) {

      // Specify the BigQuery table to analyze
      BigQueryTable bigQueryTable =
          BigQueryTable.newBuilder()
              .setTableId(tableId)
              .setDatasetId(datasetId)
              .setProjectId(projectId)
              .build();

      // This represents the name of the column to analyze, which must contain numerical data
      String columnName = "Age";

      // Configure the privacy metric for the job
      FieldId fieldId = FieldId.newBuilder().setName(columnName).build();
      NumericalStatsConfig numericalStatsConfig =
          NumericalStatsConfig.newBuilder().setField(fieldId).build();
      PrivacyMetric privacyMetric =
          PrivacyMetric.newBuilder().setNumericalStatsConfig(numericalStatsConfig).build();

      // Create action to publish job status notifications over Google Cloud Pub/Sub
      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      PublishToPubSub publishToPubSub =
          PublishToPubSub.newBuilder().setTopic(topicName.toString()).build();
      Action action = Action.newBuilder().setPubSub(publishToPubSub).build();

      // Configure the risk analysis job to perform
      RiskAnalysisJobConfig riskAnalysisJobConfig =
          RiskAnalysisJobConfig.newBuilder()
              .setSourceTable(bigQueryTable)
              .setPrivacyMetric(privacyMetric)
              .addActions(action)
              .build();

      CreateDlpJobRequest createDlpJobRequest =
          CreateDlpJobRequest.newBuilder()
              .setParent(ProjectName.of(projectId).toString())
              .setRiskJob(riskAnalysisJobConfig)
              .build();

      // Send the request to the API using the client
      DlpJob dlpJob = dlpServiceClient.createDlpJob(createDlpJobRequest);

      // Set up a Pub/Sub subscriber to listen on the job completion status
      final SettableApiFuture<Boolean> done = SettableApiFuture.create();

      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      MessageReceiver messageHandler =
          (PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
            handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
          };
      Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
      subscriber.startAsync();

      // Wait for job completion semi-synchronously
      // For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
      try {
        done.get(1, TimeUnit.MINUTES);
        Thread.sleep(500); // Wait for the job to become available
      } catch (TimeoutException e) {
        System.out.println("Unable to verify job completion.");
      }

      // Build a request to get the completed job
      GetDlpJobRequest getDlpJobRequest =
          GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();

      // Retrieve completed job status
      DlpJob completedJob = dlpServiceClient.getDlpJob(getDlpJobRequest);
      System.out.println("Job status: " + completedJob.getState());

      // Get the result and parse through and process the information
      NumericalStatsResult result = completedJob.getRiskDetails().getNumericalStatsResult();

      System.out.printf(
          "Value range : [%.3f, %.3f]\n",
          result.getMinValue().getFloatValue(), result.getMaxValue().getFloatValue());

      int percent = 1;
      Double lastValue = null;
      for (Value quantileValue : result.getQuantileValuesList()) {
        Double currentValue = quantileValue.getFloatValue();
        if (lastValue == null || !lastValue.equals(currentValue)) {
          System.out.printf("Value at %s %% quantile : %.3f", percent, currentValue);
        }
        lastValue = currentValue;
      }
    }
  }

  // handleMessage injects the job and settableFuture into the message reciever interface
  private static void handleMessage(
      DlpJob job,
      SettableApiFuture<Boolean> done,
      PubsubMessage pubsubMessage,
      AckReplyConsumer ackReplyConsumer) {
    String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
    if (job.getName().equals(messageAttribute)) {
      done.set(true);
      ackReplyConsumer.ack();
    } else {
      ackReplyConsumer.nack();
    }
  }
}

Node.js

// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');

// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();

// The project ID to run the API call under
// const callingProjectId = process.env.GCLOUD_PROJECT;

// The project ID the table is stored under
// This may or (for public datasets) may not equal the calling project ID
// const tableProjectId = process.env.GCLOUD_PROJECT;

// The ID of the dataset to inspect, e.g. 'my_dataset'
// const datasetId = 'my_dataset';

// The ID of the table to inspect, e.g. 'my_table'
// const tableId = 'my_table';

// The name of the column to compute risk metrics for, e.g. 'age'
// Note that this column must be a numeric data type
// const columnName = 'firstName';

// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'

// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'

const sourceTable = {
  projectId: tableProjectId,
  datasetId: datasetId,
  tableId: tableId,
};

// Construct request for creating a risk analysis job
const request = {
  parent: dlp.projectPath(callingProjectId),
  riskJob: {
    privacyMetric: {
      numericalStatsConfig: {
        field: {
          name: columnName,
        },
      },
    },
    sourceTable: sourceTable,
    actions: [
      {
        pubSub: {
          topic: `projects/${callingProjectId}/topics/${topicId}`,
        },
      },
    ],
  },
};

// Create helper function for unpacking values
const getValue = obj => obj[Object.keys(obj)[0]];

try {
  // Run risk analysis job
  const [topicResponse] = await pubsub.topic(topicId).get();
  const subscription = await topicResponse.subscription(subscriptionId);
  const [jobsResponse] = await dlp.createDlpJob(request);
  const jobName = jobsResponse.name;
  // Watch the Pub/Sub topic until the DLP job finishes
  await new Promise((resolve, reject) => {
    const messageHandler = message => {
      if (message.attributes && message.attributes.DlpJobName === jobName) {
        message.ack();
        subscription.removeListener('message', messageHandler);
        subscription.removeListener('error', errorHandler);
        resolve(jobName);
      } else {
        message.nack();
      }
    };

    const errorHandler = err => {
      subscription.removeListener('message', messageHandler);
      subscription.removeListener('error', errorHandler);
      reject(err);
    };

    subscription.on('message', messageHandler);
    subscription.on('error', errorHandler);
  });
  setTimeout(() => {
    console.log(' Waiting for DLP job to fully complete');
  }, 500);
  const [job] = await dlp.getDlpJob({name: jobName});
  const results = job.riskDetails.numericalStatsResult;

  console.log(
    `Value Range: [${getValue(results.minValue)}, ${getValue(
      results.maxValue
    )}]`
  );

  // Print unique quantile values
  let tempValue = null;
  results.quantileValues.forEach((result, percent) => {
    const value = getValue(result);

    // Only print new values
    if (
      tempValue !== value &&
      !(tempValue && tempValue.equals && tempValue.equals(value))
    ) {
      console.log(`Value at ${percent}% quantile: ${value}`);
      tempValue = value;
    }
  });
} catch (err) {
  console.log(`Error in numericalRiskAnalysis: ${err.message || err}`);
}

Python

def numerical_risk_analysis(
    project,
    table_project_id,
    dataset_id,
    table_id,
    column_name,
    topic_id,
    subscription_id,
    timeout=300,
):
    """Uses the Data Loss Prevention API to compute risk metrics of a column
       of numerical data in a Google BigQuery table.
    Args:
        project: The Google Cloud project id to use as a parent resource.
        table_project_id: The Google Cloud project id where the BigQuery table
            is stored.
        dataset_id: The id of the dataset to inspect.
        table_id: The id of the table to inspect.
        column_name: The name of the column to compute risk metrics for.
        topic_id: The name of the Pub/Sub topic to notify once the job
            completes.
        subscription_id: The name of the Pub/Sub subscription to use when
            listening for job completion notifications.
        timeout: The number of seconds to wait for a response from the API.

    Returns:
        None; the response from the API is printed to the terminal.
    """

    # Import the client library.
    import google.cloud.dlp

    # This sample additionally uses Cloud Pub/Sub to receive results from
    # potentially long-running operations.
    import google.cloud.pubsub

    # Instantiate a client.
    dlp = google.cloud.dlp_v2.DlpServiceClient()

    # Convert the project id into a full resource id.
    parent = dlp.project_path(project)

    # Location info of the BigQuery table.
    source_table = {
        "project_id": table_project_id,
        "dataset_id": dataset_id,
        "table_id": table_id,
    }

    # Tell the API where to send a notification when the job is complete.
    actions = [{"pub_sub": {"topic": "{}/topics/{}".format(parent, topic_id)}}]

    # Configure risk analysis job
    # Give the name of the numeric column to compute risk metrics for
    risk_job = {
        "privacy_metric": {
            "numerical_stats_config": {"field": {"name": column_name}}        },
        "source_table": source_table,
        "actions": actions,
    }

    # Call API to start risk analysis job
    operation = dlp.create_dlp_job(parent, risk_job=risk_job)

    def callback(message):
        if message.attributes["DlpJobName"] == operation.name:
            # This is the message we're looking for, so acknowledge it.
            message.ack()

            # Now that the job is done, fetch the results and print them.
            job = dlp.get_dlp_job(operation.name)
            results = job.risk_details.numerical_stats_result
            print(
                "Value Range: [{}, {}]".format(
                    results.min_value.integer_value,
                    results.max_value.integer_value,
                )
            )
            prev_value = None
            for percent, result in enumerate(results.quantile_values):
                value = result.integer_value
                if prev_value != value:
                    print("Value at {}% quantile: {}".format(percent, value))
                    prev_value = value
            subscription.set_result(None)
        else:
            # This is not the message we're looking for.
            message.drop()

    # Create a Pub/Sub client and find the subscription. The subscription is
    # expected to already be listening to the topic.
    subscriber = google.cloud.pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_id)
    subscription = subscriber.subscribe(subscription_path, callback)

    try:
        subscription.result(timeout=timeout)
    except TimeoutError:
        print(
            "No event received before the timeout. Please verify that the "
            "subscription provided is subscribed to the topic provided."
        )
        subscription.close()

Go

import (
	"context"
	"fmt"
	"io"
	"time"

	dlp "cloud.google.com/go/dlp/apiv2"
	"cloud.google.com/go/pubsub"
	dlppb "google.golang.org/genproto/googleapis/privacy/dlp/v2"
)

// riskNumerical computes the numerical risk of the given column.
func riskNumerical(w io.Writer, projectID, dataProject, pubSubTopic, pubSubSub, datasetID, tableID, columnName string) error {
	// projectID := "my-project-id"
	// dataProject := "bigquery-public-data"
	// pubSubTopic := "dlp-risk-sample-topic"
	// pubSubSub := "dlp-risk-sample-sub"
	// datasetID := "nhtsa_traffic_fatalities"
	// tableID := "accident_2015"
	// columnName := "state_number"
	ctx := context.Background()
	client, err := dlp.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("dlp.NewClient: %v", err)
	}
	// Create a PubSub Client used to listen for when the inspect job finishes.
	pubsubClient, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("Error creating PubSub client: %v", err)
	}
	defer pubsubClient.Close()

	// Create a PubSub subscription we can use to listen for messages.
	s, err := setupPubSub(projectID, pubSubTopic, pubSubSub)
	if err != nil {
		return fmt.Errorf("setupPubSub: %v", err)
	}

	// topic is the PubSub topic string where messages should be sent.
	topic := "projects/" + projectID + "/topics/" + pubSubTopic

	// Create a configured request.
	req := &dlppb.CreateDlpJobRequest{
		Parent: "projects/" + projectID,
		Job: &dlppb.CreateDlpJobRequest_RiskJob{
			RiskJob: &dlppb.RiskAnalysisJobConfig{
				// PrivacyMetric configures what to compute.
				PrivacyMetric: &dlppb.PrivacyMetric{
					Type: &dlppb.PrivacyMetric_NumericalStatsConfig_{
						NumericalStatsConfig: &dlppb.PrivacyMetric_NumericalStatsConfig{
							Field: &dlppb.FieldId{
								Name: columnName,
							},
						},
					},
				},
				// SourceTable describes where to find the data.
				SourceTable: &dlppb.BigQueryTable{
					ProjectId: dataProject,
					DatasetId: datasetID,
					TableId:   tableID,
				},
				// Send a message to PubSub using Actions.
				Actions: []*dlppb.Action{
					{
						Action: &dlppb.Action_PubSub{
							PubSub: &dlppb.Action_PublishToPubSub{
								Topic: topic,
							},
						},
					},
				},
			},
		},
	}
	// Create the risk job.
	j, err := client.CreateDlpJob(ctx, req)
	if err != nil {
		return fmt.Errorf("CreateDlpJob: %v", err)
	}
	fmt.Fprintf(w, "Created job: %v\n", j.GetName())

	// Wait for the risk job to finish by waiting for a PubSub message.
	// This only waits for 1 minute. For long jobs, consider using a truly
	// asynchronous execution model such as Cloud Functions.
	ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
	defer cancel()
	err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// If this is the wrong job, do not process the result.
		if msg.Attributes["DlpJobName"] != j.GetName() {
			msg.Nack()
			return
		}
		msg.Ack()
		time.Sleep(500 * time.Millisecond)
		resp, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
			Name: j.GetName(),
		})
		if err != nil {
			fmt.Fprintf(w, "GetDlpJob: %v", err)
			return
		}
		n := resp.GetRiskDetails().GetNumericalStatsResult()
		fmt.Fprintf(w, "Value range: [%v, %v]\n", n.GetMinValue(), n.GetMaxValue())
		var tmp string
		for p, v := range n.GetQuantileValues() {
			if v.String() != tmp {
				fmt.Fprintf(w, "Value at %v quantile: %v\n", p, v)
				tmp = v.String()
			}
		}
		// Stop listening for more messages.
		cancel()
	})
	if err != nil {
		return fmt.Errorf("Recieve: %v", err)
	}
	return nil
}

PHP

/**
 * Computes risk metrics of a column of numbers in a Google BigQuery table.
 */
use Google\Cloud\Core\ExponentialBackoff;
use Google\Cloud\Dlp\V2\DlpServiceClient;
use Google\Cloud\Dlp\V2\RiskAnalysisJobConfig;
use Google\Cloud\Dlp\V2\BigQueryTable;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\PrivacyMetric\NumericalStatsConfig;
use Google\Cloud\Dlp\V2\PrivacyMetric;
use Google\Cloud\Dlp\V2\FieldId;
use Google\Cloud\PubSub\V1\PublisherClient;

/** Uncomment and populate these variables in your code */
// $callingProjectId = 'The project ID to run the API call under';
// $dataProjectId = 'The project ID containing the target Datastore';
// $topicId = 'The name of the Pub/Sub topic to notify once the job completes';
// $subscriptionId = 'The name of the Pub/Sub subscription to use when listening for job';
// $datasetId = 'The ID of the BigQuery dataset to inspect';
// $tableId = 'The ID of the BigQuery table to inspect';
// $columnName = 'The name of the column to compute risk metrics for, e.g. "age"';

// Instantiate a client.
$dlp = new DlpServiceClient([
    'projectId' => $callingProjectId
]);
$pubsub = new PubSubClient([
    'projectId' => $callingProjectId
]);

// Construct risk analysis config
$columnField = (new FieldId())
    ->setName($columnName);

$statsConfig = (new NumericalStatsConfig())
    ->setField($columnField);

$privacyMetric = (new PrivacyMetric())
    ->setNumericalStatsConfig($statsConfig);

// Construct items to be analyzed
$bigqueryTable = (new BigQueryTable())
    ->setProjectId($dataProjectId)
    ->setDatasetId($datasetId)
    ->setTableId($tableId);

// Construct the action to run when job completes
$fullTopicId = PublisherClient::topicName($callingProjectId, $topicId);
$pubSubAction = (new PublishToPubSub())
    ->setTopic($fullTopicId);

$action = (new Action())
    ->setPubSub($pubSubAction);

// Construct risk analysis job config to run
$riskJob = (new RiskAnalysisJobConfig())
    ->setPrivacyMetric($privacyMetric)
    ->setSourceTable($bigqueryTable)
    ->setActions([$action]);

// Listen for job notifications via an existing topic/subscription.
$topic = $pubsub->topic($topicId);
$subscription = $topic->subscription($subscriptionId);

// Submit request
$parent = $dlp->projectName($callingProjectId);
$job = $dlp->createDlpJob($parent, [
    'riskJob' => $riskJob
]);

// Poll Pub/Sub using exponential backoff until job finishes
$backoff = new ExponentialBackoff(30);
$backoff->execute(function () use ($subscription, $dlp, &$job) {
    printf('Waiting for job to complete' . PHP_EOL);
    foreach ($subscription->pull() as $message) {
        if (isset($message->attributes()['DlpJobName']) &&
            $message->attributes()['DlpJobName'] === $job->getName()) {
            $subscription->acknowledge($message);
            // Get the updated job. Loop to avoid race condition with DLP API.
            do {
                $job = $dlp->getDlpJob($job->getName());
            } while ($job->getState() == JobState::RUNNING);
            return true;
        }
    }
    throw new Exception('Job has not yet completed');
});

// Helper function to convert Protobuf values to strings
$valueToString = function ($value) {
    $json = json_decode($value->serializeToJsonString(), true);
    return array_shift($json);
};

// Print finding counts
printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
switch ($job->getState()) {
    case JobState::DONE:
        $results = $job->getRiskDetails()->getNumericalStatsResult();
        printf(
            'Value range: [%s, %s]' . PHP_EOL,
            $valueToString($results->getMinValue()),
            $valueToString($results->getMaxValue())
        );

        // Only print unique values
        $lastValue = null;
        foreach ($results->getQuantileValues() as $percent => $quantileValue) {
            $value = $valueToString($quantileValue);
            if ($value != $lastValue) {
                printf('Value at %s quantile: %s' . PHP_EOL, $percent, $value);
                $lastValue = $value;
            }
        }

        break;
    case JobState::FAILED:
        printf('Job %s had errors:' . PHP_EOL, $job->getName());
        $errors = $job->getErrors();
        foreach ($errors as $error) {
            var_dump($error->getDetails());
        }
        break;
    default:
        print('Unexpected job state. Most likely, the job is either running or has not yet started.');
}

C#

public static object NumericalStats(
    string callingProjectId,
    string tableProjectId,
    string datasetId,
    string tableId,
    string topicId,
    string subscriptionId,
    string columnName)
{
    DlpServiceClient dlp = DlpServiceClient.Create();

    // Construct + submit the job
    var config = new RiskAnalysisJobConfig
    {
        PrivacyMetric = new PrivacyMetric
        {
            NumericalStatsConfig = new NumericalStatsConfig
            {
                Field = new FieldId { Name = columnName }
            }
        },
        SourceTable = new BigQueryTable
        {
            ProjectId = tableProjectId,
            DatasetId = datasetId,
            TableId = tableId
        },
        Actions =
        {
            new Google.Cloud.Dlp.V2.Action
            {
                PubSub = new PublishToPubSub
                {
                    Topic = $"projects/{callingProjectId}/topics/{topicId}"
                }
            }
        }
    };

    var submittedJob = dlp.CreateDlpJob(
        new CreateDlpJobRequest
        {
            ParentAsProjectName = new ProjectName(callingProjectId),
            RiskJob = config
        });

    // Listen to pub/sub for the job
    var subscriptionName = new SubscriptionName(callingProjectId, subscriptionId);
    SubscriberClient subscriber = SubscriberClient.CreateAsync(
        subscriptionName).Result;

    // SimpleSubscriber runs your message handle function on multiple
    // threads to maximize throughput.
    var done = new ManualResetEventSlim(false);
    subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
    {
        if (message.Attributes["DlpJobName"] == submittedJob.Name)
        {
            Thread.Sleep(500); // Wait for DLP API results to become consistent
            done.Set();
            return Task.FromResult(SubscriberClient.Reply.Ack);
        }
        else
        {
            return Task.FromResult(SubscriberClient.Reply.Nack);
        }
    });

    done.Wait(TimeSpan.FromMinutes(10)); // 10 minute timeout; may not work for large jobs
    subscriber.StopAsync(CancellationToken.None).Wait();

    // Process results
    var resultJob = dlp.GetDlpJob(
        new GetDlpJobRequest
        {
            DlpJobName = DlpJobName.Parse(submittedJob.Name)
        });

    var result = resultJob.RiskDetails.NumericalStatsResult;

    // 'UnpackValue(x)' is a prettier version of 'x.toString()'
    Console.WriteLine($"Value Range: [{DlpSamplesUtils.UnpackValue(result.MinValue)}, {DlpSamplesUtils.UnpackValue(result.MaxValue)}]");
    string lastValue = string.Empty;
    for (int quantile = 0; quantile < result.QuantileValues.Count; quantile++)
    {
        string currentValue = DlpSamplesUtils.UnpackValue(result.QuantileValues[quantile]);
        if (lastValue != currentValue)
        {
            Console.WriteLine($"Value at {quantile + 1}% quantile: {currentValue}");
        }
        lastValue = currentValue;
    }

    return 0;
}

Calcular estatísticas categóricas numéricas

É possível calcular estatísticas categóricas numéricas para os buckets de histograma individuais em uma coluna do BigQuery, incluindo:

  • limite superior na frequência de valor de um determinado bucket;
  • limite inferior na frequência de valor de um determinado bucket;
  • tamanho de um determinado bucket;
  • uma amostra de frequências de valor de um determinado bucket (máximo de 20).

Para calcular esses valores, configure um DlpJob, definindo a métrica de privacidade CategoricalStatsConfig como o nome da coluna a ser verificada. Quando você executa o job, o Cloud DLP calcula estatísticas para a coluna especificada, retornando seus resultados no objeto CategoricalStatsResult.

Exemplos de código

O exemplo de código a seguir demonstra, em várias linguagens, como usar o Cloud DLP para calcular estatísticas categóricas.

Java


import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.Action.PublishToPubSub;
import com.google.privacy.dlp.v2.AnalyzeDataSourceRiskDetails.CategoricalStatsResult;
import com.google.privacy.dlp.v2.AnalyzeDataSourceRiskDetails.CategoricalStatsResult.CategoricalStatsHistogramBucket;
import com.google.privacy.dlp.v2.BigQueryTable;
import com.google.privacy.dlp.v2.CreateDlpJobRequest;
import com.google.privacy.dlp.v2.DlpJob;
import com.google.privacy.dlp.v2.FieldId;
import com.google.privacy.dlp.v2.GetDlpJobRequest;
import com.google.privacy.dlp.v2.PrivacyMetric;
import com.google.privacy.dlp.v2.PrivacyMetric.CategoricalStatsConfig;
import com.google.privacy.dlp.v2.ProjectName;
import com.google.privacy.dlp.v2.RiskAnalysisJobConfig;
import com.google.privacy.dlp.v2.ValueFrequency;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class RiskAnalysisCategoricalStats {

  public static void categoricalStatsAnalysis() throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String datasetId = "your-bigquery-dataset-id";
    String tableId = "your-bigquery-table-id";
    String topicId = "pub-sub-topic";
    String subscriptionId = "pub-sub-subscription";
    categoricalStatsAnalysis(projectId, datasetId, tableId, topicId, subscriptionId);
  }

  public static void categoricalStatsAnalysis(
      String projectId, String datasetId, String tableId, String topicId, String subscriptionId)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (DlpServiceClient dlpServiceClient = DlpServiceClient.create()) {
      // Specify the BigQuery table to analyze
      BigQueryTable bigQueryTable =
          BigQueryTable.newBuilder()
              .setProjectId(projectId)
              .setDatasetId(datasetId)
              .setTableId(tableId)
              .build();

      // The name of the column to analyze, which doesn't need to contain numerical data
      String columnName = "Mystery";

      // Configure the privacy metric for the job
      FieldId fieldId = FieldId.newBuilder().setName(columnName).build();
      CategoricalStatsConfig categoricalStatsConfig =
          CategoricalStatsConfig.newBuilder().setField(fieldId).build();
      PrivacyMetric privacyMetric =
          PrivacyMetric.newBuilder().setCategoricalStatsConfig(categoricalStatsConfig).build();

      // Create action to publish job status notifications over Google Cloud Pub/Sub
      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      PublishToPubSub publishToPubSub =
          PublishToPubSub.newBuilder().setTopic(topicName.toString()).build();
      Action action = Action.newBuilder().setPubSub(publishToPubSub).build();

      // Configure the risk analysis job to perform
      RiskAnalysisJobConfig riskAnalysisJobConfig =
          RiskAnalysisJobConfig.newBuilder()
              .setSourceTable(bigQueryTable)
              .setPrivacyMetric(privacyMetric)
              .addActions(action)
              .build();

      // Build the job creation request to be sent by the client
      CreateDlpJobRequest createDlpJobRequest =
          CreateDlpJobRequest.newBuilder()
              .setParent(ProjectName.of(projectId).toString())
              .setRiskJob(riskAnalysisJobConfig)
              .build();

      // Send the request to the API using the client
      DlpJob dlpJob = dlpServiceClient.createDlpJob(createDlpJobRequest);

      // Set up a Pub/Sub subscriber to listen on the job completion status
      final SettableApiFuture<Boolean> done = SettableApiFuture.create();

      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      MessageReceiver messageHandler =
          (PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
            handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
          };
      Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
      subscriber.startAsync();

      // Wait for job completion semi-synchronously
      // For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
      try {
        done.get(1, TimeUnit.MINUTES);
        Thread.sleep(500); // Wait for the job to become available
      } catch (TimeoutException e) {
        System.out.println("Unable to verify job completion.");
      }

      // Build a request to get the completed job
      GetDlpJobRequest getDlpJobRequest =
          GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();

      // Retrieve completed job status
      DlpJob completedJob = dlpServiceClient.getDlpJob(getDlpJobRequest);
      System.out.println("Job status: " + completedJob.getState());

      // Get the result and parse through and process the information
      CategoricalStatsResult result = completedJob.getRiskDetails().getCategoricalStatsResult();
      List<CategoricalStatsHistogramBucket> histogramBucketList =
          result.getValueFrequencyHistogramBucketsList();

      for (CategoricalStatsHistogramBucket bucket : histogramBucketList) {
        long mostCommonFrequency = bucket.getValueFrequencyUpperBound();
        System.out.printf("Most common value occurs %d time(s).\n", mostCommonFrequency);

        long leastCommonFrequency = bucket.getValueFrequencyLowerBound();
        System.out.printf("Least common value occurs %d time(s).\n", leastCommonFrequency);

        for (ValueFrequency valueFrequency : bucket.getBucketValuesList()) {
          System.out.printf(
              "Value %s occurs %d time(s).\n",
              valueFrequency.getValue().toString(), valueFrequency.getCount());
        }
      }
    }
  }

  // handleMessage injects the job and settableFuture into the message reciever interface
  private static void handleMessage(
      DlpJob job,
      SettableApiFuture<Boolean> done,
      PubsubMessage pubsubMessage,
      AckReplyConsumer ackReplyConsumer) {
    String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
    if (job.getName().equals(messageAttribute)) {
      done.set(true);
      ackReplyConsumer.ack();
    } else {
      ackReplyConsumer.nack();
    }
  }
}

Node.js

// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');

// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();

// The project ID to run the API call under
// const callingProjectId = process.env.GCLOUD_PROJECT;

// The project ID the table is stored under
// This may or (for public datasets) may not equal the calling project ID
// const tableProjectId = process.env.GCLOUD_PROJECT;

// The ID of the dataset to inspect, e.g. 'my_dataset'
// const datasetId = 'my_dataset';

// The ID of the table to inspect, e.g. 'my_table'
// const tableId = 'my_table';

// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'

// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'

// The name of the column to compute risk metrics for, e.g. 'firstName'
// const columnName = 'firstName';

const sourceTable = {
  projectId: tableProjectId,
  datasetId: datasetId,
  tableId: tableId,
};

// Construct request for creating a risk analysis job
const request = {
  parent: dlp.projectPath(callingProjectId),
  riskJob: {
    privacyMetric: {
      categoricalStatsConfig: {
        field: {
          name: columnName,
        },
      },
    },
    sourceTable: sourceTable,
    actions: [
      {
        pubSub: {
          topic: `projects/${callingProjectId}/topics/${topicId}`,
        },
      },
    ],
  },
};

// Create helper function for unpacking values
const getValue = obj => obj[Object.keys(obj)[0]];

try {
  // Run risk analysis job
  const [topicResponse] = await pubsub.topic(topicId).get();
  const subscription = await topicResponse.subscription(subscriptionId);
  const [jobsResponse] = await dlp.createDlpJob(request);
  const jobName = jobsResponse.name;
  // Watch the Pub/Sub topic until the DLP job finishes
  await new Promise((resolve, reject) => {
    const messageHandler = message => {
      if (message.attributes && message.attributes.DlpJobName === jobName) {
        message.ack();
        subscription.removeListener('message', messageHandler);
        subscription.removeListener('error', errorHandler);
        resolve(jobName);
      } else {
        message.nack();
      }
    };

    const errorHandler = err => {
      subscription.removeListener('message', messageHandler);
      subscription.removeListener('error', errorHandler);
      reject(err);
    };

    subscription.on('message', messageHandler);
    subscription.on('error', errorHandler);
  });
  setTimeout(() => {
    console.log(' Waiting for DLP job to fully complete');
  }, 500);
  const [job] = await dlp.getDlpJob({name: jobName});
  const histogramBuckets =
    job.riskDetails.categoricalStatsResult.valueFrequencyHistogramBuckets;
  histogramBuckets.forEach((histogramBucket, histogramBucketIdx) => {
    console.log(`Bucket ${histogramBucketIdx}:`);

    // Print bucket stats
    console.log(
      `  Most common value occurs ${histogramBucket.valueFrequencyUpperBound} time(s)`
    );
    console.log(
      `  Least common value occurs ${histogramBucket.valueFrequencyLowerBound} time(s)`
    );

    // Print bucket values
    console.log(`${histogramBucket.bucketSize} unique values total.`);
    histogramBucket.bucketValues.forEach(valueBucket => {
      console.log(
        `  Value ${getValue(valueBucket.value)} occurs ${
          valueBucket.count
        } time(s).`
      );
    });
  });
} catch (err) {
  console.log(`Error in categoricalRiskAnalysis: ${err.message || err}`);
}

Python

def categorical_risk_analysis(
    project,
    table_project_id,
    dataset_id,
    table_id,
    column_name,
    topic_id,
    subscription_id,
    timeout=300,
):
    """Uses the Data Loss Prevention API to compute risk metrics of a column
       of categorical data in a Google BigQuery table.
    Args:
        project: The Google Cloud project id to use as a parent resource.
        table_project_id: The Google Cloud project id where the BigQuery table
            is stored.
        dataset_id: The id of the dataset to inspect.
        table_id: The id of the table to inspect.
        column_name: The name of the column to compute risk metrics for.
        topic_id: The name of the Pub/Sub topic to notify once the job
            completes.
        subscription_id: The name of the Pub/Sub subscription to use when
            listening for job completion notifications.
        timeout: The number of seconds to wait for a response from the API.

    Returns:
        None; the response from the API is printed to the terminal.
    """

    # Import the client library.
    import google.cloud.dlp

    # This sample additionally uses Cloud Pub/Sub to receive results from
    # potentially long-running operations.
    import google.cloud.pubsub

    # Instantiate a client.
    dlp = google.cloud.dlp_v2.DlpServiceClient()

    # Convert the project id into a full resource id.
    parent = dlp.project_path(project)

    # Location info of the BigQuery table.
    source_table = {
        "project_id": table_project_id,
        "dataset_id": dataset_id,
        "table_id": table_id,
    }

    # Tell the API where to send a notification when the job is complete.
    actions = [{"pub_sub": {"topic": "{}/topics/{}".format(parent, topic_id)}}]

    # Configure risk analysis job
    # Give the name of the numeric column to compute risk metrics for
    risk_job = {
        "privacy_metric": {
            "categorical_stats_config": {"field": {"name": column_name}}        },
        "source_table": source_table,
        "actions": actions,
    }

    # Call API to start risk analysis job
    operation = dlp.create_dlp_job(parent, risk_job=risk_job)

    def callback(message):
        if message.attributes["DlpJobName"] == operation.name:
            # This is the message we're looking for, so acknowledge it.
            message.ack()

            # Now that the job is done, fetch the results and print them.
            job = dlp.get_dlp_job(operation.name)
            histogram_buckets = (
                job.risk_details.categorical_stats_result.value_frequency_histogram_buckets  # noqa: E501
            )
            # Print bucket stats
            for i, bucket in enumerate(histogram_buckets):
                print("Bucket {}:".format(i))
                print(
                    "   Most common value occurs {} time(s)".format(
                        bucket.value_frequency_upper_bound
                    )
                )
                print(
                    "   Least common value occurs {} time(s)".format(
                        bucket.value_frequency_lower_bound
                    )
                )
                print("   {} unique values total.".format(bucket.bucket_size))
                for value in bucket.bucket_values:
                    print(
                        "   Value {} occurs {} time(s)".format(
                            value.value.integer_value, value.count
                        )
                    )
            subscription.set_result(None)
        else:
            # This is not the message we're looking for.
            message.drop()

    # Create a Pub/Sub client and find the subscription. The subscription is
    # expected to already be listening to the topic.
    subscriber = google.cloud.pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_id)
    subscription = subscriber.subscribe(subscription_path, callback)

    try:
        subscription.result(timeout=timeout)
    except TimeoutError:
        print(
            "No event received before the timeout. Please verify that the "
            "subscription provided is subscribed to the topic provided."
        )
        subscription.close()

Go

import (
	"context"
	"fmt"
	"io"
	"time"

	dlp "cloud.google.com/go/dlp/apiv2"
	"cloud.google.com/go/pubsub"
	dlppb "google.golang.org/genproto/googleapis/privacy/dlp/v2"
)

// riskCategorical computes the categorical risk of the given data.
func riskCategorical(w io.Writer, projectID, dataProject, pubSubTopic, pubSubSub, datasetID, tableID, columnName string) error {
	// projectID := "my-project-id"
	// dataProject := "bigquery-public-data"
	// pubSubTopic := "dlp-risk-sample-topic"
	// pubSubSub := "dlp-risk-sample-sub"
	// datasetID := "nhtsa_traffic_fatalities"
	// tableID := "accident_2015"
	// columnName := "state_number"
	ctx := context.Background()
	client, err := dlp.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("dlp.NewClient: %v", err)
	}
	// Create a PubSub Client used to listen for when the inspect job finishes.
	pubsubClient, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("Error creating PubSub client: %v", err)
	}
	defer pubsubClient.Close()

	// Create a PubSub subscription we can use to listen for messages.
	s, err := setupPubSub(projectID, pubSubTopic, pubSubSub)
	if err != nil {
		return fmt.Errorf("setupPubSub: %v", err)
	}

	// topic is the PubSub topic string where messages should be sent.
	topic := "projects/" + projectID + "/topics/" + pubSubTopic

	// Create a configured request.
	req := &dlppb.CreateDlpJobRequest{
		Parent: "projects/" + projectID,
		Job: &dlppb.CreateDlpJobRequest_RiskJob{
			RiskJob: &dlppb.RiskAnalysisJobConfig{
				// PrivacyMetric configures what to compute.
				PrivacyMetric: &dlppb.PrivacyMetric{
					Type: &dlppb.PrivacyMetric_CategoricalStatsConfig_{
						CategoricalStatsConfig: &dlppb.PrivacyMetric_CategoricalStatsConfig{
							Field: &dlppb.FieldId{
								Name: columnName,
							},
						},
					},
				},
				// SourceTable describes where to find the data.
				SourceTable: &dlppb.BigQueryTable{
					ProjectId: dataProject,
					DatasetId: datasetID,
					TableId:   tableID,
				},
				// Send a message to PubSub using Actions.
				Actions: []*dlppb.Action{
					{
						Action: &dlppb.Action_PubSub{
							PubSub: &dlppb.Action_PublishToPubSub{
								Topic: topic,
							},
						},
					},
				},
			},
		},
	}
	// Create the risk job.
	j, err := client.CreateDlpJob(ctx, req)
	if err != nil {
		return fmt.Errorf("CreateDlpJob: %v", err)
	}
	fmt.Fprintf(w, "Created job: %v\n", j.GetName())

	// Wait for the risk job to finish by waiting for a PubSub message.
	// This only waits for 1 minute. For long jobs, consider using a truly
	// asynchronous execution model such as Cloud Functions.
	ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
	defer cancel()
	err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// If this is the wrong job, do not process the result.
		if msg.Attributes["DlpJobName"] != j.GetName() {
			msg.Nack()
			return
		}
		msg.Ack()
		time.Sleep(500 * time.Millisecond)
		resp, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
			Name: j.GetName(),
		})
		if err != nil {
			fmt.Fprintf(w, "GetDlpJob: %v", err)
			return
		}
		h := resp.GetRiskDetails().GetCategoricalStatsResult().GetValueFrequencyHistogramBuckets()
		for i, b := range h {
			fmt.Fprintf(w, "Histogram bucket %v\n", i)
			fmt.Fprintf(w, "  Most common value occurs %v times\n", b.GetValueFrequencyUpperBound())
			fmt.Fprintf(w, "  Least common value occurs %v times\n", b.GetValueFrequencyLowerBound())
			fmt.Fprintf(w, "  %v unique values total\n", b.GetBucketSize())
			for _, v := range b.GetBucketValues() {
				fmt.Fprintf(w, "    Value %v occurs %v times\n", v.GetValue(), v.GetCount())
			}
		}
		// Stop listening for more messages.
		cancel()
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

PHP

/**
 * Computes risk metrics of a column of data in a Google BigQuery table.
 */
use Google\Cloud\Core\ExponentialBackoff;
use Google\Cloud\Dlp\V2\DlpServiceClient;
use Google\Cloud\Dlp\V2\RiskAnalysisJobConfig;
use Google\Cloud\Dlp\V2\BigQueryTable;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\PrivacyMetric\CategoricalStatsConfig;
use Google\Cloud\Dlp\V2\PrivacyMetric;
use Google\Cloud\Dlp\V2\FieldId;
use Google\Cloud\PubSub\PubSubClient;

/** Uncomment and populate these variables in your code */
// $callingProjectId = 'The project ID to run the API call under';
// $dataProjectId = 'The project ID containing the target Datastore';
// $topicId = 'The name of the Pub/Sub topic to notify once the job completes';
// $subscriptionId = 'The name of the Pub/Sub subscription to use when listening for job';
// $datasetId = 'The ID of the dataset to inspect';
// $tableId = 'The ID of the table to inspect';
// $columnName = 'The name of the column to compute risk metrics for, e.g. "age"';

// Instantiate a client.
$dlp = new DlpServiceClient([
    'projectId' => $callingProjectId,
]);
$pubsub = new PubSubClient([
    'projectId' => $callingProjectId,
]);
$topic = $pubsub->topic($topicId);

// Construct risk analysis config
$columnField = (new FieldId())
    ->setName($columnName);

$statsConfig = (new CategoricalStatsConfig())
    ->setField($columnField);

$privacyMetric = (new PrivacyMetric())
    ->setCategoricalStatsConfig($statsConfig);

// Construct items to be analyzed
$bigqueryTable = (new BigQueryTable())
    ->setProjectId($dataProjectId)
    ->setDatasetId($datasetId)
    ->setTableId($tableId);

// Construct the action to run when job completes
$pubSubAction = (new PublishToPubSub())
    ->setTopic($topic->name());

$action = (new Action())
    ->setPubSub($pubSubAction);

// Construct risk analysis job config to run
$riskJob = (new RiskAnalysisJobConfig())
    ->setPrivacyMetric($privacyMetric)
    ->setSourceTable($bigqueryTable)
    ->setActions([$action]);

// Submit request
$parent = $dlp->projectName($callingProjectId);
$job = $dlp->createDlpJob($parent, [
    'riskJob' => $riskJob
]);

// Listen for job notifications via an existing topic/subscription.
$subscription = $topic->subscription($subscriptionId);

// Poll Pub/Sub using exponential backoff until job finishes
$backoff = new ExponentialBackoff(30);
$backoff->execute(function () use ($subscription, $dlp, &$job) {
    printf('Waiting for job to complete' . PHP_EOL);
    foreach ($subscription->pull() as $message) {
        if (isset($message->attributes()['DlpJobName']) &&
            $message->attributes()['DlpJobName'] === $job->getName()) {
            $subscription->acknowledge($message);
            // Get the updated job. Loop to avoid race condition with DLP API.
            do {
                $job = $dlp->getDlpJob($job->getName());
            } while ($job->getState() == JobState::RUNNING);
            return true;
        }
    }
    throw new Exception('Job has not yet completed');
});

// Print finding counts
printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
switch ($job->getState()) {
    case JobState::DONE:
        $histBuckets = $job->getRiskDetails()->getCategoricalStatsResult()->getValueFrequencyHistogramBuckets();

        foreach ($histBuckets as $bucketIndex => $histBucket) {
            // Print bucket stats
            printf('Bucket %s:' . PHP_EOL, $bucketIndex);
            printf('  Most common value occurs %s time(s)' . PHP_EOL, $histBucket->getValueFrequencyUpperBound());
            printf('  Least common value occurs %s time(s)' . PHP_EOL, $histBucket->getValueFrequencyLowerBound());
            printf('  %s unique value(s) total.', $histBucket->getBucketSize());

            // Print bucket values
            foreach ($histBucket->getBucketValues() as $percent => $quantile) {
                printf(
                    '  Value %s occurs %s time(s).' . PHP_EOL,
                    $quantile->getValue()->serializeToJsonString(),
                    $quantile->getCount()
                );
            }
        }

        break;
    case JobState::FAILED:
        $errors = $job->getErrors();
        printf('Job %s had errors:' . PHP_EOL, $job->getName());
        foreach ($errors as $error) {
            var_dump($error->getDetails());
        }
        break;
    default:
        printf('Unexpected job state.');
}

C#

public static object CategoricalStats(
    string callingProjectId,
    string tableProjectId,
    string datasetId,
    string tableId,
    string topicId,
    string subscriptionId,
    string columnName)
{
    DlpServiceClient dlp = DlpServiceClient.Create();

    // Construct + submit the job
    RiskAnalysisJobConfig config = new RiskAnalysisJobConfig
    {
        PrivacyMetric = new PrivacyMetric
        {
            CategoricalStatsConfig = new CategoricalStatsConfig()
            {
                Field = new FieldId { Name = columnName }
            }
        },
        SourceTable = new BigQueryTable
        {
            ProjectId = tableProjectId,
            DatasetId = datasetId,
            TableId = tableId
        },
        Actions = {
            new Google.Cloud.Dlp.V2.Action
            {
                PubSub = new PublishToPubSub
                {
                    Topic = $"projects/{callingProjectId}/topics/{topicId}"
                }
            }
        }
    };

    var submittedJob = dlp.CreateDlpJob(new CreateDlpJobRequest
    {
        ParentAsProjectName = new ProjectName(callingProjectId),
        RiskJob = config
    });

    // Listen to pub/sub for the job
    var subscriptionName = new SubscriptionName(callingProjectId, subscriptionId);
    SubscriberClient subscriber = SubscriberClient.CreateAsync(
        subscriptionName).Result;

    // SimpleSubscriber runs your message handle function on multiple
    // threads to maximize throughput.
    var done = new ManualResetEventSlim(false);
    subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
    {
        if (message.Attributes["DlpJobName"] == submittedJob.Name)
        {
            Thread.Sleep(500); // Wait for DLP API results to become consistent
            done.Set();
            return Task.FromResult(SubscriberClient.Reply.Ack);
        }
        else
        {
            return Task.FromResult(SubscriberClient.Reply.Nack);
        }
    });

    done.Wait(TimeSpan.FromMinutes(10)); // 10 minute timeout; may not work for large jobs
    subscriber.StopAsync(CancellationToken.None).Wait();

    // Process results
    var resultJob = dlp.GetDlpJob(new GetDlpJobRequest
    {
        DlpJobName = DlpJobName.Parse(submittedJob.Name)
    });

    var result = resultJob.RiskDetails.CategoricalStatsResult;

    for (int bucketIdx = 0; bucketIdx < result.ValueFrequencyHistogramBuckets.Count; bucketIdx++)
    {
        var bucket = result.ValueFrequencyHistogramBuckets[bucketIdx];
        Console.WriteLine($"Bucket {bucketIdx}");
        Console.WriteLine($"  Most common value occurs {bucket.ValueFrequencyUpperBound} time(s).");
        Console.WriteLine($"  Least common value occurs {bucket.ValueFrequencyLowerBound} time(s).");
        Console.WriteLine($"  {bucket.BucketSize} unique value(s) total.");

        foreach (var bucketValue in bucket.BucketValues)
        {
            // 'UnpackValue(x)' is a prettier version of 'x.toString()'
            Console.WriteLine($"  Value {DlpSamplesUtils.UnpackValue(bucketValue.Value)} occurs {bucketValue.Count} time(s).");
        }
    }

    return 0;
}