检查 Datastore

演示如何查找存储在 Datastore 中的敏感数据。

包含此代码示例的文档页面

如需查看上下文中使用的代码示例,请参阅以下文档:

代码示例

C#

如需了解如何安装和使用 Cloud DLP 客户端库,请参阅 Cloud DLP 客户端库


using Google.Api.Gax.ResourceNames;
using Google.Cloud.BigQuery.V2;
using Google.Cloud.Dlp.V2;
using Google.Protobuf.WellKnownTypes;
using System;
using System.Collections.Generic;
using System.Threading;
using static Google.Cloud.Dlp.V2.InspectConfig.Types;

public class InspectCloudDataStore
{
    public static object Inspect(
        string projectId,
        Likelihood minLikelihood,
        int maxFindings,
        bool includeQuote,
        string kindName,
        string namespaceId,
        IEnumerable<InfoType> infoTypes,
        IEnumerable<CustomInfoType> customInfoTypes,
        string datasetId,
        string tableId)
    {
        var inspectJob = new InspectJobConfig
        {
            StorageConfig = new StorageConfig
            {
                DatastoreOptions = new DatastoreOptions
                {
                    Kind = new KindExpression { Name = kindName },
                    PartitionId = new PartitionId
                    {
                        NamespaceId = namespaceId,
                        ProjectId = projectId,
                    }
                },
                TimespanConfig = new StorageConfig.Types.TimespanConfig
                {
                    StartTime = Timestamp.FromDateTime(System.DateTime.UtcNow.AddYears(-1)),
                    EndTime = Timestamp.FromDateTime(System.DateTime.UtcNow)
                }
            },

            InspectConfig = new InspectConfig
            {
                InfoTypes = { infoTypes },
                CustomInfoTypes = { customInfoTypes },
                Limits = new FindingLimits
                {
                    MaxFindingsPerRequest = maxFindings
                },
                ExcludeInfoTypes = false,
                IncludeQuote = includeQuote,
                MinLikelihood = minLikelihood
            },
            Actions =
                {
                    new Google.Cloud.Dlp.V2.Action
                    {
                        // Save results in BigQuery Table
                        SaveFindings = new Google.Cloud.Dlp.V2.Action.Types.SaveFindings
                        {
                            OutputConfig = new OutputStorageConfig
                            {
                                Table = new Google.Cloud.Dlp.V2.BigQueryTable
                                {
                                    ProjectId = projectId,
                                    DatasetId = datasetId,
                                    TableId = tableId
                                }
                            }
                        },
                    }
                }
        };

        // Issue Create Dlp Job Request
        var client = DlpServiceClient.Create();
        var request = new CreateDlpJobRequest
        {
            InspectJob = inspectJob,
            Parent = new LocationName(projectId, "global").ToString(),
        };

        // We need created job name
        var dlpJob = client.CreateDlpJob(request);
        var jobName = dlpJob.Name;

        // Make sure the job finishes before inspecting the results.
        // Alternatively, we can inspect results opportunistically, but
        // for testing purposes, we want consistent outcome
        var finishedJob = EnsureJobFinishes(projectId, jobName);
        var bigQueryClient = BigQueryClient.Create(projectId);
        var table = bigQueryClient.GetTable(datasetId, tableId);

        // Return only first page of 10 rows
        Console.WriteLine("DLP v2 Results:");
        var firstPage = table.ListRows(new ListRowsOptions { StartIndex = 0, PageSize = 10 });
        foreach (var item in firstPage)
        {
            Console.WriteLine($"\t {item[""]}");
        }

        return finishedJob;
    }

    private static DlpJob EnsureJobFinishes(string projectId, string jobName)
    {
        var client = DlpServiceClient.Create();
        var request = new GetDlpJobRequest
        {
            DlpJobName = new DlpJobName(projectId, jobName),
        };

        // Simple logic that gives the job 5*30 sec at most to complete - for testing purposes only
        var numOfAttempts = 5;
        do
        {
            var dlpJob = client.GetDlpJob(request);
            numOfAttempts--;
            if (dlpJob.State != DlpJob.Types.JobState.Running)
            {
                return dlpJob;
            }

            Thread.Sleep(TimeSpan.FromSeconds(30));
        } while (numOfAttempts > 0);

        throw new InvalidOperationException("Job did not complete in time");
    }
}

Go

如需了解如何安装和使用 Cloud DLP 客户端库,请参阅 Cloud DLP 客户端库

import (
	"context"
	"fmt"
	"io"
	"strings"
	"time"

	dlp "cloud.google.com/go/dlp/apiv2"
	"cloud.google.com/go/pubsub"
	dlppb "google.golang.org/genproto/googleapis/privacy/dlp/v2"
)

// inspectDatastore searches for the given info types in the given dataset kind.
func inspectDatastore(w io.Writer, projectID string, infoTypeNames []string, customDictionaries []string, customRegexes []string, pubSubTopic, pubSubSub, dataProject, namespaceID, kind string) error {
	// projectID := "my-project-id"
	// infoTypeNames := []string{"US_SOCIAL_SECURITY_NUMBER"}
	// customDictionaries := []string{...}
	// customRegexes := []string{...}
	// pubSubTopic := "dlp-risk-sample-topic"
	// pubSubSub := "dlp-risk-sample-sub"
	// namespaceID := "namespace-id"
	// kind := "MyKind"

	ctx := context.Background()
	client, err := dlp.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("dlp.NewClient: %v", err)
	}

	// Convert the info type strings to a list of InfoTypes.
	var infoTypes []*dlppb.InfoType
	for _, it := range infoTypeNames {
		infoTypes = append(infoTypes, &dlppb.InfoType{Name: it})
	}
	// Convert the custom dictionary word lists and custom regexes to a list of CustomInfoTypes.
	var customInfoTypes []*dlppb.CustomInfoType
	for idx, it := range customDictionaries {
		customInfoTypes = append(customInfoTypes, &dlppb.CustomInfoType{
			InfoType: &dlppb.InfoType{
				Name: fmt.Sprintf("CUSTOM_DICTIONARY_%d", idx),
			},
			Type: &dlppb.CustomInfoType_Dictionary_{
				Dictionary: &dlppb.CustomInfoType_Dictionary{
					Source: &dlppb.CustomInfoType_Dictionary_WordList_{
						WordList: &dlppb.CustomInfoType_Dictionary_WordList{
							Words: strings.Split(it, ","),
						},
					},
				},
			},
		})
	}
	for idx, it := range customRegexes {
		customInfoTypes = append(customInfoTypes, &dlppb.CustomInfoType{
			InfoType: &dlppb.InfoType{
				Name: fmt.Sprintf("CUSTOM_REGEX_%d", idx),
			},
			Type: &dlppb.CustomInfoType_Regex_{
				Regex: &dlppb.CustomInfoType_Regex{
					Pattern: it,
				},
			},
		})
	}

	// Create a PubSub Client used to listen for when the inspect job finishes.
	pubsubClient, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer pubsubClient.Close()

	// Create a PubSub subscription we can use to listen for messages.
	// Create the Topic if it doesn't exist.
	t := pubsubClient.Topic(pubSubTopic)
	if exists, err := t.Exists(ctx); err != nil {
		return fmt.Errorf("t.Exists: %v", err)
	} else if !exists {
		if t, err = pubsubClient.CreateTopic(ctx, pubSubTopic); err != nil {
			return fmt.Errorf("CreateTopic: %v", err)
		}
	}

	// Create the Subscription if it doesn't exist.
	s := pubsubClient.Subscription(pubSubSub)
	if exists, err := s.Exists(ctx); err != nil {
		return fmt.Errorf("s.Exists: %v", err)
	} else if !exists {
		if s, err = pubsubClient.CreateSubscription(ctx, pubSubSub, pubsub.SubscriptionConfig{Topic: t}); err != nil {
			return fmt.Errorf("CreateSubscription: %v", err)
		}
	}

	// topic is the PubSub topic string where messages should be sent.
	topic := "projects/" + projectID + "/topics/" + pubSubTopic

	// Create a configured request.
	req := &dlppb.CreateDlpJobRequest{
		Parent: fmt.Sprintf("projects/%s/locations/global", projectID),
		Job: &dlppb.CreateDlpJobRequest_InspectJob{
			InspectJob: &dlppb.InspectJobConfig{
				// StorageConfig describes where to find the data.
				StorageConfig: &dlppb.StorageConfig{
					Type: &dlppb.StorageConfig_DatastoreOptions{
						DatastoreOptions: &dlppb.DatastoreOptions{
							PartitionId: &dlppb.PartitionId{
								ProjectId:   dataProject,
								NamespaceId: namespaceID,
							},
							Kind: &dlppb.KindExpression{
								Name: kind,
							},
						},
					},
				},
				// InspectConfig describes what fields to look for.
				InspectConfig: &dlppb.InspectConfig{
					InfoTypes:       infoTypes,
					CustomInfoTypes: customInfoTypes,
					MinLikelihood:   dlppb.Likelihood_POSSIBLE,
					Limits: &dlppb.InspectConfig_FindingLimits{
						MaxFindingsPerRequest: 10,
					},
					IncludeQuote: true,
				},
				// Send a message to PubSub using Actions.
				Actions: []*dlppb.Action{
					{
						Action: &dlppb.Action_PubSub{
							PubSub: &dlppb.Action_PublishToPubSub{
								Topic: topic,
							},
						},
					},
				},
			},
		},
	}
	// Create the inspect job.
	j, err := client.CreateDlpJob(ctx, req)
	if err != nil {
		return fmt.Errorf("CreateDlpJob: %v", err)
	}
	fmt.Fprintf(w, "Created job: %v\n", j.GetName())

	// Wait for the inspect job to finish by waiting for a PubSub message.
	// This only waits for 10 minutes. For long jobs, consider using a truly
	// asynchronous execution model such as Cloud Functions.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()
	err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// If this is the wrong job, do not process the result.
		if msg.Attributes["DlpJobName"] != j.GetName() {
			msg.Nack()
			return
		}
		msg.Ack()

		// Stop listening for more messages.
		defer cancel()

		resp, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
			Name: j.GetName(),
		})
		if err != nil {
			fmt.Fprintf(w, "Error getting completed job: %v\n", err)
			return
		}
		r := resp.GetInspectDetails().GetResult().GetInfoTypeStats()
		if len(r) == 0 {
			fmt.Fprintf(w, "No results")
			return
		}
		for _, s := range r {
			fmt.Fprintf(w, "  Found %v instances of infoType %v\n", s.GetCount(), s.GetInfoType().GetName())
		}
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

Java

如需了解如何安装和使用 Cloud DLP 客户端库,请参阅 Cloud DLP 客户端库


import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.CreateDlpJobRequest;
import com.google.privacy.dlp.v2.DatastoreOptions;
import com.google.privacy.dlp.v2.DlpJob;
import com.google.privacy.dlp.v2.GetDlpJobRequest;
import com.google.privacy.dlp.v2.InfoType;
import com.google.privacy.dlp.v2.InfoTypeStats;
import com.google.privacy.dlp.v2.InspectConfig;
import com.google.privacy.dlp.v2.InspectDataSourceDetails;
import com.google.privacy.dlp.v2.InspectJobConfig;
import com.google.privacy.dlp.v2.KindExpression;
import com.google.privacy.dlp.v2.LocationName;
import com.google.privacy.dlp.v2.PartitionId;
import com.google.privacy.dlp.v2.StorageConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class InspectDatastoreEntity {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String datastoreNamespace = "your-datastore-namespace";
    String datastoreKind = "your-datastore-kind";
    String topicId = "your-pubsub-topic-id";
    String subscriptionId = "your-pubsub-subscription-id";
    insepctDatastoreEntity(projectId, datastoreNamespace, datastoreKind, topicId, subscriptionId);
  }

  // Inspects a Datastore Entity.
  public static void insepctDatastoreEntity(
      String projectId,
      String datastoreNamespce,
      String datastoreKind,
      String topicId,
      String subscriptionId)
      throws ExecutionException, InterruptedException, IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (DlpServiceClient dlp = DlpServiceClient.create()) {
      // Specify the Datastore entity to be inspected.
      PartitionId partitionId =
          PartitionId.newBuilder()
              .setProjectId(projectId)
              .setNamespaceId(datastoreNamespce)
              .build();
      KindExpression kindExpression = KindExpression.newBuilder().setName(datastoreKind).build();

      DatastoreOptions datastoreOptions =
          DatastoreOptions.newBuilder().setKind(kindExpression).setPartitionId(partitionId).build();

      StorageConfig storageConfig =
          StorageConfig.newBuilder().setDatastoreOptions(datastoreOptions).build();

      // Specify the type of info the inspection will look for.
      // See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
      List<InfoType> infoTypes =
          Stream.of("PHONE_NUMBER", "EMAIL_ADDRESS", "CREDIT_CARD_NUMBER")
              .map(it -> InfoType.newBuilder().setName(it).build())
              .collect(Collectors.toList());

      // Specify how the content should be inspected.
      InspectConfig inspectConfig =
          InspectConfig.newBuilder().addAllInfoTypes(infoTypes).setIncludeQuote(true).build();

      // Specify the action that is triggered when the job completes.
      String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId);
      Action.PublishToPubSub publishToPubSub =
          Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build();
      Action action = Action.newBuilder().setPubSub(publishToPubSub).build();

      // Configure the long running job we want the service to perform.
      InspectJobConfig inspectJobConfig =
          InspectJobConfig.newBuilder()
              .setStorageConfig(storageConfig)
              .setInspectConfig(inspectConfig)
              .addActions(action)
              .build();

      // Create the request for the job configured above.
      CreateDlpJobRequest createDlpJobRequest =
          CreateDlpJobRequest.newBuilder()
              .setParent(LocationName.of(projectId, "global").toString())
              .setInspectJob(inspectJobConfig)
              .build();

      // Use the client to send the request.
      final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest);
      System.out.println("Job created: " + dlpJob.getName());

      // Set up a Pub/Sub subscriber to listen on the job completion status
      final SettableApiFuture<Boolean> done = SettableApiFuture.create();

      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      MessageReceiver messageHandler =
          (PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
            handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
          };
      Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
      subscriber.startAsync();

      // Wait for job completion semi-synchronously
      // For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
      try {
        done.get(15, TimeUnit.MINUTES);
      } catch (TimeoutException e) {
        System.out.println("Job was not completed after 15 minutes.");
        return;
      } finally {
        subscriber.stopAsync();
        subscriber.awaitTerminated();
      }

      // Get the latest state of the job from the service
      GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
      DlpJob completedJob = dlp.getDlpJob(request);

      // Parse the response and process results.
      System.out.println("Job status: " + completedJob.getState());
      InspectDataSourceDetails.Result result = completedJob.getInspectDetails().getResult();
      System.out.println("Findings: ");
      for (InfoTypeStats infoTypeStat : result.getInfoTypeStatsList()) {
        System.out.print("\tInfo type: " + infoTypeStat.getInfoType().getName());
        System.out.println("\tCount: " + infoTypeStat.getCount());
      }
    }
  }

  // handleMessage injects the job and settableFuture into the message reciever interface
  private static void handleMessage(
      DlpJob job,
      SettableApiFuture<Boolean> done,
      PubsubMessage pubsubMessage,
      AckReplyConsumer ackReplyConsumer) {
    String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
    if (job.getName().equals(messageAttribute)) {
      done.set(true);
      ackReplyConsumer.ack();
    } else {
      ackReplyConsumer.nack();
    }
  }
}

Node.js

如需了解如何安装和使用 Cloud DLP 客户端库,请参阅 Cloud DLP 客户端库

// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');

// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();

// The project ID to run the API call under
// const projectId = 'my-project';

// The project ID the target Datastore is stored under
// This may or may not equal the calling project ID
// const dataProjectId = 'my-project';

// (Optional) The ID namespace of the Datastore document to inspect.
// To ignore Datastore namespaces, set this to an empty string ('')
// const namespaceId = '';

// The kind of the Datastore entity to inspect.
// const kind = 'Person';

// The minimum likelihood required before returning a match
// const minLikelihood = 'LIKELIHOOD_UNSPECIFIED';

// The maximum number of findings to report per request (0 = server maximum)
// const maxFindings = 0;

// The infoTypes of information to match
// const infoTypes = [{ name: 'PHONE_NUMBER' }, { name: 'EMAIL_ADDRESS' }, { name: 'CREDIT_CARD_NUMBER' }];

// The customInfoTypes of information to match
// const customInfoTypes = [{ infoType: { name: 'DICT_TYPE' }, dictionary: { wordList: { words: ['foo', 'bar', 'baz']}}},
//   { infoType: { name: 'REGEX_TYPE' }, regex: {pattern: '\\(\\d{3}\\) \\d{3}-\\d{4}'}}];

// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'

// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'

async function inspectDatastore() {
  // Construct items to be inspected
  const storageItems = {
    datastoreOptions: {
      partitionId: {
        projectId: dataProjectId,
        namespaceId: namespaceId,
      },
      kind: {
        name: kind,
      },
    },
  };

  // Construct request for creating an inspect job
  const request = {
    parent: `projects/${projectId}/locations/global`,
    inspectJob: {
      inspectConfig: {
        infoTypes: infoTypes,
        customInfoTypes: customInfoTypes,
        minLikelihood: minLikelihood,
        limits: {
          maxFindingsPerRequest: maxFindings,
        },
      },
      storageConfig: storageItems,
      actions: [
        {
          pubSub: {
            topic: `projects/${projectId}/topics/${topicId}`,
          },
        },
      ],
    },
  };
  // Run inspect-job creation request
  const [topicResponse] = await pubsub.topic(topicId).get();
  // Verify the Pub/Sub topic and listen for job notifications via an
  // existing subscription.
  const subscription = await topicResponse.subscription(subscriptionId);
  const [jobsResponse] = await dlp.createDlpJob(request);
  const jobName = jobsResponse.name;
  // Watch the Pub/Sub topic until the DLP job finishes
  await new Promise((resolve, reject) => {
    const messageHandler = message => {
      if (message.attributes && message.attributes.DlpJobName === jobName) {
        message.ack();
        subscription.removeListener('message', messageHandler);
        subscription.removeListener('error', errorHandler);
        resolve(jobName);
      } else {
        message.nack();
      }
    };

    const errorHandler = err => {
      subscription.removeListener('message', messageHandler);
      subscription.removeListener('error', errorHandler);
      reject(err);
    };

    subscription.on('message', messageHandler);
    subscription.on('error', errorHandler);
  });
  // Wait for DLP job to fully complete
  setTimeout(() => {
    console.log('Waiting for DLP job to fully complete');
  }, 500);
  const [job] = await dlp.getDlpJob({name: jobName});
  console.log(`Job ${job.name} status: ${job.state}`);

  const infoTypeStats = job.inspectDetails.result.infoTypeStats;
  if (infoTypeStats.length > 0) {
    infoTypeStats.forEach(infoTypeStat => {
      console.log(
        `  Found ${infoTypeStat.count} instance(s) of infoType ${infoTypeStat.infoType.name}.`
      );
    });
  } else {
    console.log('No findings.');
  }
}
inspectDatastore();

PHP

如需了解如何安装和使用 Cloud DLP 客户端库,请参阅 Cloud DLP 客户端库

/**
 * Inspect Datastore, using Pub/Sub for job status notifications.
 */
use Google\Cloud\Dlp\V2\DlpServiceClient;
use Google\Cloud\Dlp\V2\DatastoreOptions;
use Google\Cloud\Dlp\V2\InfoType;
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\InspectConfig;
use Google\Cloud\Dlp\V2\InspectJobConfig;
use Google\Cloud\Dlp\V2\KindExpression;
use Google\Cloud\Dlp\V2\PartitionId;
use Google\Cloud\Dlp\V2\StorageConfig;
use Google\Cloud\Dlp\V2\Likelihood;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\InspectConfig\FindingLimits;
use Google\Cloud\PubSub\PubSubClient;

/** Uncomment and populate these variables in your code */
// $callingProjectId = 'The project ID to run the API call under';
// $dataProjectId = 'The project ID containing the target Datastore';
// $topicId = 'The name of the Pub/Sub topic to notify once the job completes';
// $subscriptionId = 'The name of the Pub/Sub subscription to use when listening for job';
// $kind = 'The datastore kind to inspect';
// $namespaceId = 'The ID namespace of the Datastore document to inspect';
// $maxFindings = 0;  // (Optional) The maximum number of findings to report per request (0 = server maximum)

// Instantiate clients
$dlp = new DlpServiceClient();
$pubsub = new PubSubClient();
$topic = $pubsub->topic($topicId);

// The infoTypes of information to match
$personNameInfoType = (new InfoType())
    ->setName('PERSON_NAME');
$phoneNumberInfoType = (new InfoType())
    ->setName('PHONE_NUMBER');
$infoTypes = [$personNameInfoType, $phoneNumberInfoType];

// The minimum likelihood required before returning a match
$minLikelihood = likelihood::LIKELIHOOD_UNSPECIFIED;

// Specify finding limits
$limits = (new FindingLimits())
    ->setMaxFindingsPerRequest($maxFindings);

// Construct items to be inspected
$partitionId = (new PartitionId())
    ->setProjectId($dataProjectId)
    ->setNamespaceId($namespaceId);

$kindExpression = (new KindExpression())
    ->setName($kind);

$datastoreOptions = (new DatastoreOptions())
    ->setPartitionId($partitionId)
    ->setKind($kindExpression);

// Construct the inspect config object
$inspectConfig = (new InspectConfig())
    ->setInfoTypes($infoTypes)
    ->setMinLikelihood($minLikelihood)
    ->setLimits($limits);

// Construct the storage config object
$storageConfig = (new StorageConfig())
    ->setDatastoreOptions($datastoreOptions);

// Construct the action to run when job completes
$pubSubAction = (new PublishToPubSub())
    ->setTopic($topic->name());

$action = (new Action())
    ->setPubSub($pubSubAction);

// Construct inspect job config to run
$inspectJob = (new InspectJobConfig())
    ->setInspectConfig($inspectConfig)
    ->setStorageConfig($storageConfig)
    ->setActions([$action]);

// Listen for job notifications via an existing topic/subscription.
$subscription = $topic->subscription($subscriptionId);

// Submit request
$parent = "projects/$callingProjectId/locations/global";
$job = $dlp->createDlpJob($parent, [
    'inspectJob' => $inspectJob
]);

// Poll Pub/Sub using exponential backoff until job finishes
// Consider using an asynchronous execution model such as Cloud Functions
$attempt = 1;
$startTime = time();
do {
    foreach ($subscription->pull() as $message) {
        if (isset($message->attributes()['DlpJobName']) &&
            $message->attributes()['DlpJobName'] === $job->getName()) {
            $subscription->acknowledge($message);
            // Get the updated job. Loop to avoid race condition with DLP API.
            do {
                $job = $dlp->getDlpJob($job->getName());
            } while ($job->getState() == JobState::RUNNING);
            break 2; // break from parent do while
        }
    }
    printf('Waiting for job to complete' . PHP_EOL);
    // Exponential backoff with max delay of 60 seconds
    sleep(min(60, pow(2, ++$attempt)));
} while (time() - $startTime < 600); // 10 minute timeout

// Print finding counts
printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
switch ($job->getState()) {
    case JobState::DONE:
        $infoTypeStats = $job->getInspectDetails()->getResult()->getInfoTypeStats();
        if (count($infoTypeStats) === 0) {
            print('No findings.' . PHP_EOL);
        } else {
            foreach ($infoTypeStats as $infoTypeStat) {
                printf('  Found %s instance(s) of infoType %s' . PHP_EOL, $infoTypeStat->getCount(), $infoTypeStat->getInfoType()->getName());
            }
        }
        break;
    case JobState::FAILED:
        printf('Job %s had errors:' . PHP_EOL, $job->getName());
        $errors = $job->getErrors();
        foreach ($errors as $error) {
            var_dump($error->getDetails());
        }
        break;
    case JobState::PENDING:
        printf('Job has not completed. Consider a longer timeout or an asynchronous execution model' . PHP_EOL);
        break;
    default:
        print('Unexpected job state.');
}

Python

如需了解如何安装和使用 Cloud DLP 客户端库,请参阅 Cloud DLP 客户端库

def inspect_datastore(
    project,
    datastore_project,
    kind,
    topic_id,
    subscription_id,
    info_types,
    custom_dictionaries=None,
    custom_regexes=None,
    namespace_id=None,
    min_likelihood=None,
    max_findings=None,
    timeout=300,
):
    """Uses the Data Loss Prevention API to analyze Datastore data.
    Args:
        project: The Google Cloud project id to use as a parent resource.
        datastore_project: The Google Cloud project id of the target Datastore.
        kind: The kind of the Datastore entity to inspect, e.g. 'Person'.
        topic_id: The id of the Cloud Pub/Sub topic to which the API will
            broadcast job completion. The topic must already exist.
        subscription_id: The id of the Cloud Pub/Sub subscription to listen on
            while waiting for job completion. The subscription must already
            exist and be subscribed to the topic.
        info_types: A list of strings representing info types to look for.
            A full list of info type categories can be fetched from the API.
        namespace_id: The namespace of the Datastore document, if applicable.
        min_likelihood: A string representing the minimum likelihood threshold
            that constitutes a match. One of: 'LIKELIHOOD_UNSPECIFIED',
            'VERY_UNLIKELY', 'UNLIKELY', 'POSSIBLE', 'LIKELY', 'VERY_LIKELY'.
        max_findings: The maximum number of findings to report; 0 = no maximum.
        timeout: The number of seconds to wait for a response from the API.
    Returns:
        None; the response from the API is printed to the terminal.
    """

    # Import the client library.
    import google.cloud.dlp

    # This sample additionally uses Cloud Pub/Sub to receive results from
    # potentially long-running operations.
    import google.cloud.pubsub

    # This sample also uses threading.Event() to wait for the job to finish.
    import threading

    # Instantiate a client.
    dlp = google.cloud.dlp_v2.DlpServiceClient()

    # Prepare info_types by converting the list of strings into a list of
    # dictionaries (protos are also accepted).
    if not info_types:
        info_types = ["FIRST_NAME", "LAST_NAME", "EMAIL_ADDRESS"]
    info_types = [{"name": info_type} for info_type in info_types]

    # Prepare custom_info_types by parsing the dictionary word lists and
    # regex patterns.
    if custom_dictionaries is None:
        custom_dictionaries = []
    dictionaries = [
        {
            "info_type": {"name": "CUSTOM_DICTIONARY_{}".format(i)},
            "dictionary": {"word_list": {"words": custom_dict.split(",")}},
        }
        for i, custom_dict in enumerate(custom_dictionaries)
    ]
    if custom_regexes is None:
        custom_regexes = []
    regexes = [
        {
            "info_type": {"name": "CUSTOM_REGEX_{}".format(i)},
            "regex": {"pattern": custom_regex},
        }
        for i, custom_regex in enumerate(custom_regexes)
    ]
    custom_info_types = dictionaries + regexes

    # Construct the configuration dictionary. Keys which are None may
    # optionally be omitted entirely.
    inspect_config = {
        "info_types": info_types,
        "custom_info_types": custom_info_types,
        "min_likelihood": min_likelihood,
        "limits": {"max_findings_per_request": max_findings},
    }

    # Construct a storage_config containing the target Datastore info.
    storage_config = {
        "datastore_options": {
            "partition_id": {
                "project_id": datastore_project,
                "namespace_id": namespace_id,
            },
            "kind": {"name": kind},
        }
    }

    # Convert the project id into full resource ids.
    topic = google.cloud.pubsub.PublisherClient.topic_path(project, topic_id)
    parent = f"projects/{project}/locations/global"

    # Tell the API where to send a notification when the job is complete.
    actions = [{"pub_sub": {"topic": topic}}]

    # Construct the inspect_job, which defines the entire inspect content task.
    inspect_job = {
        "inspect_config": inspect_config,
        "storage_config": storage_config,
        "actions": actions,
    }

    operation = dlp.create_dlp_job(
        request={"parent": parent, "inspect_job": inspect_job}
    )
    print("Inspection operation started: {}".format(operation.name))

    # Create a Pub/Sub client and find the subscription. The subscription is
    # expected to already be listening to the topic.
    subscriber = google.cloud.pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_id)

    # Set up a callback to acknowledge a message. This closes around an event
    # so that it can signal that it is done and the main thread can continue.
    job_done = threading.Event()

    def callback(message):
        try:
            if message.attributes["DlpJobName"] == operation.name:
                # This is the message we're looking for, so acknowledge it.
                message.ack()

                # Now that the job is done, fetch the results and print them.
                job = dlp.get_dlp_job(request={"name": operation.name})
                if job.inspect_details.result.info_type_stats:
                    for finding in job.inspect_details.result.info_type_stats:
                        print(
                            "Info type: {}; Count: {}".format(
                                finding.info_type.name, finding.count
                            )
                        )
                else:
                    print("No findings.")

                # Signal to the main thread that we can exit.
                job_done.set()
            else:
                # This is not the message we're looking for.
                message.drop()
        except Exception as e:
            # Because this is executing in a thread, an exception won't be
            # noted unless we print it manually.
            print(e)
            raise

    # Register the callback and wait on the event.
    subscriber.subscribe(subscription_path, callback=callback)

    finished = job_done.wait(timeout=timeout)
    if not finished:
        print(
            "No event received before the timeout. Please verify that the "
            "subscription provided is subscribed to the topic provided."
        )

后续步骤

如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器