Building a BigQuery data lineage system using audit logs, Pub/Sub, ZetaSQL, Dataflow, and Data Catalog

Last reviewed 2021-01-28 UTC

This tutorial shows you how to implement a reference data lineage architecture by building a basic real-time data lineage solution to track data movement in BigQuery. The solution parses the SQL queries found in audit logs by using a Dataflow pipeline.

This tutorial is part of a series that helps you to understand data lineage concepts in BigQuery.

In this tutorial, you use audit logs to capture data operations. You configure Cloud Logging to emit BigQuery data access logs to Pub/Sub, and you deploy a streaming Dataflow pipeline that analyzes the SQL query in each access log to identify and extract lineage. The extracted lineage is persisted in a BigQuery table, in a Data Catalog tag, and, to enable downstream use cases, in a Pub/Sub topic.

This tutorial is intended for people who work in data engineering and data governance. This tutorial assumes that you have basic knowledge of building Dataflow pipelines using Apache Beam Java SDK, shell scripting, and BigQuery.

Architecture

The main components in the architecture in this tutorial are as follows:

  • BigQuery: In this tutorial, BigQuery acts as the data warehouse and schema provider. BigQuery also provides APIs for reading table schemas.
  • BigQuery audit logs: BigQuery audit logs serve as the operation logs in this architecture. BigQuery audit logs provide three types of logs: admin activity, data access, and system events. BigQueryAuditMetadata messages in the logs contain integrated resource information, such as table names and executed SQL queries.
  • Data Catalog: Data Catalog is a fully managed and highly scalable data discovery and metadata management service within Dataplex. It provides data governance features through support for tags on data entities such as BigQuery tables, and for policy tags that provide column-level access control for BigQuery tables. When attached to a data entity, tags let you record additional metadata. In this tutorial, you use tags to store lineage metadata.
  • ZetaSQL: ZetaSQL is the grammar provider used in this tutorial. The extraction pipeline uses ZetaSQL to parse and analyze the SQL queries found in the audit logs.

The following architectural diagram shows how the components used in this tutorial interact to create a data lineage extraction system for a BigQuery data warehouse in the Google Cloud ecosystem.

Flow of information to and from a data engine.

The flow of events in the architecture is as follows:

  1. BigQuery emits audit logs to Pub/Sub, and the extraction pipeline consumes the job information from those logs in real time.

  2. The Dataflow pipeline processes the job information to build data lineage by parsing the SQL code of each query with the ZetaSQL grammar engine.

  3. The extraction pipeline reads the table schema from the BigQuery API and persists the generated lineage in a BigQuery table and as a tag in Data Catalog.

  4. You can query the lineage table to identify the full flow of data in the data warehouse.

Limitations

The grammar provider used in this tutorial has the following limitations:

  • The grammar provider only supports LOAD, QUERY, and COPY jobs.
  • You must specify fully qualified table names in your queries, for example, project-id.dataset_id.table_id.
  • Because of limitations in ZetaSQL, DDL statements aren't supported.
  • This tutorial only demonstrates parsing of simple functions and aggregate functions. Complex functions and operations, such as window aggregation, are out of scope for this tutorial.

Objectives

  • Implement a data lineage system reference architecture for BigQuery.
  • Extract data lineage by using Dataflow streaming pipelines.
  • Verify that data lineage is extracted in BigQuery and Data Catalog.

Costs

This tutorial uses billable components of Google Cloud, including BigQuery, Pub/Sub, Dataflow, Data Catalog, Cloud Storage, and Cloud Build.

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

When you finish this tutorial, you can avoid continued billing by deleting the resources you created. For more information, see Clean up.

Before you begin

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.

  3. Enable the BigQuery, Pub/Sub, Data Catalog, and Dataflow APIs.

    Enable the APIs

  4. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  5. Download the code from the GitHub repository for this tutorial:

    git clone https://github.com/GoogleCloudPlatform/bigquery-data-lineage.git
    cd bigquery-data-lineage
    

Setting the environment

This tutorial assumes that you have the Owner (roles/owner) role.

The scripts and commands used in this tutorial rely on shell environment variables.

  1. In Cloud Shell, set the environment variables for your project and the Pub/Sub topic name:

    export PROJECT_ID=$(gcloud config get-value project)
    export AUDIT_LOGS_PUBSUB_TOPIC="TOPIC_ID"
    

    Replace TOPIC_ID with a name of your choice for the Pub/Sub topic.

  2. Set the environment variable for the Google Cloud region:

    export REGION_ID=CLOUD_REGION
    

    For data localization purposes, replace CLOUD_REGION with a Dataflow regional endpoint, for example, us-central1.

Enabling audit logging

In this step, you use Cloud Logging to capture operation logs in Google Cloud centrally. To enable the ingestion of audit logs into the extraction pipeline, you also set up the export of data_access logs to a message bus, in this case, Pub/Sub.

Create a Pub/Sub topic

  • In Cloud Shell, create a Pub/Sub topic to receive audit logs:

    gcloud pubsub topics create $AUDIT_LOGS_PUBSUB_TOPIC --project $PROJECT_ID
    

Export logs to Pub/Sub

The audit logs are available in Cloud Logging, and you can access them by using the Cloud Logging API.
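
To see what a BigQueryAuditMetadata log entry looks like before you set up the export, you can optionally read one directly from Cloud Shell. The following command is a minimal sketch; it assumes that at least one BigQuery job has recently run in your project:

    # Read one recent BigQuery job audit log entry; the metadata field contains the executed SQL
    gcloud logging read \
        'protoPayload.metadata."@type"="type.googleapis.com/google.cloud.audit.BigQueryAuditMetadata" protoPayload.methodName="google.cloud.bigquery.v2.JobService.InsertJob"' \
        --project $PROJECT_ID \
        --limit 1 \
        --format json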

  1. In Cloud Shell, create a log sink that filters the BigQuery audit logs and exports them to Pub/Sub for real-time processing:

      export LOG_SINK_ID=name-of-log-sink
      gcloud logging sinks create $LOG_SINK_ID \
          pubsub.googleapis.com/projects/$PROJECT_ID/topics/$AUDIT_LOGS_PUBSUB_TOPIC \
          --log-filter='protoPayload.metadata."@type"="type.googleapis.com/google.cloud.audit.BigQueryAuditMetadata" protoPayload.methodName="google.cloud.bigquery.v2.JobService.InsertJob" operation.last=true'
    
    
  2. To allow Cloud Logging to publish log entries to the topic, grant the pubsub.publisher role to the logging writer service account:

    # Identify the Logs writer service account
    export LOGGING_WRITER_IDENTITY=$(gcloud logging sinks describe $LOG_SINK_ID --format="get(writerIdentity)" --project $PROJECT_ID)
    
    # Grant Publish permission to the Logging writer
    gcloud pubsub topics add-iam-policy-binding $AUDIT_LOGS_PUBSUB_TOPIC \
        --member=$LOGGING_WRITER_IDENTITY \
        --role='roles/pubsub.publisher' \
        --project $PROJECT_ID
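
    Optionally, confirm that the binding was applied by inspecting the topic's IAM policy. This quick check is a sketch and not a required step:

      # Verify that the Logging writer identity holds roles/pubsub.publisher on the topic
      gcloud pubsub topics get-iam-policy $AUDIT_LOGS_PUBSUB_TOPIC --project $PROJECT_ID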
    

Create a BigQuery dataset and table

In this step, you create a BigQuery table that persists the extracted lineage.

  1. In Cloud Shell, create a BigQuery dataset:

    export BIGQUERY_REGION=BIGQUERY_MULTI_REGION
    export DATASET_ID=NAME_OF_LINEAGE_DATASET
    
    bq --location=$BIGQUERY_REGION \
    --project_id=$PROJECT_ID \
    mk --dataset $DATASET_ID
    

    Replace the following:

    • BIGQUERY_MULTI_REGION: the BigQuery location for the dataset, for example, US.
    • NAME_OF_LINEAGE_DATASET: a name of your choice for the lineage dataset.

  2. In Cloud Shell, create a partitioned table to store lineage information:

    export LINEAGE_TABLE_ID=NAME_OF_LINEAGE_TABLE
    bq mk --table \
    --project_id=$PROJECT_ID \
    --description "Data Lineage table" \
    --time_partitioning_field "reconcileTime" \
    $DATASET_ID.$LINEAGE_TABLE_ID \
    lineage_bigquery_table_schema.json
    
    

    Replace NAME_OF_LINEAGE_TABLE with your chosen table name.

  3. To verify that the table has been created, in the Google Cloud console, go to the BigQuery page.

    Go to BigQuery

    Table created in BigQuery.

    You see an output similar to the preceding image.
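
    You can also verify the table from Cloud Shell. The following command is a minimal sketch that prints the table definition, including the reconcileTime time-partitioning field:

      # Show the lineage table definition, including its time-partitioning configuration
      bq show --format=prettyjson $PROJECT_ID:$DATASET_ID.$LINEAGE_TABLE_ID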

Storing lineage information in Data Catalog

In this step, you set up Data Catalog to store the lineage information as a tag based on a custom tag template, which enables simple and centralized metadata operations in Google Cloud.

Create a tag template

  1. In Cloud Shell, create the tag template:

      export LINEAGE_TAG_TEMPLATE_NAME="TAG_TEMPLATE_NAME"
    
      gcloud data-catalog tag-templates create $LINEAGE_TAG_TEMPLATE_NAME \
          --project $PROJECT_ID \
          --location=$REGION_ID \
          --display-name="Data Lineage" \
          --field=id=jobTime,display-name="Entity change timestamp",type=timestamp,required=TRUE \
          --field=id=reconcileTime,display-name="Lineage processed timestamp",type=timestamp,required=TRUE \
          --field=id=actuator,display-name="The email address of the authorized executor of Job which can be a person or a service account",type=string \
          --field=id=jobId,display-name="The BigQuery or Operation Job Id which made the change to the table",type=string \
          --field=id=parents,display-name="The tables which were read for generating this entity",type=string
    

    Replace TAG_TEMPLATE_NAME with a name of your choice. This name identifies the tag template that holds lineage information in Data Catalog.

  2. Verify that the template is created:

      export LINEAGE_TAG_TEMPLATE_ID="projects/${PROJECT_ID}/locations/${REGION_ID}/tagTemplates/${LINEAGE_TAG_TEMPLATE_NAME}"
    
      gcloud data-catalog tag-templates describe $LINEAGE_TAG_TEMPLATE_NAME \
          --location=$REGION_ID
    

    The output is similar to the following:

      displayName: Data Lineage
      fields:
        actuator:
          displayName: The email address of the authorized executor of Job which can be
            a person or a service account
          name:
      projects/bq-lineage-demo/locations/us-central1/tagTemplates/data_lineage_tag/fields/actuator
          type:
            primitiveType: STRING
        jobId:
          displayName: The BigQuery or Operation Job Id which made the change to the table
          name:
      projects/bq-lineage-demo/locations/us-central1/tagTemplates/data_lineage_tag/fields/jobId
          type:
            primitiveType: STRING
        jobTime:
          displayName: Entity change timestamp
          isRequired: true
          name:
      projects/bq-lineage-demo/locations/us-central1/tagTemplates/data_lineage_tag/fields/jobTime
          type:
            primitiveType: TIMESTAMP
        parents:
          displayName: The tables which were read for generating this entity
          name:
      projects/bq-lineage-demo/locations/us-central1/tagTemplates/data_lineage_tag/fields/parents
          type:
            primitiveType: STRING
        reconcileTime:
          displayName: Lineage processed timestamp
          isRequired: true
          name:
      projects/bq-lineage-demo/locations/us-central1/tagTemplates/data_lineage_tag/fields/reconcileTime
          type:
            primitiveType: TIMESTAMP
           

Extracting lineage

In this step, you set up a Pub/Sub topic to export the extracted lineage in real time. You then launch the Dataflow pipeline to extract lineage. The Dataflow pipeline processes the SQL query in the audit log to identify data lineage. The Dataflow pipeline uses ZetaSQL to understand the SQL query and the BigQuery API to retrieve the table schema information.

Create a lineage output Pub/Sub topic

The extraction pipeline processes the SQL code to identify data lineage that you can use to drive multiple downstream applications, for example, automatically enforcing data access policies on new tables or raising data exfiltration alerts. To allow for future data lineage use cases, a Pub/Sub-based message bus decouples those applications from the extraction pipeline.

  • Create a Pub/Sub topic to distribute lineage messages to other systems:

    export LINEAGE_OUTPUT_PUBSUB_TOPIC="composite-lineage"
    gcloud pubsub topics create $LINEAGE_OUTPUT_PUBSUB_TOPIC \
         --project $PROJECT_ID
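
    To experiment with a downstream consumer, you can attach a subscription to this topic and pull lineage messages after the extraction pipeline is running. The following commands are a sketch; lineage-inspector is a hypothetical subscription name:

      # Create a pull subscription for ad hoc inspection of lineage messages
      gcloud pubsub subscriptions create lineage-inspector \
          --topic=$LINEAGE_OUTPUT_PUBSUB_TOPIC \
          --project=$PROJECT_ID

      # Pull and acknowledge a few messages after lineage starts flowing
      gcloud pubsub subscriptions pull lineage-inspector --auto-ack --limit=5 --project=$PROJECT_ID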
    

Running the extraction pipeline

The extraction pipeline parses the SQL statements in the data_access log entries using ZetaSQL to identify lineage elements. The extraction pipeline then inserts the information into the BigQuery table that you created in the previous step.

  1. In Cloud Shell, create a Cloud Storage bucket as a temporary and staging area for Dataflow jobs:

      export TEMP_GCS_BUCKET=TEMP_BUCKET_ID
      gsutil mb -l $REGION_ID -p $PROJECT_ID gs://$TEMP_GCS_BUCKET
    

    Replace TEMP_BUCKET_ID with a name of your choice for the bucket.

  2. Use Cloud Build to build the pipeline and package it as a Flex Template container image:

      gcloud builds submit --config=cloudbuild.yaml \
      --substitutions _TEMPLATE_IMAGE_TAG="gcr.io/${PROJECT_ID}/bigquery-data-lineage-extraction:latest",_MAIN_CLASS="com.google.cloud.solutions.datalineage.LineageExtractionPipeline" \
      --project "${PROJECT_ID}"
    
  3. Create a Dataflow Flex Template:

      gcloud dataflow flex-template build \
      "gs://${TEMP_GCS_BUCKET}/bigquery-data-lineage-templates" \
      --project="${PROJECT_ID}" \
      --image="gcr.io/${PROJECT_ID}/bigquery-data-lineage-extraction:latest" \
      --metadata-file="lineage-extraction-metadata.json" \
      --sdk-language="JAVA"
    
  4. Launch the lineage extraction streaming pipeline from the Flex Template:

      gcloud dataflow flex-template run \
      "bigquery-lineage-extraction-$(date +%Y%m%d%H%M%S)" \
      --template-file-gcs-location="gs://${TEMP_GCS_BUCKET}/bigquery-data-lineage-templates" \
      --project="${PROJECT_ID}" \
      --region="${REGION_ID}" \
      --temp-location="gs://${TEMP_GCS_BUCKET}/temp/" \
      --staging-location="gs://${TEMP_GCS_BUCKET}/staging/" \
      --parameters=lineageTableName="${PROJECT_ID}:${DATASET_ID}.${LINEAGE_TABLE_ID}" \
      --parameters=tagTemplateId="${LINEAGE_TAG_TEMPLATE_ID}" \
      --parameters=pubsubTopic="projects/${PROJECT_ID}/topics/${AUDIT_LOGS_PUBSUB_TOPIC}" \
      --parameters=compositeLineageTopic="projects/${PROJECT_ID}/topics/${LINEAGE_OUTPUT_PUBSUB_TOPIC}"
    

    This command launches the pipeline on Dataflow from the Flex Template that you built in the previous steps.

  5. To check whether the pipeline launched successfully, in the Google Cloud console, go to the Dataflow page.

    Go to the Google Cloud console

    The Dataflow job resembles the following Directed Acyclic Graph (DAG) image:

    Example DAG.
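
    You can also confirm the job state from Cloud Shell. The following command is a minimal sketch:

      # List active Dataflow jobs in the region; the lineage extraction job should be in the Running state
      gcloud dataflow jobs list --region="${REGION_ID}" --status=active --project="${PROJECT_ID}"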

Manually checking data lineage output

In this step, you run a query against a publicly available NCAA basketball dataset to verify the output of the extraction pipeline.

Run a query on sample data

  1. In the Google Cloud console, go to the BigQuery page.

    Go to BigQuery

  2. To set the destination table for query results, click More, then Query Settings.

  3. Select Set a destination table for query results.

    Query results destination options.

  4. Paste the following code into the Query Editor and click Run query:

      #standardSQL
      SELECT
        teams.id,
        CONCAT(colors.color,'-', teams.alias) AS team_alias_with_color,
        teams.name,
        colors.color AS team_color
      FROM
        `bigquery-public-data.ncaa_basketball.mbb_teams` AS teams
        INNER JOIN
        `bigquery-public-data.ncaa_basketball.team_colors` AS colors
        ON teams.id = colors.id
    

    After the query completes, you can verify the outputs against the table in the following section, Understanding the extracted lineage information.
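
    If you prefer the command line, you can run an equivalent query from Cloud Shell by using the bq tool. The following command is a sketch; it assumes that a destination dataset named MyDataSet already exists in your project, and MyOutputTable is a hypothetical destination table name:

      # Run the sample query with an explicit destination table so that lineage has a named target table
      bq query \
          --project_id=$PROJECT_ID \
          --use_legacy_sql=false \
          --destination_table="${PROJECT_ID}:MyDataSet.MyOutputTable" \
          'SELECT
             teams.id,
             CONCAT(colors.color, "-", teams.alias) AS team_alias_with_color,
             teams.name,
             colors.color AS team_color
           FROM `bigquery-public-data.ncaa_basketball.mbb_teams` AS teams
           INNER JOIN `bigquery-public-data.ncaa_basketball.team_colors` AS colors
           ON teams.id = colors.id'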

Understanding the extracted lineage information

The following table shows lineage information extracted by the system. The table also shows the values that you can expect to see when you follow the steps in this tutorial:

| Information type | Column | Column definition | Example value |
| --- | --- | --- | --- |
| Job information | reconcileTime | The time in UTC at which the logs were processed to extract lineage. | The time at which the tutorial is run. |
| Job information | jobId | The BigQuery ID for the job that executed the query to manipulate data. You can get more details by using the BigQuery API for jobs. | bquxjob_xxxxxxxxxx |
| Job information | jobTime | The time in UTC at which the user executed the BigQuery job or query. | The time at which you ran the manual verification step. |
| Job information | actuator | The user who executed the BigQuery query or job, in this case the email address of the user or of the service account. | Your email address. |
| Job information | transform.sql | The SQL statement for the BigQuery job. | The SQL query that you run in Run a query on sample data. |
| Table lineage | target | The BigQuery table that was created or updated by the job. The table is represented as both a SqlResource and a linked resource. | sqlResource: bigquery.table.{your-project-id}.MyDataSet.MyOutputTable |
| Table lineage | parents | The data entities that were read to build the target table. An entity can be a BigQuery table or, in the case of a load operation, a Cloud Storage file. | sqlResource: bigquery.table.bigquery-public-data.ncaa_basketball.mbb_teams and sqlResource: bigquery.table.bigquery-public-data.ncaa_basketball.team_colors |
| Table lineage | operation | The type of job: COPY, LOAD, or QUERY. | QUERY_JOB |
| Column lineage | target | The target BigQuery table columns. | id, team_alias_with_color, name, team_color |
| Column lineage | operations | A list of functions used to derive the target column from parent columns. | CONCAT (ZetaSQL::concat) |
| Column lineage | parents | A list of columns from parent tables that make up the target column. | For the team_alias_with_color column: sqlResource: bigquery.table.bigquery-public-data.ncaa_basketball.team_colors, column color; and sqlResource: bigquery.table.bigquery-public-data.ncaa_basketball.mbb_teams, column alias |

Verify data entry in BigQuery

  1. In the Google Cloud console, go to the BigQuery page.

    Go to BigQuery

  2. Open the lineage dataset that you created in Create a BigQuery dataset and table.

  3. Expand the dataset, select the lineage table, and then click the Preview tab. (You can also query the table from Cloud Shell, as shown after this list.) You see the following information:

    • Job information
    • Table level information
    • Column information, including operations
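
    Alternatively, you can preview the most recent lineage rows from Cloud Shell. The following command is a sketch that relies only on the reconcileTime partitioning column created earlier:

      # Show the five most recently reconciled lineage records
      bq query \
          --project_id=$PROJECT_ID \
          --use_legacy_sql=false \
          "SELECT * FROM \`${PROJECT_ID}.${DATASET_ID}.${LINEAGE_TABLE_ID}\` ORDER BY reconcileTime DESC LIMIT 5"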

Verify tags in Data Catalog

  1. In the Google Cloud console, go to Dataplex.

    Go to Dataplex

  2. In Dataplex, search for your table by using the following query, where YOUR_PROJECT_ID is your project ID: type=TABLE projectId:YOUR_PROJECT_ID
  3. Select your output table. You see a data lineage tag attached to the table, as shown in the following example image.

    Example MyOutputTable.

    Column tags are applied to the table, as shown in the following example image.

    Table with column tags applied.
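
    You can also look up the Data Catalog entry for the output table from Cloud Shell. The following command is a sketch that assumes your query results destination table is MyDataSet.MyOutputTable:

      # Look up the Data Catalog entry for the BigQuery output table
      gcloud data-catalog entries lookup \
          "//bigquery.googleapis.com/projects/${PROJECT_ID}/datasets/MyDataSet/tables/MyOutputTable"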

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

If you plan to continue with the rest of this series, keep the resources that you've already created.
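
If you choose to keep the project and delete the individual resources, the following commands are one possible sketch. They assume that the environment variables you set earlier in this tutorial are still defined in your Cloud Shell session, and JOB_ID is a placeholder for the Dataflow job ID returned by the list command:

    # Stop the streaming lineage extraction job (look up its JOB_ID first)
    gcloud dataflow jobs list --region="${REGION_ID}" --status=active --project="${PROJECT_ID}"
    gcloud dataflow jobs cancel JOB_ID --region="${REGION_ID}" --project="${PROJECT_ID}"

    # Delete the log sink and the Pub/Sub topics
    gcloud logging sinks delete $LOG_SINK_ID --project=$PROJECT_ID
    gcloud pubsub topics delete $AUDIT_LOGS_PUBSUB_TOPIC $LINEAGE_OUTPUT_PUBSUB_TOPIC --project=$PROJECT_ID

    # Delete the lineage dataset, the temporary bucket, and the tag template
    bq rm -r -f "${PROJECT_ID}:${DATASET_ID}"
    gsutil -m rm -r gs://$TEMP_GCS_BUCKET
    gcloud data-catalog tag-templates delete $LINEAGE_TAG_TEMPLATE_NAME \
        --location=$REGION_ID --project=$PROJECT_ID --force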

Delete the project

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

What's next