Exécuter des opérations ETL à partir d'une base de données relationnelle dans BigQuery à l'aide de Dataflow

Last reviewed 2022-08-21 UTC

Ce tutoriel explique comment utiliser Dataflow pour extraire, transformer et charger des données (ETL, "extract, transform and load") à partir d'une base de données relationnelle de traitement des transactions en ligne (OLTP, "online transaction processing") vers BigQuery à des fins d'analyse.

Ce tutoriel est destiné aux administrateurs de bases de données, aux responsables d'exploitation et aux architectes cloud qui souhaitent bénéficier des capacités de requêtes analytiques de BigQuery et des fonctionnalités de traitement par lots de Dataflow.

Les bases de données OLTP sont souvent des bases de données relationnelles qui stockent des informations et traitent des transactions pour des sites d'e-commerce, des applications SaaS (Software as a Service) ou encore des jeux. Elles sont généralement optimisées pour les transactions présentant des schémas hautement normalisés et exigeant des propriétés ACID : atomicité, cohérence, isolation et durabilité. En revanche, les entrepôts de données sont généralement optimisés pour l'extraction et l'analyse des données plutôt que pour les transactions, et possèdent donc des schémas dénormalisés. Généralement, la dénormalisation des données à partir d'une base de données OLTP facilite leur analyse dans BigQuery.

Objectifs

Le tutoriel présente deux approches pour l'ETL de données de SGBDR normalisés vers des données BigQuery dénormalisées :

  • Utiliser BigQuery pour charger et transformer les données. Utilisez cette approche pour le chargement ponctuel de petits volumes de données dans BigQuery à des fins d'analyse. Vous pouvez également utiliser cette approche pour prototyper votre ensemble de données avant d'utiliser l'automatisation avec des ensembles de données plus volumineux ou plusieurs ensembles de données.
  • Utiliser Dataflow pour charger, transformer et nettoyer les données : Utilisez cette approche pour charger des données plus volumineuses ou provenant de plusieurs sources, ou pour charger des données de manière incrémentielle ou automatique.

Coûts

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Une fois que vous avez terminé les tâches décrites dans ce document, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez la section Effectuer un nettoyage.

Avant de commencer

  1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
  2. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

    Accéder au sélecteur de projet

  3. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  4. Activer les API Compute Engine et Dataflow.

    Activer les API

  5. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

    Accéder au sélecteur de projet

  6. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  7. Activer les API Compute Engine et Dataflow.

    Activer les API

Utiliser l'ensemble de données MusicBrainz

Ce tutoriel s'appuie sur des instantanés JSON de tables figurant dans la base de données MusicBrainz, construite en PostgreSQL et contenant des informations sur l'ensemble de la musique répertoriée dans MusicBrainz. Parmi les éléments du schéma MusicBrainz, on trouve les suivants :

  • Artistes
  • Groupes de parution
  • Versions
  • Enregistrements
  • Travaux
  • Labels
  • Différentes relations entre ces entités

Le schéma MusicBrainz comprend trois tables pertinentes : artist, recording et artist_credit_name. Un crédit d'artiste artist_credit représente le crédit accordé à l'artiste pour un enregistrement, et les lignes artist_credit_name associent l'enregistrement à l'artiste correspondant via la valeur artist_credit.

Ce tutoriel fournit les tables PostgreSQL déjà extraites au format JSON délimité par un retour à la ligne et stockées dans le bucket Cloud Storage public : gs://solutions-public-assets/bqetl.

Si vous souhaitez effectuer cette étape vous-même, vous devez disposer d'une base de données PostgreSQL contenant l'ensemble de données MusicBrainz et utiliser les commandes suivantes pour exporter chacune des tables :

host=POSTGRES_HOST
user=POSTGRES_USER
database=POSTGRES_DATABASE

for table in artist recording artist_credit_name
do
    pg_cmd="\\copy (select row_to_json(r) from (select * from ${table}) r ) to exported_${table}.json"
    psql -w -h ${host} -U ${user} -d ${db} -c $pg_cmd
    # clean up extra '\' characters
    sed -i -e 's/\\\\/\\/g' exported_${table}.json
done

Approche 1 : ETL avec BigQuery

Utilisez cette approche pour charger une petite quantité de données dans BigQuery à des fins d'analyse. Vous pouvez également utiliser cette approche pour prototyper votre ensemble de données avant d'utiliser l'automatisation avec des ensembles de données plus volumineux ou plusieurs ensembles de données.

Créer un ensemble de données BigQuery

Pour créer un ensemble de données BigQuery, chargez les tables MusicBrainz une par une dans BigQuery, puis joignez les tables chargées de sorte que chaque ligne contienne l'association de données souhaitée. Stockez les résultats de la jointure dans une nouvelle table BigQuery. Vous pouvez ensuite supprimer les tables d'origine que vous avez chargées.

  1. Dans la console Google Cloud, ouvrez BigQuery.

    OUVRIR BIGQUERY

  2. Dans le panneau Explorateur, cliquez sur le menu à côté du nom de votre projet, puis sur Créer un ensemble de données.

  3. Dans la boîte de dialogue Créer un ensemble de données, effectuez les opérations suivantes :

    1. Dans le champ ID de l'ensemble de données, saisissez musicbrainz.
    2. Définissez le paramètre Emplacement des données sur us.
    3. Cliquez sur Créer un ensemble de données.

Importer des tables MusicBrainz

Pour chaque table MusicBrainz, suivez les étapes ci-dessous pour ajouter une table à l'ensemble de données que vous avez créé :

  1. Dans le panneau Explorateur de BigQuery, développez la ligne contenant le nom de votre projet pour afficher le nouvel ensemble de données musicbrainz.
  2. Cliquez sur le menu à côté de l'ensemble de données musicbrainz, puis sur Créer une table.
  3. Dans la boîte de dialogue Créer une table, procédez comme suit :

    1. Dans la liste déroulante Créer une table à partir de, sélectionnez Google Cloud Storage.
    2. Dans le champ Sélectionner un fichier depuis le bucket GCS, saisissez le chemin d'accès au fichier de données :

      solutions-public-assets/bqetl/artist.json
      
    3. Pour le champ Format de fichier, sélectionnez JSONL (fichier JSON délimité par un retour à la ligne).

    4. Assurez-vous que le champ Projet contient le nom de votre projet.

    5. Assurez-vous que l'ensemble de données est défini sur musicbrainz.

    6. Pour Table, saisissez le nom de la table, artist.

    7. Pour Table type (Type de table), laissez l'option Native table (Table native) sélectionnée.

    8. Sous la section Schema (Schéma), cliquez sur Edit as Text (Modifier sous forme de texte).

    9. Téléchargez le fichier de schéma artist et ouvrez-le dans un éditeur ou une visionneuse de texte.

    10. Remplacez le contenu de la section Schema (Schéma) par le contenu du fichier de schéma que vous venez de télécharger.

    11. Cliquez sur Créer une table.

  4. Attendez quelques instants que le chargement soit terminé.

  5. Une fois le chargement terminé, la nouvelle table apparaît sous l'ensemble de données.

  6. Répétez les étapes 1 à 5 pour créer la table artist_credit_name avec les modifications suivantes :

    • Utilisez le chemin suivant pour le fichier de données source :

      solutions-public-assets/bqetl/artist_credit_name.json
      
    • Utilisez artist_credit_name comme nom de table.

    • Téléchargez le fichier de schéma artist_credit_name et utilisez son contenu pour le schéma.

  7. Répétez les étapes 1 à 5 pour créer la table recording avec les modifications suivantes :

    • Utilisez le chemin suivant pour le fichier de données source :

      solutions-public-assets/bqetl/recording.json
      
    • Utilisez recording comme nom de table.

    • Téléchargez le fichier de schéma recording et utilisez son contenu pour le schéma.

Dénormaliser manuellement les données

Pour dénormaliser les données, joignez-les dans une nouvelle table BigQuery comportant une ligne pour chaque enregistrement d'artiste, avec les métadonnées sélectionnées que vous souhaitez conserver pour l'analyse.

  1. Si l'éditeur de requête BigQuery n'est pas ouvert dans la console Google Cloud, cliquez sur Saisir une nouvelle requête.
  2. Copiez la requête suivante et collez-la dans l'éditeur de requête :

    SELECT
        artist.id,
        artist.gid AS artist_gid,
        artist.name AS artist_name,
        artist.area,
        recording.name AS recording_name,
        recording.length,
        recording.gid AS recording_gid,
        recording.video
    FROM
        `musicbrainz.artist` AS artist
    INNER JOIN
        `musicbrainz.artist_credit_name` AS artist_credit_name
    ON
        artist.id = artist_credit_name.artist
    INNER JOIN
        `musicbrainz.recording` AS recording
    ON
        artist_credit_name.artist_credit = recording.artist_credit
    
  3. Cliquez sur la liste déroulante Plus, puis sélectionnez Paramètres de requête.

  4. Dans la boîte de dialogue Paramètres de requête, procédez comme suit :

    1. Cochez la case Set a destination table for query results (Définir une table de destination pour les résultats de la requête).
    2. Dans Ensemble de données, saisissez musicbrainz et sélectionnez l'ensemble de données dans votre projet.
    3. Dans le champ ID de la table, saisissez recordings_by_artists_manual.
    4. Dans la section Destination table write preference (Préférence d'écriture pour la table de destination), choisissez Overwrite table (Écraser la table).
    5. Cochez la case Allow Large Results (no size limit) (Autoriser un nombre élevé de résultats (aucune limite)).
    6. Cliquez sur Save (Enregistrer).
  5. Cliquez sur Exécuter.

    À l'issue de la requête, les données sont triées par chanson pour chaque artiste dans la nouvelle table BigQuery, et un échantillon des résultats est affiché dans le volet Résultats de la requête, similaire à celui-ci :

    Row id artist_gid artist_name area recording_name length recording_gid vidéo
    1 97546 125ec42a... unknown 240 Horo Gun Toireamaid Hùgan Fhathast Air 174106 c8bbe048... FALSE
    2 266317 2e7119b5... Capella Istropolitana 189 Concerto Grosso in D minor, op. 2 no. 3: II. Adagio 134000 af0f294d... FALSE
    3 628060 34cd3689... Conspirare 5196 Liturgy, op. 42: 9. Praise the Lord from the Heavens 126933 8bab920d... FALSE
    4 423877 54401795... Boys Air Choir 1178 Nunc Dimittis 190000 111611eb... FALSE
    5 394456 9914f9f9... L’Orchestre de la Suisse Romande 23036 Concert Waltz no. 2, op. 51 509960 b16742d1... FALSE

Approche 2 : ETL dans BigQuery avec Dataflow

Dans cette section du tutoriel, au lieu d'utiliser l'interface utilisateur BigQuery, vous utilisez un exemple de programme pour charger des données dans BigQuery à l'aide d'un pipeline Dataflow. Vous utilisez ensuite le modèle de programmation de Beam pour dénormaliser et nettoyer les données à charger dans BigQuery.

Avant de commencer, prenez un moment pour examiner les concepts et l'exemple de code.

Examiner les concepts

Bien que les données soient d'un volume réduit et qu'elles puissent être importées rapidement à l'aide de l'interface utilisateur de BigQuery, vous pouvez aussi utiliser Dataflow pour l'ETL dans le cadre de ce tutoriel. Utilisez Dataflow pour l'ETL dans BigQuery plutôt que l'interface utilisateur de BigQuery lorsque vous effectuez des jointures massives (entre 500 et 5 000 colonnes représentant plus de 10 To de données), avec les objectifs suivants :

  • Vous souhaitez nettoyer ou transformer vos données à mesure qu'elles sont chargées dans BigQuery, au lieu de les stocker et de les joindre ultérieurement. Cette approche a pour effet de réduire les besoins en stockage car les données ne sont stockées dans BigQuery que dans leur état joint et transformé.
  • Vous souhaitez effectuer un nettoyage personnalisé des données (qui ne peut être réalisé de manière simple avec SQL).
  • Vous souhaitez combiner les données avec des données extérieures à OLTP, telles que des journaux ou des données accessibles à distance, durant le processus de chargement.
  • Vous souhaitez automatiser les tests et le déploiement de la logique de chargement des données à l'aide d'une intégration ou d'un déploiement continus (CI/CD).
  • Vous prévoyez une itération graduelle et une amélioration du processus ETL au fil du temps.
  • Vous souhaitez ajouter des données de manière incrémentielle, plutôt qu'en suivant un processus ETL unique.

Voici un diagramme du pipeline de données créé par l'exemple de programme :

Pipeline de données utilisant BigQuery.

Dans l'exemple de code, de nombreuses étapes du pipeline sont regroupées et/ou encapsulées dans des méthodes pratiques, nommées et réutilisées. Dans le diagramme, les étapes réutilisées sont signalées par des bordures en pointillés.

Examiner le code du pipeline

Le code crée un pipeline qui effectue les étapes suivantes :

  1. Chaque table que vous voulez intégrer à la jointure du bucket Cloud Storage public est chargée dans une collection PCollection de chaînes. Chaque élément comprend la représentation JSON d'une ligne de la table.

    public static PCollection<String> loadText(Pipeline p, String name) {
      BQETLOptions options = (BQETLOptions) p.getOptions();
      String loadingBucket = options.getLoadingBucketURL();
      String objectToLoad = storedObjectName(loadingBucket, name);
      return p.apply(name, TextIO.read().from(objectToLoad));
    }
  2. Les chaînes JSON sont converties en représentations d'objets MusicBrainzDataObject, puis les représentations d'objets sont organisées suivant l'une des valeurs de colonne, par exemple une clé primaire ou étrangère.

    public static PCollection<KV<Long, MusicBrainzDataObject>> loadTableFromText(
        PCollection<String> text, String name, String keyName) {
      final String namespacedKeyname = name + "_" + keyName;
      return text.apply(
          "load " + name,
          MapElements.into(new TypeDescriptor<KV<Long, MusicBrainzDataObject>>() {})
              .via(
                  (String input) -> {
                    MusicBrainzDataObject datum = JSONReader.readObject(name, input);
                    Long key = (Long) datum.getColumnValue(namespacedKeyname);
                    return KV.of(key, datum);
                  }));
    }
  3. La liste fait l'objet d'une jointure sur la base de l'artiste commun. artist_credit_name associe un crédit d'artiste à son enregistrement et inclut la clé étrangère d'artiste. La table artist_credit_name est chargée en tant que liste d'objets KV (Key Value). Le membre K correspond à l'artiste.

    PCollection<MusicBrainzDataObject> artistCredits =
        MusicBrainzTransforms.innerJoin("artists with artist credits", artists, artistCreditName);
  4. La liste fait l'objet d'une jointure à l'aide de la méthode MusicBrainzTransforms.innerJoin().

    public static PCollection<MusicBrainzDataObject> innerJoin(
        String name,
        PCollection<KV<Long, MusicBrainzDataObject>> table1,
        PCollection<KV<Long, MusicBrainzDataObject>> table2) {
      final TupleTag<MusicBrainzDataObject> t1 = new TupleTag<MusicBrainzDataObject>() {};
      final TupleTag<MusicBrainzDataObject> t2 = new TupleTag<MusicBrainzDataObject>() {};
      PCollection<KV<Long, CoGbkResult>> joinedResult = group(name, table1, table2, t1, t2);
    1. Cette méthode regroupe les collections d'objets KV selon le membre clé que vous souhaitez joindre. Cela produit une collection PCollection d'objets KV avec une clé longue (la valeur de colonne artist.id) et l'objet CoGbkResult (combinaison des groupes par résultat clé) résultant. L'objet CoGbkResult est un tuple de listes d'objets avec la valeur de clé commune des première et deuxième collections PCollections. Ce tuple est adressable à l'aide du tag tuple formulé pour chaque collection PCollection avant l'exécution de l'opération de regroupement CoGroupByKey dans la méthode group.
    2. La méthode fusionne chaque correspondance d'objets dans un objet MusicBrainzDataObject qui représente un résultat de jointure.

      PCollection<List<MusicBrainzDataObject>> mergedResult =
          joinedResult.apply(
              "merge join results",
              MapElements.into(new TypeDescriptor<List<MusicBrainzDataObject>>() {})
                  .via(
                      (KV<Long, CoGbkResult> group) -> {
                        List<MusicBrainzDataObject> result = new ArrayList<>();
                        Iterable<MusicBrainzDataObject> leftObjects = group.getValue().getAll(t1);
                        Iterable<MusicBrainzDataObject> rightObjects = group.getValue().getAll(t2);
                        leftObjects.forEach(
                            (MusicBrainzDataObject l) ->
                                rightObjects.forEach(
                                    (MusicBrainzDataObject r) -> result.add(l.duplicate().merge(r))));
                        return result;
                      }));
    3. La méthode réorganise la collection dans une liste d'objets KV pour commencer la jointure suivante. Cette fois, la valeur K correspond à la colonne artist_credit, utilisée pour joindre la table d'enregistrement.

      PCollection<KV<Long, MusicBrainzDataObject>> artistCreditNamesByArtistCredit =
          MusicBrainzTransforms.by("artist_credit_name_artist_credit", artistCredits);
    4. La méthode obtient la collection finale résultante d'objets MusicBrainzDataObject en joignant ce résultat à la collection chargée des enregistrements organisés par artist_credit.id.

      PCollection<MusicBrainzDataObject> artistRecordings =
          MusicBrainzTransforms.innerJoin(
              "joined recordings", artistCreditNamesByArtistCredit, recordingsByArtistCredit);
    5. La méthode mappe les objets MusicBrainzDataObjects obtenus dans TableRows.

      PCollection<TableRow> tableRows =
          MusicBrainzTransforms.transformToTableRows(artistRecordings, bqTableSchema);
    6. Elle écrit les lignes TableRows résultantes dans BigQuery.

      tableRows.apply(
          "Write to BigQuery",
          BigQueryIO.writeTableRows()
              .to(options.getBigQueryTablename())
              .withSchema(bqTableSchema)
              .withCustomGcsTempLocation(StaticValueProvider.of(options.getTempLocation()))
              .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
              .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

Pour plus d'informations sur les mécanismes de programmation de pipeline Beam, consultez les rubriques spécifiques suivantes sur le modèle de programmation :

Après avoir examiné les étapes effectuées par le code, vous pouvez exécuter le pipeline.

Créer un bucket Cloud Storage

Exécuter le code de pipeline

  1. Dans Google Cloud Console, ouvrez Cloud Shell.

    Ouvrir Cloud Shell

  2. Définir les variables d'environnement pour le projet et le script de pipeline

    export PROJECT_ID=PROJECT_ID
    export REGION=us-central1
    export DESTINATION_TABLE=recordings_by_artists_dataflow
    export DATASET=musicbrainz
    

    Remplacez PROJECT_ID par l'ID de votre projet Google Cloud.

  3. Assurez-vous que gcloud utilise bien le projet que vous avez créé ou sélectionné au début de ce tutoriel :

    gcloud config set project $PROJECT_ID
    
  4. Conformément au principe de sécurité du moindre privilège, créez un compte de service pour le pipeline Dataflow et ne lui accordez que les privilèges nécessaires : roles/dataflow.worker, roles/bigquery.jobUser, et le rôle dataEditor sur l'ensemble de données musicbrainz :

    gcloud iam service-accounts create musicbrainz-dataflow
    export SERVICE_ACCOUNT=musicbrainz-dataflow@${PROJECT_ID}.iam.gserviceaccount.com
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member=serviceAccount:${SERVICE_ACCOUNT} \
        --role=roles/dataflow.worker
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member=serviceAccount:${SERVICE_ACCOUNT} \
        --role=roles/bigquery.jobUser
    bq query  --use_legacy_sql=false \
        "GRANT \`roles/bigquery.dataEditor\` ON SCHEMA musicbrainz
         TO 'serviceAccount:${SERVICE_ACCOUNT}'"
    
  5. Créez un bucket pour le pipeline Dataflow afin de l'utiliser pour les fichiers temporaires et accordez sur celui-ci, au compte de service musicbrainz-dataflow, les privilèges Owner :

    export DATAFLOW_TEMP_BUCKET=gs://temp-bucket-${PROJECT_ID}
    gsutil mb -l us ${DATAFLOW_TEMP_BUCKET}
    gsutil acl ch -u ${SERVICE_ACCOUNT}:O ${DATAFLOW_TEMP_BUCKET}
    
  6. Clonez le dépôt contenant le code Dataflow :

    git clone https://github.com/GoogleCloudPlatform/bigquery-etl-dataflow-sample.git
    
  7. Remplacez le répertoire par celui de l'exemple :

    cd bigquery-etl-dataflow-sample
    
  8. Compilez et exécutez la tâche Dataflow :

    ./run.sh simple
    

    L'exécution de la tâche devrait prendre environ 10 minutes.

  9. Pour voir la progression du pipeline, accédez à la page Dataflow dans la console Google Cloud.

    Accéder à Dataflow

    L'état de chaque tâche est indiqué dans la colonne d'état. L'état Réussie indique que la tâche est terminée.

  10. (Facultatif) Pour afficher le graphique de la tâche, ainsi que des informations détaillées sur ses étapes, cliquez sur son nom, par exemple etl-into-bigquery-bqetlsimple.

  11. Une fois la tâche terminée, accédez à la page BigQuery.

    Accéder à BigQuery

  12. Pour exécuter une requête sur la nouvelle table, dans le volet de l'éditeur de requête, saisissez les éléments suivants :

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
    FROM musicbrainz.recordings_by_artists_dataflow
    WHERE artist_area is NOT NULL
          AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    Le volet des résultats affiche un ensemble de résultats semblable à celui-ci :

    Row artist_name artist_gender artist_area recording_name recording_length
    1 mirin 2 107 Sylphia 264000
    2 mirin 2 107 Dependence 208000
    3 Gaudiburschen 1 81 Die Hände zum Himmel 210000
    4 Sa4 1 331 Ein Tag aus meiner Sicht 221000
    5 Dpat 1 7326 Cutthroat 249000
    6 Dpat 1 7326 Deloused 178000

    Le résultat réel peut différer, car les résultats ne sont pas triés.

Nettoyer les données

L'étape suivante consiste à modifier légèrement le pipeline Dataflow afin de pouvoir charger des tables de recherche et de les traiter en tant qu'entrées secondaires, comme illustré dans le diagramme suivant.

Pipeline Dataflow mis à jour pour les entrées secondaires.

Lorsque vous interrogez la table BigQuery résultante, il est difficile d'identifier l'origine géographique de l'artiste sans avoir à rechercher manuellement l'identifiant numérique de la région dans la table area de la base de données MusicBrainz. Ainsi, l'analyse des résultats de la requête n'est pas aussi directe qu'elle pourrait l'être.

De même, le genre des artistes est indiqué en tant qu'ID, or la table de genre MusicBrainz ne comprend que trois lignes. Pour résoudre ce problème, vous pouvez ajouter une étape dans le pipeline Dataflow afin d'utiliser les tables area et gender de MusicBrainz pour mapper les ID aux étiquettes correspondantes.

Les tables artist_area et artist_gender contiennent un nombre de lignes nettement inférieur à celui des tables des artistes ou des données relatives aux enregistrements. Le nombre d'éléments dans ces tables est limité, respectivement, par le nombre de zones géographiques ou de sexes.

En conséquence, l'étape de recherche utilise la fonctionnalité d'entrée secondaire de Dataflow.

Les entrées secondaires sont chargées en tant qu'exportations de fichiers JSON délimitées par des lignes dans le bucket Cloud Storage public contenant l'ensemble de données MusicBrainz. Elles sont utilisées pour dénormaliser les données de table en une seule étape.

Examiner le code ajoutant les entrées secondaires au pipeline

Avant d'exécuter le pipeline, consultez son code pour bien comprendre les nouvelles étapes.

Ce code illustre le nettoyage des données avec des entrées secondaires. La classe MusicBrainzTransforms permet d'utiliser plus facilement les entrées secondaires pour mapper les valeurs de clés étrangères avec des étiquettes. La bibliothèque MusicBrainzTransforms fournit par exemple une méthode permettant de créer une classe de recherche interne. La classe de recherche décrit chaque table de conversion et les champs qui doivent être remplacés par des étiquettes et des arguments de longueur variable. keyKey est le nom de la colonne qui contient la clé pour la recherche et valueKey est le nom de la colonne qui contient l'étiquette correspondante.

public static LookupDescription lookup(
    String objectName, String keyKey, String valueKey, String... destinationKeys) {
  return new LookupDescription(objectName, keyKey, valueKey, destinationKeys);
}

Chaque entrée secondaire est chargée sous la forme d'un objet de mappage unique, utilisé pour rechercher l'étiquette correspondant à un ID.

Le fichier JSON du tableau de conversion est initialement chargé dans MusicBrainzDataObjects avec un espace de noms vide, puis il est transformé en mappage de la valeur de la colonne Key à la valeur de la colonne Value.

public static PCollectionView<Map<Long, String>> loadMapFromText(
    PCollection<String> text, String name, String keyKey, String valueKey) {
  // column/Key names are namespaced in MusicBrainzDataObject
  String keyKeyName = name + "_" + keyKey;
  String valueKeyName = name + "_" + valueKey;

  PCollection<KV<Long, String>> entries =
      text.apply(
          "sideInput_" + name,
          MapElements.into(new TypeDescriptor<KV<Long, String>>() {})
              .via(
                  (String input) -> {
                    MusicBrainzDataObject object = JSONReader.readObject(name, input);
                    Long key = (Long) object.getColumnValue(keyKeyName);

                    String value = (String) object.getColumnValue(valueKeyName);
                    return KV.of(key, value);
                  }));

  return entries.apply(View.asMap());
}

Chacun de ces objets Map est placé dans une Map suivant la valeur de sa clé de destination destinationKey, qui est la clé à remplacer par les valeurs recherchées.

List<SimpleEntry<List<String>, PCollectionView<Map<Long, String>>>> mapSideInputs =
    new ArrayList<>();

for (LookupDescription mapper : mappers) {
  PCollectionView<Map<Long, String>> mapView =
      loadMap(text.getPipeline(), mapper.objectName, mapper.keyKey, mapper.valueKey);
  List<String> destKeyList =
      mapper.destinationKeys.stream()
          .map(destinationKey -> name + "_" + destinationKey)
          .collect(Collectors.toList());

  mapSideInputs.add(new SimpleEntry<>(destKeyList, mapView));
}

Puis, lors de la transformation des objets d'artiste à partir de JSON, la valeur de la clé destinationKey (qui commence par un nombre) est remplacée par son étiquette.

Map<Long, String> sideInputMap = c.sideInput(mapping.getValue());

List<String> keyList = mapping.getKey();

keyList.forEach(
    (String key) -> {
      Long id = (Long) result.getColumnValue(key);
      if (id != null) {
        String label = sideInputMap.get(id);
        if (label == null) {
          label = "" + id;
        }
        result.replace(key, label);

Pour ajouter le décodage des champs artist_area et artist_gender, procédez comme suit :

  1. Dans Cloud Shell, assurez-vous que l'environnement est correctement configuré pour le script de pipeline :

    export PROJECT_ID=PROJECT_ID
    export REGION=us-central1
    export DESTINATION_TABLE=recordings_by_artists_dataflow_sideinputs
    export DATASET=musicbrainz
    export DATAFLOW_TEMP_BUCKET=gs://temp-bucket-${PROJECT_ID}
    export SERVICE_ACCOUNT=musicbrainz-dataflow@${PROJECT_ID}.iam.gserviceaccount.com
    

    Remplacez PROJECT_ID par l'ID de votre projet Google Cloud.

  2. Exécutez le pipeline pour créer la table avec la zone décodée et le genre de l'artiste :

    ./run.sh simple-with-lookups
    
  3. Comme précédemment, pour voir la progression du pipeline, accédez à la page Dataflow.

    Accéder à Dataflow

    L'exécution du pipeline va prendre environ 10 minutes.

  4. Une fois la tâche terminée, accédez à la page BigQuery.

    Accéder à BigQuery

  5. Envoyez la même requête, qui inclut artist_area et artist_gender :

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
      FROM musicbrainz.recordings_by_artists_dataflow_sideinputs
     WHERE artist_area is NOT NULL
       AND artist_gender IS NOT NULL
     LIMIT 1000;
    

    Dans la sortie, les éléments artist_area et artist_gender sont maintenant décodés :

    Row artist_name artist_gender artist_area recording_name recording_length
    1 mirin Female Japan Sylphia 264000
    2 mirin Female Japan Dependence 208000
    3 Gaudiburschen Male Germany Die Hände zum Himmel 210000
    4 Sa4 Male Hamburg Ein Tag aus meiner Sicht 221000
    5 Dpat Male Houston Cutthroat 249000
    6 Dpat Male Houston Deloused 178000

    Le résultat réel peut différer, car les résultats ne sont pas triés.

Optimiser le schéma BigQuery

Dans la dernière partie de ce tutoriel, vous allez exécuter un pipeline générant un schéma de table plus optimal grâce à des champs imbriqués.

Prenez le temps d'examiner le code utilisé pour générer cette version optimisée de la table.

Le diagramme suivant illustre un pipeline Dataflow légèrement différent, qui imbrique les enregistrements des artistes dans chaque ligne d'artiste au lieu de créer des lignes d'artiste en double.

Pipeline Dataflow qui imbrique les enregistrements de l'artiste dans chaque ligne d'artiste.

La représentation actuelle des données est relativement plate. En effet, elle comprend une ligne par enregistrement crédité, qui inclut toutes les métadonnées de l'artiste tirées du schéma BigQuery ainsi que toutes les métadonnées de l'enregistrement et de artist_credit_name. Cette représentation plate présente au moins deux inconvénients :

  • Elle répète les métadonnées artist pour chaque enregistrement porté au crédit d'un artiste, ce qui augmente l'espace de stockage nécessaire.
  • Lorsque vous exportez les données au format JSON, cette représentation exporte un tableau qui répète ces données, plutôt qu'un artiste avec les données d'enregistrement imbriquées (ce que vous souhaitez certainement obtenir).

Sans affecter les performances et sans utiliser de stockage supplémentaire, plutôt que de stocker un enregistrement par ligne, vous pouvez stocker les enregistrements sous la forme d'un champ répété dans chaque enregistrement d'artiste en apportant des modifications relativement simples au pipeline Dataflow.

Au lieu de joindre les enregistrements et leurs informations d'artiste selon artist_credit_name.artist, le pipeline modifié crée une liste imbriquée d'enregistrements dans un objet d'artiste.

public static PCollection<MusicBrainzDataObject> nest(
    PCollection<KV<Long, MusicBrainzDataObject>> parent,
    PCollection<KV<Long, MusicBrainzDataObject>> child,
    String nestingKey) {
  final TupleTag<MusicBrainzDataObject> parentTag = new TupleTag<MusicBrainzDataObject>() {};
  final TupleTag<MusicBrainzDataObject> childTag = new TupleTag<MusicBrainzDataObject>() {};

  PCollection<KV<Long, CoGbkResult>> joinedResult =
      group("nest " + nestingKey, parent, child, parentTag, childTag);
  return joinedResult.apply(
      "merge join results " + nestingKey,
      MapElements.into(new TypeDescriptor<MusicBrainzDataObject>() {})
          .via(
              (KV<Long, CoGbkResult> group) -> {
                MusicBrainzDataObject parentObject = group.getValue().getOnly(parentTag);
                Iterable<MusicBrainzDataObject> children = group.getValue().getAll(childTag);
                List<MusicBrainzDataObject> childList = new ArrayList<>();
                children.forEach(childList::add);
                parentObject = parentObject.duplicate();
                parentObject.addColumnValue("recordings", childList);
                return parentObject;
              }));
}

L'API BigQuery a une limite maximale de taille de ligne de 100 Mo pour les insertions groupées (10 Mo pour les insertions en flux continu). Le code limite donc le nombre d'enregistrements imbriqués pour un enregistrement donné à 1 000 éléments, pour vous assurer de ne pas dépasser cette limite. Si un artiste possède plus de 1 000 enregistrements, le code duplique la ligne, y compris les métadonnées artist, et continue l'imbrication des données d'enregistrement dans la ligne dupliquée.

private static List<TableRow> toTableRows(
    MusicBrainzDataObject mbdo, Map<String, Object> serializableSchema) {
  TableRow row = new TableRow();
  List<TableRow> result = new ArrayList<>();
  Map<String, List<MusicBrainzDataObject>> nestedLists = new HashMap<>();
  Set<String> keySet = serializableSchema.keySet();
  /*
   *  construct a row object without the nested objects
   */
  int maxListSize = 0;
  for (String key : keySet) {
    Object value = serializableSchema.get(key);
    Object fieldValue = mbdo.getColumnValue(key);
    if (fieldValue != null) {
      if (value instanceof Map) {
        @SuppressWarnings("unchecked")
        List<MusicBrainzDataObject> list = (List<MusicBrainzDataObject>) fieldValue;
        if (list.size() > maxListSize) {
          maxListSize = list.size();
        }
        nestedLists.put(key, list);
      } else {
        row.set(key, fieldValue);
      }
    }
  }
  /*
   * add the nested objects but break up the nested objects across duplicate rows if nesting
   * limit exceeded
   */
  TableRow parent = row.clone();
  Set<String> listFields = nestedLists.keySet();
  for (int i = 0; i < maxListSize; i++) {
    parent = (parent == null ? row.clone() : parent);
    final TableRow parentRow = parent;
    nestedLists.forEach(
        (String key, List<MusicBrainzDataObject> nestedList) -> {
          if (nestedList.size() > 0) {
            if (parentRow.get(key) == null) {
              parentRow.set(key, new ArrayList<TableRow>());
            }
            @SuppressWarnings("unchecked")
            List<TableRow> childRows = (List<TableRow>) parentRow.get(key);
            @SuppressWarnings("unchecked")
            Map<String, Object> map = (Map<String, Object>) serializableSchema.get(key);
            childRows.add(toChildRow(nestedList.remove(0), map));
          }
        });
    if ((i > 0) && (i % BIGQUERY_NESTING_LIMIT == 0)) {
      result.add(parent);
      parent = null;
    }
  }
  if (parent != null) {
    result.add(parent);
  }
  return result;
}

Le diagramme montre les sources, les transformations et les récepteurs du pipeline.

Pipeline optimisé avec des sources, des transformations et des récepteurs.

Dans la plupart des cas, les noms des étapes sont fournis dans le code lors de l'appel de la méthode apply.

Pour créer ce pipeline optimisé, procédez comme suit :

  1. Dans Cloud Shell, assurez-vous que l'environnement est correctement configuré pour le script de pipeline :

    export PROJECT_ID=PROJECT_ID
    export REGION=us-central1
    export DESTINATION_TABLE=recordings_by_artists_dataflow_nested
    export DATASET=musicbrainz
    export DATAFLOW_TEMP_BUCKET=gs://temp-bucket-${PROJECT_ID}
    export SERVICE_ACCOUNT=musicbrainz-dataflow@${PROJECT_ID}.iam.gserviceaccount.com
    
  2. Exécutez le pipeline pour imbriquer des lignes d'enregistrement dans des lignes d'artiste :

    ./run.sh nested
    
  3. Comme précédemment, pour voir la progression du pipeline, accédez à la page Dataflow.

    Accéder à Dataflow

    L'exécution du pipeline va prendre environ 10 minutes.

  4. Une fois la tâche terminée, accédez à la page BigQuery.

    Accéder à BigQuery

  5. Interrogez les champs de la table imbriquée dans BigQuery :

    SELECT artist_name, artist_gender, artist_area, artist_recordings
    FROM musicbrainz.recordings_by_artists_dataflow_nested
    WHERE artist_area IS NOT NULL
          AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    Dans le résultat, les champs artist_recordings sont affichés sous forme de lignes imbriquées pouvant être développées :

    Row artist_name artist_gender artist_area artist_recordings
    1 mirin Female Japan (5 rows)
    3 Gaudiburschen Male Germany (1 row)
    4 Sa4 Male Hamburg (10 rows)
    6 Dpat Male Houston (9 rows)

    Le résultat réel peut différer, car les résultats ne sont pas triés.

  6. Exécutez une requête pour extraire des valeurs de la structure STRUCT et utilisez ces valeurs pour filtrer les résultats, par exemple pour les artistes dont les enregistrements contiennent le mot "Justin" :

    SELECT artist_name,
           artist_gender,
           artist_area,
           ARRAY(SELECT artist_credit_name_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS artist_credit_name_name,
           ARRAY(SELECT recording_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS recording_name
     FROM musicbrainz.recordings_by_artists_dataflow_nested,
          UNNEST(recordings_by_artists_dataflow_nested.artist_recordings) AS artist_recordings_struct
    WHERE artist_recordings_struct.recording_name LIKE "%Justin%"
    LIMIT 1000;
    

    Dans le résultat, les champs artist_credit_name_name et recording_name sont affichés sous forme de lignes imbriquées pouvant être développées, par exemple :

    Row artist_name artist_gender artist_area artist_credit_name_name recording_name
    1 Damonkenutz null null (1 row) 1 Yellowpants (Justin Martin remix)
    3 Fabian Male Germany (10+ rows) 1 Heatwave
    . 2 Starlight Love
    . 3 Dreams To Wishes
    . 4 Last Flight (Justin Faust remix)
    . ...
    4 Digital Punk Boys null null (6 rows) 1 Come True
    . 2 We Are... (Punkgirlz remix by Justin Famous)
    . 3 Chaos (short cut)
    . ...

    Le résultat réel peut différer, car les résultats ne sont pas triés.

Effectuer un nettoyage

Pour éviter que les ressources utilisées lors de ce tutoriel soient facturées sur votre compte Google Cloud, supprimez le projet contenant les ressources, ou conservez le projet et supprimez les ressources individuelles.

Supprimer le projet

  1. Dans la console Google Cloud, accédez à la page Gérer les ressources.

    Accéder à la page Gérer les ressources

  2. Dans la liste des projets, sélectionnez le projet que vous souhaitez supprimer, puis cliquez sur Supprimer.
  3. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

Supprimer des ressources individuelles

Suivez les étapes ci-dessous pour supprimer des ressources individuelles au lieu de supprimer tout le projet.

Supprimer le bucket Cloud Storage

  1. Dans la console Google Cloud, accédez à la page Buckets de Cloud Storage.

    Accéder à la page "Buckets"

  2. Cochez la case correspondant au bucket que vous souhaitez supprimer.
  3. Pour supprimer le bucket, cliquez sur Supprimer , puis suivez les instructions.

Supprimer les ensembles de données BigQuery

  1. Ouvrez l'interface utilisateur Web BigQuery.

    Ouvrir BIGQUERY

  2. Sélectionnez les ensembles de données BigQuery que vous avez créés au cours du tutoriel.

  3. Cliquez sur Supprimer.

Étape suivante