Performing ETL from a relational database into BigQuery using Cloud Dataflow

This tutorial demonstrates how to use Cloud Dataflow to extract, transform, and load (ETL) data from an online transaction processing (OLTP) relational database into BigQuery for analysis.

This tutorial is intended for database admins, operations professionals, and cloud architects interested in taking advantage of the analytical query capabilities of BigQuery and the batch processing capabilities of Cloud Dataflow.

OLTP databases are often relational databases that store information and process transactions for ecommerce sites, software as a service (SaaS) applications, or games. OLTP databases are usually optimized for transactions, which require the ACID properties (atomicity, consistency, isolation, and durability), and they typically have highly normalized schemas. In contrast, data warehouses tend to be optimized for data retrieval and analysis rather than transactions, and they typically feature denormalized schemas. Generally, denormalizing data from an OLTP database makes it more useful for analysis in BigQuery.

Objectives

This tutorial shows two approaches to performing ETL of normalized RDBMS data into denormalized BigQuery data:

  • Using BigQuery to load and transform the data. Use this approach to perform a one-time load of a small amount of data into BigQuery for analysis. You might also use this approach to prototype your dataset before you automate larger or multiple datasets.
  • Using Cloud Dataflow to load, transform, and cleanse the data. Use this approach to load a larger amount of data, load data from multiple data sources, or to load data incrementally or automatically.

Costs

This tutorial uses the following billable components of Google Cloud Platform:

  • BigQuery
  • Cloud Dataflow
  • Cloud Storage
  • Compute Engine

You can use the pricing calculator to generate a cost estimate based on your projected usage. New GCP users might be eligible for a free trial.

When you finish this tutorial, you can avoid continued billing by deleting the resources you created. For more information, see Cleaning up.

Before you begin

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. Select or create a GCP project.

    Go to the project selector page

  3. Make sure that billing is enabled for your Google Cloud Platform project.

    Learn how to enable billing

  4. Enable the Compute Engine and Cloud Dataflow APIs.

    Enable the APIs

  5. Install and initialize the Cloud SDK.

Using the MusicBrainz dataset

This tutorial relies on JSON snapshots of tables in the MusicBrainz database, which is built on PostgreSQL and contains information about all of the music cataloged by MusicBrainz. Some elements of the MusicBrainz schema include:

  • Artists
  • Release groups
  • Releases
  • Recordings
  • Works
  • Labels
  • Many of the relationships between these entities.

The MusicBrainz schema includes three relevant tables: artist, recording, and artist_credit_name. An artist_credit represents credit given to the artist for a recording, and the artist_credit_name rows link the recording with its corresponding artist through the artist_credit value.

This tutorial provides the PostgreSQL tables already extracted into JSON format. To perform this step yourself, you can use the following sample code:

pg_cmd="\\copy (select row_to_json(r) from (select * from artist) r ) to
exported_artist.json"
psql -w -h $host -U $user -d $db -c $pg_cmd
sed -i -e 's/\\\\/\\/g' exported_artist.json # clean up extra '\' characters

Approach 1: ETL with BigQuery

Use this approach to perform a one-time load of a small amount of data into BigQuery for analysis. You might also use this approach to prototype your dataset before you use automation with larger or multiple datasets.

Create a BigQuery dataset

The following diagram illustrates the steps you take to create a BigQuery dataset.

Steps to create a BigQuery dataset.

You load the MusicBrainz tables into BigQuery individually, and then you join the tables you loaded so that each row contains the data linkage you want. You store the join results in a new BigQuery table. Then you can delete the original tables that you loaded.

  1. In the GCP Console, open BigQuery.

    OPEN BIGQUERY

  2. Under Resources, click the name of your project.

  3. In the left nav, click + Add Data.

  4. In the Create Dataset dialog, complete the following steps:

    1. In the Dataset ID field, enter musicbrainz.
    2. Leave Data Location as Default.
  5. Click Create Dataset.

Import MusicBrainz tables

For each MusicBrainz table, perform the following steps to add a table to the dataset you created:

  1. In the GCP Console, click the dataset name, and then click +Create Table.
  2. In the Create Table dialog, complete the following steps, and then click Create Table:

    1. Under Source, in the Create table from drop-down list, select Google Cloud Storage.
    2. In the Select file from Cloud Storage bucket field, enter the URL for the data file, gs://solutions-public-assets/bqetl/artist.json.
    3. For File format, select JSON (Newline Delimited).
    4. For Table name, enter the table name, artist.
    5. For Table type, leave Native table selected.
    6. Below the Schema section, click to turn on Edit as Text.
    7. Download the artist schema file.
    8. Replace the contents of the Schema section with the contents of the schema file you downloaded.

    Create table dialog with updated schema from downloaded JSON file.

  3. Wait a few moments for the load job to complete. To monitor the job, click Job History.

    When the load has finished, the new table appears under the dataset.

  4. Repeat steps 1 - 3 for the artist_credit_name table with the following changes:

    • Use gs://solutions-public-assets/bqetl/artist_credit_name.json for the data file URL.
    • Use artist_credit_name for the Table name.
    • Download the artist_credit_name schema file and use its contents for the schema.

  5. Repeat steps 1 - 3 for the recording table with the following changes:

    • Use gs://solutions-public-assets/bqetl/recording.json for the data file URL.
    • Use recording for the Table name.
    • Download the recording schema file and use its contents for the schema.

Manually denormalize the data

To denormalize the data, join the data into a new BigQuery table that has one row for each artist's recording, together with selected metadata you want retained for analysis.

  1. In the GCP Console, copy the following query and paste it into the Query Editor:

    SELECT artist.id, artist.gid as artist_gid,
           artist.name as artist_name, artist.area,
           recording.name as recording_name, recording.length,
           recording.gid as recording_gid, recording.video
      FROM `[PROJECT_ID].[DATASET].artist` as artist
          INNER JOIN `[PROJECT_ID].[DATASET].artist_credit_name` AS artist_credit_name
               ON artist.id = artist_credit_name.artist
          INNER JOIN `[PROJECT_ID].[DATASET].recording` AS recording
               ON artist_credit_name.artist_credit = recording.artist_credit
    

    Replace [DATASET] with the name of the dataset you previously created, for example, musicbrainz, and [PROJECT_ID] with your GCP project ID.

  2. Click the More drop-down list, and then select Query settings.

  3. In the Query settings card, do the following:

    1. Select the Set a destination table for query results checkbox.
    2. In the Table name field, enter recordings_by_artists_manual.
    3. For Destination table write preference, click Overwrite table.
    4. Select the Allow Large Results (no size limit) checkbox.
    5. Leave Job Priority as default Interactive.
    6. Leave SQL Dialect as default Standard.
    7. Click Save.
  4. Click Run.

When the query is complete, the query results are saved in the newly created BigQuery table, organized as one row per recording for each artist.

    Query settings for destination table.

Approach 2: ETL into BigQuery with Cloud Dataflow

In this section of the tutorial, instead of using the BigQuery UI, you use a sample program to load data into BigQuery by using a Cloud Dataflow pipeline. Then, you use the Cloud Dataflow programming model to denormalize and cleanse data to load into BigQuery.

Before you begin, review the concepts and the sample code.

Review the concepts

Although the data is small and can be uploaded quickly by using the BigQuery UI, for the purposes of this tutorial you can also use Cloud Dataflow for ETL. Use Cloud Dataflow for ETL into BigQuery instead of the BigQuery UI when you are performing massive joins, that is, joins of around 500 to 5,000 columns across more than 10 TB of data, and when you have the following goals:

  • You want to clean or transform your data as it's loaded into BigQuery, instead of storing it and joining it afterward. As a result, this approach also has lower storage requirements, because data is stored in BigQuery only in its joined and transformed state.
  • You plan to do custom data cleansing (which cannot be simply achieved with SQL).
  • You plan to combine the data with data outside of the OLTP, such as logs or remotely accessed data, during the loading process.
  • You plan to automate testing and deployment of data-loading logic using continuous integration or continuous deployment (CI/CD).
  • You anticipate gradual iteration, enhancement, and improvement of the ETL process over time.
  • You plan to add data incrementally, as opposed to performing a one-time ETL.

Here's a diagram of the data pipeline that's created by the sample program:

Data pipeline using BigQuery.

In the example code, many of the pipeline steps are grouped or wrapped in convenience methods, given descriptive names, and reused. In the diagram, reused steps are indicated by dashed borders.

Review the pipeline code

The code creates a pipeline that performs the following steps:

  1. Loads each table that you want to be part of the join into a PCollection of strings. Each element comprises the JSON representation of a row of the table.

    public static PCollection<String> loadText(Pipeline p, String name) {
      BQETLOptions options = (BQETLOptions) p.getOptions();
      String loadingBucket = options.getLoadingBucketURL();
      String objectToLoad = storedObjectName(loadingBucket, name);
      return p.apply(name, TextIO.read().from(objectToLoad));
    }
  2. Converts those JSON strings to object representations (MusicBrainzDataObject objects), and then organizes the object representations by one of the column values, such as a primary or foreign key.

    public static PCollection<KV<Long, MusicBrainzDataObject>> loadTableFromText(PCollection<String> text, String name, String keyName) {
      final String namespacedKeyname = name + "_" + keyName;
      return text.apply("load " + name,
                        MapElements
                          .into(new TypeDescriptor<KV<Long, MusicBrainzDataObject>>() {})
                          .via( (String input) -> {
                                MusicBrainzDataObject datum = JSONReader.readObject(name, input);
                                Long key = (Long) datum.getColumnValue(namespacedKeyname);
                                return KV.of(key, datum);
                                })
             );
    }
  3. Joins the lists based on the common artist. The artist_credit_name row links an artist credit with its recording and includes the artist foreign key. The artist_credit_name table is loaded as a list of key-value (KV) objects, where the K member is the artist.

    PCollection<MusicBrainzDataObject> artistCredits =
        MusicBrainzTransforms.innerJoin("artists with artist credits", artists, artistCreditName);
  4. Joins the list by using the MusicBrainzTransforms.innerJoin() method.

    public static PCollection<MusicBrainzDataObject>
                         innerJoin(String name,
                                   PCollection<KV<Long, MusicBrainzDataObject>> table1,
                                   PCollection<KV<Long, MusicBrainzDataObject>> table2) {
      final TupleTag<MusicBrainzDataObject> t1 = new TupleTag<MusicBrainzDataObject>(){};
      final TupleTag<MusicBrainzDataObject> t2 = new TupleTag<MusicBrainzDataObject>(){};
      PCollection<KV<Long, CoGbkResult>> joinedResult = group(name, table1, table2, t1, t2);
    1. Groups the collections of KV objects by the key member on which you want to join. This results in a PCollection of KV objects with a long key (the artist.id column value) and a resulting CoGbkResult (which stands for combine group by key result). The CoGbkResult object is a tuple of lists of objects with the key value in common from the first and second PCollections. This tuple is addressable by using the tuple tag formulated for each PCollection prior to running the CoGroupByKey operation in the group method. (A minimal sketch of a possible group helper appears after this list.)
    2. Merges each matchup of objects into a MusicBrainzDataObject object that represents a join result.

          PCollection<List<MusicBrainzDataObject>> mergedResult =
              joinedResult.apply("merge join results",
                           MapElements
                         .into(new TypeDescriptor<List<MusicBrainzDataObject>>() {})
                         .via( ( KV<Long, CoGbkResult> group ) -> {
                             List<MusicBrainzDataObject> result = new ArrayList<MusicBrainzDataObject>();
                             Iterable<MusicBrainzDataObject> leftObjects = group.getValue().getAll(t1);
                             Iterable<MusicBrainzDataObject> rightObjects = group.getValue().getAll(t2);
                             leftObjects.forEach((MusicBrainzDataObject l) -> {
                               rightObjects.forEach((MusicBrainzDataObject r) -> {
                                 result.add(l.duplicate().merge(r));
                               });
                             });
                             return result;
                           }
                         )
      );
    3. Reorganizes the collection into a list of KV objects to begin the next join. This time, the K value is the artist_credit column, which is used to join with the recording table.

      PCollection<KV<Long,MusicBrainzDataObject>> artistCreditNamesByArtistCredit =  MusicBrainzTransforms.by("artist_credit_name_artist_credit", artistCredits);
    4. Obtains the final resulting collection of MusicBrainzDataObject objects by joining that result with the loaded collection of recordings that are organized by artist_credit.id.

      PCollection<MusicBrainzDataObject> artistRecordings = MusicBrainzTransforms.innerJoin("joined recordings",
         artistCreditNamesByArtistCredit, recordingsByArtistCredit);
    5. Maps the resulting MusicBrainzDataObject objects into TableRows.

      PCollection<TableRow> tableRows = MusicBrainzTransforms.transformToTableRows(artistRecordings, bqTableSchema);
    6. Writes the resulting TableRows into BigQuery.

      tableRows.apply(
           "Write to BigQuery",
           BigQueryIO.writeTableRows()
          .to(BQETLOptions.getBigQueryTablename())
          .withSchema(bqTableSchema)
          .withCustomGcsTempLocation(StaticValueProvider.of(BQETLOptions.getTempLocation() ))
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
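
The group helper that innerJoin calls isn't shown in the fragments above. The following is a minimal sketch of what such a helper could look like, assuming it wraps Beam's CoGroupByKey; the method in the sample repository may differ in its details.

// Hypothetical sketch (not from the sample repository): a group() helper that
// wraps Beam's CoGroupByKey. It uses org.apache.beam.sdk.transforms.join.CoGroupByKey,
// CoGbkResult, and KeyedPCollectionTuple, plus the sample's MusicBrainzDataObject type.
private static PCollection<KV<Long, CoGbkResult>> group(
    String name,
    PCollection<KV<Long, MusicBrainzDataObject>> table1,
    PCollection<KV<Long, MusicBrainzDataObject>> table2,
    TupleTag<MusicBrainzDataObject> t1,
    TupleTag<MusicBrainzDataObject> t2) {
  // Tag each keyed collection, then co-group the two collections by their Long key.
  return KeyedPCollectionTuple.of(t1, table1)
      .and(t2, table2)
      .apply(name, CoGroupByKey.create());
}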

For details about the mechanics of Cloud Dataflow pipeline programming, review the Apache Beam programming guide topics on pipelines, PCollections, transforms, and pipeline I/O.

After you review the steps that the code performs, you can run the pipeline.

Run the pipeline code

  1. In the GCP Console, open Cloud Shell.

    Open Cloud Shell

  2. Set the environment variables for your project:

    export PROJECT_ID=[PROJECT_ID]
    export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
    

    Replace [PROJECT_ID] with the project ID of your GCP project, and replace [CHOOSE_AN_APPROPRIATE_ZONE] with a GCP zone.

  3. Set environment variables that are used by the pipeline script:

    export DESTINATION_TABLE=recordings_by_artists_dataflow
    export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
    export DATASET=musicbrainz
    export SERVICE_ACCOUNT=project-owner
    
  4. Make sure that gcloud is using the project you created or selected at the beginning of the tutorial:

    gcloud config set project $PROJECT_ID
    
  5. Create a service account to run the pipeline:

    gcloud iam service-accounts create ${SERVICE_ACCOUNT} \
        --display-name "Project Owner Account"
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member serviceAccount:${SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com \
        --role roles/owner
    gcloud iam service-accounts keys create \
        ~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json \
        --iam-account ${SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com
    

    This command downloads a JSON file that contains your service account key. Store this file in a secure location.

  6. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the filepath of the JSON file that contains your service account key:

    export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
    
  7. Clone the repository that contains the Cloud Dataflow code:

    git clone https://github.com/GoogleCloudPlatform/bigquery-etl-dataflow-sample.git
    
  8. Change directory to the sample:

    cd bigquery-etl-dataflow-sample
    
  9. Create a staging bucket in Cloud Storage. Cloud Dataflow jobs require a bucket in Cloud Storage for staging the binary files used to run the pipeline:

    gsutil mb gs://$STAGING_BUCKET
    
  10. Set the object lifecycle for the staging bucket ($STAGING_BUCKET) to the policy in the dataflow-staging-policy.json file:

    gsutil lifecycle set dataflow-staging-policy.json gs://$STAGING_BUCKET
    
  11. Run the Cloud Dataflow job:

    ./run.sh simple
    
  12. To see the progress of the pipeline, in the GCP Console, go to the Dataflow page.

    Go to the Dataflow page

    The status of the jobs is shown in the status column. A status of Succeeded indicates that the job is complete.

  13. (Optional) To see the job graph and details about the steps, click the job name, for example, etl-into-bigquery-bqetlsimple.

  14. In the GCP Console, go to the BigQuery page.

    Go to BigQuery page

    Make sure your GCP project is selected.

  15. To run a query on the new table, in the Query editor pane, enter the following:

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
    FROM musicbrainz.recordings_by_artists_dataflow
    WHERE artist_area is NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    Query editor updated with query for new table.

Cleanse the data

Next, you make a slight change to the Cloud Dataflow pipeline so that you can load lookup tables and process them as side inputs, as shown in the following diagram.

Cloud Dataflow pipeline updated for side inputs.

When you query the resulting BigQuery table, it's difficult to determine where an artist originates, because you would have to manually look up the numeric area ID in the area table of the MusicBrainz database. This makes analyzing query results less straightforward than it could be.

Similarly, artist genders are shown as IDs, but the entire MusicBrainz gender table consists of only three rows. To fix this, you can add a step in the Cloud Dataflow pipeline to use the MusicBrainz area and gender tables to map the IDs to their proper labels.

Both the area and gender tables contain significantly fewer rows than the artist or recording tables, because the number of rows in these lookup tables is constrained by the number of geographic areas or genders, respectively.

As a result, the lookup step uses the Cloud Dataflow feature called side input.

Side inputs are loaded as table exports of line-delimited JSON, and are used to denormalize the table data in a single step.
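
Before you review the sample code, it can help to see the side-input pattern in isolation. The following is a minimal, self-contained sketch (not taken from the sample repository) of a Beam pipeline that decodes a numeric ID column through a map side input; the class name, field names, and in-memory lookup values are illustrative only.

import java.util.Arrays;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class SideInputSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // A small lookup table (id -> label), materialized as a map side input.
    PCollectionView<Map<Long, String>> areaLabels =
        p.apply("area lookup", Create.of(Arrays.asList(KV.of(1L, "United Kingdom"), KV.of(2L, "Iceland")))
                                     .withCoder(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of())))
         .apply(View.asMap());

    // The main input: artist names keyed by a numeric area id.
    PCollection<KV<String, Long>> artists =
        p.apply("artists", Create.of(Arrays.asList(KV.of("Radiohead", 1L), KV.of("Björk", 2L)))
                                 .withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())));

    // Replace the numeric id with its label by reading the side input inside the DoFn.
    artists.apply("decode area", ParDo.of(new DoFn<KV<String, Long>, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Map<Long, String> labels = c.sideInput(areaLabels);
        String label = labels.getOrDefault(c.element().getValue(), "unknown");
        c.output(c.element().getKey() + " | " + label);
      }
    }).withSideInputs(areaLabels));

    p.run().waitUntilFinish();
  }
}

The sample pipeline follows the same pattern, except that it builds the side-input map from the exported line-delimited JSON lookup tables instead of from in-memory values.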

Review the code that adds side inputs to the pipeline

Before running the pipeline, review the code to get a better understanding of the new steps.

In the BQETLSimple.java file, review the commented-out lines. You uncomment these lines in a later step.

//PCollection<KV<Long,MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p,"artist","id",
//        MusicBrainzTransforms.lookup("area", "id", "name", "area", "begin_area"),
//        MusicBrainzTransforms.lookup("gender","id","name","gender"));

PCollection<KV<Long, MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p, "artist", "id");

This code demonstrates data cleansing with side inputs. The MusicBrainzTransforms class provides convenience methods for using side inputs to map foreign key ID values to their labels. Its lookup method creates an internal LookupDescription object that describes a lookup table: keyKey is the name of the column that contains the key for the lookup, valueKey is the name of the column that contains the corresponding label, and the variable-length destinationKeys arguments name the fields whose ID values are replaced with labels.

public static LookupDescription lookup(String objectName, String keyKey, String valueKey, String... destinationKeys) {
  return new LookupDescription(objectName, keyKey, valueKey, destinationKeys);
}

Each side input is loaded as a single map object, which is used to look up the corresponding label for an ID.

The JSON for each lookup table is first loaded into MusicBrainzDataObject objects with an empty namespace and then turned into a map from the Key column value to the Value column value.

public static PCollectionView<Map<Long, String>> loadMapFromText(PCollection<String> text, String name, String keyKey, String valueKey) {
  // column/Key names are namespaced in MusicBrainzDataObject
  String keyKeyName = name + "_" + keyKey;
  String valueKeyName = name + "_" + valueKey;

  PCollection<KV<Long, String>> entries = text.apply(
        "sideInput_" + name,
        MapElements
          .into(new TypeDescriptor<KV<Long, String>>() {})
          .via((String input) -> {
                 MusicBrainzDataObject object = JSONReader.readObject(name, input);
                 Long key = (Long) object.getColumnValue(keyKeyName);

                 String value = (String) object.getColumnValue(valueKeyName);
                 return KV.of(key, value);
               })
        );

  return entries.apply(View.<Long, String>asMap());
}

Each of these Map objects is put into a Map keyed by the value of its destinationKey, which is the key that is replaced with the looked-up values.

List<SimpleEntry<ArrayList<String>, PCollectionView<Map<Long, String>>>> mapSideInputs = new ArrayList<SimpleEntry<ArrayList<String>, PCollectionView<Map<Long, String>>>>();

for (LookupDescription mapper : mappers) {
  PCollectionView<Map<Long, String>> mapView = loadMap(text.getPipeline(), mapper.objectName, mapper.keyKey, mapper.valueKey);
  List<String> destKeyList =
      mapper.destinationKeys.stream()
                            .map( destinationKey -> name + "_" + destinationKey )
                            .collect(Collectors.toList());

  mapSideInputs.add(new SimpleEntry(destKeyList, mapView));
}

Then, while transforming the artist objects from JSON, the value of the destinationKey (which starts out as a number) is replaced with its label.

Map<Long, String> sideInputMap = c.sideInput(mapping.getValue());

List<String> keyList = mapping.getKey();

keyList.forEach( ( String key ) -> {
  Long id = (Long) result.getColumnValue(key);
  if (id != null) {
    String label = (String) sideInputMap.get(id);
    if (label == null) {
      label = "" + id;
    }
    result.replace(key, label);
  }
});

To modify BQETLSimple.java to use lookups to decode the data for the artist_area and artist_gender fields, complete the following steps:

  1. Slightly change the program flow:

    1. Uncomment the lines that load the artist data by using the lookups.
    2. Comment out the call to loadTable that loads the artist data without the lookups.

    After the change, the code looks like the following:

    PCollection<KV<Long, MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p, "artist", "id",
            MusicBrainzTransforms.lookup("area", "id", "name", "area", "begin_area"),
            MusicBrainzTransforms.lookup("gender", "id", "name", "gender"));

    //PCollection<KV<Long, MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p, "artist", "id");
  2. Change the TableFieldSchemas for artist_area and artist_gender to be of data type string instead of int by commenting out the corresponding int fields and uncommenting the corresponding string fields.

    /*Switch these two lines when using mapping table for artist_area */
    //        .stringField("artist_area")
            .intField("artist_area")
    /*Switch these two lines when using mapping table for artist_gender */
    //        .stringField("artist_gender")
            .intField("artist_gender")
    /*Switch these two lines when using mapping table for artist_begin_area */
            .intField("artist_begin_area")
    //      .stringField("artist_begin_area")
  3. To rerun the pipeline code, complete the following steps:

    1. Set the environment variables for your project:

      export PROJECT_ID=[PROJECT_ID]
      export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
      
    2. Make sure the environment is set up:

      export DESTINATION_TABLE=recordings_by_artists_dataflow_sideinputs
      export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
      export DATASET=musicbrainz
      export SERVICE_ACCOUNT=project-owner
      
    3. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the file path of the JSON file that contains your service account key.

      export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
      
    4. Run the pipeline to load the data with the decoded artist_area and artist_gender values:

      ./run.sh simple
      
  4. Perform the same query that includes artist_area and artist_gender:

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
    FROM musicbrainz.recordings_by_artists_dataflow_sideinputs
    WHERE artist_area IS NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    In the output, the artist_area and artist_gender are now decoded:

Output with decoded `artist_area` and `artist_gender` values.

Optimize the BigQuery schema

In the final part of this tutorial, you run a pipeline that generates an optimized table schema by using nested fields.

Take a moment to review the code that is used to generate this optimized version of the table.

The following diagram shows a slightly different Cloud Dataflow pipeline that nests the artist's recordings within each artist row, rather than creating duplicate artist rows.

Cloud Dataflow pipeline that nests the artist's recordings within each artist row.

The current representation of the data is fairly flat: there is one row per credited recording, and each row contains all of the artist's metadata from the BigQuery schema, along with all of the recording and artist_credit_name metadata. This flat representation has at least two drawbacks:

  • It repeats the artist metadata for every recording credited to an artist, which in turn increases the storage required.
  • When you export the data as JSON, it exports an array that repeats that data, instead of an artist with the nested recording data — which is probably what you want.

By making some changes to the Cloud Dataflow pipeline, you can store the recordings as a repeated field in each artist record, instead of storing one recording per row, without any performance penalty and without using additional storage.
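
For reference, the following sketch shows how a repeated RECORD field can be declared with the BigQuery API model classes that the pipeline already uses. The field names here are illustrative and are not copied from the sample's bqTableSchema.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;

public class NestedSchemaSketch {
  // Builds a schema in which each artist row carries a repeated record of recordings.
  public static TableSchema artistWithNestedRecordings() {
    TableFieldSchema recordings = new TableFieldSchema()
        .setName("artist_recordings")
        .setType("RECORD")
        .setMode("REPEATED")  // one artist row holds many nested recording rows
        .setFields(Arrays.asList(
            new TableFieldSchema().setName("recording_name").setType("STRING"),
            new TableFieldSchema().setName("recording_length").setType("INTEGER")));

    return new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("artist_name").setType("STRING"),
        new TableFieldSchema().setName("artist_area").setType("STRING"),
        recordings));
  }
}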

Instead of joining the recordings with their artist information by artist_credit_name.artist, this alternate pipeline creates a nested list of recordings within an artist object.

public static PCollection<MusicBrainzDataObject> nest(PCollection<KV<Long, MusicBrainzDataObject>> parent,
                                                      PCollection<KV<Long, MusicBrainzDataObject>> child,
                                                      String nestingKey) {
  final TupleTag<MusicBrainzDataObject> parentTag = new TupleTag<MusicBrainzDataObject>(){};
  final TupleTag<MusicBrainzDataObject> childTag = new TupleTag<MusicBrainzDataObject>(){};

  PCollection<KV<Long, CoGbkResult>> joinedResult = group("nest " + nestingKey, parent, child, parentTag, childTag);
  return joinedResult.apply("merge join results " + nestingKey,
                            MapElements
                             .into(new TypeDescriptor<MusicBrainzDataObject>() {})
                             .via((KV<Long, CoGbkResult> group) -> {
                                MusicBrainzDataObject parentObject = group.getValue().getOnly(parentTag);
                                Iterable<MusicBrainzDataObject> children = group.getValue().getAll(childTag);
                                List<MusicBrainzDataObject> childList = new ArrayList<MusicBrainzDataObject>();
                                children.forEach(childList::add);
                                parentObject = parentObject.duplicate();
                                parentObject.addColumnValue("recordings", childList);
                                return parentObject;
                              })
                         );
}

TableRow has size limits in the BigQuery API, so the code limits the number of nested recordings for a given record to 1000 elements. If a given artist has more than 1000 recordings, the code duplicates the row, including the artist metadata, and continues nesting the recording data in the duplicate row.

private static List<TableRow> toTableRows(MusicBrainzDataObject mbdo, Map<String, Object> serializableSchema) {
  TableRow row = new TableRow();
  List<TableRow> result = new ArrayList<TableRow>();
  Map<String, List<MusicBrainzDataObject>> nestedLists = new HashMap<String, List<MusicBrainzDataObject>>();
  Set<String> keySet = serializableSchema.keySet();
  /*
   *  construct a row object without the nested objects
   */
  int maxListSize = 0;
  for (String key : keySet) {
    Object value = serializableSchema.get(key);
    Object fieldValue = mbdo.getColumnValue(key);
    if (fieldValue != null) {
      if (value instanceof Map) {
        List<MusicBrainzDataObject> list = (List<MusicBrainzDataObject>) fieldValue;
        if (list.size() > maxListSize) {
          maxListSize = list.size();
        }
        nestedLists.put(key, list);
      } else {
        row.set(key, fieldValue);
      }

    }
  }
  /*
   * add the nested objects but break up the nested objects across duplicate rows if nesting limit exceeded
   */
  TableRow parent = row.clone();
  Set<String> listFields = nestedLists.keySet();
  for (int i = 0; i < maxListSize; i++) {
    parent = (parent == null ? row.clone() : parent);
    final TableRow parentRow = parent;
    nestedLists.forEach((String key, List<MusicBrainzDataObject> nestedList) -> {
      if (nestedList.size() > 0) {
        if (parentRow.get(key) == null) {
          parentRow.set(key, new ArrayList<TableRow>());
        }
        List<TableRow> childRows = (List<TableRow>) parentRow.get(key);
        childRows.add(toChildRow(nestedList.remove(0), (Map<String, Object>) serializableSchema.get(key)));
      }
    });
    if ((i > 0) && (i % BIGQUERY_NESTING_LIMIT == 0)) {
      result.add(parent);
      parent = null;
    }
  }
  if (parent != null) {
    result.add(parent);
  }
  return result;
}

The diagram shows the sources, transformations, and sinks of the pipeline.

Optimized pipeline with sources, transformations, and sinks.

In most cases, the step names are supplied in code as part of the apply method call.

To create this optimized pipeline, complete the following steps:

  1. In Cloud Shell, ensure the environment is set up for the pipeline script:

    export PROJECT_ID=[PROJECT_ID]
    export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
    export DESTINATION_TABLE=recordings_by_artists_dataflow_nested
    export DATASET=musicbrainz
    export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
    export SERVICE_ACCOUNT=project-owner
    
  2. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the file path of the JSON file that contains your service account key:

    export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
    
  3. Run the pipeline to nest recording rows within artist rows:

    ./run.sh nested
    
  4. Query fields from the nested table in BigQuery:

    SELECT artist_name, artist_gender, artist_area, artist_recordings
    FROM musicbrainz.recordings_by_artists_dataflow_nested
    WHERE artist_area IS NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    Query results of nested table.

  5. Run a query to extract values from the STRUCT and use those values to filter the results:

    SELECT artist_name,
           artist_gender,
           artist_area,
           ARRAY(SELECT artist_credit_name_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS artist_credit_name_name,
           ARRAY(SELECT recording_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS recording_name
     FROM musicbrainz.recordings_by_artists_dataflow_nested,
          UNNEST(recordings_by_artists_dataflow_nested.artist_recordings) AS artist_recordings_struct
    WHERE artist_recordings_struct.recording_name LIKE "%Justin%"
    LIMIT 1000;
    

    Query to filter the results.

Cleaning up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial:

Delete the project

  1. In the GCP Console, go to the Projects page.

    Go to the Projects page

  2. In the project list, select the project you want to delete and click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Deleting individual resources

Follow these steps to delete individual resources, instead of deleting the whole project.

Deleting the Cloud Storage bucket

  1. In the GCP Console, go to the Cloud Storage Browser page.

    Go to the Cloud Storage Browser page

  2. Click the checkbox for the bucket you want to delete.
  3. Click Delete to delete the bucket.

Deleting the BigQuery datasets

  1. Open the BigQuery web UI.

    Open BIGQUERY

  2. Select the BigQuery datasets you created during the tutorial.

  3. Click Delete.

What's next
