Performing ETL from a Relational Database into BigQuery

This tutorial demonstrates how to extract, transform, and load (ETL) data from an online transaction processing (OLTP) relational database into Google BigQuery for analysis.

OLTP databases include relational databases that store information and process transactions for e-commerce sites, SaaS applications, or games. OLTP databases are usually optimized for transactions, which require consistency and reliability, and are highly normalized. In contrast, data warehouses tend to be optimized for data retrieval and analysis, rather than transactions, and so are denormalized. Generally, denormalizing data from an OLTP database makes it more useful for analysis in BigQuery.

Objectives

The tutorial shows two approaches to ETL into BigQuery:

  • Using the BigQuery UI to perform the loading and transformation. Use this approach to perform a one-time load of a small amount of data into BigQuery for analysis. You might also use this approach to prototype your dataset before you use automation with larger or multiple datasets.
  • Using Google Cloud Dataflow to perform the load and transform steps, and then following some additional techniques for cleansing the data with Cloud Dataflow. Use this approach to load a larger amount of data, load data from multiple data sources, or to load data incrementally or automatically.

Costs

This tutorial uses billable components of Google Cloud Platform, including:

  • Google Compute Engine
  • Google Cloud Storage
  • Cloud Dataflow
  • BigQuery

Use the Pricing Calculator to generate a cost estimate based on your projected usage.

New Cloud Platform users might be eligible for a free trial.

Before you begin

Before you begin this tutorial, use the Google Cloud Platform Console to create or select a project and enable billing.

  1. Sign in to your Google account.

    If you don't already have one, sign up for a new account.

  2. Select or create a Cloud Platform project.

    Go to the Manage resources page

  3. Enable billing for your project.

    Enable billing

  4. Compute Engine is automatically enabled in new projects. To activate Compute Engine in a pre-existing project, enable the Compute Engine API.

    Enable the API

  5. Install the gcloud command-line tool and configure it to use the project you created above.

Using the MusicBrainz dataset

This tutorial relies on JSON snapshots of tables in the MusicBrainz database, which is built on PostgreSQL and contains information about MusicBrainz music data, including artists, recordings, and the artist credits that link them.

The MusicBrainz schema includes three relevant tables: artist, recording, and artist_credit_name. An artist credit represents credit given to an artist for a recording, and artist_credit_name rows link each recording with its corresponding artist through the artist_credit value.

The JSON snapshots were exported from PostgreSQL by using commands similar to the following, which write one JSON object per row and then strip the extra escaping added during export:

pg_cmd="\\copy (select row_to_json(r) from (select * from artist) r ) to exported_artist.json"
psql -w -h $host -U $user -d $db -c $pg_cmd
sed -i -e 's/\\\\/\\/g' exported_artist.json # clean up extra '\' characters

Approach 1: ETL with BigQuery Web UI

Use this approach to perform a one-time load of a small amount of data into BigQuery for analysis. You might also use this approach to prototype your dataset before you use automation with larger or multiple datasets.

Create a BigQuery dataset

The following diagram illustrates the steps you take to create a BigQuery dataset:

[Diagram of the load-and-join steps described below]

You load the MusicBrainz tables into BigQuery individually, then join the tables you loaded so that each row contains the data linkage you want. You store the join results in a new BigQuery table. Then you can delete the original tables that you loaded.

  1. Open the BigQuery web UI.

    OPEN BIGQUERY

  2. To the right of the dataset name, click the down-arrow button.

  3. Click Create new dataset.
  4. In the Create Dataset dialog, add a Dataset ID and expiration period, and then click OK.

Import MusicBrainz tables

The following table shows the locations of the JSON files that you use to complete this step.

Table name: artist
Schema file: https://storage.googleapis.com/solutions-public-assets/bqetl/artist_schema.json
Data file: gs://solutions-public-assets/bqetl/artist.json

Table name: artist_credit_name
Schema file: https://storage.googleapis.com/solutions-public-assets/bqetl/artist_credit_name_schema.json
Data file: gs://solutions-public-assets/bqetl/artist_credit_name.json

Table name: recording
Schema file: https://storage.googleapis.com/solutions-public-assets/bqetl/recording_schema.json
Data file: gs://solutions-public-assets/bqetl/recording.json

For each MusicBrainz table, perform the following steps:

  1. Add a table to the dataset you created above by hovering over the dataset name in the BigQuery column and clicking the plus symbol.
  2. In the Create Table dialog, from the Location drop-down list, select Google Cloud Storage.
  3. In the text field to the right of the Location drop-down list, type the URL for the data file (for example, gs://solutions-public-assets/bqetl/artist.json).
  4. For Table name, enter the table name (for example, type artist).
  5. For File format, select JSON (Newline Delimited).
  6. For Table type, leave Native table selected.
  7. Below the Schema section, click Edit as Text.
  8. Download the schema file for the table using the URL in the table above.
  9. Replace the contents of the Schema section with the contents of the schema file you downloaded.
  10. Click Create Table.

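Optionally, you can confirm that each table loaded as expected by running a quick row-count query in the query text area. For example, for the artist table (replace [DATASET] with the name of your dataset):

SELECT COUNT(*) AS artist_rows FROM [DATASET].artist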

Denormalize the data

To denormalize the data, join the data into a new BigQuery table that has one row for each artist’s recording, together with selected metadata you want retained for analysis.

  1. Copy the following query into the query text area.

    SELECT
      artist.id, artist.gid, artist.name, artist.area,
      recording.name, recording.length, recording.gid, recording.video
    FROM
      [DATASET].artist AS artist
      INNER JOIN [DATASET].artist_credit_name AS artist_credit_name
        ON artist.id = artist_credit_name.artist
      INNER JOIN [DATASET].recording AS recording
        ON artist_credit_name.artist_credit = recording.artist_credit
    
  2. Replace [DATASET] with the name of the dataset.

  3. Click Show Options and then make the following selections:

    1. Select Table for Destination Table, and then create a new table called recordings_by_artists.
    2. Check Allow Large Results.
    3. For Write Preference, select Overwrite table.
    4. Click Run Query.


The newly created BigQuery table now contains one row for each recording credited to each artist.
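For example, a query like the following lists the artists with the most credited recordings. The column names are an assumption based on how legacy SQL names aliased fields in a destination table (artist.name becomes artist_name, and so on); check your table's schema and adjust the names if they differ:

SELECT artist_name, COUNT(*) AS recording_count
FROM [DATASET].recordings_by_artists
GROUP BY artist_name
ORDER BY recording_count DESC
LIMIT 10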

Approach 2: ETL into BigQuery with Cloud Dataflow

In this section of the tutorial, instead of using the BigQuery Web UI, you use a sample program to load data into BigQuery using a Cloud Dataflow pipeline. Then you use the Cloud Dataflow programming model to denormalize and cleanse data for loading into BigQuery.

Before you begin, take a moment to review the concepts and the sample code.

Review the concepts

Although the data here is small and can be loaded quickly by using the BigQuery web UI, Cloud Dataflow is the better choice for ETL into BigQuery when you are performing massive joins, that is, joins of roughly 500 to 5,000 columns across more than 10 TB of data, or when one or more of the following goals applies:

  • Your data can be cleansed as it’s loaded into BigQuery, instead of being stored first and joined afterwards. This also lowers query costs (on-demand queries are billed at roughly $5 per TB scanned) and storage requirements, because the data is stored only in its joined and transformed state.
  • You plan to do custom data cleansing.
  • You plan to combine the data with data from outside the OLTP database, such as logs or remotely accessed data.
  • You plan to automate data-loading using continuous integration or continuous deployment (CI/CD).
  • You anticipate iterating on and improving the ETL process gradually over time.
  • You plan to add data incrementally, as opposed to performing a one-time ETL.

Here’s a diagram of the data pipeline that’s created by the sample program:

image

In the example code, many of the pipeline steps are grouped or wrapped in convenience methods, given descriptive names, and reused. In the diagram, reused steps are indicated by dashed borders.

Review the pipeline code

The code creates a pipeline that performs the following steps.

  1. Loading each table that you want to be part of the join into a PCollection of strings. Each element comprises the JSON representation of a row of the table:

    public static PCollection<String> loadText(Pipeline p, String name) {
      BQETLOptions options = (BQETLOptions) p.getOptions();
      String loadingBucket = options.getLoadingBucketURL();
      String objectToLoad = storedObjectName(loadingBucket, name);
      return p.apply(TextIO.Read.named(name).from(objectToLoad));
    }

  2. Converting those JSON strings to object representations, MusicBrainzDataObject objects, and then organizing the object representations by one of the column values, such as a primary or foreign key.

    public static PCollection<KV<Long, MusicBrainzDataObject>> loadTableFromText(PCollection<String> text, String name, String keyName) {
      final String namespacedKeyname = name + "_" + keyName;
      return text.apply("load " + name, MapElements.via((String input) -> {
        MusicBrainzDataObject datum = JSONReader.readObject(name, input);
        Long key = (Long) datum.getColumnValue(namespacedKeyname);
        return KV.of(key, datum);
      }).withOutputType(new TypeDescriptor<KV<Long, MusicBrainzDataObject>>() {
      }));
    }

  3. After converting the artist and artist_credit_name tables into lists of MusicBrainzDataObject objects, joining the two lists on their common artist ID. The artist_credit_name table links an artist credit with its recording and includes the artist foreign key; it is loaded as a list of key-value (KV) objects in which the K member is the artist ID.

    PCollection<MusicBrainzDataObject> artistCredits = MusicBrainzTransforms.innerJoin("artists with artist credits",
        artists, artistCreditName);

    The lists are joined using the MusicBrainzTransforms.innerJoin() method shown below:

    public static PCollection<MusicBrainzDataObject> innerJoin(String name, PCollection<KV<Long,
              MusicBrainzDataObject>> table1, PCollection<KV<Long, MusicBrainzDataObject>> table2) {
      final TupleTag<MusicBrainzDataObject> t1 = new TupleTag<MusicBrainzDataObject>();
      final TupleTag<MusicBrainzDataObject> t2 = new TupleTag<MusicBrainzDataObject>();
      PCollection<KV<Long, CoGbkResult>> joinedResult = group(name, table1, table2, t1, t2);

The MusicBrainzTransforms.innerJoin() method does the following:

  1. Groups the collections of KV objects by the key member on which you want to join. This results in a PCollection of KV objects with a long key (the artist.id column value) and a resulting CoGbkResult (the result of a CoGroupByKey operation). The CoGbkResult object is a tuple of lists of objects, drawn from the first and second PCollections, that share the same key value. This tuple is addressable by using the TupleTag formulated for each PCollection before the group-by operation runs.

  2. Merges each matchup of objects into a MusicBrainzDataObject object that represents a join result.

    PCollection<List<MusicBrainzDataObject>> mergedResult = joinedResult.apply("merge join results", MapElements.via((KV<Long, CoGbkResult> group) -> {
      List<MusicBrainzDataObject> result = new ArrayList<MusicBrainzDataObject>();
      Iterable<MusicBrainzDataObject> leftObjects = group.getValue().getAll(t1);
      Iterable<MusicBrainzDataObject> rightObjects = group.getValue().getAll(t2);
      leftObjects.forEach((MusicBrainzDataObject l) -> {
        rightObjects.forEach((MusicBrainzDataObject r) -> {
          result.add(l.duplicate().merge(r));
        });
      });
      return result;
    }).withOutputType(new TypeDescriptor<List<MusicBrainzDataObject>>() {
    }));

  3. Results in a list of lists of MusicBrainzDataObject objects. Each list contains the total number of matches in the first table multiplied by the total number of matches in the second table for the given key value. The list of lists is then flattened into a single PCollection that contains all the MusicBrainzDataObject objects from the lists.

    return mergedResult.apply(new Flatten.FlattenIterables<>());

  4. Reorganizes the collection into a list of KV objects to begin the next join. This time, the K value is the artist_credit column, which is used to join with the recording table.

    PCollection<KV<Long,MusicBrainzDataObject>> artistCreditNamesByArtistCredit =  MusicBrainzTransforms.by("artist_credit_name_artist_credit", artistCredits);

  5. Obtains the final resulting collection of MusicBrainzDataObject objects by joining that result with the loaded collection of recordings that are organized by artist_credit.id.

    PCollection<MusicBrainzDataObject> artistRecordings = MusicBrainzTransforms.innerJoin("joined recordings",
       artistCreditNamesByArtistCredit, recordingsByArtistCredit);

  6. Maps the resulting MusicBrainzDataObject objects into TableRow objects.

    PCollection<TableRow> tableRows = MusicBrainzTransforms.transformToTableRows(artistRecordings, bqTableSchema);

  7. Writes the resulting table rows into BigQuery.

    tableRows.apply(BigQueryIO.Write
        .named("Write")
        .to(options.getBigQueryTablename())
        .withSchema(bqTableSchema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

For details about the mechanics of Cloud Dataflow pipeline programming, review the Cloud Dataflow programming model documentation, in particular the topics that cover pipelines, PCollections, and transforms.

After you’ve reviewed the steps that the code performs, you can run it.

Run the pipeline code

  1. Make sure that you are authenticated with Google Cloud Platform. Run the following command to get application default credentials:

    gcloud auth login
    

  2. Configure the gcloud command-line tool to use the project you created:

    gcloud config set project [PROJECT]
    
  3. Download and install the Java Development Kit (JDK) version 1.8 or later. Verify that the JAVA_HOME environment variable is set and points to your JDK installation.

  4. Download and install Apache Maven by following Maven's installation guide for your specific operating system.

  5. Clone the repository that contains the Cloud Dataflow code.

    git clone https://github.com/GoogleCloudPlatform/bigquery-etl-dataflow-sample.git
    
  6. Change directory to the sample.

    cd bigquery-etl-dataflow-sample
    
  7. Create a staging bucket in Cloud Storage.

    gsutil mb gs://[STAGING_BUCKET_NAME]
    
  8. Set the object lifecycle policy for [STAGING_BUCKET_NAME] to the policy defined in dataflow-staging-policy.json.

    gsutil lifecycle set dataflow-staging-policy.json gs://[STAGING_BUCKET_NAME]
    
  9. Edit the run script for the simple pipeline. The sample script, which is provided with the tutorial, calls Maven:

    1. Copy the run-simple.example file to a file named run-simple and change the code to include the name of your project, BigQuery dataset, and destination table.

    2. Make sure that the destination table is different from the one you used in the previous section, so that you can compare the results.

  10. Run the Cloud Dataflow job in this example:

    ./run-simple
    
  11. (Optional) To see the progress of the pipeline:

    Go to the Cloud Dataflow Monitoring UI

  12. After the pipeline finishes writing the data, run a query against the new table to verify that the data loaded as expected.

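For example, a query along the following lines returns a sample of the joined rows. The column names shown here, such as artist_name and recording_name, are assumptions based on the table-prefixed naming that the sample pipeline uses; check the destination table's schema, and substitute your own dataset and table names:

SELECT artist_name, artist_area, recording_name, recording_length
FROM [DATASET].[DESTINATION_TABLE]
LIMIT 10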

Cleanse the data

Next, you’ll make a slight change to the Cloud Dataflow pipeline so that you can load lookup tables and process them as side inputs, as shown in the following diagram.

[Diagram of the pipeline with the area and gender lookup tables loaded as side inputs]

When you query the resulting BigQuery table, it's difficult to tell where an artist is from, because the artist's location, called the area in the MusicBrainz database, is stored as a numeric ID rather than a name. This makes analyzing query results, such as the results of the previous query, less straightforward than it could be.

Similarly, artist genders are shown as IDs, but the entire MusicBrainz gender table consists of only three rows. To fix this, you can use the MusicBrainz area and gender tables to map the IDs to their proper labels.

Neither the area lookup table nor the gender lookup table is sized on the order of the artist or recording data. Each is instead bounded by the number of geographic areas or genders, respectively, and is on the order of megabytes, which makes them an appropriate use of the Cloud Dataflow feature called side inputs.

Here, the side inputs are loaded as table exports of newline-delimited JSON.

Add side inputs to the pipeline

In the BQETLSimple.java file, you’ll see that there are a few lines commented out:

//        PCollection<KV<Long,MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p,"artist","id",
//                MusicBrainzTransforms.lookup("area", "id", "name", "area", "begin_area"),
//                MusicBrainzTransforms.lookup("gender","id","name","gender"));

    PCollection<KV<Long, MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p, "artist", "id");

This code demonstrates data cleansing with side inputs. The MusicBrainzTransforms class provides some added convenience for using side inputs to map foreign-key ID values to human-readable labels. The library's lookup() method creates a LookupDescription that describes a lookup table: keyKey is the name of the column that contains the key for the lookup, valueKey is the name of the column that contains the corresponding label, and the variable-length destinationKeys argument lists the columns whose ID values should be replaced with labels.

public static LookupDescription lookup(String objectName, String keyKey, String valueKey, String... destinationKeys) {
  return new LookupDescription(objectName, keyKey, valueKey, destinationKeys);
}

Each side input is loaded as a single map object, which is used to look up the corresponding label for an ID.

The JSON for each lookup table is loaded into MusicBrainzDataObject objects with an empty namespace and then turned into a map from the key column value to the value column value.

public static PCollectionView<Map<Long, String>> loadMapFromText(PCollection<String> text, String keyKey, String valueKey) {
  String keyKeyName = "_" + keyKey;
  String valueKeyName = "_" + valueKey;

  PCollection<KV<Long, String>> entries = text.apply(MapElements.via((String input) -> {
    MusicBrainzDataObject object = JSONReader.readObject("", input);
    Long key = (Long) object.getColumnValue(keyKeyName);
    String value = (String) object.getColumnValue(valueKeyName);
    return KV.of(key, value);
  }).withOutputType(new TypeDescriptor<KV<Long, String>>() {
  }));

  return entries.apply(View.<Long, String>asMap());
}

Each of these Map objects is put into a Map that is keyed by the value of its destinationKey, which is the column whose ID values are replaced with the looked-up labels.

Map<String, PCollectionView<Map<Long, String>>> mapSideInputs = new HashMap<String, PCollectionView<Map<Long, String>>>();
for (LookupDescription mapper : mappers) {
  PCollectionView<Map<Long, String>> mapView = loadMap(text.getPipeline(), mapper.objectName, mapper.keyKey, mapper.valueKey);
  mapper.destinationKeys.forEach((destinationKey) -> {
    mapSideInputs.put(name + "_" + destinationKey, mapView);
  });
}

Then, while transforming the artist objects from JSON, the value at the destinationKey (which starts out as a number) is replaced with its label.

Map<Long, String> sideInputMap = processContext.sideInput(mapping);
Long id = (Long) result.getColumnValue(key);
if (id != null) {
  String label = (String) sideInputMap.get(id);
  if (label == null) {
    label = "" + id;
  }
  result.replace(key, label);

To clean up the data by using the artist_area and artist_gender lookups:

  1. Uncomment the lines in BQETLSimple.java that load the artist data using the lookups, and comment out the code (above) that loads the artist data without the lookups.

  2. Change the TableFieldSchemas for artist_area and artist_gender to be of data type string instead of int by commenting out the corresponding int fields and uncommenting the corresponding string fields.

    /*Switch these two lines when using mapping table for artist_area */
    //        .stringField("artist_area")
            .intField("artist_area")

    /*Switch these two lines when using mapping table for artist_gender */
    //        .stringField("artist_gender")
            .intField("artist_gender")

            .intField("artist_begin_area")
    //      .stringField("artist_begin_area")

  3. Rerun the simple pipeline code, and then run the same kind of query against the new results. The artist_area and artist_gender columns now contain readable labels instead of numeric IDs.

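For example, a query along the following lines now groups recordings by a readable gender label instead of a numeric ID. As before, the column names are assumptions based on the sample pipeline's naming; substitute your own dataset and table names:

SELECT artist_gender, COUNT(*) AS recording_count
FROM [DATASET].[DESTINATION_TABLE]
GROUP BY artist_gender
ORDER BY recording_count DESC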

Optimize the BigQuery schema

The following diagram shows a slightly different Cloud Dataflow pipeline that nests the artists’ recordings within each artist row, rather than creating duplicate artist rows.

[Diagram of the pipeline that nests each artist's recordings within the artist row]

Notice that the current representation of the data is fairly flat: it includes one row per credited recording, and that row carries all of the artist metadata from the BigQuery schema along with all of the recording and artist_credit_name metadata. This flat representation has at least two drawbacks:

  • It repeats the artist metadata for every recording credited to an artist, which in turn increases the storage required.
  • When you export the data as JSON, you get an array of rows that each repeat the artist data, instead of one artist object with its recording data nested inside it, which is probably what you want.

Instead of storing one recording per row, you can store recordings as a repeated field within each artist record by making some relatively straightforward changes to the Cloud Dataflow pipeline, without any performance penalty and without using additional storage.

  1. Instead of joining the recordings with their artist information by artist_credit_name.artist, the pipeline creates a nested list of recordings within an artist object.

    public static PCollection<MusicBrainzDataObject> nest(PCollection<KV<Long, MusicBrainzDataObject>> parent,
                                                          PCollection<KV<Long, MusicBrainzDataObject>> child,
                                                          String nestingKey) {
      final TupleTag<MusicBrainzDataObject> parentTag = new TupleTag<MusicBrainzDataObject>();
      final TupleTag<MusicBrainzDataObject> childTag = new TupleTag<MusicBrainzDataObject>();
    
      PCollection<KV<Long, CoGbkResult>> joinedResult = group("nest " + nestingKey, parent, child, parentTag, childTag);
      return joinedResult.apply("merge join results " + nestingKey, MapElements.via((KV<Long, CoGbkResult> group) -> {
        MusicBrainzDataObject parentObject = group.getValue().getOnly(parentTag);
        Iterable<MusicBrainzDataObject> children = group.getValue().getAll(childTag);
        List<MusicBrainzDataObject> childList = new ArrayList<MusicBrainzDataObject>();
        children.forEach(childList::add);
        parentObject = parentObject.duplicate();
        parentObject.addColumnValue("recordings", childList);
        return parentObject;
      }).withOutputType(new TypeDescriptor<MusicBrainzDataObject>() {
      }));
    }

    Because the BigQuery API limits the size of a TableRow, the code limits the number of nested recordings for a given record to 1000 elements. If a given artist has more than 1000 recordings, the code duplicates the row, including the artist metadata, and continues nesting the recording data in the duplicate row.

    private static List<TableRow> toTableRows(MusicBrainzDataObject mbdo, Map<String, Object> serializableSchema) {
      TableRow row = new TableRow();
      List<TableRow> result = new ArrayList<TableRow>();
      Map<String, List<MusicBrainzDataObject>> nestedLists = new HashMap<String, List<MusicBrainzDataObject>>();
      Set<String> keySet = serializableSchema.keySet();
      /*
       *  construct a row object without the nested objects
       */
      int maxListSize = 0;
      for (String key : keySet) {
        Object value = serializableSchema.get(key);
        Object fieldValue = mbdo.getColumnValue(key);
        if (fieldValue != null) {
          if (value instanceof Map) {
            List<MusicBrainzDataObject> list = (List<MusicBrainzDataObject>) fieldValue;
            if (list.size() > maxListSize) {
              maxListSize = list.size();
            }
            nestedLists.put(key, list);
          } else {
            row.set(key, fieldValue);
          }
    
        }
      }
      /*
       * add the nested objects but break up the nested objects across duplicate rows if nesting limit exceeded
       */
      TableRow parent = row.clone();
      Set<String> listFields = nestedLists.keySet();
      for (int i = 0; i < maxListSize; i++) {
        parent = (parent == null ? row.clone() : parent);
        final TableRow parentRow = parent;
        nestedLists.forEach((String key, List<MusicBrainzDataObject> nestedList) -> {
          if (nestedList.size() > 0) {
            if (parentRow.get(key) == null) {
              parentRow.set(key, new ArrayList<TableRow>());
            }
            List<TableRow> childRows = (List<TableRow>) parentRow.get(key);
            childRows.add(toChildRow(nestedList.remove(0), (Map<String, Object>) serializableSchema.get(key)));
          }
        });
        if ((i > 0) && (i % BIGQUERY_NESTING_LIMIT == 0)) {
          result.add(parent);
          parent = null;
        }
      }
      if (parent != null) {
        result.add(parent);
      }
      return result;
    }

  2. Copy the run-nested.example script to ./run-nested and change the #PROJECT, #STAGING_BUCKET, #DATASET, and #DESTINATION_TABLE to match your configuration.

  3. Run the pipeline to nest recording rows within artist rows:

     ./run-nested
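
After the nested pipeline finishes, you can query the repeated recordings field directly. The following standard SQL sketch counts the nested recordings per artist; it assumes that the repeated field is named recordings (as in the nest() code above) and that the artist name column is artist_name, so adjust the names to match your table's schema and substitute your own project, dataset, and table names:

#standardSQL
SELECT artist_name, ARRAY_LENGTH(recordings) AS recording_count
FROM `[PROJECT].[DATASET].[DESTINATION_TABLE]`
ORDER BY recording_count DESC
LIMIT 10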
    

Cleaning up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial:

Deleting the project

The easiest way to delete all resources is simply to delete the project you created for this tutorial. If you don't want to delete the project, follow the instructions in the following section.

  1. In the Cloud Platform Console, go to the Projects page.

    Go to the Projects page

  2. In the project list, select the checkbox next to the project that you want to delete, and then click Delete project.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Deleting individual resources

Follow these steps to delete individual resources, instead of deleting the whole project.

Deleting the storage bucket

  1. In the Cloud Platform Console, go to the Cloud Storage browser.

    Go to the Cloud Storage browser

  2. Click the checkbox next to the bucket you want to delete.
  3. Click the Delete button at the top of the page to delete the bucket.

Deleting the BigQuery datasets

  1. Open the BigQuery web UI.

    OPEN BIGQUERY

  2. Select the BigQuery dataset(s) you created during the tutorial.

  3. Click Delete.

What's next

Next, consider how you can apply what you’ve learned to loading your data into BigQuery for warehousing and analysis.

  • Visit the BigQuery and Cloud Dataflow documentation for in-depth information about how to get the most out of each product.
  • Learn more about writing queries for BigQuery. Querying Data in the BigQuery documentation explains how to run synchronous and asynchronous queries, create user-defined functions (UDFs), and more.
  • Explore BigQuery syntax. BigQuery uses a SQL-like syntax that is described in the Query Reference (legacy SQL).
  • Try out other Google Cloud Platform features for yourself. Have a look at our tutorials.
