Inspect BigQuery for sensitive data

Demonstrates finding sensitive data that is stored in BigQuery.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

C#

To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.

To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


using Google.Api.Gax.ResourceNames;
using Google.Cloud.BigQuery.V2;
using Google.Cloud.Dlp.V2;
using Google.Protobuf.WellKnownTypes;
using System;
using System.Collections.Generic;
using System.Threading;
using static Google.Cloud.Dlp.V2.InspectConfig.Types;

public class InspectBigQuery
{
    public static object Inspect(
        string projectId,
        Likelihood minLikelihood,
        int maxFindings,
        bool includeQuote,
        IEnumerable<FieldId> identifyingFields,
        IEnumerable<InfoType> infoTypes,
        IEnumerable<CustomInfoType> customInfoTypes,
        string datasetId,
        string tableId)
    {
        var inspectJob = new InspectJobConfig
        {
            StorageConfig = new StorageConfig
            {
                BigQueryOptions = new BigQueryOptions
                {
                    TableReference = new Google.Cloud.Dlp.V2.BigQueryTable
                    {
                        ProjectId = projectId,
                        DatasetId = datasetId,
                        TableId = tableId,
                    },
                    IdentifyingFields =
                        {
                            identifyingFields
                        }
                },

                TimespanConfig = new StorageConfig.Types.TimespanConfig
                {
                    StartTime = Timestamp.FromDateTime(System.DateTime.UtcNow.AddYears(-1)),
                    EndTime = Timestamp.FromDateTime(System.DateTime.UtcNow)
                }
            },

            InspectConfig = new InspectConfig
            {
                InfoTypes = { infoTypes },
                CustomInfoTypes = { customInfoTypes },
                Limits = new FindingLimits
                {
                    MaxFindingsPerRequest = maxFindings
                },
                ExcludeInfoTypes = false,
                IncludeQuote = includeQuote,
                MinLikelihood = minLikelihood
            },
            Actions =
                {
                    new Google.Cloud.Dlp.V2.Action
                    {
                        // Save results in BigQuery Table
                        SaveFindings = new Google.Cloud.Dlp.V2.Action.Types.SaveFindings
                        {
                            OutputConfig = new OutputStorageConfig
                            {
                                Table = new Google.Cloud.Dlp.V2.BigQueryTable
                                {
                                    ProjectId = projectId,
                                    DatasetId = datasetId,
                                    TableId = tableId
                                }
                            }
                        },
                    }
                }
        };

        // Issue Create Dlp Job Request
        var client = DlpServiceClient.Create();
        var request = new CreateDlpJobRequest
        {
            InspectJob = inspectJob,
            Parent = new LocationName(projectId, "global").ToString(),
        };

        // We need created job name
        var dlpJob = client.CreateDlpJob(request);
        var jobName = dlpJob.Name;

        // Make sure the job finishes before inspecting the results.
        // Alternatively, we can inspect results opportunistically, but
        // for testing purposes, we want consistent outcome
        var finishedJob = EnsureJobFinishes(projectId, jobName);
        var bigQueryClient = BigQueryClient.Create(projectId);
        var table = bigQueryClient.GetTable(datasetId, tableId);

        // Return only first page of 10 rows
        Console.WriteLine("DLP v2 Results:");
        var firstPage = table.ListRows(new ListRowsOptions { StartIndex = 0, PageSize = 10 });
        foreach (var item in firstPage)
        {
            Console.WriteLine($"\t {item[""]}");
        }

        return finishedJob;
    }

    private static DlpJob EnsureJobFinishes(string projectId, string jobName)
    {
        var client = DlpServiceClient.Create();
        var request = new GetDlpJobRequest
        {
            DlpJobName = new DlpJobName(projectId, jobName),
        };

        // Simple logic that gives the job 5*30 sec at most to complete - for testing purposes only
        var numOfAttempts = 5;
        do
        {
            var dlpJob = client.GetDlpJob(request);
            numOfAttempts--;
            if (dlpJob.State != DlpJob.Types.JobState.Running)
            {
                return dlpJob;
            }

            Thread.Sleep(TimeSpan.FromSeconds(30));
        } while (numOfAttempts > 0);

        throw new InvalidOperationException("Job did not complete in time");
    }
}

Go

To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.

To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import (
	"context"
	"fmt"
	"io"
	"strings"
	"time"

	dlp "cloud.google.com/go/dlp/apiv2"
	"cloud.google.com/go/dlp/apiv2/dlppb"
	"cloud.google.com/go/pubsub"
)

// inspectBigquery searches for the given info types in the given Bigquery dataset table.
func inspectBigquery(w io.Writer, projectID string, infoTypeNames []string, customDictionaries []string, customRegexes []string, pubSubTopic, pubSubSub, dataProject, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// infoTypeNames := []string{"US_SOCIAL_SECURITY_NUMBER"}
	// customDictionaries := []string{...}
	// customRegexes := []string{...}
	// pubSubTopic := "dlp-risk-sample-topic"
	// pubSubSub := "dlp-risk-sample-sub"
	// dataProject := "my-data-project-ID"
	// datasetID := "my_dataset"
	// tableID := "mytable"

	ctx := context.Background()

	client, err := dlp.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("dlp.NewClient: %w", err)
	}

	// Convert the info type strings to a list of InfoTypes.
	var infoTypes []*dlppb.InfoType
	for _, it := range infoTypeNames {
		infoTypes = append(infoTypes, &dlppb.InfoType{Name: it})
	}
	// Convert the custom dictionary word lists and custom regexes to a list of CustomInfoTypes.
	var customInfoTypes []*dlppb.CustomInfoType
	for idx, it := range customDictionaries {
		customInfoTypes = append(customInfoTypes, &dlppb.CustomInfoType{
			InfoType: &dlppb.InfoType{
				Name: fmt.Sprintf("CUSTOM_DICTIONARY_%d", idx),
			},
			Type: &dlppb.CustomInfoType_Dictionary_{
				Dictionary: &dlppb.CustomInfoType_Dictionary{
					Source: &dlppb.CustomInfoType_Dictionary_WordList_{
						WordList: &dlppb.CustomInfoType_Dictionary_WordList{
							Words: strings.Split(it, ","),
						},
					},
				},
			},
		})
	}
	for idx, it := range customRegexes {
		customInfoTypes = append(customInfoTypes, &dlppb.CustomInfoType{
			InfoType: &dlppb.InfoType{
				Name: fmt.Sprintf("CUSTOM_REGEX_%d", idx),
			},
			Type: &dlppb.CustomInfoType_Regex_{
				Regex: &dlppb.CustomInfoType_Regex{
					Pattern: it,
				},
			},
		})
	}

	// Create a PubSub Client used to listen for when the inspect job finishes.
	pubsubClient, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer pubsubClient.Close()

	// Create a PubSub subscription we can use to listen for messages.
	// Create the Topic if it doesn't exist.
	t := pubsubClient.Topic(pubSubTopic)
	if exists, err := t.Exists(ctx); err != nil {
		return fmt.Errorf("t.Exists: %w", err)
	} else if !exists {
		if t, err = pubsubClient.CreateTopic(ctx, pubSubTopic); err != nil {
			return fmt.Errorf("CreateTopic: %w", err)
		}
	}

	// Create the Subscription if it doesn't exist.
	s := pubsubClient.Subscription(pubSubSub)
	if exists, err := s.Exists(ctx); err != nil {
		return fmt.Errorf("s.Exits: %w", err)
	} else if !exists {
		if s, err = pubsubClient.CreateSubscription(ctx, pubSubSub, pubsub.SubscriptionConfig{Topic: t}); err != nil {
			return fmt.Errorf("CreateSubscription: %w", err)
		}
	}

	// topic is the PubSub topic string where messages should be sent.
	topic := "projects/" + projectID + "/topics/" + pubSubTopic

	// Create a configured request.
	req := &dlppb.CreateDlpJobRequest{
		Parent: fmt.Sprintf("projects/%s/locations/global", projectID),
		Job: &dlppb.CreateDlpJobRequest_InspectJob{
			InspectJob: &dlppb.InspectJobConfig{
				// StorageConfig describes where to find the data.
				StorageConfig: &dlppb.StorageConfig{
					Type: &dlppb.StorageConfig_BigQueryOptions{
						BigQueryOptions: &dlppb.BigQueryOptions{
							TableReference: &dlppb.BigQueryTable{
								ProjectId: dataProject,
								DatasetId: datasetID,
								TableId:   tableID,
							},
						},
					},
				},
				// InspectConfig describes what fields to look for.
				InspectConfig: &dlppb.InspectConfig{
					InfoTypes:       infoTypes,
					CustomInfoTypes: customInfoTypes,
					MinLikelihood:   dlppb.Likelihood_POSSIBLE,
					Limits: &dlppb.InspectConfig_FindingLimits{
						MaxFindingsPerRequest: 10,
					},
					IncludeQuote: true,
				},
				// Send a message to PubSub using Actions.
				Actions: []*dlppb.Action{
					{
						Action: &dlppb.Action_PubSub{
							PubSub: &dlppb.Action_PublishToPubSub{
								Topic: topic,
							},
						},
					},
				},
			},
		},
	}
	// Create the inspect job.
	j, err := client.CreateDlpJob(ctx, req)
	if err != nil {
		return fmt.Errorf("CreateDlpJob: %w", err)
	}
	fmt.Fprintf(w, "Created job: %v\n", j.GetName())

	// Wait for the inspect job to finish by waiting for a PubSub message.
	// This only waits for 10 minutes. For long jobs, consider using a truly
	// asynchronous execution model such as Cloud Functions.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()
	err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// If this is the wrong job, do not process the result.
		if msg.Attributes["DlpJobName"] != j.GetName() {
			msg.Nack()
			return
		}
		msg.Ack()

		// Stop listening for more messages.
		defer cancel()

		resp, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
			Name: j.GetName(),
		})
		if err != nil {
			fmt.Fprintf(w, "Error getting completed job: %v\n", err)
			return
		}
		r := resp.GetInspectDetails().GetResult().GetInfoTypeStats()
		if len(r) == 0 {
			fmt.Fprintf(w, "No results")
			return
		}
		for _, s := range r {
			fmt.Fprintf(w, "  Found %v instances of infoType %v\n", s.GetCount(), s.GetInfoType().GetName())
		}
	})
	if err != nil {
		return fmt.Errorf("Receive: %w", err)
	}
	return nil
}

Java

To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.

To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.BigQueryOptions;
import com.google.privacy.dlp.v2.BigQueryTable;
import com.google.privacy.dlp.v2.CreateDlpJobRequest;
import com.google.privacy.dlp.v2.DlpJob;
import com.google.privacy.dlp.v2.GetDlpJobRequest;
import com.google.privacy.dlp.v2.InfoType;
import com.google.privacy.dlp.v2.InfoTypeStats;
import com.google.privacy.dlp.v2.InspectConfig;
import com.google.privacy.dlp.v2.InspectDataSourceDetails;
import com.google.privacy.dlp.v2.InspectJobConfig;
import com.google.privacy.dlp.v2.LocationName;
import com.google.privacy.dlp.v2.StorageConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class InspectBigQueryTable {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String bigQueryDatasetId = "your-bigquery-dataset-id";
    String bigQueryTableId = "your-bigquery-table-id";
    String topicId = "your-pubsub-topic-id";
    String subscriptionId = "your-pubsub-subscription-id";
    inspectBigQueryTable(projectId, bigQueryDatasetId, bigQueryTableId, topicId, subscriptionId);
  }

  // Inspects a BigQuery Table
  public static void inspectBigQueryTable(
      String projectId,
      String bigQueryDatasetId,
      String bigQueryTableId,
      String topicId,
      String subscriptionId)
      throws ExecutionException, InterruptedException, IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (DlpServiceClient dlp = DlpServiceClient.create()) {
      // Specify the BigQuery table to be inspected.
      BigQueryTable tableReference =
          BigQueryTable.newBuilder()
              .setProjectId(projectId)
              .setDatasetId(bigQueryDatasetId)
              .setTableId(bigQueryTableId)
              .build();

      BigQueryOptions bigQueryOptions =
          BigQueryOptions.newBuilder().setTableReference(tableReference).build();

      StorageConfig storageConfig =
          StorageConfig.newBuilder().setBigQueryOptions(bigQueryOptions).build();

      // Specify the type of info the inspection will look for.
      // See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
      List<InfoType> infoTypes =
          Stream.of("PHONE_NUMBER", "EMAIL_ADDRESS", "CREDIT_CARD_NUMBER")
              .map(it -> InfoType.newBuilder().setName(it).build())
              .collect(Collectors.toList());

      // Specify how the content should be inspected.
      InspectConfig inspectConfig =
          InspectConfig.newBuilder().addAllInfoTypes(infoTypes).setIncludeQuote(true).build();

      // Specify the action that is triggered when the job completes.
      String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId);
      Action.PublishToPubSub publishToPubSub =
          Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build();
      Action action = Action.newBuilder().setPubSub(publishToPubSub).build();

      // Configure the long running job we want the service to perform.
      InspectJobConfig inspectJobConfig =
          InspectJobConfig.newBuilder()
              .setStorageConfig(storageConfig)
              .setInspectConfig(inspectConfig)
              .addActions(action)
              .build();

      // Create the request for the job configured above.
      CreateDlpJobRequest createDlpJobRequest =
          CreateDlpJobRequest.newBuilder()
              .setParent(LocationName.of(projectId, "global").toString())
              .setInspectJob(inspectJobConfig)
              .build();

      // Use the client to send the request.
      final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest);
      System.out.println("Job created: " + dlpJob.getName());

      // Set up a Pub/Sub subscriber to listen on the job completion status
      final SettableApiFuture<Boolean> done = SettableApiFuture.create();

      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      MessageReceiver messageHandler =
          (PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
            handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
          };
      Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
      subscriber.startAsync();

      // Wait for job completion semi-synchronously
      // For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
      try {
        done.get(15, TimeUnit.MINUTES);
      } catch (TimeoutException e) {
        System.out.println("Job was not completed after 15 minutes.");
        return;
      } finally {
        subscriber.stopAsync();
        subscriber.awaitTerminated();
      }

      // Get the latest state of the job from the service
      GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
      DlpJob completedJob = dlp.getDlpJob(request);

      // Parse the response and process results.
      System.out.println("Job status: " + completedJob.getState());
      System.out.println("Job name: " + dlpJob.getName());
      InspectDataSourceDetails.Result result = completedJob.getInspectDetails().getResult();
      System.out.println("Findings: ");
      for (InfoTypeStats infoTypeStat : result.getInfoTypeStatsList()) {
        System.out.print("\tInfo type: " + infoTypeStat.getInfoType().getName());
        System.out.println("\tCount: " + infoTypeStat.getCount());
      }
    }
  }

  // handleMessage injects the job and settableFuture into the message reciever interface
  private static void handleMessage(
      DlpJob job,
      SettableApiFuture<Boolean> done,
      PubsubMessage pubsubMessage,
      AckReplyConsumer ackReplyConsumer) {
    String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
    if (job.getName().equals(messageAttribute)) {
      done.set(true);
      ackReplyConsumer.ack();
    } else {
      ackReplyConsumer.nack();
    }
  }
}

Node.js

To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.

To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');

// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();

// The project ID to run the API call under
// const projectId = 'my-project';

// The project ID the table is stored under
// This may or (for public datasets) may not equal the calling project ID
// const dataProjectId = 'my-project';

// The ID of the dataset to inspect, e.g. 'my_dataset'
// const datasetId = 'my_dataset';

// The ID of the table to inspect, e.g. 'my_table'
// const tableId = 'my_table';

// The minimum likelihood required before returning a match
// const minLikelihood = 'LIKELIHOOD_UNSPECIFIED';

// The maximum number of findings to report per request (0 = server maximum)
// const maxFindings = 0;

// The infoTypes of information to match
// const infoTypes = [{ name: 'PHONE_NUMBER' }, { name: 'EMAIL_ADDRESS' }, { name: 'CREDIT_CARD_NUMBER' }];

// The customInfoTypes of information to match
// const customInfoTypes = [{ infoType: { name: 'DICT_TYPE' }, dictionary: { wordList: { words: ['foo', 'bar', 'baz']}}},
//   { infoType: { name: 'REGEX_TYPE' }, regex: {pattern: '\\(\\d{3}\\) \\d{3}-\\d{4}'}}];

// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'

// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'

async function inspectBigquery() {
  // Construct item to be inspected
  const storageItem = {
    bigQueryOptions: {
      tableReference: {
        projectId: dataProjectId,
        datasetId: datasetId,
        tableId: tableId,
      },
    },
  };

  // Construct request for creating an inspect job
  const request = {
    parent: `projects/${projectId}/locations/global`,
    inspectJob: {
      inspectConfig: {
        infoTypes: infoTypes,
        customInfoTypes: customInfoTypes,
        minLikelihood: minLikelihood,
        limits: {
          maxFindingsPerRequest: maxFindings,
        },
      },
      storageConfig: storageItem,
      actions: [
        {
          pubSub: {
            topic: `projects/${projectId}/topics/${topicId}`,
          },
        },
      ],
    },
  };

  // Run inspect-job creation request
  const [topicResponse] = await pubsub.topic(topicId).get();
  // Verify the Pub/Sub topic and listen for job notifications via an
  // existing subscription.
  const subscription = await topicResponse.subscription(subscriptionId);
  const [jobsResponse] = await dlp.createDlpJob(request);
  const jobName = jobsResponse.name;
  // Watch the Pub/Sub topic until the DLP job finishes
  await new Promise((resolve, reject) => {
    const messageHandler = message => {
      if (message.attributes && message.attributes.DlpJobName === jobName) {
        message.ack();
        subscription.removeListener('message', messageHandler);
        subscription.removeListener('error', errorHandler);
        resolve(jobName);
      } else {
        message.nack();
      }
    };

    const errorHandler = err => {
      subscription.removeListener('message', messageHandler);
      subscription.removeListener('error', errorHandler);
      reject(err);
    };

    subscription.on('message', messageHandler);
    subscription.on('error', errorHandler);
  });
  // Wait for DLP job to fully complete
  setTimeout(() => {
    console.log('Waiting for DLP job to fully complete');
  }, 500);
  const [job] = await dlp.getDlpJob({name: jobName});
  console.log(`Job ${job.name} status: ${job.state}`);

  const infoTypeStats = job.inspectDetails.result.infoTypeStats;
  if (infoTypeStats.length > 0) {
    infoTypeStats.forEach(infoTypeStat => {
      console.log(
        `  Found ${infoTypeStat.count} instance(s) of infoType ${infoTypeStat.infoType.name}.`
      );
    });
  } else {
    console.log('No findings.');
  }
}

await inspectBigquery();

PHP

To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.

To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\BigQueryOptions;
use Google\Cloud\Dlp\V2\BigQueryTable;
use Google\Cloud\Dlp\V2\Client\DlpServiceClient;
use Google\Cloud\Dlp\V2\CreateDlpJobRequest;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\GetDlpJobRequest;
use Google\Cloud\Dlp\V2\InfoType;
use Google\Cloud\Dlp\V2\InspectConfig;
use Google\Cloud\Dlp\V2\InspectConfig\FindingLimits;
use Google\Cloud\Dlp\V2\InspectJobConfig;
use Google\Cloud\Dlp\V2\Likelihood;
use Google\Cloud\Dlp\V2\StorageConfig;
use Google\Cloud\PubSub\PubSubClient;

/**
 * Inspect a BigQuery table , using Pub/Sub for job status notifications.
 *
 * @param string $callingProjectId  The project ID to run the API call under
 * @param string $dataProjectId     The project ID containing the target Datastore
 * @param string $topicId           The name of the Pub/Sub topic to notify once the job completes
 * @param string $subscriptionId    The name of the Pub/Sub subscription to use when listening for job
 * @param string $datasetId         The ID of the dataset to inspect
 * @param string $tableId           The ID of the table to inspect
 * @param int    $maxFindings       (Optional) The maximum number of findings to report per request (0 = server maximum)
 */
function inspect_bigquery(
    string $callingProjectId,
    string $dataProjectId,
    string $topicId,
    string $subscriptionId,
    string $datasetId,
    string $tableId,
    int $maxFindings = 0
): void {
    // Instantiate a client.
    $dlp = new DlpServiceClient();
    $pubsub = new PubSubClient();
    $topic = $pubsub->topic($topicId);

    // The infoTypes of information to match
    $personNameInfoType = (new InfoType())
        ->setName('PERSON_NAME');
    $creditCardNumberInfoType = (new InfoType())
        ->setName('CREDIT_CARD_NUMBER');
    $infoTypes = [$personNameInfoType, $creditCardNumberInfoType];

    // The minimum likelihood required before returning a match
    $minLikelihood = likelihood::LIKELIHOOD_UNSPECIFIED;

    // Specify finding limits
    $limits = (new FindingLimits())
        ->setMaxFindingsPerRequest($maxFindings);

    // Construct items to be inspected
    $bigqueryTable = (new BigQueryTable())
        ->setProjectId($dataProjectId)
        ->setDatasetId($datasetId)
        ->setTableId($tableId);

    $bigQueryOptions = (new BigQueryOptions())
        ->setTableReference($bigqueryTable);

    $storageConfig = (new StorageConfig())
        ->setBigQueryOptions($bigQueryOptions);

    // Construct the inspect config object
    $inspectConfig = (new InspectConfig())
        ->setMinLikelihood($minLikelihood)
        ->setLimits($limits)
        ->setInfoTypes($infoTypes);

    // Construct the action to run when job completes
    $pubSubAction = (new PublishToPubSub())
        ->setTopic($topic->name());

    $action = (new Action())
        ->setPubSub($pubSubAction);

    // Construct inspect job config to run
    $inspectJob = (new InspectJobConfig())
        ->setInspectConfig($inspectConfig)
        ->setStorageConfig($storageConfig)
        ->setActions([$action]);

    // Listen for job notifications via an existing topic/subscription.
    $subscription = $topic->subscription($subscriptionId);

    // Submit request
    $parent = "projects/$callingProjectId/locations/global";
    $createDlpJobRequest = (new CreateDlpJobRequest())
        ->setParent($parent)
        ->setInspectJob($inspectJob);
    $job = $dlp->createDlpJob($createDlpJobRequest);

    // Poll Pub/Sub using exponential backoff until job finishes
    // Consider using an asynchronous execution model such as Cloud Functions
    $attempt = 1;
    $startTime = time();
    do {
        foreach ($subscription->pull() as $message) {
            if (isset($message->attributes()['DlpJobName']) &&
                $message->attributes()['DlpJobName'] === $job->getName()) {
                $subscription->acknowledge($message);
                // Get the updated job. Loop to avoid race condition with DLP API.
                do {
                    $getDlpJobRequest = (new GetDlpJobRequest())
                        ->setName($job->getName());
                    $job = $dlp->getDlpJob($getDlpJobRequest);
                } while ($job->getState() == JobState::RUNNING);
                break 2; // break from parent do while
            }
        }
        print('Waiting for job to complete' . PHP_EOL);
        // Exponential backoff with max delay of 60 seconds
        sleep(min(60, pow(2, ++$attempt)));
    } while (time() - $startTime < 600); // 10 minute timeout

    // Print finding counts
    printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
    switch ($job->getState()) {
        case JobState::DONE:
            $infoTypeStats = $job->getInspectDetails()->getResult()->getInfoTypeStats();
            if (count($infoTypeStats) === 0) {
                print('No findings.' . PHP_EOL);
            } else {
                foreach ($infoTypeStats as $infoTypeStat) {
                    printf(
                        '  Found %s instance(s) of infoType %s' . PHP_EOL,
                        $infoTypeStat->getCount(),
                        $infoTypeStat->getInfoType()->getName()
                    );
                }
            }
            break;
        case JobState::FAILED:
            printf('Job %s had errors:' . PHP_EOL, $job->getName());
            $errors = $job->getErrors();
            foreach ($errors as $error) {
                var_dump($error->getDetails());
            }
            break;
        case JobState::PENDING:
            print('Job has not completed. Consider a longer timeout or an asynchronous execution model' . PHP_EOL);
            break;
        default:
            print('Unexpected job state. Most likely, the job is either running or has not yet started.');
    }
}

Python

To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.

To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import threading
from typing import List, Optional

import google.cloud.dlp
import google.cloud.pubsub


def inspect_bigquery(
    project: str,
    bigquery_project: str,
    dataset_id: str,
    table_id: str,
    topic_id: str,
    subscription_id: str,
    info_types: List[str],
    custom_dictionaries: List[str] = None,
    custom_regexes: List[str] = None,
    min_likelihood: Optional[int] = None,
    max_findings: Optional[int] = None,
    timeout: int = 500,
) -> None:
    """Uses the Data Loss Prevention API to analyze BigQuery data.
    Args:
        project: The Google Cloud project id to use as a parent resource.
        bigquery_project: The Google Cloud project id of the target table.
        dataset_id: The id of the target BigQuery dataset.
        table_id: The id of the target BigQuery table.
        topic_id: The id of the Cloud Pub/Sub topic to which the API will
            broadcast job completion. The topic must already exist.
        subscription_id: The id of the Cloud Pub/Sub subscription to listen on
            while waiting for job completion. The subscription must already
            exist and be subscribed to the topic.
        info_types: A list of strings representing info types to look for.
            A full list of info type categories can be fetched from the API.
        min_likelihood: A string representing the minimum likelihood threshold
            that constitutes a match. One of: 'LIKELIHOOD_UNSPECIFIED',
            'VERY_UNLIKELY', 'UNLIKELY', 'POSSIBLE', 'LIKELY', 'VERY_LIKELY'.
        max_findings: The maximum number of findings to report; 0 = no maximum.
        timeout: The number of seconds to wait for a response from the API.
    Returns:
        None; the response from the API is printed to the terminal.
    """

    # Instantiate a client.
    dlp = google.cloud.dlp_v2.DlpServiceClient()

    # Prepare info_types by converting the list of strings into a list of
    # dictionaries (protos are also accepted).
    if not info_types:
        info_types = ["FIRST_NAME", "LAST_NAME", "EMAIL_ADDRESS"]
    info_types = [{"name": info_type} for info_type in info_types]

    # Prepare custom_info_types by parsing the dictionary word lists and
    # regex patterns.
    if custom_dictionaries is None:
        custom_dictionaries = []
    dictionaries = [
        {
            "info_type": {"name": f"CUSTOM_DICTIONARY_{i}"},
            "dictionary": {"word_list": {"words": custom_dict.split(",")}},
        }
        for i, custom_dict in enumerate(custom_dictionaries)
    ]
    if custom_regexes is None:
        custom_regexes = []
    regexes = [
        {
            "info_type": {"name": f"CUSTOM_REGEX_{i}"},
            "regex": {"pattern": custom_regex},
        }
        for i, custom_regex in enumerate(custom_regexes)
    ]
    custom_info_types = dictionaries + regexes

    # Construct the configuration dictionary. Keys which are None may
    # optionally be omitted entirely.
    inspect_config = {
        "info_types": info_types,
        "custom_info_types": custom_info_types,
        "min_likelihood": min_likelihood,
        "limits": {"max_findings_per_request": max_findings},
    }

    # Construct a storage_config containing the target Bigquery info.
    storage_config = {
        "big_query_options": {
            "table_reference": {
                "project_id": bigquery_project,
                "dataset_id": dataset_id,
                "table_id": table_id,
            }
        }
    }

    # Convert the project id into full resource ids.
    topic = google.cloud.pubsub.PublisherClient.topic_path(project, topic_id)
    parent = f"projects/{project}/locations/global"

    # Tell the API where to send a notification when the job is complete.
    actions = [{"pub_sub": {"topic": topic}}]

    # Construct the inspect_job, which defines the entire inspect content task.
    inspect_job = {
        "inspect_config": inspect_config,
        "storage_config": storage_config,
        "actions": actions,
    }

    operation = dlp.create_dlp_job(
        request={"parent": parent, "inspect_job": inspect_job}
    )
    print(f"Inspection operation started: {operation.name}")

    # Create a Pub/Sub client and find the subscription. The subscription is
    # expected to already be listening to the topic.
    subscriber = google.cloud.pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_id)

    # Set up a callback to acknowledge a message. This closes around an event
    # so that it can signal that it is done and the main thread can continue.
    job_done = threading.Event()

    def callback(message: google.cloud.pubsub_v1.subscriber.message.Message) -> None:
        try:
            if message.attributes["DlpJobName"] == operation.name:
                # This is the message we're looking for, so acknowledge it.
                message.ack()

                # Now that the job is done, fetch the results and print them.
                job = dlp.get_dlp_job(request={"name": operation.name})
                print(f"Job name: {job.name}")
                if job.inspect_details.result.info_type_stats:
                    for finding in job.inspect_details.result.info_type_stats:
                        print(
                            "Info type: {}; Count: {}".format(
                                finding.info_type.name, finding.count
                            )
                        )
                else:
                    print("No findings.")

                # Signal to the main thread that we can exit.
                job_done.set()
            else:
                # This is not the message we're looking for.
                message.drop()
        except Exception as e:
            # Because this is executing in a thread, an exception won't be
            # noted unless we print it manually.
            print(e)
            raise

    # Register the callback and wait on the event.
    subscriber.subscribe(subscription_path, callback=callback)
    finished = job_done.wait(timeout=timeout)
    if not finished:
        print(
            "No event received before the timeout. Please verify that the "
            "subscription provided is subscribed to the topic provided."
        )

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.