以下示例演示了如何使用 Cloud Data Loss Prevention API 扫描 BigQuery 表格的 1000 行子集。从随机行开始扫描。
深入探索
如需查看包含此代码示例的详细文档,请参阅以下内容:
代码示例
C#
如需了解如何安装和使用用于 Sensitive Data Protection 的客户端库,请参阅 Sensitive Data Protection 客户端库。
如需向 Sensitive Data Protection 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
using Google.Api.Gax.ResourceNames;
using Google.Cloud.Dlp.V2;
using Google.Cloud.PubSub.V1;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using static Google.Cloud.Dlp.V2.InspectConfig.Types;
/// <summary>
/// Sample that runs a Sensitive Data Protection inspection job over a sampled
/// subset of a public BigQuery table and waits for the job's Pub/Sub
/// completion notification before printing the findings.
/// </summary>
public class InspectBigQueryWithSampling
{
    /// <summary>
    /// Creates an inspect job over a 1000-row, random-start sample of
    /// <c>bigquery-public-data.usa_names.usa_1910_current</c>, waits for the
    /// matching Pub/Sub notification, then fetches and prints the job result.
    /// </summary>
    /// <param name="projectId">Project that owns the job, the topic and the subscription.</param>
    /// <param name="maxFindings">Value assigned to <c>FindingLimits.MaxFindingsPerRequest</c>.</param>
    /// <param name="includeQuote">Whether findings should include the matched text.</param>
    /// <param name="topicId">Id of the Pub/Sub topic the job publishes its notification to.</param>
    /// <param name="subId">Id of the Pub/Sub subscription this method listens on.</param>
    /// <param name="minLikelihood">Minimum likelihood a match must have to be reported.</param>
    /// <param name="identifyingFields">Identifying fields for the scan; defaults to the <c>name</c> column.</param>
    /// <param name="infoTypes">Info types to look for; defaults to <c>PERSON_NAME</c>.</param>
    /// <returns>The job as returned by the service after the notification was received.</returns>
    public static async Task<DlpJob> InspectAsync(
        string projectId,
        int maxFindings,
        bool includeQuote,
        string topicId,
        string subId,
        Likelihood minLikelihood = Likelihood.Possible,
        IEnumerable<FieldId> identifyingFields = null,
        IEnumerable<InfoType> infoTypes = null)
    {
        // Instantiate the dlp client.
        var dlp = DlpServiceClient.Create();

        // Construct Storage config.
        var storageConfig = new StorageConfig
        {
            BigQueryOptions = new BigQueryOptions
            {
                TableReference = new BigQueryTable
                {
                    ProjectId = "bigquery-public-data",
                    DatasetId = "usa_names",
                    TableId = "usa_1910_current",
                },
                IdentifyingFields =
                {
                    identifyingFields ?? new FieldId[] { new FieldId { Name = "name" } }
                },
                // Scan a 1000-row subset, as the other language variants of this sample do.
                RowsLimit = 1000,
                SampleMethod = BigQueryOptions.Types.SampleMethod.RandomStart
            }
        };

        // Construct the inspect config.
        var inspectConfig = new InspectConfig
        {
            InfoTypes = { infoTypes ?? new InfoType[] { new InfoType { Name = "PERSON_NAME" } } },
            Limits = new FindingLimits
            {
                MaxFindingsPerRequest = maxFindings,
            },
            IncludeQuote = includeQuote,
            MinLikelihood = minLikelihood
        };

        // Construct the pubsub action.
        var actions = new Action[]
        {
            new Action
            {
                PubSub = new Action.Types.PublishToPubSub
                {
                    Topic = $"projects/{projectId}/topics/{topicId}"
                }
            }
        };

        // Construct the inspect job config using the actions.
        var inspectJob = new InspectJobConfig
        {
            StorageConfig = storageConfig,
            InspectConfig = inspectConfig,
            Actions = { actions }
        };

        // Issue Create Dlp Job Request.
        var request = new CreateDlpJobRequest
        {
            InspectJob = inspectJob,
            ParentAsLocationName = new LocationName(projectId, "global"),
        };

        // We keep the name of the job that we just created.
        var dlpJob = await dlp.CreateDlpJobAsync(request);
        var jobName = dlpJob.Name;

        // Listen to pub/sub for the job.
        var subscriptionName = new SubscriptionName(projectId, subId);
        var subscriber = await SubscriberClient.CreateAsync(subscriptionName);

        // SubscriberClient may run the handler on multiple threads. The awaited
        // task is expected to complete once StopAsync has been requested below.
        await subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            // Messages without the attribute (or for another job) are not ours;
            // a plain indexer lookup would throw for a missing key.
            string notifiedJobName;
            var isOurJob = message.Attributes.TryGetValue("DlpJobName", out notifiedJobName)
                && notifiedJobName == jobName;
            if (!isOurJob)
            {
                return Task.FromResult(SubscriberClient.Reply.Nack);
            }

            // Deliberately not awaited: the stop is only requested here so the
            // handler can still return its Ack.
            subscriber.StopAsync(cancel);
            return Task.FromResult(SubscriberClient.Reply.Ack);
        });

        // Get the latest state of the job from the service.
        var resultJob = await dlp.GetDlpJobAsync(new GetDlpJobRequest
        {
            DlpJobName = DlpJobName.Parse(jobName)
        });

        // Parse the response and process results.
        System.Console.WriteLine($"Job status: {resultJob.State}");
        System.Console.WriteLine($"Job Name: {resultJob.Name}");
        var result = resultJob.InspectDetails.Result;
        foreach (var infoType in result.InfoTypeStats)
        {
            System.Console.WriteLine($"Info Type: {infoType.InfoType.Name}");
            System.Console.WriteLine($"Count: {infoType.Count}");
        }

        return resultJob;
    }
}
Go
如需了解如何安装和使用用于 Sensitive Data Protection 的客户端库,请参阅 Sensitive Data Protection 客户端库。
如需向 Sensitive Data Protection 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
import (
"context"
"fmt"
"io"
"time"
dlp "cloud.google.com/go/dlp/apiv2"
"cloud.google.com/go/dlp/apiv2/dlppb"
"cloud.google.com/go/pubsub"
)
// inspectBigQueryTableWithSampling inspects a 1000-row, random-start sample of
// the public BigQuery table bigquery-public-data.usa_names.usa_1910_current for
// PERSON_NAME findings. It creates the Pub/Sub topic and subscription when they
// do not exist, starts the inspect job, waits up to 10 minutes for the job's
// completion notification, and writes a per-infoType summary to w.
func inspectBigQueryTableWithSampling(w io.Writer, projectID, topicID, subscriptionID string) error {
	// projectId := "your-project-id"
	// topicID := "your-pubsub-topic-id"
	// or provide a topicID name to create one
	// subscriptionID := "your-pubsub-subscription-id"
	// or provide a subscription name to create one

	ctx := context.Background()

	// Initialize a client once and reuse it to send multiple requests. Clients
	// are safe to use across goroutines. When the client is no longer needed,
	// call the Close method to cleanup its resources.
	client, err := dlp.NewClient(ctx)
	if err != nil {
		return err
	}
	// Closing the client safely cleans up background resources.
	defer client.Close()

	// Specify the BigQuery table to be inspected.
	tableReference := &dlppb.BigQueryTable{
		ProjectId: "bigquery-public-data",
		DatasetId: "usa_names",
		TableId:   "usa_1910_current",
	}

	bigQueryOptions := &dlppb.BigQueryOptions{
		TableReference: tableReference,
		// Scan a 1000-row subset, matching the other language variants of
		// this sample.
		RowsLimit:    int64(1000),
		SampleMethod: dlppb.BigQueryOptions_RANDOM_START,
		IdentifyingFields: []*dlppb.FieldId{
			{Name: "name"},
		},
	}

	// Provide storage config with BigqueryOptions
	storageConfig := &dlppb.StorageConfig{
		Type: &dlppb.StorageConfig_BigQueryOptions{
			BigQueryOptions: bigQueryOptions,
		},
	}

	// Specify the type of info the inspection will look for.
	// See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
	infoTypes := []*dlppb.InfoType{
		{Name: "PERSON_NAME"},
	}

	// Specify how the content should be inspected.
	inspectConfig := &dlppb.InspectConfig{
		InfoTypes:    infoTypes,
		IncludeQuote: true,
	}

	// Create a PubSub Client used to listen for when the inspect job finishes.
	pubsubClient, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return err
	}
	defer pubsubClient.Close()

	// Create a PubSub subscription we can use to listen for messages.
	// Create the Topic if it doesn't exist.
	t := pubsubClient.Topic(topicID)
	if exists, err := t.Exists(ctx); err != nil {
		return err
	} else if !exists {
		if t, err = pubsubClient.CreateTopic(ctx, topicID); err != nil {
			return err
		}
	}

	// Create the Subscription if it doesn't exist.
	s := pubsubClient.Subscription(subscriptionID)
	if exists, err := s.Exists(ctx); err != nil {
		return err
	} else if !exists {
		if s, err = pubsubClient.CreateSubscription(ctx, subscriptionID, pubsub.SubscriptionConfig{Topic: t}); err != nil {
			return err
		}
	}

	// topic is the PubSub topic string where messages should be sent.
	topic := fmt.Sprintf("projects/%s/topics/%s", projectID, topicID)

	action := &dlppb.Action{
		Action: &dlppb.Action_PubSub{
			PubSub: &dlppb.Action_PublishToPubSub{
				Topic: topic,
			},
		},
	}

	// Configure the long running job we want the service to perform.
	inspectJobConfig := &dlppb.InspectJobConfig{
		StorageConfig: storageConfig,
		InspectConfig: inspectConfig,
		Actions: []*dlppb.Action{
			action,
		},
	}

	// Create the request for the job configured above.
	req := &dlppb.CreateDlpJobRequest{
		Parent: fmt.Sprintf("projects/%s/locations/global", projectID),
		Job: &dlppb.CreateDlpJobRequest_InspectJob{
			InspectJob: inspectJobConfig,
		},
	}

	// Use the client to send the request.
	j, err := client.CreateDlpJob(ctx, req)
	if err != nil {
		return err
	}
	fmt.Fprintf(w, "Job Created: %v", j.GetName())

	// Wait for the inspect job to finish by waiting for a PubSub message.
	// This only waits for 10 minutes. For long jobs, consider using a truly
	// asynchronous execution model such as Cloud Functions.
	c, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()
	err = s.Receive(c, func(ctx context.Context, msg *pubsub.Message) {
		// If this is the wrong job, do not process the result.
		if msg.Attributes["DlpJobName"] != j.GetName() {
			msg.Nack()
			return
		}
		msg.Ack()

		// Stop listening for more messages: cancelling c ends Receive.
		cancel()
	})
	if err != nil {
		return err
	}

	// Fetch the job again so the summary reflects its latest state.
	resp, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
		Name: j.GetName(),
	})
	if err != nil {
		return err
	}

	r := resp.GetInspectDetails().GetResult().GetInfoTypeStats()
	if len(r) == 0 {
		fmt.Fprintf(w, "No results")
		// An empty result is not an error.
		return nil
	}
	for _, s := range r {
		fmt.Fprintf(w, "\nFound %v instances of infoType %v\n", s.GetCount(), s.GetInfoType().GetName())
	}
	return nil
}
Java
如需了解如何安装和使用用于 Sensitive Data Protection 的客户端库,请参阅 Sensitive Data Protection 客户端库。
如需向 Sensitive Data Protection 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.BigQueryOptions;
import com.google.privacy.dlp.v2.BigQueryOptions.SampleMethod;
import com.google.privacy.dlp.v2.BigQueryTable;
import com.google.privacy.dlp.v2.CreateDlpJobRequest;
import com.google.privacy.dlp.v2.DlpJob;
import com.google.privacy.dlp.v2.FieldId;
import com.google.privacy.dlp.v2.GetDlpJobRequest;
import com.google.privacy.dlp.v2.InfoType;
import com.google.privacy.dlp.v2.InfoTypeStats;
import com.google.privacy.dlp.v2.InspectConfig;
import com.google.privacy.dlp.v2.InspectDataSourceDetails;
import com.google.privacy.dlp.v2.InspectJobConfig;
import com.google.privacy.dlp.v2.LocationName;
import com.google.privacy.dlp.v2.StorageConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class InspectBigQueryTableWithSampling {
public static void main(String[] args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-pubsub-topic-id";
String subscriptionId = "your-pubsub-subscription-id";
inspectBigQueryTableWithSampling(projectId, topicId, subscriptionId);
}
// Inspects a BigQuery Table
public static void inspectBigQueryTableWithSampling(
String projectId, String topicId, String subscriptionId)
throws ExecutionException, InterruptedException, IOException {
// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
try (DlpServiceClient dlp = DlpServiceClient.create()) {
// Specify the BigQuery table to be inspected.
BigQueryTable tableReference =
BigQueryTable.newBuilder()
.setProjectId("bigquery-public-data")
.setDatasetId("usa_names")
.setTableId("usa_1910_current")
.build();
BigQueryOptions bigQueryOptions =
BigQueryOptions.newBuilder()
.setTableReference(tableReference)
.setRowsLimit(1000)
.setSampleMethod(SampleMethod.RANDOM_START)
.addIdentifyingFields(FieldId.newBuilder().setName("name"))
.build();
StorageConfig storageConfig =
StorageConfig.newBuilder().setBigQueryOptions(bigQueryOptions).build();
// Specify the type of info the inspection will look for.
// See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
InfoType infoType = InfoType.newBuilder().setName("PERSON_NAME").build();
// Specify how the content should be inspected.
InspectConfig inspectConfig =
InspectConfig.newBuilder().addInfoTypes(infoType).setIncludeQuote(true).build();
// Specify the action that is triggered when the job completes.
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId);
Action.PublishToPubSub publishToPubSub =
Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build();
Action action = Action.newBuilder().setPubSub(publishToPubSub).build();
// Configure the long running job we want the service to perform.
InspectJobConfig inspectJobConfig =
InspectJobConfig.newBuilder()
.setStorageConfig(storageConfig)
.setInspectConfig(inspectConfig)
.addActions(action)
.build();
// Create the request for the job configured above.
CreateDlpJobRequest createDlpJobRequest =
CreateDlpJobRequest.newBuilder()
.setParent(LocationName.of(projectId, "global").toString())
.setInspectJob(inspectJobConfig)
.build();
// Use the client to send the request.
final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest);
System.out.println("Job created: " + dlpJob.getName());
// Set up a Pub/Sub subscriber to listen on the job completion status
final SettableApiFuture<Boolean> done = SettableApiFuture.create();
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
MessageReceiver messageHandler =
(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
subscriber.startAsync();
// Wait for job completion semi-synchronously
// For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
try {
done.get(15, TimeUnit.MINUTES);
} catch (TimeoutException e) {
System.out.println("Job was not completed after 15 minutes.");
return;
} finally {
subscriber.stopAsync();
subscriber.awaitTerminated();
}
// Get the latest state of the job from the service
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
DlpJob completedJob = dlp.getDlpJob(request);
// Parse the response and process results.
System.out.println("Job status: " + completedJob.getState());
System.out.println("Job name: " + dlpJob.getName());
InspectDataSourceDetails.Result result = completedJob.getInspectDetails().getResult();
System.out.println("Findings: ");
for (InfoTypeStats infoTypeStat : result.getInfoTypeStatsList()) {
System.out.print("\tInfo type: " + infoTypeStat.getInfoType().getName());
System.out.println("\tCount: " + infoTypeStat.getCount());
}
}
}
// handleMessage injects the job and settableFuture into the message reciever interface
private static void handleMessage(
DlpJob job,
SettableApiFuture<Boolean> done,
PubsubMessage pubsubMessage,
AckReplyConsumer ackReplyConsumer) {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
done.set(true);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
}
}
}
Node.js
如需了解如何安装和使用用于 Sensitive Data Protection 的客户端库,请参阅 Sensitive Data Protection 客户端库。
如需向 Sensitive Data Protection 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');
// Instantiate the DLP and Pub/Sub clients used by the sample below.
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();
// The placeholders below are commented out on purpose: uncomment each one and
// set it before running, because inspectBigqueryWithSampling() reads them.
// The project ID to run the API call under
// const projectId = 'my-project';
// The project ID the table is stored under
// This may or (for public datasets) may not equal the calling project ID
// const dataProjectId = 'my-project';
// The ID of the dataset to inspect, e.g. 'my_dataset'
// const datasetId = 'my_dataset';
// The ID of the table to inspect, e.g. 'my_table'
// const tableId = 'my_table';
// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'
// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'
// Maximum time to wait for the DLP job notification, in milliseconds
// (15 minutes).
const DLP_JOB_WAIT_TIME = 15 * 1000 * 60;
/**
 * Creates an inspect job over a 1000-row, random-start sample of the
 * configured BigQuery table, waits (up to DLP_JOB_WAIT_TIME) for the job's
 * Pub/Sub notification and logs the per-infoType finding counts.
 *
 * Reads projectId, dataProjectId, datasetId, tableId, topicId and
 * subscriptionId from the enclosing scope.
 *
 * Rejects with Error('Timeout') when no matching notification arrives in
 * time, or with the subscription's error if one is emitted.
 */
async function inspectBigqueryWithSampling() {
  // Specify the type of info the inspection will look for.
  // See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
  const infoTypes = [{name: 'PERSON_NAME'}];

  // Specify the BigQuery options required for inspection.
  const storageItem = {
    bigQueryOptions: {
      tableReference: {
        projectId: dataProjectId,
        datasetId: datasetId,
        tableId: tableId,
      },
      rowsLimit: 1000,
      sampleMethod:
        DLP.protos.google.privacy.dlp.v2.BigQueryOptions.SampleMethod
          .RANDOM_START,
      // NOTE(review): the other language variants of this sample set
      // identifyingFields here instead of includedFields; confirm which one
      // is intended.
      includedFields: [{name: 'name'}],
    },
  };

  // Specify the action that is triggered when the job completes.
  const actions = [
    {
      pubSub: {
        topic: `projects/${projectId}/topics/${topicId}`,
      },
    },
  ];

  // Construct request for creating an inspect job
  const request = {
    parent: `projects/${projectId}/locations/global`,
    inspectJob: {
      inspectConfig: {
        infoTypes: infoTypes,
        includeQuote: true,
      },
      storageConfig: storageItem,
      actions: actions,
    },
  };

  // Verify the Pub/Sub topic and listen for job notifications via an
  // existing subscription.
  const [topicResponse] = await pubsub.topic(topicId).get();
  const subscription = await topicResponse.subscription(subscriptionId);

  // Use the client to send the request.
  const [jobsResponse] = await dlp.createDlpJob(request);
  const jobName = jobsResponse.name;

  // Watch the Pub/Sub topic until the DLP job finishes
  await new Promise((resolve, reject) => {
    // Every exit path (match, error, timeout) detaches both listeners and
    // stops the timer, so nothing registered here outlives this wait.
    const cleanup = () => {
      subscription.removeListener('message', messageHandler);
      subscription.removeListener('error', errorHandler);
      clearTimeout(timer);
    };

    const messageHandler = message => {
      if (message.attributes && message.attributes.DlpJobName === jobName) {
        message.ack();
        cleanup();
        resolve(jobName);
      } else {
        message.nack();
      }
    };

    const errorHandler = err => {
      cleanup();
      reject(err);
    };

    // Set up the timeout
    const timer = setTimeout(() => {
      cleanup();
      reject(new Error('Timeout'));
    }, DLP_JOB_WAIT_TIME);

    subscription.on('message', messageHandler);
    subscription.on('error', errorHandler);
  });

  // Fetch the job again so the report reflects its latest state.
  const [job] = await dlp.getDlpJob({name: jobName});
  console.log(`Job ${job.name} status: ${job.state}`);

  const infoTypeStats = job.inspectDetails.result.infoTypeStats;
  if (infoTypeStats.length > 0) {
    infoTypeStats.forEach(infoTypeStat => {
      console.log(
        ` Found ${infoTypeStat.count} instance(s) of infoType ${infoTypeStat.infoType.name}.`
      );
    });
  } else {
    console.log('No findings.');
  }
}
await inspectBigqueryWithSampling();
PHP
如需了解如何安装和使用用于 Sensitive Data Protection 的客户端库,请参阅 Sensitive Data Protection 客户端库。
如需向 Sensitive Data Protection 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
use Google\Cloud\Dlp\V2\DlpServiceClient;
use Google\Cloud\Dlp\V2\BigQueryOptions;
use Google\Cloud\Dlp\V2\InfoType;
use Google\Cloud\Dlp\V2\InspectConfig;
use Google\Cloud\Dlp\V2\StorageConfig;
use Google\Cloud\Dlp\V2\BigQueryTable;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\BigQueryOptions\SampleMethod;
use Google\Cloud\Dlp\V2\FieldId;
use Google\Cloud\Dlp\V2\InspectJobConfig;
use Google\Cloud\PubSub\PubSubClient;
/**
 * Inspect BigQuery for sensitive data with sampling.
 * The following examples demonstrate using the Cloud Data Loss Prevention
 * API to scan a 1000-row subset of a BigQuery table. The scan starts from
 * a random row.
 *
 * The function submits the inspect job, polls the Pub/Sub subscription for
 * the job's completion notification for up to 10 minutes, then prints the
 * job state and, when the job is done, the per-infoType finding counts.
 *
 * @param string $callingProjectId  The project ID to run the API call under.
 * @param string $topicId           The Pub/Sub topic ID to notify once the job is completed.
 * @param string $subscriptionId    The Pub/Sub subscription ID to use when listening for job.
 * @param string $projectId         The Google Cloud Project ID.
 * @param string $datasetId         The BigQuery Dataset ID.
 * @param string $tableId           The BigQuery Table ID to be inspected.
 */
function inspect_bigquery_with_sampling(
    string $callingProjectId,
    string $topicId,
    string $subscriptionId,
    string $projectId,
    string $datasetId,
    string $tableId
): void {
    // Instantiate a client.
    $dlp = new DlpServiceClient();
    $pubsub = new PubSubClient();
    $topic = $pubsub->topic($topicId);

    // Specify the BigQuery table to be inspected.
    $bigqueryTable = (new BigQueryTable())
        ->setProjectId($projectId)
        ->setDatasetId($datasetId)
        ->setTableId($tableId);

    $bigQueryOptions = (new BigQueryOptions())
        ->setTableReference($bigqueryTable)
        ->setRowsLimit(1000)
        ->setSampleMethod(SampleMethod::RANDOM_START)
        ->setIdentifyingFields([
            (new FieldId())
                ->setName('name')
        ]);

    $storageConfig = (new StorageConfig())
        ->setBigQueryOptions($bigQueryOptions);

    // Specify the type of info the inspection will look for.
    // See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
    $personNameInfoType = (new InfoType())
        ->setName('PERSON_NAME');
    $infoTypes = [$personNameInfoType];

    // Specify how the content should be inspected.
    $inspectConfig = (new InspectConfig())
        ->setInfoTypes($infoTypes)
        ->setIncludeQuote(true);

    // Specify the action that is triggered when the job completes.
    $pubSubAction = (new PublishToPubSub())
        ->setTopic($topic->name());

    $action = (new Action())
        ->setPubSub($pubSubAction);

    // Configure the long running job we want the service to perform.
    $inspectJob = (new InspectJobConfig())
        ->setInspectConfig($inspectConfig)
        ->setStorageConfig($storageConfig)
        ->setActions([$action]);

    // Listen for job notifications via an existing topic/subscription.
    $subscription = $topic->subscription($subscriptionId);

    // Submit request
    $parent = "projects/$callingProjectId/locations/global";
    $job = $dlp->createDlpJob($parent, [
        'inspectJob' => $inspectJob
    ]);

    // Poll Pub/Sub using exponential backoff until job finishes
    // Consider using an asynchronous execution model such as Cloud Functions
    $attempt = 1;
    $startTime = time();
    do {
        foreach ($subscription->pull() as $message) {
            if (
                isset($message->attributes()['DlpJobName']) &&
                $message->attributes()['DlpJobName'] === $job->getName()
            ) {
                $subscription->acknowledge($message);
                // Get the updated job. Re-poll while it still reports RUNNING
                // to avoid a race condition with the DLP API, pausing between
                // polls so the API is not called in a tight loop.
                $job = $dlp->getDlpJob($job->getName());
                while ($job->getState() == JobState::RUNNING) {
                    sleep(1);
                    $job = $dlp->getDlpJob($job->getName());
                }
                break 2; // break from the foreach and the outer do-while
            }
        }
        printf('Waiting for job to complete' . PHP_EOL);
        // Exponential backoff with max delay of 60 seconds
        sleep(min(60, pow(2, ++$attempt)));
    } while (time() - $startTime < 600); // 10 minute timeout

    // Print finding counts
    printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
    switch ($job->getState()) {
        case JobState::DONE:
            $infoTypeStats = $job->getInspectDetails()->getResult()->getInfoTypeStats();
            if (count($infoTypeStats) === 0) {
                printf('No findings.' . PHP_EOL);
            } else {
                foreach ($infoTypeStats as $infoTypeStat) {
                    printf(
                        '  Found %s instance(s) of infoType %s' . PHP_EOL,
                        $infoTypeStat->getCount(),
                        $infoTypeStat->getInfoType()->getName()
                    );
                }
            }
            break;
        case JobState::FAILED:
            printf('Job %s had errors:' . PHP_EOL, $job->getName());
            $errors = $job->getErrors();
            foreach ($errors as $error) {
                var_dump($error->getDetails());
            }
            break;
        case JobState::PENDING:
            printf('Job has not completed. Consider a longer timeout or an asynchronous execution model' . PHP_EOL);
            break;
        default:
            // Terminated with a newline like every other message printed here.
            printf('Unexpected job state. Most likely, the job is either running or has not yet started.' . PHP_EOL);
    }
}
Python
如需了解如何安装和使用用于 Sensitive Data Protection 的客户端库,请参阅 Sensitive Data Protection 客户端库。
如需向 Sensitive Data Protection 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
import threading
import google.cloud.dlp
import google.cloud.pubsub
def inspect_bigquery_table_with_sampling(
    project: str,
    topic_id: str,
    subscription_id: str,
    min_likelihood: str = None,
    max_findings: int = None,
    timeout: int = 300,
) -> None:
    """Uses the Data Loss Prevention API to analyze BigQuery data by limiting
    the amount of data to be scanned.

    A 1000-row, random-start sample of the public table
    bigquery-public-data.usa_names.usa_1910_current is inspected for
    PERSON_NAME findings. The results are printed from the Pub/Sub callback
    once the job's completion notification arrives.

    Args:
        project: The Google Cloud project id to use as a parent resource.
        topic_id: The id of the Cloud Pub/Sub topic to which the API will
            broadcast job completion. The topic must already exist.
        subscription_id: The id of the Cloud Pub/Sub subscription to listen on
            while waiting for job completion. The subscription must already
            exist and be subscribed to the topic.
        min_likelihood: A string representing the minimum likelihood threshold
            that constitutes a match. One of: 'LIKELIHOOD_UNSPECIFIED',
            'VERY_UNLIKELY', 'UNLIKELY', 'POSSIBLE', 'LIKELY', 'VERY_LIKELY'.
        max_findings: The maximum number of findings to report; 0 = no maximum.
        timeout: The number of seconds to wait for the job completion
            notification before giving up.
    """
    # Instantiate a client.
    dlp = google.cloud.dlp_v2.DlpServiceClient()

    # Specify how the content should be inspected. Keys which are None may
    # optionally be omitted entirely.
    inspect_config = {
        "info_types": [{"name": "PERSON_NAME"}],
        "min_likelihood": min_likelihood,
        "limits": {"max_findings_per_request": max_findings},
        "include_quote": True,
    }

    # Specify the BigQuery table to be inspected.
    # Here we are using public bigquery table.
    table_reference = {
        "project_id": "bigquery-public-data",
        "dataset_id": "usa_names",
        "table_id": "usa_1910_current",
    }

    # Construct a storage_config containing the target BigQuery info.
    storage_config = {
        "big_query_options": {
            "table_reference": table_reference,
            "rows_limit": 1000,
            "sample_method": "RANDOM_START",
            "identifying_fields": [{"name": "name"}],
        }
    }

    # Tell the API where to send a notification when the job is complete.
    topic = google.cloud.pubsub.PublisherClient.topic_path(project, topic_id)
    actions = [{"pub_sub": {"topic": topic}}]

    # Construct the inspect_job, which defines the entire inspect content task.
    inspect_job = {
        "inspect_config": inspect_config,
        "storage_config": storage_config,
        "actions": actions,
    }

    # Convert the project id into full resource ids.
    parent = f"projects/{project}/locations/global"

    # Call the API
    operation = dlp.create_dlp_job(
        request={"parent": parent, "inspect_job": inspect_job}
    )
    print(f"Inspection operation started: {operation.name}")

    # Create a Pub/Sub client and find the subscription. The subscription is
    # expected to already be listening to the topic.
    subscriber = google.cloud.pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_id)

    # Set up a callback to acknowledge a message. This closes around an event
    # so that it can signal that it is done and the main thread can continue.
    job_done = threading.Event()

    def callback(message: google.cloud.pubsub_v1.subscriber.message.Message) -> None:
        try:
            # .get() so that unrelated messages lacking the attribute are
            # dropped below instead of raising KeyError.
            if message.attributes.get("DlpJobName") == operation.name:
                # This is the message we're looking for, so acknowledge it.
                message.ack()

                # Now that the job is done, fetch the results and print them.
                job = dlp.get_dlp_job(request={"name": operation.name})
                print(f"Job name: {job.name}")
                if job.inspect_details.result.info_type_stats:
                    for finding in job.inspect_details.result.info_type_stats:
                        print(
                            f"Info type: {finding.info_type.name}; Count: {finding.count}"
                        )
                else:
                    print("No findings.")

                # Signal to the main thread that we can exit.
                job_done.set()
            else:
                # This is not the message we're looking for.
                message.drop()
        except Exception as e:
            # Because this is executing in a thread, an exception won't be
            # noted unless we print it manually.
            print(e)
            raise

    # Register the callback and wait on the event.
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    try:
        finished = job_done.wait(timeout=timeout)
        if not finished:
            print(
                "No event received before the timeout. Please verify that the "
                "subscription provided is subscribed to the topic provided."
            )
    finally:
        # Stop the background streaming pull and release the client's
        # resources whether the notification arrived or the wait timed out.
        streaming_pull_future.cancel()
        subscriber.close()
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器。