Puedes usar la protección de datos sensibles a fin de calcular estadísticas numéricas y categóricas para columnas individuales en tablas de BigQuery. La protección de datos sensibles puede calcular lo siguiente:
- El valor mínimo de la columna
- El valor máximo de la columna
- Valores cuantiles de la columna
- Un histograma de frecuencias de valores en la columna
Calcula estadísticas numéricas
Puedes determinar los valores mínimo, máximo y cuantil para una columna individual de BigQuery. Si deseas calcular estos valores, configura una DlpJob
y establece la métrica de privacidad NumericalStatsConfig
en el nombre de la columna que se analizará. Cuando ejecutas el trabajo, la protección de datos sensibles calcula las estadísticas para la columna determinada y muestra sus resultados en el objeto NumericalStatsResult
. La protección de datos sensibles puede calcular estadísticas para los siguientes tipos de números:
- integer
- float
- fecha
- datetime
- marca de tiempo
- Hora
Las estadísticas que muestra el análisis incluyen el valor mínimo, el valor máximo y 99 valores cuantiles que dividen el conjunto de valores de campo en 100 buckets del mismo tamaño.
Ejemplos de código
A continuación, figura un código de muestra en varios lenguajes que muestra cómo usar la protección de datos sensibles para calcular estadísticas numéricas.
C#
Para obtener información sobre cómo instalar y usar la biblioteca cliente de la protección de datos sensibles, consulta Bibliotecas cliente de la protección de datos sensibles.
Para autenticarte en la protección de datos sensibles, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
using Google.Api.Gax.ResourceNames;
using Google.Cloud.Dlp.V2;
using Google.Cloud.PubSub.V1;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using static Google.Cloud.Dlp.V2.Action.Types;
using static Google.Cloud.Dlp.V2.PrivacyMetric.Types;
public class RiskAnalysisCreateNumericalStats
{
public static AnalyzeDataSourceRiskDetails.Types.NumericalStatsResult NumericalStats(
string callingProjectId,
string tableProjectId,
string datasetId,
string tableId,
string topicId,
string subscriptionId,
string columnName)
{
var dlp = DlpServiceClient.Create();
// Construct + submit the job
var config = new RiskAnalysisJobConfig
{
PrivacyMetric = new PrivacyMetric
{
NumericalStatsConfig = new NumericalStatsConfig
{
Field = new FieldId { Name = columnName }
}
},
SourceTable = new BigQueryTable
{
ProjectId = tableProjectId,
DatasetId = datasetId,
TableId = tableId
},
Actions =
{
new Google.Cloud.Dlp.V2.Action
{
PubSub = new PublishToPubSub
{
Topic = $"projects/{callingProjectId}/topics/{topicId}"
}
}
}
};
var submittedJob = dlp.CreateDlpJob(
new CreateDlpJobRequest
{
ParentAsProjectName = new ProjectName(callingProjectId),
RiskJob = config
});
// Listen to pub/sub for the job
var subscriptionName = new SubscriptionName(callingProjectId, subscriptionId);
var subscriber = SubscriberClient.CreateAsync(
subscriptionName).Result;
// SimpleSubscriber runs your message handle function on multiple
// threads to maximize throughput.
var done = new ManualResetEventSlim(false);
subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
{
if (message.Attributes["DlpJobName"] == submittedJob.Name)
{
Thread.Sleep(500); // Wait for DLP API results to become consistent
done.Set();
return Task.FromResult(SubscriberClient.Reply.Ack);
}
else
{
return Task.FromResult(SubscriberClient.Reply.Nack);
}
});
done.Wait(TimeSpan.FromMinutes(10)); // 10 minute timeout; may not work for large jobs
subscriber.StopAsync(CancellationToken.None).Wait();
// Process results
var resultJob = dlp.GetDlpJob(
new GetDlpJobRequest
{
DlpJobName = DlpJobName.Parse(submittedJob.Name)
});
var result = resultJob.RiskDetails.NumericalStatsResult;
// 'UnpackValue(x)' is a prettier version of 'x.toString()'
Console.WriteLine($"Value Range: [{UnpackValue(result.MinValue)}, {UnpackValue(result.MaxValue)}]");
var lastValue = string.Empty;
for (var quantile = 0; quantile < result.QuantileValues.Count; quantile++)
{
var currentValue = UnpackValue(result.QuantileValues[quantile]);
if (lastValue != currentValue)
{
Console.WriteLine($"Value at {quantile + 1}% quantile: {currentValue}");
}
lastValue = currentValue;
}
return result;
}
public static string UnpackValue(Value protoValue)
{
var jsonValue = JsonConvert.DeserializeObject<Dictionary<string, object>>(protoValue.ToString());
return jsonValue.Values.ElementAt(0).ToString();
}
}
Go
Para obtener información sobre cómo instalar y usar la biblioteca cliente de la protección de datos sensibles, consulta Bibliotecas cliente de la protección de datos sensibles.
Para autenticarte en la protección de datos sensibles, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
import (
"context"
"fmt"
"io"
"time"
dlp "cloud.google.com/go/dlp/apiv2"
"cloud.google.com/go/dlp/apiv2/dlppb"
"cloud.google.com/go/pubsub"
)
// riskNumerical computes the numerical risk of the given column.
func riskNumerical(w io.Writer, projectID, dataProject, pubSubTopic, pubSubSub, datasetID, tableID, columnName string) error {
// projectID := "my-project-id"
// dataProject := "bigquery-public-data"
// pubSubTopic := "dlp-risk-sample-topic"
// pubSubSub := "dlp-risk-sample-sub"
// datasetID := "nhtsa_traffic_fatalities"
// tableID := "accident_2015"
// columnName := "state_number"
ctx := context.Background()
client, err := dlp.NewClient(ctx)
if err != nil {
return fmt.Errorf("dlp.NewClient: %w", err)
}
// Create a PubSub Client used to listen for when the inspect job finishes.
pubsubClient, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return err
}
defer pubsubClient.Close()
// Create a PubSub subscription we can use to listen for messages.
// Create the Topic if it doesn't exist.
t := pubsubClient.Topic(pubSubTopic)
topicExists, err := t.Exists(ctx)
if err != nil {
return err
}
if !topicExists {
if t, err = pubsubClient.CreateTopic(ctx, pubSubTopic); err != nil {
return err
}
}
// Create the Subscription if it doesn't exist.
s := pubsubClient.Subscription(pubSubSub)
subExists, err := s.Exists(ctx)
if err != nil {
return err
}
if !subExists {
if s, err = pubsubClient.CreateSubscription(ctx, pubSubSub, pubsub.SubscriptionConfig{Topic: t}); err != nil {
return err
}
}
// topic is the PubSub topic string where messages should be sent.
topic := "projects/" + projectID + "/topics/" + pubSubTopic
// Create a configured request.
req := &dlppb.CreateDlpJobRequest{
Parent: fmt.Sprintf("projects/%s/locations/global", projectID),
Job: &dlppb.CreateDlpJobRequest_RiskJob{
RiskJob: &dlppb.RiskAnalysisJobConfig{
// PrivacyMetric configures what to compute.
PrivacyMetric: &dlppb.PrivacyMetric{
Type: &dlppb.PrivacyMetric_NumericalStatsConfig_{
NumericalStatsConfig: &dlppb.PrivacyMetric_NumericalStatsConfig{
Field: &dlppb.FieldId{
Name: columnName,
},
},
},
},
// SourceTable describes where to find the data.
SourceTable: &dlppb.BigQueryTable{
ProjectId: dataProject,
DatasetId: datasetID,
TableId: tableID,
},
// Send a message to PubSub using Actions.
Actions: []*dlppb.Action{
{
Action: &dlppb.Action_PubSub{
PubSub: &dlppb.Action_PublishToPubSub{
Topic: topic,
},
},
},
},
},
},
}
// Create the risk job.
j, err := client.CreateDlpJob(ctx, req)
if err != nil {
return fmt.Errorf("CreateDlpJob: %w", err)
}
fmt.Fprintf(w, "Created job: %v\n", j.GetName())
// Wait for the risk job to finish by waiting for a PubSub message.
// This only waits for 10 minutes. For long jobs, consider using a truly
// asynchronous execution model such as Cloud Functions.
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
// If this is the wrong job, do not process the result.
if msg.Attributes["DlpJobName"] != j.GetName() {
msg.Nack()
return
}
msg.Ack()
time.Sleep(500 * time.Millisecond)
resp, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
Name: j.GetName(),
})
if err != nil {
fmt.Fprintf(w, "GetDlpJob: %v", err)
return
}
n := resp.GetRiskDetails().GetNumericalStatsResult()
fmt.Fprintf(w, "Value range: [%v, %v]\n", n.GetMinValue(), n.GetMaxValue())
var tmp string
for p, v := range n.GetQuantileValues() {
if v.String() != tmp {
fmt.Fprintf(w, "Value at %v quantile: %v\n", p, v)
tmp = v.String()
}
}
// Stop listening for more messages.
cancel()
})
if err != nil {
return fmt.Errorf("Recieve: %w", err)
}
return nil
}
Java
Para obtener información sobre cómo instalar y usar la biblioteca cliente de la protección de datos sensibles, consulta Bibliotecas cliente de la protección de datos sensibles.
Para autenticarte en la protección de datos sensibles, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.Action.PublishToPubSub;
import com.google.privacy.dlp.v2.AnalyzeDataSourceRiskDetails.NumericalStatsResult;
import com.google.privacy.dlp.v2.BigQueryTable;
import com.google.privacy.dlp.v2.CreateDlpJobRequest;
import com.google.privacy.dlp.v2.DlpJob;
import com.google.privacy.dlp.v2.FieldId;
import com.google.privacy.dlp.v2.GetDlpJobRequest;
import com.google.privacy.dlp.v2.LocationName;
import com.google.privacy.dlp.v2.PrivacyMetric;
import com.google.privacy.dlp.v2.PrivacyMetric.NumericalStatsConfig;
import com.google.privacy.dlp.v2.RiskAnalysisJobConfig;
import com.google.privacy.dlp.v2.Value;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
class RiskAnalysisNumericalStats {
public static void main(String[] args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String datasetId = "your-bigquery-dataset-id";
String tableId = "your-bigquery-table-id";
String topicId = "pub-sub-topic";
String subscriptionId = "pub-sub-subscription";
numericalStatsAnalysis(projectId, datasetId, tableId, topicId, subscriptionId);
}
public static void numericalStatsAnalysis(
String projectId, String datasetId, String tableId, String topicId, String subscriptionId)
throws ExecutionException, InterruptedException, IOException {
// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
try (DlpServiceClient dlpServiceClient = DlpServiceClient.create()) {
// Specify the BigQuery table to analyze
BigQueryTable bigQueryTable =
BigQueryTable.newBuilder()
.setTableId(tableId)
.setDatasetId(datasetId)
.setProjectId(projectId)
.build();
// This represents the name of the column to analyze, which must contain numerical data
String columnName = "Age";
// Configure the privacy metric for the job
FieldId fieldId = FieldId.newBuilder().setName(columnName).build();
NumericalStatsConfig numericalStatsConfig =
NumericalStatsConfig.newBuilder().setField(fieldId).build();
PrivacyMetric privacyMetric =
PrivacyMetric.newBuilder().setNumericalStatsConfig(numericalStatsConfig).build();
// Create action to publish job status notifications over Google Cloud Pub/Sub
ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
PublishToPubSub publishToPubSub =
PublishToPubSub.newBuilder().setTopic(topicName.toString()).build();
Action action = Action.newBuilder().setPubSub(publishToPubSub).build();
// Configure the risk analysis job to perform
RiskAnalysisJobConfig riskAnalysisJobConfig =
RiskAnalysisJobConfig.newBuilder()
.setSourceTable(bigQueryTable)
.setPrivacyMetric(privacyMetric)
.addActions(action)
.build();
CreateDlpJobRequest createDlpJobRequest =
CreateDlpJobRequest.newBuilder()
.setParent(LocationName.of(projectId, "global").toString())
.setRiskJob(riskAnalysisJobConfig)
.build();
// Send the request to the API using the client
DlpJob dlpJob = dlpServiceClient.createDlpJob(createDlpJobRequest);
// Set up a Pub/Sub subscriber to listen on the job completion status
final SettableApiFuture<Boolean> done = SettableApiFuture.create();
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
MessageReceiver messageHandler =
(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
subscriber.startAsync();
// Wait for job completion semi-synchronously
// For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
try {
done.get(15, TimeUnit.MINUTES);
} catch (TimeoutException e) {
System.out.println("Job was not completed after 15 minutes.");
return;
} finally {
subscriber.stopAsync();
subscriber.awaitTerminated();
}
// Build a request to get the completed job
GetDlpJobRequest getDlpJobRequest =
GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
// Retrieve completed job status
DlpJob completedJob = dlpServiceClient.getDlpJob(getDlpJobRequest);
System.out.println("Job status: " + completedJob.getState());
System.out.println("Job name: " + dlpJob.getName());
// Get the result and parse through and process the information
NumericalStatsResult result = completedJob.getRiskDetails().getNumericalStatsResult();
System.out.printf(
"Value range : [%.3f, %.3f]\n",
result.getMinValue().getFloatValue(), result.getMaxValue().getFloatValue());
int percent = 1;
Double lastValue = null;
for (Value quantileValue : result.getQuantileValuesList()) {
Double currentValue = quantileValue.getFloatValue();
if (lastValue == null || !lastValue.equals(currentValue)) {
System.out.printf("Value at %s %% quantile : %.3f", percent, currentValue);
}
lastValue = currentValue;
}
}
}
// handleMessage injects the job and settableFuture into the message reciever interface
private static void handleMessage(
DlpJob job,
SettableApiFuture<Boolean> done,
PubsubMessage pubsubMessage,
AckReplyConsumer ackReplyConsumer) {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
done.set(true);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
}
}
}
Node.js
Para obtener información sobre cómo instalar y usar la biblioteca cliente de la protección de datos sensibles, consulta Bibliotecas cliente de la protección de datos sensibles.
Para autenticarte en la protección de datos sensibles, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');
// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();
// The project ID to run the API call under
// const projectId = 'my-project';
// The project ID the table is stored under
// This may or (for public datasets) may not equal the calling project ID
// const tableProjectId = 'my-project';
// The ID of the dataset to inspect, e.g. 'my_dataset'
// const datasetId = 'my_dataset';
// The ID of the table to inspect, e.g. 'my_table'
// const tableId = 'my_table';
// The name of the column to compute risk metrics for, e.g. 'age'
// Note that this column must be a numeric data type
// const columnName = 'firstName';
// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'
// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'
async function numericalRiskAnalysis() {
const sourceTable = {
projectId: tableProjectId,
datasetId: datasetId,
tableId: tableId,
};
// Construct request for creating a risk analysis job
const request = {
parent: `projects/${projectId}/locations/global`,
riskJob: {
privacyMetric: {
numericalStatsConfig: {
field: {
name: columnName,
},
},
},
sourceTable: sourceTable,
actions: [
{
pubSub: {
topic: `projects/${projectId}/topics/${topicId}`,
},
},
],
},
};
// Create helper function for unpacking values
const getValue = obj => obj[Object.keys(obj)[0]];
// Run risk analysis job
const [topicResponse] = await pubsub.topic(topicId).get();
const subscription = await topicResponse.subscription(subscriptionId);
const [jobsResponse] = await dlp.createDlpJob(request);
const jobName = jobsResponse.name;
console.log(`Job created. Job name: ${jobName}`);
// Watch the Pub/Sub topic until the DLP job finishes
await new Promise((resolve, reject) => {
const messageHandler = message => {
if (message.attributes && message.attributes.DlpJobName === jobName) {
message.ack();
subscription.removeListener('message', messageHandler);
subscription.removeListener('error', errorHandler);
resolve(jobName);
} else {
message.nack();
}
};
const errorHandler = err => {
subscription.removeListener('message', messageHandler);
subscription.removeListener('error', errorHandler);
reject(err);
};
subscription.on('message', messageHandler);
subscription.on('error', errorHandler);
});
setTimeout(() => {
console.log(' Waiting for DLP job to fully complete');
}, 500);
const [job] = await dlp.getDlpJob({name: jobName});
const results = job.riskDetails.numericalStatsResult;
console.log(
`Value Range: [${getValue(results.minValue)}, ${getValue(
results.maxValue
)}]`
);
// Print unique quantile values
let tempValue = null;
results.quantileValues.forEach((result, percent) => {
const value = getValue(result);
// Only print new values
if (
tempValue !== value &&
!(tempValue && tempValue.equals && tempValue.equals(value))
) {
console.log(`Value at ${percent}% quantile: ${value}`);
tempValue = value;
}
});
}
await numericalRiskAnalysis();
PHP
Para obtener información sobre cómo instalar y usar la biblioteca cliente de la protección de datos sensibles, consulta Bibliotecas cliente de la protección de datos sensibles.
Para autenticarte en la protección de datos sensibles, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\BigQueryTable;
use Google\Cloud\Dlp\V2\Client\DlpServiceClient;
use Google\Cloud\Dlp\V2\CreateDlpJobRequest;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\FieldId;
use Google\Cloud\Dlp\V2\GetDlpJobRequest;
use Google\Cloud\Dlp\V2\PrivacyMetric;
use Google\Cloud\Dlp\V2\PrivacyMetric\NumericalStatsConfig;
use Google\Cloud\Dlp\V2\RiskAnalysisJobConfig;
use Google\Cloud\PubSub\PubSubClient;
/**
* Computes risk metrics of a column of numbers in a Google BigQuery table.
*
* @param string $callingProjectId The project ID to run the API call under
* @param string $dataProjectId The project ID containing the target Datastore
* @param string $topicId The name of the Pub/Sub topic to notify once the job completes
* @param string $subscriptionId The name of the Pub/Sub subscription to use when listening for job
* @param string $datasetId The ID of the BigQuery dataset to inspect
* @param string $tableId The ID of the BigQuery table to inspect
* @param string $columnName The name of the column to compute risk metrics for, e.g. "age"
*/
function numerical_stats(
string $callingProjectId,
string $dataProjectId,
string $topicId,
string $subscriptionId,
string $datasetId,
string $tableId,
string $columnName
): void {
// Instantiate a client.
$dlp = new DlpServiceClient();
$pubsub = new PubSubClient();
$topic = $pubsub->topic($topicId);
// Construct risk analysis config
$columnField = (new FieldId())
->setName($columnName);
$statsConfig = (new NumericalStatsConfig())
->setField($columnField);
$privacyMetric = (new PrivacyMetric())
->setNumericalStatsConfig($statsConfig);
// Construct items to be analyzed
$bigqueryTable = (new BigQueryTable())
->setProjectId($dataProjectId)
->setDatasetId($datasetId)
->setTableId($tableId);
// Construct the action to run when job completes
$pubSubAction = (new PublishToPubSub())
->setTopic($topic->name());
$action = (new Action())
->setPubSub($pubSubAction);
// Construct risk analysis job config to run
$riskJob = (new RiskAnalysisJobConfig())
->setPrivacyMetric($privacyMetric)
->setSourceTable($bigqueryTable)
->setActions([$action]);
// Listen for job notifications via an existing topic/subscription.
$subscription = $topic->subscription($subscriptionId);
// Submit request
$parent = "projects/$callingProjectId/locations/global";
$createDlpJobRequest = (new CreateDlpJobRequest())
->setParent($parent)
->setRiskJob($riskJob);
$job = $dlp->createDlpJob($createDlpJobRequest);
// Poll Pub/Sub using exponential backoff until job finishes
// Consider using an asynchronous execution model such as Cloud Functions
$attempt = 1;
$startTime = time();
do {
foreach ($subscription->pull() as $message) {
if (
isset($message->attributes()['DlpJobName']) &&
$message->attributes()['DlpJobName'] === $job->getName()
) {
$subscription->acknowledge($message);
// Get the updated job. Loop to avoid race condition with DLP API.
do {
$getDlpJobRequest = (new GetDlpJobRequest())
->setName($job->getName());
$job = $dlp->getDlpJob($getDlpJobRequest);
} while ($job->getState() == JobState::RUNNING);
break 2; // break from parent do while
}
}
print('Waiting for job to complete' . PHP_EOL);
// Exponential backoff with max delay of 60 seconds
sleep(min(60, pow(2, ++$attempt)));
} while (time() - $startTime < 600); // 10 minute timeout
// Helper function to convert Protobuf values to strings
$valueToString = function ($value) {
$json = json_decode($value->serializeToJsonString(), true);
return array_shift($json);
};
// Print finding counts
printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
switch ($job->getState()) {
case JobState::DONE:
$results = $job->getRiskDetails()->getNumericalStatsResult();
printf(
'Value range: [%s, %s]' . PHP_EOL,
$valueToString($results->getMinValue()),
$valueToString($results->getMaxValue())
);
// Only print unique values
$lastValue = null;
foreach ($results->getQuantileValues() as $percent => $quantileValue) {
$value = $valueToString($quantileValue);
if ($value != $lastValue) {
printf('Value at %s quantile: %s' . PHP_EOL, $percent, $value);
$lastValue = $value;
}
}
break;
case JobState::FAILED:
printf('Job %s had errors:' . PHP_EOL, $job->getName());
$errors = $job->getErrors();
foreach ($errors as $error) {
var_dump($error->getDetails());
}
break;
case JobState::PENDING:
print('Job has not completed. Consider a longer timeout or an asynchronous execution model' . PHP_EOL);
break;
default:
print('Unexpected job state. Most likely, the job is either running or has not yet started.');
}
}
Python
Para obtener información sobre cómo instalar y usar la biblioteca cliente de la protección de datos sensibles, consulta Bibliotecas cliente de la protección de datos sensibles.
Para autenticarte en la protección de datos sensibles, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
import concurrent.futures
import google.cloud.dlp
import google.cloud.pubsub
def numerical_risk_analysis(
project: str,
table_project_id: str,
dataset_id: str,
table_id: str,
column_name: str,
topic_id: str,
subscription_id: str,
timeout: int = 300,
) -> None:
"""Uses the Data Loss Prevention API to compute risk metrics of a column
of numerical data in a Google BigQuery table.
Args:
project: The Google Cloud project id to use as a parent resource.
table_project_id: The Google Cloud project id where the BigQuery table
is stored.
dataset_id: The id of the dataset to inspect.
table_id: The id of the table to inspect.
column_name: The name of the column to compute risk metrics for.
topic_id: The name of the Pub/Sub topic to notify once the job
completes.
subscription_id: The name of the Pub/Sub subscription to use when
listening for job completion notifications.
timeout: The number of seconds to wait for a response from the API.
Returns:
None; the response from the API is printed to the terminal.
"""
# Instantiate a client.
dlp = google.cloud.dlp_v2.DlpServiceClient()
# Convert the project id into full resource ids.
topic = google.cloud.pubsub.PublisherClient.topic_path(project, topic_id)
parent = f"projects/{project}/locations/global"
# Location info of the BigQuery table.
source_table = {
"project_id": table_project_id,
"dataset_id": dataset_id,
"table_id": table_id,
}
# Tell the API where to send a notification when the job is complete.
actions = [{"pub_sub": {"topic": topic}}]
# Configure risk analysis job
# Give the name of the numeric column to compute risk metrics for
risk_job = {
"privacy_metric": {"numerical_stats_config": {"field": {"name": column_name}}},
"source_table": source_table,
"actions": actions,
}
# Call API to start risk analysis job
operation = dlp.create_dlp_job(request={"parent": parent, "risk_job": risk_job})
def callback(message: google.cloud.pubsub_v1.subscriber.message.Message) -> None:
if message.attributes["DlpJobName"] == operation.name:
# This is the message we're looking for, so acknowledge it.
message.ack()
# Now that the job is done, fetch the results and print them.
job = dlp.get_dlp_job(request={"name": operation.name})
print(f"Job name: {job.name}")
results = job.risk_details.numerical_stats_result
print(
"Value Range: [{}, {}]".format(
results.min_value.integer_value,
results.max_value.integer_value,
)
)
prev_value = None
for percent, result in enumerate(results.quantile_values):
value = result.integer_value
if prev_value != value:
print(f"Value at {percent}% quantile: {value}")
prev_value = value
subscription.set_result(None)
else:
# This is not the message we're looking for.
message.drop()
# Create a Pub/Sub client and find the subscription. The subscription is
# expected to already be listening to the topic.
subscriber = google.cloud.pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(project, subscription_id)
subscription = subscriber.subscribe(subscription_path, callback)
try:
subscription.result(timeout=timeout)
except concurrent.futures.TimeoutError:
print(
"No event received before the timeout. Please verify that the "
"subscription provided is subscribed to the topic provided."
)
subscription.close()
Calcula estadísticas numéricas categóricas
Puedes calcular estadísticas numéricas categóricas como las siguientes para los depósitos de histogramas individuales dentro de una columna de BigQuery:
- Límite superior de la frecuencia de valor dentro de un bucket dado
- Límite inferior de la frecuencia de valor dentro de un bucket dado
- Tamaño de un bucket dado
- Una muestra de frecuencias de valor dentro de un grupo dado (máximo: 20)
Si deseas calcular estos valores, configura una DlpJob
y establece la métrica de privacidad CategoricalStatsConfig
en el nombre de la columna que se analizará. Cuando ejecutas el trabajo, la protección de datos sensibles calcula las estadísticas para la columna determinada y muestra sus resultados en el objeto CategoricalStatsResult
.
Ejemplos de código
A continuación, figura un código de muestra en varios lenguajes que muestra cómo usar la protección de datos sensibles para calcular estadísticas categóricas.
C#
Para obtener información sobre cómo instalar y usar la biblioteca cliente de la protección de datos sensibles, consulta Bibliotecas cliente de la protección de datos sensibles.
Para autenticarte en la protección de datos sensibles, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
using Google.Api.Gax.ResourceNames;
using Google.Cloud.Dlp.V2;
using Google.Cloud.PubSub.V1;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using static Google.Cloud.Dlp.V2.Action.Types;
using static Google.Cloud.Dlp.V2.PrivacyMetric.Types;
public class RiskAnalysisCreateCategoricalStats
{
public static AnalyzeDataSourceRiskDetails.Types.CategoricalStatsResult CategoricalStats(
string callingProjectId,
string tableProjectId,
string datasetId,
string tableId,
string topicId,
string subscriptionId,
string columnName)
{
var dlp = DlpServiceClient.Create();
// Construct + submit the job
var config = new RiskAnalysisJobConfig
{
PrivacyMetric = new PrivacyMetric
{
CategoricalStatsConfig = new CategoricalStatsConfig()
{
Field = new FieldId { Name = columnName }
}
},
SourceTable = new BigQueryTable
{
ProjectId = tableProjectId,
DatasetId = datasetId,
TableId = tableId
},
Actions =
{
new Google.Cloud.Dlp.V2.Action
{
PubSub = new PublishToPubSub
{
Topic = $"projects/{callingProjectId}/topics/{topicId}"
}
}
}
};
var submittedJob = dlp.CreateDlpJob(new CreateDlpJobRequest
{
ParentAsProjectName = new ProjectName(callingProjectId),
RiskJob = config
});
// Listen to pub/sub for the job
var subscriptionName = new SubscriptionName(callingProjectId, subscriptionId);
var subscriber = SubscriberClient.CreateAsync(
subscriptionName).Result;
// SimpleSubscriber runs your message handle function on multiple
// threads to maximize throughput.
var done = new ManualResetEventSlim(false);
subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
{
if (message.Attributes["DlpJobName"] == submittedJob.Name)
{
Thread.Sleep(500); // Wait for DLP API results to become consistent
done.Set();
return Task.FromResult(SubscriberClient.Reply.Ack);
}
else
{
return Task.FromResult(SubscriberClient.Reply.Nack);
}
});
done.Wait(TimeSpan.FromMinutes(10)); // 10 minute timeout; may not work for large jobs
subscriber.StopAsync(CancellationToken.None).Wait();
// Process results
var resultJob = dlp.GetDlpJob(new GetDlpJobRequest
{
DlpJobName = DlpJobName.Parse(submittedJob.Name)
});
var result = resultJob.RiskDetails.CategoricalStatsResult;
for (var bucketIdx = 0; bucketIdx < result.ValueFrequencyHistogramBuckets.Count; bucketIdx++)
{
var bucket = result.ValueFrequencyHistogramBuckets[bucketIdx];
Console.WriteLine($"Bucket {bucketIdx}");
Console.WriteLine($" Most common value occurs {bucket.ValueFrequencyUpperBound} time(s).");
Console.WriteLine($" Least common value occurs {bucket.ValueFrequencyLowerBound} time(s).");
Console.WriteLine($" {bucket.BucketSize} unique value(s) total.");
foreach (var bucketValue in bucket.BucketValues)
{
// 'UnpackValue(x)' is a prettier version of 'x.toString()'
Console.WriteLine($" Value {UnpackValue(bucketValue.Value)} occurs {bucketValue.Count} time(s).");
}
}
return result;
}
public static string UnpackValue(Value protoValue)
{
var jsonValue = JsonConvert.DeserializeObject<Dictionary<string, object>>(protoValue.ToString());
return jsonValue.Values.ElementAt(0).ToString();
}
}
Go
Para obtener información sobre cómo instalar y usar la biblioteca cliente de la protección de datos sensibles, consulta Bibliotecas cliente de la protección de datos sensibles.
Para autenticarte en la protección de datos sensibles, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
import (
"context"
"fmt"
"io"
"time"
dlp "cloud.google.com/go/dlp/apiv2"
"cloud.google.com/go/dlp/apiv2/dlppb"
"cloud.google.com/go/pubsub"
)
// riskCategorical computes the categorical risk of the given data.
func riskCategorical(w io.Writer, projectID, dataProject, pubSubTopic, pubSubSub, datasetID, tableID, columnName string) error {
// projectID := "my-project-id"
// dataProject := "bigquery-public-data"
// pubSubTopic := "dlp-risk-sample-topic"
// pubSubSub := "dlp-risk-sample-sub"
// datasetID := "nhtsa_traffic_fatalities"
// tableID := "accident_2015"
// columnName := "state_number"
ctx := context.Background()
client, err := dlp.NewClient(ctx)
if err != nil {
return fmt.Errorf("dlp.NewClient: %w", err)
}
// Create a PubSub Client used to listen for when the inspect job finishes.
pubsubClient, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return err
}
defer pubsubClient.Close()
// Create a PubSub subscription we can use to listen for messages.
// Create the Topic if it doesn't exist.
t := pubsubClient.Topic(pubSubTopic)
topicExists, err := t.Exists(ctx)
if err != nil {
return err
}
if !topicExists {
if t, err = pubsubClient.CreateTopic(ctx, pubSubTopic); err != nil {
return err
}
}
// Create the Subscription if it doesn't exist.
s := pubsubClient.Subscription(pubSubSub)
subExists, err := s.Exists(ctx)
if err != nil {
return err
}
if !subExists {
if s, err = pubsubClient.CreateSubscription(ctx, pubSubSub, pubsub.SubscriptionConfig{Topic: t}); err != nil {
return err
}
}
// topic is the PubSub topic string where messages should be sent.
topic := "projects/" + projectID + "/topics/" + pubSubTopic
// Create a configured request.
req := &dlppb.CreateDlpJobRequest{
Parent: fmt.Sprintf("projects/%s/locations/global", projectID),
Job: &dlppb.CreateDlpJobRequest_RiskJob{
RiskJob: &dlppb.RiskAnalysisJobConfig{
// PrivacyMetric configures what to compute.
PrivacyMetric: &dlppb.PrivacyMetric{
Type: &dlppb.PrivacyMetric_CategoricalStatsConfig_{
CategoricalStatsConfig: &dlppb.PrivacyMetric_CategoricalStatsConfig{
Field: &dlppb.FieldId{
Name: columnName,
},
},
},
},
// SourceTable describes where to find the data.
SourceTable: &dlppb.BigQueryTable{
ProjectId: dataProject,
DatasetId: datasetID,
TableId: tableID,
},
// Send a message to PubSub using Actions.
Actions: []*dlppb.Action{
{
Action: &dlppb.Action_PubSub{
PubSub: &dlppb.Action_PublishToPubSub{
Topic: topic,
},
},
},
},
},
},
}
// Create the risk job.
j, err := client.CreateDlpJob(ctx, req)
if err != nil {
return fmt.Errorf("CreateDlpJob: %w", err)
}
fmt.Fprintf(w, "Created job: %v\n", j.GetName())
// Wait for the risk job to finish by waiting for a PubSub message.
// This only waits for 10 minutes. For long jobs, consider using a truly
// asynchronous execution model such as Cloud Functions.
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
// If this is the wrong job, do not process the result.
if msg.Attributes["DlpJobName"] != j.GetName() {
msg.Nack()
return
}
msg.Ack()
time.Sleep(500 * time.Millisecond)
resp, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
Name: j.GetName(),
})
if err != nil {
fmt.Fprintf(w, "GetDlpJob: %v", err)
return
}
h := resp.GetRiskDetails().GetCategoricalStatsResult().GetValueFrequencyHistogramBuckets()
for i, b := range h {
fmt.Fprintf(w, "Histogram bucket %v\n", i)
fmt.Fprintf(w, " Most common value occurs %v times\n", b.GetValueFrequencyUpperBound())
fmt.Fprintf(w, " Least common value occurs %v times\n", b.GetValueFrequencyLowerBound())
fmt.Fprintf(w, " %v unique values total\n", b.GetBucketSize())
for _, v := range b.GetBucketValues() {
fmt.Fprintf(w, " Value %v occurs %v times\n", v.GetValue(), v.GetCount())
}
}
// Stop listening for more messages.
cancel()
})
if err != nil {
return fmt.Errorf("Receive: %w", err)
}
return nil
}
Java
Para obtener información sobre cómo instalar y usar la biblioteca cliente de la protección de datos sensibles, consulta Bibliotecas cliente de la protección de datos sensibles.
Para autenticarte en la protección de datos sensibles, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.Action.PublishToPubSub;
import com.google.privacy.dlp.v2.AnalyzeDataSourceRiskDetails.CategoricalStatsResult;
import com.google.privacy.dlp.v2.AnalyzeDataSourceRiskDetails.CategoricalStatsResult.CategoricalStatsHistogramBucket;
import com.google.privacy.dlp.v2.BigQueryTable;
import com.google.privacy.dlp.v2.CreateDlpJobRequest;
import com.google.privacy.dlp.v2.DlpJob;
import com.google.privacy.dlp.v2.FieldId;
import com.google.privacy.dlp.v2.GetDlpJobRequest;
import com.google.privacy.dlp.v2.LocationName;
import com.google.privacy.dlp.v2.PrivacyMetric;
import com.google.privacy.dlp.v2.PrivacyMetric.CategoricalStatsConfig;
import com.google.privacy.dlp.v2.RiskAnalysisJobConfig;
import com.google.privacy.dlp.v2.ValueFrequency;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
class RiskAnalysisCategoricalStats {
public static void main(String[] args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String datasetId = "your-bigquery-dataset-id";
String tableId = "your-bigquery-table-id";
String topicId = "pub-sub-topic";
String subscriptionId = "pub-sub-subscription";
categoricalStatsAnalysis(projectId, datasetId, tableId, topicId, subscriptionId);
}
public static void categoricalStatsAnalysis(
String projectId, String datasetId, String tableId, String topicId, String subscriptionId)
throws ExecutionException, InterruptedException, IOException {
// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
try (DlpServiceClient dlpServiceClient = DlpServiceClient.create()) {
// Specify the BigQuery table to analyze
BigQueryTable bigQueryTable =
BigQueryTable.newBuilder()
.setProjectId(projectId)
.setDatasetId(datasetId)
.setTableId(tableId)
.build();
// The name of the column to analyze, which doesn't need to contain numerical data
String columnName = "Mystery";
// Configure the privacy metric for the job
FieldId fieldId = FieldId.newBuilder().setName(columnName).build();
CategoricalStatsConfig categoricalStatsConfig =
CategoricalStatsConfig.newBuilder().setField(fieldId).build();
PrivacyMetric privacyMetric =
PrivacyMetric.newBuilder().setCategoricalStatsConfig(categoricalStatsConfig).build();
// Create action to publish job status notifications over Google Cloud Pub/Sub
ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
PublishToPubSub publishToPubSub =
PublishToPubSub.newBuilder().setTopic(topicName.toString()).build();
Action action = Action.newBuilder().setPubSub(publishToPubSub).build();
// Configure the risk analysis job to perform
RiskAnalysisJobConfig riskAnalysisJobConfig =
RiskAnalysisJobConfig.newBuilder()
.setSourceTable(bigQueryTable)
.setPrivacyMetric(privacyMetric)
.addActions(action)
.build();
// Build the job creation request to be sent by the client
CreateDlpJobRequest createDlpJobRequest =
CreateDlpJobRequest.newBuilder()
.setParent(LocationName.of(projectId, "global").toString())
.setRiskJob(riskAnalysisJobConfig)
.build();
// Send the request to the API using the client
DlpJob dlpJob = dlpServiceClient.createDlpJob(createDlpJobRequest);
// Set up a Pub/Sub subscriber to listen on the job completion status
final SettableApiFuture<Boolean> done = SettableApiFuture.create();
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
MessageReceiver messageHandler =
(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
subscriber.startAsync();
// Wait for job completion semi-synchronously
// For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
try {
done.get(15, TimeUnit.MINUTES);
} catch (TimeoutException e) {
System.out.println("Job was not completed after 15 minutes.");
return;
} finally {
subscriber.stopAsync();
subscriber.awaitTerminated();
}
// Build a request to get the completed job
GetDlpJobRequest getDlpJobRequest =
GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
// Retrieve completed job status
DlpJob completedJob = dlpServiceClient.getDlpJob(getDlpJobRequest);
System.out.println("Job status: " + completedJob.getState());
System.out.println("Job name: " + dlpJob.getName());
// Get the result and parse through and process the information
CategoricalStatsResult result = completedJob.getRiskDetails().getCategoricalStatsResult();
List<CategoricalStatsHistogramBucket> histogramBucketList =
result.getValueFrequencyHistogramBucketsList();
for (CategoricalStatsHistogramBucket bucket : histogramBucketList) {
long mostCommonFrequency = bucket.getValueFrequencyUpperBound();
System.out.printf("Most common value occurs %d time(s).\n", mostCommonFrequency);
long leastCommonFrequency = bucket.getValueFrequencyLowerBound();
System.out.printf("Least common value occurs %d time(s).\n", leastCommonFrequency);
for (ValueFrequency valueFrequency : bucket.getBucketValuesList()) {
System.out.printf(
"Value %s occurs %d time(s).\n",
valueFrequency.getValue().toString(), valueFrequency.getCount());
}
}
}
}
// handleMessage injects the job and settableFuture into the message reciever interface
private static void handleMessage(
DlpJob job,
SettableApiFuture<Boolean> done,
PubsubMessage pubsubMessage,
AckReplyConsumer ackReplyConsumer) {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
done.set(true);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
}
}
}
Node.js
Para obtener información sobre cómo instalar y usar la biblioteca cliente de la protección de datos sensibles, consulta Bibliotecas cliente de la protección de datos sensibles.
Para autenticarte en la protección de datos sensibles, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');
// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();
// The project ID to run the API call under
// const projectId = 'my-project';
// The project ID the table is stored under
// This may or (for public datasets) may not equal the calling project ID
// const tableProjectId = 'my-project';
// The ID of the dataset to inspect, e.g. 'my_dataset'
// const datasetId = 'my_dataset';
// The ID of the table to inspect, e.g. 'my_table'
// const tableId = 'my_table';
// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'
// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'
// The name of the column to compute risk metrics for, e.g. 'firstName'
// const columnName = 'firstName';
async function categoricalRiskAnalysis() {
const sourceTable = {
projectId: tableProjectId,
datasetId: datasetId,
tableId: tableId,
};
// Construct request for creating a risk analysis job
const request = {
parent: `projects/${projectId}/locations/global`,
riskJob: {
privacyMetric: {
categoricalStatsConfig: {
field: {
name: columnName,
},
},
},
sourceTable: sourceTable,
actions: [
{
pubSub: {
topic: `projects/${projectId}/topics/${topicId}`,
},
},
],
},
};
// Create helper function for unpacking values
const getValue = obj => obj[Object.keys(obj)[0]];
// Run risk analysis job
const [topicResponse] = await pubsub.topic(topicId).get();
const subscription = await topicResponse.subscription(subscriptionId);
const [jobsResponse] = await dlp.createDlpJob(request);
const jobName = jobsResponse.name;
console.log(`Job created. Job name: ${jobName}`);
// Watch the Pub/Sub topic until the DLP job finishes
await new Promise((resolve, reject) => {
const messageHandler = message => {
if (message.attributes && message.attributes.DlpJobName === jobName) {
message.ack();
subscription.removeListener('message', messageHandler);
subscription.removeListener('error', errorHandler);
resolve(jobName);
} else {
message.nack();
}
};
const errorHandler = err => {
subscription.removeListener('message', messageHandler);
subscription.removeListener('error', errorHandler);
reject(err);
};
subscription.on('message', messageHandler);
subscription.on('error', errorHandler);
});
setTimeout(() => {
console.log(' Waiting for DLP job to fully complete');
}, 500);
const [job] = await dlp.getDlpJob({name: jobName});
const histogramBuckets =
job.riskDetails.categoricalStatsResult.valueFrequencyHistogramBuckets;
histogramBuckets.forEach((histogramBucket, histogramBucketIdx) => {
console.log(`Bucket ${histogramBucketIdx}:`);
// Print bucket stats
console.log(
` Most common value occurs ${histogramBucket.valueFrequencyUpperBound} time(s)`
);
console.log(
` Least common value occurs ${histogramBucket.valueFrequencyLowerBound} time(s)`
);
// Print bucket values
console.log(`${histogramBucket.bucketSize} unique values total.`);
histogramBucket.bucketValues.forEach(valueBucket => {
console.log(
` Value ${getValue(valueBucket.value)} occurs ${
valueBucket.count
} time(s).`
);
});
});
}
await categoricalRiskAnalysis();
PHP
Para obtener información sobre cómo instalar y usar la biblioteca cliente de la protección de datos sensibles, consulta Bibliotecas cliente de la protección de datos sensibles.
Para autenticarte en la protección de datos sensibles, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\BigQueryTable;
use Google\Cloud\Dlp\V2\Client\DlpServiceClient;
use Google\Cloud\Dlp\V2\CreateDlpJobRequest;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\FieldId;
use Google\Cloud\Dlp\V2\GetDlpJobRequest;
use Google\Cloud\Dlp\V2\PrivacyMetric;
use Google\Cloud\Dlp\V2\PrivacyMetric\CategoricalStatsConfig;
use Google\Cloud\Dlp\V2\RiskAnalysisJobConfig;
use Google\Cloud\PubSub\PubSubClient;
/**
* Computes risk metrics of a column of data in a Google BigQuery table.
*
* @param string $callingProjectId The project ID to run the API call under
* @param string $dataProjectId The project ID containing the target Datastore
* @param string $topicId The name of the Pub/Sub topic to notify once the job completes
* @param string $subscriptionId The name of the Pub/Sub subscription to use when listening for job
* @param string $datasetId The ID of the dataset to inspect
* @param string $tableId The ID of the table to inspect
* @param string $columnName The name of the column to compute risk metrics for, e.g. "age"
*/
function categorical_stats(
string $callingProjectId,
string $dataProjectId,
string $topicId,
string $subscriptionId,
string $datasetId,
string $tableId,
string $columnName
): void {
// Instantiate a client.
$dlp = new DlpServiceClient();
$pubsub = new PubSubClient();
$topic = $pubsub->topic($topicId);
// Construct risk analysis config
$columnField = (new FieldId())
->setName($columnName);
$statsConfig = (new CategoricalStatsConfig())
->setField($columnField);
$privacyMetric = (new PrivacyMetric())
->setCategoricalStatsConfig($statsConfig);
// Construct items to be analyzed
$bigqueryTable = (new BigQueryTable())
->setProjectId($dataProjectId)
->setDatasetId($datasetId)
->setTableId($tableId);
// Construct the action to run when job completes
$pubSubAction = (new PublishToPubSub())
->setTopic($topic->name());
$action = (new Action())
->setPubSub($pubSubAction);
// Construct risk analysis job config to run
$riskJob = (new RiskAnalysisJobConfig())
->setPrivacyMetric($privacyMetric)
->setSourceTable($bigqueryTable)
->setActions([$action]);
// Submit request
$parent = "projects/$callingProjectId/locations/global";
$createDlpJobRequest = (new CreateDlpJobRequest())
->setParent($parent)
->setRiskJob($riskJob);
$job = $dlp->createDlpJob($createDlpJobRequest);
// Listen for job notifications via an existing topic/subscription.
$subscription = $topic->subscription($subscriptionId);
// Poll Pub/Sub using exponential backoff until job finishes
// Consider using an asynchronous execution model such as Cloud Functions
$attempt = 1;
$startTime = time();
do {
foreach ($subscription->pull() as $message) {
if (
isset($message->attributes()['DlpJobName']) &&
$message->attributes()['DlpJobName'] === $job->getName()
) {
$subscription->acknowledge($message);
// Get the updated job. Loop to avoid race condition with DLP API.
do {
$getDlpJobRequest = (new GetDlpJobRequest())
->setName($job->getName());
$job = $dlp->getDlpJob($getDlpJobRequest);
} while ($job->getState() == JobState::RUNNING);
break 2; // break from parent do while
}
}
print('Waiting for job to complete' . PHP_EOL);
// Exponential backoff with max delay of 60 seconds
sleep(min(60, pow(2, ++$attempt)));
} while (time() - $startTime < 600); // 10 minute timeout
// Print finding counts
printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
switch ($job->getState()) {
case JobState::DONE:
$histBuckets = $job->getRiskDetails()->getCategoricalStatsResult()->getValueFrequencyHistogramBuckets();
foreach ($histBuckets as $bucketIndex => $histBucket) {
// Print bucket stats
printf('Bucket %s:' . PHP_EOL, $bucketIndex);
printf(' Most common value occurs %s time(s)' . PHP_EOL, $histBucket->getValueFrequencyUpperBound());
printf(' Least common value occurs %s time(s)' . PHP_EOL, $histBucket->getValueFrequencyLowerBound());
printf(' %s unique value(s) total.', $histBucket->getBucketSize());
// Print bucket values
foreach ($histBucket->getBucketValues() as $percent => $quantile) {
printf(
' Value %s occurs %s time(s).' . PHP_EOL,
$quantile->getValue()->serializeToJsonString(),
$quantile->getCount()
);
}
}
break;
case JobState::FAILED:
$errors = $job->getErrors();
printf('Job %s had errors:' . PHP_EOL, $job->getName());
foreach ($errors as $error) {
var_dump($error->getDetails());
}
break;
case JobState::PENDING:
print('Job has not completed. Consider a longer timeout or an asynchronous execution model' . PHP_EOL);
break;
default:
print('Unexpected job state.');
}
}
Python
Para obtener información sobre cómo instalar y usar la biblioteca cliente de la protección de datos sensibles, consulta Bibliotecas cliente de la protección de datos sensibles.
Para autenticarte en la protección de datos sensibles, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
import concurrent.futures
import google.cloud.dlp
import google.cloud.pubsub
def categorical_risk_analysis(
project: str,
table_project_id: str,
dataset_id: str,
table_id: str,
column_name: str,
topic_id: str,
subscription_id: str,
timeout: int = 300,
) -> None:
"""Uses the Data Loss Prevention API to compute risk metrics of a column
of categorical data in a Google BigQuery table.
Args:
project: The Google Cloud project id to use as a parent resource.
table_project_id: The Google Cloud project id where the BigQuery table
is stored.
dataset_id: The id of the dataset to inspect.
table_id: The id of the table to inspect.
column_name: The name of the column to compute risk metrics for.
topic_id: The name of the Pub/Sub topic to notify once the job
completes.
subscription_id: The name of the Pub/Sub subscription to use when
listening for job completion notifications.
timeout: The number of seconds to wait for a response from the API.
Returns:
None; the response from the API is printed to the terminal.
"""
# Instantiate a client.
dlp = google.cloud.dlp_v2.DlpServiceClient()
# Convert the project id into full resource ids.
topic = google.cloud.pubsub.PublisherClient.topic_path(project, topic_id)
parent = f"projects/{project}/locations/global"
# Location info of the BigQuery table.
source_table = {
"project_id": table_project_id,
"dataset_id": dataset_id,
"table_id": table_id,
}
# Tell the API where to send a notification when the job is complete.
actions = [{"pub_sub": {"topic": topic}}]
# Configure risk analysis job
# Give the name of the numeric column to compute risk metrics for
risk_job = {
"privacy_metric": {
"categorical_stats_config": {"field": {"name": column_name}}
},
"source_table": source_table,
"actions": actions,
}
# Call API to start risk analysis job
operation = dlp.create_dlp_job(request={"parent": parent, "risk_job": risk_job})
def callback(message: google.cloud.pubsub_v1.subscriber.message.Message) -> None:
if message.attributes["DlpJobName"] == operation.name:
# This is the message we're looking for, so acknowledge it.
message.ack()
# Now that the job is done, fetch the results and print them.
job = dlp.get_dlp_job(request={"name": operation.name})
print(f"Job name: {job.name}")
histogram_buckets = (
job.risk_details.categorical_stats_result.value_frequency_histogram_buckets # noqa: E501
)
# Print bucket stats
for i, bucket in enumerate(histogram_buckets):
print(f"Bucket {i}:")
print(
" Most common value occurs {} time(s)".format(
bucket.value_frequency_upper_bound
)
)
print(
" Least common value occurs {} time(s)".format(
bucket.value_frequency_lower_bound
)
)
print(f" {bucket.bucket_size} unique values total.")
for value in bucket.bucket_values:
print(
" Value {} occurs {} time(s)".format(
value.value.integer_value, value.count
)
)
subscription.set_result(None)
else:
# This is not the message we're looking for.
message.drop()
# Create a Pub/Sub client and find the subscription. The subscription is
# expected to already be listening to the topic.
subscriber = google.cloud.pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(project, subscription_id)
subscription = subscriber.subscribe(subscription_path, callback)
try:
subscription.result(timeout=timeout)
except concurrent.futures.TimeoutError:
print(
"No event received before the timeout. Please verify that the "
"subscription provided is subscribed to the topic provided."
)
subscription.close()