This tutorial shows you how to implement a reference data lineage architecture by building a basic real-time data lineage solution to track data movement in BigQuery. The solution parses the SQL queries in audit logs by using a Dataflow pipeline.
This tutorial is part of a series that helps you to understand data lineage concepts using BigQuery:
- Data lineage systems for a data warehouse
- Building a BigQuery data lineage system using audit logs, Pub/Sub, ZetaSQL, Dataflow, and Data Catalog (this document)
- Automatically cascade data access policy to derived tables based on data lineage using BigQuery, Data Catalog, and Dataflow
In this tutorial, you use audit logs to capture data operations. You configure Cloud Logging to emit BigQuery data access logs to Pub/Sub and deploy a streaming Dataflow pipeline to analyze the SQL query in the access log to identify and extract lineage. The extracted lineage is persisted in a BigQuery table, a Data Catalog tag, and—to enable downstream use cases—a Pub/Sub topic.
This tutorial is intended for people who work in data engineering and data governance. This tutorial assumes that you have basic knowledge of building Dataflow pipelines using Apache Beam Java SDK, shell scripting, and BigQuery.
Architecture
The main components in the architecture in this tutorial are as follows:
- BigQuery: In this tutorial, BigQuery acts as the data warehouse and the schema provider. BigQuery also provides APIs for reading table schemas.
- BigQuery audit logs: BigQuery audit logs are used as the operation logs in this architecture. BigQuery audit logs provide three different types of logs: admin activity, data access, and system events. BigQueryAuditMetadata messages in the log contain resource information such as table names and the executed SQL queries.
- Data Catalog: Data Catalog is a fully managed and highly scalable data discovery and metadata management service within Dataplex. It provides data governance features through support for tags on data entities like BigQuery tables, including policy tags that provide column-level access control for BigQuery tables. When attached to a data entity, tags allow for the inclusion of additional metadata. You can also use tags to add lineage metadata.
- ZetaSQL: ZetaSQL is the grammar provider used in this tutorial.
The following architectural diagram shows how the components used in this tutorial interact to create a data lineage extraction system for a BigQuery data warehouse in the Google Cloud ecosystem.
The flow of events in the architecture is as follows:
- The extraction pipeline consumes job information in real time from the audit logs that BigQuery emits to Pub/Sub.
- The Dataflow pipeline processes the job information and builds data lineage by parsing the query's SQL with the ZetaSQL grammar engine.
- The extraction pipeline retrieves the table schema from the BigQuery API and persists the generated lineage in a BigQuery table and as a tag in Data Catalog.
- You can query the lineage table to identify the full flow of data in the data warehouse.
Limitations
The grammar provider used in this tutorial has the following limitations:
- The grammar provider only supports LOAD, QUERY, and COPY jobs.
- You must specify a fully qualified table name when you make a query, for example, project-id.dataset_id.table_id.
- Because of limitations in ZetaSQL, DDL statements aren't supported.
- This tutorial only demonstrates parsing of simple functions and aggregate functions. Complex functions and operations, for example window aggregation, are outside the scope of this tutorial.
Objectives
- Implement a data lineage system reference architecture for BigQuery.
- Extract data lineage by using Dataflow streaming pipelines.
- Verify that data lineage is extracted in BigQuery and Data Catalog.
Costs
This tutorial uses the following billable components of Google Cloud: BigQuery, Pub/Sub, Dataflow, Data Catalog, Cloud Storage, and Cloud Logging.
To generate a cost estimate based on your projected usage,
use the pricing calculator.
When you finish this tutorial, you can avoid continued billing by deleting the resources you created. For more information, see Clean up.
Before you begin
- In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.
- Enable the BigQuery, Pub/Sub, Data Catalog, and Dataflow APIs.
- In the Google Cloud console, activate Cloud Shell. At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
- Download the code from the GitHub repository for this tutorial:

  git clone https://github.com/GoogleCloudPlatform/bigquery-data-lineage.git
  cd bigquery-data-lineage
Setting the environment
This tutorial assumes that you have the Owner (roles/owner) role.
The scripts and commands used in this tutorial rely on shell environment variables.
In Cloud Shell, set the environment variables for your project and the Pub/Sub topic name:
export PROJECT_ID=$(gcloud config get-value project)
export AUDIT_LOGS_PUBSUB_TOPIC="TOPIC_ID"
Replace TOPIC_ID with a name of your choice for the Pub/Sub topic.

Set the environment variable for the Google Cloud region:
export REGION_ID=CLOUD_REGION
For data localization purposes, you must replace CLOUD_REGION with a Dataflow regional endpoint.
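For example, assuming you pick us-central1 as the Dataflow regional endpoint and bq-audit-logs as the topic name (both are example values only; choose your own), the variables would look like this:

# Example values only; substitute your own topic name and region
export AUDIT_LOGS_PUBSUB_TOPIC="bq-audit-logs"
export REGION_ID=us-central1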
Enabling audit logging
In this step, you use Cloud Logging
to capture operation logs in Google Cloud centrally. To enable the ingestion
of audit logs into the extraction pipeline, you also set up the export of data_access
logs to a message bus, in this case, Pub/Sub.
Create a Pub/Sub topic
In Cloud Shell, create a Pub/Sub topic to receive audit logs:
gcloud pubsub topics create $AUDIT_LOGS_PUBSUB_TOPIC --project $PROJECT_ID
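Optionally, confirm that the topic was created before you configure the log sink:

# Describe the topic to confirm that it exists
gcloud pubsub topics describe $AUDIT_LOGS_PUBSUB_TOPIC --project $PROJECT_ID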
Export logs to Pub/Sub
The audit logs are available in Cloud Logging. You can access the audit logs using the Cloud Logging API.
In Cloud Shell, create a log sink that filters and exports the BigQuery audit logs to Pub/Sub for real-time processing:
export LOG_SINK_ID=name-of-log-sink

gcloud logging sinks create $LOG_SINK_ID \
  pubsub.googleapis.com/projects/$PROJECT_ID/topics/$AUDIT_LOGS_PUBSUB_TOPIC \
  --log-filter='protoPayload.metadata."@type"="type.googleapis.com/google.cloud.audit.BigQueryAuditMetadata" protoPayload.methodName="google.cloud.bigquery.v2.JobService.InsertJob" operation.last=true'
To allow the logging service account to publish log entries, assign the pubsub.publisher role to the logging service account:

# Identify the Logs writer service account
export LOGGING_WRITER_IDENTITY=$(gcloud logging sinks describe $LOG_SINK_ID --format="get(writerIdentity)" --project $PROJECT_ID)

# Grant Publish permission to the Logging writer
gcloud pubsub topics add-iam-policy-binding $AUDIT_LOGS_PUBSUB_TOPIC \
  --member=$LOGGING_WRITER_IDENTITY \
  --role='roles/pubsub.publisher' \
  --project $PROJECT_ID
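To confirm that the binding was applied, you can inspect the IAM policy on the topic; the writer identity should be listed with the roles/pubsub.publisher role:

# Show the IAM policy attached to the audit logs topic
gcloud pubsub topics get-iam-policy $AUDIT_LOGS_PUBSUB_TOPIC --project $PROJECT_ID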
Create a BigQuery dataset and table
In this step, you create a BigQuery dataset and a table that persists the extracted lineage.
In Cloud Shell, create a BigQuery dataset:
export BIGQUERY_REGION=BIGQUERY_MULTI_REGION
export DATASET_ID=NAME_OF_LINEAGE_DATASET

bq --location=$BIGQUERY_REGION \
  --project_id=$PROJECT_ID \
  mk --dataset $DATASET_ID
Replace the following:

- BIGQUERY_MULTI_REGION: one of the BigQuery locations. Use the same cloud region that you set for REGION_ID, or a cloud multi-region that contains it.
- NAME_OF_LINEAGE_DATASET: a name of your choice for the BigQuery dataset.
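Before creating the table, you can optionally confirm from Cloud Shell that the dataset exists:

# List the datasets in the project; the new lineage dataset should appear
bq ls --project_id=$PROJECT_ID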
In Cloud Shell, create a partitioned table to store lineage information:
export LINEAGE_TABLE_ID=NAME_OF_LINEAGE_TABLE

bq mk --table \
  --project_id=$PROJECT_ID \
  --description "Data Lineage table" \
  --time_partitioning_field "reconcileTime" \
  $DATASET_ID.$LINEAGE_TABLE_ID \
  lineage_bigquery_table_schema.json
Replace NAME_OF_LINEAGE_TABLE with your chosen table name.

To verify that the table has been created, in the Google Cloud console, go to the BigQuery page.
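You can also verify the table from Cloud Shell. For example, printing the schema shows the fields defined in lineage_bigquery_table_schema.json, including the reconcileTime partitioning field:

# Print the schema of the lineage table
bq show --schema --format=prettyjson $PROJECT_ID:$DATASET_ID.$LINEAGE_TABLE_ID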
Storing lineage information in Data Catalog
In this step, you set up Data Catalog to store the lineage information as a tag based on a custom tag template, which enables simple and centralized metadata operations in Google Cloud.
Create a tag template
In Cloud Shell, create the tag template:
export LINEAGE_TAG_TEMPLATE_NAME="TAG_TEMPLATE_NAME"

gcloud data-catalog tag-templates create $LINEAGE_TAG_TEMPLATE_NAME \
  --project $PROJECT_ID \
  --location=$REGION_ID \
  --display-name="Data Lineage" \
  --field=id=jobTime,display-name="Entity change timestamp",type=timestamp,required=TRUE \
  --field=id=reconcileTime,display-name="Lineage processed timestamp",type=timestamp,required=TRUE \
  --field=id=actuator,display-name="The email address of the authorized executor of Job which can be a person or a service account",type=string \
  --field=id=jobId,display-name="The BigQuery or Operation Job Id which made the change to the table",type=string \
  --field=id=parents,display-name="The tables which were read for generating this entity",type=string
Replace TAG_TEMPLATE_NAME with a name of your choice. This name is used for the tag template that holds lineage information in Data Catalog.

Verify that the template is created:
export LINEAGE_TAG_TEMPLATE_ID="projects/${PROJECT_ID}/locations/${REGION_ID}/tagTemplates/${LINEAGE_TAG_TEMPLATE_NAME}"

gcloud data-catalog tag-templates describe $LINEAGE_TAG_TEMPLATE_NAME \
  --location=$REGION_ID
The output is similar to the following:
displayName: Data Lineage
fields:
  actuator:
    displayName: The email address of the authorized executor of Job which can be a person or a service account
    name: projects/bq-lineage-demo/locations/us-central1/tagTemplates/data_lineage_tag/fields/actuator
    type:
      primitiveType: STRING
  jobId:
    displayName: The BigQuery or Operation Job Id which made the change to the table
    name: projects/bq-lineage-demo/locations/us-central1/tagTemplates/data_lineage_tag/fields/jobId
    type:
      primitiveType: STRING
  jobTime:
    displayName: Entity change timestamp
    isRequired: true
    name: projects/bq-lineage-demo/locations/us-central1/tagTemplates/data_lineage_tag/fields/jobTime
    type:
      primitiveType: TIMESTAMP
  parents:
    displayName: The tables which were read for generating this entity
    name: projects/bq-lineage-demo/locations/us-central1/tagTemplates/data_lineage_tag/fields/parents
    type:
      primitiveType: STRING
  reconcileTime:
    displayName: Lineage processed timestamp
    isRequired: true
    name: projects/bq-lineage-demo/locations/us-central1/tagTemplates/data_lineage_tag/fields/reconcileTime
    type:
      primitiveType: TIMESTAMP
Extracting lineage
In this step, you set up a Pub/Sub topic to export the extracted lineage in real time, and then launch the Dataflow pipeline that extracts lineage. The Dataflow pipeline processes the SQL query in each audit log entry to identify data lineage; it uses ZetaSQL to parse the SQL query and the BigQuery API to retrieve the table schema information.
Create a lineage output Pub/Sub topic
The extraction pipeline processes the SQL code to identify data lineage that you can use to activate multiple downstream applications—for example, automatically enforcing data access policy for new tables, or data exfiltration alerts. To allow for future data lineage use cases, a Pub/Sub-based message bus enables decoupling from the extraction pipeline.
Create a Pub/Sub topic to distribute lineage messages to other systems:
export LINEAGE_OUTPUT_PUBSUB_TOPIC="composite-lineage"

gcloud pubsub topics create $LINEAGE_OUTPUT_PUBSUB_TOPIC \
  --project $PROJECT_ID
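If you want to inspect the lineage messages that the pipeline later publishes to this topic, you can attach a pull subscription to it. The subscription name lineage-inspection is only an example:

# Create a pull subscription on the lineage output topic (example name)
gcloud pubsub subscriptions create lineage-inspection \
  --topic=$LINEAGE_OUTPUT_PUBSUB_TOPIC \
  --project $PROJECT_ID

# After the pipeline has processed a query, pull a few messages to inspect the payloads
gcloud pubsub subscriptions pull lineage-inspection --limit=5 --auto-ack --project $PROJECT_ID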
Running the extraction pipeline
The extraction pipeline parses the SQL statements in the data_access
log
entries using ZetaSQL to identify lineage elements. The extraction pipeline then
inserts the information into the BigQuery table that you created in Create a BigQuery dataset and table.
In Cloud Shell, create a Cloud Storage bucket as a temporary and staging area for Dataflow jobs:
export TEMP_GCS_BUCKET=TEMP_BUCKET_ID

gsutil mb -l $REGION_ID -p $PROJECT_ID gs://$TEMP_GCS_BUCKET
Replace the TEMP_BUCKET_ID placeholder variable with a name of your choice.

Use Cloud Build to compile the pipeline and build the Flex Template container image:
gcloud builds submit --config=cloudbuild.yaml \
  --substitutions _TEMPLATE_IMAGE_TAG="gcr.io/${PROJECT_ID}/bigquery-data-lineage-extraction:latest",_MAIN_CLASS="com.google.cloud.solutions.datalineage.LineageExtractionPipeline" \
  --project "${PROJECT_ID}"
Create a Dataflow Flex Template:
gcloud dataflow flex-template build \
  "gs://${TEMP_GCS_BUCKET}/bigquery-data-lineage-templates" \
  --project="${PROJECT_ID}" \
  --image="gcr.io/${PROJECT_ID}/bigquery-data-lineage-extraction:latest" \
  --metadata-file="lineage-extraction-metadata.json" \
  --sdk-language="JAVA"
Launch the lineage extraction streaming pipeline from the Flex Template:
gcloud dataflow flex-template run \
  "bigquery-lineage-extraction-$(date +%Y%m%d%H%M%S)" \
  --template-file-gcs-location="gs://${TEMP_GCS_BUCKET}/bigquery-data-lineage-templates" \
  --project="${PROJECT_ID}" \
  --region="${REGION_ID}" \
  --temp-location="gs://${TEMP_GCS_BUCKET}/temp/" \
  --staging-location="gs://${TEMP_GCS_BUCKET}/staging/" \
  --parameters=lineageTableName="${PROJECT_ID}:${DATASET_ID}.${LINEAGE_TABLE_ID}" \
  --parameters=tagTemplateId="${LINEAGE_TAG_TEMPLATE_ID}" \
  --parameters=pubsubTopic="projects/${PROJECT_ID}/topics/${AUDIT_LOGS_PUBSUB_TOPIC}" \
  --parameters=compositeLineageTopic="projects/${PROJECT_ID}/topics/${LINEAGE_OUTPUT_PUBSUB_TOPIC}"
This command creates a Dataflow job from the Flex Template and deploys the pipeline to run on Dataflow.
To see if the pipeline was successfully launched, in the Google Cloud console, go to the Dataflow page.
On the job details page, the Dataflow job's execution graph is displayed as a directed acyclic graph (DAG).
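You can also check the job status from Cloud Shell:

# List active Dataflow jobs in the selected region
gcloud dataflow jobs list \
  --region="${REGION_ID}" \
  --status=active \
  --project="${PROJECT_ID}"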
Manually checking data lineage output
In this step, you run a query to verify the output of the extractor pipeline, using a publicly available NCAA basketball dataset.
Run a query on sample data
- In the Google Cloud console, go to the BigQuery page.
- To set the destination table for query results, click More, then Query Settings.
- Select Set a destination table for query results.
- Paste the following code into the Query Editor and click Run query:
#standardSQL
SELECT
  teams.id,
  CONCAT(colors.color, '-', teams.alias) AS team_alias_with_color,
  teams.name,
  colors.color AS team_color
FROM `bigquery-public-data.ncaa_basketball.mbb_teams` AS teams
INNER JOIN `bigquery-public-data.ncaa_basketball.team_colors` AS colors USING (id)
After the query completes, you can verify the extracted lineage by comparing it against the table in the following section, Understanding the extracted lineage information.
Understanding the extracted lineage information
The following table shows lineage information extracted by the system. The table also shows the values that you can expect to see when you follow the steps in this tutorial:
Information type | Column | Column definition | Example value
---|---|---|---
Job information | reconcileTime | The time in UTC at which the logs were processed to extract lineage. | The time at which the tutorial is run.
Job information | jobId | The BigQuery ID for the job which executed the query to manipulate data. You can get more details using the BigQuery API for jobs. | bquxjob_xxxxxxxxxx
Job information | jobTime | The time in UTC at which the user executed the BigQuery job or query. | The time at which you ran the manual verification step.
Job information | actuator | The user which executed the BigQuery query or job. In this case, the email address of the user or of the service account. | Your email address.
Job information | transform.sql | The BigQuery SQL statement for the BigQuery job. | The SQL query that you ran in Run a query on sample data.
Table lineage | target | The BigQuery table which was created or updated with the job. The table is represented as both a SqlResource and a linked resource. | sqlResource: bigquery.table.{your-project-id}.MyDataSet.MyOutputTable
Table lineage | parents | The data entity which was used to build the target table. The entity can be a BigQuery table or, in the case of a load operation, a Cloud Storage file. | sqlResource: (one entry for each parent table)
Table lineage | operation | The type of job: COPY, LOAD, or QUERY. | QUERY_JOB
Column lineage | target | The target BigQuery table column. | id, team_alias_with_color, name, team_color
Column lineage | operations | A list of functions used to derive the target column from parent columns. | CONCAT (ZetaSQL::concat)
Column lineage | parents | A list of columns from parent tables which make up the target column. | Example values for a column: sqlResource: (one entry for each parent column)
Verify data entry in BigQuery
- In the Google Cloud console, go to the BigQuery page.
- Open the output dataset which you created in Create a BigQuery dataset and table.
- Select and expand the lineage table, and then click the Preview tab. You see the following information:
  - Job information
  - Table level information
  - Column information, including operations
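You can also inspect the newest lineage rows directly from Cloud Shell. The following minimal query only assumes the reconcileTime partitioning column that you defined when you created the table:

# Show the most recently processed lineage entries
bq query --use_legacy_sql=false \
  "SELECT * FROM \`${PROJECT_ID}.${DATASET_ID}.${LINEAGE_TABLE_ID}\` ORDER BY reconcileTime DESC LIMIT 5"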
Verify tags in Data Catalog
- In the Google Cloud console, go to Dataplex.
- In Dataplex, search using the following query, replacing YOUR_PROJECT_ID with your project ID:

  type=TABLE projectId:YOUR_PROJECT_ID

- Select your output table. A data lineage tag is attached to the table.
- Column tags are also applied to the table.
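As an optional check from Cloud Shell, you can also look up the Data Catalog entry for your output table with the gcloud data-catalog entries lookup command. Replace OUTPUT_DATASET and OUTPUT_TABLE with the destination dataset and table that you set in the query settings:

# Look up the Data Catalog entry for the destination table of the sample query
gcloud data-catalog entries lookup \
  "//bigquery.googleapis.com/projects/${PROJECT_ID}/datasets/OUTPUT_DATASET/tables/OUTPUT_TABLE"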
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.
If you plan to continue with the rest of this series, keep the resources that you've already created.

Delete the project
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
What's next
- Learn more about data lineage concepts and use cases for your data warehouse in BigQuery.
- Learn how to dynamically propagate data access policy to derived tables.
- Explore reference architectures, diagrams, and best practices about Google Cloud. Take a look at our Cloud Architecture Center.