Exécuter des opérations ETL à partir d'une base de données relationnelle dans BigQuery à l'aide de Dataflow

Ce tutoriel explique comment utiliser Dataflow pour extraire, transformer et charger des données (ETL, "extract, transform and load") à partir d'une base de données relationnelle de traitement des transactions en ligne (OLTP, "online transaction processing") vers BigQuery à des fins d'analyse.

Ce tutoriel est destiné aux administrateurs de bases de données, aux responsables d'exploitation et aux architectes cloud qui souhaitent bénéficier des capacités de requêtes analytiques de BigQuery et des fonctionnalités de traitement par lots de Dataflow.

Les bases de données OLTP sont souvent des bases de données relationnelles qui stockent des informations et traitent des transactions pour des sites d'e-commerce, des applications SaaS (Software as a Service) ou encore des jeux. Elles sont généralement optimisées pour les transactions présentant des schémas hautement normalisés et exigeant des propriétés ACID : atomicité, cohérence, isolation et durabilité. En revanche, les entrepôts de données sont généralement optimisés pour l'extraction et l'analyse des données plutôt que pour les transactions, et possèdent donc des schémas dénormalisés. Généralement, la dénormalisation des données à partir d'une base de données OLTP facilite leur analyse dans BigQuery.

Objectifs

Le tutoriel présente deux approches pour l'ETL de données de SGBDR normalisés vers des données BigQuery dénormalisées :

  • Utiliser BigQuery pour charger et transformer les données. Utilisez cette approche pour le chargement ponctuel de petits volumes de données dans BigQuery à des fins d'analyse. Vous pouvez également utiliser cette approche pour prototyper votre ensemble de données avant d'utiliser l'automatisation avec des ensembles de données plus volumineux ou plusieurs ensembles de données.
  • Utiliser Dataflow pour charger, transformer et nettoyer les données : Utilisez cette approche pour charger des données plus volumineuses ou provenant de plusieurs sources, ou pour charger des données de manière incrémentielle ou automatique.

Coûts

Ce tutoriel utilise les composants facturables suivants de Google Cloud :

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Une fois que vous avez terminé ce tutoriel, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Consultez la page Effectuer un nettoyage pour en savoir plus.

Avant de commencer

  1. Connectez-vous à votre compte Google.

    Si vous n'en possédez pas déjà un, vous devez en créer un.

  2. Dans Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Cloud.

    Accéder à la page de sélection du projet

  3. Vérifiez que la facturation est activée pour votre projet Google Cloud. Découvrez comment vérifier que la facturation est activée pour votre projet.

  4. Activer les API Compute Engine et Dataflow.

    Activer les API

  5. Installez et initialisez le SDK Cloud.

Utiliser l'ensemble de données MusicBrainz

Ce tutoriel s'appuie sur des instantanés JSON de tables figurant dans la base de données MusicBrainz, construite en PostgreSQL et contenant des informations sur l'ensemble de la musique répertoriée dans MusicBrainz. Parmi les éléments du schéma MusicBrainz, on trouve les suivants :

  • Artistes
  • Groupes de parution
  • Versions
  • Enregistrements
  • Travaux
  • Labels
  • Différentes relations entre ces entités

Le schéma MusicBrainz comprend trois tables pertinentes : artist, recording et artist_credit_name. Un crédit d'artiste artist_credit représente le crédit accordé à l'artiste pour un enregistrement, et les lignes artist_credit_name associent l'enregistrement à l'artiste correspondant via la valeur artist_credit.

Ce tutoriel fournit les tables PostgreSQL déjà extraites au format JSON. Pour effectuer cette étape vous-même, vous pouvez utiliser l'exemple de code suivant :

pg_cmd="\\copy (select row_to_json(r) from (select * from artist) r ) to
exported_artist.json"
psql -w -h $host -U $user -d $db -c $pg_cmd
sed -i -e 's/\\\\/\\/g' exported_artist.json # clean up extra '\' characters

Approche 1 : ETL avec BigQuery

Utilisez cette approche pour charger une petite quantité de données dans BigQuery à des fins d'analyse. Vous pouvez également utiliser cette approche pour prototyper votre ensemble de données avant d'utiliser l'automatisation avec des ensembles de données plus volumineux ou plusieurs ensembles de données.

Créer un ensemble de données BigQuery

Le diagramme suivant illustre les étapes à suivre pour créer un ensemble de données BigQuery.

Étapes de création d'un ensemble de données BigQuery.

Chargez les tables MusicBrainz une par une dans BigQuery, puis joignez les tables chargées de sorte que chaque ligne contienne l'association de données souhaitée. Stockez les résultats de la jointure dans une nouvelle table BigQuery. Vous pouvez ensuite supprimer les tables d'origine que vous avez chargées.

  1. Dans Cloud Console, ouvrez BigQuery.

    OUVRIR BIGQUERY

  2. Sous Ressources, cliquez sur le nom de votre projet.

  3. Dans le menu de gauche, cliquez sur + Ajouter des données.

  4. Dans la boîte de dialogue Créer un ensemble de données, effectuez les opérations suivantes :

    1. Dans le champ ID de l'ensemble de données, saisissez musicbrainz.
    2. Dans le champ Emplacement des données, conservez la Valeur par défaut.
  5. Cliquez sur Créer un ensemble de données.

Importer des tables MusicBrainz

Pour chaque table MusicBrainz, suivez les étapes ci-dessous pour ajouter une table à l'ensemble de données que vous avez créé :

  1. Dans Cloud Console, cliquez sur le nom de l'ensemble de données, puis cliquez sur + Créer une table.
  2. Dans la boîte de dialogue Create table (créer une table), procédez comme indiqué ci-dessous, puis cliquez sur Create table :

    1. Sous Source, dans la liste déroulante Create table from (créer une table à partir de), sélectionnez Google Cloud Storage.
    2. Dans le champ Select file from GCS bucket (Sélectionner un fichier depuis le bucket Cloud Storage), saisissez l'URL du fichier de données, gs://solutions-public-assets/bqetl/artist.json.
    3. Pour Format de fichier, sélectionnez JSON (délimité par retour à la ligne).
    4. Pour Table name (Nom de la table), saisissez le nom de la table, artist.
    5. Pour Table type (Type de table), laissez l'option Native table (Table native) sélectionnée.
    6. Sous la section Schema (Schéma), cliquez sur Edit as Text (Modifier sous forme de texte).
    7. Téléchargez le fichier de schéma artist.
    8. Remplacez le contenu de la section Schema (Schéma) par le contenu du fichier de schéma que vous venez de télécharger.

    Boîte de dialogue de création de table avec le schéma mis à jour à partir du fichier JSON téléchargé.

  3. Attendez quelques instants que le chargement soit terminé. Pour surveiller la tâche, cliquez sur Historique des tâches.

    Une fois le chargement terminé, la nouvelle table apparaît sous l'ensemble de données.

  4. Répétez les étapes 1 à 3 pour la table artist_credit_name, avec les modifications suivantes :

  5. Répétez les étapes 1 à 3 pour la table recording, avec les modifications suivantes :

Dénormaliser manuellement les données

Pour dénormaliser les données, joignez-les dans une nouvelle table BigQuery comportant une ligne pour chaque enregistrement d'artiste, avec les métadonnées sélectionnées que vous souhaitez conserver pour l'analyse.

  1. Dans Cloud Console, copiez la requête suivante et collez-la dans l'éditeur de requête :

    SELECT artist.id, artist.gid as artist_gid,
           artist.name as artist_name, artist.area,
           recording.name as recording_name, recording.length,
           recording.gid as recording_gid, recording.video
      FROM `[PROJECT_ID].[DATASET].artist` as artist
          INNER JOIN `[PROJECT_ID].[DATASET].artist_credit_name` AS artist_credit_name
               ON artist.id = artist_credit_name.artist
          INNER JOIN `[PROJECT_ID].[DATASET].recording` AS recording
               ON artist_credit_name.artist_credit = recording.artist_credit
    

    Remplacez [DATASET] par le nom de l'ensemble de données que vous avez créé précédemment, par exemple musicbrainz, et [PROJECT_ID] par l'ID de votre projet Google Cloud.

  2. Cliquez sur la liste déroulante Plus, puis sélectionnez Paramètres de requête.

  3. Dans la fiche Query settings (Paramètres de requête), renseignez les éléments suivants :

    1. Cochez la case Set a destination table for query results (Définir une table de destination pour les résultats de la requête).
    2. Dans le champ Table name (Nom de la table), saisissez recordings_by_artists_manual.
    3. Dans la section Destination table write preference (Préférence d'écriture pour la table de destination), choisissez Overwrite table (Écraser la table).
    4. Cochez la case Allow Large Results (no size limit) (Autoriser un nombre élevé de résultats (aucune limite)).
    5. Conservez la valeur par défaut, Interactive, pour la Task priority (Priorité de la tâche).
    6. Conservez la valeur par défaut, Standard, pour le SQL Dialect (Dialecte SQL).
    7. Cliquez sur Enregistrer.
  4. Cliquez sur Exécuter.

    Lorsque la requête se termine, les données sont triées par chanson pour chaque artiste dans la nouvelle table BigQuery.

    Paramètres de la requête pour la table de destination.

Approche 2 : ETL dans BigQuery avec Dataflow

Dans cette section du tutoriel, au lieu d'utiliser l'interface utilisateur BigQuery, vous utilisez un exemple de programme pour charger des données dans BigQuery à l'aide d'un pipeline Dataflow. Vous utilisez ensuite le modèle de programmation de Dataflow pour dénormaliser et nettoyer les données à charger dans BigQuery.

Avant de commencer, prenez un moment pour examiner les concepts et l'exemple de code.

Examiner les concepts

Bien que les données soient d'un volume réduit et qu'elles puissent être importées rapidement à l'aide de l'interface utilisateur de BigQuery, vous pouvez aussi utiliser Dataflow pour l'ETL dans le cadre de ce tutoriel. Utilisez Dataflow pour l'ETL dans BigQuery plutôt que l'interface utilisateur de BigQuery lorsque vous effectuez des jointures massives (entre 500 et 5 000 colonnes représentant plus de 10 To de données), avec les objectifs suivants :

  • Vous souhaitez nettoyer ou transformer vos données à mesure qu'elles sont chargées dans BigQuery, au lieu de les stocker et de les joindre ultérieurement. Cette approche a pour effet de réduire les besoins en stockage car les données ne sont stockées dans BigQuery que dans leur état joint et transformé.
  • Vous souhaitez effectuer un nettoyage personnalisé des données (qui ne peut être réalisé de manière simple avec SQL).
  • Vous souhaitez combiner les données avec des données extérieures à OLTP, telles que des journaux ou des données accessibles à distance, durant le processus de chargement.
  • Vous souhaitez automatiser les tests et le déploiement de la logique de chargement des données à l'aide d'une intégration ou d'un déploiement continus (CI/CD).
  • Vous prévoyez une itération graduelle et une amélioration du processus ETL au fil du temps.
  • Vous souhaitez ajouter des données de manière incrémentielle, plutôt qu'en suivant un processus ETL unique.

Voici un diagramme du pipeline de données créé par l'exemple de programme :

Pipeline de données utilisant BigQuery.

Dans l'exemple de code, de nombreuses étapes du pipeline sont regroupées et/ou encapsulées dans des méthodes pratiques, nommées et réutilisées. Dans le diagramme, les étapes réutilisées sont signalées par des bordures en pointillés.

Examiner le code du pipeline

Le code crée un pipeline qui effectue les étapes suivantes :

  1. Chaque table que vous voulez intégrer à la jointure est chargée dans une collection PCollection de chaînes. Chaque élément comprend la représentation JSON d'une ligne de la table.

    public static PCollection<String> loadText(Pipeline p, String name) {
      BQETLOptions options = (BQETLOptions) p.getOptions();
      String loadingBucket = options.getLoadingBucketURL();
      String objectToLoad = storedObjectName(loadingBucket, name);
      return p.apply(name, TextIO.read().from(objectToLoad));
    }
  2. Les chaînes JSON sont converties en représentations d'objets MusicBrainzDataObject, puis les représentations d'objets sont organisées suivant l'une des valeurs de colonne, par exemple une clé primaire ou étrangère.

    public static PCollection<KV<Long, MusicBrainzDataObject>> loadTableFromText(PCollection<String> text, String name, String keyName) {
      final String namespacedKeyname = name + "_" + keyName;
      return text.apply("load " + name,
                        MapElements
                          .into(new TypeDescriptor<KV<Long, MusicBrainzDataObject>>() {})
                          .via( (String input) -> {
                                MusicBrainzDataObject datum = JSONReader.readObject(name, input);
                                Long key = (Long) datum.getColumnValue(namespacedKeyname);
                                return KV.of(key, datum);
                                })
             );
    }
  3. La liste fait l'objet d'une jointure sur la base de l'artiste commun. artist_credit_name associe un crédit d'artiste à son enregistrement et inclut la clé étrangère d'artiste. La table artist_credit_name est chargée en tant que liste d'objets KV (Key Value). Le membre K correspond à l'artiste.

    PCollection<MusicBrainzDataObject> artistCredits =
        MusicBrainzTransforms.innerJoin("artists with artist credits", artists, artistCreditName);
  4. La liste fait l'objet d'une jointure à l'aide de la méthode MusicBrainzTransforms.innerJoin().

    public static PCollection<MusicBrainzDataObject>
                         innerJoin(String name,
                                   PCollection<KV<Long, MusicBrainzDataObject>> table1,
                                   PCollection<KV<Long, MusicBrainzDataObject>> table2) {
      final TupleTag<MusicBrainzDataObject> t1 = new TupleTag<MusicBrainzDataObject>(){};
      final TupleTag<MusicBrainzDataObject> t2 = new TupleTag<MusicBrainzDataObject>(){};
      PCollection<KV<Long, CoGbkResult>> joinedResult = group(name, table1, table2, t1, t2);
    1. Cette méthode regroupe les collections d'objets KV selon le membre clé que vous souhaitez joindre. Cela produit une collection PCollection d'objets KV avec une clé longue (la valeur de colonne artist.id) et l'objet CoGbkResult (combinaison des groupes par résultat clé) résultant. L'objet CoGbkResult est un tuple de listes d'objets avec la valeur de clé commune des première et deuxième collections PCollections. Ce tuple est adressable à l'aide du tag tuple formulé pour chaque collection PCollection avant l'exécution de l'opération de regroupement CoGroupByKey dans la méthode group.
    2. La méthode fusionne chaque correspondance d'objets dans un objet MusicBrainzDataObject qui représente un résultat de jointure.

          PCollection<List<MusicBrainzDataObject>> mergedResult =
              joinedResult.apply("merge join results",
                           MapElements
                         .into(new TypeDescriptor<List<MusicBrainzDataObject>>() {})
                         .via( ( KV<Long, CoGbkResult> group ) -> {
                             List<MusicBrainzDataObject> result = new ArrayList<MusicBrainzDataObject>();
                             Iterable<MusicBrainzDataObject> leftObjects = group.getValue().getAll(t1);
                             Iterable<MusicBrainzDataObject> rightObjects = group.getValue().getAll(t2);
                             leftObjects.forEach((MusicBrainzDataObject l) -> {
                               rightObjects.forEach((MusicBrainzDataObject r) -> {
                                 result.add(l.duplicate().merge(r));
                               });
                             });
                             return result;
                           }
                         )
      );
    3. La méthode réorganise la collection dans une liste d'objets KV pour commencer la jointure suivante. Cette fois, la valeur K correspond à la colonne artist_credit, utilisée pour joindre la table d'enregistrement.

      PCollection<KV<Long,MusicBrainzDataObject>> artistCreditNamesByArtistCredit =  MusicBrainzTransforms.by("artist_credit_name_artist_credit", artistCredits);
    4. La méthode obtient la collection finale résultante d'objets MusicBrainzDataObject en joignant ce résultat à la collection chargée des enregistrements organisés par artist_credit.id.

      PCollection<MusicBrainzDataObject> artistRecordings = MusicBrainzTransforms.innerJoin("joined recordings",
         artistCreditNamesByArtistCredit, recordingsByArtistCredit);
    5. La méthode mappe les objets MusicBrainzDataObjects obtenus dans TableRows.

      PCollection<TableRow> tableRows = MusicBrainzTransforms.transformToTableRows(artistRecordings, bqTableSchema);
    6. Elle écrit les lignes TableRows résultantes dans BigQuery.

      tableRows.apply(
           "Write to BigQuery",
           BigQueryIO.writeTableRows()
          .to(BQETLOptions.getBigQueryTablename())
          .withSchema(bqTableSchema)
          .withCustomGcsTempLocation(StaticValueProvider.of(BQETLOptions.getTempLocation() ))
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

Pour plus d'informations sur les mécanismes de programmation de pipeline Dataflow, consultez les rubriques spécifiques suivantes sur le modèle de programmation :

Après avoir examiné les étapes effectuées par le code, vous pouvez exécuter le pipeline.

Exécuter le code de pipeline

  1. Dans Cloud Console, ouvrez Cloud Shell.

    Ouvrir Cloud Shell

  2. Définissez les variables d'environnement pour votre projet :

    export PROJECT_ID=[PROJECT_ID]
    export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
    

    Remplacez [PROJECT_ID] par l'ID de votre projet Google Cloud et remplacez [CHOOSE_AN_APPROPRIATE_ZONE] par une zone Google Cloud.

  3. Définissez les variables d'environnement utilisées par le script de pipeline :

    export DESTINATION_TABLE=recordings_by_artists_dataflow
    export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
    export DATASET=musicbrainz
    export SERVICE_ACCOUNT=project-owner
    
  4. Assurez-vous que gcloud utilise bien le projet que vous avez créé ou sélectionné au début de ce tutoriel :

    gcloud config set project $PROJECT_ID
    
  5. Créez un compte de service pour exécuter le pipeline :

    gcloud iam service-accounts create ${SERVICE_ACCOUNT} \
        --display-name "Project Owner Account"
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member serviceAccount:${SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com \
        --role roles/owner
    gcloud iam service-accounts keys create \
        ~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json \
        --iam-account ${SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com
    

    Cette commande télécharge un fichier JSON contenant la clé de votre compte de service. Veillez à conserver ce fichier dans un emplacement sécurisé.

  6. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS de façon à pointer vers le chemin du fichier JSON contenant la clé de votre compte de service :

    export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
    
  7. Clonez le dépôt contenant le code Dataflow :

    git clone https://github.com/GoogleCloudPlatform/bigquery-etl-dataflow-sample.git
    
  8. Remplacez le répertoire par celui de l'exemple :

    cd bigquery-etl-dataflow-sample
    
  9. Vous devez maintenant créer un bucket de préproduction dans Cloud Storage, car les tâches Dataflow ont besoin d'un tel bucket pour le stockage temporaire des fichiers binaires servant à exécuter le pipeline :

    gsutil mb gs://$STAGING_BUCKET
    
  10. Définissez le cycle de vie de l'objet pour [STAGING_BUCKET_NAME] sur celui du fichier dataflow-staging-policy.json :

    gsutil lifecycle set dataflow-staging-policy.json gs://$STAGING_BUCKET
    
  11. Exécutez la tâche Dataflow :

    ./run.sh simple
    
  12. Pour voir la progression du pipeline, accédez à la page Dataflow dans Cloud Console.

    Accéder à la page Dataflow

    L'état de chaque tâche est indiqué dans la colonne d'état. L'état Réussie indique que la tâche est terminée.

  13. (Facultatif) Pour afficher le graphique de la tâche, ainsi que des informations détaillées sur ses étapes, cliquez sur son nom, par exemple etl-into-bigquery-bqetlsimple.

  14. Dans Cloud Console, accédez à la page BigQuery.

    Accéder à la page BigQuery

    Assurez-vous que votre projet Google Cloud est sélectionné.

  15. Pour exécuter une requête sur la nouvelle table, dans le volet de l'éditeur de requête, saisissez les éléments suivants :

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
    FROM musicbrainz.recordings_by_artists_dataflow
    WHERE artist_area is NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    Éditeur de requêtes mis à jour avec la requête de nouvelle table.

Nettoyer les données

L'étape suivante consiste à modifier légèrement le pipeline Dataflow afin de pouvoir charger des tables de recherche et de les traiter en tant qu'entrées secondaires, comme illustré dans le diagramme suivant.

Pipeline Dataflow mis à jour pour les entrées secondaires.

Lorsque vous interrogez la table BigQuery résultante, il est difficile d'identifier l'origine géographique de l'artiste sans avoir à rechercher manuellement l'identifiant numérique de la région dans la table area de la base de données MusicBrainz. Ainsi, l'analyse des résultats de la requête n'est pas aussi directe qu'elle pourrait l'être.

De même, le sexe des artistes est indiqué en tant qu'ID, or la table de sexe MusicBrainz ne comprend que trois lignes. Pour résoudre ce problème, vous pouvez ajouter une étape dans le pipeline Dataflow afin d'utiliser les tables area et gender de MusicBrainz pour mapper les ID aux étiquettes correspondantes.

Les tables artist_area et artist_gender contiennent un nombre de lignes nettement inférieur à celui des tables des artistes ou des données relatives aux enregistrements. Le nombre d'éléments dans ces tables est limité, respectivement, par le nombre de zones géographiques ou de sexes.

En conséquence, l'étape de recherche utilise la fonctionnalité d'entrée secondaire de Dataflow.

Les entrées secondaires sont chargées en tant qu'exportations de tables au format JSON délimité par des lignes et sont utilisées pour dénormaliser les données de table en une seule étape.

Examiner le code ajoutant les entrées secondaires au pipeline

Avant d'exécuter le pipeline, consultez son code pour bien comprendre les nouvelles étapes.

Dans le fichier BQETLSimple.java, passez en revue les lignes commentées. La mise en commentaire sera levée dans une étape suivante.

//PCollection<KV<Long,MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p,"artist","id",
//        MusicBrainzTransforms.lookup("area", "id", "name", "area", "begin_area"),
//        MusicBrainzTransforms.lookup("gender","id","name","gender"));

PCollection<KV<Long, MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p, "artist", "id");

Ce code illustre le nettoyage des données avec des entrées secondaires. La classe MusicBrainzTransforms permet d'utiliser plus facilement les entrées secondaires pour mapper les valeurs de clés étrangères avec des étiquettes. La bibliothèque MusicBrainzTransforms fournit par exemple une méthode permettant de créer une classe de recherche interne. La classe de recherche décrit chaque table de conversion et les champs qui doivent être remplacés par des étiquettes et des arguments de longueur variable. keyKey est le nom de la colonne qui contient la clé pour la recherche et valueKey est le nom de la colonne qui contient l'étiquette correspondante.

public static LookupDescription lookup(String objectName, String keyKey, String valueKey, String... destinationKeys) {
  return new LookupDescription(objectName, keyKey, valueKey, destinationKeys);
}

Chaque entrée secondaire est chargée sous la forme d'un objet de mappage unique, utilisé pour rechercher l'étiquette correspondant à un ID.

Le fichier JSON du tableau de conversion est initialement chargé dans MusicBrainzDataObjects avec un espace de noms vide, puis il est transformé en mappage de la valeur de la colonne Key à la valeur de la colonne Value.

public static PCollectionView<Map<Long, String>> loadMapFromText(PCollection<String> text, String name, String keyKey, String valueKey) {
  // column/Key names are namespaced in MusicBrainzDataObject
  String keyKeyName = name + "_" + keyKey;
  String valueKeyName = name + "_" + valueKey;

  PCollection<KV<Long, String>> entries = text.apply(
        "sideInput_" + name,
        MapElements
          .into(new TypeDescriptor<KV<Long, String>>() {})
          .via((String input) -> {
                 MusicBrainzDataObject object = JSONReader.readObject(name, input);
                 Long key = (Long) object.getColumnValue(keyKeyName);

                 String value = (String) object.getColumnValue(valueKeyName);
                 return KV.of(key, value);
               })
        );

  return entries.apply(View.<Long, String>asMap());
}

Chacun de ces objets Map est placé dans une Map suivant la valeur de sa clé de destination destinationKey, qui est la clé à remplacer par les valeurs recherchées.

List<SimpleEntry<ArrayList<String>, PCollectionView<Map<Long, String>>>> mapSideInputs = new ArrayList<SimpleEntry<ArrayList<String>, PCollectionView<Map<Long, String>>>>();

for (LookupDescription mapper : mappers) {
  PCollectionView<Map<Long, String>> mapView = loadMap(text.getPipeline(), mapper.objectName, mapper.keyKey, mapper.valueKey);
  List<String> destKeyList =
      mapper.destinationKeys.stream()
                            .map( destinationKey -> name + "_" + destinationKey )
                            .collect(Collectors.toList());

    mapSideInputs.add(new SimpleEntry(destKeyList, mapView));

}

Puis, lors de la transformation des objets d'artiste à partir de JSON, la valeur de la clé destinationKey (qui commence par un nombre) est remplacée par son étiquette.

Map<Long, String> sideInputMap = c.sideInput(mapping.getValue());

List<String> keyList = mapping.getKey();

keyList.forEach( ( String key ) -> {
  Long id = (Long) result.getColumnValue(key);
  if (id != null) {
    String label = (String) sideInputMap.get(id);
    if (label == null) {
      label = "" + id;
    }
    result.replace(key, label);

Pour modifier le fichier BQETLSimple.java et utiliser des recherches afin de décoder les données des champs artist_area et artist_gender, procédez comme suit :

  1. Modifiez légèrement le déroulement du programme :

    1. Annulez la mise en commentaire des lignes qui chargent les données d'artiste à l'aide des recherches.
    2. Mettez en commentaire l'appel à loadTable qui charge les données d'artiste sans les recherches.
    //PCollection<KV<Long,MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p,"artist","id",
    //        MusicBrainzTransforms.lookup("area", "id", "name", "area", "begin_area"),
    //        MusicBrainzTransforms.lookup("gender","id","name","gender"));
    
    PCollection<KV<Long, MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p, "artist", "id");
  2. Modifiez TableFieldSchemas pour artist_area et artist_gender afin de définir le type de données string à la place de int en mettant les champs int correspondants en commentaire et en supprimant la mise en commentaire des champs string correspondants.

    /*Switch these two lines when using mapping table for artist_area */
    //        .stringField("artist_area")
            .intField("artist_area")
    /*Switch these two lines when using mapping table for artist_gender */
    //        .stringField("artist_gender")
            .intField("artist_gender")
    /*Switch these two lines when using mapping table for artist_begin_area */
            .intField("artist_begin_area")
    //      .stringField("artist_begin_area")
  3. Pour exécuter à nouveau le code de pipeline, procédez comme suit :

    1. Définissez les variables d'environnement pour votre projet :

      export PROJECT_ID=[PROJECT_ID]
      export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
      
    2. Assurez-vous que l'environnement est correctement configuré :

      export DESTINATION_TABLE=recordings_by_artists_dataflow_sideinputs
      export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
      export DATASET=musicbrainz
      export SERVICE_ACCOUNT=project-owner
      
    3. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS pour qu'elle pointe vers le chemin du fichier JSON contenant la clé de votre compte de service.

      export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
      
    4. Exécutez le pipeline pour imbriquer des lignes d'enregistrement dans des lignes d'artiste :

      ./run.sh simple
      
  4. Envoyez la même requête, qui inclut artist_area et artist_gender :

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
    FROM musicbrainz.recordings_by_artists_dataflow_sideinputs
    WHERE artist_area IS NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    Dans la sortie, les éléments artist_area et artist_gender sont maintenant décodés :

    Sortie décodée par

Optimiser le schéma BigQuery

Dans la dernière partie de ce tutoriel, vous allez exécuter un pipeline générant un schéma de table plus optimal grâce à des champs imbriqués.

Prenez le temps d'examiner le code utilisé pour générer cette version optimisée de la table.

Le diagramme suivant illustre un pipeline Dataflow légèrement différent, qui imbrique les enregistrements des artistes dans chaque ligne d'artiste au lieu de créer des lignes d'artiste en double.

Pipeline Dataflow qui imbrique les enregistrements de l'artiste dans chaque ligne d'artiste.

La représentation actuelle des données est relativement plate. En effet, elle comprend une ligne par enregistrement crédité, qui inclut toutes les métadonnées de l'artiste tirées du schéma BigQuery ainsi que toutes les métadonnées de l'enregistrement et de artist_credit_name. Cette représentation plate présente au moins deux inconvénients :

  • Elle répète les métadonnées artist pour chaque enregistrement porté au crédit d'un artiste, ce qui augmente l'espace de stockage nécessaire.
  • Lorsque vous exportez les données au format JSON, cette représentation exporte un tableau qui répète ces données, plutôt qu'un artiste avec les données d'enregistrement imbriquées (ce que vous souhaitez certainement obtenir).

Sans affecter les performances et sans utiliser de stockage supplémentaire, plutôt que de stocker un enregistrement par ligne, vous pouvez stocker les enregistrements sous la forme d'un champ répété dans chaque enregistrement d'artiste en apportant des modifications relativement simples au pipeline Dataflow.

Au lieu de joindre les enregistrements et leurs informations d'artiste selon artist_credit_name.artist, le pipeline modifié crée une liste imbriquée d'enregistrements dans un objet d'artiste.

public static PCollection<MusicBrainzDataObject> nest(PCollection<KV<Long, MusicBrainzDataObject>> parent,
                                                      PCollection<KV<Long, MusicBrainzDataObject>> child,
                                                      String nestingKey) {
  final TupleTag<MusicBrainzDataObject> parentTag = new TupleTag<MusicBrainzDataObject>(){};
  final TupleTag<MusicBrainzDataObject> childTag = new TupleTag<MusicBrainzDataObject>(){};

  PCollection<KV<Long, CoGbkResult>> joinedResult = group("nest " + nestingKey, parent, child, parentTag, childTag);
  return joinedResult.apply("merge join results " + nestingKey,
                            MapElements
                             .into(new TypeDescriptor<MusicBrainzDataObject>() {})
                             .via((KV<Long, CoGbkResult> group) -> {
                                MusicBrainzDataObject parentObject = group.getValue().getOnly(parentTag);
                                Iterable<MusicBrainzDataObject> children = group.getValue().getAll(childTag);
                                List<MusicBrainzDataObject> childList = new ArrayList<MusicBrainzDataObject>();
                                children.forEach(childList::add);
                                parentObject = parentObject.duplicate();
                                parentObject.addColumnValue("recordings", childList);
                                return parentObject;
                              })
                         );
}

TableRow présente des limites de taille dans l'API BigQuery. Ainsi, le code limite le nombre d'enregistrements imbriqués pour un enregistrement donné à 1 000 éléments. Si un artiste possède plus de 1 000 enregistrements, le code duplique la ligne, y compris les métadonnées artist, et continue l'imbrication des données d'enregistrement dans la ligne dupliquée.

private static List<TableRow> toTableRows(MusicBrainzDataObject mbdo, Map<String, Object> serializableSchema) {
  TableRow row = new TableRow();
  List<TableRow> result = new ArrayList<TableRow>();
  Map<String, List<MusicBrainzDataObject>> nestedLists = new HashMap<String, List<MusicBrainzDataObject>>();
  Set<String> keySet = serializableSchema.keySet();
  /*
   *  construct a row object without the nested objects
   */
  int maxListSize = 0;
  for (String key : keySet) {
    Object value = serializableSchema.get(key);
    Object fieldValue = mbdo.getColumnValue(key);
    if (fieldValue != null) {
      if (value instanceof Map) {
        List<MusicBrainzDataObject> list = (List<MusicBrainzDataObject>) fieldValue;
        if (list.size() > maxListSize) {
          maxListSize = list.size();
        }
        nestedLists.put(key, list);
      } else {
        row.set(key, fieldValue);
      }

    }
  }
  /*
   * add the nested objects but break up the nested objects across duplicate rows if nesting limit exceeded
   */
  TableRow parent = row.clone();
  Set<String> listFields = nestedLists.keySet();
  for (int i = 0; i < maxListSize; i++) {
    parent = (parent == null ? row.clone() : parent);
    final TableRow parentRow = parent;
    nestedLists.forEach((String key, List<MusicBrainzDataObject> nestedList) -> {
      if (nestedList.size() > 0) {
        if (parentRow.get(key) == null) {
          parentRow.set(key, new ArrayList<TableRow>());
        }
        List<TableRow> childRows = (List<TableRow>) parentRow.get(key);
        childRows.add(toChildRow(nestedList.remove(0), (Map<String, Object>) serializableSchema.get(key)));
      }
    });
    if ((i > 0) && (i % BIGQUERY_NESTING_LIMIT == 0)) {
      result.add(parent);
      parent = null;
    }
  }
  if (parent != null) {
    result.add(parent);
  }
  return result;
}

Le diagramme montre les sources, les transformations et les récepteurs du pipeline.

Pipeline optimisé avec des sources, des transformations et des récepteurs.

Dans la plupart des cas, les noms des étapes sont fournis dans le code lors de l'appel de la méthode apply.

Pour créer ce pipeline optimisé, procédez comme suit :

  1. Dans Cloud Shell, assurez-vous que l'environnement est correctement configuré pour le script de pipeline :

    export PROJECT_ID=[PROJECT_ID]
    export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
    export DESTINATION_TABLE=recordings_by_artists_dataflow_nested
    export DATASET=musicbrainz
    export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
    export SERVICE_ACCOUNT=project-owner
    
  2. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS pour qu'elle pointe vers le chemin du fichier JSON contenant la clé de votre compte de service :

    export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
    
  3. Exécutez le pipeline pour imbriquer des lignes d'enregistrement dans des lignes d'artiste :

    ./run.sh nested
    
  4. Interrogez les champs de la table imbriquée dans BigQuery :

    SELECT artist_name, artist_gender, artist_area, artist_recordings
    FROM musicbrainz.recordings_by_artists_dataflow_nested
    WHERE artist_area IS NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    Résultats de la requête de table imbriquée.

  5. Exécutez une requête pour extraire des valeurs de la structure STRUCT et utilisez ces valeurs pour filtrer les résultats :

    SELECT artist_name,
           artist_gender,
           artist_area,
           ARRAY(SELECT artist_credit_name_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS artist_credit_name_name,
           ARRAY(SELECT recording_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS recording_name
     FROM musicbrainz.recordings_by_artists_dataflow_nested,
          UNNEST(recordings_by_artists_dataflow_nested.artist_recordings) AS artist_recordings_struct
    WHERE artist_recordings_struct.recording_name LIKE "%Justin%"
    LIMIT 1000;
    

    Requête de filtrage des résultats.

Nettoyer

Pour éviter que les ressources utilisées dans ce tutoriel soient facturées sur votre compte Google Cloud Platform, procédez comme suit :

Supprimer le projet

  1. Dans Cloud Console, accédez à la page Gérer les ressources.

    Accéder à la page Gérer les ressources

  2. Dans la liste des projets, sélectionnez le projet que vous souhaitez supprimer, puis cliquez sur Supprimer .
  3. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

Supprimer des ressources individuelles

Suivez les étapes ci-dessous pour supprimer des ressources individuelles au lieu de supprimer tout le projet.

Supprimer le bucket Cloud Storage

  1. Dans Cloud Console, accédez à la page Navigateur Cloud Storage.

    Accéder au navigateur Cloud Storage

  2. Cochez la case correspondant au bucket que vous souhaitez supprimer.
  3. Pour supprimer le bucket, cliquez sur Supprimer .

Supprimer les ensembles de données BigQuery

  1. Ouvrez l'interface utilisateur Web BigQuery.

    Ouvrir BIGQUERY

  2. Sélectionnez les ensembles de données BigQuery que vous avez créés au cours du tutoriel.

  3. Cliquez sur Supprimer.

Étapes suivantes