This tutorial demonstrates how to use Dataflow to extract, transform, and load (ETL) data from an online transaction processing (OLTP) relational database into BigQuery for analysis.
This tutorial is intended for database admins, operations professionals, and cloud architects interested in taking advantage of the analytical query capabilities of BigQuery and the batch processing capabilities of Dataflow.
OLTP databases are often relational databases that store information and process transactions for ecommerce sites, software as a service (SaaS) applications, or games. OLTP databases are usually optimized for transactions, which require the ACID properties: atomicity, consistency, isolation, and durability, and typically have highly normalized schemas. In contrast, data warehouses tend to be optimized for data retrieval and analysis, rather than transactions, and typically feature denormalized schemas. Generally, denormalizing data from an OLTP database makes it more useful for analysis in BigQuery.
Objectives
The tutorial shows two approaches for extracting, transforming, and loading normalized RDBMS data into denormalized BigQuery data:
- Using BigQuery to load and transform the data. Use this approach to perform a one-time load of a small amount of data into BigQuery for analysis. You might also use this approach to prototype your dataset before you automate larger or multiple datasets.
- Using Dataflow to load, transform, and cleanse the data. Use this approach to load a larger amount of data, load data from multiple data sources, or to load data incrementally or automatically.
Costs
In this document, you use billable components of Google Cloud, including BigQuery, Cloud Storage, and Dataflow.
To generate a cost estimate based on your projected usage, use the pricing calculator.
When you finish this tutorial, you can avoid continued billing by deleting the resources you created. For more information, see Clean up.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Google Cloud project. Learn how to check if billing is enabled on a project.
- Enable the Compute Engine and Dataflow APIs.
Using the MusicBrainz dataset
This tutorial relies on JSON snapshots of tables in the MusicBrainz database, which is built on PostgreSQL and contains information about all of the music in MusicBrainz. Some elements of the MusicBrainz schema include:
- Artists
- Release groups
- Releases
- Recordings
- Works
- Labels
- Many of the relationships between these entities.
The MusicBrainz schema includes three relevant tables: `artist`, `recording`, and `artist_credit_name`. An `artist_credit` represents credit given to the artist for a recording, and the `artist_credit_name` rows link the recording with its corresponding artist through the `artist_credit` value.
This tutorial provides the PostgreSQL tables already extracted into
newline-delimited JSON format and stored in a public Cloud Storage
bucket: gs://solutions-public-assets/bqetl
If you want to perform this step yourself, you need to have a PostgreSQL database containing the MusicBrainz dataset, and use the following commands to export each of the tables:
host=POSTGRES_HOST
user=POSTGRES_USER
database=POSTGRES_DATABASE
for table in artist recording artist_credit_name
do
pg_cmd="\\copy (select row_to_json(r) from (select * from ${table}) r ) to exported_${table}.json"
psql -w -h ${host} -U ${user} -d ${database} -c "${pg_cmd}"
# clean up extra '\' characters
sed -i -e 's/\\\\/\\/g' exported_${table}.json
done
Approach 1: ETL with BigQuery
Use this approach to perform a one-time load of a small amount of data into BigQuery for analysis. You might also use this approach to prototype your dataset before you use automation with larger or multiple datasets.
Create a BigQuery dataset
To create a BigQuery dataset, you load the MusicBrainz tables into BigQuery individually, and then you join the tables that you loaded so that each row contains the data linkage that you want. You store the join results in a new BigQuery table. Then you can delete the original tables that you loaded.
In the Google Cloud console, open BigQuery.
In the Explorer panel, click the menu next to your project name, and then click Create data set.
In the Create data set dialog, complete the following steps:
- In the Data set ID field, enter `musicbrainz`.
- Set the Data Location to us.
- Click Create data set.
Import MusicBrainz tables
For each MusicBrainz table, perform the following steps to add a table to the dataset you created:
1. In the Google Cloud console BigQuery Explorer panel, expand the row with your project name to show the newly created `musicbrainz` dataset.
2. Click the menu next to your `musicbrainz` dataset, and then click Create Table.
3. In the Create Table dialog, complete the following steps:
    - In the Create table from drop-down list, select Google Cloud Storage.
    - In the Select file from GCS bucket field, enter the path to the data file: `solutions-public-assets/bqetl/artist.json`
    - For File format, select JSONL (Newline Delimited JSON).
    - Ensure that Project contains your project name.
    - Ensure that Data set is `musicbrainz`.
    - For Table, enter the table name, `artist`.
    - For Table type, leave Native table selected.
    - Below the Schema section, click to turn on Edit as Text.
    - Download the `artist` schema file and open it in a text editor or viewer.
    - Replace the contents of the Schema section with the contents of the schema file you downloaded.
    - Click Create Table.
4. Wait a few moments for the load job to complete.
5. When the load has finished, the new table appears under the dataset.
6. Repeat steps 1 - 5 to create the `artist_credit_name` table with the following changes:
    - Use the following path for the source data file: `solutions-public-assets/bqetl/artist_credit_name.json`
    - Use `artist_credit_name` as the Table name.
    - Download the `artist_credit_name` schema file and use its contents for the schema.
7. Repeat steps 1 - 5 to create the `recording` table with the following changes:
    - Use the following path for the source data file: `solutions-public-assets/bqetl/recording.json`
    - Use `recording` as the Table name.
    - Download the `recording` schema file and use its contents for the schema.
Manually denormalize the data
To denormalize the data, join the data into a new BigQuery table that has one row for each artist's recording, together with selected metadata you want retained for analysis.
- If the BigQuery query editor is not open in the Google Cloud console, click Compose New Query.
Copy the following query and paste it into the Query Editor:
SELECT
  artist.id,
  artist.gid AS artist_gid,
  artist.name AS artist_name,
  artist.area,
  recording.name AS recording_name,
  recording.length,
  recording.gid AS recording_gid,
  recording.video
FROM
  `musicbrainz.artist` AS artist
INNER JOIN
  `musicbrainz.artist_credit_name` AS artist_credit_name
ON
  artist.id = artist_credit_name.artist
INNER JOIN
  `musicbrainz.recording` AS recording
ON
  artist_credit_name.artist_credit = recording.artist_credit
Click the More drop-down list, and then select Query settings.
In the Query settings dialog, complete the following steps:
- Select Set a destination table for query results.
- In Dataset, enter `musicbrainz` and select the dataset in your project.
- In Table Id, enter `recordings_by_artists_manual`.
- For Destination table write preference, click Overwrite table.
- Select the Allow Large Results (no size limit) checkbox.
- Click Save.
Click Run.
When the query is complete, the data from the query result is organized into songs for each artist in the newly created BigQuery table, and a sample of the results is shown in the Query Results pane, for example:
| Row | id | artist_gid | artist_name | area | recording_name | length | recording_gid | video |
|---|---|---|---|---|---|---|---|---|
| 1 | 97546 | 125ec42a... | unknown | 240 | Horo Gun Toireamaid Hùgan Fhathast Air | 174106 | c8bbe048... | FALSE |
| 2 | 266317 | 2e7119b5... | Capella Istropolitana | 189 | Concerto Grosso in D minor, op. 2 no. 3: II. Adagio | 134000 | af0f294d... | FALSE |
| 3 | 628060 | 34cd3689... | Conspirare | 5196 | Liturgy, op. 42: 9. Praise the Lord from the Heavens | 126933 | 8bab920d... | FALSE |
| 4 | 423877 | 54401795... | Boys Air Choir | 1178 | Nunc Dimittis | 190000 | 111611eb... | FALSE |
| 5 | 394456 | 9914f9f9... | L’Orchestre de la Suisse Romande | 23036 | Concert Waltz no. 2, op. 51 | 509960 | b16742d1... | FALSE |
Approach 2: ETL into BigQuery with Dataflow
In this section of the tutorial, instead of using the BigQuery UI, you use a sample program to load data into BigQuery by using a Dataflow pipeline. Then, you use the Beam programming model to denormalize and cleanse data to load into BigQuery.
Before you begin, review the concepts and the sample code.
Review the concepts
Although the data is small and can quickly be uploaded by using the BigQuery UI, for the purpose of this tutorial you can also use Dataflow for ETL. Use Dataflow for ETL into BigQuery instead of the BigQuery UI when you are performing massive joins, that is, joins of around 500 to 5,000 columns across more than 10 TB of data, with the following goals:
- You want to clean or transform your data as it's loaded into BigQuery, instead of storing it and joining afterwards. As a result, this approach also has lower storage requirements because data is only stored in BigQuery in its joined and transformed state.
- You plan to do custom data cleansing (which cannot be simply achieved with SQL).
- You plan to combine the data with data outside of the OLTP, such as logs or remotely accessed data, during the loading process.
- You plan to automate testing and deployment of data-loading logic using continuous integration or continuous deployment (CI/CD).
- You anticipate gradual iteration, enhancement, and improvement of the ETL process over time.
- You plan to add data incrementally, as opposed to performing a one-time ETL.
Here's a diagram of the data pipeline that's created by the sample program:
In the example code, many of the pipeline steps are grouped or wrapped in convenience methods, given descriptive names, and reused. In the diagram, reused steps are indicated by dashed borders.
Review the pipeline code
The code creates a pipeline that performs the following steps:
1. Loads each table that you want to be part of the join from the public Cloud Storage bucket into a `PCollection` of strings. Each element comprises the JSON representation of a row of the table.
2. Converts those JSON strings to object representations, `MusicBrainzDataObject` objects, and then organizes the object representations by one of the column values, such as a primary or foreign key.
3. Joins the list based on common artist. The `artist_credit_name` links an artist credit with its recording and includes the artist foreign key. The `artist_credit_name` table is loaded as a list of key-value `KV` objects. The `K` member is the artist.
4. Joins the list by using the `MusicBrainzTransforms.innerJoin()` method (a generic sketch of this pattern appears after this list):
    - Groups the collections of `KV` objects by the key member on which you want to join. This results in a `PCollection` of `KV` objects with a long key (the `artist.id` column value) and a resulting `CoGbkResult` (which stands for combine group by key result). The `CoGbkResult` object is a tuple of lists of objects with the key value in common from the first and second `PCollections`. This tuple is addressable by using the tuple tag formulated for each `PCollection` prior to running the `CoGroupByKey` operation in the `group` method.
    - Merges each matchup of objects into a `MusicBrainzDataObject` object that represents a join result.
5. Reorganizes the collection into a list of `KV` objects to begin the next join. This time, the `K` value is the `artist_credit` column, which is used to join with the recording table.
6. Obtains the final resulting collection of `MusicBrainzDataObject` objects by joining that result with the loaded collection of recordings that are organized by `artist_credit.id`.
7. Maps the resulting `MusicBrainzDataObject` objects into `TableRow` objects.
8. Writes the resulting `TableRow` objects into BigQuery.
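The inner join in step 4 follows the standard Beam `CoGroupByKey` pattern. The following is a minimal, self-contained sketch of that pattern, not the sample's actual code: the inputs and element types are simplified to keyed strings, and the names (`JoinSketch`, `artistsById`, `creditsByArtist`) are illustrative assumptions.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

public class JoinSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Hypothetical inputs: in the sample these are MusicBrainzDataObject collections
    // keyed by artist ID; plain strings keep this sketch self-contained.
    PCollection<KV<Long, String>> artistsById = p.apply("Artists",
        Create.of(KV.of(1L, "artist-row-json"))
            .withCoder(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of())));
    PCollection<KV<Long, String>> creditsByArtist = p.apply("Credits",
        Create.of(KV.of(1L, "artist_credit_name-row-json"))
            .withCoder(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of())));

    // One tuple tag per input; the tags address the per-input lists inside CoGbkResult.
    final TupleTag<String> artistTag = new TupleTag<>();
    final TupleTag<String> creditTag = new TupleTag<>();

    PCollection<KV<Long, CoGbkResult>> grouped =
        KeyedPCollectionTuple.of(artistTag, artistsById)
            .and(creditTag, creditsByArtist)
            .apply("GroupByArtistId", CoGroupByKey.<Long>create());

    // Inner join: emit one merged element for every artist/credit pairing that shares a key.
    grouped.apply("InnerJoin", ParDo.of(new DoFn<KV<Long, CoGbkResult>, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        CoGbkResult result = c.element().getValue();
        for (String artist : result.getAll(artistTag)) {
          for (String credit : result.getAll(creditTag)) {
            c.output(artist + " | " + credit);
          }
        }
      }
    }));

    p.run().waitUntilFinish();
  }
}

The sample's `MusicBrainzTransforms.innerJoin()` method wraps this same group-then-merge sequence behind a convenience method, producing `MusicBrainzDataObject` results instead of strings.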
For details about the mechanics of Beam pipeline programming, review the following topics about the programming model:
- `PCollection`
- Loading data from text files (including Cloud Storage)
- Transforms such as `ParDo` and `MapElements`
- Joining and `GroupByKey`
- `BigQueryIO`
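For example, the first two pipeline steps (load the exported JSON, then key each row) correspond to the `TextIO` and `MapElements` transforms listed above. The following is a minimal, self-contained sketch of that pattern, not the sample's code; the `extractId` helper is a hypothetical stand-in for the sample's JSON-to-`MusicBrainzDataObject` parsing, and only the bucket path is taken from this tutorial.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class LoadAndKeySketch {
  // Hypothetical stand-in: a real implementation would parse the "id" field from the JSON row.
  static Long extractId(String jsonRow) {
    return (long) jsonRow.hashCode();
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read one exported table from the tutorial's public bucket, one JSON row per element.
    PCollection<String> artistJson = p.apply("ReadArtists",
        TextIO.read().from("gs://solutions-public-assets/bqetl/artist.json"));

    // Key each row by a column value so that it can participate in a join.
    PCollection<KV<Long, String>> artistsById = artistJson.apply("KeyById",
        MapElements.into(TypeDescriptors.kvs(TypeDescriptors.longs(), TypeDescriptors.strings()))
            .via((String json) -> KV.of(extractId(json), json)));

    p.run().waitUntilFinish();
  }
}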
After you review the steps that the code performs, you can run the pipeline.
Run the pipeline code
In the Google Cloud console, open Cloud Shell.
Set the environment variables for your project and pipeline script:

export PROJECT_ID=PROJECT_ID
export REGION=us-central1
export DESTINATION_TABLE=recordings_by_artists_dataflow
export DATASET=musicbrainz
Replace PROJECT_ID with the project ID of your Google Cloud project.
Make sure that `gcloud` is using the project you created or selected at the beginning of the tutorial:

gcloud config set project $PROJECT_ID
Following the security principle of least privilege, create a service account for the Dataflow pipeline and grant it only the necessary privileges: the `roles/dataflow.worker` and `roles/bigquery.jobUser` roles, and the `roles/bigquery.dataEditor` role on the `musicbrainz` dataset:

gcloud iam service-accounts create musicbrainz-dataflow
export SERVICE_ACCOUNT=musicbrainz-dataflow@${PROJECT_ID}.iam.gserviceaccount.com
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member=serviceAccount:${SERVICE_ACCOUNT} \
    --role=roles/dataflow.worker
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member=serviceAccount:${SERVICE_ACCOUNT} \
    --role=roles/bigquery.jobUser
bq query --use_legacy_sql=false \
    "GRANT \`roles/bigquery.dataEditor\` ON SCHEMA musicbrainz TO 'serviceAccount:${SERVICE_ACCOUNT}'"
Create a bucket for the Dataflow pipeline to use for temporary files, and grant the `musicbrainz-dataflow` service account Owner privileges to it:

export DATAFLOW_TEMP_BUCKET=gs://temp-bucket-${PROJECT_ID}
gsutil mb -l us ${DATAFLOW_TEMP_BUCKET}
gsutil acl ch -u ${SERVICE_ACCOUNT}:O ${DATAFLOW_TEMP_BUCKET}
Clone the repository that contains the Dataflow code:
git clone https://github.com/GoogleCloudPlatform/bigquery-etl-dataflow-sample.git
Change directory to the sample:
cd bigquery-etl-dataflow-sample
Compile and run the Dataflow job:
./run.sh simple
The job should take about 10 minutes to run.
To see the progress of the pipeline, in the Google Cloud console, go to the Dataflow page.
The status of the jobs is shown in the status column. A status of Succeeded indicates that the job is complete.
(Optional) To see the job graph and details about the steps, click the job name, for example, `etl-into-bigquery-bqetlsimple`.

When the job has completed, go to the BigQuery page.
To run a query on the new table, in the Query editor pane, enter the following:
SELECT
  artist_name,
  artist_gender,
  artist_area,
  recording_name,
  recording_length
FROM
  musicbrainz.recordings_by_artists_dataflow
WHERE
  artist_area IS NOT NULL
  AND artist_gender IS NOT NULL
LIMIT 1000;
The result pane will show a set of results similar to the following:
| Row | artist_name | artist_gender | artist_area | recording_name | recording_length |
|---|---|---|---|---|---|
| 1 | mirin | 2 | 107 | Sylphia | 264000 |
| 2 | mirin | 2 | 107 | Dependence | 208000 |
| 3 | Gaudiburschen | 1 | 81 | Die Hände zum Himmel | 210000 |
| 4 | Sa4 | 1 | 331 | Ein Tag aus meiner Sicht | 221000 |
| 5 | Dpat | 1 | 7326 | Cutthroat | 249000 |
| 6 | Dpat | 1 | 7326 | Deloused | 178000 |

The actual output may differ, because the results are not ordered.
Cleanse the data
Next, you make a slight change to the Dataflow pipeline so that you can load lookup tables and process them as side inputs, as shown in the following diagram.
When you query the resulting BigQuery table, it's difficult to
determine from where the artist originates without manually looking up the area
numeric ID from the area
table in the MusicBrainz database. This makes
analyzing query results less straightforward than it could be.
Similarly, artist genders are shown as IDs, but the entire MusicBrainz gender
table consists of only three rows. To fix this, you can add a step in the
Dataflow pipeline to use the MusicBrainz area
and gender
tables to map the IDs to their proper labels.
The `area` and `gender` lookup tables contain significantly fewer rows than the artist or recording tables, because the number of elements in them is constrained by the number of geographic areas or genders, respectively.
As a result, the lookup step uses the Dataflow feature called side input.
Side inputs are loaded as table exports of line-delimited JSON files in the public Cloud Storage bucket containing the musicbrainz dataset, and are used to denormalize the table data in a single step.
Review the code that adds side inputs to the pipeline
Before running the pipeline, review the code to get a better understanding of the new steps.
This code demonstrates data cleansing with side inputs. The `MusicBrainzTransforms` class provides some added convenience for using side inputs to map foreign key values to labels. The `MusicBrainzTransforms` library provides a method that creates an internal lookup class. The lookup class describes each lookup table and the fields that are replaced with labels and variable-length arguments. `keyKey` is the name of the column that contains the key for the lookup, and `valueKey` is the name of the column that contains the corresponding label.
Each side input is loaded as a single map object, which is used to look up the corresponding label for an ID.
First, the JSON for the lookup table is loaded into `MusicBrainzDataObject` objects with an empty namespace and turned into a map from the `Key` column value to the `Value` column value.
Each of these `Map` objects is put into a `Map` by the value of its `destinationKey`, which is the key to replace with the looked-up values.
Then, while transforming the artist objects from JSON, the value of the `destinationKey` (which starts out as a number) is replaced with its label.
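Under the hood, this is the standard Beam map-valued side input pattern. The following minimal sketch (not the sample's actual code) shows the general shape: a small lookup table becomes a `PCollectionView` map, and a `ParDo` consults it to replace an ID with its label. The class and method names (`SideInputSketch`, `decodeArea`) and the fallback value are illustrative assumptions.

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class SideInputSketch {
  // Replaces each artist's numeric area ID with its label by consulting a
  // map-valued side input built from the small area lookup table.
  static PCollection<KV<Long, String>> decodeArea(
      PCollection<KV<Long, Long>> artistsWithAreaId,   // artist ID -> area ID
      PCollection<KV<Long, String>> areaLabels) {      // area ID -> label, e.g. "Japan"

    // Materialize the small lookup table as an in-memory map on each worker.
    final PCollectionView<Map<Long, String>> areaView = areaLabels.apply(View.asMap());

    return artistsWithAreaId.apply("DecodeArea",
        ParDo.of(new DoFn<KV<Long, Long>, KV<Long, String>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            Map<Long, String> areas = c.sideInput(areaView);
            String label = areas.getOrDefault(c.element().getValue(), "unknown");
            c.output(KV.of(c.element().getKey(), label));
          }
        }).withSideInputs(areaView));
  }
}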
To add the decoding of the `artist_area` and `artist_gender` fields, complete the following steps:
In Cloud Shell, ensure the environment is set up for the pipeline script:
export PROJECT_ID=PROJECT_ID
export REGION=us-central1
export DESTINATION_TABLE=recordings_by_artists_dataflow_sideinputs
export DATASET=musicbrainz
export DATAFLOW_TEMP_BUCKET=gs://temp-bucket-${PROJECT_ID}
export SERVICE_ACCOUNT=musicbrainz-dataflow@${PROJECT_ID}.iam.gserviceaccount.com
Replace PROJECT_ID with the project ID of your Google Cloud project.
Run the pipeline to create the table with decoded area and artist gender:
./run.sh simple-with-lookups
As before, to see the progress of the pipeline, go to the Dataflow page.
The pipeline takes approximately 10 minutes to complete.
When the job has completed, go to the BigQuery page.
Perform the same query that includes `artist_area` and `artist_gender`:

SELECT
  artist_name,
  artist_gender,
  artist_area,
  recording_name,
  recording_length
FROM
  musicbrainz.recordings_by_artists_dataflow_sideinputs
WHERE
  artist_area IS NOT NULL
  AND artist_gender IS NOT NULL
LIMIT 1000;
In the output, the `artist_area` and `artist_gender` are now decoded:

| Row | artist_name | artist_gender | artist_area | recording_name | recording_length |
|---|---|---|---|---|---|
| 1 | mirin | Female | Japan | Sylphia | 264000 |
| 2 | mirin | Female | Japan | Dependence | 208000 |
| 3 | Gaudiburschen | Male | Germany | Die Hände zum Himmel | 210000 |
| 4 | Sa4 | Male | Hamburg | Ein Tag aus meiner Sicht | 221000 |
| 5 | Dpat | Male | Houston | Cutthroat | 249000 |
| 6 | Dpat | Male | Houston | Deloused | 178000 |

The actual output may differ, because the results are not ordered.
Optimize the BigQuery schema
In the final part of this tutorial, you run a pipeline that generates a more optimal table schema using nested fields.
Take a moment to review the code that is used to generate this optimized version of the table.
The following diagram shows a slightly different Dataflow pipeline that nests the artist's recordings within each artist row, rather than creating duplicate artist rows.
The current representation of the data is fairly flat. That is, it includes one
row per credited recording that includes all the artist's metadata from the
BigQuery schema, and all the recording and artist_credit_name
metadata. This flat representation has at least two drawbacks:
- It repeats the `artist` metadata for every recording credited to an artist, which in turn increases the storage required.
- When you export the data as JSON, it exports an array that repeats that data, instead of an artist with the nested recording data, which is probably what you want.
Instead of storing one recording per row, you can store recordings as a repeated field in each artist record by making some changes to the Dataflow pipeline, without any performance penalty and without using additional storage.
Instead of joining the recordings with their artist information by
artist_credit_name.artist
, this alternate pipeline creates a nested list of
recordings within an artist object.
The BigQuery API has a maximum row size limit
of 100 MB when performing bulk inserts (10 MB for streaming inserts),
so the code limits the number of nested recordings for a given record to 1000
elements to ensure that this limit is not reached. If a given artist has
more than 1000 recordings, the code duplicates the row, including the artist
metadata, and continues nesting the recording data in the duplicate row.
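As a rough illustration of that chunking logic, the following sketch (not the sample's actual code) emits one row per group of at most 1,000 recordings, storing each group in a repeated nested field. The `artist_name` and `artist_recordings` field names match the queries later in this tutorial; the helper class itself (`NestedRowSketch`) is hypothetical.

import com.google.api.services.bigquery.model.TableRow;
import java.util.ArrayList;
import java.util.List;

public class NestedRowSketch {
  static final int MAX_NESTED_RECORDINGS = 1000;

  // Splits an artist's recordings into groups of at most 1,000 and emits one
  // TableRow per group, with the group stored in the repeated "artist_recordings" field.
  static List<TableRow> toNestedRows(String artistName, List<TableRow> recordings) {
    List<TableRow> rows = new ArrayList<>();
    for (int start = 0; start < recordings.size(); start += MAX_NESTED_RECORDINGS) {
      int end = Math.min(start + MAX_NESTED_RECORDINGS, recordings.size());
      rows.add(new TableRow()
          .set("artist_name", artistName)  // artist metadata is repeated once per chunk
          .set("artist_recordings", new ArrayList<>(recordings.subList(start, end))));
    }
    return rows;
  }
}

In the real pipeline, a transform along these lines would run just before the rows are written with `BigQueryIO`.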
The diagram shows the sources, transformations, and sinks of the pipeline.
In most cases, the step names are supplied in code as part of the `apply` method call.
To create this optimized pipeline, complete the following steps:
In Cloud Shell, ensure the environment is set up for the pipeline script:
export PROJECT_ID=PROJECT_ID
export REGION=us-central1
export DESTINATION_TABLE=recordings_by_artists_dataflow_nested
export DATASET=musicbrainz
export DATAFLOW_TEMP_BUCKET=gs://temp-bucket-${PROJECT_ID}
export SERVICE_ACCOUNT=musicbrainz-dataflow@${PROJECT_ID}.iam.gserviceaccount.com
Run the pipeline to nest recording rows within artist rows:
./run.sh nested
As before, to see the progress of the pipeline, go to the Dataflow page.
The pipeline takes approximately 10 minutes to complete.
When the job has completed, go to the BigQuery page.
Query fields from the nested table in BigQuery:
SELECT
  artist_name,
  artist_gender,
  artist_area,
  artist_recordings
FROM
  musicbrainz.recordings_by_artists_dataflow_nested
WHERE
  artist_area IS NOT NULL
  AND artist_gender IS NOT NULL
LIMIT 1000;
In the output, the `artist_recordings` are shown as nested rows that can be expanded:

| Row | artist_name | artist_gender | artist_area | artist_recordings |
|---|---|---|---|---|
| 1 | mirin | Female | Japan | (5 rows) |
| 3 | Gaudiburschen | Male | Germany | (1 row) |
| 4 | Sa4 | Male | Hamburg | (10 rows) |
| 6 | Dpat | Male | Houston | (9 rows) |

The actual output may differ, because the results are not ordered.
Run a query to extract values from the `STRUCT` and use those values to filter the results, for example for artists who have recordings containing the word "Justin":

SELECT
  artist_name,
  artist_gender,
  artist_area,
  ARRAY(SELECT artist_credit_name_name
        FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS artist_credit_name_name,
  ARRAY(SELECT recording_name
        FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS recording_name
FROM
  musicbrainz.recordings_by_artists_dataflow_nested,
  UNNEST(recordings_by_artists_dataflow_nested.artist_recordings) AS artist_recordings_struct
WHERE
  artist_recordings_struct.recording_name LIKE "%Justin%"
LIMIT 1000;
In the output, the `artist_credit_name_name` and `recording_name` are shown as nested rows that can be expanded, for example:

| Row | artist_name | artist_gender | artist_area | artist_credit_name_name | recording_name |
|---|---|---|---|---|---|
| 1 | Damonkenutz | null | null | (1 row) | Yellowpants (Justin Martin remix) |
| 3 | Fabian | Male | Germany | (10+ rows) | Heatwave, Starlight Love, Dreams To Wishes, Last Flight (Justin Faust remix), ... |
| 4 | Digital Punk Boys | null | null | (6 rows) | Come True, We Are... (Punkgirlz remix by Justin Famous), Chaos (short cut), ... |

The actual output may differ, because the results are not ordered.
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.
Delete the project
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Deleting individual resources
Follow these steps to delete individual resources, instead of deleting the whole project.
Deleting the Cloud Storage bucket
- In the Google Cloud console, go to the Cloud Storage Browser page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.
Deleting the BigQuery datasets
Open the BigQuery web UI.
Select the BigQuery datasets you created during the tutorial.
Click Delete.
What's next
- Learn more about writing queries for BigQuery. Querying data explains how to run synchronous and asynchronous queries, create user-defined functions (UDFs), and more.
- Explore BigQuery syntax. BigQuery uses a SQL-like syntax that is described in the Query reference (legacy SQL).
- Explore reference architectures, diagrams, and best practices about Google Cloud. Take a look at our Cloud Architecture Center.