Demonstrates how to find sensitive data in files in Cloud Storage.
Explore further
For detailed documentation that includes this code sample, see the following:
Code sample
C#
For information about installing and using the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.
To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
using Google.Api.Gax.ResourceNames;
using Google.Cloud.Dlp.V2;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using static Google.Cloud.Dlp.V2.InspectConfig.Types;
public class InspectGoogleCloudStorage
{
public static DlpJob InspectGCS(
string projectId,
Likelihood minLikelihood,
int maxFindings,
bool includeQuote,
IEnumerable<InfoType> infoTypes,
IEnumerable<CustomInfoType> customInfoTypes,
string bucketName,
string topicId,
string subscriptionId)
{
var inspectJob = new InspectJobConfig
{
StorageConfig = new StorageConfig
{
CloudStorageOptions = new CloudStorageOptions
{
FileSet = new CloudStorageOptions.Types.FileSet { Url = $"gs://{bucketName}/*.txt" },
BytesLimitPerFile = 1073741824
},
},
InspectConfig = new InspectConfig
{
InfoTypes = { infoTypes },
CustomInfoTypes = { customInfoTypes },
ExcludeInfoTypes = false,
IncludeQuote = includeQuote,
Limits = new FindingLimits
{
MaxFindingsPerRequest = maxFindings
},
MinLikelihood = minLikelihood
},
Actions =
{
new Google.Cloud.Dlp.V2.Action
{
// Send results to Pub/Sub topic
PubSub = new Google.Cloud.Dlp.V2.Action.Types.PublishToPubSub
{
Topic = $"projects/{projectId}/topics/{topicId}",
}
}
}
};
// Issue Create Dlp Job Request
var client = DlpServiceClient.Create();
var request = new CreateDlpJobRequest
{
InspectJob = inspectJob,
Parent = new LocationName(projectId, "global").ToString(),
};
// We need the name of the created job to match incoming Pub/Sub notifications
var dlpJob = client.CreateDlpJob(request);
// Get a pub/sub subscription and listen for DLP results
var fireEvent = new ManualResetEventSlim();
var subscriptionName = new SubscriptionName(projectId, subscriptionId);
var subscriber = SubscriberClient.CreateAsync(subscriptionName).Result;
subscriber.StartAsync(
(pubSubMessage, cancellationToken) =>
{
// Given a message that we receive on this subscription, we should either acknowledge or decline it
if (pubSubMessage.Attributes["DlpJobName"] == dlpJob.Name)
{
fireEvent.Set();
return Task.FromResult(SubscriberClient.Reply.Ack);
}
return Task.FromResult(SubscriberClient.Reply.Nack);
});
// Block here until the Pub/Sub listener signals that a result for this DLP job has arrived, or time out after one minute
if (fireEvent.Wait(TimeSpan.FromMinutes(1)))
{
// Stop the subscriber that was started by the earlier StartAsync call
subscriber.StopAsync(CancellationToken.None).Wait();
// Now we can inspect full job results
var job = client.GetDlpJob(new GetDlpJobRequest { DlpJobName = DlpJobName.Parse(dlpJob.Name) });
// Inspect Job details
Console.WriteLine($"Processed bytes: {job.InspectDetails.Result.ProcessedBytes}");
Console.WriteLine($"Total estimated bytes: {job.InspectDetails.Result.TotalEstimatedBytes}");
var stats = job.InspectDetails.Result.InfoTypeStats;
Console.WriteLine("Found stats:");
foreach (var stat in stats)
{
Console.WriteLine($"{stat.InfoType.Name}");
}
return job;
}
throw new InvalidOperationException("Timed out waiting for the DLP job results.");
}
}
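For reference, the following is a minimal, hypothetical driver for the sample above. Every project, bucket, topic, and subscription value is a placeholder, and the Pub/Sub topic and subscription are assumed to already exist and be attached to each other.
using System;
using System.Collections.Generic;
using Google.Cloud.Dlp.V2;

public class InspectGoogleCloudStorageDriver
{
    public static void Main(string[] args)
    {
        // All values below are placeholders; replace them with real resources.
        var job = InspectGoogleCloudStorage.InspectGCS(
            projectId: "my-project-id",
            minLikelihood: Likelihood.Possible,
            maxFindings: 10,
            includeQuote: true,
            infoTypes: new List<InfoType>
            {
                new InfoType { Name = "PHONE_NUMBER" },
                new InfoType { Name = "EMAIL_ADDRESS" }
            },
            customInfoTypes: new List<CustomInfoType>(),
            bucketName: "my-bucket",
            topicId: "my-topic",
            subscriptionId: "my-subscription");
        Console.WriteLine($"Job finished in state: {job.State}");
    }
}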
Go
For information about installing and using the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.
To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
import (
"context"
"fmt"
"io"
"strings"
"time"
dlp "cloud.google.com/go/dlp/apiv2"
"cloud.google.com/go/dlp/apiv2/dlppb"
"cloud.google.com/go/pubsub"
)
// inspectGCSFile searches for the given info types in the given file.
func inspectGCSFile(w io.Writer, projectID string, infoTypeNames []string, customDictionaries []string, customRegexes []string, pubSubTopic, pubSubSub, bucketName, fileName string) error {
// projectID := "my-project-id"
// infoTypeNames := []string{"US_SOCIAL_SECURITY_NUMBER"}
// customDictionaries := []string{...}
// customRegexes := []string{...}
// pubSubTopic := "dlp-risk-sample-topic"
// pubSubSub := "dlp-risk-sample-sub"
// bucketName := "my-bucket"
// fileName := "my-file.txt"
ctx := context.Background()
client, err := dlp.NewClient(ctx)
if err != nil {
return fmt.Errorf("dlp.NewClient: %w", err)
}
// Convert the info type strings to a list of InfoTypes.
var infoTypes []*dlppb.InfoType
for _, it := range infoTypeNames {
infoTypes = append(infoTypes, &dlppb.InfoType{Name: it})
}
// Convert the custom dictionary word lists and custom regexes to a list of CustomInfoTypes.
var customInfoTypes []*dlppb.CustomInfoType
for idx, it := range customDictionaries {
customInfoTypes = append(customInfoTypes, &dlppb.CustomInfoType{
InfoType: &dlppb.InfoType{
Name: fmt.Sprintf("CUSTOM_DICTIONARY_%d", idx),
},
Type: &dlppb.CustomInfoType_Dictionary_{
Dictionary: &dlppb.CustomInfoType_Dictionary{
Source: &dlppb.CustomInfoType_Dictionary_WordList_{
WordList: &dlppb.CustomInfoType_Dictionary_WordList{
Words: strings.Split(it, ","),
},
},
},
},
})
}
for idx, it := range customRegexes {
customInfoTypes = append(customInfoTypes, &dlppb.CustomInfoType{
InfoType: &dlppb.InfoType{
Name: fmt.Sprintf("CUSTOM_REGEX_%d", idx),
},
Type: &dlppb.CustomInfoType_Regex_{
Regex: &dlppb.CustomInfoType_Regex{
Pattern: it,
},
},
})
}
// Create a PubSub Client used to listen for when the inspect job finishes.
pubsubClient, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
defer pubsubClient.Close()
// Create a PubSub subscription we can use to listen for messages.
// Create the Topic if it doesn't exist.
t := pubsubClient.Topic(pubSubTopic)
if exists, err := t.Exists(ctx); err != nil {
return fmt.Errorf("t.Exists: %w", err)
} else if !exists {
if t, err = pubsubClient.CreateTopic(ctx, pubSubTopic); err != nil {
return fmt.Errorf("CreateTopic: %w", err)
}
}
// Create the Subscription if it doesn't exist.
s := pubsubClient.Subscription(pubSubSub)
if exists, err := s.Exists(ctx); err != nil {
return fmt.Errorf("s.Exists: %w", err)
} else if !exists {
if s, err = pubsubClient.CreateSubscription(ctx, pubSubSub, pubsub.SubscriptionConfig{Topic: t}); err != nil {
return fmt.Errorf("CreateSubscription: %w", err)
}
}
// topic is the PubSub topic string where messages should be sent.
topic := "projects/" + projectID + "/topics/" + pubSubTopic
// Create a configured request.
req := &dlppb.CreateDlpJobRequest{
Parent: fmt.Sprintf("projects/%s/locations/global", projectID),
Job: &dlppb.CreateDlpJobRequest_InspectJob{
InspectJob: &dlppb.InspectJobConfig{
// StorageConfig describes where to find the data.
StorageConfig: &dlppb.StorageConfig{
Type: &dlppb.StorageConfig_CloudStorageOptions{
CloudStorageOptions: &dlppb.CloudStorageOptions{
FileSet: &dlppb.CloudStorageOptions_FileSet{
Url: "gs://" + bucketName + "/" + fileName,
},
},
},
},
// InspectConfig describes what fields to look for.
InspectConfig: &dlppb.InspectConfig{
InfoTypes: infoTypes,
CustomInfoTypes: customInfoTypes,
MinLikelihood: dlppb.Likelihood_POSSIBLE,
Limits: &dlppb.InspectConfig_FindingLimits{
MaxFindingsPerRequest: 10,
},
IncludeQuote: true,
},
// Send a message to PubSub using Actions.
Actions: []*dlppb.Action{
{
Action: &dlppb.Action_PubSub{
PubSub: &dlppb.Action_PublishToPubSub{
Topic: topic,
},
},
},
},
},
},
}
// Create the inspect job.
j, err := client.CreateDlpJob(ctx, req)
if err != nil {
return fmt.Errorf("CreateDlpJob: %w", err)
}
fmt.Fprintf(w, "Created job: %v\n", j.GetName())
// Wait for the inspect job to finish by waiting for a PubSub message.
// This only waits for 10 minutes. For long jobs, consider using a truly
// asynchronous execution model such as Cloud Functions.
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
// If this is the wrong job, do not process the result.
if msg.Attributes["DlpJobName"] != j.GetName() {
msg.Nack()
return
}
msg.Ack()
// Stop listening for more messages.
defer cancel()
resp, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
Name: j.GetName(),
})
if err != nil {
fmt.Fprintf(w, "Cloud not get job: %v", err)
return
}
r := resp.GetInspectDetails().GetResult().GetInfoTypeStats()
if len(r) == 0 {
fmt.Fprintf(w, "No results")
}
for _, s := range r {
fmt.Fprintf(w, " Found %v instances of infoType %v\n", s.GetCount(), s.GetInfoType().GetName())
}
})
if err != nil {
return fmt.Errorf("Receive: %w", err)
}
return nil
}
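A minimal sketch of a caller for inspectGCSFile follows, assuming it lives in the same package as the function above; all resource names are placeholders, and the bucket, file, topic, and subscription values should be replaced with real resources.
package main

import (
	"log"
	"os"
)

func main() {
	// All values below are placeholders; replace them with real resources.
	err := inspectGCSFile(
		os.Stdout,
		"my-project-id",                       // projectID
		[]string{"US_SOCIAL_SECURITY_NUMBER"}, // infoTypeNames
		nil,                                   // customDictionaries
		nil,                                   // customRegexes
		"dlp-risk-sample-topic",               // pubSubTopic
		"dlp-risk-sample-sub",                 // pubSubSub
		"my-bucket",                           // bucketName
		"my-file.txt",                         // fileName
	)
	if err != nil {
		log.Fatal(err)
	}
}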
Java
For information about installing and using the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.
To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.CloudStorageOptions;
import com.google.privacy.dlp.v2.CloudStorageOptions.FileSet;
import com.google.privacy.dlp.v2.CreateDlpJobRequest;
import com.google.privacy.dlp.v2.DlpJob;
import com.google.privacy.dlp.v2.GetDlpJobRequest;
import com.google.privacy.dlp.v2.InfoType;
import com.google.privacy.dlp.v2.InfoTypeStats;
import com.google.privacy.dlp.v2.InspectConfig;
import com.google.privacy.dlp.v2.InspectDataSourceDetails;
import com.google.privacy.dlp.v2.InspectJobConfig;
import com.google.privacy.dlp.v2.LocationName;
import com.google.privacy.dlp.v2.StorageConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class InspectGcsFile {
public static void main(String[] args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String gcsUri = "gs://" + "your-bucket-name" + "/path/to/your/file.txt";
String topicId = "your-pubsub-topic-id";
String subscriptionId = "your-pubsub-subscription-id";
inspectGcsFile(projectId, gcsUri, topicId, subscriptionId);
}
// Inspects a file in a Google Cloud Storage Bucket.
public static void inspectGcsFile(
String projectId, String gcsUri, String topicId, String subscriptionId)
throws ExecutionException, InterruptedException, IOException {
// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
try (DlpServiceClient dlp = DlpServiceClient.create()) {
// Specify the GCS file to be inspected.
CloudStorageOptions cloudStorageOptions =
CloudStorageOptions.newBuilder().setFileSet(FileSet.newBuilder().setUrl(gcsUri)).build();
StorageConfig storageConfig =
StorageConfig.newBuilder().setCloudStorageOptions(cloudStorageOptions).build();
// Specify the type of info the inspection will look for.
// See https://cloud.google.com/dlp/docs/infotypes-reference for a complete list of info types
List<InfoType> infoTypes =
Stream.of("PHONE_NUMBER", "EMAIL_ADDRESS", "CREDIT_CARD_NUMBER")
.map(it -> InfoType.newBuilder().setName(it).build())
.collect(Collectors.toList());
// Specify how the content should be inspected.
InspectConfig inspectConfig =
InspectConfig.newBuilder().addAllInfoTypes(infoTypes).setIncludeQuote(true).build();
// Specify the action that is triggered when the job completes.
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId);
Action.PublishToPubSub publishToPubSub =
Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build();
Action action = Action.newBuilder().setPubSub(publishToPubSub).build();
// Configure the long running job we want the service to perform.
InspectJobConfig inspectJobConfig =
InspectJobConfig.newBuilder()
.setStorageConfig(storageConfig)
.setInspectConfig(inspectConfig)
.addActions(action)
.build();
// Create the request for the job configured above.
CreateDlpJobRequest createDlpJobRequest =
CreateDlpJobRequest.newBuilder()
.setParent(LocationName.of(projectId, "global").toString())
.setInspectJob(inspectJobConfig)
.build();
// Use the client to send the request.
final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest);
System.out.println("Job created: " + dlpJob.getName());
// Set up a Pub/Sub subscriber to listen on the job completion status
final SettableApiFuture<Boolean> done = SettableApiFuture.create();
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
MessageReceiver messageHandler =
(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
subscriber.startAsync();
// Wait for job completion semi-synchronously
// For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
try {
done.get(15, TimeUnit.MINUTES);
} catch (TimeoutException e) {
System.out.println("Job was not completed after 15 minutes.");
return;
} finally {
subscriber.stopAsync();
subscriber.awaitTerminated();
}
// Get the latest state of the job from the service
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
DlpJob completedJob = dlp.getDlpJob(request);
// Parse the response and process results.
System.out.println("Job status: " + completedJob.getState());
System.out.println("Job name: " + dlpJob.getName());
InspectDataSourceDetails.Result result = completedJob.getInspectDetails().getResult();
System.out.println("Findings: ");
for (InfoTypeStats infoTypeStat : result.getInfoTypeStatsList()) {
System.out.print("\tInfo type: " + infoTypeStat.getInfoType().getName());
System.out.println("\tCount: " + infoTypeStat.getCount());
}
}
}
// handleMessage injects the job and settableFuture into the message receiver interface
private static void handleMessage(
DlpJob job,
SettableApiFuture<Boolean> done,
PubsubMessage pubsubMessage,
AckReplyConsumer ackReplyConsumer) {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
done.set(true);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
}
}
}
Node.js
For information about installing and using the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.
To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');
// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();
// The project ID to run the API call under
// const projectId = 'my-project';
// The name of the bucket where the file resides.
// const bucketName = 'YOUR-BUCKET';
// The path to the file within the bucket to inspect.
// Can contain wildcards, e.g. "my-image.*"
// const fileName = 'my-image.png';
// The minimum likelihood required before returning a match
// const minLikelihood = 'LIKELIHOOD_UNSPECIFIED';
// The maximum number of findings to report per request (0 = server maximum)
// const maxFindings = 0;
// The infoTypes of information to match
// const infoTypes = [{ name: 'PHONE_NUMBER' }, { name: 'EMAIL_ADDRESS' }, { name: 'CREDIT_CARD_NUMBER' }];
// The customInfoTypes of information to match
// const customInfoTypes = [{ infoType: { name: 'DICT_TYPE' }, dictionary: { wordList: { words: ['foo', 'bar', 'baz']}}},
// { infoType: { name: 'REGEX_TYPE' }, regex: {pattern: '\\(\\d{3}\\) \\d{3}-\\d{4}'}}];
// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'
// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'
async function inspectGCSFile() {
// Get reference to the file to be inspected
const storageItem = {
cloudStorageOptions: {
fileSet: {url: `gs://${bucketName}/${fileName}`},
},
};
// Construct request for creating an inspect job
const request = {
parent: `projects/${projectId}/locations/global`,
inspectJob: {
inspectConfig: {
infoTypes: infoTypes,
customInfoTypes: customInfoTypes,
minLikelihood: minLikelihood,
limits: {
maxFindingsPerRequest: maxFindings,
},
},
storageConfig: storageItem,
actions: [
{
pubSub: {
topic: `projects/${projectId}/topics/${topicId}`,
},
},
],
},
};
// Create a GCS File inspection job and wait for it to complete
const [topicResponse] = await pubsub.topic(topicId).get();
// Verify the Pub/Sub topic and listen for job notifications via an
// existing subscription.
const subscription = await topicResponse.subscription(subscriptionId);
const [jobsResponse] = await dlp.createDlpJob(request);
// Get the job's resource name
const jobName = jobsResponse.name;
// Watch the Pub/Sub topic until the DLP job finishes
await new Promise((resolve, reject) => {
const messageHandler = message => {
if (message.attributes && message.attributes.DlpJobName === jobName) {
message.ack();
subscription.removeListener('message', messageHandler);
subscription.removeListener('error', errorHandler);
resolve(jobName);
} else {
message.nack();
}
};
const errorHandler = err => {
subscription.removeListener('message', messageHandler);
subscription.removeListener('error', errorHandler);
reject(err);
};
subscription.on('message', messageHandler);
subscription.on('error', errorHandler);
});
// Wait briefly to allow the job results to be finalized before fetching them
await new Promise(resolve => setTimeout(resolve, 500));
const [job] = await dlp.getDlpJob({name: jobName});
console.log(`Job ${job.name} status: ${job.state}`);
const infoTypeStats = job.inspectDetails.result.infoTypeStats;
if (infoTypeStats.length > 0) {
infoTypeStats.forEach(infoTypeStat => {
console.log(
` Found ${infoTypeStat.count} instance(s) of infoType ${infoTypeStat.infoType.name}.`
);
});
} else {
console.log('No findings.');
}
}
await inspectGCSFile();
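For reference, a minimal sketch of the variable definitions that the snippet above expects before it runs; every value is a placeholder that mirrors the commented examples above and should be replaced with real resources.
// All values below are placeholders; replace them with real resources.
const projectId = 'my-project';
const bucketName = 'my-bucket';
const fileName = 'my-file.txt';
const minLikelihood = 'LIKELIHOOD_UNSPECIFIED';
const maxFindings = 0;
const infoTypes = [{name: 'PHONE_NUMBER'}, {name: 'EMAIL_ADDRESS'}];
const customInfoTypes = [];
const topicId = 'my-topic';
const subscriptionId = 'my-subscription';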
PHP
For information about installing and using the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.
To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\Client\DlpServiceClient;
use Google\Cloud\Dlp\V2\CloudStorageOptions;
use Google\Cloud\Dlp\V2\CloudStorageOptions\FileSet;
use Google\Cloud\Dlp\V2\CreateDlpJobRequest;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\GetDlpJobRequest;
use Google\Cloud\Dlp\V2\InfoType;
use Google\Cloud\Dlp\V2\InspectConfig;
use Google\Cloud\Dlp\V2\InspectConfig\FindingLimits;
use Google\Cloud\Dlp\V2\InspectJobConfig;
use Google\Cloud\Dlp\V2\Likelihood;
use Google\Cloud\Dlp\V2\StorageConfig;
use Google\Cloud\PubSub\PubSubClient;
/**
* Inspect a file stored on Google Cloud Storage, using Pub/Sub for job status notifications.
*
* @param string $callingProjectId The project ID to run the API call under
* @param string $topicId The name of the Pub/Sub topic to notify once the job completes
* @param string $subscriptionId The name of the Pub/Sub subscription to use when listening for job completion notifications
* @param string $bucketId The name of the bucket where the file resides
* @param string $file The path to the file within the bucket to inspect. Can contain wildcards e.g. "my-image.*"
* @param int $maxFindings (Optional) The maximum number of findings to report per request (0 = server maximum)
*/
function inspect_gcs(
string $callingProjectId,
string $topicId,
string $subscriptionId,
string $bucketId,
string $file,
int $maxFindings = 0
): void {
// Instantiate a client.
$dlp = new DlpServiceClient();
$pubsub = new PubSubClient();
$topic = $pubsub->topic($topicId);
// The infoTypes of information to match
$personNameInfoType = (new InfoType())
->setName('PERSON_NAME');
$creditCardNumberInfoType = (new InfoType())
->setName('CREDIT_CARD_NUMBER');
$infoTypes = [$personNameInfoType, $creditCardNumberInfoType];
// The minimum likelihood required before returning a match
$minLikelihood = Likelihood::LIKELIHOOD_UNSPECIFIED;
// Specify finding limits
$limits = (new FindingLimits())
->setMaxFindingsPerRequest($maxFindings);
// Construct items to be inspected
$fileSet = (new FileSet())
->setUrl('gs://' . $bucketId . '/' . $file);
$cloudStorageOptions = (new CloudStorageOptions())
->setFileSet($fileSet);
$storageConfig = (new StorageConfig())
->setCloudStorageOptions($cloudStorageOptions);
// Construct the inspect config object
$inspectConfig = (new InspectConfig())
->setMinLikelihood($minLikelihood)
->setLimits($limits)
->setInfoTypes($infoTypes);
// Construct the action to run when job completes
$pubSubAction = (new PublishToPubSub())
->setTopic($topic->name());
$action = (new Action())
->setPubSub($pubSubAction);
// Construct inspect job config to run
$inspectJob = (new InspectJobConfig())
->setInspectConfig($inspectConfig)
->setStorageConfig($storageConfig)
->setActions([$action]);
// Listen for job notifications via an existing topic/subscription.
$subscription = $topic->subscription($subscriptionId);
// Submit request
$parent = "projects/$callingProjectId/locations/global";
$createDlpJobRequest = (new CreateDlpJobRequest())
->setParent($parent)
->setInspectJob($inspectJob);
$job = $dlp->createDlpJob($createDlpJobRequest);
// Poll Pub/Sub using exponential backoff until job finishes
// Consider using an asynchronous execution model such as Cloud Functions
$attempt = 1;
$startTime = time();
do {
foreach ($subscription->pull() as $message) {
if (
isset($message->attributes()['DlpJobName']) &&
$message->attributes()['DlpJobName'] === $job->getName()
) {
$subscription->acknowledge($message);
// Get the updated job. Loop to avoid race condition with DLP API.
do {
$getDlpJobRequest = (new GetDlpJobRequest())
->setName($job->getName());
$job = $dlp->getDlpJob($getDlpJobRequest);
} while ($job->getState() == JobState::RUNNING);
break 2; // break from parent do while
}
}
print('Waiting for job to complete' . PHP_EOL);
// Exponential backoff with max delay of 60 seconds
sleep(min(60, pow(2, ++$attempt)));
} while (time() - $startTime < 600); // 10 minute timeout
// Print finding counts
printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
switch ($job->getState()) {
case JobState::DONE:
$infoTypeStats = $job->getInspectDetails()->getResult()->getInfoTypeStats();
if (count($infoTypeStats) === 0) {
print('No findings.' . PHP_EOL);
} else {
foreach ($infoTypeStats as $infoTypeStat) {
printf(' Found %s instance(s) of infoType %s' . PHP_EOL, $infoTypeStat->getCount(), $infoTypeStat->getInfoType()->getName());
}
}
break;
case JobState::FAILED:
printf('Job %s had errors:' . PHP_EOL, $job->getName());
$errors = $job->getErrors();
foreach ($errors as $error) {
var_dump($error->getDetails());
}
break;
case JobState::PENDING:
print('Job has not completed. Consider a longer timeout or an asynchronous execution model' . PHP_EOL);
break;
default:
print('Unexpected job state. Most likely, the job is either running or has not yet started.');
}
}
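A minimal sketch of how inspect_gcs might be invoked; every value is a placeholder, and the Pub/Sub topic and subscription are assumed to already exist and be attached to each other.
// All values below are placeholders; replace them with real resources.
inspect_gcs(
    'my-project-id',   // $callingProjectId
    'my-topic',        // $topicId
    'my-subscription', // $subscriptionId
    'my-bucket',       // $bucketId
    'my-file.txt',     // $file
    10                 // $maxFindings
);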
Python
For information about installing and using the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.
To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
import threading
from typing import List, Optional
import google.cloud.dlp
import google.cloud.pubsub
def inspect_gcs_file(
project: str,
bucket: str,
filename: str,
topic_id: str,
subscription_id: str,
info_types: List[str],
custom_dictionaries: Optional[List[str]] = None,
custom_regexes: Optional[List[str]] = None,
min_likelihood: Optional[str] = None,
max_findings: Optional[int] = None,
timeout: int = 300,
) -> None:
"""Uses the Data Loss Prevention API to analyze a file on GCS.
Args:
project: The Google Cloud project id to use as a parent resource.
bucket: The name of the GCS bucket containing the file, as a string.
filename: The name of the file in the bucket, including the path, as a
string; e.g. 'images/myfile.png'.
topic_id: The id of the Cloud Pub/Sub topic to which the API will
broadcast job completion. The topic must already exist.
subscription_id: The id of the Cloud Pub/Sub subscription to listen on
while waiting for job completion. The subscription must already
exist and be subscribed to the topic.
info_types: A list of strings representing info types to look for.
A full list of info type categories can be fetched from the API.
custom_dictionaries: A list of strings, each of which is a comma-separated
list of words to use as a custom dictionary detector.
custom_regexes: A list of strings, each a regular expression pattern to
use as a custom regex detector.
min_likelihood: A string representing the minimum likelihood threshold
that constitutes a match. One of: 'LIKELIHOOD_UNSPECIFIED',
'VERY_UNLIKELY', 'UNLIKELY', 'POSSIBLE', 'LIKELY', 'VERY_LIKELY'.
max_findings: The maximum number of findings to report; 0 = no maximum.
timeout: The number of seconds to wait for a response from the API.
Returns:
None; the response from the API is printed to the terminal.
"""
# Instantiate a client.
dlp = google.cloud.dlp_v2.DlpServiceClient()
# Prepare info_types by converting the list of strings into a list of
# dictionaries (protos are also accepted).
if not info_types:
info_types = ["FIRST_NAME", "LAST_NAME", "EMAIL_ADDRESS"]
info_types = [{"name": info_type} for info_type in info_types]
# Prepare custom_info_types by parsing the dictionary word lists and
# regex patterns.
if custom_dictionaries is None:
custom_dictionaries = []
dictionaries = [
{
"info_type": {"name": f"CUSTOM_DICTIONARY_{i}"},
"dictionary": {"word_list": {"words": custom_dict.split(",")}},
}
for i, custom_dict in enumerate(custom_dictionaries)
]
if custom_regexes is None:
custom_regexes = []
regexes = [
{
"info_type": {"name": f"CUSTOM_REGEX_{i}"},
"regex": {"pattern": custom_regex},
}
for i, custom_regex in enumerate(custom_regexes)
]
custom_info_types = dictionaries + regexes
# Construct the configuration dictionary. Keys which are None may
# optionally be omitted entirely.
inspect_config = {
"info_types": info_types,
"custom_info_types": custom_info_types,
"min_likelihood": min_likelihood,
"limits": {"max_findings_per_request": max_findings},
}
# Construct a storage_config containing the file's URL.
url = f"gs://{bucket}/{filename}"
storage_config = {"cloud_storage_options": {"file_set": {"url": url}}}
# Convert the project id into full resource ids.
topic = google.cloud.pubsub.PublisherClient.topic_path(project, topic_id)
parent = f"projects/{project}/locations/global"
# Tell the API where to send a notification when the job is complete.
actions = [{"pub_sub": {"topic": topic}}]
# Construct the inspect_job, which defines the entire inspect content task.
inspect_job = {
"inspect_config": inspect_config,
"storage_config": storage_config,
"actions": actions,
}
operation = dlp.create_dlp_job(
request={"parent": parent, "inspect_job": inspect_job}
)
print(f"Inspection operation started: {operation.name}")
# Create a Pub/Sub client and find the subscription. The subscription is
# expected to already be listening to the topic.
subscriber = google.cloud.pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(project, subscription_id)
# Set up a callback to acknowledge a message. This closes around an event
# so that it can signal that it is done and the main thread can continue.
job_done = threading.Event()
def callback(message: google.cloud.pubsub_v1.subscriber.message.Message) -> None:
try:
if message.attributes["DlpJobName"] == operation.name:
# This is the message we're looking for, so acknowledge it.
message.ack()
# Now that the job is done, fetch the results and print them.
job = dlp.get_dlp_job(request={"name": operation.name})
print(f"Job name: {job.name}")
if job.inspect_details.result.info_type_stats:
for finding in job.inspect_details.result.info_type_stats:
print(
f"Info type: {finding.info_type.name}; Count: {finding.count}"
)
else:
print("No findings.")
# Signal to the main thread that we can exit.
job_done.set()
else:
# This is not the message we're looking for.
message.drop()
except Exception as e:
# Because this is executing in a thread, an exception won't be
# noted unless we print it manually.
print(e)
raise
subscriber.subscribe(subscription_path, callback=callback)
finished = job_done.wait(timeout=timeout)
if not finished:
print(
"No event received before the timeout. Please verify that the "
"subscription provided is subscribed to the topic provided."
)
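A minimal sketch of how inspect_gcs_file might be invoked; every value is a placeholder, and the Pub/Sub topic and subscription are assumed to already exist, with the subscription attached to the topic.
if __name__ == "__main__":
    # All values below are placeholders; replace them with real resources.
    inspect_gcs_file(
        project="my-project-id",
        bucket="my-bucket",
        filename="my-file.txt",
        topic_id="my-topic",
        subscription_id="my-subscription",
        info_types=["PHONE_NUMBER", "EMAIL_ADDRESS"],
        min_likelihood="POSSIBLE",
        max_findings=10,
        timeout=300,
    )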
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.