Automatically apply sensitivity tags in Data Catalog to files, databases, and BigQuery tables using Sensitive Data Protection and Dataflow

Last reviewed 2022-01-11 UTC

This document shows you how to use Data Catalog with an automated Dataflow pipeline to identify and apply data sensitivity tags to your data in Cloud Storage files, relational databases (such as MySQL and PostgreSQL), and BigQuery.

This Dataflow pipeline uses Sensitive Data Protection to detect sensitive data, like personally identifiable information (PII), and then it tags the findings in Data Catalog.

The solution described in this document builds on the architecture of the file-based tokenizing solution described in its companion document: Automatically tokenize sensitive file-based data with Sensitive Data Protection, Cloud Key Management Service, and Dataflow. The primary difference between the two documents is that this document describes a solution that also creates a Data Catalog entry with a schema of the source and data sensitivity tags for Sensitive Data Protection findings. It can also inspect relational databases using Java database connectivity (JDBC) connections.

This document is intended for a technical audience whose responsibilities include data security, data governance, data processing, or data analytics. This document assumes that you're familiar with data processing and data privacy, without being an expert. It also assumes that you have some familiarity with shell scripts and a basic knowledge of Google Cloud.


Architecture

This architecture defines a pipeline that performs the following actions:

  • Extracts the data from a relational database using JDBC.
  • Samples the records using the database's LIMIT clause.
  • Processes records through the Cloud Data Loss Prevention API (part of Sensitive Data Protection) to identify sensitivity categories.
  • Saves the findings to a BigQuery table and Data Catalog.

The following diagram illustrates the actions that the pipeline performs:

Data is extracted, sampled, processed, and saved.

The solution uses JDBC connections to access relational databases. When using BigQuery tables as a data source, the solution uses the BigQuery Storage API to improve load times.

The sampling-and-identify pipeline outputs the following files to Cloud Storage:

  • An Avro schema (or a schema converted to Avro) of the source
  • Detected infoTypes for each of the input columns (such as PERSON_NAME, PHONE_NUMBER, and STREET_ADDRESS)

This solution uses record flattening to handle nested and repeated fields in records.
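As an illustration of what flattening produces, nested fields of a source record map to JsonPath-style column names like the ones that appear in this pipeline's reports. The mapping below is hand-written for illustration, not pipeline output:

```shell
# Illustration only: how nested fields of a hypothetical source record map
# to flattened, JsonPath-style column names in the pipeline's reports.
flattened='person_name    -> $.topLevelRecord.person_name
contact.type   -> $.topLevelRecord.contact.type
contact.number -> $.topLevelRecord.contact.number'
printf '%s\n' "${flattened}"
```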


Objectives

  • Create Data Catalog tags and an entry group
  • Deploy the sampling-and-identify pipeline
  • Create a custom Data Catalog entry
  • Apply sensitivity tags to the custom Data Catalog entry
  • Verify that the sensitivity tag data is also in BigQuery


Costs

In this document, you use the following billable components of Google Cloud: BigQuery, Cloud Build, Cloud SQL, Cloud Storage, Data Catalog, Dataflow, Secret Manager, and Sensitive Data Protection.

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Cloud Build, DLP API, Cloud SQL, Cloud Storage, Compute Engine, Dataflow, Data Catalog, and Secret Manager APIs.

    Enable the APIs

  5. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

Set up your environment

  1. In Cloud Shell, clone the source repository and go to the directory for the cloned files:

    git clone https://github.com/GoogleCloudPlatform/auto-data-tokenize.git
    cd auto-data-tokenize/
  2. Use a text editor to modify the environment script and set the following required variables. You can ignore the other variables in the script; they aren't relevant to this document.

    # The Google Cloud project to use:
    export PROJECT_ID="PROJECT_ID"

    # The Compute Engine region to use for running Dataflow jobs and creating a
    # temporary storage bucket:
    export REGION_ID="REGION_ID"

    # The Cloud Storage bucket to use as a temporary bucket for Dataflow:
    export TEMP_GCS_BUCKET="CLOUD_STORAGE_BUCKET_NAME"

    # Name of the service account to use (not the email address)
    # (For example, tokenizing-runner):
    export DLP_RUNNER_SERVICE_ACCOUNT_NAME="DLP_RUNNER_SERVICE_ACCOUNT_NAME"

    # Entry group ID to use for creating and searching for entries
    # in Data Catalog for non-BigQuery entries.
    # The ID must begin with a letter or underscore, contain only English
    # letters, numbers, and underscores, and have 64 characters or fewer.
    export DATA_CATALOG_ENTRY_GROUP_ID="DATA_CATALOG_ENTRY_GROUP_ID"

    # The Data Catalog tag template ID to use
    # for creating sensitivity tags in Data Catalog.
    # The ID must contain only lowercase letters (a-z), numbers (0-9), or
    # underscores (_), and must start with a letter or underscore.
    # The maximum size is 64 bytes when encoded in UTF-8.
    export INSPECTION_TAG_TEMPLATE_ID="INSPECTION_TAG_TEMPLATE_ID"
    Replace the following:

    • PROJECT_ID: Your project ID.
    • REGION_ID: The region containing your storage bucket or buckets. Select a location that's in a Data Catalog region.
    • CLOUD_STORAGE_BUCKET_NAME: The name of your storage bucket.
    • DLP_RUNNER_SERVICE_ACCOUNT_NAME: The name of your service account.
    • DATA_CATALOG_ENTRY_GROUP_ID: The name of your non-BigQuery Data Catalog entry group.
    • INSPECTION_TAG_TEMPLATE_ID: The name that you gave to your tag template for Data Catalog.
  3. Run the script to set the environment variables:


Create resources

The architecture that's described in this document uses the following resources:

  • A service account to run Dataflow pipelines, enabling fine-grained access control
  • A Cloud Storage bucket to store temporary data and test data
  • A Data Catalog tag template to attach sensitivity tags to entries
  • A MySQL on Cloud SQL instance as the JDBC source

Create service accounts

We recommend that you run pipelines with fine-grained access control to improve access partitioning. If your project doesn't have a user-created service account, create one.

  1. In Cloud Shell, create a service account to use as the user-managed controller service account for Dataflow:

      gcloud iam service-accounts create ${DLP_RUNNER_SERVICE_ACCOUNT_NAME} \
      --project="${PROJECT_ID}" \
      --description="Service Account for Sampling and Cataloging pipelines." \
      --display-name="Sampling and Cataloging pipelines"
  2. Create a custom role with required permissions for accessing Sensitive Data Protection, Dataflow, Cloud SQL, and Data Catalog:

      export SAMPLING_CATALOGING_ROLE_NAME="sampling_cataloging_runner"
      gcloud iam roles create ${SAMPLING_CATALOGING_ROLE_NAME} \
      --project="${PROJECT_ID}" \
  3. Apply the custom role and the Dataflow Worker role to the service account to let it run as a Dataflow worker:

      gcloud projects add-iam-policy-binding ${PROJECT_ID} \
      --member="serviceAccount:${DLP_RUNNER_SERVICE_ACCOUNT_EMAIL}" \
      --role="projects/${PROJECT_ID}/roles/${SAMPLING_CATALOGING_ROLE_NAME}"

      gcloud projects add-iam-policy-binding ${PROJECT_ID} \
      --member="serviceAccount:${DLP_RUNNER_SERVICE_ACCOUNT_EMAIL}" \
      --role="roles/dataflow.worker"

Create the Cloud Storage bucket

  • In Cloud Shell, create a Cloud Storage bucket for storing test data and as a Dataflow staging location:

    gsutil mb -p ${PROJECT_ID} -l ${REGION_ID} "gs://${TEMP_GCS_BUCKET}"

Create a Data Catalog entry group

Data Catalog maintains a list of entries that represent Google Cloud resources or other resources. The entries are organized in entry groups. An implicit entry group exists for BigQuery (@bigquery); you must create entry groups for other types of resources. To learn more about Data Catalog entries, see Surface files from Cloud Storage with fileset entries.

In Data Catalog, an entry group is like a folder that contains entries. An entry represents a data asset.

  • In Cloud Shell, create a new entry group where the pipeline can add an entry for your MySQL table:

    gcloud data-catalog entry-groups create "${DATA_CATALOG_ENTRY_GROUP_ID}" \
    --project="${PROJECT_ID}" \
    --location="${REGION_ID}"

Create the inspection tag template

  • In Cloud Shell, create a Data Catalog tag template that the pipeline uses to tag entries with sensitivity information from Sensitive Data Protection:

    gcloud data-catalog tag-templates create ${INSPECTION_TAG_TEMPLATE_ID} \
    --project="${PROJECT_ID}"  \
    --location="${REGION_ID}" \
    --display-name="Auto DLP sensitive categories" \
    --field=id=infoTypes,type=string,display-name="DLP infoTypes",required=TRUE \
    --field=id=inspectTimestamp,type=timestamp,display-name="Inspection run timestamp",required=TRUE

Create an inspection results table in BigQuery

  • In Cloud Shell, create a BigQuery table to store aggregated findings from Sensitive Data Protection:

    bq mk --dataset \
    --location="${REGION_ID}" \
    --project_id="${PROJECT_ID}" \
    inspection_results

    bq mk --table \
    --project_id="${PROJECT_ID}" \
    inspection_results.SensitivityInspectionResults \

Set up a MySQL on Cloud SQL instance

For the data source, you use a Cloud SQL instance.

  1. In Cloud Shell, create a MySQL on Cloud SQL instance:

    export SQL_INSTANCE="mysql-autodlp-instance"
    export SQL_ROOT_PASSWORD="root1234"

    gcloud sql instances create "${SQL_INSTANCE}" \
    --project="${PROJECT_ID}" \
    --region="${REGION_ID}" \
    --database-version=MYSQL_5_7 \
    --root-password="${SQL_ROOT_PASSWORD}"
  2. Save the database password in Secret Manager.

    The database password and other secret information shouldn't be stored or logged. Secret Manager lets you store and retrieve such secrets securely.

    Store the MySQL database root password as a cloud secret:

    export SQL_PASSWORD_SECRET_NAME="mysql-password"

    printf '%s' "${SQL_ROOT_PASSWORD}" | \
    gcloud secrets create "${SQL_PASSWORD_SECRET_NAME}" \
    --data-file=- \
    --replication-policy="user-managed" \
    --locations="${REGION_ID}"
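A note on the printf in the command above: echo would append a trailing newline, which would silently become part of the stored secret, while printf writes only the bytes of the password. A quick check of the byte counts:

```shell
# echo appends a trailing newline; printf does not. The newline would be
# stored as part of the secret value.
plain_len="$(printf '%s' "root1234" | wc -c | tr -d ' ')"
echo_len="$(echo "root1234" | wc -c | tr -d ' ')"
echo "printf bytes: ${plain_len}, echo bytes: ${echo_len}"
```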

Copy test data to the Cloud SQL instance

The test data is a demonstration dataset that contains 5,000 randomly generated first and last names and US-style phone numbers. The demonstration-dataset table contains four columns: row_id, person_name, contact_type, and contact_number. You can also use your own dataset; if you do, remember to adjust the suggested values in the Verify in BigQuery section later in this document. To copy the included demonstration dataset (contacts5k.sql.gz) to your Cloud SQL instance, do the following:

  1. In Cloud Shell, copy the sample dataset to Cloud Storage for staging into Cloud SQL:

    gsutil cp contacts5k.sql.gz gs://${TEMP_GCS_BUCKET}
  2. Create a new database in the Cloud SQL instance:

    export DATABASE_ID="auto_dlp_test"
    gcloud sql databases create "${DATABASE_ID}" \
    --project="${PROJECT_ID}"  \
  3. Grant the Storage Object Admin role to the Cloud SQL service account so that it can read the staged file from Cloud Storage:

    export SQL_SERVICE_ACCOUNT=$(gcloud sql instances describe "${SQL_INSTANCE}" \
      --project="${PROJECT_ID}" \
      --format="value(serviceAccountEmailAddress)")

    gsutil iam ch "serviceAccount:${SQL_SERVICE_ACCOUNT}:objectAdmin" "gs://${TEMP_GCS_BUCKET}"
  4. Load the data into a new table:

    gcloud sql import sql "${SQL_INSTANCE}" \
    "gs://${TEMP_GCS_BUCKET}/contacts5k.sql.gz" \
    --project="${PROJECT_ID}" \
    --database="${DATABASE_ID}"

    To learn more about importing data into Cloud SQL, see Best practices for importing and exporting data.

Compile modules

  • In Cloud Shell, compile the modules to build the executables for deploying the sampling-and-identify pipeline and the tokenize pipeline:

     ./gradlew clean buildNeeded shadowJar -x test

    Optionally, to run the unit and integration tests, remove the -x test flag. If you don't already have libncurses5 installed, install it in Cloud Shell by running sudo apt-get install libncurses5.

Run the sampling-and-identify pipeline

The sampling and Sensitive Data Protection identification pipeline performs the following tasks in the following order:

  1. Extracts records from the provided source. Because the Sensitive Data Protection identify method supports only flat tables, the pipeline flattens Avro, Parquet, or BigQuery records, which can contain nested and repeated fields.
  2. Samples the individual columns for required samples, excluding null or empty values.
  3. Identifies sensitive infoTypes using Sensitive Data Protection, by grouping the samples into batch sizes that the DLP API accepts (less than 500 KB and fewer than 50,000 values per request).
  4. Writes reports to Cloud Storage and to BigQuery for future reference.
  5. Creates Data Catalog entries when you provide tag template and entry group information. When you provide this information, the pipeline also creates sensitivity tags in Data Catalog for the appropriate columns.
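As a toy sketch of the batching in step 3 (illustrative shell only, not the pipeline's Java implementation), sampled values can be grouped under a fixed count limit before each request:

```shell
# Toy sketch: group values into batches under a count limit before each
# DLP API request. The real limits are <500 KB and <50,000 values per
# request; MAX_VALUES=3 here only keeps the output short.
MAX_VALUES=3
batch=""
count=0
batches=""
for value in alice bob carol dave erin; do
  batch="${batch}${batch:+ }${value}"
  count=$((count + 1))
  if [ "${count}" -eq "${MAX_VALUES}" ]; then
    batches="${batches}[${batch}] "
    batch=""
    count=0
  fi
done
# Flush the final, partially filled batch.
if [ -n "${batch}" ]; then
  batches="${batches}[${batch}] "
fi
echo "${batches}"
```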

Create a Dataflow Flex Template

Dataflow Flex Templates let you use the Google Cloud console, the Google Cloud CLI, or REST API calls to set up and run your pipelines on Google Cloud. This document provides instructions for the Google Cloud CLI. Classic templates are staged as execution graphs on Cloud Storage, while Flex Templates bundle the pipeline as a container image in your project's Container Registry. Flex Templates let you decouple building and running pipelines, and integrate with orchestration systems for scheduled pipeline runs. For more information about Dataflow Flex Templates, see Evaluating which template type to use.

Dataflow Flex Templates separate the building and staging steps from the running steps. They do so by making it possible to launch a Dataflow pipeline from an API call, and from Cloud Composer, using the DataflowStartFlexTemplateOperator module.

  1. In Cloud Shell, define the location to store the template specification file that contains the information necessary to run the Dataflow job, and the location of the template's container image. The following paths are examples; adjust them as needed:

    export FLEX_TEMPLATE_PATH="gs://${TEMP_GCS_BUCKET}/dataflow/templates/sample-inspect-tag-pipeline.json"
    export FLEX_TEMPLATE_IMAGE="us.gcr.io/${PROJECT_ID}/dataflow/sample-inspect-tag-pipeline:latest"
  2. Build the Dataflow Flex Template:

    gcloud dataflow flex-template build "${FLEX_TEMPLATE_PATH}" \
    --image-gcr-path="${FLEX_TEMPLATE_IMAGE}" \
    --service-account-email="${DLP_RUNNER_SERVICE_ACCOUNT_EMAIL}" \
    --sdk-language="JAVA" \
    --flex-template-base-image=JAVA11 \
    --metadata-file="sample_identify_tag_pipeline_metadata.json" \
    --jar="build/libs/auto-data-tokenize-all.jar"

Run the pipeline

The sampling and identification pipeline extracts the number of records that are specified by the sampleSize value. It then flattens each record and identifies the infoTypes fields using Sensitive Data Protection (to identify sensitive information types). The infoTypes values are counted and then aggregated by column name and by infoType field to build a sensitivity report.

  • In Cloud Shell, launch the sampling-and-identify pipeline to identify sensitive columns in the data source:

    gcloud dataflow flex-template run "sample-inspect-tag-`date +%Y%m%d-%H%M%S`" \
      --template-file-gcs-location "${FLEX_TEMPLATE_PATH}" \
      --region "${REGION_ID}" \
      --service-account-email "${DLP_RUNNER_SERVICE_ACCOUNT_EMAIL}" \
      --staging-location "gs://${TEMP_GCS_BUCKET}/staging" \
      --worker-machine-type "n1-standard-1" \
      --parameters sampleSize=2000 \
      --parameters sourceType="JDBC_TABLE" \
      --parameters inputPattern="Contacts" \
      --parameters reportLocation="gs://${TEMP_GCS_BUCKET}/auto_dlp_report/" \
      --parameters reportBigQueryTable="${PROJECT_ID}:inspection_results.SensitivityInspectionResults" \
      --parameters jdbcConnectionUrl="${CLOUD_SQL_JDBC_CONNECTION_URL}" \
      --parameters jdbcDriverClass="com.mysql.cj.jdbc.Driver" \
      --parameters jdbcUserName="root" \
      --parameters jdbcPasswordSecretsKey="projects/${PROJECT_ID}/secrets/${SQL_PASSWORD_SECRET_NAME}/versions/1" \
      --parameters ^:^jdbcFilterClause="ROUND(RAND() * 10) IN (1,3)" \
      --parameters dataCatalogEntryGroupId="projects/${PROJECT_ID}/locations/${REGION_ID}/entryGroups/${DATA_CATALOG_ENTRY_GROUP_ID}" \
      --parameters dataCatalogInspectionTagTemplateId="projects/${PROJECT_ID}/locations/${REGION_ID}/tagTemplates/${INSPECTION_TAG_TEMPLATE_ID}"

The jdbcConnectionUrl parameter specifies a JDBC database connection URL with username and password details. The details of building the exact connection URL depend on your database vendor and hosting provider. For details about connecting to Cloud SQL-based relational databases, see Connecting using Cloud SQL connectors.
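For example, for the MySQL on Cloud SQL instance created earlier, a connection URL that uses the Cloud SQL socket factory has roughly the following shape. This is a sketch with example values; verify the exact format against the Cloud SQL connector documentation for your driver:

```shell
# Sketch only -- example values, not your real environment variables.
# Shows the general shape of a Cloud SQL MySQL JDBC URL that uses the
# Cloud SQL socket factory.
example_project="my-project"
example_region="us-central1"
example_instance="mysql-autodlp-instance"
example_database="auto_dlp_test"

example_url="jdbc:mysql:///${example_database}?cloudSqlInstance=${example_project}:${example_region}:${example_instance}&socketFactory=com.google.cloud.sql.mysql.SocketFactory"
echo "${example_url}"
```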

The pipeline constructs a query like SELECT * FROM [TableName] to read the table records for inspection.

This query can put load on the database and on the pipeline, especially for a large table. Optionally, you can optimize the sampling of records on the database side by providing the jdbcFilterClause parameter, which the pipeline inserts as the WHERE clause of the query.
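The effect of the filter clause is easiest to see on the constructed query itself. The following sketch (with illustrative values) shows the query without and with a jdbcFilterClause:

```shell
# Sketch of the query that the pipeline builds, without and with a
# jdbcFilterClause. Table name and clause values are illustrative.
table_name="Contacts"
jdbc_filter_clause="ROUND(RAND() * 10) IN (1,3)"

base_query="SELECT * FROM ${table_name}"
sampled_query="${base_query} WHERE ${jdbc_filter_clause}"
echo "${base_query}"
echo "${sampled_query}"
```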

To run a report, you can choose one or more of the following reporting sinks:

  • reportLocation to store the report in a Cloud Storage bucket
  • reportBigQueryTable to store the report in a BigQuery table
  • dataCatalogEntryGroupId to create and tag the entry in Data Catalog (omit this parameter if the sourceType is BIGQUERY_TABLE)

The pipeline supports the following source types. To determine the correct combination of sourceType and inputPattern arguments, use the options in the following list. In this case, you use only the JDBC_TABLE source type.

  • JDBC_TABLE: Relational databases (using JDBC). inputPattern is the table name, for example Contacts.
  • AVRO: Avro file in Cloud Storage. To select multiple files matching a pattern, you can use a single wildcard; for example, gs://LOCATION_OF_FILES/data-* selects all files starting with the prefix data-.
  • PARQUET: Parquet file in Cloud Storage. To select multiple files matching a pattern, you can use a single wildcard; for example, gs://LOCATION_OF_FILES/data-* selects all files starting with the prefix data-.
  • BIGQUERY_TABLE: BigQuery table. The pipeline reads all the rows and then randomly samples them.

The pipeline detects all the standard infoTypes supported by Sensitive Data Protection. You can provide additional custom infoTypes by using the --observableInfoTypes parameter.

Sampling-and-identify pipeline directed acyclic graph (DAG)

The following diagram shows the Dataflow execution DAG. The DAG has two branches. Both branches start at ReadJdbcTable and conclude at ExtractReport. From there, reports are generated or data is stored.

Sampling and identification pipeline data flow.

Retrieve the report

The sampling and identification pipeline outputs the following files:

  • Avro schema file (or a schema converted to Avro) of the source
  • One file for each of the sensitive columns with infoType information and counts

To retrieve the report, do the following:

  1. In Cloud Shell, retrieve the report:

    mkdir -p auto_dlp_report/
    gsutil -m cp "gs://${TEMP_GCS_BUCKET}/auto_dlp_report/*.json" auto_dlp_report/
  2. List all identified column names:

    cat auto_dlp_report/col-*.json | jq .columnName

    The output is as follows:

    "$.topLevelRecord.contact_number" "$.topLevelRecord.person_name"

  3. View the details of an identified column with the cat command for the file:

    cat auto_dlp_report/col-topLevelRecord-contact_number-00000-of-00001.json

    The following is a snippet for the contact_number column:

    { "columnName": "$.topLevelRecord.contact_number", "infoTypes": [{ "infoType": "PHONE_NUMBER", "count": "990" }] }

    • The columnName value is unusual because of the implicit conversion of a database row to an Avro record.
    • The count value varies based on the randomly selected samples during execution.
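Conceptually, each per-column count is an aggregation over the individual sample findings, similar to the following sketch (toy data, not pipeline output):

```shell
# Toy sketch of the aggregation behind the per-column counts: given one
# "column infoType" line per sampled value, count (column, infoType) pairs.
findings='contact_number PHONE_NUMBER
contact_number PHONE_NUMBER
person_name PERSON_NAME
person_name DATE'
printf '%s\n' "${findings}" | sort | uniq -c | sort -rn
```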

Verify sensitivity tags in Data Catalog

The sampling and identification pipeline creates a new entry and applies the sensitivity tags to the appropriate columns.

  1. In Cloud Shell, retrieve the created entry for the Contacts table:

    gcloud data-catalog entries describe Contacts \
      --entry-group=${DATA_CATALOG_ENTRY_GROUP_ID} \
      --location="${REGION_ID}" \
      --project="${PROJECT_ID}"

    This command shows the details of the table, including its schema.

  2. Show all the sensitivity tags that are attached to this entry:

    gcloud data-catalog tags list --entry=Contacts \
      --entry-group=${DATA_CATALOG_ENTRY_GROUP_ID} \
      --location="${REGION_ID}" \
      --project="${PROJECT_ID}"
  3. Verify that the sensitivity tags are present on the contact_number and person_name columns.

    The infoTypes identified by Sensitive Data Protection can include some false positives. For example, it can label the person_name column with the DATE type, because some randomly generated person_name strings, such as April, May, or June, are also calendar terms.

    The sensitivity tag details that are output are as follows:

    column: contact_number
        displayName: DLP infoTypes
        stringValue: '[PHONE_NUMBER]'
        displayName: Inspection run timestamp
        timestampValue: '2021-05-20T16:34:29.596Z'
    name: projects/auto-dlp/locations/asia-southeast1/entryGroups/sql_databases/entries/Contacts/tags/CbS0CtGSpZyJ
    template: projects/auto-dlp/locations/asia-southeast1/tagTemplates/auto_dlp_inspection
    templateDisplayName: Auto DLP sensitive categories
    column: person_name
        displayName: DLP infoTypes
        stringValue: '[DATE, PERSON_NAME]'
        displayName: Inspection run timestamp
        timestampValue: '2021-05-20T16:34:29.594Z'
    name: projects/auto-dlp/locations/asia-southeast1/entryGroups/sql_databases/entries/Contacts/tags/Cds1aiO8R0pT
    template: projects/auto-dlp/locations/asia-southeast1/tagTemplates/auto_dlp_inspection
    templateDisplayName: Auto DLP sensitive categories

Verify in BigQuery

The Dataflow pipeline appends the aggregated findings to the provided BigQuery table. The following query retrieves the inspection results from that table.

  • In Cloud Shell, check the results:

    bq query \
      --location="${REGION_ID}" \
      --project_id="${PROJECT_ID}" \
      --use_legacy_sql=false \
      'SELECT
         input_pattern AS table_name,
         ColumnReport.column_name AS column_name,
         ColumnReport.info_types AS info_types
       FROM
         inspection_results.SensitivityInspectionResults,
         UNNEST(column_report) ColumnReport
       WHERE ColumnReport.column_name = "$.topLevelRecord.contact_number"'

    The output is as follows:

    +------------+---------------------------------+----------------------------------------------+
    | table_name |           column_name           |                  info_types                  |
    +------------+---------------------------------+----------------------------------------------+
    | Contacts   | $.topLevelRecord.contact_number | [{"info_type":"PHONE_NUMBER","count":"990"}] |
    +------------+---------------------------------+----------------------------------------------+

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

Delete the project

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

What's next

  • Read the companion document about a similar solution that uses files as input: Automatically tokenize sensitive file-based data with Sensitive Data Protection, Cloud Key Management Service, and Dataflow.