title | description | author | tags | date_published |
---|---|---|---|---|
Create a Data Catalog tag history in BigQuery using Cloud Logging and Dataflow | Learn how to create a historical record of metadata in real time by capturing logs and processing them with Pub/Sub and Dataflow. | anantdamle | Data Catalog, BigQuery, Dataflow | 2020-10-07 |
Anant Damle | Solutions Architect | Google
Contributed by Google employees.
This solution is intended for technical practitioners—such as data engineers and analysts—who are responsible for metadata management, data governance, and related analytics.
Historical metadata about your data warehouse is a treasure trove for discovering insights about changing data patterns, data quality, and user behavior. The challenge is that Data Catalog keeps a single version of metadata for fast searchability.
This tutorial shows one way to build a historical record of Data Catalog tags. The solution captures Data Catalog audit logs from Cloud Logging, parses them in real time by using Pub/Sub and Dataflow, and appends a change record to a BigQuery table for historical analysis.
- Data Catalog provides a single interface for searching and managing both technical and business metadata of your data warehouse or data lake in Google Cloud and beyond. Data Catalog uses tags to organize metadata and make it discoverable.
- BigQuery is Google Cloud’s serverless, highly scalable, and cost-effective multi-cloud data warehouse designed for business agility that you can use to run petabyte-sized queries. BigQuery also provides APIs for reading table schemas.
- Dataflow is Google Cloud’s serverless service for processing data in streams or batches.
- Pub/Sub is Google Cloud’s flexible, reliable, real-time messaging service for independent applications to publish and subscribe to asynchronous events.
- Cloud Logging is Google Cloud's powerful log management service that allows you to search and route logs from applications and Google Cloud services.
This tutorial assumes some familiarity with shell scripts and basic knowledge of Google Cloud.
- Set up a log sink to export Data Catalog audit logs to Pub/Sub.
- Deploy a streaming Dataflow pipeline to parse the logs.
- Enrich the logs with entry tag information from Data Catalog.
- Store the metadata tags attached to the modified entry in a BigQuery table for historical reference.
This tutorial uses billable components of Google Cloud, including the following:
- Data Catalog
- Dataflow
- Pub/Sub
- Cloud Logging
- Cloud Storage
- BigQuery
  - Streaming API
  - Storage
Use the pricing calculator to generate a cost estimate based on your projected usage.
For this tutorial, you need a Google Cloud project. To make cleanup easiest at the end of the tutorial, we recommend that you create a new project for this tutorial.
- Make sure that billing is enabled for your Google Cloud project.
- Open Cloud Shell. A Cloud Shell session opens at the bottom of the Cloud Console and displays a command-line prompt. Cloud Shell is a shell environment with the Cloud SDK already installed, including the gcloud command-line tool, and with values already set for your current project. It can take a few seconds for the session to initialize.
- Enable APIs for Data Catalog, BigQuery, Pub/Sub, Dataflow, and Cloud Storage services with the following command:
    gcloud services enable \
      bigquery.googleapis.com \
      storage_component \
      datacatalog.googleapis.com \
      dataflow.googleapis.com \
      pubsub.googleapis.com
- In Cloud Shell, clone the source repository and go to the directory for this tutorial:
    git clone https://github.com/GoogleCloudPlatform/datacatalog-tag-history.git
    cd datacatalog-tag-history/
- Use a text editor to modify the env.sh file to set the following variables:
    # The Google Cloud project to use for this tutorial
    export PROJECT_ID="your-project-id"
    # The BigQuery region to use for the tags table
    export BIGQUERY_REGION=""
    # The name of the BigQuery dataset in which to create the tag records table
    export DATASET_ID=""
    # The name of the BigQuery table for tag records
    export TABLE_ID="EntityTagOperationRecords"
    # The Compute Engine region to use for running Dataflow jobs and creating a temporary storage bucket
    export REGION_ID=""
    # The ID of the temporary Cloud Storage bucket
    export TEMP_GCS_BUCKET=""
    # The name of the Pub/Sub log sink in Cloud Logging
    export LOGS_SINK_NAME="datacatalog-audit-pubsub"
    # The Pub/Sub topic that receives AuditLog events
    export LOGS_SINK_TOPIC_ID="catalog-audit-log-sink"
    # The Pub/Sub subscription ID
    export LOGS_SUBSCRIPTION_ID="catalog-tags-dumper"
    # The name of the service account to use (not the email address)
    export TAG_HISTORY_SERVICE_ACCOUNT="tag-history-collector"
    export TAG_HISTORY_SERVICE_ACCOUNT_EMAIL="${TAG_HISTORY_SERVICE_ACCOUNT}@$(echo $PROJECT_ID | awk -F':' '{print $2"."$1}' | sed 's/^\.//').iam.gserviceaccount.com"
- Run the script to set the environment variables:
    source env.sh
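Several values in env.sh are empty by default, so a quick check after sourcing the file can catch an unset variable before any resources are created. The following is a minimal sketch and is not part of the tutorial repository: require_vars and service_account_email are hypothetical helper names. The second helper repeats the awk and sed expression from env.sh to illustrate how it handles domain-scoped project IDs of the form example.com:my-project.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for illustration; they are not part of the tutorial repository.

# Print the name of every listed variable that is unset or empty.
# Returns non-zero if at least one variable is missing.
require_vars() {
  local missing=0 name
  for name in "$@"; do
    # ${!name} is bash indirect expansion: the value of the variable named by $name.
    if [ -z "${!name:-}" ]; then
      echo "missing: ${name}"
      missing=1
    fi
  done
  return "${missing}"
}

# Build the service account email the same way env.sh does.
# For a domain-scoped project ID such as example.com:my-project, awk swaps the
# two parts into my-project.example.com; for a plain ID, sed strips the leading dot.
service_account_email() {
  local account="$1" project="$2" domain
  domain="$(echo "${project}" | awk -F':' '{print $2"."$1}' | sed 's/^\.//')"
  echo "${account}@${domain}.iam.gserviceaccount.com"
}

# Example usage after `source env.sh`:
#   require_vars PROJECT_ID BIGQUERY_REGION DATASET_ID TABLE_ID REGION_ID TEMP_GCS_BUCKET
#   service_account_email tag-history-collector example.com:my-project
```

If require_vars prints any names, set those variables in env.sh and source the file again before continuing.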
- Set up a BigQuery dataset in the region of your choice to store Data Catalog entry tags when a change event occurs:
    bq --location ${BIGQUERY_REGION} \
      --project_id=${PROJECT_ID} \
      mk --dataset ${DATASET_ID}
- Create a BigQuery table for storing tags using the provided schema:
    bq mk --table \
      --project_id=${PROJECT_ID} \
      --description "Catalog Tag snapshots" \
      --time_partitioning_field "reconcileTime" \
      "${DATASET_ID}.${TABLE_ID}" camelEntityTagOperationRecords.schema
  This creates a BigQuery table with lowerCamelCase column names. If you want snake_case column names instead, use the following command:
    bq mk --table \
      --project_id=${PROJECT_ID} \
      --description "Catalog Tag snapshots" \
      --time_partitioning_field "reconcile_time" \
      "${DATASET_ID}.${TABLE_ID}" snakeEntityTagOperationRecords.schema
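  Warning: Apply restrictive access controls to the tag history BigQuery table. Data Catalog returns results based on each requester's permissions, but the history table does not apply those checks, so anyone who can read the table can see every recorded tag.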
- Create a Pub/Sub topic to receive audit log events:
    gcloud pubsub topics create ${LOGS_SINK_TOPIC_ID} \
      --project ${PROJECT_ID}
- Create a new Pub/Sub subscription:
    gcloud pubsub subscriptions create ${LOGS_SUBSCRIPTION_ID} \
      --topic=${LOGS_SINK_TOPIC_ID} \
      --topic-project=${PROJECT_ID}
  Pub/Sub retains unacknowledged messages on a subscription, so a Dataflow pipeline that reads from the subscription can still process messages that were published while the pipeline was temporarily down for updates or maintenance.
- Create a log sink to send Data Catalog audit events to the Pub/Sub topic:
    gcloud logging sinks create ${LOGS_SINK_NAME} \
      pubsub.googleapis.com/projects/${PROJECT_ID}/topics/${LOGS_SINK_TOPIC_ID} \
      --log-filter="protoPayload.serviceName=\"datacatalog.googleapis.com\" \
      AND protoPayload.\"@type\"=\"type.googleapis.com/google.cloud.audit.AuditLog\""
  Cloud Logging pushes new Data Catalog audit logs to the Pub/Sub topic for processing in real time.
- Give the Pub/Sub publisher role to the logging service account to enable the pushing of log entries into the configured Pub/Sub topic:
    # Identify the log writer service account
    export LOGGING_WRITER_IDENTITY="$(gcloud logging sinks describe ${LOGS_SINK_NAME} --format="get(writerIdentity)" --project ${PROJECT_ID})"
    # Grant publisher role to the Logging writer
    gcloud pubsub topics add-iam-policy-binding ${LOGS_SINK_TOPIC_ID} \
      --member=${LOGGING_WRITER_IDENTITY} \
      --role='roles/pubsub.publisher' \
      --project ${PROJECT_ID}
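Before relying on the sink, it can help to confirm that the filter matches the log entries you expect. The following is a minimal sketch, assuming an authenticated gcloud session: it reuses the sink's filter with gcloud logging read to print recent matching entries. The preview_audit_logs function name and its default limit of 5 are assumptions made for this illustration.

```shell
#!/usr/bin/env bash
# Same filter as the log sink: Data Catalog audit log entries only.
LOG_FILTER='protoPayload.serviceName="datacatalog.googleapis.com" AND protoPayload."@type"="type.googleapis.com/google.cloud.audit.AuditLog"'

# Hypothetical helper: print the most recent log entries that match the sink filter.
# Arguments: project ID, maximum number of entries (defaults to 5).
preview_audit_logs() {
  local project="$1" limit="${2:-5}"
  gcloud logging read "${LOG_FILTER}" \
    --project="${project}" \
    --limit="${limit}" \
    --format=json
}

# Example usage (requires an authenticated gcloud session):
#   preview_audit_logs "${PROJECT_ID}" 3
```

An empty result does not necessarily mean that the filter is wrong; there might simply have been no recent Data Catalog activity in the project.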
We recommend that you run pipelines with fine-grained access control to improve access partitioning. If your project does not have a user-created service account, create one by using the following instructions.
Alternatively, you can create the service account in your browser on the Service accounts page of the Cloud Console.
- Create a service account to use as the user-managed controller service account for Dataflow:
    gcloud iam service-accounts create ${TAG_HISTORY_SERVICE_ACCOUNT} \
      --description="Service Account to run the Data Catalog tag history collection and recording pipeline." \
      --display-name="Data Catalog History collection account"
- Create a custom role with required permissions for accessing BigQuery, Pub/Sub, Dataflow, and Data Catalog:
    export TAG_HISTORY_COLLECTOR_ROLE="tag_history_collector"
    gcloud iam roles create ${TAG_HISTORY_COLLECTOR_ROLE} --project=${PROJECT_ID} --file=tag_history_collector.yaml
- Apply the custom role to the service account:
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
      --member="serviceAccount:${TAG_HISTORY_SERVICE_ACCOUNT_EMAIL}" \
      --role=projects/${PROJECT_ID}/roles/${TAG_HISTORY_COLLECTOR_ROLE}
- Assign the dataflow.worker role to allow the service account to run as a Dataflow worker:
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
      --member="serviceAccount:${TAG_HISTORY_SERVICE_ACCOUNT_EMAIL}" \
      --role=roles/dataflow.worker
- Create a Cloud Storage bucket as a temporary and staging bucket for Dataflow:
    gsutil mb -l ${REGION_ID} \
      -p ${PROJECT_ID} \
      gs://${TEMP_GCS_BUCKET}
- Start the Dataflow pipeline using the following Maven command:
    mvn clean generate-sources compile package exec:java \
      -Dexec.mainClass=com.google.cloud.solutions.catalogtagrecording.PipelineLauncher \
      -Dexec.cleanupDaemonThreads=false \
      -Dmaven.test.skip=true \
      -Dexec.args=" \
      --streaming=true \
      --project=${PROJECT_ID} \
      --serviceAccount=${TAG_HISTORY_SERVICE_ACCOUNT_EMAIL} \
      --runner=DataflowRunner \
      --gcpTempLocation=gs://${TEMP_GCS_BUCKET}/temp/ \
      --stagingLocation=gs://${TEMP_GCS_BUCKET}/staging/ \
      --workerMachineType=n1-standard-1 \
      --region=${REGION_ID} \
      --tagsBigqueryTable=${PROJECT_ID}:${DATASET_ID}.${TABLE_ID} \
      --catalogAuditLogsSubscription=projects/${PROJECT_ID}/subscriptions/${LOGS_SUBSCRIPTION_ID}"
  If you are using snake_case for column names, add the --snakeCaseColumnNames flag.
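After the Maven command returns, you might want to confirm that the streaming job is running. The following is a minimal sketch that wraps gcloud dataflow jobs list with the --status=active filter. The list_active_dataflow_jobs function name is an assumption made for this illustration, and the job name to look for depends on how the pipeline launcher names the job.

```shell
#!/usr/bin/env bash
# Hypothetical helper: list the Dataflow jobs that are currently active in a region.
# Arguments: project ID, Dataflow region.
list_active_dataflow_jobs() {
  local project="$1" region="$2"
  gcloud dataflow jobs list \
    --project="${project}" \
    --region="${region}" \
    --status=active
}

# Example usage (requires an authenticated gcloud session):
#   list_active_dataflow_jobs "${PROJECT_ID}" "${REGION_ID}"
```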
To verify the setup, follow the guide to attach a tag to a Data Catalog entry, and then check that the BigQuery table contains a record of all of the tags attached to the modified entry.
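One way to inspect the recorded history is to query the newest rows of the table. The following is a minimal sketch, not a command from the tutorial repository: tag_history_query is a hypothetical helper that only prints a standard SQL statement, and the usage comment shows how it could be passed to bq query. It assumes the time column is reconcileTime (the lowerCamelCase schema) unless you pass reconcile_time for the snake_case schema.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print a standard SQL query that lists the newest tag records.
# Arguments: project ID, dataset ID, table ID,
#            time column (defaults to reconcileTime), row limit (defaults to 10).
tag_history_query() {
  local project="$1" dataset="$2" table="$3"
  local time_column="${4:-reconcileTime}" limit="${5:-10}"
  # Single quotes keep the backticks literal instead of triggering command substitution.
  printf 'SELECT * FROM `%s.%s.%s` ORDER BY %s DESC LIMIT %s\n' \
    "${project}" "${dataset}" "${table}" "${time_column}" "${limit}"
}

# Example usage (requires the bq tool and an authenticated session):
#   bq --project_id="${PROJECT_ID}" query --use_legacy_sql=false \
#     "$(tag_history_query "${PROJECT_ID}" "${DATASET_ID}" "${TABLE_ID}")"
```

Because the helper only builds a string, you can review the query before running it.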
- This implementation handles only the following operations:
  - CreateTag
  - UpdateTag
  - DeleteTag
- A single Data Catalog operation creates multiple tag record entries, because it generates multiple AuditLog events.
- The tool polls the Data Catalog service for entry and tag information, because the audit logs don't contain change information. As a result, some changes to entries or tags can be missed.
To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, you can delete the project:
- In the Cloud Console, go to the Manage resources page.
- In the project list, select the project that you want to delete and then click Delete.
- In the dialog, type the project ID and then click Shut down to delete the project.
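If you want to keep the project, an alternative is to delete only the resources that this tutorial created. The following is a minimal sketch under stated assumptions: the variables from env.sh are still set in your shell, and the run and cleanup_tutorial_resources helper names are hypothetical. By default the script only prints the commands (DRY_RUN=1) so that you can review them first. It does not cancel the streaming Dataflow job, which you need to cancel separately.

```shell
#!/usr/bin/env bash
# Hypothetical helper: run a command, or only print it when DRY_RUN is 1 (the default).
run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then
    echo "+ $*"
  else
    "$@"
  fi
}

# Delete the resources created in this tutorial.
# Assumes that the variables from env.sh are set in the current shell.
cleanup_tutorial_resources() {
  run gcloud logging sinks delete "${LOGS_SINK_NAME}" --project="${PROJECT_ID}" --quiet
  run gcloud pubsub subscriptions delete "${LOGS_SUBSCRIPTION_ID}" --project="${PROJECT_ID}"
  run gcloud pubsub topics delete "${LOGS_SINK_TOPIC_ID}" --project="${PROJECT_ID}"
  # -r removes the tables in the dataset; -f skips the confirmation prompt.
  run bq rm -r -f --dataset "${PROJECT_ID}:${DATASET_ID}"
  # Removes the bucket and all objects in it.
  run gsutil rm -r "gs://${TEMP_GCS_BUCKET}"
  run gcloud iam service-accounts delete "${TAG_HISTORY_SERVICE_ACCOUNT_EMAIL}" --project="${PROJECT_ID}" --quiet
  run gcloud iam roles delete "${TAG_HISTORY_COLLECTOR_ROLE:-tag_history_collector}" --project="${PROJECT_ID}"
}

# Example usage:
#   cleanup_tutorial_resources             # print the commands only
#   DRY_RUN=0 cleanup_tutorial_resources   # run the commands
```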
- Learn more about Data Catalog.
- Learn more about Cloud developer tools.
- Try out other Google Cloud features for yourself. Have a look at our tutorials.