Edit on GitHub
Report issue
Page history

Create a Data Catalog tag history in BigQuery using Cloud Logging and Dataflow

Author(s): @anantdamle ,   Published: 2020-10-07

Anant Damle | Solutions Architect | Google

Contributed by Google employees.

This solution is intended for technical practitioners—such as data engineers and analysts—who are responsibile for metadata management, data governance, and related analytics.

Historical metadata about your data warehouse is a treasure trove for discovering insights about changing data patterns, data quality, and user behavior. The challenge is that Data Catalog keeps a single version of metadata for fast searchability.

This tutorial suggests a solution to create a historical record of metadata Data Catalog tags by creating change records in real time by capturing and parsing the audit logs from Cloud Logging and processing them in real time by using Pub/Sub and Dataflow to append into a BigQuery table for historical analysis.

Data Catalog tag history recording solution architecture

Concepts

  • Data Catalog provides a single interface for searching and managing both technical and business metadata of your data warehouse or data lake in Google Cloud and beyond. Data Catalog uses tags to organize metadata and make it discoverable.
  • BigQuery is Google Cloud’s serverless, highly scalable, and cost-effective multi-cloud data warehouse designed for business agility that you can use to run petabyte-sized queries. BigQuery also provides APIs for reading table schemas.
  • Dataflow is Google Cloud’s serverless service for processing data in streams or batches.
  • Pub/Sub is Google Cloud’s flexible, reliable, real-time messaging service for independent applications to publish and subscribe to asynchronous events.
  • Cloud Logging is Google Cloud's powerful log management service that allows you to search and route logs from applications and Google Cloud services.

Prerequisites

This tutorial assumes some familiarity with shell scripts and basic knowledge of Google Cloud.

Objectives

  1. Set up a log sink to export Data Catalog audit logs to Pub/Sub.
  2. Deploy a streaming Dataflow pipeline to parse the logs.
  3. Enrich the logs with entry tag information from Data Catalog.
  4. Store the metadata tags attached to the modified entry in a BigQuery table for historical reference.

Costs

This tutorial uses billable components of Google Cloud, including the following:

Use the pricing calculator to generate a cost estimate based on your projected usage.

Before you begin

For this tutorial, you need a Google Cloud project. To make cleanup easiest at the end of the tutorial, we recommend that you create a new project for this tutorial.

  1. Create a Google Cloud project.
  2. Make sure that billing is enabled for your Google Cloud project.
  3. Open Cloud Shell.

    At the bottom of the Cloud Console, a Cloud Shell session opens and displays a command-line prompt. Cloud Shell is a shell environment with the Cloud SDK already installed, including the gcloud command-line tool, and with values already set for your current project. It can take a few seconds for the session to initialize.

  4. Enable APIs for Data Catalog, BigQuery, Pub/Sub, Dataflow, and Cloud Storage services with the following command:

    gcloud services enable \
    bigquery.googleapis.com \
    storage_component \
    datacatalog.googleapis.com \
    dataflow.googleapis.com \
    pubsub.googleapis.com  
    

Setting up your environment

  1. In Cloud Shell, clone the source repository and go to the directory for this tutorial:

    git clone https://github.com/GoogleCloudPlatform/datacatalog-tag-history.git
    cd datacatalog-tag-history/
    
  2. Use a text editor to modify the env.sh file to set following variables:

    # The Google Cloud project to use for this tutorial
    export PROJECT_ID="your-project-id"
    
    # The BigQuery region to use for the tags table
    export BIGQUERY_REGION=""
    
    # The name of the BigQuery Dataset to create the tag records table
    export DATASET_ID=""
    
    # The name of the BigQuery table for tag records
    export TABLE_ID="EntityTagOperationRecords"
    
    # The Compute Engine region to use for running Dataflow jobs and create a temporary storage bucket
    export REGION_ID=""
    
    # define the bucket ID
    export TEMP_GCS_BUCKET=""
    
    # define the name of the Pub/Sub log sink in Cloud Logging
    export LOGS_SINK_NAME="datacatalog-audit-pubsub"
    
    # define Pub/Sub topic for receiving AuditLog events
    export LOGS_SINK_TOPIC_ID="catalog-audit-log-sink"
    
    # define the subscription ID
    export LOGS_SUBSCRIPTION_ID="catalog-tags-dumper"
    
    # name of the service account to use (not the email address)
    export TAG_HISTORY_SERVICE_ACCOUNT="tag-history-collector"
    export TAG_HISTORY_SERVICE_ACCOUNT_EMAIL="${TAG_HISTORY_SERVICE_ACCOUNT}@$(echo $PROJECT_ID | awk -F':' '{print $2"."$1}' | sed 's/^\.//').iam.gserviceaccount.com"
    
  3. Run the script to set the environment variables:

    source env.sh
    

Creating resources

Create the BigQuery table

  1. Set up a BigQuery dataset in the region of your choice to store Data Catalog entry tags when a change event occurs:

    bq --location ${BIGQUERY_REGION} \
    --project_id=${PROJECT_ID} \
    mk --dataset ${DATASET_ID}
    
  2. Create a BigQuery table for storing tags using the provided schema:

    bq mk --table \
    --project_id=${PROJECT_ID} \
    --description "Catalog Tag snapshots" \
    --time_partitioning_field "reconcileTime" \
    "${DATASET_ID}.${TABLE_ID}" camelEntityTagOperationRecords.schema
    

    This creates a BigQuery table with lowerCamelCase column names. If you want snake_case column names, instead, use the following command:

    bq mk --table \
    --project_id=${PROJECT_ID} \
    --description "Catalog Tag snapshots" \
    --time_partitioning_field "reconcile_time" \
    "${DATASET_ID}.${TABLE_ID}" snakeEntityTagOperationRecords.schema
    

Warning: Apply restrictive access controls to the tag history BigQuery table. Access to Data Catalog results is based on requestors' permissions.

Configure a Pub/Sub topic and subscription

  1. Create a Pub/Sub topic to receive audit log events:

    gcloud pubsub topics create ${LOGS_SINK_TOPIC_ID} \
    --project ${PROJECT_ID}
    
  2. Create a new Pub/Sub subscription:

    gcloud pubsub subscriptions create ${LOGS_SUBSCRIPTION_ID} \
    --topic=${LOGS_SINK_TOPIC_ID} \
    --topic-project=${PROJECT_ID}
    

    Using a subscription with a Dataflow pipeline ensures that all messages are processed even when the pipeline may be temporarily down for updates or maintenance.

Configure a log sink

  1. Create a log sink to send Data Catalog audit events to the Pub/Sub topic:

    gcloud logging sinks create ${LOGS_SINK_NAME} \
    pubsub.googleapis.com/projects/${PROJECT_ID}/topics/${LOGS_SINK_TOPIC_ID} \
    --log-filter="protoPayload.serviceName=\"datacatalog.googleapis.com\" \
    AND protoPayload.\"@type\"=\"type.googleapis.com/google.cloud.audit.AuditLog\""
    

    Cloud Logging pushes new Data Catalog audit logs to the Pub/Sub topic for processing in real time.

  2. Give the Pub/Sub publisher role to the logging service account to enable the pushing of log entries into the configured Pub/Sub topic:

    # Identify the log writer service account  
    export LOGGING_WRITER_IDENTITY="$(gcloud logging sinks describe ${LOGS_SINK_NAME} --format="get(writerIdentity)" --project ${PROJECT_ID})"
    
    # Grant publisher role to the Logging writer
    gcloud pubsub topics add-iam-policy-binding ${LOGS_SINK_TOPIC_ID} \
    --member=${LOGGING_WRITER_IDENTITY} \
    --role='roles/pubsub.publisher' \
    --project ${PROJECT_ID}
    

Create service accounts

We recommend that you run pipelines with fine-grained access control to improve access partitioning. If your project does not have a user-created service account, create one using following instructions.

You can use your browser by going to Service accounts in the Cloud Console.

  1. Create a service account to use as the user-managed controller service account for Dataflow:

    gcloud iam service-accounts create  ${TAG_HISTORY_SERVICE_ACCOUNT} \
    --description="Service Account to run the Data Catalog tag history collection and recording pipeline." \
    --display-name="Data Catalog History collection account"
    
  2. Create a custom role with required permissions for accessing BigQuery, Pub/Sub, Dataflow, and Data Catalog:

    export TAG_HISTORY_COLLECTOR_ROLE="tag_history_collector"
    
    gcloud iam roles create ${TAG_HISTORY_COLLECTOR_ROLE} --project=${PROJECT_ID} --file=tag_history_collector.yaml
    
  3. Apply the custom role to the service account:

    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${TAG_HISTORY_SERVICE_ACCOUNT_EMAIL}" \
    --role=projects/${PROJECT_ID}/roles/${TAG_HISTORY_COLLECTOR_ROLE}
    
  4. Assign the dataflow.worker role to allow the service account to run as a Dataflow worker:

    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${TAG_HISTORY_SERVICE_ACCOUNT_EMAIL}" \
    --role=roles/dataflow.worker
    

Deploy the tag history recording pipeline

  1. Create a Cloud Storage bucket as a temporary and staging bucket for Dataflow:

    gsutil mb -l ${REGION_ID} \
    -p ${PROJECT_ID} \
    gs://${TEMP_GCS_BUCKET}
    
  2. Start the Dataflow pipeline using the following Maven command:

    mvn clean generate-sources compile package exec:java \
      -Dexec.mainClass=com.google.cloud.solutions.catalogtagrecording.PipelineLauncher \
      -Dexec.cleanupDaemonThreads=false \
      -Dmaven.test.skip=true \
      -Dexec.args=" \
    --streaming=true \
    --project=${PROJECT_ID} \
    --serviceAccount=${TAG_HISTORY_SERVICE_ACCOUNT_EMAIL} \
    --runner=DataflowRunner \
    --gcpTempLocation=gs://${TEMP_GCS_BUCKET}/temp/ \
    --stagingLocation=gs://${TEMP_GCS_BUCKET}/staging/ \
    --workerMachineType=n1-standard-1 \
    --region=${REGION_ID} \
    --tagsBigqueryTable=${PROJECT_ID}:${DATASET_ID}.${TABLE_ID} \
    --catalogAuditLogsSubscription=projects/${PROJECT_ID}/subscriptions/${LOGS_SUBSCRIPTION_ID}"
    

    If you are using snake_case for column names, add the --snakeCaseColumnNames flag.

Pipeline DAG

Recording pipeline DAG

Manual test

Follow the guide to attach a tag to a Data Catalog entry to verify that the tool captures all of the tags attached to the modified entry in the BigQuery table.

Limitations

  • This implementation handles only the operations listed below:
    • CreateTag
    • UpdateTag
    • DeleteTag
  • A single Data Catalog operation creates multiple tag record entries due to multiple AuditLog events.
  • The tool polls the Data Catalog service for entry/tag information, because the audit logs don't contain change information. This can result in some changes to entries or tags being missed.

Cleaning up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, you can delete the project:

  1. In the Cloud Console, go to the Manage resources page.
  2. In the project list, select the project that you want to delete and then click Delete.
  3. In the dialog, type the project ID and then click Shut down to delete the project.

What's next

Submit a tutorial

Share step-by-step guides

Submit a tutorial

Request a tutorial

Ask for community help

Submit a request

View tutorials

Search Google Cloud tutorials

View tutorials

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see our Site Policies. Java is a registered trademark of Oracle and/or its affiliates.