Realiza operaciones ETL desde una base de datos relacional en BigQuery con Dataflow

En este instructivo, se muestra cómo usar Dataflow para extraer, transformar y cargar (ETL) datos de una base de datos relacional de procesamiento de transacciones en línea (OLTP) en BigQuery para su análisis.

El instructivo está dirigido a administradores de bases de datos, profesionales de operaciones y arquitectos de la nube interesados en aprovechar las capacidades de consulta analítica de BigQuery y las capacidades de procesamiento por lotes de Dataflow.

Las bases de datos OLTP suelen ser bases de datos relacionales que almacenan información y procesan transacciones para sitios de comercio electrónico, aplicaciones de software como servicio (SaaS) o juegos. Por lo general, las bases de datos OLTP están optimizadas para transacciones, que requieren las propiedades ACID (atomicidad, coherencia, aislamiento y durabilidad), y suelen tener esquemas altamente normalizados. En contraposición, los almacenes de datos tienden a estar optimizados para la recuperación y el análisis de datos, en lugar de las transacciones, y normalmente tienen esquemas desnormalizados. Por lo general, el hecho de desnormalizar los datos de una base de datos OLTP hace que esta sea más útil para el análisis en BigQuery.

Objetivos

En el instructivo, se muestran dos enfoques para realizar el procedimiento de ETL de los datos de RDBMS normalizados a fin de obtener datos de BigQuery desnormalizados:

  • Usar BigQuery para cargar y transformar los datos: Usa este enfoque a fin de realizar una carga única de una pequeña cantidad de datos en BigQuery para su análisis. También puedes usar este enfoque para el prototipado de tu conjunto de datos antes de la automatización de conjuntos de datos más grandes o múltiples.
  • Usar Dataflow para cargar, transformar y limpiar los datos: Usa este enfoque para cargar una cantidad mayor de datos, cargar datos desde varias fuentes o cargarlos de forma progresiva o automática.

Costos

En este instructivo, se usan los siguientes componentes facturables de Google Cloud:

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud sean aptos para obtener una prueba gratuita.

Cuando finalices este instructivo, podrás borrar los recursos creados para evitar que se te siga facturando. Para obtener más información, consulta cómo hacer una limpieza.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  3. Comprueba que la facturación esté habilitada en tu proyecto.

    Descubre cómo puedes habilitar la facturación

  4. Habilita las API de Compute Engine and Dataflow.

    Habilita las API

  5. Instala e inicializa el SDK de Cloud.
  6. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  7. Comprueba que la facturación esté habilitada en tu proyecto.

    Descubre cómo puedes habilitar la facturación

  8. Habilita las API de Compute Engine and Dataflow.

    Habilita las API

  9. Instala e inicializa el SDK de Cloud.

Usa el conjunto de datos de MusicBrainz

Este instructivo se basa en instantáneas JSON de tablas de la base de datos de MusicBrainz, que se creó en PostgreSQL y contiene información sobre toda la música de MusicBrainz. El esquema de MusicBrainz incluye, entre otros, los siguientes elementos:

  • Artistas
  • Lanzamiento de grupos
  • Lanzamientos
  • Grabaciones
  • Trabajos
  • Sellos discográficos
  • Muchas relaciones entre estas entidades

El esquema de MusicBrainz incluye tres tablas relevantes: artist, recording y artist_credit_name. Un artist_credit representa el crédito otorgado al artista por una grabación, y las filas artist_credit_name vinculan la grabación con su artista correspondiente mediante el valor artist_credit.

En este instructivo, se proporcionan las tablas de PostgreSQL ya extraídas en formato JSON. Puedes usar el siguiente código de muestra para seguir este paso:

pg_cmd="\\copy (select row_to_json(r) from (select * from artist) r ) to
exported_artist.json"
psql -w -h $host -U $user -d $db -c $pg_cmd
sed -i -e 's/\\\\/\\/g' exported_artist.json # clean up extra '\' characters

Enfoque 1: ETL con BigQuery

Usa este enfoque a fin de realizar una carga única de una pequeña cantidad de datos en BigQuery para su análisis. También puedes usar este enfoque para el prototipado de tu conjunto de datos antes de la automatización de conjuntos de datos más grandes o múltiples.

Cree un conjunto de datos de BigQuery

Para crear un conjunto de datos de BigQuery, carga las tablas de MusicBrainz a BigQuery de forma individual. Luego, une las tablas que cargaste para que cada fila contenga la vinculación de datos que desees. Almacena los resultados de la unión en una tabla de BigQuery nueva. A continuación, puedes borrar las tablas originales que cargaste.

  1. En Cloud Console, abre BigQuery.

    ABRIR BIGQUERY

  2. En Recursos, haz clic en el nombre de tu proyecto.

  3. En el panel de navegación izquierdo, haz clic en + Agregar datos.

  4. En el cuadro de diálogo Crear conjunto de datos, sigue estos pasos:

    1. En el campo ID de conjunto de datos, ingresa musicbrainz.
    2. Deja la Ubicación de datos como Predeterminada.
  5. Haz clic en Crear conjunto de datos.

Importa tablas de MusicBrainz

En cada tabla de MusicBrainz, sigue estos pasos para agregar una tabla al conjunto de datos que creaste:

  1. En Cloud Console, haz clic en el nombre del conjunto de datos y, luego, en +Crear tabla.
  2. En el cuadro de diálogo Create table (Crear tabla), sigue estos pasos y, luego, haz clic en Crear tabla:

    1. En Source (Fuente), en la lista desplegable Create table from (Crear tabla desde), selecciona Google Cloud Storage.
    2. En el campo Select file from GCS bucket (Selecciona un archivo del depósito de GCS), ingresa la URL del archivo de datos, gs://solutions-public-assets/bqetl/artist.json.
    3. En File format (Formato de archivo), selecciona JSON (Delimitado por saltos de línea).
    4. En Table name (Nombre de la tabla), ingresa el nombre de la tabla, artist.
    5. Para el Table type (Tipo de tabla), deja seleccionada Native table (Tabla nativa).
    6. Debajo de la sección Schema (Esquema), haz clic para activar Edit as Text (Editar como texto).
    7. Descarga el archivo de esquema artist.
    8. Reemplaza el contenido de la sección Schema (Esquema) por el contenido del archivo de esquema que descargaste.

    Diálogo de creación de tabla con esquema actualizado de un archivo JSON descargado

  3. Espera unos momentos a que finalice el trabajo de carga. Para supervisar el trabajo, haz clic en Historial de trabajos (Job History).

    Cuando termine la carga, la tabla nueva aparece en el conjunto de datos.

  4. Repite los pasos 1 a 3 para la tabla artist_credit_name con los siguientes cambios:

  5. Repite los pasos de 1 a 3 para la tabla recording con los siguientes cambios:

Desnormaliza los datos manualmente

A fin de desnormalizar los datos, únelos a una tabla nueva de BigQuery que tenga una fila por cada grabación del artista, junto con los metadatos seleccionados que desees conservar para el análisis.

  1. En Cloud Console, copia la siguiente consulta y pégala en el Editor de consultas:

    SELECT artist.id, artist.gid as artist_gid,
           artist.name as artist_name, artist.area,
           recording.name as recording_name, recording.length,
           recording.gid as recording_gid, recording.video
      FROM `[PROJECT_ID].[DATASET].artist` as artist
          INNER JOIN `[PROJECT_ID].[DATASET].artist_credit_name` AS artist_credit_name
               ON artist.id = artist_credit_name.artist
          INNER JOIN `[PROJECT_ID].[DATASET].recording` AS recording
               ON artist_credit_name.artist_credit = recording.artist_credit
    

    Reemplaza [DATASET] por el nombre del conjunto de datos que creaste antes, por ejemplo, musicbrainz y [PROJECT_ID] por tu ID del proyecto de Google Cloud.

  2. Haz clic en la lista desplegable Más y, luego, selecciona Query settings (Configuración de consulta).

  3. En la tarjeta Configuración de consulta (Query settings), haz lo siguiente:

    1. Selecciona la casilla de verificación Set a destination table for query results (Establecer una tabla de destino para los resultados de la consulta).
    2. En Table name (Nombre de la tabla), ingresa recordings_by_artists_manual..
    3. En Destination table write preference (Preferencia de escritura para la tabla de destino), haz clic en Overwrite table (Reemplazar tabla).
    4. Selecciona la casilla de verificación Permitir resultados grandes (sin límite de tamaño) (Allow Large Results [no size limit]).
    5. En Prioridad del trabajo (Job Priority), deja el valor predeterminado, Interactiva (Interactive).
    6. En Dialecto de SQL (SQL Dialect), deja el valor predeterminado, Estándar (Standard).
    7. Haz clic en Guardar (Save).
  4. Haz clic en Ejecutar (Run).

    Cuando se completa la consulta, los datos del resultado se organizan en canciones por cada artista de la tabla de BigQuery recién creada.

    Configuración de consulta para la tabla de destino

Enfoque 2: ETL en BigQuery con Dataflow

En esta sección del instructivo, en lugar de usar la IU de BigQuery, usa un programa de muestra para cargar datos en BigQuery mediante una canalización de Dataflow. Luego, usa el modelo de programación de Dataflow para desnormalizar y limpiar los datos que se van a cargar en BigQuery.

Antes de comenzar, repasemos algunos conceptos y examinemos el código de muestra.

Revisa los conceptos

Aunque los datos son pequeños y se pueden subir con rapidez mediante la IU de BigQuery, a los fines de este instructivo, también puedes usar Dataflow para ETL. Usa Dataflow para ETL en BigQuery en lugar de la IU de BigQuery cuando realices uniones masivas, es decir, entre 500 y 5,000 columnas de más de 10 TB de datos, con los siguientes objetivos:

  • Deseas limpiar o transformar tus datos mientras estos se cargan en BigQuery, en lugar de almacenarlos y unirlos después. Debido a esto, este enfoque también tiene menores requisitos de almacenamiento, ya que los datos solo se almacenan en BigQuery en su estado unido y transformado.
  • Planeas realizar una limpieza personalizada de los datos (que no puede lograrse simplemente con SQL).
  • Planeas combinar los datos con otros ajenos al OLTP (como registros o datos a los que se accede de forma remota) durante el proceso de carga.
  • Planeas automatizar la prueba y la implementación de la lógica de carga de datos mediante la integración o implementación continuas (IC/IC).
  • Tienes previsto que el proceso de ETL se itere, aumente y mejore gradualmente con el tiempo.
  • Planeas agregar los datos de forma incremental, en lugar de realizar un ETL único.

Este es un diagrama de la canalización de datos que crea el programa de muestra:

Canalización de datos con BigQuery.

En el código de ejemplo, muchos de los pasos de canalización están agrupados o unidos en métodos útiles, reciben nombres descriptivos y se reutilizan. En el diagrama, los pasos que se reutilizan se indican con bordes discontinuos.

Revisa el código de canalización

El código crea una canalización que realiza los siguientes pasos:

  1. Carga cada tabla que desees que forme parte de la unión en una PCollection de strings. Cada elemento comprende la representación JSON de una fila de la tabla.

    public static PCollection<String> loadText(Pipeline p, String name) {
      BQETLOptions options = (BQETLOptions) p.getOptions();
      String loadingBucket = options.getLoadingBucketURL();
      String objectToLoad = storedObjectName(loadingBucket, name);
      return p.apply(name, TextIO.read().from(objectToLoad));
    }
  2. Convierte esas strings JSON en representaciones de objetos, objetos MusicBrainzDataObject; luego, organiza las representaciones de los objetos por uno de los valores de la columna, como una clave primaria o externa.

    public static PCollection<KV<Long, MusicBrainzDataObject>> loadTableFromText(PCollection<String> text, String name, String keyName) {
      final String namespacedKeyname = name + "_" + keyName;
      return text.apply("load " + name,
                        MapElements
                          .into(new TypeDescriptor<KV<Long, MusicBrainzDataObject>>() {})
                          .via( (String input) -> {
                                MusicBrainzDataObject datum = JSONReader.readObject(name, input);
                                Long key = (Long) datum.getColumnValue(namespacedKeyname);
                                return KV.of(key, datum);
                                })
             );
    }
  3. Une la lista según un artista en común. El artist_credit_name vincula el crédito de un artista con su grabación y también incluye la clave externa del artista. La tabla artist_credit_name se carga como una lista de objetos de clave-valor KV. El miembro K es el artista.

    PCollection<MusicBrainzDataObject> artistCredits =
        MusicBrainzTransforms.innerJoin("artists with artist credits", artists, artistCreditName);
  4. Une la lista mediante el método MusicBrainzTransforms.innerJoin().

    public static PCollection<MusicBrainzDataObject>
                         innerJoin(String name,
                                   PCollection<KV<Long, MusicBrainzDataObject>> table1,
                                   PCollection<KV<Long, MusicBrainzDataObject>> table2) {
      final TupleTag<MusicBrainzDataObject> t1 = new TupleTag<MusicBrainzDataObject>(){};
      final TupleTag<MusicBrainzDataObject> t2 = new TupleTag<MusicBrainzDataObject>(){};
      PCollection<KV<Long, CoGbkResult>> joinedResult = group(name, table1, table2, t1, t2);
    1. Agrupa las colecciones de objetos KV por el miembro clave que deseas usar para la unión. Esto da como resultado una PCollection de objetos KV con una clave larga (el valor de la columna artist.id) y el CoGbkResult resultante (que significa “grupo de combinación por resultado clave”). El objeto CoGbkResult es una tupla de listas de objetos con la clave-valor en común de la primera y la segunda PCollections. Esta tupla es accesible mediante el uso de la etiqueta de tuplas formulada para cada PCollection antes de ejecutar la operación CoGroupByKey en el método group.
    2. Fusiona cada combinación de objetos para formar un objeto MusicBrainzDataObject que representa un resultado de unión.

          PCollection<List<MusicBrainzDataObject>> mergedResult =
              joinedResult.apply("merge join results",
                           MapElements
                         .into(new TypeDescriptor<List<MusicBrainzDataObject>>() {})
                         .via( ( KV<Long, CoGbkResult> group ) -> {
                             List<MusicBrainzDataObject> result = new ArrayList<MusicBrainzDataObject>();
                             Iterable<MusicBrainzDataObject> leftObjects = group.getValue().getAll(t1);
                             Iterable<MusicBrainzDataObject> rightObjects = group.getValue().getAll(t2);
                             leftObjects.forEach((MusicBrainzDataObject l) -> {
                               rightObjects.forEach((MusicBrainzDataObject r) -> {
                                 result.add(l.duplicate().merge(r));
                               });
                             });
                             return result;
                           }
                         )
      );
    3. Reorganiza la colección en una lista de objetos KV para comenzar la siguiente unión. Esta vez, el valor K es la columna artist_credit, que se usa para unirse con la tabla de grabación.

      PCollection<KV<Long,MusicBrainzDataObject>> artistCreditNamesByArtistCredit =  MusicBrainzTransforms.by("artist_credit_name_artist_credit", artistCredits);
    4. Obtiene la colección final resultante de objetos MusicBrainzDataObject mediante la unión ese resultado con la colección cargada de grabaciones organizadas por artist_credit.id.

      PCollection<MusicBrainzDataObject> artistRecordings = MusicBrainzTransforms.innerJoin("joined recordings",
         artistCreditNamesByArtistCredit, recordingsByArtistCredit);
    5. Asigna los objetos MusicBrainzDataObjects resultantes en TableRows.

      PCollection<TableRow> tableRows = MusicBrainzTransforms.transformToTableRows(artistRecordings, bqTableSchema);
    6. Escribe el TableRows resultante en BigQuery.

      tableRows.apply(
           "Write to BigQuery",
           BigQueryIO.writeTableRows()
          .to(BQETLOptions.getBigQueryTablename())
          .withSchema(bqTableSchema)
          .withCustomGcsTempLocation(StaticValueProvider.of(BQETLOptions.getTempLocation() ))
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

Para obtener más información sobre el funcionamiento de la programación de canalizaciones de Dataflow, consulta los siguientes temas sobre el modelo de programación:

Después de revisar los pasos que realiza el código, puedes ejecutar la canalización.

Ejecuta el código de canalización

  1. En Cloud Console, abre Cloud Shell.

    Abra Cloud Shell

  2. Configura las variables de entorno de tu proyecto:

    export PROJECT_ID=[PROJECT_ID]
    export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
    

    Reemplaza [PROJECT_ID] por el ID del proyecto de Google Cloud y reemplaza [CHOOSE_AN_APPROPRIATE_ZONE] por una zona de Google Cloud.

  3. Configura las variables de entorno que usa la secuencia de comandos de la canalización:

    export DESTINATION_TABLE=recordings_by_artists_dataflow
    export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
    export DATASET=musicbrainz
    export SERVICE_ACCOUNT=project-owner
    
  4. Asegúrate de que gcloud use el proyecto que creaste o seleccionaste al comienzo del instructivo:

    gcloud config set project $PROJECT_ID
    
  5. Crea una cuenta de servicio para ejecutar la canalización:

    gcloud iam service-accounts create ${SERVICE_ACCOUNT} \
        --display-name "Project Owner Account"
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member serviceAccount:${SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com \
        --role roles/owner
    gcloud iam service-accounts keys create \
        ~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json \
        --iam-account ${SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com
    

    Este comando descarga un archivo JSON que contiene la clave de tu cuenta de servicio. Almacena este archivo en una ubicación segura.

  6. Configura la variable de entorno GOOGLE_APPLICATION_CREDENTIALS en la ruta del archivo JSON que contiene la clave de tu cuenta de servicio.

    export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
    
  7. Clona el repositorio que contiene el código de Dataflow:

    git clone https://github.com/GoogleCloudPlatform/bigquery-etl-dataflow-sample.git
    
  8. Cambia el directorio a la muestra:

    cd bigquery-etl-dataflow-sample
    
  9. Crea un bucket de etapa de pruebas en Cloud Storage, ya que los trabajos de Dataflow requieren un bucket en Cloud Storage para la etapa de pruebas de los archivos binarios que se usan a fin de ejecutar la canalización.

    gsutil mb gs://$STAGING_BUCKET
    
  10. Configura el ciclo de vida del objeto para [STAGING_BUCKET_NAME] según el del archivo dataflow-staging-policy.json:

    gsutil lifecycle set dataflow-staging-policy.json gs://$STAGING_BUCKET
    
  11. Ejecuta el trabajo de Dataflow:

    ./run.sh simple
    
  12. Para ver el progreso de la canalización, ve a la página de Dataflow en Cloud Console.

    Ir a la página de Dataflow

    El estado de los trabajos se muestra en la columna de estado. Si el estado es Correcto, significa que el trabajo se completó.

  13. Para ver el gráfico del trabajo y los detalles de los pasos, haz clic en el nombre del trabajo, por ejemplo, etl-into-bigquery-bqetlsimple (opcional).

  14. En Cloud Console, ve a la página BigQuery.

    Ir a la página BigQuery

    Asegúrate de que tu proyecto de Google Cloud esté seleccionado.

  15. Para ejecutar una consulta en la tabla nueva, ingresa lo siguiente en el Query Editor (Editor de consultas):

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
    FROM musicbrainz.recordings_by_artists_dataflow
    WHERE artist_area is NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    Query Editor (Editor de consultas) actualizado con una consulta para una tabla nueva.

Limpia los datos

A continuación, realiza un pequeño cambio en la canalización de Dataflow para poder cargar tablas de consulta y procesarlas como entradas complementarias, como se muestra en el siguiente diagrama.

Canalización de Dataflow actualizada para entradas complementarias.

Cuando consultas la tabla de BigQuery resultante, es difícil suponer desde dónde se origina el artista sin buscar de forma manual el ID numérico de área de la tabla area en la base de datos de MusicBrainz. Por lo tanto, analizar los resultados de una consulta resulta menos sencillo de lo que podría ser.

De forma similar, los géneros de los artistas se muestran como ID, pero toda la tabla de géneros de MusicBrainz consiste de solo tres filas. Para solucionar este problema, puedes agregar un paso en la canalización de Dataflow a fin de usar las tablas area y gender de MusicBrainz para asignar los ID a las etiquetas correspondientes.

Las tablas artist_area y artist_gender contienen una cantidad de filas significativamente menor que la tabla de datos de grabación o de los artistas. La cantidad de elementos en las tablas posteriores está limitada por la cantidad de áreas geográficas o géneros, respectivamente.

Como resultado, el paso de búsqueda usa la función de Dataflow llamada entrada complementaria.

Las entradas complementarias se cargan como exportaciones de tablas en formato JSON delimitado por saltos de línea, y se usan para desnormalizar los datos de la tabla en un solo paso.

Revisa el código que agrega las entradas complementarias a la canalización

Antes de ejecutar la canalización, revisa el código para comprender mejor los pasos nuevos.

En el archivo BQETLSimple.java, revisa las líneas con comentarios. Se les quitará el comentario en el siguiente paso.

//PCollection<KV<Long,MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p,"artist","id",
//        MusicBrainzTransforms.lookup("area", "id", "name", "area", "begin_area"),
//        MusicBrainzTransforms.lookup("gender","id","name","gender"));

PCollection<KV<Long, MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p, "artist", "id");

En este código, se muestra la limpieza de datos con entradas complementarias. La clase MusicBrainzTransforms proporciona ventajas adicionales para usar entradas complementarias a fin de asignar clave-valor externa a la etiqueta. La biblioteca MusicBrainzTransforms proporciona un método que crea una clase de búsqueda interna. La clase de búsqueda describe cada tabla de consulta y los campos que se reemplazan por etiquetas y argumentos de longitud variable. keyKey es el nombre de la columna que contiene la clave para la búsqueda y valueKey es el nombre de la columna que contiene la etiqueta correspondiente.

public static LookupDescription lookup(String objectName, String keyKey, String valueKey, String... destinationKeys) {
  return new LookupDescription(objectName, keyKey, valueKey, destinationKeys);
}

Cada entrada complementaria se carga como un objeto de asignación único, que se usa con el objetivo de buscar la etiqueta que corresponde para un ID.

Primero, el archivo JSON para la tabla de consulta se carga al principio en MusicBrainzDataObjects con un espacio de nombres vacío y se convierte en una asignación del valor de la columna Key al valor de la columna Value.

public static PCollectionView<Map<Long, String>> loadMapFromText(PCollection<String> text, String name, String keyKey, String valueKey) {
  // column/Key names are namespaced in MusicBrainzDataObject
  String keyKeyName = name + "_" + keyKey;
  String valueKeyName = name + "_" + valueKey;

  PCollection<KV<Long, String>> entries = text.apply(
        "sideInput_" + name,
        MapElements
          .into(new TypeDescriptor<KV<Long, String>>() {})
          .via((String input) -> {
                 MusicBrainzDataObject object = JSONReader.readObject(name, input);
                 Long key = (Long) object.getColumnValue(keyKeyName);

                 String value = (String) object.getColumnValue(valueKeyName);
                 return KV.of(key, value);
               })
        );

  return entries.apply(View.<Long, String>asMap());
}

El valor de destinationKey de los objetos Map coloca un Map en cada uno. Este valor es la clave que se debe reemplazar por los valores buscados.

List<SimpleEntry<ArrayList<String>, PCollectionView<Map<Long, String>>>> mapSideInputs = new ArrayList<SimpleEntry<ArrayList<String>, PCollectionView<Map<Long, String>>>>();

for (LookupDescription mapper : mappers) {
  PCollectionView<Map<Long, String>> mapView = loadMap(text.getPipeline(), mapper.objectName, mapper.keyKey, mapper.valueKey);
  List<String> destKeyList =
      mapper.destinationKeys.stream()
                            .map( destinationKey -> name + "_" + destinationKey )
                            .collect(Collectors.toList());

    mapSideInputs.add(new SimpleEntry(destKeyList, mapView));

}

Luego, mientras se transforman los objetos de artista desde el archivo JSON, el valor de destinationKey (que comienza como un número) se reemplaza por la etiqueta correspondiente.

Map<Long, String> sideInputMap = c.sideInput(mapping.getValue());

List<String> keyList = mapping.getKey();

keyList.forEach( ( String key ) -> {
  Long id = (Long) result.getColumnValue(key);
  if (id != null) {
    String label = (String) sideInputMap.get(id);
    if (label == null) {
      label = "" + id;
    }
    result.replace(key, label);

A fin de modificar BQETLSimple.java para usar las búsquedas y decodificar los datos de los campos artist_area y artist_gender, realiza los siguientes pasos:

  1. Modifica ligeramente el flujo del programa:

    1. Quita los comentarios de las líneas que cargan los datos del artista mediante búsquedas.
    2. Marca como comentario la llamada a loadTable que carga los datos del artista sin las búsquedas.
    //PCollection<KV<Long,MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p,"artist","id",
    //        MusicBrainzTransforms.lookup("area", "id", "name", "area", "begin_area"),
    //        MusicBrainzTransforms.lookup("gender","id","name","gender"));
    
    PCollection<KV<Long, MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p, "artist", "id");
  2. Cambia los TableFieldSchemas de artist_area y artist_gender con el objetivo de que sean del tipo de datos string en lugar de int mediante el agregado de un comentario en los campos int correspondientes y la eliminación de los comentarios de los campos string correspondientes.

    /*Switch these two lines when using mapping table for artist_area */
    //        .stringField("artist_area")
            .intField("artist_area")
    /*Switch these two lines when using mapping table for artist_gender */
    //        .stringField("artist_gender")
            .intField("artist_gender")
    /*Switch these two lines when using mapping table for artist_begin_area */
            .intField("artist_begin_area")
    //      .stringField("artist_begin_area")
  3. Para volver a ejecutar el código de la canalización, completa los siguientes pasos:

    1. Configura las variables de entorno de tu proyecto:

      export PROJECT_ID=[PROJECT_ID]
      export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
      
    2. Asegúrate de que el entorno esté configurado:

      export DESTINATION_TABLE=recordings_by_artists_dataflow_sideinputs
      export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
      export DATASET=musicbrainz
      export SERVICE_ACCOUNT=project-owner
      
    3. Configura la variable de entorno GOOGLE_APPLICATION_CREDENTIALS con la ruta de acceso al archivo JSON que contiene la clave de tu cuenta de servicio.

      export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
      
    4. Ejecuta la canalización para anidar las filas de grabaciones en las filas del artista:

      ./run.sh simple
      
  4. Realiza la misma consulta que incluye artist_area y artist_gender:

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
    FROM musicbrainz.recordings_by_artists_dataflow_sideinputs
    WHERE artist_area IS NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    En el resultado, artist_area y artist_gender ahora están decodificados:

    Resultado decodificado por “artist_area” y “artist_gender”.

Optimiza el esquema de BigQuery

En la parte final de este instructivo, ejecutas una canalización que genera un esquema de tabla óptimo con campos anidados.

Tómate unos minutos para revisar el código con el que se genera esta versión optimizada de la tabla.

En el siguiente diagrama, se muestra una canalización de Dataflow un poco diferente que anida las grabaciones del artista dentro de cada fila de artistas, en lugar de crear filas de artistas duplicadas.

Canalización de Dataflow que anida las grabaciones del artista dentro de cada fila de artistas.

La representación actual de los datos es bastante plana. Es decir, tiene una fila por cada grabación acreditada con todos los metadatos del artista del esquema de BigQuery y todos los metadatos de la grabación y artist_credit_name. Esta representación plana tiene al menos dos desventajas:

  • Repite los metadatos de artist por cada grabación que se le acredita a un artista, por lo que se requiere más almacenamiento.
  • Cuando exportas los datos en formato JSON, se exporta un arreglo que repite esos datos, en lugar de un artista con los datos de las grabaciones anidados, que posiblemente es lo que deseas.

Una alternativa que no tiene ninguna desventaja en términos de rendimiento y, además, te permite ahorrar almacenamiento es que, en lugar de almacenar una grabación por fila, almacenes las grabaciones como un campo repetido dentro del registro de cada artista. Para ello, debes hacer algunas modificaciones en la canalización de Dataflow.

En lugar de unir las grabaciones con la información del artista mediante artist_credit_name.artist, esta canalización alternativa crea una lista de grabaciones anidada dentro de un objeto de artista.

public static PCollection<MusicBrainzDataObject> nest(PCollection<KV<Long, MusicBrainzDataObject>> parent,
                                                      PCollection<KV<Long, MusicBrainzDataObject>> child,
                                                      String nestingKey) {
  final TupleTag<MusicBrainzDataObject> parentTag = new TupleTag<MusicBrainzDataObject>(){};
  final TupleTag<MusicBrainzDataObject> childTag = new TupleTag<MusicBrainzDataObject>(){};

  PCollection<KV<Long, CoGbkResult>> joinedResult = group("nest " + nestingKey, parent, child, parentTag, childTag);
  return joinedResult.apply("merge join results " + nestingKey,
                            MapElements
                             .into(new TypeDescriptor<MusicBrainzDataObject>() {})
                             .via((KV<Long, CoGbkResult> group) -> {
                                MusicBrainzDataObject parentObject = group.getValue().getOnly(parentTag);
                                Iterable<MusicBrainzDataObject> children = group.getValue().getAll(childTag);
                                List<MusicBrainzDataObject> childList = new ArrayList<MusicBrainzDataObject>();
                                children.forEach(childList::add);
                                parentObject = parentObject.duplicate();
                                parentObject.addColumnValue("recordings", childList);
                                return parentObject;
                              })
                         );
}

TableRow tiene límites de tamaño en la API de BigQuery, por lo que el código limita el número de grabaciones anidadas para un registro determinado a 1,000 elementos. Si un artista determinado tiene más de 1,000 grabaciones, el código duplica la fila, junto con los metadatos de artist y continúa anidando los datos de las grabaciones en la fila duplicada.

private static List<TableRow> toTableRows(MusicBrainzDataObject mbdo, Map<String, Object> serializableSchema) {
  TableRow row = new TableRow();
  List<TableRow> result = new ArrayList<TableRow>();
  Map<String, List<MusicBrainzDataObject>> nestedLists = new HashMap<String, List<MusicBrainzDataObject>>();
  Set<String> keySet = serializableSchema.keySet();
  /*
   *  construct a row object without the nested objects
   */
  int maxListSize = 0;
  for (String key : keySet) {
    Object value = serializableSchema.get(key);
    Object fieldValue = mbdo.getColumnValue(key);
    if (fieldValue != null) {
      if (value instanceof Map) {
        List<MusicBrainzDataObject> list = (List<MusicBrainzDataObject>) fieldValue;
        if (list.size() > maxListSize) {
          maxListSize = list.size();
        }
        nestedLists.put(key, list);
      } else {
        row.set(key, fieldValue);
      }

    }
  }
  /*
   * add the nested objects but break up the nested objects across duplicate rows if nesting limit exceeded
   */
  TableRow parent = row.clone();
  Set<String> listFields = nestedLists.keySet();
  for (int i = 0; i < maxListSize; i++) {
    parent = (parent == null ? row.clone() : parent);
    final TableRow parentRow = parent;
    nestedLists.forEach((String key, List<MusicBrainzDataObject> nestedList) -> {
      if (nestedList.size() > 0) {
        if (parentRow.get(key) == null) {
          parentRow.set(key, new ArrayList<TableRow>());
        }
        List<TableRow> childRows = (List<TableRow>) parentRow.get(key);
        childRows.add(toChildRow(nestedList.remove(0), (Map<String, Object>) serializableSchema.get(key)));
      }
    });
    if ((i > 0) && (i % BIGQUERY_NESTING_LIMIT == 0)) {
      result.add(parent);
      parent = null;
    }
  }
  if (parent != null) {
    result.add(parent);
  }
  return result;
}

El diagrama muestra las fuentes, las transformaciones y los receptores de la canalización.

Canalización optimizada con fuentes, transformaciones y receptores.

En la mayoría de los casos, los nombres de los pasos se proporcionan en el código como parte de la llamada al método apply.

Para crear esta canalización optimizada, sigue estos pasos:

  1. En Cloud Shell, asegúrate de que el entorno esté configurado para la secuencia de comandos de la canalización:

    export PROJECT_ID=[PROJECT_ID]
    export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
    export DESTINATION_TABLE=recordings_by_artists_dataflow_nested
    export DATASET=musicbrainz
    export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
    export SERVICE_ACCOUNT=project-owner
    
  2. Configura la variable de entorno GOOGLE_APPLICATION_CREDENTIALS con la ruta del archivo JSON que contiene la clave de tu cuenta de servicio:

    export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
    
  3. Ejecuta la canalización para anidar las filas de grabaciones en las filas del artista:

    ./run.sh nested
    
  4. Consulta campos de la tabla anidada en BigQuery:

    SELECT artist_name, artist_gender, artist_area, artist_recordings
    FROM musicbrainz.recordings_by_artists_dataflow_nested
    WHERE artist_area IS NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    Resultados de consulta de la tabla anidada.

  5. Ejecuta una consulta a fin de extraer valores de STRUCT y úsalos para filtrar los resultados:

    SELECT artist_name,
           artist_gender,
           artist_area,
           ARRAY(SELECT artist_credit_name_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS artist_credit_name_name,
           ARRAY(SELECT recording_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS recording_name
     FROM musicbrainz.recordings_by_artists_dataflow_nested,
          UNNEST(recordings_by_artists_dataflow_nested.artist_recordings) AS artist_recordings_struct
    WHERE artist_recordings_struct.recording_name LIKE "%Justin%"
    LIMIT 1000;
    

    Consulta para filtrar los resultados

Limpia

Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.

Borra el proyecto

  1. En Cloud Console, ve a la página Administrar recursos.

    Ir a Administrar recursos

  2. En la lista de proyectos, elige el proyecto que quieres borrar y haz clic en Borrar.
  3. En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.

Borra recursos individuales

Sigue estos pasos para borrar recursos individuales en lugar de borrar todo el proyecto.

Borra el bucket de Cloud Storage

  1. En Cloud Console, ve a la página Navegador de Cloud Storage.

    Ir al navegador

  2. Haz clic en la casilla de verificación del bucket que deseas borrar.
  3. Para borrar el bucket, haz clic en Borrar y, luego, sigue las instrucciones.

Borra los conjuntos de datos de BigQuery

  1. Abre la IU web de BigQuery.

    Abrir BIGQUERY

  2. Selecciona los conjuntos de datos de BigQuery que creaste durante el instructivo.

  3. Haz clic en Borrar.

¿Qué sigue?