Performing ETL from a relational database into BigQuery using Dataflow

Last reviewed 2022-08-21 UTC

This tutorial demonstrates how to use Dataflow to extract, transform, and load (ETL) data from an online transaction processing (OLTP) relational database into BigQuery for analysis.

This tutorial is intended for database admins, operations professionals, and cloud architects interested in taking advantage of the analytical query capabilities of BigQuery and the batch processing capabilities of Dataflow.

OLTP databases are often relational databases that store information and process transactions for ecommerce sites, software as a service (SaaS) applications, or games. OLTP databases are usually optimized for transactions, which require the ACID properties (atomicity, consistency, isolation, and durability), and they typically have highly normalized schemas. In contrast, data warehouses tend to be optimized for data retrieval and analysis rather than transactions, and they typically feature denormalized schemas. Generally, denormalizing data from an OLTP database makes it more useful for analysis in BigQuery.

Objectives

This tutorial shows two approaches for performing ETL of normalized RDBMS data into denormalized BigQuery data:

  • Using BigQuery to load and transform the data. Use this approach to perform a one-time load of a small amount of data into BigQuery for analysis. You might also use this approach to prototype your dataset before you automate larger or multiple datasets.
  • Using Dataflow to load, transform, and cleanse the data. Use this approach to load a larger amount of data, to load data from multiple data sources, or to load data incrementally or automatically.

Costs

In this document, you use the following billable components of Google Cloud: BigQuery, Cloud Storage, Compute Engine, and Dataflow.

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine and Dataflow APIs.

    Enable the APIs


Using the MusicBrainz dataset

This tutorial relies on JSON snapshots of tables in the MusicBrainz database, which is built on PostgreSQL and contains information about all of the music in MusicBrainz. Elements of the MusicBrainz schema include:

  • Artists
  • Release groups
  • Releases
  • Recordings
  • Works
  • Labels
  • Many of the relationships between these entities.

The MusicBrainz schema includes three relevant tables: artist, recording, and artist_credit_name. An artist_credit represents credit given to the artist for a recording, and the artist_credit_name rows link the recording with its corresponding artist through the artist_credit value.

This tutorial provides the PostgreSQL tables already extracted into newline-delimited JSON format and stored in a public Cloud Storage bucket: gs://solutions-public-assets/bqetl

If you want to perform this step yourself, you need a PostgreSQL database that contains the MusicBrainz dataset. Then use the following commands to export each of the tables:

host=POSTGRES_HOST
user=POSTGRES_USER
database=POSTGRES_DATABASE

for table in artist recording artist_credit_name
do
    pg_cmd="\\copy (select row_to_json(r) from (select * from ${table}) r ) to exported_${table}.json"
    psql -w -h ${host} -U ${user} -d ${database} -c "${pg_cmd}"
    # Clean up the extra '\' characters that escape quotes in the JSON output.
    sed -i -e 's/\\\\/\\/g' exported_${table}.json
done

Approach 1: ETL with BigQuery

Use this approach to perform a one-time load of a small amount of data into BigQuery for analysis. You might also use this approach to prototype your dataset before you use automation with larger or multiple datasets.

Create a BigQuery dataset

To create a BigQuery dataset, you load the MusicBrainz tables into BigQuery individually, and then you join the tables that you loaded so that each row contains the data linkage that you want. You store the join results in a new BigQuery table. Then you can delete the original tables that you loaded.

  1. In the Google Cloud console, open BigQuery.

    OPEN BIGQUERY

  2. In the Explorer panel, click the menu next to your project name, and then click Create data set.

  3. In the Create data set dialog, complete the following steps:

    1. In the Data set ID field, enter musicbrainz.
    2. Set the Data Location to us.
    3. Click Create data set.

Import MusicBrainz tables

For each MusicBrainz table, perform the following steps to add a table to the dataset you created:

  1. In the Google Cloud console, in the BigQuery Explorer panel, expand the row with your project name to show the newly created musicbrainz dataset.
  2. Click the menu next to your musicbrainz dataset, and then click Create Table.
  3. In the Create Table dialog, complete the following steps:

    1. In the Create table from drop-down list, select Google Cloud Storage.
    2. In the Select file from GCS bucket field, enter the path to the data file:

      solutions-public-assets/bqetl/artist.json
      
    3. For File format, select JSONL (Newline Delimited JSON).

    4. Ensure that Project contains your project name.

    5. Ensure that Data set is musicbrainz.

    6. For Table, enter the table name, artist.

    7. For Table type, leave Native table selected.

    8. Below the Schema section, click to turn on Edit as Text.

    9. Download the artist schema file and open it in a text editor or viewer.

    10. Replace the contents of the Schema section with the contents of the schema file you downloaded.

    11. Click Create Table.

  4. Wait a few moments for the load job to complete.

  5. When the load has finished, the new table appears under the dataset.

  6. Repeat steps 1 - 5 to create the artist_credit_name table with the following changes:

    • Use the following path for the source data file:

      solutions-public-assets/bqetl/artist_credit_name.json
      
    • Use artist_credit_name as the Table name.

    • Download the artist_credit_name schema file and use the contents for the schema.

  7. Repeat steps 1 - 5 to create the recording table with the following changes:

    • Use the following path for the source data file:

      solutions-public-assets/bqetl/recording.json
      
    • Use recording as the Table name.

    • Download the recording schema file and use the contents for the schema.

Manually denormalize the data

To denormalize the data, join the data into a new BigQuery table that has one row for each artist's recording, together with selected metadata you want retained for analysis.

  1. If the BigQuery query editor is not open in the Google Cloud console, click Compose New Query.
  2. Copy the following query and paste it into the Query Editor:

    SELECT
        artist.id,
        artist.gid AS artist_gid,
        artist.name AS artist_name,
        artist.area,
        recording.name AS recording_name,
        recording.length,
        recording.gid AS recording_gid,
        recording.video
    FROM
        `musicbrainz.artist` AS artist
    INNER JOIN
        `musicbrainz.artist_credit_name` AS artist_credit_name
    ON
        artist.id = artist_credit_name.artist
    INNER JOIN
        `musicbrainz.recording` AS recording
    ON
        artist_credit_name.artist_credit = recording.artist_credit
    
  3. Click the More drop-down list, and then select Query settings.

  4. In the Query settings dialog, complete the following steps:

    1. Select Set a destination table for query results.
    2. In Dataset, enter musicbrainz and select the dataset in your project.
    3. In Table id, enter recordings_by_artists_manual.
    4. For Destination table write preference, click Overwrite table.
    5. Select the Allow Large Results (no size limit) checkbox.
    6. Click Save.
  5. Click Run.

    When the query is complete, the data from the query result is organized into songs for each artist in the newly created BigQuery table, and a sample of the results is shown in the Query Results pane, for example:

    Row | id | artist_gid | artist_name | area | recording_name | length | recording_gid | video
    1 | 97546 | 125ec42a... | unknown | 240 | Horo Gun Toireamaid Hùgan Fhathast Air | 174106 | c8bbe048... | FALSE
    2 | 266317 | 2e7119b5... | Capella Istropolitana | 189 | Concerto Grosso in D minor, op. 2 no. 3: II. Adagio | 134000 | af0f294d... | FALSE
    3 | 628060 | 34cd3689... | Conspirare | 5196 | Liturgy, op. 42: 9. Praise the Lord from the Heavens | 126933 | 8bab920d... | FALSE
    4 | 423877 | 54401795... | Boys Air Choir | 1178 | Nunc Dimittis | 190000 | 111611eb... | FALSE
    5 | 394456 | 9914f9f9... | L’Orchestre de la Suisse Romande | 23036 | Concert Waltz no. 2, op. 51 | 509960 | b16742d1... | FALSE

Approach 2: ETL into BigQuery with Dataflow

In this section of the tutorial, instead of using the BigQuery UI, you use a sample program to load data into BigQuery by using a Dataflow pipeline. Then, you use the Beam programming model to denormalize and cleanse data to load into BigQuery.

Before you begin, review the concepts and the sample code.

Review the concepts

Although the data is small and can quickly be uploaded by using the BigQuery UI, for the purpose of this tutorial you can also use Dataflow for ETL. Use Dataflow for ETL into BigQuery instead of the BigQuery UI when you are performing massive joins, that is, from around 500-5000 columns of more than 10 TB of data, with the following goals:

  • You want to clean or transform your data as it's loaded into BigQuery, instead of storing it and joining afterwards. As a result, this approach also has lower storage requirements because data is only stored in BigQuery in its joined and transformed state.
  • You plan to do custom data cleansing (which cannot be simply achieved with SQL).
  • You plan to combine the data with data outside of the OLTP, such as logs or remotely accessed data, during the loading process.
  • You plan to automate testing and deployment of data-loading logic using continuous integration or continuous deployment (CI/CD).
  • You anticipate gradual iteration, enhancement, and improvement of the ETL process over time.
  • You plan to add data incrementally, as opposed to performing a one-time ETL.

Here's a diagram of the data pipeline that's created by the sample program:

Data pipeline using BigQuery.

In the example code, many of the pipeline steps are grouped or wrapped in convenience methods, given descriptive names, and reused. In the diagram, reused steps are indicated by dashed borders.

Review the pipeline code

The code creates a pipeline that performs the following steps:

  1. Loads each table that you want to be part of the join from the public Cloud Storage bucket into a PCollection of strings. Each element comprises the JSON representation of a row of the table.

    public static PCollection<String> loadText(Pipeline p, String name) {
      BQETLOptions options = (BQETLOptions) p.getOptions();
      String loadingBucket = options.getLoadingBucketURL();
      String objectToLoad = storedObjectName(loadingBucket, name);
      return p.apply(name, TextIO.read().from(objectToLoad));
    }
  2. Converts those JSON strings to object representations, MusicBrainzDataObject objects, and then organizes the object representations by one of the column values, such as a primary or foreign key.

    public static PCollection<KV<Long, MusicBrainzDataObject>> loadTableFromText(
        PCollection<String> text, String name, String keyName) {
      final String namespacedKeyname = name + "_" + keyName;
      return text.apply(
          "load " + name,
          MapElements.into(new TypeDescriptor<KV<Long, MusicBrainzDataObject>>() {})
              .via(
                  (String input) -> {
                    MusicBrainzDataObject datum = JSONReader.readObject(name, input);
                    Long key = (Long) datum.getColumnValue(namespacedKeyname);
                    return KV.of(key, datum);
                  }));
    }
  3. Joins the collections based on common artist. The artist_credit_name table links an artist credit with its recording and includes the artist foreign key. The artist_credit_name table is loaded as a collection of key-value KV objects, where the K member is the artist.

    PCollection<MusicBrainzDataObject> artistCredits =
        MusicBrainzTransforms.innerJoin("artists with artist credits", artists, artistCreditName);
  4. Joins the list by using the MusicBrainzTransforms.innerJoin() method.

    public static PCollection<MusicBrainzDataObject> innerJoin(
        String name,
        PCollection<KV<Long, MusicBrainzDataObject>> table1,
        PCollection<KV<Long, MusicBrainzDataObject>> table2) {
      final TupleTag<MusicBrainzDataObject> t1 = new TupleTag<MusicBrainzDataObject>() {};
      final TupleTag<MusicBrainzDataObject> t2 = new TupleTag<MusicBrainzDataObject>() {};
      PCollection<KV<Long, CoGbkResult>> joinedResult = group(name, table1, table2, t1, t2);
    1. Groups the collections of KV objects by the key member on which you want to join. This results in a PCollection of KV objects with a long key (the artist.id column value) and a resulting CoGbkResult (which stands for combine group by key result). The CoGbkResult object is a tuple of lists of objects that share the key value, taken from the first and second PCollections. This tuple is addressable by using the tuple tag formulated for each PCollection before the CoGroupByKey operation runs in the group method (a minimal sketch of such a helper follows this list).
    2. Merges each matchup of objects into a MusicBrainzDataObject object that represents a join result.

      PCollection<List<MusicBrainzDataObject>> mergedResult =
          joinedResult.apply(
              "merge join results",
              MapElements.into(new TypeDescriptor<List<MusicBrainzDataObject>>() {})
                  .via(
                      (KV<Long, CoGbkResult> group) -> {
                        List<MusicBrainzDataObject> result = new ArrayList<>();
                        Iterable<MusicBrainzDataObject> leftObjects = group.getValue().getAll(t1);
                        Iterable<MusicBrainzDataObject> rightObjects = group.getValue().getAll(t2);
                        leftObjects.forEach(
                            (MusicBrainzDataObject l) ->
                                rightObjects.forEach(
                                    (MusicBrainzDataObject r) -> result.add(l.duplicate().merge(r))));
                        return result;
                      }));
    3. Reorganizes the collection into a list of KV objects to begin the next join. This time, the K value is the artist_credit column, which is used to join with the recording table.

      PCollection<KV<Long, MusicBrainzDataObject>> artistCreditNamesByArtistCredit =
          MusicBrainzTransforms.by("artist_credit_name_artist_credit", artistCredits);
    4. Obtains the final resulting collection of MusicBrainzDataObject objects by joining that result with the loaded collection of recordings that are organized by artist_credit.id.

      PCollection<MusicBrainzDataObject> artistRecordings =
          MusicBrainzTransforms.innerJoin(
              "joined recordings", artistCreditNamesByArtistCredit, recordingsByArtistCredit);
    5. Maps the resulting MusicBrainzDataObject objects into TableRows.

      PCollection<TableRow> tableRows =
          MusicBrainzTransforms.transformToTableRows(artistRecordings, bqTableSchema);
    6. Writes the resulting TableRows into BigQuery.

      tableRows.apply(
          "Write to BigQuery",
          BigQueryIO.writeTableRows()
              .to(options.getBigQueryTablename())
              .withSchema(bqTableSchema)
              .withCustomGcsTempLocation(StaticValueProvider.of(options.getTempLocation()))
              .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
              .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
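
The group() method that innerJoin() calls is not shown in the preceding excerpts. The following is a minimal sketch of how such a helper could wrap Beam's CoGroupByKey transform. The method body, imports, and parameter names are illustrative assumptions rather than the sample repository's exact code:

import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

// Illustrative sketch: tag each keyed collection, then co-group both
// collections by their Long key (for example, the artist.id value).
static PCollection<KV<Long, CoGbkResult>> group(
    String name,
    PCollection<KV<Long, MusicBrainzDataObject>> table1,
    PCollection<KV<Long, MusicBrainzDataObject>> table2,
    TupleTag<MusicBrainzDataObject> t1,
    TupleTag<MusicBrainzDataObject> t2) {
  return KeyedPCollectionTuple.of(t1, table1)
      .and(t2, table2)
      .apply("CoGroupByKey " + name, CoGroupByKey.<Long>create());
}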

For details about the mechanics of Beam pipeline programming, review the Apache Beam programming model documentation.

After you review the steps that the code performs, you can run the pipeline.

Run the pipeline code

  1. In the Google Cloud console, open Cloud Shell.

    Open Cloud Shell

  2. Set the environment variables for your project and the pipeline script:

    export PROJECT_ID=PROJECT_ID
    export REGION=us-central1
    export DESTINATION_TABLE=recordings_by_artists_dataflow
    export DATASET=musicbrainz
    

    Replace PROJECT_ID with the project ID of your Google Cloud project.

  3. Make sure that gcloud is using the project you created or selected at the beginning of the tutorial:

    gcloud config set project $PROJECT_ID
    
  4. Following the security principle of least privilege, create a service account for the Dataflow pipeline, and grant it only the roles that it needs: roles/dataflow.worker, roles/bigquery.jobUser, and roles/bigquery.dataEditor on the musicbrainz dataset:

    gcloud iam service-accounts create musicbrainz-dataflow
    export SERVICE_ACCOUNT=musicbrainz-dataflow@${PROJECT_ID}.iam.gserviceaccount.com
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member=serviceAccount:${SERVICE_ACCOUNT} \
        --role=roles/dataflow.worker
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member=serviceAccount:${SERVICE_ACCOUNT} \
        --role=roles/bigquery.jobUser
    bq query  --use_legacy_sql=false \
        "GRANT \`roles/bigquery.dataEditor\` ON SCHEMA musicbrainz 
         TO 'serviceAccount:${SERVICE_ACCOUNT}'"
    
  5. Create a bucket for the Dataflow pipeline to use for temporary files, and grant Owner privileges on it to the musicbrainz-dataflow service account:

    export DATAFLOW_TEMP_BUCKET=gs://temp-bucket-${PROJECT_ID}
    gsutil mb -l us ${DATAFLOW_TEMP_BUCKET}
    gsutil acl ch -u ${SERVICE_ACCOUNT}:O ${DATAFLOW_TEMP_BUCKET}
    
  6. Clone the repository that contains the Dataflow code:

    git clone https://github.com/GoogleCloudPlatform/bigquery-etl-dataflow-sample.git
    
  7. Change directory to the sample:

    cd bigquery-etl-dataflow-sample
    
  8. Compile and run the Dataflow job:

    ./run.sh simple
    

    The job should take about 10 minutes to run.

  9. To see the progress of the pipeline, in the Google Cloud console, go to the Dataflow page.

    Go to Dataflow

    The status of the jobs is shown in the status column. A status of Succeeded indicates that the job is complete.

  10. (Optional) To see the job graph and details about the steps, click the job name, for example, etl-into-bigquery-bqetlsimple.

  11. When the job has completed, go to the BigQuery page.

    Go to BigQuery

  12. To run a query on the new table, in the Query editor pane, enter the following:

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
    FROM musicbrainz.recordings_by_artists_dataflow
    WHERE artist_area is NOT NULL
          AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    The result pane will show a set of results similar to the following:

    Row | artist_name | artist_gender | artist_area | recording_name | recording_length
    1 | mirin | 2 | 107 | Sylphia | 264000
    2 | mirin | 2 | 107 | Dependence | 208000
    3 | Gaudiburschen | 1 | 81 | Die Hände zum Himmel | 210000
    4 | Sa4 | 1 | 331 | Ein Tag aus meiner Sicht | 221000
    5 | Dpat | 1 | 7326 | Cutthroat | 249000
    6 | Dpat | 1 | 7326 | Deloused | 178000

    The actual output may differ as the results are not ordered.

Cleanse the data

Next, you make a slight change to the Dataflow pipeline so that you can load lookup tables and process them as side inputs, as shown in the following diagram.

Dataflow pipeline updated for side inputs.

When you query the resulting BigQuery table, it's difficult to determine from where the artist originates without manually looking up the area numeric ID from the area table in the MusicBrainz database. This makes analyzing query results less straightforward than it could be.

Similarly, artist genders are shown as IDs, but the entire MusicBrainz gender table consists of only three rows. To fix this, you can add a step in the Dataflow pipeline to use the MusicBrainz area and gender tables to map the IDs to their proper labels.

The area and gender lookup tables contain significantly fewer rows than the artist or recording tables, because the number of elements in them is constrained by the number of geographic areas and genders, respectively.

As a result, the lookup step can use the Dataflow feature called side inputs.

Side inputs are loaded as table exports of line-delimited JSON files in the public Cloud Storage bucket containing the musicbrainz dataset, and are used to denormalize the table data in a single step.

Review the code that adds side inputs to the pipeline

Before running the pipeline, review the code to get a better understanding of the new steps.

This code demonstrates data cleansing with side inputs. The MusicBrainzTransforms class provides added convenience for using side inputs to map foreign-key values to labels. The library provides a method that creates an internal lookup class, which describes each lookup table and the fields that are replaced with labels. keyKey is the name of the column that contains the key for the lookup, valueKey is the name of the column that contains the corresponding label, and destinationKeys is a variable-length list of the fields whose values are replaced.

public static LookupDescription lookup(
    String objectName, String keyKey, String valueKey, String... destinationKeys) {
  return new LookupDescription(objectName, keyKey, valueKey, destinationKeys);
}
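
For example, lookup descriptors for the area and gender tables might be declared as follows. The column names and destination keys shown here are assumptions based on the descriptions in this section, not necessarily the exact values used in the sample code:

// Hypothetical declarations: map each lookup table's id column to its name
// column. Destination keys are namespaced later with the table being
// transformed (for example, "area" becomes "artist_area").
LookupDescription area = MusicBrainzTransforms.lookup("area", "id", "name", "area");
LookupDescription gender = MusicBrainzTransforms.lookup("gender", "id", "name", "gender");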

Each side input is loaded as a single map object, which is used to look up the corresponding label for an ID.

The JSON for the lookup table is first loaded into MusicBrainzDataObject objects with an empty namespace, and then turned into a map from the Key column value to the Value column value.

public static PCollectionView<Map<Long, String>> loadMapFromText(
    PCollection<String> text, String name, String keyKey, String valueKey) {
  // column/Key names are namespaced in MusicBrainzDataObject
  String keyKeyName = name + "_" + keyKey;
  String valueKeyName = name + "_" + valueKey;

  PCollection<KV<Long, String>> entries =
      text.apply(
          "sideInput_" + name,
          MapElements.into(new TypeDescriptor<KV<Long, String>>() {})
              .via(
                  (String input) -> {
                    MusicBrainzDataObject object = JSONReader.readObject(name, input);
                    Long key = (Long) object.getColumnValue(keyKeyName);

                    String value = (String) object.getColumnValue(valueKeyName);
                    return KV.of(key, value);
                  }));

  return entries.apply(View.asMap());
}

Each of these Map objects is put into a Map by the value of its destinationKey, which is the key whose values are replaced with the looked-up labels.

List<SimpleEntry<List<String>, PCollectionView<Map<Long, String>>>> mapSideInputs =
    new ArrayList<>();

for (LookupDescription mapper : mappers) {
  PCollectionView<Map<Long, String>> mapView =
      loadMap(text.getPipeline(), mapper.objectName, mapper.keyKey, mapper.valueKey);
  List<String> destKeyList =
      mapper.destinationKeys.stream()
          .map(destinationKey -> name + "_" + destinationKey)
          .collect(Collectors.toList());

  mapSideInputs.add(new SimpleEntry<>(destKeyList, mapView));
}

Then, while transforming the artist objects from JSON, the value of the destinationKey (which starts out as a number) is replaced with its label.

Map<Long, String> sideInputMap = c.sideInput(mapping.getValue());

List<String> keyList = mapping.getKey();

keyList.forEach(
    (String key) -> {
      Long id = (Long) result.getColumnValue(key);
      if (id != null) {
        String label = sideInputMap.get(id);
        if (label == null) {
          label = "" + id;
        }
        result.replace(key, label);
      }
    });
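
For context, a side input view like this is typically attached to a ParDo transform with withSideInputs() and read inside the DoFn with c.sideInput(). The following fragment is a hedged sketch of that wiring, not the sample's actual transform; the variables objects (a PCollection<MusicBrainzDataObject>) and mapView (a PCollectionView<Map<Long, String>> built as shown earlier) are assumptions:

// Illustrative sketch: pass the lookup map as a side input to a ParDo and
// read it while transforming each element.
PCollection<MusicBrainzDataObject> decoded =
    objects.apply(
        "decode ids",
        ParDo.of(
                new DoFn<MusicBrainzDataObject, MusicBrainzDataObject>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    Map<Long, String> sideInputMap = c.sideInput(mapView);
                    MusicBrainzDataObject result = c.element().duplicate();
                    // Replace each id value with its label, as shown above.
                    c.output(result);
                  }
                })
            .withSideInputs(mapView));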

To add the decoding of the artist_area and artist_gender fields, complete the following steps:

  1. In Cloud Shell, ensure the environment is set up for the pipeline script:

    export PROJECT_ID=PROJECT_ID
    export REGION=us-central1
    export DESTINATION_TABLE=recordings_by_artists_dataflow_sideinputs
    export DATASET=musicbrainz
    export DATAFLOW_TEMP_BUCKET=gs://temp-bucket-${PROJECT_ID}
    export SERVICE_ACCOUNT=musicbrainz-dataflow@${PROJECT_ID}.iam.gserviceaccount.com
    

    Replace PROJECT_ID with the project ID of your Google Cloud project.

  2. Run the pipeline to create the table with decoded area and artist gender:

    ./run.sh simple-with-lookups
    
  3. As before, to see the progress of the pipeline, go to the Dataflow page.

    Go to Dataflow

    The pipeline takes approximately 10 minutes to complete.

  4. When the job has completed, go to the BigQuery page.

    Go to BigQuery

  5. Perform the same query that includes artist_area and artist_gender:

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
      FROM musicbrainz.recordings_by_artists_dataflow_sideinputs
     WHERE artist_area is NOT NULL
       AND artist_gender IS NOT NULL
     LIMIT 1000;
    

    In the output, the artist_area and artist_gender are now decoded:

    Row | artist_name | artist_gender | artist_area | recording_name | recording_length
    1 | mirin | Female | Japan | Sylphia | 264000
    2 | mirin | Female | Japan | Dependence | 208000
    3 | Gaudiburschen | Male | Germany | Die Hände zum Himmel | 210000
    4 | Sa4 | Male | Hamburg | Ein Tag aus meiner Sicht | 221000
    5 | Dpat | Male | Houston | Cutthroat | 249000
    6 | Dpat | Male | Houston | Deloused | 178000

    The actual output may differ, because the results are not ordered.

Optimize the BigQuery schema

In the final part of this tutorial, you run a pipeline that generates a more optimal table schema using nested fields.

Take a moment to review the code that is used to generate this optimized version of the table.

The following diagram shows a slightly different Dataflow pipeline that nests the artist's recordings within each artist row, rather than creating duplicate artist rows.

Dataflow pipeline that nests the artist's recordings within each artist row.

The current representation of the data is fairly flat. That is, it includes one row per credited recording, and each row contains all of the artist's metadata from the BigQuery schema, along with all the recording and artist_credit_name metadata. This flat representation has at least two drawbacks:

  • It repeats the artist metadata for every recording credited to an artist, which in turn increases the storage required.
  • When you export the data as JSON, it exports an array that repeats that data, instead of an artist with the nested recording data — which is probably what you want.

By making some changes to the Dataflow pipeline, you can store recordings as a repeated field in each artist record instead of storing one recording per row, without any performance penalty and without using additional storage.

Instead of joining the recordings with their artist information by artist_credit_name.artist, this alternate pipeline creates a nested list of recordings within an artist object.

public static PCollection<MusicBrainzDataObject> nest(
    PCollection<KV<Long, MusicBrainzDataObject>> parent,
    PCollection<KV<Long, MusicBrainzDataObject>> child,
    String nestingKey) {
  final TupleTag<MusicBrainzDataObject> parentTag = new TupleTag<MusicBrainzDataObject>() {};
  final TupleTag<MusicBrainzDataObject> childTag = new TupleTag<MusicBrainzDataObject>() {};

  PCollection<KV<Long, CoGbkResult>> joinedResult =
      group("nest " + nestingKey, parent, child, parentTag, childTag);
  return joinedResult.apply(
      "merge join results " + nestingKey,
      MapElements.into(new TypeDescriptor<MusicBrainzDataObject>() {})
          .via(
              (KV<Long, CoGbkResult> group) -> {
                MusicBrainzDataObject parentObject = group.getValue().getOnly(parentTag);
                Iterable<MusicBrainzDataObject> children = group.getValue().getAll(childTag);
                List<MusicBrainzDataObject> childList = new ArrayList<>();
                children.forEach(childList::add);
                parentObject = parentObject.duplicate();
                parentObject.addColumnValue("recordings", childList);
                return parentObject;
              }));
}
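
As a usage sketch, the nest() method could be called with the artists keyed by ID as the parent collection and their joined recordings keyed by the same ID as the child collection. The variable names in the following call are hypothetical:

// Hypothetical call: nest each artist's joined recordings under the
// "recordings" column of the corresponding artist object.
PCollection<MusicBrainzDataObject> artistsWithNestedRecordings =
    MusicBrainzTransforms.nest(artistsById, recordingsByArtistId, "recordings");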

The BigQuery API has a maximum row size limit of 100 MB when performing bulk inserts (10 MB for streaming inserts), so the code limits the number of nested recordings for a given record to 1000 elements to ensure that this limit is not reached. If a given artist has more than 1000 recordings, the code duplicates the row, including the artist metadata, and continues nesting the recording data in the duplicate row.

private static List<TableRow> toTableRows(
    MusicBrainzDataObject mbdo, Map<String, Object> serializableSchema) {
  TableRow row = new TableRow();
  List<TableRow> result = new ArrayList<>();
  Map<String, List<MusicBrainzDataObject>> nestedLists = new HashMap<>();
  Set<String> keySet = serializableSchema.keySet();
  /*
   *  construct a row object without the nested objects
   */
  int maxListSize = 0;
  for (String key : keySet) {
    Object value = serializableSchema.get(key);
    Object fieldValue = mbdo.getColumnValue(key);
    if (fieldValue != null) {
      if (value instanceof Map) {
        @SuppressWarnings("unchecked")
        List<MusicBrainzDataObject> list = (List<MusicBrainzDataObject>) fieldValue;
        if (list.size() > maxListSize) {
          maxListSize = list.size();
        }
        nestedLists.put(key, list);
      } else {
        row.set(key, fieldValue);
      }
    }
  }
  /*
   * add the nested objects but break up the nested objects across duplicate rows if nesting
   * limit exceeded
   */
  TableRow parent = row.clone();
  Set<String> listFields = nestedLists.keySet();
  for (int i = 0; i < maxListSize; i++) {
    parent = (parent == null ? row.clone() : parent);
    final TableRow parentRow = parent;
    nestedLists.forEach(
        (String key, List<MusicBrainzDataObject> nestedList) -> {
          if (nestedList.size() > 0) {
            if (parentRow.get(key) == null) {
              parentRow.set(key, new ArrayList<TableRow>());
            }
            @SuppressWarnings("unchecked")
            List<TableRow> childRows = (List<TableRow>) parentRow.get(key);
            @SuppressWarnings("unchecked")
            Map<String, Object> map = (Map<String, Object>) serializableSchema.get(key);
            childRows.add(toChildRow(nestedList.remove(0), map));
          }
        });
    if ((i > 0) && (i % BIGQUERY_NESTING_LIMIT == 0)) {
      result.add(parent);
      parent = null;
    }
  }
  if (parent != null) {
    result.add(parent);
  }
  return result;
}

The following diagram shows the sources, transformations, and sinks of the pipeline.

Optimized pipeline with sources, transformations, and sinks.

In most cases, the step names are supplied in code as part of the apply method call.

To create this optimized pipeline, complete the following steps:

  1. In Cloud Shell, ensure the environment is set up for the pipeline script:

    export PROJECT_ID=PROJECT_ID
    export REGION=us-central1
    export DESTINATION_TABLE=recordings_by_artists_dataflow_nested
    export DATASET=musicbrainz
    export DATAFLOW_TEMP_BUCKET=gs://temp-bucket-${PROJECT_ID}
    export SERVICE_ACCOUNT=musicbrainz-dataflow@${PROJECT_ID}.iam.gserviceaccount.com
    
  2. Run the pipeline to nest recording rows within artist rows:

    ./run.sh nested
    
  3. As before, to see the progress of the pipeline, go to the Dataflow page.

    Go to Dataflow

    The pipeline takes approximately 10 minutes to complete.

  4. When the job has completed, go to the BigQuery page.

    Go to BigQuery

  5. Query fields from the nested table in BigQuery:

    SELECT artist_name, artist_gender, artist_area, artist_recordings
    FROM musicbrainz.recordings_by_artists_dataflow_nested
    WHERE artist_area IS NOT NULL
          AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    In the output, the artist_recordings are shown as nested rows that can be expanded:

    Row | artist_name | artist_gender | artist_area | artist_recordings
    1 | mirin | Female | Japan | (5 rows)
    3 | Gaudiburschen | Male | Germany | (1 row)
    4 | Sa4 | Male | Hamburg | (10 rows)
    6 | Dpat | Male | Houston | (9 rows)

    The actual output may differ as the results are not ordered.

  6. Run a query to extract values from the STRUCT and use those values to filter the results, for example, for artists who have recordings containing the word "Justin":

    SELECT artist_name,
           artist_gender,
           artist_area,
           ARRAY(SELECT artist_credit_name_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS artist_credit_name_name,
           ARRAY(SELECT recording_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS recording_name
     FROM musicbrainz.recordings_by_artists_dataflow_nested,
          UNNEST(recordings_by_artists_dataflow_nested.artist_recordings) AS artist_recordings_struct
    WHERE artist_recordings_struct.recording_name LIKE "%Justin%"
    LIMIT 1000;
    

    In the output, the artist_credit_name_name and recording_name are shown as nested rows that can be expanded, for example:

    Row | artist_name | artist_gender | artist_area | artist_credit_name_name | recording_name
    1 | Damonkenutz | null | null | (1 row) | 1 Yellowpants (Justin Martin remix)
    3 | Fabian | Male | Germany | (10+ rows) | 1 Heatwave
      | | | | | 2 Starlight Love
      | | | | | 3 Dreams To Wishes
      | | | | | 4 Last Flight (Justin Faust remix)
      | | | | | ...
    4 | Digital Punk Boys | null | null | (6 rows) | 1 Come True
      | | | | | 2 We Are... (Punkgirlz remix by Justin Famous)
      | | | | | 3 Chaos (short cut)
      | | | | | ...

    The actual output may differ as the results are not ordered.

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

Delete the project

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Deleting individual resources

Follow these steps to delete individual resources, instead of deleting the whole project.

Deleting the Cloud Storage bucket

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  2. Click the checkbox for the bucket that you want to delete.
  3. To delete the bucket, click Delete, and then follow the instructions.

Deleting the BigQuery datasets

  1. Open the BigQuery web UI.

    Open BIGQUERY

  2. Select the BigQuery datasets you created during the tutorial.

  3. Click Delete.

What's next

  • Learn more about writing queries for BigQuery. Querying data explains how to run synchronous and asynchronous queries, create user-defined functions (UDFs), and more.
  • Explore BigQuery syntax. BigQuery uses a SQL-like syntax that is described in the Query reference (legacy SQL).
  • Explore reference architectures, diagrams, and best practices about Google Cloud. Take a look at our Cloud Architecture Center.