使用采样检查存储空间

以下示例演示了如何使用 Cloud DLP API 扫描 Cloud Storage 存储分区中 90%的子集以查找人员姓名。从数据集中的随机位置开始扫描,并且仅包含 200 字节以下的文本文件。

深入探索

如需查看包含此代码示例的详细文档,请参阅以下内容:

代码示例

C#

如需了解如何安装和使用敏感数据保护客户端库,请参阅 敏感数据保护客户端库

如需向 Sensitive Data Protection 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证


using Google.Api.Gax.ResourceNames;
using Google.Cloud.Dlp.V2;
using Google.Cloud.PubSub.V1;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class InspectStorageWithSampling
{
    public static async Task<DlpJob> InspectAsync(
        string projectId,
        string gcsUri,
        string topicId,
        string subId,
        Likelihood minLikelihood = Likelihood.Possible,
        IEnumerable<InfoType> infoTypes = null)
    {

        // Instantiate the dlp client.
        var dlp = DlpServiceClient.Create();

        // Construct Storage config by specifying the GCS file to be inspected
        // and sample method.
        var storageConfig = new StorageConfig
        {
            CloudStorageOptions = new CloudStorageOptions
            {
                FileSet = new CloudStorageOptions.Types.FileSet
                {
                    Url = gcsUri
                },
                BytesLimitPerFile = 200,
                FileTypes = { new FileType[] { FileType.Csv } },
                FilesLimitPercent = 90,
                SampleMethod = CloudStorageOptions.Types.SampleMethod.RandomStart
            }
        };

        // Construct the Inspect Config and specify the type of info the inspection
        // will look for.
        var inspectConfig = new InspectConfig
        {
            InfoTypes =
            {
                infoTypes ?? new InfoType[] { new InfoType { Name = "PERSON_NAME" } }
            },
            IncludeQuote = true,
            MinLikelihood = minLikelihood
        };

        // Construct the pubsub action.
        var actions = new Action[]
        {
            new Action
            {
                PubSub = new Action.Types.PublishToPubSub
                {
                    Topic = $"projects/{projectId}/topics/{topicId}"
                }
            }
        };

        // Construct the inspect job config using above created objects.
        var inspectJob = new InspectJobConfig
        {
            StorageConfig = storageConfig,
            InspectConfig = inspectConfig,
            Actions = { actions }
        };

        // Issue Create Dlp Job Request
        var request = new CreateDlpJobRequest
        {
            InspectJob = inspectJob,
            ParentAsLocationName = new LocationName(projectId, "global"),
        };

        // We keep the name of the job that we just created.
        var dlpJob = dlp.CreateDlpJob(request);
        var jobName = dlpJob.Name;

        // Listen to pub/sub for the job
        var subscriptionName = new SubscriptionName(projectId, subId);
        var subscriber = await SubscriberClient.CreateAsync(
            subscriptionName);

        await subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            if (message.Attributes["DlpJobName"] == jobName)
            {
                subscriber.StopAsync(cancel);
                return Task.FromResult(SubscriberClient.Reply.Ack);
            }
            else
            {
                return Task.FromResult(SubscriberClient.Reply.Nack);
            }
        });

        // Get the latest state of the job from the service
        var resultJob = dlp.GetDlpJob(new GetDlpJobRequest
        {
            DlpJobName = DlpJobName.Parse(jobName)
        });

        // Parse the response and process results.
        System.Console.WriteLine($"Job status: {resultJob.State}");
        System.Console.WriteLine($"Job Name: {resultJob.Name}");

        var result = resultJob.InspectDetails.Result;
        foreach (var infoType in result.InfoTypeStats)
        {
            System.Console.WriteLine($"Info Type: {infoType.InfoType.Name}");
            System.Console.WriteLine($"Count: {infoType.Count}");
        }
        return resultJob;
    }
}

Go

如需了解如何安装和使用敏感数据保护客户端库,请参阅 敏感数据保护客户端库

如需向 Sensitive Data Protection 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import (
	"context"
	"fmt"
	"io"
	"time"

	dlp "cloud.google.com/go/dlp/apiv2"
	"cloud.google.com/go/dlp/apiv2/dlppb"
	"cloud.google.com/go/pubsub"
)

// inspectGcsFileWithSampling inspects a storage with sampling
func inspectGcsFileWithSampling(w io.Writer, projectID, gcsUri, topicID, subscriptionId string) error {
	// projectId := "your-project-id"
	// gcsUri := "gs://" + "your-bucket-name" + "/path/to/your/file.txt"
	// topicID := "your-pubsub-topic-id"
	// subscriptionId := "your-pubsub-subscription-id"

	ctx := context.Background()

	// Initialize a client once and reuse it to send multiple requests. Clients
	// are safe to use across goroutines. When the client is no longer needed,
	// call the Close method to cleanup its resources.
	client, err := dlp.NewClient(ctx)
	if err != nil {
		return err
	}
	// Closing the client safely cleans up background resources.
	defer client.Close()

	// Specify the GCS file to be inspected and sampling configuration
	var cloudStorageOptions = &dlppb.CloudStorageOptions{
		FileSet: &dlppb.CloudStorageOptions_FileSet{
			Url: gcsUri,
		},
		BytesLimitPerFile: int64(200),
		FileTypes: []dlppb.FileType{
			dlppb.FileType_TEXT_FILE,
		},
		FilesLimitPercent: int32(90),
		SampleMethod:      dlppb.CloudStorageOptions_RANDOM_START,
	}

	var storageConfig = &dlppb.StorageConfig{
		Type: &dlppb.StorageConfig_CloudStorageOptions{
			CloudStorageOptions: cloudStorageOptions,
		},
	}

	// Specify the type of info the inspection will look for.
	// See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
	// Specify how the content should be inspected.
	var inspectConfig = &dlppb.InspectConfig{
		InfoTypes: []*dlppb.InfoType{
			{Name: "PERSON_NAME"},
		},
		ExcludeInfoTypes: true,
		IncludeQuote:     true,
		MinLikelihood:    dlppb.Likelihood_POSSIBLE,
	}

	// Create a PubSub Client used to listen for when the inspect job finishes.
	pubsubClient, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return err
	}
	defer pubsubClient.Close()

	// Create a PubSub subscription we can use to listen for messages.
	// Create the Topic if it doesn't exist.
	t := pubsubClient.Topic(topicID)
	if exists, err := t.Exists(ctx); err != nil {
		return err
	} else if !exists {
		if t, err = pubsubClient.CreateTopic(ctx, topicID); err != nil {
			return err
		}
	}

	// Create the Subscription if it doesn't exist.
	s := pubsubClient.Subscription(subscriptionId)
	if exists, err := s.Exists(ctx); err != nil {
		return err
	} else if !exists {
		if s, err = pubsubClient.CreateSubscription(ctx, subscriptionId, pubsub.SubscriptionConfig{Topic: t}); err != nil {
			return err
		}
	}

	// topic is the PubSub topic string where messages should be sent.
	topic := "projects/" + projectID + "/topics/" + topicID

	var action = &dlppb.Action{
		Action: &dlppb.Action_PubSub{
			PubSub: &dlppb.Action_PublishToPubSub{
				Topic: topic,
			},
		},
	}

	// Configure the long running job we want the service to perform.
	var inspectJobConfig = &dlppb.InspectJobConfig{
		StorageConfig: storageConfig,
		InspectConfig: inspectConfig,
		Actions: []*dlppb.Action{
			action,
		},
	}

	// Create the request for the job configured above.
	req := &dlppb.CreateDlpJobRequest{
		Parent: fmt.Sprintf("projects/%s/locations/global", projectID),
		Job: &dlppb.CreateDlpJobRequest_InspectJob{
			InspectJob: inspectJobConfig,
		},
	}

	// Use the client to send the request.
	j, err := client.CreateDlpJob(ctx, req)
	if err != nil {
		return err
	}
	fmt.Fprintf(w, "Job Created: %v", j.GetName())

	// Wait for the inspect job to finish by waiting for a PubSub message.
	// This only waits for 10 minutes. For long jobs, consider using a truly
	// asynchronous execution model such as Cloud Functions.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()
	err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// If this is the wrong job, do not process the result.
		if msg.Attributes["DlpJobName"] != j.GetName() {
			msg.Nack()
			return
		}
		msg.Ack()

		// Stop listening for more messages.
		defer cancel()

		resp, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
			Name: j.GetName(),
		})
		if err != nil {
			fmt.Fprintf(w, "Error getting completed job: %v\n", err)
			return
		}
		r := resp.GetInspectDetails().GetResult().GetInfoTypeStats()
		if len(r) == 0 {
			fmt.Fprintf(w, "No results")
			return
		}
		for _, s := range r {
			fmt.Fprintf(w, "\nFound %v instances of infoType %v\n", s.GetCount(), s.GetInfoType().GetName())
		}
	})
	if err != nil {
		return err
	}
	return nil

}

Java

如需了解如何安装和使用敏感数据保护客户端库,请参阅 敏感数据保护客户端库

如需向 Sensitive Data Protection 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证


import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.CloudStorageOptions;
import com.google.privacy.dlp.v2.CloudStorageOptions.FileSet;
import com.google.privacy.dlp.v2.CloudStorageOptions.SampleMethod;
import com.google.privacy.dlp.v2.CreateDlpJobRequest;
import com.google.privacy.dlp.v2.DlpJob;
import com.google.privacy.dlp.v2.FileType;
import com.google.privacy.dlp.v2.GetDlpJobRequest;
import com.google.privacy.dlp.v2.InfoType;
import com.google.privacy.dlp.v2.InfoTypeStats;
import com.google.privacy.dlp.v2.InspectConfig;
import com.google.privacy.dlp.v2.InspectDataSourceDetails;
import com.google.privacy.dlp.v2.InspectJobConfig;
import com.google.privacy.dlp.v2.Likelihood;
import com.google.privacy.dlp.v2.LocationName;
import com.google.privacy.dlp.v2.StorageConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class InspectGcsFileWithSampling {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String gcsUri = "gs://" + "your-bucket-name" + "/path/to/your/file.txt";
    String topicId = "your-pubsub-topic-id";
    String subscriptionId = "your-pubsub-subscription-id";
    inspectGcsFileWithSampling(projectId, gcsUri, topicId, subscriptionId);
  }

  // Inspects a file in a Google Cloud Storage Bucket.
  public static void inspectGcsFileWithSampling(
      String projectId, String gcsUri, String topicId, String subscriptionId)
      throws ExecutionException, InterruptedException, IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (DlpServiceClient dlp = DlpServiceClient.create()) {
      // Specify the GCS file to be inspected and sampling configuration
      CloudStorageOptions cloudStorageOptions =
          CloudStorageOptions.newBuilder()
              .setFileSet(FileSet.newBuilder().setUrl(gcsUri))
              .setBytesLimitPerFile(200)
              .addFileTypes(FileType.TEXT_FILE)
              .setFilesLimitPercent(90)
              .setSampleMethod(SampleMethod.RANDOM_START)
              .build();

      StorageConfig storageConfig =
          StorageConfig.newBuilder().setCloudStorageOptions(cloudStorageOptions).build();

      // Specify the type of info the inspection will look for.
      // See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
      InfoType infoType = InfoType.newBuilder().setName("PERSON_NAME").build();

      // Specify how the content should be inspected.
      InspectConfig inspectConfig =
          InspectConfig.newBuilder()
              .addInfoTypes(infoType)
              .setExcludeInfoTypes(true)
              .setIncludeQuote(true)
              .setMinLikelihood(Likelihood.POSSIBLE)
              .build();

      // Specify the action that is triggered when the job completes.
      String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId);
      Action.PublishToPubSub publishToPubSub =
          Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build();
      Action action = Action.newBuilder().setPubSub(publishToPubSub).build();

      // Configure the long running job we want the service to perform.
      InspectJobConfig inspectJobConfig =
          InspectJobConfig.newBuilder()
              .setStorageConfig(storageConfig)
              .setInspectConfig(inspectConfig)
              .addActions(action)
              .build();

      // Create the request for the job configured above.
      CreateDlpJobRequest createDlpJobRequest =
          CreateDlpJobRequest.newBuilder()
              .setParent(LocationName.of(projectId, "global").toString())
              .setInspectJob(inspectJobConfig)
              .build();

      // Use the client to send the request.
      final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest);
      System.out.println("Job created: " + dlpJob.getName());

      // Set up a Pub/Sub subscriber to listen on the job completion status
      final SettableApiFuture<Boolean> done = SettableApiFuture.create();

      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      MessageReceiver messageHandler =
          (PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
            handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
          };
      Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
      subscriber.startAsync();

      // Wait for job completion semi-synchronously
      // For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
      try {
        done.get(15, TimeUnit.MINUTES);
      } catch (TimeoutException e) {
        System.out.println("Job was not completed after 15 minutes.");
        return;
      } finally {
        subscriber.stopAsync();
        subscriber.awaitTerminated();
      }

      // Get the latest state of the job from the service
      GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
      DlpJob completedJob = dlp.getDlpJob(request);

      // Parse the response and process results.
      System.out.println("Job status: " + completedJob.getState());
      System.out.println("Job name: " + dlpJob.getName());
      InspectDataSourceDetails.Result result = completedJob.getInspectDetails().getResult();
      System.out.println("Findings: ");
      for (InfoTypeStats infoTypeStat : result.getInfoTypeStatsList()) {
        System.out.print("\tInfo type: " + infoTypeStat.getInfoType().getName());
        System.out.println("\tCount: " + infoTypeStat.getCount());
      }
    }
  }

  // handleMessage injects the job and settableFuture into the message reciever interface
  private static void handleMessage(
      DlpJob job,
      SettableApiFuture<Boolean> done,
      PubsubMessage pubsubMessage,
      AckReplyConsumer ackReplyConsumer) {
    String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
    if (job.getName().equals(messageAttribute)) {
      done.set(true);
      ackReplyConsumer.ack();
    } else {
      ackReplyConsumer.nack();
    }
  }
}

Node.js

如需了解如何安装和使用敏感数据保护客户端库,请参阅 敏感数据保护客户端库

如需向 Sensitive Data Protection 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');

// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();

// The project ID to run the API call under
// const projectId = 'my-project';

// The gcs file path
// const gcsUri = 'gs://" + "your-bucket-name" + "/path/to/your/file.txt';

// Specify the type of info the inspection will look for.
// See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
// const infoTypes = [{ name: 'PERSON_NAME' }];

// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'

// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'

// DLP Job max time (in milliseconds)
const DLP_JOB_WAIT_TIME = 15 * 1000 * 60;

async function inspectGcsFileSampling() {
  // Specify the GCS file to be inspected and sampling configuration
  const storageItemConfig = {
    cloudStorageOptions: {
      fileSet: {url: gcsUri},
      bytesLimitPerFile: 200,
      filesLimitPercent: 90,
      fileTypes: [DLP.protos.google.privacy.dlp.v2.FileType.TEXT_FILE],
      sampleMethod:
        DLP.protos.google.privacy.dlp.v2.CloudStorageOptions.SampleMethod
          .RANDOM_START,
    },
  };

  // Specify how the content should be inspected.
  const inspectConfig = {
    infoTypes: infoTypes,
    minLikelihood: DLP.protos.google.privacy.dlp.v2.Likelihood.POSSIBLE,
    includeQuote: true,
    excludeInfoTypes: true,
  };

  // Specify the action that is triggered when the job completes.
  const actions = [
    {
      pubSub: {
        topic: `projects/${projectId}/topics/${topicId}`,
      },
    },
  ];

  // Create the request for the job configured above.
  const request = {
    parent: `projects/${projectId}/locations/global`,
    inspectJob: {
      inspectConfig: inspectConfig,
      storageConfig: storageItemConfig,
      actions: actions,
    },
  };

  // Use the client to send the request.
  const [topicResponse] = await pubsub.topic(topicId).get();

  // Verify the Pub/Sub topic and listen for job notifications via an
  // existing subscription.
  const subscription = await topicResponse.subscription(subscriptionId);

  const [jobsResponse] = await dlp.createDlpJob(request);
  const jobName = jobsResponse.name;
  // Watch the Pub/Sub topic until the DLP job finishes
  await new Promise((resolve, reject) => {
    // Set up the timeout
    const timer = setTimeout(() => {
      reject(new Error('Timeout'));
    }, DLP_JOB_WAIT_TIME);

    const messageHandler = message => {
      if (message.attributes && message.attributes.DlpJobName === jobName) {
        message.ack();
        subscription.removeListener('message', messageHandler);
        subscription.removeListener('error', errorHandler);
        clearTimeout(timer);
        resolve(jobName);
      } else {
        message.nack();
      }
    };

    const errorHandler = err => {
      subscription.removeListener('message', messageHandler);
      subscription.removeListener('error', errorHandler);
      clearTimeout(timer);
      reject(err);
    };

    subscription.on('message', messageHandler);
    subscription.on('error', errorHandler);
  });
  const [job] = await dlp.getDlpJob({name: jobName});
  console.log(`Job ${job.name} status: ${job.state}`);

  const infoTypeStats = job.inspectDetails.result.infoTypeStats;
  if (infoTypeStats.length > 0) {
    infoTypeStats.forEach(infoTypeStat => {
      console.log(
        `  Found ${infoTypeStat.count} instance(s) of infoType ${infoTypeStat.infoType.name}.`
      );
    });
  } else {
    console.log('No findings.');
  }
}

await inspectGcsFileSampling();

PHP

如需了解如何安装和使用敏感数据保护客户端库,请参阅 敏感数据保护客户端库

如需向 Sensitive Data Protection 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

use Google\Cloud\Dlp\V2\DlpServiceClient;
use Google\Cloud\Dlp\V2\InfoType;
use Google\Cloud\Dlp\V2\InspectConfig;
use Google\Cloud\Dlp\V2\StorageConfig;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\BigQueryOptions\SampleMethod;
use Google\Cloud\Dlp\V2\CloudStorageOptions;
use Google\Cloud\Dlp\V2\CloudStorageOptions\FileSet;
use Google\Cloud\Dlp\V2\InspectJobConfig;
use Google\Cloud\PubSub\PubSubClient;

/**
 * Inspect storage with sampling.
 * The following examples demonstrate using the Cloud DLP API to scan a 90% subset of a
 * Cloud Storage bucket for person names. The scan starts from a random location in the dataset
 * and only includes text files under 200 bytes.
 *
 * @param string $callingProjectId  The project ID to run the API call under.
 * @param string $gcsUri            Google Cloud Storage file url.
 * @param string $topicId           The ID of the Pub/Sub topic to notify once the job completes.
 * @param string $subscriptionId    The ID of the Pub/Sub subscription to use when listening for job.
 */
function inspect_gcs_with_sampling(
    // TODO(developer): Replace sample parameters before running the code.
    string $callingProjectId,
    string $gcsUri = 'gs://GOOGLE_STORAGE_BUCKET_NAME/dlp_sample.csv',
    string $topicId = 'dlp-pubsub-topic',
    string $subscriptionId = 'dlp_subcription'
): void {
    // Instantiate a client.
    $dlp = new DlpServiceClient();
    $pubsub = new PubSubClient();
    $topic = $pubsub->topic($topicId);

    // Construct the items to be inspected.
    $cloudStorageOptions = (new CloudStorageOptions())
        ->setFileSet((new FileSet())
            ->setUrl($gcsUri))
        ->setBytesLimitPerFile(200)
        ->setFilesLimitPercent(90)
        ->setSampleMethod(SampleMethod::RANDOM_START);

    $storageConfig = (new StorageConfig())
        ->setCloudStorageOptions($cloudStorageOptions);

    // Specify the type of info the inspection will look for.
    $phoneNumberInfoType = (new InfoType())
        ->setName('PHONE_NUMBER');
    $emailAddressInfoType = (new InfoType())
        ->setName('EMAIL_ADDRESS');
    $cardNumberInfoType = (new InfoType())
        ->setName('CREDIT_CARD_NUMBER');
    $infoTypes = [$phoneNumberInfoType, $emailAddressInfoType, $cardNumberInfoType];

    // Specify how the content should be inspected.
    $inspectConfig = (new InspectConfig())
        ->setInfoTypes($infoTypes)
        ->setIncludeQuote(true);

    // Construct the action to run when job completes.
    $action = (new Action())
        ->setPubSub((new PublishToPubSub())
            ->setTopic($topic->name()));

    // Construct inspect job config to run.
    $inspectJob = (new InspectJobConfig())
        ->setInspectConfig($inspectConfig)
        ->setStorageConfig($storageConfig)
        ->setActions([$action]);

    // Listen for job notifications via an existing topic/subscription.
    $subscription = $topic->subscription($subscriptionId);

    // Submit request.
    $parent = "projects/$callingProjectId/locations/global";
    $job = $dlp->createDlpJob($parent, [
        'inspectJob' => $inspectJob
    ]);

    // Poll Pub/Sub using exponential backoff until job finishes.
    // Consider using an asynchronous execution model such as Cloud Functions.
    $attempt = 1;
    $startTime = time();
    do {
        foreach ($subscription->pull() as $message) {
            if (
                isset($message->attributes()['DlpJobName']) &&
                $message->attributes()['DlpJobName'] === $job->getName()
            ) {
                $subscription->acknowledge($message);
                // Get the updated job. Loop to avoid race condition with DLP API.
                do {
                    $job = $dlp->getDlpJob($job->getName());
                } while ($job->getState() == JobState::RUNNING);
                break 2; // break from parent do while.
            }
        }
        printf('Waiting for job to complete' . PHP_EOL);
        // Exponential backoff with max delay of 60 seconds.
        sleep(min(60, pow(2, ++$attempt)));
    } while (time() - $startTime < 600); // 10 minute timeout.

    // Print finding counts.
    printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
    switch ($job->getState()) {
        case JobState::DONE:
            $infoTypeStats = $job->getInspectDetails()->getResult()->getInfoTypeStats();
            if (count($infoTypeStats) === 0) {
                printf('No findings.' . PHP_EOL);
            } else {
                foreach ($infoTypeStats as $infoTypeStat) {
                    printf(
                        '  Found %s instance(s) of infoType %s' . PHP_EOL,
                        $infoTypeStat->getCount(),
                        $infoTypeStat->getInfoType()->getName()
                    );
                }
            }
            break;
        case JobState::FAILED:
            printf('Job %s had errors:' . PHP_EOL, $job->getName());
            $errors = $job->getErrors();
            foreach ($errors as $error) {
                var_dump($error->getDetails());
            }
            break;
        case JobState::PENDING:
            printf('Job has not completed. Consider a longer timeout or an asynchronous execution model' . PHP_EOL);
            break;
        default:
            printf('Unexpected job state. Most likely, the job is either running or has not yet started.');
    }
}

Python

如需了解如何安装和使用敏感数据保护客户端库,请参阅 敏感数据保护客户端库

如需向 Sensitive Data Protection 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import threading
from typing import List

import google.cloud.dlp
import google.cloud.pubsub


def inspect_gcs_with_sampling(
    project: str,
    bucket: str,
    topic_id: str,
    subscription_id: str,
    info_types: List[str] = None,
    file_types: List[str] = None,
    min_likelihood: str = None,
    max_findings: int = None,
    timeout: int = 300,
) -> None:
    """Uses the Data Loss Prevention API to analyze files in GCS by
    limiting the amount of data to be scanned.
    Args:
        project: The Google Cloud project id to use as a parent resource.
        bucket: The name of the GCS bucket containing the file, as a string.
        topic_id: The id of the Cloud Pub/Sub topic to which the API will
            broadcast job completion. The topic must already exist.
        subscription_id: The id of the Cloud Pub/Sub subscription to listen on
            while waiting for job completion. The subscription must already
            exist and be subscribed to the topic.
        info_types: A list of strings representing infoTypes to look for.
            A full list of info type categories can be fetched from the API.
        file_types: Type of files in gcs bucket where the inspection would happen.
        min_likelihood: A string representing the minimum likelihood threshold
            that constitutes a match. One of: 'LIKELIHOOD_UNSPECIFIED',
            'VERY_UNLIKELY', 'UNLIKELY', 'POSSIBLE', 'LIKELY', 'VERY_LIKELY'.
        max_findings: The maximum number of findings to report; 0 = no maximum.
        timeout: The number of seconds to wait for a response from the API.
    """

    # Instantiate a client.
    dlp = google.cloud.dlp_v2.DlpServiceClient()

    # Prepare info_types by converting the list of strings into a list of
    # dictionaries.
    if not info_types:
        info_types = ["FIRST_NAME", "LAST_NAME", "EMAIL_ADDRESS"]
    info_types = [{"name": info_type} for info_type in info_types]

    # Specify how the content should be inspected. Keys which are None may
    # optionally be omitted entirely.
    inspect_config = {
        "info_types": info_types,
        "exclude_info_types": True,
        "include_quote": True,
        "min_likelihood": min_likelihood,
        "limits": {"max_findings_per_request": max_findings},
    }

    # Setting default file types as CSV files
    if not file_types:
        file_types = ["CSV"]

    # Construct a cloud_storage_options dictionary with the bucket's URL.
    url = f"gs://{bucket}/*"
    storage_config = {
        "cloud_storage_options": {
            "file_set": {"url": url},
            "bytes_limit_per_file": 200,
            "file_types": file_types,
            "files_limit_percent": 90,
            "sample_method": "RANDOM_START",
        }
    }

    # Tell the API where to send a notification when the job is complete.
    topic = google.cloud.pubsub.PublisherClient.topic_path(project, topic_id)
    actions = [{"pub_sub": {"topic": topic}}]

    # Construct the inspect_job, which defines the entire inspect content task.
    inspect_job = {
        "inspect_config": inspect_config,
        "storage_config": storage_config,
        "actions": actions,
    }

    # Convert the project id into full resource ids.
    parent = f"projects/{project}/locations/global"

    # Call the API
    operation = dlp.create_dlp_job(
        request={"parent": parent, "inspect_job": inspect_job}
    )
    print(f"Inspection operation started: {operation.name}")

    # Create a Pub/Sub client and find the subscription. The subscription is
    # expected to already be listening to the topic.
    subscriber = google.cloud.pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_id)

    # Set up a callback to acknowledge a message. This closes around an event
    # so that it can signal that it is done and the main thread can continue.
    job_done = threading.Event()

    def callback(message):
        try:
            if message.attributes["DlpJobName"] == operation.name:
                # This is the message we're looking for, so acknowledge it.
                message.ack()

                # Now that the job is done, fetch the results and print them.
                job = dlp.get_dlp_job(request={"name": operation.name})
                print(f"Job name: {job.name}")
                if job.inspect_details.result.info_type_stats:
                    print("Findings:")
                    for finding in job.inspect_details.result.info_type_stats:
                        print(
                            f"Info type: {finding.info_type.name}; Count: {finding.count}"
                        )
                else:
                    print("No findings.")

                # Signal to the main thread that we can exit.
                job_done.set()
            else:
                # This is not the message we're looking for.
                message.drop()
        except Exception as e:
            # Because this is executing in a thread, an exception won't be
            # noted unless we print it manually.
            print(e)
            raise

    # Register the callback and wait on the event.
    subscriber.subscribe(subscription_path, callback=callback)
    finished = job_done.wait(timeout=timeout)
    if not finished:
        print(
            "No event received before the timeout. Please verify that the "
            "subscription provided is subscribed to the topic provided."
        )

后续步骤

如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器