如需正确管理存储区中存储的敏感数据,首先需要进行存储分类:确定存储区中敏感数据的位置、敏感数据的类型以及敏感数据的使用方法。这些信息可以帮助您正确设置访问控制和共享权限,并且可以作为持续监控计划的一部分。
敏感数据保护可以检测存储在 Cloud Storage 位置、Datastore 种类或 BigQuery 表中的敏感数据并对其进行分类。扫描 Cloud Storage 位置中的文件时,敏感数据保护支持扫描二进制文件、文本、图片、Microsoft Word、Microsoft Excel、Microsoft Powerpoint、PDF 和 Apache Avro 文件。无法识别类型的文件将作为二进制文件进行扫描。如需详细了解支持的文件类型,请参阅支持的文件类型。
如需检查存储空间和数据库中是否存在敏感数据,请指定数据的位置以及敏感数据保护应查找的敏感数据类型。敏感数据保护会启动一个作业,以检查给定位置的数据,然后提供在内容中找到的 infoTypes 的详细信息、可能性值等。
您可以通过以下两种方式设置存储空间和数据库检查:在 Google Cloud 控制台中使用敏感数据保护功能,通过 RESTful DLP API 进行检查,或以编程方式使用采用多种语言的敏感数据保护客户端库进行检查。
本主题包含以下内容:
- 设置 Google Cloud 存储区和数据库扫描的最佳做法。
- 有关使用 Google Cloud 控制台中的敏感数据保护设置检查扫描以及(可选)安排定期重复检查扫描的说明。
- 每种 Google Cloud 存储代码库类型的 JSON 和代码示例:(Cloud Storage、Datastore 模式的 Firestore (Datastore) 和 BigQuery)。
- 扫描作业配置选项的详细概览。
- 有关如何对每个成功的请求生成的扫描结果进行检索以及创建的扫描作业进行管理的说明。
最佳做法
确定扫描范围和优先级
请务必先评估您的资源并指定哪些资源的扫描优先级最高。刚开始时,可能有大量积压的数据需要分类,而且无法立即扫描所有数据。首先选择潜在风险最高的数据,例如经常访问、广泛访问或未知的数据。
确保敏感数据保护可以访问您的数据
敏感数据保护必须能够访问要扫描的数据。请确保敏感数据保护服务帐号有权读取您的资源。
限制首次扫描的范围
为达到最佳效果,请限制前几项作业的范围,而不是扫描所有数据。从一个表、一个存储桶或几个文件开始,同时使用抽样。通过限制首次扫描的范围,您可以更好地确定要启用哪些检测器以及可能需要哪些排除规则来减少假正例,让您的发现更有意义。如果您不需要所有 infoType,请避免启用所有 infoType,因为误报或无法使用的结果可能会让评估风险变得更加困难。虽然在某些情况下很有用,但像 DATE
、TIME
、DOMAIN_NAME
和 URL
这样的 infoType 会匹配广泛的结果,可能不适用于大型数据扫描。
对结构化文件(例如 CSV、TSV 或 Avro 文件)进行采样时,请确保样本大小足够大,以覆盖文件的完整标题和一行数据。如需了解详情,请参阅在结构化解析模式下扫描结构化文件。
安排扫描时间
使用敏感数据保护作业触发器每天、每周或每季度自动运行扫描并生成发现结果。这些扫描还可以配置为仅检查自上次扫描以来发生更改的数据,从而节省时间并降低费用。定期运行扫描可帮助您识别扫描结果中的趋势或异常值。
作业延迟时间
对于作业和作业触发器,无法保证服务等级目标 (SLO)。延迟时间受多种因素影响,包括要扫描的数据量、要扫描的存储代码库、要扫描的 infoType 的类型和数量、处理作业的区域,以及该区域中可用的计算资源。因此,无法提前确定检查作业的延迟时间。
如需帮助减少作业延迟时间,您可以尝试以下方法:
- 如果作业或作业触发器可以使用采样,请启用它。
避免启用不需要的 infoType。虽然以下请求在某些情况下很有用,但与不包含这些 infoType 的请求相比,这些 infoType 的运行速度可能会慢得多:
PERSON_NAME
FEMALE_NAME
MALE_NAME
FIRST_NAME
LAST_NAME
DATE_OF_BIRTH
LOCATION
STREET_ADDRESS
ORGANIZATION_NAME
始终明确指定 infoType。请勿使用空的 infoType 列表。
如果可能,请使用其他处理区域。
如果您在尝试这些方法后仍然遇到作业的延迟问题,请考虑使用 content.inspect
或 content.deidentify
请求,而不是作业。这些方法在《服务等级协议》涵盖范围内。如需了解详情,请参阅敏感数据保护服务等级协议。
准备工作
本主题提供的说明做了以下假设:
您已启用结算功能。
您已启用敏感数据保护。
存储分类需要以下 OAuth 范围:https://www.googleapis.com/auth/cloud-platform
。如需了解详情,请参阅向 DLP API 进行身份验证。
检查 Cloud Storage 位置
您可以使用 Google Cloud 控制台、 DLP API(通过 REST 或 RPC 请求)或采用多种语言以编程方式使用客户端库对 Cloud Storage 位置设置敏感数据保护检查。如需了解以下 JSON 和代码示例中包含的参数,请参阅本主题后面的配置存储空间检查。
敏感数据保护依靠文件扩展名和媒体 (MIME) 类型来识别要扫描的文件类型和要应用的扫描模式。例如,敏感数据保护会以纯文本模式扫描 .txt
文件,即使该文件采用 CSV 格式(通常在结构化解析模式下进行扫描)也是如此。
如需设置使用敏感数据保护的 Cloud Storage 存储桶扫描作业,请执行以下操作:
控制台
本部分介绍如何检查 Cloud Storage 存储桶或文件夹。如果您还希望敏感数据保护创建数据的去标识化副本,请参阅使用 Google Cloud 控制台对 Cloud Storage 中存储的敏感数据进行去标识化。
在 Google Cloud 控制台的“敏感数据保护”部分中,前往创建作业或作业触发器页面。
输入敏感数据保护作业信息,然后点击继续以完成每个步骤:
对于第 1 步:选择输入数据,请在名称字段中输入值,为作业命名。在位置中,从存储类型菜单中选择 Cloud Storage,然后输入要扫描的数据的位置。系统已预先配置采样部分,以便针对您的数据运行示例扫描。如果您有大量数据,则可以调整存储分区内要扫描的对象百分比字段以节省资源。如需了解详情,请参阅选择输入数据。
(可选)对于第 2 步:配置检测,您可以配置要查找的数据类型,称为 infoTypes。您可以从预定义的 infoType 列表中进行选择,也可以选择已有的模板(如有)。如需了解详情,请参阅配置检测。
(可选)对于第 3 步:添加操作,确保通过电子邮件发送通知已启用。
启用保存到 BigQuery 以将敏感数据保护发现结果发布到 BigQuery 表。提供以下信息:
- 对于项目 ID,请输入用于存储结果的项目的 ID。
- 对于数据集 ID,请输入用于存储结果的数据集的名称。
- (可选)对于表格 ID,请输入用于存储结果的表格的名称。如果未指定表格 ID,系统会为新表格分配类似于
dlp_googleapis_[DATE]_1234567890
这样的默认名称,其中[DATE]
表示运行扫描的日期。如果您指定的是现有的表格,则系统会将发现结果附加到其中。 - (可选)启用包含引用以包含与 infoType 检测器匹配的字符串。引号可能属于敏感数据,因此默认情况下,敏感数据保护不会将其包含在发现结果中。
将数据写入 BigQuery 表时,结算和配额用量将应用于目标表所属的项目。
如果您想要创建数据的去标识化副本,请启用制作去标识化副本。如需了解详情,请参阅使用 Google Cloud 控制台对存储在 Cloud Storage 中的敏感数据进行去标识化。
您还可以将结果保存到 Pub/Sub、Security Command Center、Data Catalog 和 Cloud Monitoring。如需了解详情,请参阅添加操作。
(可选)对于第 4 步:时间安排,如需仅运行一次扫描,请将菜单设置为无。如需安排定期运行扫描,请点击创建一个触发器来定期运行作业。如需了解详情,请参阅时间安排。
点击创建。
敏感数据保护作业完成后,您将被重定向到作业详情页面,并通过电子邮件通知您。您可以在作业详情页面上查看检查结果。
(可选)如果您选择将敏感数据保护发现结果发布到 BigQuery,请在作业详情页面上点击在 BigQuery 中查看发现结果,以在 BigQuery 网页界面中打开相应表格。然后,您可以查询该表并分析发现结果。如需详细了解如何在 BigQuery 中查询结果,请参阅在 BigQuery 中查询敏感数据保护发现结果。
协议
以下是可以通过 POST 请求发送到指定的敏感数据保护 REST 端点的 JSON 示例。此 JSON 示例演示了如何使用 DLP API 检查 Cloud Storage 存储分区。如需了解请求中包含的参数,请参阅本主题后面的配置存储空间检查。
您可以在 API Explorer 的 content.inspect
参考页面上快速尝试此操作:
请注意,如果请求成功(即使是在 APIs Explorer 中),就会创建一个新的扫描作业。如需了解如何控制扫描作业,请参阅本主题后面的检索检查结果。如需了解有关如何使用 JSON 将请求发送到 DLP API 的一般信息,请参阅 JSON 快速入门。
JSON 输入:
POST https://dlp.googleapis.com/v2/projects/[PROJECT-ID]/dlpJobs?key={YOUR_API_KEY}
{
"inspectJob":{
"storageConfig":{
"cloudStorageOptions":{
"fileSet":{
"url":"gs://[BUCKET-NAME]/*"
},
"bytesLimitPerFile":"1073741824"
},
"timespanConfig":{
"startTime":"2017-11-13T12:34:29.965633345Z",
"endTime":"2018-01-05T04:45:04.240912125Z"
}
},
"inspectConfig":{
"infoTypes":[
{
"name":"PHONE_NUMBER"
}
],
"excludeInfoTypes":false,
"includeQuote":true,
"minLikelihood":"LIKELY"
},
"actions":[
{
"saveFindings":{
"outputConfig":{
"table":{
"projectId":"[PROJECT-ID]",
"datasetId":"[DATASET-ID]"
}
}
}
}
]
}
}
JSON 输出:
{
"name":"projects/[PROJECT-ID]/dlpJobs/[JOB-ID]",
"type":"INSPECT_JOB",
"state":"PENDING",
"inspectDetails":{
"requestedOptions":{
"snapshotInspectTemplate":{
},
"jobConfig":{
"storageConfig":{
"cloudStorageOptions":{
"fileSet":{
"url":"gs://[BUCKET-NAME]/*"
},
"bytesLimitPerFile":"1073741824"
},
"timespanConfig":{
"startTime":"2017-11-13T12:34:29.965633345Z",
"endTime":"2018-01-05T04:45:04.240912125Z"
}
},
"inspectConfig":{
"infoTypes":[
{
"name":"PHONE_NUMBER"
}
],
"minLikelihood":"LIKELY",
"limits":{
},
"includeQuote":true
},
"actions":[
{
"saveFindings":{
"outputConfig":{
"table":{
"projectId":"[PROJECT-ID]",
"datasetId":"[DATASET-ID]",
"tableId":"[NEW-TABLE-ID]"
}
}
}
}
]
}
}
},
"createTime":"2018-11-07T18:01:14.225Z"
}
Java
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.CloudStorageOptions;
import com.google.privacy.dlp.v2.CloudStorageOptions.FileSet;
import com.google.privacy.dlp.v2.CreateDlpJobRequest;
import com.google.privacy.dlp.v2.DlpJob;
import com.google.privacy.dlp.v2.GetDlpJobRequest;
import com.google.privacy.dlp.v2.InfoType;
import com.google.privacy.dlp.v2.InfoTypeStats;
import com.google.privacy.dlp.v2.InspectConfig;
import com.google.privacy.dlp.v2.InspectDataSourceDetails;
import com.google.privacy.dlp.v2.InspectJobConfig;
import com.google.privacy.dlp.v2.LocationName;
import com.google.privacy.dlp.v2.StorageConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class InspectGcsFile {
public static void main(String[] args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String gcsUri = "gs://" + "your-bucket-name" + "/path/to/your/file.txt";
String topicId = "your-pubsub-topic-id";
String subscriptionId = "your-pubsub-subscription-id";
inspectGcsFile(projectId, gcsUri, topicId, subscriptionId);
}
// Inspects a file in a Google Cloud Storage Bucket.
public static void inspectGcsFile(
String projectId, String gcsUri, String topicId, String subscriptionId)
throws ExecutionException, InterruptedException, IOException {
// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
try (DlpServiceClient dlp = DlpServiceClient.create()) {
// Specify the GCS file to be inspected.
CloudStorageOptions cloudStorageOptions =
CloudStorageOptions.newBuilder().setFileSet(FileSet.newBuilder().setUrl(gcsUri)).build();
StorageConfig storageConfig =
StorageConfig.newBuilder().setCloudStorageOptions(cloudStorageOptions).build();
// Specify the type of info the inspection will look for.
// See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
List<InfoType> infoTypes =
Stream.of("PHONE_NUMBER", "EMAIL_ADDRESS", "CREDIT_CARD_NUMBER")
.map(it -> InfoType.newBuilder().setName(it).build())
.collect(Collectors.toList());
// Specify how the content should be inspected.
InspectConfig inspectConfig =
InspectConfig.newBuilder().addAllInfoTypes(infoTypes).setIncludeQuote(true).build();
// Specify the action that is triggered when the job completes.
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId);
Action.PublishToPubSub publishToPubSub =
Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build();
Action action = Action.newBuilder().setPubSub(publishToPubSub).build();
// Configure the long running job we want the service to perform.
InspectJobConfig inspectJobConfig =
InspectJobConfig.newBuilder()
.setStorageConfig(storageConfig)
.setInspectConfig(inspectConfig)
.addActions(action)
.build();
// Create the request for the job configured above.
CreateDlpJobRequest createDlpJobRequest =
CreateDlpJobRequest.newBuilder()
.setParent(LocationName.of(projectId, "global").toString())
.setInspectJob(inspectJobConfig)
.build();
// Use the client to send the request.
final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest);
System.out.println("Job created: " + dlpJob.getName());
// Set up a Pub/Sub subscriber to listen on the job completion status
final SettableApiFuture<Boolean> done = SettableApiFuture.create();
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
MessageReceiver messageHandler =
(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
subscriber.startAsync();
// Wait for job completion semi-synchronously
// For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
try {
done.get(15, TimeUnit.MINUTES);
} catch (TimeoutException e) {
System.out.println("Job was not completed after 15 minutes.");
return;
} finally {
subscriber.stopAsync();
subscriber.awaitTerminated();
}
// Get the latest state of the job from the service
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
DlpJob completedJob = dlp.getDlpJob(request);
// Parse the response and process results.
System.out.println("Job status: " + completedJob.getState());
System.out.println("Job name: " + dlpJob.getName());
InspectDataSourceDetails.Result result = completedJob.getInspectDetails().getResult();
System.out.println("Findings: ");
for (InfoTypeStats infoTypeStat : result.getInfoTypeStatsList()) {
System.out.print("\tInfo type: " + infoTypeStat.getInfoType().getName());
System.out.println("\tCount: " + infoTypeStat.getCount());
}
}
}
// handleMessage injects the job and settableFuture into the message reciever interface
private static void handleMessage(
DlpJob job,
SettableApiFuture<Boolean> done,
PubsubMessage pubsubMessage,
AckReplyConsumer ackReplyConsumer) {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
done.set(true);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
}
}
}
Node.js
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');
// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();
// The project ID to run the API call under
// const projectId = 'my-project';
// The name of the bucket where the file resides.
// const bucketName = 'YOUR-BUCKET';
// The path to the file within the bucket to inspect.
// Can contain wildcards, e.g. "my-image.*"
// const fileName = 'my-image.png';
// The minimum likelihood required before returning a match
// const minLikelihood = 'LIKELIHOOD_UNSPECIFIED';
// The maximum number of findings to report per request (0 = server maximum)
// const maxFindings = 0;
// The infoTypes of information to match
// const infoTypes = [{ name: 'PHONE_NUMBER' }, { name: 'EMAIL_ADDRESS' }, { name: 'CREDIT_CARD_NUMBER' }];
// The customInfoTypes of information to match
// const customInfoTypes = [{ infoType: { name: 'DICT_TYPE' }, dictionary: { wordList: { words: ['foo', 'bar', 'baz']}}},
// { infoType: { name: 'REGEX_TYPE' }, regex: {pattern: '\\(\\d{3}\\) \\d{3}-\\d{4}'}}];
// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'
// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'
async function inspectGCSFile() {
// Get reference to the file to be inspected
const storageItem = {
cloudStorageOptions: {
fileSet: {url: `gs://${bucketName}/${fileName}`},
},
};
// Construct request for creating an inspect job
const request = {
parent: `projects/${projectId}/locations/global`,
inspectJob: {
inspectConfig: {
infoTypes: infoTypes,
customInfoTypes: customInfoTypes,
minLikelihood: minLikelihood,
limits: {
maxFindingsPerRequest: maxFindings,
},
},
storageConfig: storageItem,
actions: [
{
pubSub: {
topic: `projects/${projectId}/topics/${topicId}`,
},
},
],
},
};
// Create a GCS File inspection job and wait for it to complete
const [topicResponse] = await pubsub.topic(topicId).get();
// Verify the Pub/Sub topic and listen for job notifications via an
// existing subscription.
const subscription = await topicResponse.subscription(subscriptionId);
const [jobsResponse] = await dlp.createDlpJob(request);
// Get the job's ID
const jobName = jobsResponse.name;
// Watch the Pub/Sub topic until the DLP job finishes
await new Promise((resolve, reject) => {
const messageHandler = message => {
if (message.attributes && message.attributes.DlpJobName === jobName) {
message.ack();
subscription.removeListener('message', messageHandler);
subscription.removeListener('error', errorHandler);
resolve(jobName);
} else {
message.nack();
}
};
const errorHandler = err => {
subscription.removeListener('message', messageHandler);
subscription.removeListener('error', errorHandler);
reject(err);
};
subscription.on('message', messageHandler);
subscription.on('error', errorHandler);
});
setTimeout(() => {
console.log('Waiting for DLP job to fully complete');
}, 500);
const [job] = await dlp.getDlpJob({name: jobName});
console.log(`Job ${job.name} status: ${job.state}`);
const infoTypeStats = job.inspectDetails.result.infoTypeStats;
if (infoTypeStats.length > 0) {
infoTypeStats.forEach(infoTypeStat => {
console.log(
` Found ${infoTypeStat.count} instance(s) of infoType ${infoTypeStat.infoType.name}.`
);
});
} else {
console.log('No findings.');
}
}
await inspectGCSFile();
Python
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import threading
from typing import List, Optional
import google.cloud.dlp
import google.cloud.pubsub
def inspect_gcs_file(
project: str,
bucket: str,
filename: str,
topic_id: str,
subscription_id: str,
info_types: List[str],
custom_dictionaries: List[str] = None,
custom_regexes: List[str] = None,
min_likelihood: Optional[str] = None,
max_findings: Optional[int] = None,
timeout: int = 300,
) -> None:
"""Uses the Data Loss Prevention API to analyze a file on GCS.
Args:
project: The Google Cloud project id to use as a parent resource.
bucket: The name of the GCS bucket containing the file, as a string.
filename: The name of the file in the bucket, including the path, as a
string; e.g. 'images/myfile.png'.
topic_id: The id of the Cloud Pub/Sub topic to which the API will
broadcast job completion. The topic must already exist.
subscription_id: The id of the Cloud Pub/Sub subscription to listen on
while waiting for job completion. The subscription must already
exist and be subscribed to the topic.
info_types: A list of strings representing info types to look for.
A full list of info type categories can be fetched from the API.
min_likelihood: A string representing the minimum likelihood threshold
that constitutes a match. One of: 'LIKELIHOOD_UNSPECIFIED',
'VERY_UNLIKELY', 'UNLIKELY', 'POSSIBLE', 'LIKELY', 'VERY_LIKELY'.
max_findings: The maximum number of findings to report; 0 = no maximum.
timeout: The number of seconds to wait for a response from the API.
Returns:
None; the response from the API is printed to the terminal.
"""
# Instantiate a client.
dlp = google.cloud.dlp_v2.DlpServiceClient()
# Prepare info_types by converting the list of strings into a list of
# dictionaries (protos are also accepted).
if not info_types:
info_types = ["FIRST_NAME", "LAST_NAME", "EMAIL_ADDRESS"]
info_types = [{"name": info_type} for info_type in info_types]
# Prepare custom_info_types by parsing the dictionary word lists and
# regex patterns.
if custom_dictionaries is None:
custom_dictionaries = []
dictionaries = [
{
"info_type": {"name": f"CUSTOM_DICTIONARY_{i}"},
"dictionary": {"word_list": {"words": custom_dict.split(",")}},
}
for i, custom_dict in enumerate(custom_dictionaries)
]
if custom_regexes is None:
custom_regexes = []
regexes = [
{
"info_type": {"name": f"CUSTOM_REGEX_{i}"},
"regex": {"pattern": custom_regex},
}
for i, custom_regex in enumerate(custom_regexes)
]
custom_info_types = dictionaries + regexes
# Construct the configuration dictionary. Keys which are None may
# optionally be omitted entirely.
inspect_config = {
"info_types": info_types,
"custom_info_types": custom_info_types,
"min_likelihood": min_likelihood,
"limits": {"max_findings_per_request": max_findings},
}
# Construct a storage_config containing the file's URL.
url = f"gs://{bucket}/{filename}"
storage_config = {"cloud_storage_options": {"file_set": {"url": url}}}
# Convert the project id into full resource ids.
topic = google.cloud.pubsub.PublisherClient.topic_path(project, topic_id)
parent = f"projects/{project}/locations/global"
# Tell the API where to send a notification when the job is complete.
actions = [{"pub_sub": {"topic": topic}}]
# Construct the inspect_job, which defines the entire inspect content task.
inspect_job = {
"inspect_config": inspect_config,
"storage_config": storage_config,
"actions": actions,
}
operation = dlp.create_dlp_job(
request={"parent": parent, "inspect_job": inspect_job}
)
print(f"Inspection operation started: {operation.name}")
# Create a Pub/Sub client and find the subscription. The subscription is
# expected to already be listening to the topic.
subscriber = google.cloud.pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(project, subscription_id)
# Set up a callback to acknowledge a message. This closes around an event
# so that it can signal that it is done and the main thread can continue.
job_done = threading.Event()
def callback(message: google.cloud.pubsub_v1.subscriber.message.Message) -> None:
try:
if message.attributes["DlpJobName"] == operation.name:
# This is the message we're looking for, so acknowledge it.
message.ack()
# Now that the job is done, fetch the results and print them.
job = dlp.get_dlp_job(request={"name": operation.name})
print(f"Job name: {job.name}")
if job.inspect_details.result.info_type_stats:
for finding in job.inspect_details.result.info_type_stats:
print(
f"Info type: {finding.info_type.name}; Count: {finding.count}"
)
else:
print("No findings.")
# Signal to the main thread that we can exit.
job_done.set()
else:
# This is not the message we're looking for.
message.drop()
except Exception as e:
# Because this is executing in a thread, an exception won't be
# noted unless we print it manually.
print(e)
raise
subscriber.subscribe(subscription_path, callback=callback)
finished = job_done.wait(timeout=timeout)
if not finished:
print(
"No event received before the timeout. Please verify that the "
"subscription provided is subscribed to the topic provided."
)
Go
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import (
"context"
"fmt"
"io"
"strings"
"time"
dlp "cloud.google.com/go/dlp/apiv2"
"cloud.google.com/go/dlp/apiv2/dlppb"
"cloud.google.com/go/pubsub"
)
// inspectGCSFile searches for the given info types in the given file.
func inspectGCSFile(w io.Writer, projectID string, infoTypeNames []string, customDictionaries []string, customRegexes []string, pubSubTopic, pubSubSub, bucketName, fileName string) error {
// projectID := "my-project-id"
// infoTypeNames := []string{"US_SOCIAL_SECURITY_NUMBER"}
// customDictionaries := []string{...}
// customRegexes := []string{...}
// pubSubTopic := "dlp-risk-sample-topic"
// pubSubSub := "dlp-risk-sample-sub"
// bucketName := "my-bucket"
// fileName := "my-file.txt"
ctx := context.Background()
client, err := dlp.NewClient(ctx)
if err != nil {
return fmt.Errorf("dlp.NewClient: %w", err)
}
// Convert the info type strings to a list of InfoTypes.
var infoTypes []*dlppb.InfoType
for _, it := range infoTypeNames {
infoTypes = append(infoTypes, &dlppb.InfoType{Name: it})
}
// Convert the custom dictionary word lists and custom regexes to a list of CustomInfoTypes.
var customInfoTypes []*dlppb.CustomInfoType
for idx, it := range customDictionaries {
customInfoTypes = append(customInfoTypes, &dlppb.CustomInfoType{
InfoType: &dlppb.InfoType{
Name: fmt.Sprintf("CUSTOM_DICTIONARY_%d", idx),
},
Type: &dlppb.CustomInfoType_Dictionary_{
Dictionary: &dlppb.CustomInfoType_Dictionary{
Source: &dlppb.CustomInfoType_Dictionary_WordList_{
WordList: &dlppb.CustomInfoType_Dictionary_WordList{
Words: strings.Split(it, ","),
},
},
},
},
})
}
for idx, it := range customRegexes {
customInfoTypes = append(customInfoTypes, &dlppb.CustomInfoType{
InfoType: &dlppb.InfoType{
Name: fmt.Sprintf("CUSTOM_REGEX_%d", idx),
},
Type: &dlppb.CustomInfoType_Regex_{
Regex: &dlppb.CustomInfoType_Regex{
Pattern: it,
},
},
})
}
// Create a PubSub Client used to listen for when the inspect job finishes.
pubsubClient, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
defer pubsubClient.Close()
// Create a PubSub subscription we can use to listen for messages.
// Create the Topic if it doesn't exist.
t := pubsubClient.Topic(pubSubTopic)
if exists, err := t.Exists(ctx); err != nil {
return fmt.Errorf("t.Exists: %w", err)
} else if !exists {
if t, err = pubsubClient.CreateTopic(ctx, pubSubTopic); err != nil {
return fmt.Errorf("CreateTopic: %w", err)
}
}
// Create the Subscription if it doesn't exist.
s := pubsubClient.Subscription(pubSubSub)
if exists, err := s.Exists(ctx); err != nil {
return fmt.Errorf("s.Exists: %w", err)
} else if !exists {
if s, err = pubsubClient.CreateSubscription(ctx, pubSubSub, pubsub.SubscriptionConfig{Topic: t}); err != nil {
return fmt.Errorf("CreateSubscription: %w", err)
}
}
// topic is the PubSub topic string where messages should be sent.
topic := "projects/" + projectID + "/topics/" + pubSubTopic
// Create a configured request.
req := &dlppb.CreateDlpJobRequest{
Parent: fmt.Sprintf("projects/%s/locations/global", projectID),
Job: &dlppb.CreateDlpJobRequest_InspectJob{
InspectJob: &dlppb.InspectJobConfig{
// StorageConfig describes where to find the data.
StorageConfig: &dlppb.StorageConfig{
Type: &dlppb.StorageConfig_CloudStorageOptions{
CloudStorageOptions: &dlppb.CloudStorageOptions{
FileSet: &dlppb.CloudStorageOptions_FileSet{
Url: "gs://" + bucketName + "/" + fileName,
},
},
},
},
// InspectConfig describes what fields to look for.
InspectConfig: &dlppb.InspectConfig{
InfoTypes: infoTypes,
CustomInfoTypes: customInfoTypes,
MinLikelihood: dlppb.Likelihood_POSSIBLE,
Limits: &dlppb.InspectConfig_FindingLimits{
MaxFindingsPerRequest: 10,
},
IncludeQuote: true,
},
// Send a message to PubSub using Actions.
Actions: []*dlppb.Action{
{
Action: &dlppb.Action_PubSub{
PubSub: &dlppb.Action_PublishToPubSub{
Topic: topic,
},
},
},
},
},
},
}
// Create the inspect job.
j, err := client.CreateDlpJob(ctx, req)
if err != nil {
return fmt.Errorf("CreateDlpJob: %w", err)
}
fmt.Fprintf(w, "Created job: %v\n", j.GetName())
// Wait for the inspect job to finish by waiting for a PubSub message.
// This only waits for 10 minutes. For long jobs, consider using a truly
// asynchronous execution model such as Cloud Functions.
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
// If this is the wrong job, do not process the result.
if msg.Attributes["DlpJobName"] != j.GetName() {
msg.Nack()
return
}
msg.Ack()
// Stop listening for more messages.
defer cancel()
resp, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
Name: j.GetName(),
})
if err != nil {
fmt.Fprintf(w, "Cloud not get job: %v", err)
return
}
r := resp.GetInspectDetails().GetResult().GetInfoTypeStats()
if len(r) == 0 {
fmt.Fprintf(w, "No results")
}
for _, s := range r {
fmt.Fprintf(w, " Found %v instances of infoType %v\n", s.GetCount(), s.GetInfoType().GetName())
}
})
if err != nil {
return fmt.Errorf("Receive: %w", err)
}
return nil
}
PHP
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\Client\DlpServiceClient;
use Google\Cloud\Dlp\V2\CloudStorageOptions;
use Google\Cloud\Dlp\V2\CloudStorageOptions\FileSet;
use Google\Cloud\Dlp\V2\CreateDlpJobRequest;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\GetDlpJobRequest;
use Google\Cloud\Dlp\V2\InfoType;
use Google\Cloud\Dlp\V2\InspectConfig;
use Google\Cloud\Dlp\V2\InspectConfig\FindingLimits;
use Google\Cloud\Dlp\V2\InspectJobConfig;
use Google\Cloud\Dlp\V2\Likelihood;
use Google\Cloud\Dlp\V2\StorageConfig;
use Google\Cloud\PubSub\PubSubClient;
/**
* Inspect a file stored on Google Cloud Storage , using Pub/Sub for job status notifications.
*
* @param string $callingProjectId The project ID to run the API call under
* @param string $topicId The name of the Pub/Sub topic to notify once the job completes
* @param string $subscriptionId The name of the Pub/Sub subscription to use when listening for job
* @param string $bucketId The name of the bucket where the file resides
* @param string $file The path to the file within the bucket to inspect. Can contain wildcards e.g. "my-image.*"
* @param int $maxFindings (Optional) The maximum number of findings to report per request (0 = server maximum)
*/
function inspect_gcs(
string $callingProjectId,
string $topicId,
string $subscriptionId,
string $bucketId,
string $file,
int $maxFindings = 0
): void {
// Instantiate a client.
$dlp = new DlpServiceClient();
$pubsub = new PubSubClient();
$topic = $pubsub->topic($topicId);
// The infoTypes of information to match
$personNameInfoType = (new InfoType())
->setName('PERSON_NAME');
$creditCardNumberInfoType = (new InfoType())
->setName('CREDIT_CARD_NUMBER');
$infoTypes = [$personNameInfoType, $creditCardNumberInfoType];
// The minimum likelihood required before returning a match
$minLikelihood = likelihood::LIKELIHOOD_UNSPECIFIED;
// Specify finding limits
$limits = (new FindingLimits())
->setMaxFindingsPerRequest($maxFindings);
// Construct items to be inspected
$fileSet = (new FileSet())
->setUrl('gs://' . $bucketId . '/' . $file);
$cloudStorageOptions = (new CloudStorageOptions())
->setFileSet($fileSet);
$storageConfig = (new StorageConfig())
->setCloudStorageOptions($cloudStorageOptions);
// Construct the inspect config object
$inspectConfig = (new InspectConfig())
->setMinLikelihood($minLikelihood)
->setLimits($limits)
->setInfoTypes($infoTypes);
// Construct the action to run when job completes
$pubSubAction = (new PublishToPubSub())
->setTopic($topic->name());
$action = (new Action())
->setPubSub($pubSubAction);
// Construct inspect job config to run
$inspectJob = (new InspectJobConfig())
->setInspectConfig($inspectConfig)
->setStorageConfig($storageConfig)
->setActions([$action]);
// Listen for job notifications via an existing topic/subscription.
$subscription = $topic->subscription($subscriptionId);
// Submit request
$parent = "projects/$callingProjectId/locations/global";
$createDlpJobRequest = (new CreateDlpJobRequest())
->setParent($parent)
->setInspectJob($inspectJob);
$job = $dlp->createDlpJob($createDlpJobRequest);
// Poll Pub/Sub using exponential backoff until job finishes
// Consider using an asynchronous execution model such as Cloud Functions
$attempt = 1;
$startTime = time();
do {
foreach ($subscription->pull() as $message) {
if (
isset($message->attributes()['DlpJobName']) &&
$message->attributes()['DlpJobName'] === $job->getName()
) {
$subscription->acknowledge($message);
// Get the updated job. Loop to avoid race condition with DLP API.
do {
$getDlpJobRequest = (new GetDlpJobRequest())
->setName($job->getName());
$job = $dlp->getDlpJob($getDlpJobRequest);
} while ($job->getState() == JobState::RUNNING);
break 2; // break from parent do while
}
}
print('Waiting for job to complete' . PHP_EOL);
// Exponential backoff with max delay of 60 seconds
sleep(min(60, pow(2, ++$attempt)));
} while (time() - $startTime < 600); // 10 minute timeout
// Print finding counts
printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
switch ($job->getState()) {
case JobState::DONE:
$infoTypeStats = $job->getInspectDetails()->getResult()->getInfoTypeStats();
if (count($infoTypeStats) === 0) {
print('No findings.' . PHP_EOL);
} else {
foreach ($infoTypeStats as $infoTypeStat) {
printf(' Found %s instance(s) of infoType %s' . PHP_EOL, $infoTypeStat->getCount(), $infoTypeStat->getInfoType()->getName());
}
}
break;
case JobState::FAILED:
printf('Job %s had errors:' . PHP_EOL, $job->getName());
$errors = $job->getErrors();
foreach ($errors as $error) {
var_dump($error->getDetails());
}
break;
case JobState::PENDING:
print('Job has not completed. Consider a longer timeout or an asynchronous execution model' . PHP_EOL);
break;
default:
print('Unexpected job state. Most likely, the job is either running or has not yet started.');
}
}
C#
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
using Google.Api.Gax.ResourceNames;
using Google.Cloud.Dlp.V2;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using static Google.Cloud.Dlp.V2.InspectConfig.Types;
public class InspectGoogleCloudStorage
{
public static DlpJob InspectGCS(
string projectId,
Likelihood minLikelihood,
int maxFindings,
bool includeQuote,
IEnumerable<InfoType> infoTypes,
IEnumerable<CustomInfoType> customInfoTypes,
string bucketName,
string topicId,
string subscriptionId)
{
var inspectJob = new InspectJobConfig
{
StorageConfig = new StorageConfig
{
CloudStorageOptions = new CloudStorageOptions
{
FileSet = new CloudStorageOptions.Types.FileSet { Url = $"gs://{bucketName}/*.txt" },
BytesLimitPerFile = 1073741824
},
},
InspectConfig = new InspectConfig
{
InfoTypes = { infoTypes },
CustomInfoTypes = { customInfoTypes },
ExcludeInfoTypes = false,
IncludeQuote = includeQuote,
Limits = new FindingLimits
{
MaxFindingsPerRequest = maxFindings
},
MinLikelihood = minLikelihood
},
Actions =
{
new Google.Cloud.Dlp.V2.Action
{
// Send results to Pub/Sub topic
PubSub = new Google.Cloud.Dlp.V2.Action.Types.PublishToPubSub
{
Topic = topicId,
}
}
}
};
// Issue Create Dlp Job Request
var client = DlpServiceClient.Create();
var request = new CreateDlpJobRequest
{
InspectJob = inspectJob,
Parent = new LocationName(projectId, "global").ToString(),
};
// We need created job name
var dlpJob = client.CreateDlpJob(request);
// Get a pub/sub subscription and listen for DLP results
var fireEvent = new ManualResetEventSlim();
var subscriptionName = new SubscriptionName(projectId, subscriptionId);
var subscriber = SubscriberClient.CreateAsync(subscriptionName).Result;
subscriber.StartAsync(
(pubSubMessage, cancellationToken) =>
{
// Given a message that we receive on this subscription, we should either acknowledge or decline it
if (pubSubMessage.Attributes["DlpJobName"] == dlpJob.Name)
{
fireEvent.Set();
return Task.FromResult(SubscriberClient.Reply.Ack);
}
return Task.FromResult(SubscriberClient.Reply.Nack);
});
// We block here until receiving a signal from a separate thread that is waiting on a message indicating receiving a result of Dlp job
if (fireEvent.Wait(TimeSpan.FromMinutes(1)))
{
// Stop the thread that is listening to messages as a result of StartAsync call earlier
subscriber.StopAsync(CancellationToken.None).Wait();
// Now we can inspect full job results
var job = client.GetDlpJob(new GetDlpJobRequest { DlpJobName = new DlpJobName(projectId, dlpJob.Name) });
// Inspect Job details
Console.WriteLine($"Processed bytes: {job.InspectDetails.Result.ProcessedBytes}");
Console.WriteLine($"Total estimated bytes: {job.InspectDetails.Result.TotalEstimatedBytes}");
var stats = job.InspectDetails.Result.InfoTypeStats;
Console.WriteLine("Found stats:");
foreach (var stat in stats)
{
Console.WriteLine($"{stat.InfoType.Name}");
}
return job;
}
throw new InvalidOperationException("The wait failed on timeout");
}
}
检查 Datastore 种类
您可以使用 Google Cloud 控制台、通过 REST 或 RPC 请求使用 DLP API,或者采用多种语言以编程方式使用客户端库来设置 Datastore 种类检查。
如需设置使用敏感数据保护的 Datastore 种类扫描作业,请执行以下操作:
控制台
如需设置使用敏感数据保护的 Datastore 种类扫描作业,请执行以下操作:
在 Google Cloud 控制台的“敏感数据保护”部分中,前往创建作业或作业触发器页面。
输入敏感数据保护作业信息,然后点击继续以完成各个步骤:
对于第 1 步:选择输入数据,输入要扫描的项目、命名空间(可选)和种类的标识符。如需了解详情,请参阅选择输入数据。
(可选)对于第 2 步:配置检测,您可以配置要查找的数据类型,称为 infoTypes。您可以从预定义的 infoType 列表中进行选择,也可以选择已有的模板(如有)。如需了解详情,请参阅配置检测。
(可选)对于第 3 步:添加操作,确保通过电子邮件发送通知已启用。
启用保存到 BigQuery 以将敏感数据保护发现结果发布到 BigQuery 表。提供以下信息:
- 对于项目 ID,请输入用于存储结果的项目的 ID。
- 对于数据集 ID,请输入用于存储结果的数据集的名称。
- (可选)对于表格 ID,请输入用于存储结果的表格的名称。如果未指定表格 ID,系统会为新表格分配类似于
dlp_googleapis_[DATE]_1234567890
这样的默认名称。如果您指定的是现有的表格,则系统会将发现结果附加到其中。
将数据写入 BigQuery 表时,结算和配额用量将应用于目标表所属的项目。
如需详细了解列出的其他操作,请参阅添加操作。
(可选)对于第 4 步:时间安排,您可以选择指定时间范围或创建一个触发器来定期运行作业以配置时间范围或时间安排。如需了解详情,请参阅时间安排。
点击创建。
敏感数据保护作业完成后,系统会将您重定向到作业详情页面,并通过电子邮件通知您。您可以在作业详情页面上查看检查结果。
(可选)如果您选择将敏感数据保护发现结果发布到 BigQuery,请在作业详情页面上点击在 BigQuery 中查看发现结果,以在 BigQuery 网页界面中打开相应表格。您可以随后对表格执行查询,并分析发现结果。如需详细了解如何在 BigQuery 中查询结果,请参阅在 BigQuery 中查询敏感数据保护发现结果。
协议
以下是可在 POST 请求中发送到指定 DLP API REST 端点的 JSON 示例。此 JSON 示例演示了如何使用 DLP API 检查 Datastore 种类。如需了解请求中包含的参数,请参阅本主题后面的配置存储空间检查。
您可以在 API Explorer 的 dlpJobs.create
参考页面上快速尝试此操作:
请注意,如果请求成功(即使是在 APIs Explorer 中),就会创建一个新的扫描作业。如需了解如何控制扫描作业,请参阅本主题后面的检索检查结果。如需了解有关如何使用 JSON 将请求发送到 DLP API 的一般信息,请参阅 JSON 快速入门。
JSON 输入:
POST https://dlp.googleapis.com/v2/projects/[PROJECT-ID]/dlpJobs?key={YOUR_API_KEY}
{
"inspectJob":{
"storageConfig":{
"datastoreOptions":{
"kind":{
"name":"Example-Kind"
},
"partitionId":{
"namespaceId":"[NAMESPACE-ID]",
"projectId":"[PROJECT-ID]"
}
}
},
"inspectConfig":{
"infoTypes":[
{
"name":"PHONE_NUMBER"
}
],
"excludeInfoTypes":false,
"includeQuote":true,
"minLikelihood":"LIKELY"
},
"actions":[
{
"saveFindings":{
"outputConfig":{
"table":{
"projectId":"[PROJECT-ID]",
"datasetId":"[BIGQUERY-DATASET-NAME]",
"tableId":"[BIGQUERY-TABLE-NAME]"
}
}
}
}
]
}
}
Java
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.CreateDlpJobRequest;
import com.google.privacy.dlp.v2.DatastoreOptions;
import com.google.privacy.dlp.v2.DlpJob;
import com.google.privacy.dlp.v2.GetDlpJobRequest;
import com.google.privacy.dlp.v2.InfoType;
import com.google.privacy.dlp.v2.InfoTypeStats;
import com.google.privacy.dlp.v2.InspectConfig;
import com.google.privacy.dlp.v2.InspectDataSourceDetails;
import com.google.privacy.dlp.v2.InspectJobConfig;
import com.google.privacy.dlp.v2.KindExpression;
import com.google.privacy.dlp.v2.LocationName;
import com.google.privacy.dlp.v2.PartitionId;
import com.google.privacy.dlp.v2.StorageConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class InspectDatastoreEntity {
public static void main(String[] args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String datastoreNamespace = "your-datastore-namespace";
String datastoreKind = "your-datastore-kind";
String topicId = "your-pubsub-topic-id";
String subscriptionId = "your-pubsub-subscription-id";
insepctDatastoreEntity(projectId, datastoreNamespace, datastoreKind, topicId, subscriptionId);
}
// Inspects a Datastore Entity.
public static void insepctDatastoreEntity(
String projectId,
String datastoreNamespce,
String datastoreKind,
String topicId,
String subscriptionId)
throws ExecutionException, InterruptedException, IOException {
// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
try (DlpServiceClient dlp = DlpServiceClient.create()) {
// Specify the Datastore entity to be inspected.
PartitionId partitionId =
PartitionId.newBuilder()
.setProjectId(projectId)
.setNamespaceId(datastoreNamespce)
.build();
KindExpression kindExpression = KindExpression.newBuilder().setName(datastoreKind).build();
DatastoreOptions datastoreOptions =
DatastoreOptions.newBuilder().setKind(kindExpression).setPartitionId(partitionId).build();
StorageConfig storageConfig =
StorageConfig.newBuilder().setDatastoreOptions(datastoreOptions).build();
// Specify the type of info the inspection will look for.
// See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
List<InfoType> infoTypes =
Stream.of("PHONE_NUMBER", "EMAIL_ADDRESS", "CREDIT_CARD_NUMBER")
.map(it -> InfoType.newBuilder().setName(it).build())
.collect(Collectors.toList());
// Specify how the content should be inspected.
InspectConfig inspectConfig =
InspectConfig.newBuilder().addAllInfoTypes(infoTypes).setIncludeQuote(true).build();
// Specify the action that is triggered when the job completes.
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId);
Action.PublishToPubSub publishToPubSub =
Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build();
Action action = Action.newBuilder().setPubSub(publishToPubSub).build();
// Configure the long running job we want the service to perform.
InspectJobConfig inspectJobConfig =
InspectJobConfig.newBuilder()
.setStorageConfig(storageConfig)
.setInspectConfig(inspectConfig)
.addActions(action)
.build();
// Create the request for the job configured above.
CreateDlpJobRequest createDlpJobRequest =
CreateDlpJobRequest.newBuilder()
.setParent(LocationName.of(projectId, "global").toString())
.setInspectJob(inspectJobConfig)
.build();
// Use the client to send the request.
final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest);
System.out.println("Job created: " + dlpJob.getName());
// Set up a Pub/Sub subscriber to listen on the job completion status
final SettableApiFuture<Boolean> done = SettableApiFuture.create();
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
MessageReceiver messageHandler =
(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
subscriber.startAsync();
// Wait for job completion semi-synchronously
// For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
try {
done.get(15, TimeUnit.MINUTES);
} catch (TimeoutException e) {
System.out.println("Job was not completed after 15 minutes.");
return;
} finally {
subscriber.stopAsync();
subscriber.awaitTerminated();
}
// Get the latest state of the job from the service
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
DlpJob completedJob = dlp.getDlpJob(request);
// Parse the response and process results.
System.out.println("Job status: " + completedJob.getState());
System.out.println("Job name: " + dlpJob.getName());
InspectDataSourceDetails.Result result = completedJob.getInspectDetails().getResult();
System.out.println("Findings: ");
for (InfoTypeStats infoTypeStat : result.getInfoTypeStatsList()) {
System.out.print("\tInfo type: " + infoTypeStat.getInfoType().getName());
System.out.println("\tCount: " + infoTypeStat.getCount());
}
}
}
// handleMessage injects the job and settableFuture into the message reciever interface
private static void handleMessage(
DlpJob job,
SettableApiFuture<Boolean> done,
PubsubMessage pubsubMessage,
AckReplyConsumer ackReplyConsumer) {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
done.set(true);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
}
}
}
Node.js
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');
// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();
// The project ID to run the API call under
// const projectId = 'my-project';
// The project ID the target Datastore is stored under
// This may or may not equal the calling project ID
// const dataProjectId = 'my-project';
// (Optional) The ID namespace of the Datastore document to inspect.
// To ignore Datastore namespaces, set this to an empty string ('')
// const namespaceId = '';
// The kind of the Datastore entity to inspect.
// const kind = 'Person';
// The minimum likelihood required before returning a match
// const minLikelihood = 'LIKELIHOOD_UNSPECIFIED';
// The maximum number of findings to report per request (0 = server maximum)
// const maxFindings = 0;
// The infoTypes of information to match
// const infoTypes = [{ name: 'PHONE_NUMBER' }, { name: 'EMAIL_ADDRESS' }, { name: 'CREDIT_CARD_NUMBER' }];
// The customInfoTypes of information to match
// const customInfoTypes = [{ infoType: { name: 'DICT_TYPE' }, dictionary: { wordList: { words: ['foo', 'bar', 'baz']}}},
// { infoType: { name: 'REGEX_TYPE' }, regex: {pattern: '\\(\\d{3}\\) \\d{3}-\\d{4}'}}];
// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'
// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'
async function inspectDatastore() {
// Construct items to be inspected
const storageItems = {
datastoreOptions: {
partitionId: {
projectId: dataProjectId,
namespaceId: namespaceId,
},
kind: {
name: kind,
},
},
};
// Construct request for creating an inspect job
const request = {
parent: `projects/${projectId}/locations/global`,
inspectJob: {
inspectConfig: {
infoTypes: infoTypes,
customInfoTypes: customInfoTypes,
minLikelihood: minLikelihood,
limits: {
maxFindingsPerRequest: maxFindings,
},
},
storageConfig: storageItems,
actions: [
{
pubSub: {
topic: `projects/${projectId}/topics/${topicId}`,
},
},
],
},
};
// Run inspect-job creation request
const [topicResponse] = await pubsub.topic(topicId).get();
// Verify the Pub/Sub topic and listen for job notifications via an
// existing subscription.
const subscription = await topicResponse.subscription(subscriptionId);
const [jobsResponse] = await dlp.createDlpJob(request);
const jobName = jobsResponse.name;
// Watch the Pub/Sub topic until the DLP job finishes
await new Promise((resolve, reject) => {
const messageHandler = message => {
if (message.attributes && message.attributes.DlpJobName === jobName) {
message.ack();
subscription.removeListener('message', messageHandler);
subscription.removeListener('error', errorHandler);
resolve(jobName);
} else {
message.nack();
}
};
const errorHandler = err => {
subscription.removeListener('message', messageHandler);
subscription.removeListener('error', errorHandler);
reject(err);
};
subscription.on('message', messageHandler);
subscription.on('error', errorHandler);
});
// Wait for DLP job to fully complete
setTimeout(() => {
console.log('Waiting for DLP job to fully complete');
}, 500);
const [job] = await dlp.getDlpJob({name: jobName});
console.log(`Job ${job.name} status: ${job.state}`);
const infoTypeStats = job.inspectDetails.result.infoTypeStats;
if (infoTypeStats.length > 0) {
infoTypeStats.forEach(infoTypeStat => {
console.log(
` Found ${infoTypeStat.count} instance(s) of infoType ${infoTypeStat.infoType.name}.`
);
});
} else {
console.log('No findings.');
}
}
await inspectDatastore();
Python
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import threading
from typing import List, Optional
import google.cloud.dlp
import google.cloud.pubsub
def inspect_datastore(
project: str,
datastore_project: str,
kind: str,
topic_id: str,
subscription_id: str,
info_types: List[str],
custom_dictionaries: List[str] = None,
custom_regexes: List[str] = None,
namespace_id: str = None,
min_likelihood: Optional[int] = None,
max_findings: Optional[int] = None,
timeout: int = 300,
) -> None:
"""Uses the Data Loss Prevention API to analyze Datastore data.
Args:
project: The Google Cloud project id to use as a parent resource.
datastore_project: The Google Cloud project id of the target Datastore.
kind: The kind of the Datastore entity to inspect, e.g. 'Person'.
topic_id: The id of the Cloud Pub/Sub topic to which the API will
broadcast job completion. The topic must already exist.
subscription_id: The id of the Cloud Pub/Sub subscription to listen on
while waiting for job completion. The subscription must already
exist and be subscribed to the topic.
info_types: A list of strings representing info types to look for.
A full list of info type categories can be fetched from the API.
namespace_id: The namespace of the Datastore document, if applicable.
min_likelihood: A string representing the minimum likelihood threshold
that constitutes a match. One of: 'LIKELIHOOD_UNSPECIFIED',
'VERY_UNLIKELY', 'UNLIKELY', 'POSSIBLE', 'LIKELY', 'VERY_LIKELY'.
max_findings: The maximum number of findings to report; 0 = no maximum.
timeout: The number of seconds to wait for a response from the API.
Returns:
None; the response from the API is printed to the terminal.
"""
# Instantiate a client.
dlp = google.cloud.dlp_v2.DlpServiceClient()
# Prepare info_types by converting the list of strings into a list of
# dictionaries (protos are also accepted).
if not info_types:
info_types = ["FIRST_NAME", "LAST_NAME", "EMAIL_ADDRESS"]
info_types = [{"name": info_type} for info_type in info_types]
# Prepare custom_info_types by parsing the dictionary word lists and
# regex patterns.
if custom_dictionaries is None:
custom_dictionaries = []
dictionaries = [
{
"info_type": {"name": f"CUSTOM_DICTIONARY_{i}"},
"dictionary": {"word_list": {"words": custom_dict.split(",")}},
}
for i, custom_dict in enumerate(custom_dictionaries)
]
if custom_regexes is None:
custom_regexes = []
regexes = [
{
"info_type": {"name": f"CUSTOM_REGEX_{i}"},
"regex": {"pattern": custom_regex},
}
for i, custom_regex in enumerate(custom_regexes)
]
custom_info_types = dictionaries + regexes
# Construct the configuration dictionary. Keys which are None may
# optionally be omitted entirely.
inspect_config = {
"info_types": info_types,
"custom_info_types": custom_info_types,
"min_likelihood": min_likelihood,
"limits": {"max_findings_per_request": max_findings},
}
# Construct a storage_config containing the target Datastore info.
storage_config = {
"datastore_options": {
"partition_id": {
"project_id": datastore_project,
"namespace_id": namespace_id,
},
"kind": {"name": kind},
}
}
# Convert the project id into full resource ids.
topic = google.cloud.pubsub.PublisherClient.topic_path(project, topic_id)
parent = f"projects/{project}/locations/global"
# Tell the API where to send a notification when the job is complete.
actions = [{"pub_sub": {"topic": topic}}]
# Construct the inspect_job, which defines the entire inspect content task.
inspect_job = {
"inspect_config": inspect_config,
"storage_config": storage_config,
"actions": actions,
}
operation = dlp.create_dlp_job(
request={"parent": parent, "inspect_job": inspect_job}
)
print(f"Inspection operation started: {operation.name}")
# Create a Pub/Sub client and find the subscription. The subscription is
# expected to already be listening to the topic.
subscriber = google.cloud.pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(project, subscription_id)
# Set up a callback to acknowledge a message. This closes around an event
# so that it can signal that it is done and the main thread can continue.
job_done = threading.Event()
def callback(message: google.cloud.pubsub_v1.subscriber.message.Message) -> None:
try:
if message.attributes["DlpJobName"] == operation.name:
# This is the message we're looking for, so acknowledge it.
message.ack()
# Now that the job is done, fetch the results and print them.
job = dlp.get_dlp_job(request={"name": operation.name})
print(f"Job name: {job.name}")
if job.inspect_details.result.info_type_stats:
for finding in job.inspect_details.result.info_type_stats:
print(
f"Info type: {finding.info_type.name}; Count: {finding.count}"
)
else:
print("No findings.")
# Signal to the main thread that we can exit.
job_done.set()
else:
# This is not the message we're looking for.
message.drop()
except Exception as e:
# Because this is executing in a thread, an exception won't be
# noted unless we print it manually.
print(e)
raise
# Register the callback and wait on the event.
subscriber.subscribe(subscription_path, callback=callback)
finished = job_done.wait(timeout=timeout)
if not finished:
print(
"No event received before the timeout. Please verify that the "
"subscription provided is subscribed to the topic provided."
)
Go
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import (
"context"
"fmt"
"io"
"strings"
"time"
dlp "cloud.google.com/go/dlp/apiv2"
"cloud.google.com/go/dlp/apiv2/dlppb"
"cloud.google.com/go/pubsub"
)
// inspectDatastore searches for the given info types in the given dataset kind.
func inspectDatastore(w io.Writer, projectID string, infoTypeNames []string, customDictionaries []string, customRegexes []string, pubSubTopic, pubSubSub, dataProject, namespaceID, kind string) error {
// projectID := "my-project-id"
// infoTypeNames := []string{"US_SOCIAL_SECURITY_NUMBER"}
// customDictionaries := []string{...}
// customRegexes := []string{...}
// pubSubTopic := "dlp-risk-sample-topic"
// pubSubSub := "dlp-risk-sample-sub"
// namespaceID := "namespace-id"
// kind := "MyKind"
ctx := context.Background()
client, err := dlp.NewClient(ctx)
if err != nil {
return fmt.Errorf("dlp.NewClient: %w", err)
}
// Convert the info type strings to a list of InfoTypes.
var infoTypes []*dlppb.InfoType
for _, it := range infoTypeNames {
infoTypes = append(infoTypes, &dlppb.InfoType{Name: it})
}
// Convert the custom dictionary word lists and custom regexes to a list of CustomInfoTypes.
var customInfoTypes []*dlppb.CustomInfoType
for idx, it := range customDictionaries {
customInfoTypes = append(customInfoTypes, &dlppb.CustomInfoType{
InfoType: &dlppb.InfoType{
Name: fmt.Sprintf("CUSTOM_DICTIONARY_%d", idx),
},
Type: &dlppb.CustomInfoType_Dictionary_{
Dictionary: &dlppb.CustomInfoType_Dictionary{
Source: &dlppb.CustomInfoType_Dictionary_WordList_{
WordList: &dlppb.CustomInfoType_Dictionary_WordList{
Words: strings.Split(it, ","),
},
},
},
},
})
}
for idx, it := range customRegexes {
customInfoTypes = append(customInfoTypes, &dlppb.CustomInfoType{
InfoType: &dlppb.InfoType{
Name: fmt.Sprintf("CUSTOM_REGEX_%d", idx),
},
Type: &dlppb.CustomInfoType_Regex_{
Regex: &dlppb.CustomInfoType_Regex{
Pattern: it,
},
},
})
}
// Create a PubSub Client used to listen for when the inspect job finishes.
pubsubClient, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
defer pubsubClient.Close()
// Create a PubSub subscription we can use to listen for messages.
// Create the Topic if it doesn't exist.
t := pubsubClient.Topic(pubSubTopic)
if exists, err := t.Exists(ctx); err != nil {
return fmt.Errorf("t.Exists: %w", err)
} else if !exists {
if t, err = pubsubClient.CreateTopic(ctx, pubSubTopic); err != nil {
return fmt.Errorf("CreateTopic: %w", err)
}
}
// Create the Subscription if it doesn't exist.
s := pubsubClient.Subscription(pubSubSub)
if exists, err := s.Exists(ctx); err != nil {
return fmt.Errorf("s.Exists: %w", err)
} else if !exists {
if s, err = pubsubClient.CreateSubscription(ctx, pubSubSub, pubsub.SubscriptionConfig{Topic: t}); err != nil {
return fmt.Errorf("CreateSubscription: %w", err)
}
}
// topic is the PubSub topic string where messages should be sent.
topic := "projects/" + projectID + "/topics/" + pubSubTopic
// Create a configured request.
req := &dlppb.CreateDlpJobRequest{
Parent: fmt.Sprintf("projects/%s/locations/global", projectID),
Job: &dlppb.CreateDlpJobRequest_InspectJob{
InspectJob: &dlppb.InspectJobConfig{
// StorageConfig describes where to find the data.
StorageConfig: &dlppb.StorageConfig{
Type: &dlppb.StorageConfig_DatastoreOptions{
DatastoreOptions: &dlppb.DatastoreOptions{
PartitionId: &dlppb.PartitionId{
ProjectId: dataProject,
NamespaceId: namespaceID,
},
Kind: &dlppb.KindExpression{
Name: kind,
},
},
},
},
// InspectConfig describes what fields to look for.
InspectConfig: &dlppb.InspectConfig{
InfoTypes: infoTypes,
CustomInfoTypes: customInfoTypes,
MinLikelihood: dlppb.Likelihood_POSSIBLE,
Limits: &dlppb.InspectConfig_FindingLimits{
MaxFindingsPerRequest: 10,
},
IncludeQuote: true,
},
// Send a message to PubSub using Actions.
Actions: []*dlppb.Action{
{
Action: &dlppb.Action_PubSub{
PubSub: &dlppb.Action_PublishToPubSub{
Topic: topic,
},
},
},
},
},
},
}
// Create the inspect job.
j, err := client.CreateDlpJob(ctx, req)
if err != nil {
return fmt.Errorf("CreateDlpJob: %w", err)
}
fmt.Fprintf(w, "Created job: %v\n", j.GetName())
// Wait for the inspect job to finish by waiting for a PubSub message.
// This only waits for 10 minutes. For long jobs, consider using a truly
// asynchronous execution model such as Cloud Functions.
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
// If this is the wrong job, do not process the result.
if msg.Attributes["DlpJobName"] != j.GetName() {
msg.Nack()
return
}
msg.Ack()
// Stop listening for more messages.
defer cancel()
resp, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
Name: j.GetName(),
})
if err != nil {
fmt.Fprintf(w, "Error getting completed job: %v\n", err)
return
}
r := resp.GetInspectDetails().GetResult().GetInfoTypeStats()
if len(r) == 0 {
fmt.Fprintf(w, "No results")
return
}
for _, s := range r {
fmt.Fprintf(w, " Found %v instances of infoType %v\n", s.GetCount(), s.GetInfoType().GetName())
}
})
if err != nil {
return fmt.Errorf("Receive: %w", err)
}
return nil
}
PHP
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\Client\DlpServiceClient;
use Google\Cloud\Dlp\V2\CreateDlpJobRequest;
use Google\Cloud\Dlp\V2\DatastoreOptions;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\GetDlpJobRequest;
use Google\Cloud\Dlp\V2\InfoType;
use Google\Cloud\Dlp\V2\InspectConfig;
use Google\Cloud\Dlp\V2\InspectConfig\FindingLimits;
use Google\Cloud\Dlp\V2\InspectJobConfig;
use Google\Cloud\Dlp\V2\KindExpression;
use Google\Cloud\Dlp\V2\Likelihood;
use Google\Cloud\Dlp\V2\PartitionId;
use Google\Cloud\Dlp\V2\StorageConfig;
use Google\Cloud\PubSub\PubSubClient;
/**
* Inspect Datastore, using Pub/Sub for job status notifications.
*
* @param string $callingProjectId The project ID to run the API call under
* @param string $dataProjectId The project ID containing the target Datastore
* @param string $topicId The name of the Pub/Sub topic to notify once the job completes
* @param string $subscriptionId The name of the Pub/Sub subscription to use when listening for job
* @param string $kind The datastore kind to inspect
* @param string $namespaceId The ID namespace of the Datastore document to inspect
* @param int $maxFindings (Optional) The maximum number of findings to report per request (0 = server maximum)
*/
function inspect_datastore(
string $callingProjectId,
string $dataProjectId,
string $topicId,
string $subscriptionId,
string $kind,
string $namespaceId,
int $maxFindings = 0
): void {
// Instantiate clients
$dlp = new DlpServiceClient();
$pubsub = new PubSubClient();
$topic = $pubsub->topic($topicId);
// The infoTypes of information to match
$personNameInfoType = (new InfoType())
->setName('PERSON_NAME');
$phoneNumberInfoType = (new InfoType())
->setName('PHONE_NUMBER');
$infoTypes = [$personNameInfoType, $phoneNumberInfoType];
// The minimum likelihood required before returning a match
$minLikelihood = likelihood::LIKELIHOOD_UNSPECIFIED;
// Specify finding limits
$limits = (new FindingLimits())
->setMaxFindingsPerRequest($maxFindings);
// Construct items to be inspected
$partitionId = (new PartitionId())
->setProjectId($dataProjectId)
->setNamespaceId($namespaceId);
$kindExpression = (new KindExpression())
->setName($kind);
$datastoreOptions = (new DatastoreOptions())
->setPartitionId($partitionId)
->setKind($kindExpression);
// Construct the inspect config object
$inspectConfig = (new InspectConfig())
->setInfoTypes($infoTypes)
->setMinLikelihood($minLikelihood)
->setLimits($limits);
// Construct the storage config object
$storageConfig = (new StorageConfig())
->setDatastoreOptions($datastoreOptions);
// Construct the action to run when job completes
$pubSubAction = (new PublishToPubSub())
->setTopic($topic->name());
$action = (new Action())
->setPubSub($pubSubAction);
// Construct inspect job config to run
$inspectJob = (new InspectJobConfig())
->setInspectConfig($inspectConfig)
->setStorageConfig($storageConfig)
->setActions([$action]);
// Listen for job notifications via an existing topic/subscription.
$subscription = $topic->subscription($subscriptionId);
// Submit request
$parent = "projects/$callingProjectId/locations/global";
$createDlpJobRequest = (new CreateDlpJobRequest())
->setParent($parent)
->setInspectJob($inspectJob);
$job = $dlp->createDlpJob($createDlpJobRequest);
// Poll Pub/Sub using exponential backoff until job finishes
// Consider using an asynchronous execution model such as Cloud Functions
$attempt = 1;
$startTime = time();
do {
foreach ($subscription->pull() as $message) {
if (
isset($message->attributes()['DlpJobName']) &&
$message->attributes()['DlpJobName'] === $job->getName()
) {
$subscription->acknowledge($message);
// Get the updated job. Loop to avoid race condition with DLP API.
do {
$getDlpJobRequest = (new GetDlpJobRequest())
->setName($job->getName());
$job = $dlp->getDlpJob($getDlpJobRequest);
} while ($job->getState() == JobState::RUNNING);
break 2; // break from parent do while
}
}
print('Waiting for job to complete' . PHP_EOL);
// Exponential backoff with max delay of 60 seconds
sleep(min(60, pow(2, ++$attempt)));
} while (time() - $startTime < 600); // 10 minute timeout
// Print finding counts
printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
switch ($job->getState()) {
case JobState::DONE:
$infoTypeStats = $job->getInspectDetails()->getResult()->getInfoTypeStats();
if (count($infoTypeStats) === 0) {
print('No findings.' . PHP_EOL);
} else {
foreach ($infoTypeStats as $infoTypeStat) {
printf(' Found %s instance(s) of infoType %s' . PHP_EOL, $infoTypeStat->getCount(), $infoTypeStat->getInfoType()->getName());
}
}
break;
case JobState::FAILED:
printf('Job %s had errors:' . PHP_EOL, $job->getName());
$errors = $job->getErrors();
foreach ($errors as $error) {
var_dump($error->getDetails());
}
break;
case JobState::PENDING:
print('Job has not completed. Consider a longer timeout or an asynchronous execution model' . PHP_EOL);
break;
default:
print('Unexpected job state.');
}
}
C#
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
using Google.Api.Gax.ResourceNames;
using Google.Cloud.BigQuery.V2;
using Google.Cloud.Dlp.V2;
using Google.Protobuf.WellKnownTypes;
using System;
using System.Collections.Generic;
using System.Threading;
using static Google.Cloud.Dlp.V2.InspectConfig.Types;
public class InspectCloudDataStore
{
public static object Inspect(
string projectId,
Likelihood minLikelihood,
int maxFindings,
bool includeQuote,
string kindName,
string namespaceId,
IEnumerable<InfoType> infoTypes,
IEnumerable<CustomInfoType> customInfoTypes,
string datasetId,
string tableId)
{
var inspectJob = new InspectJobConfig
{
StorageConfig = new StorageConfig
{
DatastoreOptions = new DatastoreOptions
{
Kind = new KindExpression { Name = kindName },
PartitionId = new PartitionId
{
NamespaceId = namespaceId,
ProjectId = projectId,
}
},
TimespanConfig = new StorageConfig.Types.TimespanConfig
{
StartTime = Timestamp.FromDateTime(System.DateTime.UtcNow.AddYears(-1)),
EndTime = Timestamp.FromDateTime(System.DateTime.UtcNow)
}
},
InspectConfig = new InspectConfig
{
InfoTypes = { infoTypes },
CustomInfoTypes = { customInfoTypes },
Limits = new FindingLimits
{
MaxFindingsPerRequest = maxFindings
},
ExcludeInfoTypes = false,
IncludeQuote = includeQuote,
MinLikelihood = minLikelihood
},
Actions =
{
new Google.Cloud.Dlp.V2.Action
{
// Save results in BigQuery Table
SaveFindings = new Google.Cloud.Dlp.V2.Action.Types.SaveFindings
{
OutputConfig = new OutputStorageConfig
{
Table = new Google.Cloud.Dlp.V2.BigQueryTable
{
ProjectId = projectId,
DatasetId = datasetId,
TableId = tableId
}
}
},
}
}
};
// Issue Create Dlp Job Request
var client = DlpServiceClient.Create();
var request = new CreateDlpJobRequest
{
InspectJob = inspectJob,
Parent = new LocationName(projectId, "global").ToString(),
};
// We need created job name
var dlpJob = client.CreateDlpJob(request);
var jobName = dlpJob.Name;
// Make sure the job finishes before inspecting the results.
// Alternatively, we can inspect results opportunistically, but
// for testing purposes, we want consistent outcome
var finishedJob = EnsureJobFinishes(projectId, jobName);
var bigQueryClient = BigQueryClient.Create(projectId);
var table = bigQueryClient.GetTable(datasetId, tableId);
// Return only first page of 10 rows
Console.WriteLine("DLP v2 Results:");
var firstPage = table.ListRows(new ListRowsOptions { StartIndex = 0, PageSize = 10 });
foreach (var item in firstPage)
{
Console.WriteLine($"\t {item[""]}");
}
return finishedJob;
}
private static DlpJob EnsureJobFinishes(string projectId, string jobName)
{
var client = DlpServiceClient.Create();
var request = new GetDlpJobRequest
{
DlpJobName = new DlpJobName(projectId, jobName),
};
// Simple logic that gives the job 5*30 sec at most to complete - for testing purposes only
var numOfAttempts = 5;
do
{
var dlpJob = client.GetDlpJob(request);
numOfAttempts--;
if (dlpJob.State != DlpJob.Types.JobState.Running)
{
return dlpJob;
}
Thread.Sleep(TimeSpan.FromSeconds(30));
} while (numOfAttempts > 0);
throw new InvalidOperationException("Job did not complete in time");
}
}
检查 BigQuery 表格
您可以通过 REST 请求使用敏感数据保护设置 BigQuery 表的检查,也可以采用多种语言以编程方式使用客户端库进行检查。
如需设置使用敏感数据保护的 BigQuery 表扫描作业,请执行以下操作:
控制台
如需设置使用敏感数据保护的 BigQuery 表扫描作业,请执行以下操作:
在 Google Cloud 控制台的“敏感数据保护”部分中,前往创建作业或作业触发器页面。
输入敏感数据保护作业信息,然后点击继续以完成各个步骤:
对于第 1 步:选择输入数据,请在名称字段中输入值,为作业命名。在位置中,从存储类型菜单中选择 BigQuery,然后输入要扫描的表的信息。
系统已预先配置采样部分,以便针对您的数据运行示例扫描。如果您有大量数据,则可以调整限制行数的依据和行数上限字段以节省资源。如需了解详情,请参阅选择输入数据。
(可选)如果您希望能够将每个发现结果与其所在的行相关联,请设置标识字段字段。
输入可唯一标识表中每一行的列的名称。如有必要,请使用点表示法指定嵌套字段。您可以根据需要添加任意数量的字段。
您还必须启用保存到 BigQuery 操作,将发现结果导出到 BigQuery。将发现结果导出到 BigQuery 时,每个发现结果都包含标识字段的相应值。如需了解详情,请参阅
identifyingFields
。(可选)对于第 2 步:配置检测,您可以配置要查找的数据类型,称为 infoTypes。您可以从预定义的 infoType 列表中进行选择,也可以选择已有的模板(如有)。如需了解详情,请参阅配置检测。
(可选)对于第 3 步:添加操作,确保通过电子邮件发送通知已启用。
启用保存到 BigQuery 以将敏感数据保护发现结果发布到 BigQuery 表。提供以下信息:
- 对于项目 ID,请输入用于存储结果的项目的 ID。
- 对于数据集 ID,请输入用于存储结果的数据集的名称。
- (可选)对于表格 ID,请输入用于存储结果的表格的名称。如果未指定表格 ID,系统会为新表格分配类似于
dlp_googleapis_[DATE]_1234567890
这样的默认名称。如果您指定的是现有的表格,则系统会将发现结果附加到其中。
将数据写入 BigQuery 表时,结算和配额用量将应用于目标表所属的项目。
您还可以将结果保存到 Pub/Sub、Security Command Center 和 Data Catalog。如需了解详情,请参阅添加操作。
(可选)对于第 4 步:时间安排,如需仅运行一次扫描,请将菜单设置为无。如需安排定期运行扫描,请点击创建一个触发器来定期运行作业。如需了解详情,请参阅时间安排。
点击创建。
敏感数据保护作业完成后,系统会将您重定向到作业详情页面,并通过电子邮件通知您。您可以在作业详情页面上查看检查结果。
(可选)如果您选择将敏感数据保护发现结果发布到 BigQuery,请在作业详情页面上点击在 BigQuery 中查看发现结果,以在 BigQuery 网页界面中打开相应表格。您可以随后对表格执行查询,并分析发现结果。如需详细了解如何在 BigQuery 中查询结果,请参阅在 BigQuery 中查询敏感数据保护发现结果。
协议
以下是可在 POST 请求中发送到指定 DLP API REST 端点的 JSON 示例。此 JSON 示例演示了如何使用 DLP API 检查 BigQuery 表。如需了解请求中包含的参数,请参阅本主题后面的配置存储空间检查。您可以在 API Explorer 的 dlpJobs.create
参考页面上快速尝试此操作:
请注意,如果请求成功(即使是在 APIs Explorer 中),就会创建一个新的扫描作业。如需了解如何控制扫描作业,请参阅本主题后面的检索检查结果。如需了解有关如何使用 JSON 将请求发送到 DLP API 的一般信息,请参阅 JSON 快速入门。
JSON 输入:
POST https://dlp.googleapis.com/v2/projects/[PROJECT-ID]/dlpJobs?key={YOUR_API_KEY}
{
"inspectJob":{
"storageConfig":{
"bigQueryOptions":{
"tableReference":{
"projectId":"[PROJECT-ID]",
"datasetId":"[BIGQUERY-DATASET-NAME]",
"tableId":"[BIGQUERY-TABLE-NAME]"
},
"identifyingFields":[
{
"name":"id"
}
]
},
"timespanConfig":{
"startTime":"2017-11-13T12:34:29.965633345Z ",
"endTime":"2018-01-05T04:45:04.240912125Z "
}
},
"inspectConfig":{
"infoTypes":[
{
"name":"PHONE_NUMBER"
}
],
"excludeInfoTypes":false,
"includeQuote":true,
"minLikelihood":"LIKELY"
},
"actions":[
{
"saveFindings":{
"outputConfig":{
"table":{
"projectId":"[PROJECT-ID]",
"datasetId":"[BIGQUERY-DATASET-NAME]",
"tableId":"[BIGQUERY-TABLE-NAME]"
},
"outputSchema": "BASIC_COLUMNS"
}
}
}
]
}
}
Java
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.BigQueryOptions;
import com.google.privacy.dlp.v2.BigQueryTable;
import com.google.privacy.dlp.v2.CreateDlpJobRequest;
import com.google.privacy.dlp.v2.DlpJob;
import com.google.privacy.dlp.v2.GetDlpJobRequest;
import com.google.privacy.dlp.v2.InfoType;
import com.google.privacy.dlp.v2.InfoTypeStats;
import com.google.privacy.dlp.v2.InspectConfig;
import com.google.privacy.dlp.v2.InspectDataSourceDetails;
import com.google.privacy.dlp.v2.InspectJobConfig;
import com.google.privacy.dlp.v2.LocationName;
import com.google.privacy.dlp.v2.StorageConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class InspectBigQueryTable {
public static void main(String[] args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String bigQueryDatasetId = "your-bigquery-dataset-id";
String bigQueryTableId = "your-bigquery-table-id";
String topicId = "your-pubsub-topic-id";
String subscriptionId = "your-pubsub-subscription-id";
inspectBigQueryTable(projectId, bigQueryDatasetId, bigQueryTableId, topicId, subscriptionId);
}
// Inspects a BigQuery Table
public static void inspectBigQueryTable(
String projectId,
String bigQueryDatasetId,
String bigQueryTableId,
String topicId,
String subscriptionId)
throws ExecutionException, InterruptedException, IOException {
// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
try (DlpServiceClient dlp = DlpServiceClient.create()) {
// Specify the BigQuery table to be inspected.
BigQueryTable tableReference =
BigQueryTable.newBuilder()
.setProjectId(projectId)
.setDatasetId(bigQueryDatasetId)
.setTableId(bigQueryTableId)
.build();
BigQueryOptions bigQueryOptions =
BigQueryOptions.newBuilder().setTableReference(tableReference).build();
StorageConfig storageConfig =
StorageConfig.newBuilder().setBigQueryOptions(bigQueryOptions).build();
// Specify the type of info the inspection will look for.
// See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
List<InfoType> infoTypes =
Stream.of("PHONE_NUMBER", "EMAIL_ADDRESS", "CREDIT_CARD_NUMBER")
.map(it -> InfoType.newBuilder().setName(it).build())
.collect(Collectors.toList());
// Specify how the content should be inspected.
InspectConfig inspectConfig =
InspectConfig.newBuilder().addAllInfoTypes(infoTypes).setIncludeQuote(true).build();
// Specify the action that is triggered when the job completes.
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId);
Action.PublishToPubSub publishToPubSub =
Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build();
Action action = Action.newBuilder().setPubSub(publishToPubSub).build();
// Configure the long running job we want the service to perform.
InspectJobConfig inspectJobConfig =
InspectJobConfig.newBuilder()
.setStorageConfig(storageConfig)
.setInspectConfig(inspectConfig)
.addActions(action)
.build();
// Create the request for the job configured above.
CreateDlpJobRequest createDlpJobRequest =
CreateDlpJobRequest.newBuilder()
.setParent(LocationName.of(projectId, "global").toString())
.setInspectJob(inspectJobConfig)
.build();
// Use the client to send the request.
final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest);
System.out.println("Job created: " + dlpJob.getName());
// Set up a Pub/Sub subscriber to listen on the job completion status
final SettableApiFuture<Boolean> done = SettableApiFuture.create();
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
MessageReceiver messageHandler =
(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
subscriber.startAsync();
// Wait for job completion semi-synchronously
// For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
try {
done.get(15, TimeUnit.MINUTES);
} catch (TimeoutException e) {
System.out.println("Job was not completed after 15 minutes.");
return;
} finally {
subscriber.stopAsync();
subscriber.awaitTerminated();
}
// Get the latest state of the job from the service
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
DlpJob completedJob = dlp.getDlpJob(request);
// Parse the response and process results.
System.out.println("Job status: " + completedJob.getState());
System.out.println("Job name: " + dlpJob.getName());
InspectDataSourceDetails.Result result = completedJob.getInspectDetails().getResult();
System.out.println("Findings: ");
for (InfoTypeStats infoTypeStat : result.getInfoTypeStatsList()) {
System.out.print("\tInfo type: " + infoTypeStat.getInfoType().getName());
System.out.println("\tCount: " + infoTypeStat.getCount());
}
}
}
// handleMessage injects the job and settableFuture into the message reciever interface
private static void handleMessage(
DlpJob job,
SettableApiFuture<Boolean> done,
PubsubMessage pubsubMessage,
AckReplyConsumer ackReplyConsumer) {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
done.set(true);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
}
}
}
Node.js
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');
// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();
// The project ID to run the API call under
// const projectId = 'my-project';
// The project ID the table is stored under
// This may or (for public datasets) may not equal the calling project ID
// const dataProjectId = 'my-project';
// The ID of the dataset to inspect, e.g. 'my_dataset'
// const datasetId = 'my_dataset';
// The ID of the table to inspect, e.g. 'my_table'
// const tableId = 'my_table';
// The minimum likelihood required before returning a match
// const minLikelihood = 'LIKELIHOOD_UNSPECIFIED';
// The maximum number of findings to report per request (0 = server maximum)
// const maxFindings = 0;
// The infoTypes of information to match
// const infoTypes = [{ name: 'PHONE_NUMBER' }, { name: 'EMAIL_ADDRESS' }, { name: 'CREDIT_CARD_NUMBER' }];
// The customInfoTypes of information to match
// const customInfoTypes = [{ infoType: { name: 'DICT_TYPE' }, dictionary: { wordList: { words: ['foo', 'bar', 'baz']}}},
// { infoType: { name: 'REGEX_TYPE' }, regex: {pattern: '\\(\\d{3}\\) \\d{3}-\\d{4}'}}];
// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'
// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'
async function inspectBigquery() {
// Construct item to be inspected
const storageItem = {
bigQueryOptions: {
tableReference: {
projectId: dataProjectId,
datasetId: datasetId,
tableId: tableId,
},
},
};
// Construct request for creating an inspect job
const request = {
parent: `projects/${projectId}/locations/global`,
inspectJob: {
inspectConfig: {
infoTypes: infoTypes,
customInfoTypes: customInfoTypes,
minLikelihood: minLikelihood,
limits: {
maxFindingsPerRequest: maxFindings,
},
},
storageConfig: storageItem,
actions: [
{
pubSub: {
topic: `projects/${projectId}/topics/${topicId}`,
},
},
],
},
};
// Run inspect-job creation request
const [topicResponse] = await pubsub.topic(topicId).get();
// Verify the Pub/Sub topic and listen for job notifications via an
// existing subscription.
const subscription = await topicResponse.subscription(subscriptionId);
const [jobsResponse] = await dlp.createDlpJob(request);
const jobName = jobsResponse.name;
// Watch the Pub/Sub topic until the DLP job finishes
await new Promise((resolve, reject) => {
const messageHandler = message => {
if (message.attributes && message.attributes.DlpJobName === jobName) {
message.ack();
subscription.removeListener('message', messageHandler);
subscription.removeListener('error', errorHandler);
resolve(jobName);
} else {
message.nack();
}
};
const errorHandler = err => {
subscription.removeListener('message', messageHandler);
subscription.removeListener('error', errorHandler);
reject(err);
};
subscription.on('message', messageHandler);
subscription.on('error', errorHandler);
});
// Wait for DLP job to fully complete
setTimeout(() => {
console.log('Waiting for DLP job to fully complete');
}, 500);
const [job] = await dlp.getDlpJob({name: jobName});
console.log(`Job ${job.name} status: ${job.state}`);
const infoTypeStats = job.inspectDetails.result.infoTypeStats;
if (infoTypeStats.length > 0) {
infoTypeStats.forEach(infoTypeStat => {
console.log(
` Found ${infoTypeStat.count} instance(s) of infoType ${infoTypeStat.infoType.name}.`
);
});
} else {
console.log('No findings.');
}
}
await inspectBigquery();
Python
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import threading
from typing import List, Optional
import google.cloud.dlp
import google.cloud.pubsub
def inspect_bigquery(
project: str,
bigquery_project: str,
dataset_id: str,
table_id: str,
topic_id: str,
subscription_id: str,
info_types: List[str],
custom_dictionaries: List[str] = None,
custom_regexes: List[str] = None,
min_likelihood: Optional[int] = None,
max_findings: Optional[int] = None,
timeout: int = 500,
) -> None:
"""Uses the Data Loss Prevention API to analyze BigQuery data.
Args:
project: The Google Cloud project id to use as a parent resource.
bigquery_project: The Google Cloud project id of the target table.
dataset_id: The id of the target BigQuery dataset.
table_id: The id of the target BigQuery table.
topic_id: The id of the Cloud Pub/Sub topic to which the API will
broadcast job completion. The topic must already exist.
subscription_id: The id of the Cloud Pub/Sub subscription to listen on
while waiting for job completion. The subscription must already
exist and be subscribed to the topic.
info_types: A list of strings representing info types to look for.
A full list of info type categories can be fetched from the API.
min_likelihood: A string representing the minimum likelihood threshold
that constitutes a match. One of: 'LIKELIHOOD_UNSPECIFIED',
'VERY_UNLIKELY', 'UNLIKELY', 'POSSIBLE', 'LIKELY', 'VERY_LIKELY'.
max_findings: The maximum number of findings to report; 0 = no maximum.
timeout: The number of seconds to wait for a response from the API.
Returns:
None; the response from the API is printed to the terminal.
"""
# Instantiate a client.
dlp = google.cloud.dlp_v2.DlpServiceClient()
# Prepare info_types by converting the list of strings into a list of
# dictionaries (protos are also accepted).
if not info_types:
info_types = ["FIRST_NAME", "LAST_NAME", "EMAIL_ADDRESS"]
info_types = [{"name": info_type} for info_type in info_types]
# Prepare custom_info_types by parsing the dictionary word lists and
# regex patterns.
if custom_dictionaries is None:
custom_dictionaries = []
dictionaries = [
{
"info_type": {"name": f"CUSTOM_DICTIONARY_{i}"},
"dictionary": {"word_list": {"words": custom_dict.split(",")}},
}
for i, custom_dict in enumerate(custom_dictionaries)
]
if custom_regexes is None:
custom_regexes = []
regexes = [
{
"info_type": {"name": f"CUSTOM_REGEX_{i}"},
"regex": {"pattern": custom_regex},
}
for i, custom_regex in enumerate(custom_regexes)
]
custom_info_types = dictionaries + regexes
# Construct the configuration dictionary. Keys which are None may
# optionally be omitted entirely.
inspect_config = {
"info_types": info_types,
"custom_info_types": custom_info_types,
"min_likelihood": min_likelihood,
"limits": {"max_findings_per_request": max_findings},
}
# Construct a storage_config containing the target Bigquery info.
storage_config = {
"big_query_options": {
"table_reference": {
"project_id": bigquery_project,
"dataset_id": dataset_id,
"table_id": table_id,
}
}
}
# Convert the project id into full resource ids.
topic = google.cloud.pubsub.PublisherClient.topic_path(project, topic_id)
parent = f"projects/{project}/locations/global"
# Tell the API where to send a notification when the job is complete.
actions = [{"pub_sub": {"topic": topic}}]
# Construct the inspect_job, which defines the entire inspect content task.
inspect_job = {
"inspect_config": inspect_config,
"storage_config": storage_config,
"actions": actions,
}
operation = dlp.create_dlp_job(
request={"parent": parent, "inspect_job": inspect_job}
)
print(f"Inspection operation started: {operation.name}")
# Create a Pub/Sub client and find the subscription. The subscription is
# expected to already be listening to the topic.
subscriber = google.cloud.pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(project, subscription_id)
# Set up a callback to acknowledge a message. This closes around an event
# so that it can signal that it is done and the main thread can continue.
job_done = threading.Event()
def callback(message: google.cloud.pubsub_v1.subscriber.message.Message) -> None:
try:
if message.attributes["DlpJobName"] == operation.name:
# This is the message we're looking for, so acknowledge it.
message.ack()
# Now that the job is done, fetch the results and print them.
job = dlp.get_dlp_job(request={"name": operation.name})
print(f"Job name: {job.name}")
if job.inspect_details.result.info_type_stats:
for finding in job.inspect_details.result.info_type_stats:
print(
"Info type: {}; Count: {}".format(
finding.info_type.name, finding.count
)
)
else:
print("No findings.")
# Signal to the main thread that we can exit.
job_done.set()
else:
# This is not the message we're looking for.
message.drop()
except Exception as e:
# Because this is executing in a thread, an exception won't be
# noted unless we print it manually.
print(e)
raise
# Register the callback and wait on the event.
subscriber.subscribe(subscription_path, callback=callback)
finished = job_done.wait(timeout=timeout)
if not finished:
print(
"No event received before the timeout. Please verify that the "
"subscription provided is subscribed to the topic provided."
)
Go
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import (
"context"
"fmt"
"io"
"strings"
"time"
dlp "cloud.google.com/go/dlp/apiv2"
"cloud.google.com/go/dlp/apiv2/dlppb"
"cloud.google.com/go/pubsub"
)
// inspectBigquery searches for the given info types in the given Bigquery dataset table.
func inspectBigquery(w io.Writer, projectID string, infoTypeNames []string, customDictionaries []string, customRegexes []string, pubSubTopic, pubSubSub, dataProject, datasetID, tableID string) error {
// projectID := "my-project-id"
// infoTypeNames := []string{"US_SOCIAL_SECURITY_NUMBER"}
// customDictionaries := []string{...}
// customRegexes := []string{...}
// pubSubTopic := "dlp-risk-sample-topic"
// pubSubSub := "dlp-risk-sample-sub"
// dataProject := "my-data-project-ID"
// datasetID := "my_dataset"
// tableID := "mytable"
ctx := context.Background()
client, err := dlp.NewClient(ctx)
if err != nil {
return fmt.Errorf("dlp.NewClient: %w", err)
}
// Convert the info type strings to a list of InfoTypes.
var infoTypes []*dlppb.InfoType
for _, it := range infoTypeNames {
infoTypes = append(infoTypes, &dlppb.InfoType{Name: it})
}
// Convert the custom dictionary word lists and custom regexes to a list of CustomInfoTypes.
var customInfoTypes []*dlppb.CustomInfoType
for idx, it := range customDictionaries {
customInfoTypes = append(customInfoTypes, &dlppb.CustomInfoType{
InfoType: &dlppb.InfoType{
Name: fmt.Sprintf("CUSTOM_DICTIONARY_%d", idx),
},
Type: &dlppb.CustomInfoType_Dictionary_{
Dictionary: &dlppb.CustomInfoType_Dictionary{
Source: &dlppb.CustomInfoType_Dictionary_WordList_{
WordList: &dlppb.CustomInfoType_Dictionary_WordList{
Words: strings.Split(it, ","),
},
},
},
},
})
}
for idx, it := range customRegexes {
customInfoTypes = append(customInfoTypes, &dlppb.CustomInfoType{
InfoType: &dlppb.InfoType{
Name: fmt.Sprintf("CUSTOM_REGEX_%d", idx),
},
Type: &dlppb.CustomInfoType_Regex_{
Regex: &dlppb.CustomInfoType_Regex{
Pattern: it,
},
},
})
}
// Create a PubSub Client used to listen for when the inspect job finishes.
pubsubClient, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
defer pubsubClient.Close()
// Create a PubSub subscription we can use to listen for messages.
// Create the Topic if it doesn't exist.
t := pubsubClient.Topic(pubSubTopic)
if exists, err := t.Exists(ctx); err != nil {
return fmt.Errorf("t.Exists: %w", err)
} else if !exists {
if t, err = pubsubClient.CreateTopic(ctx, pubSubTopic); err != nil {
return fmt.Errorf("CreateTopic: %w", err)
}
}
// Create the Subscription if it doesn't exist.
s := pubsubClient.Subscription(pubSubSub)
if exists, err := s.Exists(ctx); err != nil {
return fmt.Errorf("s.Exits: %w", err)
} else if !exists {
if s, err = pubsubClient.CreateSubscription(ctx, pubSubSub, pubsub.SubscriptionConfig{Topic: t}); err != nil {
return fmt.Errorf("CreateSubscription: %w", err)
}
}
// topic is the PubSub topic string where messages should be sent.
topic := "projects/" + projectID + "/topics/" + pubSubTopic
// Create a configured request.
req := &dlppb.CreateDlpJobRequest{
Parent: fmt.Sprintf("projects/%s/locations/global", projectID),
Job: &dlppb.CreateDlpJobRequest_InspectJob{
InspectJob: &dlppb.InspectJobConfig{
// StorageConfig describes where to find the data.
StorageConfig: &dlppb.StorageConfig{
Type: &dlppb.StorageConfig_BigQueryOptions{
BigQueryOptions: &dlppb.BigQueryOptions{
TableReference: &dlppb.BigQueryTable{
ProjectId: dataProject,
DatasetId: datasetID,
TableId: tableID,
},
},
},
},
// InspectConfig describes what fields to look for.
InspectConfig: &dlppb.InspectConfig{
InfoTypes: infoTypes,
CustomInfoTypes: customInfoTypes,
MinLikelihood: dlppb.Likelihood_POSSIBLE,
Limits: &dlppb.InspectConfig_FindingLimits{
MaxFindingsPerRequest: 10,
},
IncludeQuote: true,
},
// Send a message to PubSub using Actions.
Actions: []*dlppb.Action{
{
Action: &dlppb.Action_PubSub{
PubSub: &dlppb.Action_PublishToPubSub{
Topic: topic,
},
},
},
},
},
},
}
// Create the inspect job.
j, err := client.CreateDlpJob(ctx, req)
if err != nil {
return fmt.Errorf("CreateDlpJob: %w", err)
}
fmt.Fprintf(w, "Created job: %v\n", j.GetName())
// Wait for the inspect job to finish by waiting for a PubSub message.
// This only waits for 10 minutes. For long jobs, consider using a truly
// asynchronous execution model such as Cloud Functions.
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
// If this is the wrong job, do not process the result.
if msg.Attributes["DlpJobName"] != j.GetName() {
msg.Nack()
return
}
msg.Ack()
// Stop listening for more messages.
defer cancel()
resp, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
Name: j.GetName(),
})
if err != nil {
fmt.Fprintf(w, "Error getting completed job: %v\n", err)
return
}
r := resp.GetInspectDetails().GetResult().GetInfoTypeStats()
if len(r) == 0 {
fmt.Fprintf(w, "No results")
return
}
for _, s := range r {
fmt.Fprintf(w, " Found %v instances of infoType %v\n", s.GetCount(), s.GetInfoType().GetName())
}
})
if err != nil {
return fmt.Errorf("Receive: %w", err)
}
return nil
}
PHP
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\BigQueryOptions;
use Google\Cloud\Dlp\V2\BigQueryTable;
use Google\Cloud\Dlp\V2\Client\DlpServiceClient;
use Google\Cloud\Dlp\V2\CreateDlpJobRequest;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\GetDlpJobRequest;
use Google\Cloud\Dlp\V2\InfoType;
use Google\Cloud\Dlp\V2\InspectConfig;
use Google\Cloud\Dlp\V2\InspectConfig\FindingLimits;
use Google\Cloud\Dlp\V2\InspectJobConfig;
use Google\Cloud\Dlp\V2\Likelihood;
use Google\Cloud\Dlp\V2\StorageConfig;
use Google\Cloud\PubSub\PubSubClient;
/**
* Inspect a BigQuery table , using Pub/Sub for job status notifications.
*
* @param string $callingProjectId The project ID to run the API call under
* @param string $dataProjectId The project ID containing the target Datastore
* @param string $topicId The name of the Pub/Sub topic to notify once the job completes
* @param string $subscriptionId The name of the Pub/Sub subscription to use when listening for job
* @param string $datasetId The ID of the dataset to inspect
* @param string $tableId The ID of the table to inspect
* @param int $maxFindings (Optional) The maximum number of findings to report per request (0 = server maximum)
*/
function inspect_bigquery(
string $callingProjectId,
string $dataProjectId,
string $topicId,
string $subscriptionId,
string $datasetId,
string $tableId,
int $maxFindings = 0
): void {
// Instantiate a client.
$dlp = new DlpServiceClient();
$pubsub = new PubSubClient();
$topic = $pubsub->topic($topicId);
// The infoTypes of information to match
$personNameInfoType = (new InfoType())
->setName('PERSON_NAME');
$creditCardNumberInfoType = (new InfoType())
->setName('CREDIT_CARD_NUMBER');
$infoTypes = [$personNameInfoType, $creditCardNumberInfoType];
// The minimum likelihood required before returning a match
$minLikelihood = likelihood::LIKELIHOOD_UNSPECIFIED;
// Specify finding limits
$limits = (new FindingLimits())
->setMaxFindingsPerRequest($maxFindings);
// Construct items to be inspected
$bigqueryTable = (new BigQueryTable())
->setProjectId($dataProjectId)
->setDatasetId($datasetId)
->setTableId($tableId);
$bigQueryOptions = (new BigQueryOptions())
->setTableReference($bigqueryTable);
$storageConfig = (new StorageConfig())
->setBigQueryOptions($bigQueryOptions);
// Construct the inspect config object
$inspectConfig = (new InspectConfig())
->setMinLikelihood($minLikelihood)
->setLimits($limits)
->setInfoTypes($infoTypes);
// Construct the action to run when job completes
$pubSubAction = (new PublishToPubSub())
->setTopic($topic->name());
$action = (new Action())
->setPubSub($pubSubAction);
// Construct inspect job config to run
$inspectJob = (new InspectJobConfig())
->setInspectConfig($inspectConfig)
->setStorageConfig($storageConfig)
->setActions([$action]);
// Listen for job notifications via an existing topic/subscription.
$subscription = $topic->subscription($subscriptionId);
// Submit request
$parent = "projects/$callingProjectId/locations/global";
$createDlpJobRequest = (new CreateDlpJobRequest())
->setParent($parent)
->setInspectJob($inspectJob);
$job = $dlp->createDlpJob($createDlpJobRequest);
// Poll Pub/Sub using exponential backoff until job finishes
// Consider using an asynchronous execution model such as Cloud Functions
$attempt = 1;
$startTime = time();
do {
foreach ($subscription->pull() as $message) {
if (isset($message->attributes()['DlpJobName']) &&
$message->attributes()['DlpJobName'] === $job->getName()) {
$subscription->acknowledge($message);
// Get the updated job. Loop to avoid race condition with DLP API.
do {
$getDlpJobRequest = (new GetDlpJobRequest())
->setName($job->getName());
$job = $dlp->getDlpJob($getDlpJobRequest);
} while ($job->getState() == JobState::RUNNING);
break 2; // break from parent do while
}
}
print('Waiting for job to complete' . PHP_EOL);
// Exponential backoff with max delay of 60 seconds
sleep(min(60, pow(2, ++$attempt)));
} while (time() - $startTime < 600); // 10 minute timeout
// Print finding counts
printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
switch ($job->getState()) {
case JobState::DONE:
$infoTypeStats = $job->getInspectDetails()->getResult()->getInfoTypeStats();
if (count($infoTypeStats) === 0) {
print('No findings.' . PHP_EOL);
} else {
foreach ($infoTypeStats as $infoTypeStat) {
printf(
' Found %s instance(s) of infoType %s' . PHP_EOL,
$infoTypeStat->getCount(),
$infoTypeStat->getInfoType()->getName()
);
}
}
break;
case JobState::FAILED:
printf('Job %s had errors:' . PHP_EOL, $job->getName());
$errors = $job->getErrors();
foreach ($errors as $error) {
var_dump($error->getDetails());
}
break;
case JobState::PENDING:
print('Job has not completed. Consider a longer timeout or an asynchronous execution model' . PHP_EOL);
break;
default:
print('Unexpected job state. Most likely, the job is either running or has not yet started.');
}
}
C#
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
using Google.Api.Gax.ResourceNames;
using Google.Cloud.BigQuery.V2;
using Google.Cloud.Dlp.V2;
using Google.Protobuf.WellKnownTypes;
using System;
using System.Collections.Generic;
using System.Threading;
using static Google.Cloud.Dlp.V2.InspectConfig.Types;
public class InspectBigQuery
{
public static object Inspect(
string projectId,
Likelihood minLikelihood,
int maxFindings,
bool includeQuote,
IEnumerable<FieldId> identifyingFields,
IEnumerable<InfoType> infoTypes,
IEnumerable<CustomInfoType> customInfoTypes,
string datasetId,
string tableId)
{
var inspectJob = new InspectJobConfig
{
StorageConfig = new StorageConfig
{
BigQueryOptions = new BigQueryOptions
{
TableReference = new Google.Cloud.Dlp.V2.BigQueryTable
{
ProjectId = projectId,
DatasetId = datasetId,
TableId = tableId,
},
IdentifyingFields =
{
identifyingFields
}
},
TimespanConfig = new StorageConfig.Types.TimespanConfig
{
StartTime = Timestamp.FromDateTime(System.DateTime.UtcNow.AddYears(-1)),
EndTime = Timestamp.FromDateTime(System.DateTime.UtcNow)
}
},
InspectConfig = new InspectConfig
{
InfoTypes = { infoTypes },
CustomInfoTypes = { customInfoTypes },
Limits = new FindingLimits
{
MaxFindingsPerRequest = maxFindings
},
ExcludeInfoTypes = false,
IncludeQuote = includeQuote,
MinLikelihood = minLikelihood
},
Actions =
{
new Google.Cloud.Dlp.V2.Action
{
// Save results in BigQuery Table
SaveFindings = new Google.Cloud.Dlp.V2.Action.Types.SaveFindings
{
OutputConfig = new OutputStorageConfig
{
Table = new Google.Cloud.Dlp.V2.BigQueryTable
{
ProjectId = projectId,
DatasetId = datasetId,
TableId = tableId
}
}
},
}
}
};
// Issue Create Dlp Job Request
var client = DlpServiceClient.Create();
var request = new CreateDlpJobRequest
{
InspectJob = inspectJob,
Parent = new LocationName(projectId, "global").ToString(),
};
// We need created job name
var dlpJob = client.CreateDlpJob(request);
var jobName = dlpJob.Name;
// Make sure the job finishes before inspecting the results.
// Alternatively, we can inspect results opportunistically, but
// for testing purposes, we want consistent outcome
var finishedJob = EnsureJobFinishes(projectId, jobName);
var bigQueryClient = BigQueryClient.Create(projectId);
var table = bigQueryClient.GetTable(datasetId, tableId);
// Return only first page of 10 rows
Console.WriteLine("DLP v2 Results:");
var firstPage = table.ListRows(new ListRowsOptions { StartIndex = 0, PageSize = 10 });
foreach (var item in firstPage)
{
Console.WriteLine($"\t {item[""]}");
}
return finishedJob;
}
private static DlpJob EnsureJobFinishes(string projectId, string jobName)
{
var client = DlpServiceClient.Create();
var request = new GetDlpJobRequest
{
DlpJobName = new DlpJobName(projectId, jobName),
};
// Simple logic that gives the job 5*30 sec at most to complete - for testing purposes only
var numOfAttempts = 5;
do
{
var dlpJob = client.GetDlpJob(request);
numOfAttempts--;
if (dlpJob.State != DlpJob.Types.JobState.Running)
{
return dlpJob;
}
Thread.Sleep(TimeSpan.FromSeconds(30));
} while (numOfAttempts > 0);
throw new InvalidOperationException("Job did not complete in time");
}
}
配置存储空间检查
如需检查 Cloud Storage 位置、Datastore 种类或 BigQuery 表,请向 DLP API 的 projects.dlpJobs.create
方法发送请求,其中至少包含要扫描的数据的位置和要扫描的内容。除了这些必需参数之外,您还可以指定扫描结果的写入位置、大小和可能性阈值等。请求成功时会创建 DlpJob
对象实例,检索检查结果中对此进行了讨论。
下面汇总了可用的配置选项:
InspectJobConfig
对象:包含检查作业的配置信息。请注意,InspectJobConfig
对象也被JobTriggers
对象用于安排DlpJob
的创建。该对象包含以下内容:StorageConfig
对象:必填。包含要扫描的存储区详情:StorageConfig
对象中必须包含以下任一对象,具体取决于要扫描的存储区的类型:CloudStorageOptions
对象:包含要扫描的 Cloud Storage 存储桶信息。DatastoreOptions
对象:包含要扫描的 Datastore 数据集信息。BigQueryOptions
对象:包含要扫描的 BigQuery 表格(以及标识字段,可选)信息。此对象还启用了结果采样。如需了解详情,请参阅下面的启用结果采样。TimespanConfig
对象:可选。指定要包含在扫描中的项目的时间范围。
InspectConfig
对象:必填。指定要扫描的内容,例如 infoTypes 和可能性值。InfoType
对象:必填。需要扫描的一个或多个 infoType 值。Likelihood
枚举:可选。设置后,敏感数据保护将仅返回等于或大于此可能性阈值的发现结果。如果忽略此枚举,则默认值为POSSIBLE
。FindingLimits
对象:可选。设置后,您可以通过此对象指定返回的结果的数量限制。includeQuote
参数:可选。默认值为false
。如果设置为true
,每个结果都将包含来自触发该结果的数据的上下文引用。excludeInfoTypes
参数:可选。默认值为false
。如果设置为true
,扫描结果将排除结果的类型信息。CustomInfoType
对象:一个或多个用户创建的自定义 infoType。如需详细了解如何创建自定义 infoType,请参阅创建自定义 InfoType 检测器。
inspectTemplateName
字符串:可选。指定用于填充InspectConfig
对象中的默认值的模板。 如果您已指定InspectConfig
,则模板值会合并到其中。Action
对象:可选。完成作业时要执行的一个或多个操作。每个操作都按照各自的列出顺序执行。 您可以在此处指定写入结果的位置,还可以指定是否将通知发布到 Pub/Sub 主题。
jobId
:可选。敏感数据保护返回的作业的标识符。如果省略jobId
或它为空,则系统会为该作业创建 ID。如果指定,则会为作业分配此 ID 值。 作业 ID 必须是唯一的,可以包含大写和小写字母、数字和连字符;也就是说,它必须与正则表达式[a-zA-Z\\d-]+
匹配。
限制检查的内容量
如果您要扫描 BigQuery 表或 Cloud Storage 存储分区,敏感数据保护提供了一种扫描数据集子集的方法。此方法可以提供扫描结果的采样,而不会产生扫描整个数据集的潜在费用。
以下部分将介绍如何限制 Cloud Storage 扫描和 BigQuery 扫描的大小。
限制 Cloud Storage 扫描
您可以通过限制扫描的数据量来实现 Cloud Storage 中的采样。您可以指示 DLP API 仅扫描特定大小的文件、仅扫描特定文件类型、仅扫描输入文件集中占文件总数特定百分比的文件数量。为此,请在 CloudStorageOptions
中指定以下可选字段:
bytesLimitPerFile
:设置从文件中扫描的最大字节数。如果扫描的文件的大小大于此值,则省略其余字节。设置此字段不会影响某些文件类型。如需了解详情,请参阅每个文件扫描的字节数限制。fileTypes[]
:列出要包含在扫描中的FileTypes
。此字段可以设置为以下一种或多种枚举类型:filesLimitPercent
:将要扫描的文件数限制为占输入FileSet
的指定百分比。 在此字段中指定0
或100
表示没有限制。sampleMethod
:在不扫描所有字节的情况下,对字节进行采样的方法。仅当与bytesLimitPerFile
结合使用时,指定此值才有意义。如果未指定,则从顶部开始扫描。此字段可以设置为以下任一值:TOP
:从顶部开始扫描。RANDOM_START
:对于大于bytesLimitPerFile
中指定大小的每个文件,随机选择偏移以开始扫描。扫描的字节是连续的。
以下示例演示了如何使用 DLP API 扫描 Cloud Storage 存储桶中 90% 的子集,以查看人名。从数据集中的随机位置开始扫描,并且仅包含 200 字节以下的文本文件。
C#
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
using Google.Api.Gax.ResourceNames;
using Google.Cloud.Dlp.V2;
using Google.Cloud.PubSub.V1;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
public class InspectStorageWithSampling
{
public static async Task<DlpJob> InspectAsync(
string projectId,
string gcsUri,
string topicId,
string subId,
Likelihood minLikelihood = Likelihood.Possible,
IEnumerable<InfoType> infoTypes = null)
{
// Instantiate the dlp client.
var dlp = DlpServiceClient.Create();
// Construct Storage config by specifying the GCS file to be inspected
// and sample method.
var storageConfig = new StorageConfig
{
CloudStorageOptions = new CloudStorageOptions
{
FileSet = new CloudStorageOptions.Types.FileSet
{
Url = gcsUri
},
BytesLimitPerFile = 200,
FileTypes = { new FileType[] { FileType.Csv } },
FilesLimitPercent = 90,
SampleMethod = CloudStorageOptions.Types.SampleMethod.RandomStart
}
};
// Construct the Inspect Config and specify the type of info the inspection
// will look for.
var inspectConfig = new InspectConfig
{
InfoTypes =
{
infoTypes ?? new InfoType[] { new InfoType { Name = "PERSON_NAME" } }
},
IncludeQuote = true,
MinLikelihood = minLikelihood
};
// Construct the pubsub action.
var actions = new Action[]
{
new Action
{
PubSub = new Action.Types.PublishToPubSub
{
Topic = $"projects/{projectId}/topics/{topicId}"
}
}
};
// Construct the inspect job config using above created objects.
var inspectJob = new InspectJobConfig
{
StorageConfig = storageConfig,
InspectConfig = inspectConfig,
Actions = { actions }
};
// Issue Create Dlp Job Request
var request = new CreateDlpJobRequest
{
InspectJob = inspectJob,
ParentAsLocationName = new LocationName(projectId, "global"),
};
// We keep the name of the job that we just created.
var dlpJob = dlp.CreateDlpJob(request);
var jobName = dlpJob.Name;
// Listen to pub/sub for the job
var subscriptionName = new SubscriptionName(projectId, subId);
var subscriber = await SubscriberClient.CreateAsync(
subscriptionName);
await subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
{
if (message.Attributes["DlpJobName"] == jobName)
{
subscriber.StopAsync(cancel);
return Task.FromResult(SubscriberClient.Reply.Ack);
}
else
{
return Task.FromResult(SubscriberClient.Reply.Nack);
}
});
// Get the latest state of the job from the service
var resultJob = dlp.GetDlpJob(new GetDlpJobRequest
{
DlpJobName = DlpJobName.Parse(jobName)
});
// Parse the response and process results.
System.Console.WriteLine($"Job status: {resultJob.State}");
System.Console.WriteLine($"Job Name: {resultJob.Name}");
var result = resultJob.InspectDetails.Result;
foreach (var infoType in result.InfoTypeStats)
{
System.Console.WriteLine($"Info Type: {infoType.InfoType.Name}");
System.Console.WriteLine($"Count: {infoType.Count}");
}
return resultJob;
}
}
Go
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import (
"context"
"fmt"
"io"
"time"
dlp "cloud.google.com/go/dlp/apiv2"
"cloud.google.com/go/dlp/apiv2/dlppb"
"cloud.google.com/go/pubsub"
)
// inspectGcsFileWithSampling inspects a storage with sampling
func inspectGcsFileWithSampling(w io.Writer, projectID, gcsUri, topicID, subscriptionId string) error {
// projectId := "your-project-id"
// gcsUri := "gs://" + "your-bucket-name" + "/path/to/your/file.txt"
// topicID := "your-pubsub-topic-id"
// subscriptionId := "your-pubsub-subscription-id"
ctx := context.Background()
// Initialize a client once and reuse it to send multiple requests. Clients
// are safe to use across goroutines. When the client is no longer needed,
// call the Close method to cleanup its resources.
client, err := dlp.NewClient(ctx)
if err != nil {
return err
}
// Closing the client safely cleans up background resources.
defer client.Close()
// Specify the GCS file to be inspected and sampling configuration
var cloudStorageOptions = &dlppb.CloudStorageOptions{
FileSet: &dlppb.CloudStorageOptions_FileSet{
Url: gcsUri,
},
BytesLimitPerFile: int64(200),
FileTypes: []dlppb.FileType{
dlppb.FileType_TEXT_FILE,
},
FilesLimitPercent: int32(90),
SampleMethod: dlppb.CloudStorageOptions_RANDOM_START,
}
var storageConfig = &dlppb.StorageConfig{
Type: &dlppb.StorageConfig_CloudStorageOptions{
CloudStorageOptions: cloudStorageOptions,
},
}
// Specify the type of info the inspection will look for.
// See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
// Specify how the content should be inspected.
var inspectConfig = &dlppb.InspectConfig{
InfoTypes: []*dlppb.InfoType{
{Name: "PERSON_NAME"},
},
ExcludeInfoTypes: true,
IncludeQuote: true,
MinLikelihood: dlppb.Likelihood_POSSIBLE,
}
// Create a PubSub Client used to listen for when the inspect job finishes.
pubsubClient, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return err
}
defer pubsubClient.Close()
// Create a PubSub subscription we can use to listen for messages.
// Create the Topic if it doesn't exist.
t := pubsubClient.Topic(topicID)
if exists, err := t.Exists(ctx); err != nil {
return err
} else if !exists {
if t, err = pubsubClient.CreateTopic(ctx, topicID); err != nil {
return err
}
}
// Create the Subscription if it doesn't exist.
s := pubsubClient.Subscription(subscriptionId)
if exists, err := s.Exists(ctx); err != nil {
return err
} else if !exists {
if s, err = pubsubClient.CreateSubscription(ctx, subscriptionId, pubsub.SubscriptionConfig{Topic: t}); err != nil {
return err
}
}
// topic is the PubSub topic string where messages should be sent.
topic := "projects/" + projectID + "/topics/" + topicID
var action = &dlppb.Action{
Action: &dlppb.Action_PubSub{
PubSub: &dlppb.Action_PublishToPubSub{
Topic: topic,
},
},
}
// Configure the long running job we want the service to perform.
var inspectJobConfig = &dlppb.InspectJobConfig{
StorageConfig: storageConfig,
InspectConfig: inspectConfig,
Actions: []*dlppb.Action{
action,
},
}
// Create the request for the job configured above.
req := &dlppb.CreateDlpJobRequest{
Parent: fmt.Sprintf("projects/%s/locations/global", projectID),
Job: &dlppb.CreateDlpJobRequest_InspectJob{
InspectJob: inspectJobConfig,
},
}
// Use the client to send the request.
j, err := client.CreateDlpJob(ctx, req)
if err != nil {
return err
}
fmt.Fprintf(w, "Job Created: %v", j.GetName())
// Wait for the inspect job to finish by waiting for a PubSub message.
// This only waits for 10 minutes. For long jobs, consider using a truly
// asynchronous execution model such as Cloud Functions.
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
// If this is the wrong job, do not process the result.
if msg.Attributes["DlpJobName"] != j.GetName() {
msg.Nack()
return
}
msg.Ack()
// Stop listening for more messages.
defer cancel()
resp, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
Name: j.GetName(),
})
if err != nil {
fmt.Fprintf(w, "Error getting completed job: %v\n", err)
return
}
r := resp.GetInspectDetails().GetResult().GetInfoTypeStats()
if len(r) == 0 {
fmt.Fprintf(w, "No results")
return
}
for _, s := range r {
fmt.Fprintf(w, "\nFound %v instances of infoType %v\n", s.GetCount(), s.GetInfoType().GetName())
}
})
if err != nil {
return err
}
return nil
}
Java
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.CloudStorageOptions;
import com.google.privacy.dlp.v2.CloudStorageOptions.FileSet;
import com.google.privacy.dlp.v2.CloudStorageOptions.SampleMethod;
import com.google.privacy.dlp.v2.CreateDlpJobRequest;
import com.google.privacy.dlp.v2.DlpJob;
import com.google.privacy.dlp.v2.FileType;
import com.google.privacy.dlp.v2.GetDlpJobRequest;
import com.google.privacy.dlp.v2.InfoType;
import com.google.privacy.dlp.v2.InfoTypeStats;
import com.google.privacy.dlp.v2.InspectConfig;
import com.google.privacy.dlp.v2.InspectDataSourceDetails;
import com.google.privacy.dlp.v2.InspectJobConfig;
import com.google.privacy.dlp.v2.Likelihood;
import com.google.privacy.dlp.v2.LocationName;
import com.google.privacy.dlp.v2.StorageConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class InspectGcsFileWithSampling {
public static void main(String[] args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String gcsUri = "gs://" + "your-bucket-name" + "/path/to/your/file.txt";
String topicId = "your-pubsub-topic-id";
String subscriptionId = "your-pubsub-subscription-id";
inspectGcsFileWithSampling(projectId, gcsUri, topicId, subscriptionId);
}
// Inspects a file in a Google Cloud Storage Bucket.
public static void inspectGcsFileWithSampling(
String projectId, String gcsUri, String topicId, String subscriptionId)
throws ExecutionException, InterruptedException, IOException {
// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
try (DlpServiceClient dlp = DlpServiceClient.create()) {
// Specify the GCS file to be inspected and sampling configuration
CloudStorageOptions cloudStorageOptions =
CloudStorageOptions.newBuilder()
.setFileSet(FileSet.newBuilder().setUrl(gcsUri))
.setBytesLimitPerFile(200)
.addFileTypes(FileType.TEXT_FILE)
.setFilesLimitPercent(90)
.setSampleMethod(SampleMethod.RANDOM_START)
.build();
StorageConfig storageConfig =
StorageConfig.newBuilder().setCloudStorageOptions(cloudStorageOptions).build();
// Specify the type of info the inspection will look for.
// See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
InfoType infoType = InfoType.newBuilder().setName("PERSON_NAME").build();
// Specify how the content should be inspected.
InspectConfig inspectConfig =
InspectConfig.newBuilder()
.addInfoTypes(infoType)
.setExcludeInfoTypes(true)
.setIncludeQuote(true)
.setMinLikelihood(Likelihood.POSSIBLE)
.build();
// Specify the action that is triggered when the job completes.
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId);
Action.PublishToPubSub publishToPubSub =
Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build();
Action action = Action.newBuilder().setPubSub(publishToPubSub).build();
// Configure the long running job we want the service to perform.
InspectJobConfig inspectJobConfig =
InspectJobConfig.newBuilder()
.setStorageConfig(storageConfig)
.setInspectConfig(inspectConfig)
.addActions(action)
.build();
// Create the request for the job configured above.
CreateDlpJobRequest createDlpJobRequest =
CreateDlpJobRequest.newBuilder()
.setParent(LocationName.of(projectId, "global").toString())
.setInspectJob(inspectJobConfig)
.build();
// Use the client to send the request.
final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest);
System.out.println("Job created: " + dlpJob.getName());
// Set up a Pub/Sub subscriber to listen on the job completion status
final SettableApiFuture<Boolean> done = SettableApiFuture.create();
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
MessageReceiver messageHandler =
(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
subscriber.startAsync();
// Wait for job completion semi-synchronously
// For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
try {
done.get(15, TimeUnit.MINUTES);
} catch (TimeoutException e) {
System.out.println("Job was not completed after 15 minutes.");
return;
} finally {
subscriber.stopAsync();
subscriber.awaitTerminated();
}
// Get the latest state of the job from the service
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
DlpJob completedJob = dlp.getDlpJob(request);
// Parse the response and process results.
System.out.println("Job status: " + completedJob.getState());
System.out.println("Job name: " + dlpJob.getName());
InspectDataSourceDetails.Result result = completedJob.getInspectDetails().getResult();
System.out.println("Findings: ");
for (InfoTypeStats infoTypeStat : result.getInfoTypeStatsList()) {
System.out.print("\tInfo type: " + infoTypeStat.getInfoType().getName());
System.out.println("\tCount: " + infoTypeStat.getCount());
}
}
}
// handleMessage injects the job and settableFuture into the message reciever interface
private static void handleMessage(
DlpJob job,
SettableApiFuture<Boolean> done,
PubsubMessage pubsubMessage,
AckReplyConsumer ackReplyConsumer) {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
done.set(true);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
}
}
}
Node.js
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');
// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();
// The project ID to run the API call under
// const projectId = 'my-project';
// The gcs file path
// const gcsUri = 'gs://" + "your-bucket-name" + "/path/to/your/file.txt';
// Specify the type of info the inspection will look for.
// See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
// const infoTypes = [{ name: 'PERSON_NAME' }];
// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'
// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'
// DLP Job max time (in milliseconds)
const DLP_JOB_WAIT_TIME = 15 * 1000 * 60;
async function inspectGcsFileSampling() {
// Specify the GCS file to be inspected and sampling configuration
const storageItemConfig = {
cloudStorageOptions: {
fileSet: {url: gcsUri},
bytesLimitPerFile: 200,
filesLimitPercent: 90,
fileTypes: [DLP.protos.google.privacy.dlp.v2.FileType.TEXT_FILE],
sampleMethod:
DLP.protos.google.privacy.dlp.v2.CloudStorageOptions.SampleMethod
.RANDOM_START,
},
};
// Specify how the content should be inspected.
const inspectConfig = {
infoTypes: infoTypes,
minLikelihood: DLP.protos.google.privacy.dlp.v2.Likelihood.POSSIBLE,
includeQuote: true,
excludeInfoTypes: true,
};
// Specify the action that is triggered when the job completes.
const actions = [
{
pubSub: {
topic: `projects/${projectId}/topics/${topicId}`,
},
},
];
// Create the request for the job configured above.
const request = {
parent: `projects/${projectId}/locations/global`,
inspectJob: {
inspectConfig: inspectConfig,
storageConfig: storageItemConfig,
actions: actions,
},
};
// Use the client to send the request.
const [topicResponse] = await pubsub.topic(topicId).get();
// Verify the Pub/Sub topic and listen for job notifications via an
// existing subscription.
const subscription = await topicResponse.subscription(subscriptionId);
const [jobsResponse] = await dlp.createDlpJob(request);
const jobName = jobsResponse.name;
// Watch the Pub/Sub topic until the DLP job finishes
await new Promise((resolve, reject) => {
// Set up the timeout
const timer = setTimeout(() => {
reject(new Error('Timeout'));
}, DLP_JOB_WAIT_TIME);
const messageHandler = message => {
if (message.attributes && message.attributes.DlpJobName === jobName) {
message.ack();
subscription.removeListener('message', messageHandler);
subscription.removeListener('error', errorHandler);
clearTimeout(timer);
resolve(jobName);
} else {
message.nack();
}
};
const errorHandler = err => {
subscription.removeListener('message', messageHandler);
subscription.removeListener('error', errorHandler);
clearTimeout(timer);
reject(err);
};
subscription.on('message', messageHandler);
subscription.on('error', errorHandler);
});
const [job] = await dlp.getDlpJob({name: jobName});
console.log(`Job ${job.name} status: ${job.state}`);
const infoTypeStats = job.inspectDetails.result.infoTypeStats;
if (infoTypeStats.length > 0) {
infoTypeStats.forEach(infoTypeStat => {
console.log(
` Found ${infoTypeStat.count} instance(s) of infoType ${infoTypeStat.infoType.name}.`
);
});
} else {
console.log('No findings.');
}
}
await inspectGcsFileSampling();
PHP
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
use Google\Cloud\Dlp\V2\DlpServiceClient;
use Google\Cloud\Dlp\V2\InfoType;
use Google\Cloud\Dlp\V2\InspectConfig;
use Google\Cloud\Dlp\V2\StorageConfig;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\BigQueryOptions\SampleMethod;
use Google\Cloud\Dlp\V2\CloudStorageOptions;
use Google\Cloud\Dlp\V2\CloudStorageOptions\FileSet;
use Google\Cloud\Dlp\V2\InspectJobConfig;
use Google\Cloud\PubSub\PubSubClient;
/**
* Inspect storage with sampling.
* The following examples demonstrate using the Cloud DLP API to scan a 90% subset of a
* Cloud Storage bucket for person names. The scan starts from a random location in the dataset
* and only includes text files under 200 bytes.
*
* @param string $callingProjectId The project ID to run the API call under.
* @param string $gcsUri Google Cloud Storage file url.
* @param string $topicId The ID of the Pub/Sub topic to notify once the job completes.
* @param string $subscriptionId The ID of the Pub/Sub subscription to use when listening for job.
*/
function inspect_gcs_with_sampling(
// TODO(developer): Replace sample parameters before running the code.
string $callingProjectId,
string $gcsUri = 'gs://GOOGLE_STORAGE_BUCKET_NAME/dlp_sample.csv',
string $topicId = 'dlp-pubsub-topic',
string $subscriptionId = 'dlp_subcription'
): void {
// Instantiate a client.
$dlp = new DlpServiceClient();
$pubsub = new PubSubClient();
$topic = $pubsub->topic($topicId);
// Construct the items to be inspected.
$cloudStorageOptions = (new CloudStorageOptions())
->setFileSet((new FileSet())
->setUrl($gcsUri))
->setBytesLimitPerFile(200)
->setFilesLimitPercent(90)
->setSampleMethod(SampleMethod::RANDOM_START);
$storageConfig = (new StorageConfig())
->setCloudStorageOptions($cloudStorageOptions);
// Specify the type of info the inspection will look for.
$phoneNumberInfoType = (new InfoType())
->setName('PHONE_NUMBER');
$emailAddressInfoType = (new InfoType())
->setName('EMAIL_ADDRESS');
$cardNumberInfoType = (new InfoType())
->setName('CREDIT_CARD_NUMBER');
$infoTypes = [$phoneNumberInfoType, $emailAddressInfoType, $cardNumberInfoType];
// Specify how the content should be inspected.
$inspectConfig = (new InspectConfig())
->setInfoTypes($infoTypes)
->setIncludeQuote(true);
// Construct the action to run when job completes.
$action = (new Action())
->setPubSub((new PublishToPubSub())
->setTopic($topic->name()));
// Construct inspect job config to run.
$inspectJob = (new InspectJobConfig())
->setInspectConfig($inspectConfig)
->setStorageConfig($storageConfig)
->setActions([$action]);
// Listen for job notifications via an existing topic/subscription.
$subscription = $topic->subscription($subscriptionId);
// Submit request.
$parent = "projects/$callingProjectId/locations/global";
$job = $dlp->createDlpJob($parent, [
'inspectJob' => $inspectJob
]);
// Poll Pub/Sub using exponential backoff until job finishes.
// Consider using an asynchronous execution model such as Cloud Functions.
$attempt = 1;
$startTime = time();
do {
foreach ($subscription->pull() as $message) {
if (
isset($message->attributes()['DlpJobName']) &&
$message->attributes()['DlpJobName'] === $job->getName()
) {
$subscription->acknowledge($message);
// Get the updated job. Loop to avoid race condition with DLP API.
do {
$job = $dlp->getDlpJob($job->getName());
} while ($job->getState() == JobState::RUNNING);
break 2; // break from parent do while.
}
}
printf('Waiting for job to complete' . PHP_EOL);
// Exponential backoff with max delay of 60 seconds.
sleep(min(60, pow(2, ++$attempt)));
} while (time() - $startTime < 600); // 10 minute timeout.
// Print finding counts.
printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
switch ($job->getState()) {
case JobState::DONE:
$infoTypeStats = $job->getInspectDetails()->getResult()->getInfoTypeStats();
if (count($infoTypeStats) === 0) {
printf('No findings.' . PHP_EOL);
} else {
foreach ($infoTypeStats as $infoTypeStat) {
printf(
' Found %s instance(s) of infoType %s' . PHP_EOL,
$infoTypeStat->getCount(),
$infoTypeStat->getInfoType()->getName()
);
}
}
break;
case JobState::FAILED:
printf('Job %s had errors:' . PHP_EOL, $job->getName());
$errors = $job->getErrors();
foreach ($errors as $error) {
var_dump($error->getDetails());
}
break;
case JobState::PENDING:
printf('Job has not completed. Consider a longer timeout or an asynchronous execution model' . PHP_EOL);
break;
default:
printf('Unexpected job state. Most likely, the job is either running or has not yet started.');
}
}
Python
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import threading
from typing import List
import google.cloud.dlp
import google.cloud.pubsub
def inspect_gcs_with_sampling(
project: str,
bucket: str,
topic_id: str,
subscription_id: str,
info_types: List[str] = None,
file_types: List[str] = None,
min_likelihood: str = None,
max_findings: int = None,
timeout: int = 300,
) -> None:
"""Uses the Data Loss Prevention API to analyze files in GCS by
limiting the amount of data to be scanned.
Args:
project: The Google Cloud project id to use as a parent resource.
bucket: The name of the GCS bucket containing the file, as a string.
topic_id: The id of the Cloud Pub/Sub topic to which the API will
broadcast job completion. The topic must already exist.
subscription_id: The id of the Cloud Pub/Sub subscription to listen on
while waiting for job completion. The subscription must already
exist and be subscribed to the topic.
info_types: A list of strings representing infoTypes to look for.
A full list of info type categories can be fetched from the API.
file_types: Type of files in gcs bucket where the inspection would happen.
min_likelihood: A string representing the minimum likelihood threshold
that constitutes a match. One of: 'LIKELIHOOD_UNSPECIFIED',
'VERY_UNLIKELY', 'UNLIKELY', 'POSSIBLE', 'LIKELY', 'VERY_LIKELY'.
max_findings: The maximum number of findings to report; 0 = no maximum.
timeout: The number of seconds to wait for a response from the API.
"""
# Instantiate a client.
dlp = google.cloud.dlp_v2.DlpServiceClient()
# Prepare info_types by converting the list of strings into a list of
# dictionaries.
if not info_types:
info_types = ["FIRST_NAME", "LAST_NAME", "EMAIL_ADDRESS"]
info_types = [{"name": info_type} for info_type in info_types]
# Specify how the content should be inspected. Keys which are None may
# optionally be omitted entirely.
inspect_config = {
"info_types": info_types,
"exclude_info_types": True,
"include_quote": True,
"min_likelihood": min_likelihood,
"limits": {"max_findings_per_request": max_findings},
}
# Setting default file types as CSV files
if not file_types:
file_types = ["CSV"]
# Construct a cloud_storage_options dictionary with the bucket's URL.
url = f"gs://{bucket}/*"
storage_config = {
"cloud_storage_options": {
"file_set": {"url": url},
"bytes_limit_per_file": 200,
"file_types": file_types,
"files_limit_percent": 90,
"sample_method": "RANDOM_START",
}
}
# Tell the API where to send a notification when the job is complete.
topic = google.cloud.pubsub.PublisherClient.topic_path(project, topic_id)
actions = [{"pub_sub": {"topic": topic}}]
# Construct the inspect_job, which defines the entire inspect content task.
inspect_job = {
"inspect_config": inspect_config,
"storage_config": storage_config,
"actions": actions,
}
# Convert the project id into full resource ids.
parent = f"projects/{project}/locations/global"
# Call the API
operation = dlp.create_dlp_job(
request={"parent": parent, "inspect_job": inspect_job}
)
print(f"Inspection operation started: {operation.name}")
# Create a Pub/Sub client and find the subscription. The subscription is
# expected to already be listening to the topic.
subscriber = google.cloud.pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(project, subscription_id)
# Set up a callback to acknowledge a message. This closes around an event
# so that it can signal that it is done and the main thread can continue.
job_done = threading.Event()
def callback(message):
try:
if message.attributes["DlpJobName"] == operation.name:
# This is the message we're looking for, so acknowledge it.
message.ack()
# Now that the job is done, fetch the results and print them.
job = dlp.get_dlp_job(request={"name": operation.name})
print(f"Job name: {job.name}")
if job.inspect_details.result.info_type_stats:
print("Findings:")
for finding in job.inspect_details.result.info_type_stats:
print(
f"Info type: {finding.info_type.name}; Count: {finding.count}"
)
else:
print("No findings.")
# Signal to the main thread that we can exit.
job_done.set()
else:
# This is not the message we're looking for.
message.drop()
except Exception as e:
# Because this is executing in a thread, an exception won't be
# noted unless we print it manually.
print(e)
raise
# Register the callback and wait on the event.
subscriber.subscribe(subscription_path, callback=callback)
finished = job_done.wait(timeout=timeout)
if not finished:
print(
"No event received before the timeout. Please verify that the "
"subscription provided is subscribed to the topic provided."
)
REST
JSON 输入:
POST https://dlp.googleapis.com/v2/projects/[PROJECT-ID]/dlpJobs?key={YOUR_API_KEY}
{
"inspectJob":{
"storageConfig":{
"cloudStorageOptions":{
"fileSet":{
"url":"gs://[BUCKET-NAME]/*"
},
"bytesLimitPerFile":"200",
"fileTypes":[
"TEXT_FILE"
],
"filesLimitPercent":90,
"sampleMethod":"RANDOM_START"
}
},
"inspectConfig":{
"infoTypes":[
{
"name":"PERSON_NAME"
}
],
"excludeInfoTypes":true,
"includeQuote":true,
"minLikelihood":"POSSIBLE"
},
"actions":[
{
"saveFindings":{
"outputConfig":{
"table":{
"projectId":"[PROJECT-ID]",
"datasetId":"testingdlp"
},
"outputSchema":"BASIC_COLUMNS"
}
}
}
]
}
}
将 POST 请求中的 JSON 输入发送到指定端点后,系统会创建一个敏感数据保护作业,并且 API 会发送以下响应。
JSON 输出:
{
"name":"projects/[PROJECT-ID]/dlpJobs/[JOB-ID]",
"type":"INSPECT_JOB",
"state":"PENDING",
"inspectDetails":{
"requestedOptions":{
"snapshotInspectTemplate":{
},
"jobConfig":{
"storageConfig":{
"cloudStorageOptions":{
"fileSet":{
"url":"gs://[BUCKET_NAME]/*"
},
"bytesLimitPerFile":"200",
"fileTypes":[
"TEXT_FILE"
],
"sampleMethod":"TOP",
"filesLimitPercent":90
}
},
"inspectConfig":{
"infoTypes":[
{
"name":"PERSON_NAME"
}
],
"minLikelihood":"POSSIBLE",
"limits":{
},
"includeQuote":true,
"excludeInfoTypes":true
},
"actions":[
{
"saveFindings":{
"outputConfig":{
"table":{
"projectId":"[PROJECT-ID]",
"datasetId":"[DATASET-ID]",
"tableId":"[TABLE-ID]"
},
"outputSchema":"BASIC_COLUMNS"
}
}
}
]
}
}
},
"createTime":"2018-05-30T22:22:08.279Z"
}
限制 BigQuery 扫描
如需通过限制扫描的数据量来实现 BigQuery 中的采样,请在 BigQueryOptions
中指定以下可选字段:
rowsLimit
:计划扫描的最大行数。如果表的行数大于此值,则忽略其余行。如果未设置此字段,或者设置为 0,则扫描所有行。rowsLimitPercent
:要扫描的行数百分比上限(介于 0 到 100 之间)。其余行会被省略。将此值设置为 0 或 100 表示无限制。默认值为 0。只能指定rowsLimit
和rowsLimitPercent
中的一个。sampleMethod
:在不扫描所有行的情况下,对行进行采样的方法。如果未指定,则从顶部开始扫描。此字段可以设置为以下两个值之一:TOP
:从顶部开始扫描。RANDOM_START
:从随机选择的行开始扫描。
excludedFields
:用于唯一标识要阻止读取的列的表字段。这有助于减少扫描的数据量并降低检查作业的总费用。includedFields
:唯一标识要扫描的表中特定行的表字段。
另一个有助于限制扫描数据量(尤其是在扫描分区表时)的功能是 TimespanConfig
。
TimespanConfig
允许您通过提供开始时间和结束时间值来定义时间范围,以过滤掉 BigQuery 表格的一些行。然后,敏感数据保护服务将仅扫描包含该时间范围内的时间戳的行。
以下示例演示了如何使用 DLP API 扫描 BigQuery 表的一个 1000 行子集。从随机行开始扫描。
Go
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import (
"context"
"fmt"
"io"
"time"
dlp "cloud.google.com/go/dlp/apiv2"
"cloud.google.com/go/dlp/apiv2/dlppb"
"cloud.google.com/go/pubsub"
)
// inspectBigQueryTableWithSampling inspect bigQueries for sensitive data with sampling
func inspectBigQueryTableWithSampling(w io.Writer, projectID, topicID, subscriptionID string) error {
// projectId := "your-project-id"
// topicID := "your-pubsub-topic-id"
// or provide a topicID name to create one
// subscriptionID := "your-pubsub-subscription-id"
// or provide a subscription name to create one
ctx := context.Background()
// Initialize a client once and reuse it to send multiple requests. Clients
// are safe to use across goroutines. When the client is no longer needed,
// call the Close method to cleanup its resources.
client, err := dlp.NewClient(ctx)
if err != nil {
return err
}
// Closing the client safely cleans up background resources.
defer client.Close()
// Specify the BigQuery table to be inspected.
tableReference := &dlppb.BigQueryTable{
ProjectId: "bigquery-public-data",
DatasetId: "usa_names",
TableId: "usa_1910_current",
}
bigQueryOptions := &dlppb.BigQueryOptions{
TableReference: tableReference,
RowsLimit: int64(10000),
SampleMethod: dlppb.BigQueryOptions_RANDOM_START,
IdentifyingFields: []*dlppb.FieldId{
{Name: "name"},
},
}
// Provide storage config with BigqueryOptions
storageConfig := &dlppb.StorageConfig{
Type: &dlppb.StorageConfig_BigQueryOptions{
BigQueryOptions: bigQueryOptions,
},
}
// Specify the type of info the inspection will look for.
// See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
infoTypes := []*dlppb.InfoType{
{Name: "PERSON_NAME"},
}
// Specify how the content should be inspected.
inspectConfig := &dlppb.InspectConfig{
InfoTypes: infoTypes,
IncludeQuote: true,
}
// Create a PubSub Client used to listen for when the inspect job finishes.
pubsubClient, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return err
}
defer pubsubClient.Close()
// Create a PubSub subscription we can use to listen for messages.
// Create the Topic if it doesn't exist.
t := pubsubClient.Topic(topicID)
if exists, err := t.Exists(ctx); err != nil {
return err
} else if !exists {
if t, err = pubsubClient.CreateTopic(ctx, topicID); err != nil {
return err
}
}
// Create the Subscription if it doesn't exist.
s := pubsubClient.Subscription(subscriptionID)
if exists, err := s.Exists(ctx); err != nil {
return err
} else if !exists {
if s, err = pubsubClient.CreateSubscription(ctx, subscriptionID, pubsub.SubscriptionConfig{Topic: t}); err != nil {
return err
}
}
// topic is the PubSub topic string where messages should be sent.
topic := fmt.Sprintf("projects/%s/topics/%s", projectID, topicID)
action := &dlppb.Action{
Action: &dlppb.Action_PubSub{
PubSub: &dlppb.Action_PublishToPubSub{
Topic: topic,
},
},
}
// Configure the long running job we want the service to perform.
inspectJobConfig := &dlppb.InspectJobConfig{
StorageConfig: storageConfig,
InspectConfig: inspectConfig,
Actions: []*dlppb.Action{
action,
},
}
// Create the request for the job configured above.
req := &dlppb.CreateDlpJobRequest{
Parent: fmt.Sprintf("projects/%s/locations/global", projectID),
Job: &dlppb.CreateDlpJobRequest_InspectJob{
InspectJob: inspectJobConfig,
},
}
// Use the client to send the request.
j, err := client.CreateDlpJob(ctx, req)
if err != nil {
return err
}
fmt.Fprintf(w, "Job Created: %v", j.GetName())
// Wait for the inspect job to finish by waiting for a PubSub message.
// This only waits for 10 minutes. For long jobs, consider using a truly
// asynchronous execution model such as Cloud Functions.
c, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
err = s.Receive(c, func(ctx context.Context, msg *pubsub.Message) {
// If this is the wrong job, do not process the result.
if msg.Attributes["DlpJobName"] != j.GetName() {
msg.Nack()
return
}
msg.Ack()
// Stop listening for more messages.
defer cancel()
})
if err != nil {
return err
}
resp, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
Name: j.GetName(),
})
if err != nil {
return err
}
r := resp.GetInspectDetails().GetResult().GetInfoTypeStats()
if len(r) == 0 {
fmt.Fprintf(w, "No results")
return err
}
for _, s := range r {
fmt.Fprintf(w, "\nFound %v instances of infoType %v\n", s.GetCount(), s.GetInfoType().GetName())
}
return nil
}
Java
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.BigQueryOptions;
import com.google.privacy.dlp.v2.BigQueryOptions.SampleMethod;
import com.google.privacy.dlp.v2.BigQueryTable;
import com.google.privacy.dlp.v2.CreateDlpJobRequest;
import com.google.privacy.dlp.v2.DlpJob;
import com.google.privacy.dlp.v2.FieldId;
import com.google.privacy.dlp.v2.GetDlpJobRequest;
import com.google.privacy.dlp.v2.InfoType;
import com.google.privacy.dlp.v2.InfoTypeStats;
import com.google.privacy.dlp.v2.InspectConfig;
import com.google.privacy.dlp.v2.InspectDataSourceDetails;
import com.google.privacy.dlp.v2.InspectJobConfig;
import com.google.privacy.dlp.v2.LocationName;
import com.google.privacy.dlp.v2.StorageConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class InspectBigQueryTableWithSampling {
public static void main(String[] args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-pubsub-topic-id";
String subscriptionId = "your-pubsub-subscription-id";
inspectBigQueryTableWithSampling(projectId, topicId, subscriptionId);
}
// Inspects a BigQuery Table
public static void inspectBigQueryTableWithSampling(
String projectId, String topicId, String subscriptionId)
throws ExecutionException, InterruptedException, IOException {
// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
try (DlpServiceClient dlp = DlpServiceClient.create()) {
// Specify the BigQuery table to be inspected.
BigQueryTable tableReference =
BigQueryTable.newBuilder()
.setProjectId("bigquery-public-data")
.setDatasetId("usa_names")
.setTableId("usa_1910_current")
.build();
BigQueryOptions bigQueryOptions =
BigQueryOptions.newBuilder()
.setTableReference(tableReference)
.setRowsLimit(1000)
.setSampleMethod(SampleMethod.RANDOM_START)
.addIdentifyingFields(FieldId.newBuilder().setName("name"))
.build();
StorageConfig storageConfig =
StorageConfig.newBuilder().setBigQueryOptions(bigQueryOptions).build();
// Specify the type of info the inspection will look for.
// See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
InfoType infoType = InfoType.newBuilder().setName("PERSON_NAME").build();
// Specify how the content should be inspected.
InspectConfig inspectConfig =
InspectConfig.newBuilder().addInfoTypes(infoType).setIncludeQuote(true).build();
// Specify the action that is triggered when the job completes.
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId);
Action.PublishToPubSub publishToPubSub =
Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build();
Action action = Action.newBuilder().setPubSub(publishToPubSub).build();
// Configure the long running job we want the service to perform.
InspectJobConfig inspectJobConfig =
InspectJobConfig.newBuilder()
.setStorageConfig(storageConfig)
.setInspectConfig(inspectConfig)
.addActions(action)
.build();
// Create the request for the job configured above.
CreateDlpJobRequest createDlpJobRequest =
CreateDlpJobRequest.newBuilder()
.setParent(LocationName.of(projectId, "global").toString())
.setInspectJob(inspectJobConfig)
.build();
// Use the client to send the request.
final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest);
System.out.println("Job created: " + dlpJob.getName());
// Set up a Pub/Sub subscriber to listen on the job completion status
final SettableApiFuture<Boolean> done = SettableApiFuture.create();
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
MessageReceiver messageHandler =
(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
subscriber.startAsync();
// Wait for job completion semi-synchronously
// For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
try {
done.get(15, TimeUnit.MINUTES);
} catch (TimeoutException e) {
System.out.println("Job was not completed after 15 minutes.");
return;
} finally {
subscriber.stopAsync();
subscriber.awaitTerminated();
}
// Get the latest state of the job from the service
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
DlpJob completedJob = dlp.getDlpJob(request);
// Parse the response and process results.
System.out.println("Job status: " + completedJob.getState());
System.out.println("Job name: " + dlpJob.getName());
InspectDataSourceDetails.Result result = completedJob.getInspectDetails().getResult();
System.out.println("Findings: ");
for (InfoTypeStats infoTypeStat : result.getInfoTypeStatsList()) {
System.out.print("\tInfo type: " + infoTypeStat.getInfoType().getName());
System.out.println("\tCount: " + infoTypeStat.getCount());
}
}
}
// handleMessage injects the job and settableFuture into the message reciever interface
private static void handleMessage(
DlpJob job,
SettableApiFuture<Boolean> done,
PubsubMessage pubsubMessage,
AckReplyConsumer ackReplyConsumer) {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
done.set(true);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
}
}
}
Node.js
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');
// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();
// The project ID to run the API call under
// const projectId = 'my-project';
// The project ID the table is stored under
// This may or (for public datasets) may not equal the calling project ID
// const dataProjectId = 'my-project';
// The ID of the dataset to inspect, e.g. 'my_dataset'
// const datasetId = 'my_dataset';
// The ID of the table to inspect, e.g. 'my_table'
// const tableId = 'my_table';
// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'
// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'
// DLP Job max time (in milliseconds)
const DLP_JOB_WAIT_TIME = 15 * 1000 * 60;
async function inspectBigqueryWithSampling() {
// Specify the type of info the inspection will look for.
// See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
const infoTypes = [{name: 'PERSON_NAME'}];
// Specify the BigQuery options required for inspection.
const storageItem = {
bigQueryOptions: {
tableReference: {
projectId: dataProjectId,
datasetId: datasetId,
tableId: tableId,
},
rowsLimit: 1000,
sampleMethod:
DLP.protos.google.privacy.dlp.v2.BigQueryOptions.SampleMethod
.RANDOM_START,
includedFields: [{name: 'name'}],
},
};
// Specify the action that is triggered when the job completes.
const actions = [
{
pubSub: {
topic: `projects/${projectId}/topics/${topicId}`,
},
},
];
// Construct request for creating an inspect job
const request = {
parent: `projects/${projectId}/locations/global`,
inspectJob: {
inspectConfig: {
infoTypes: infoTypes,
includeQuote: true,
},
storageConfig: storageItem,
actions: actions,
},
};
// Use the client to send the request.
const [topicResponse] = await pubsub.topic(topicId).get();
// Verify the Pub/Sub topic and listen for job notifications via an
// existing subscription.
const subscription = await topicResponse.subscription(subscriptionId);
const [jobsResponse] = await dlp.createDlpJob(request);
const jobName = jobsResponse.name;
// Watch the Pub/Sub topic until the DLP job finishes
await new Promise((resolve, reject) => {
// Set up the timeout
const timer = setTimeout(() => {
reject(new Error('Timeout'));
}, DLP_JOB_WAIT_TIME);
const messageHandler = message => {
if (message.attributes && message.attributes.DlpJobName === jobName) {
message.ack();
subscription.removeListener('message', messageHandler);
subscription.removeListener('error', errorHandler);
clearTimeout(timer);
resolve(jobName);
} else {
message.nack();
}
};
const errorHandler = err => {
subscription.removeListener('message', messageHandler);
subscription.removeListener('error', errorHandler);
clearTimeout(timer);
reject(err);
};
subscription.on('message', messageHandler);
subscription.on('error', errorHandler);
});
const [job] = await dlp.getDlpJob({name: jobName});
console.log(`Job ${job.name} status: ${job.state}`);
const infoTypeStats = job.inspectDetails.result.infoTypeStats;
if (infoTypeStats.length > 0) {
infoTypeStats.forEach(infoTypeStat => {
console.log(
` Found ${infoTypeStat.count} instance(s) of infoType ${infoTypeStat.infoType.name}.`
);
});
} else {
console.log('No findings.');
}
}
await inspectBigqueryWithSampling();
PHP
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
use Google\Cloud\Dlp\V2\DlpServiceClient;
use Google\Cloud\Dlp\V2\BigQueryOptions;
use Google\Cloud\Dlp\V2\InfoType;
use Google\Cloud\Dlp\V2\InspectConfig;
use Google\Cloud\Dlp\V2\StorageConfig;
use Google\Cloud\Dlp\V2\BigQueryTable;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\BigQueryOptions\SampleMethod;
use Google\Cloud\Dlp\V2\FieldId;
use Google\Cloud\Dlp\V2\InspectJobConfig;
use Google\Cloud\PubSub\PubSubClient;
/**
* Inspect BigQuery for sensitive data with sampling.
* The following examples demonstrate using the Cloud Data Loss Prevention
* API to scan a 1000-row subset of a BigQuery table. The scan starts from
* a random row.
*
* @param string $callingProjectId The project ID to run the API call under.
* @param string $topicId The Pub/Sub topic ID to notify once the job is completed.
* @param string $subscriptionId The Pub/Sub subscription ID to use when listening for job.
* @param string $projectId The Google Cloud Project ID.
* @param string $datasetId The BigQuery Dataset ID.
* @param string $tableId The BigQuery Table ID to be inspected.
*/
function inspect_bigquery_with_sampling(
string $callingProjectId,
string $topicId,
string $subscriptionId,
string $projectId,
string $datasetId,
string $tableId
): void {
// Instantiate a client.
$dlp = new DlpServiceClient();
$pubsub = new PubSubClient();
$topic = $pubsub->topic($topicId);
// Specify the BigQuery table to be inspected.
$bigqueryTable = (new BigQueryTable())
->setProjectId($projectId)
->setDatasetId($datasetId)
->setTableId($tableId);
$bigQueryOptions = (new BigQueryOptions())
->setTableReference($bigqueryTable)
->setRowsLimit(1000)
->setSampleMethod(SampleMethod::RANDOM_START)
->setIdentifyingFields([
(new FieldId())
->setName('name')
]);
$storageConfig = (new StorageConfig())
->setBigQueryOptions($bigQueryOptions);
// Specify the type of info the inspection will look for.
// See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
$personNameInfoType = (new InfoType())
->setName('PERSON_NAME');
$infoTypes = [$personNameInfoType];
// Specify how the content should be inspected.
$inspectConfig = (new InspectConfig())
->setInfoTypes($infoTypes)
->setIncludeQuote(true);
// Specify the action that is triggered when the job completes.
$pubSubAction = (new PublishToPubSub())
->setTopic($topic->name());
$action = (new Action())
->setPubSub($pubSubAction);
// Configure the long running job we want the service to perform.
$inspectJob = (new InspectJobConfig())
->setInspectConfig($inspectConfig)
->setStorageConfig($storageConfig)
->setActions([$action]);
// Listen for job notifications via an existing topic/subscription.
$subscription = $topic->subscription($subscriptionId);
// Submit request
$parent = "projects/$callingProjectId/locations/global";
$job = $dlp->createDlpJob($parent, [
'inspectJob' => $inspectJob
]);
// Poll Pub/Sub using exponential backoff until job finishes
// Consider using an asynchronous execution model such as Cloud Functions
$attempt = 1;
$startTime = time();
do {
foreach ($subscription->pull() as $message) {
if (
isset($message->attributes()['DlpJobName']) &&
$message->attributes()['DlpJobName'] === $job->getName()
) {
$subscription->acknowledge($message);
// Get the updated job. Loop to avoid race condition with DLP API.
do {
$job = $dlp->getDlpJob($job->getName());
} while ($job->getState() == JobState::RUNNING);
break 2; // break from parent do while
}
}
printf('Waiting for job to complete' . PHP_EOL);
// Exponential backoff with max delay of 60 seconds
sleep(min(60, pow(2, ++$attempt)));
} while (time() - $startTime < 600); // 10 minute timeout
// Print finding counts
printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
switch ($job->getState()) {
case JobState::DONE:
$infoTypeStats = $job->getInspectDetails()->getResult()->getInfoTypeStats();
if (count($infoTypeStats) === 0) {
printf('No findings.' . PHP_EOL);
} else {
foreach ($infoTypeStats as $infoTypeStat) {
printf(
' Found %s instance(s) of infoType %s' . PHP_EOL,
$infoTypeStat->getCount(),
$infoTypeStat->getInfoType()->getName()
);
}
}
break;
case JobState::FAILED:
printf('Job %s had errors:' . PHP_EOL, $job->getName());
$errors = $job->getErrors();
foreach ($errors as $error) {
var_dump($error->getDetails());
}
break;
case JobState::PENDING:
printf('Job has not completed. Consider a longer timeout or an asynchronous execution model' . PHP_EOL);
break;
default:
printf('Unexpected job state. Most likely, the job is either running or has not yet started.');
}
}
Python
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import threading
import google.cloud.dlp
import google.cloud.pubsub
def inspect_bigquery_table_with_sampling(
project: str,
topic_id: str,
subscription_id: str,
min_likelihood: str = None,
max_findings: str = None,
timeout: int = 300,
) -> None:
"""Uses the Data Loss Prevention API to analyze BigQuery data by limiting
the amount of data to be scanned.
Args:
project: The Google Cloud project id to use as a parent resource.
topic_id: The id of the Cloud Pub/Sub topic to which the API will
broadcast job completion. The topic must already exist.
subscription_id: The id of the Cloud Pub/Sub subscription to listen on
while waiting for job completion. The subscription must already
exist and be subscribed to the topic.
min_likelihood: A string representing the minimum likelihood threshold
that constitutes a match. One of: 'LIKELIHOOD_UNSPECIFIED',
'VERY_UNLIKELY', 'UNLIKELY', 'POSSIBLE', 'LIKELY', 'VERY_LIKELY'.
max_findings: The maximum number of findings to report; 0 = no maximum.
timeout: The number of seconds to wait for a response from the API.
"""
# Instantiate a client.
dlp = google.cloud.dlp_v2.DlpServiceClient()
# Specify how the content should be inspected. Keys which are None may
# optionally be omitted entirely.
inspect_config = {
"info_types": [{"name": "PERSON_NAME"}],
"min_likelihood": min_likelihood,
"limits": {"max_findings_per_request": max_findings},
"include_quote": True,
}
# Specify the BigQuery table to be inspected.
# Here we are using public bigquery table.
table_reference = {
"project_id": "bigquery-public-data",
"dataset_id": "usa_names",
"table_id": "usa_1910_current",
}
# Construct a storage_config containing the target BigQuery info.
storage_config = {
"big_query_options": {
"table_reference": table_reference,
"rows_limit": 1000,
"sample_method": "RANDOM_START",
"identifying_fields": [{"name": "name"}],
}
}
# Tell the API where to send a notification when the job is complete.
topic = google.cloud.pubsub.PublisherClient.topic_path(project, topic_id)
actions = [{"pub_sub": {"topic": topic}}]
# Construct the inspect_job, which defines the entire inspect content task.
inspect_job = {
"inspect_config": inspect_config,
"storage_config": storage_config,
"actions": actions,
}
# Convert the project id into full resource ids.
parent = f"projects/{project}/locations/global"
# Call the API
operation = dlp.create_dlp_job(
request={"parent": parent, "inspect_job": inspect_job}
)
print(f"Inspection operation started: {operation.name}")
# Create a Pub/Sub client and find the subscription. The subscription is
# expected to already be listening to the topic.
subscriber = google.cloud.pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(project, subscription_id)
# Set up a callback to acknowledge a message. This closes around an event
# so that it can signal that it is done and the main thread can continue.
job_done = threading.Event()
def callback(message: google.cloud.pubsub_v1.subscriber.message.Message) -> None:
try:
if message.attributes["DlpJobName"] == operation.name:
# This is the message we're looking for, so acknowledge it.
message.ack()
# Now that the job is done, fetch the results and print them.
job = dlp.get_dlp_job(request={"name": operation.name})
print(f"Job name: {job.name}")
if job.inspect_details.result.info_type_stats:
for finding in job.inspect_details.result.info_type_stats:
print(
f"Info type: {finding.info_type.name}; Count: {finding.count}"
)
else:
print("No findings.")
# Signal to the main thread that we can exit.
job_done.set()
else:
# This is not the message we're looking for.
message.drop()
except Exception as e:
# Because this is executing in a thread, an exception won't be
# noted unless we print it manually.
print(e)
raise
# Register the callback and wait on the event.
subscriber.subscribe(subscription_path, callback=callback)
finished = job_done.wait(timeout=timeout)
if not finished:
print(
"No event received before the timeout. Please verify that the "
"subscription provided is subscribed to the topic provided."
)
C#
如需了解如何安装和使用用于敏感数据保护的客户端库,请参阅敏感数据保护客户端库。
如需向敏感数据保护服务进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
using Google.Api.Gax.ResourceNames;
using Google.Cloud.Dlp.V2;
using Google.Cloud.PubSub.V1;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using static Google.Cloud.Dlp.V2.InspectConfig.Types;
public class InspectBigQueryWithSampling
{
public static async Task<DlpJob> InspectAsync(
string projectId,
int maxFindings,
bool includeQuote,
string topicId,
string subId,
Likelihood minLikelihood = Likelihood.Possible,
IEnumerable<FieldId> identifyingFields = null,
IEnumerable<InfoType> infoTypes = null)
{
// Instantiate the dlp client.
var dlp = DlpServiceClient.Create();
// Construct Storage config.
var storageConfig = new StorageConfig
{
BigQueryOptions = new BigQueryOptions
{
TableReference = new BigQueryTable
{
ProjectId = "bigquery-public-data",
DatasetId = "usa_names",
TableId = "usa_1910_current",
},
IdentifyingFields =
{
identifyingFields ?? new FieldId[] { new FieldId { Name = "name" } }
},
RowsLimit = 100,
SampleMethod = BigQueryOptions.Types.SampleMethod.RandomStart
}
};
// Construct the inspect config.
var inspectConfig = new InspectConfig
{
InfoTypes = { infoTypes ?? new InfoType[] { new InfoType { Name = "PERSON_NAME" } } },
Limits = new FindingLimits
{
MaxFindingsPerRequest = maxFindings,
},
IncludeQuote = includeQuote,
MinLikelihood = minLikelihood
};
// Construct the pubsub action.
var actions = new Action[]
{
new Action
{
PubSub = new Action.Types.PublishToPubSub
{
Topic = $"projects/{projectId}/topics/{topicId}"
}
}
};
// Construct the inspect job config using the actions.
var inspectJob = new InspectJobConfig
{
StorageConfig = storageConfig,
InspectConfig = inspectConfig,
Actions = { actions }
};
// Issue Create Dlp Job Request.
var request = new CreateDlpJobRequest
{
InspectJob = inspectJob,
ParentAsLocationName = new LocationName(projectId, "global"),
};
// We keep the name of the job that we just created.
var dlpJob = dlp.CreateDlpJob(request);
var jobName = dlpJob.Name;
// Listen to pub/sub for the job.
var subscriptionName = new SubscriptionName(projectId, subId);
var subscriber = await SubscriberClient.CreateAsync(
subscriptionName);
// SimpleSubscriber runs your message handle function on multiple threads to maximize throughput.
await subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
{
if (message.Attributes["DlpJobName"] == jobName)
{
subscriber.StopAsync(cancel);
return Task.FromResult(SubscriberClient.Reply.Ack);
}
else
{
return Task.FromResult(SubscriberClient.Reply.Nack);
}
});
// Get the latest state of the job from the service.
var resultJob = dlp.GetDlpJob(new GetDlpJobRequest
{
DlpJobName = DlpJobName.Parse(jobName)
});
// Parse the response and process results.
System.Console.WriteLine($"Job status: {resultJob.State}");
System.Console.WriteLine($"Job Name: {resultJob.Name}");
var result = resultJob.InspectDetails.Result;
foreach (var infoType in result.InfoTypeStats)
{
System.Console.WriteLine($"Info Type: {infoType.InfoType.Name}");
System.Console.WriteLine($"Count: {infoType.Count}");
}
return resultJob;
}
}
REST
JSON 输入:
POST https://dlp.googleapis.com/v2/projects/[PROJECT-ID]/dlpJobs?key={YOUR_API_KEY}
{
"inspectJob":{
"storageConfig":{
"bigQueryOptions":{
"tableReference":{
"projectId":"bigquery-public-data",
"datasetId":"usa_names",
"tableId":"usa_1910_current"
},
"rowsLimit":"1000",
"sampleMethod":"RANDOM_START",
"includedFields":[
{
"name":"name"
}
]
}
},
"inspectConfig":{
"infoTypes":[
{
"name":"FIRST_NAME"
}
],
"includeQuote":true
},
"actions":[
{
"saveFindings":{
"outputConfig":{
"table":{
"projectId":"[PROJECT-ID]",
"datasetId":"testingdlp",
"tableId":"bqsample3"
},
"outputSchema":"BASIC_COLUMNS"
}
}
}
]
}
}
将 POST 请求中的 JSON 输入发送到指定端点后,系统会创建一个敏感数据保护作业,并且 API 会发送以下响应。
JSON 输出:
{
"name": "projects/[PROJECT-ID]/dlpJobs/[JOB-ID]",
"type": "INSPECT_JOB",
"state": "PENDING",
"inspectDetails": {
"requestedOptions": {
"snapshotInspectTemplate": {},
"jobConfig": {
"storageConfig": {
"bigQueryOptions": {
"tableReference": {
"projectId": "bigquery-public-data",
"datasetId": "usa_names",
"tableId": "usa_1910_current"
},
"rowsLimit": "1000",
"sampleMethod": "RANDOM_START",
"includedFields": [
{
"name": "name"
}
]
}
},
"inspectConfig": {
"infoTypes": [
{
"name": "FIRST_NAME"
}
],
"limits": {},
"includeQuote": true
},
"actions": [
{
"saveFindings": {
"outputConfig": {
"table": {
"projectId": "[PROJECT-ID]",
"datasetId": "[DATASET-ID]",
"tableId": "bqsample"
},
"outputSchema": "BASIC_COLUMNS"
}
}
}
]
}
},
"result": {}
},
"createTime": "2022-11-04T18:53:48.350Z"
}
检查作业完成运行并且其结果已由 BigQuery 处理后,指定的 BigQuery 输出表格中会提供扫描结果。如需详细了解如何检索检查结果,请参阅下一部分。
检索检查结果
您可以使用 projects.dlpJobs.get
方法检索 DlpJob
的摘要。返回的 DlpJob
包含其 InspectDataSourceDetails
对象,该对象包含作业配置的摘要 (RequestedOptions
) 和作业结果的摘要 (Result
)。结果摘要包括以下项:
processedBytes
:已处理的总大小(以字节为单位)。totalEstimatedBytes
:估计要处理的剩余字节数。InfoTypeStatistics
对象:在检查作业期间找到的每个 infoType 的实例数的统计信息。
您可以通过多种方式查看完整的检查作业结果。根据您选择的 Action
,检查作业会:
- 保存到 BigQuery(
SaveFindings
对象)的指定表中。在查看或分析结果之前,首先使用projects.dlpJobs.get
方法(如下所述)确保作业已完成。请注意,您可以使用OutputSchema
对象指定用于存储结果的架构。 - 发布到 Pub/Sub 主题(
PublishToPubSub
对象)。该主题必须向运行用于发送通知的DlpJob
的敏感数据保护服务帐号授予发布权限。 - 已发布到 Security Command Center。
- 已发布到 Data Catalog。
- 发布到 Cloud Monitoring。
如需过滤敏感数据保护生成的大量数据,您可以使用内置的 BigQuery 工具运行丰富的 SQL 分析,也可以使用 Looker Studio 等工具生成报告。如需了解详情,请参阅分析和报告敏感数据保护发现结果。如需了解某些示例查询,请参阅在 BigQuery 中查询结果。
如果向敏感数据保护发送存储区检查请求,则会创建并运行 DlpJob
对象实例作为响应。这些作业的运行可能需要几秒、几分钟或几小时,具体取决于您的数据大小和指定的配置。如果选择发布到 Pub/Sub 主题(通过在 Action
中指定 PublishToPubSub
),则会在作业状态更改时自动向具有指定名称的主题发送通知。Pub/Sub 主题的名称以 projects/[PROJECT-ID]/topics/[PUBSUB-TOPIC-NAME]
格式指定。
您可以使用以下管理方法完全控制自己创建的作业:
projects.dlpJobs.cancel
方法:停止当前正在进行的作业。服务器会尽可能取消作业,但不能保证一定成功。作业及其配置将保留,直到您将其删除。projects.dlpJobs.delete
方法:删除作业及其配置。projects.dlpJobs.get
方法:检索单个作业并返回其状态、配置和摘要结果(作业完成后)。projects.dlpJobs.list
方法:检索所有作业的列表。此方法还包含过滤结果的功能。
后续步骤
- 如需详细了解如何创建存储空间检查作业,请参阅创建和安排敏感数据保护检查作业。
- 详细了解如何在存储空间中创建去标识化的数据副本。
- 如需详细了解检查 Cloud Storage 存储分区时支持的文件类型,请参阅支持的文件类型。