Joindre un flux de données à l'aide de Dataflow SQL


Ce tutoriel explique comment joindre un flux de données de Pub/Sub aux données d'une table BigQuery à l'aide de Dataflow SQL.

Objectifs

Dans ce tutoriel, vous allez :

  • écrire une requête Dataflow SQL pour joindre un flux de données Pub/Sub aux données d'une table BigQuery ;
  • déployer une tâche Dataflow à partir de l'interface utilisateur Dataflow SQL.

Coûts

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

  • Dataflow
  • Cloud Storage
  • Pub/Sub
  • Data Catalog

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Avant de commencer

  1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
  2. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

    Accéder au sélecteur de projet

  3. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  4. Activer les API Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager et Data Catalog. .

    Activer les API

  5. Créez un compte de service :

    1. Dans la console Google Cloud, accédez à la page Créer un compte de service.

      Accéder à la page "Créer un compte de service"
    2. Sélectionnez votre projet.
    3. Dans le champ Nom du compte de service, saisissez un nom. La console Google Cloud remplit le champ ID du compte de service en fonction de ce nom.

      Dans le champ Description du compte de service, saisissez une description. Exemple : Service account for quickstart.

    4. Cliquez sur Créer et continuer.
    5. Attribuez le rôle Project > Owner au compte de service.

      Pour accorder le rôle, trouvez la liste Sélectionner un rôle, puis sélectionnez Project > Owner.

    6. Cliquez sur Continuer.
    7. Cliquez sur OK pour terminer la création du compte de service.

      Ne fermez pas la fenêtre de votre navigateur. Vous en aurez besoin lors de la tâche suivante.

  6. Créez une clé de compte de service :

    1. Dans la console Google Cloud, cliquez sur l'adresse e-mail du compte de service que vous avez créé.
    2. Cliquez sur Keys (Clés).
    3. Cliquez sur Ajouter une clé, puis sur Créer une clé.
    4. Cliquez sur Create (Créer). Un fichier de clé JSON est téléchargé sur votre ordinateur.
    5. Cliquez sur Close (Fermer).
  7. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS sur le chemin d'accès du fichier JSON contenant vos identifiants. Cette variable ne s'applique qu'à la session de shell actuelle. Par conséquent, si vous ouvrez une nouvelle session, vous devez de nouveau la définir.

  8. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

    Accéder au sélecteur de projet

  9. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  10. Activer les API Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager et Data Catalog. .

    Activer les API

  11. Créez un compte de service :

    1. Dans la console Google Cloud, accédez à la page Créer un compte de service.

      Accéder à la page "Créer un compte de service"
    2. Sélectionnez votre projet.
    3. Dans le champ Nom du compte de service, saisissez un nom. La console Google Cloud remplit le champ ID du compte de service en fonction de ce nom.

      Dans le champ Description du compte de service, saisissez une description. Exemple : Service account for quickstart.

    4. Cliquez sur Créer et continuer.
    5. Attribuez le rôle Project > Owner au compte de service.

      Pour accorder le rôle, trouvez la liste Sélectionner un rôle, puis sélectionnez Project > Owner.

    6. Cliquez sur Continuer.
    7. Cliquez sur OK pour terminer la création du compte de service.

      Ne fermez pas la fenêtre de votre navigateur. Vous en aurez besoin lors de la tâche suivante.

  12. Créez une clé de compte de service :

    1. Dans la console Google Cloud, cliquez sur l'adresse e-mail du compte de service que vous avez créé.
    2. Cliquez sur Keys (Clés).
    3. Cliquez sur Ajouter une clé, puis sur Créer une clé.
    4. Cliquez sur Create (Créer). Un fichier de clé JSON est téléchargé sur votre ordinateur.
    5. Cliquez sur Close (Fermer).
  13. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS sur le chemin d'accès du fichier JSON contenant vos identifiants. Cette variable ne s'applique qu'à la session de shell actuelle. Par conséquent, si vous ouvrez une nouvelle session, vous devez de nouveau la définir.

  14. Installer et initialiser gcloud CLI Choisissez l'une des options d'installation. Il vous faudra peut-être définir la propriété project sur le projet utilisé pour ce tutoriel.
  15. Accédez à l'interface utilisateur Web de Dataflow SQL dans la console Google Cloud. Le projet le plus récemment utilisé s'ouvre. Pour changer de projet, cliquez sur le nom du projet dans la partie supérieure de l'interface utilisateur Web de Dataflow SQL, et cherchez le projet que vous souhaitez utiliser.
    Accéder à l'UI Web de Dataflow SQL

Créer des sources pour l'exemple présenté dans le tutoriel

Si vous souhaitez suivre l'exemple présenté dans ce tutoriel, créez les sources suivantes afin de les utiliser au cours des différentes étapes du tutoriel.

  • Un sujet Pub/Sub appelé transactions - Flux de données de transaction provenant d'un abonnement à un sujet Pub/Sub. Les données de chaque transaction comprennent des informations telles que le produit acheté, le prix d'achat, le lieu d'achat (ville et État). Après avoir créé le sujet Pub/Sub, créez un script qui publie des messages sur le sujet. Ce script sera exécuté au cours d'une étape ultérieure du tutoriel.
  • Une table BigQuery appelée us_state_salesregions - Table fournissant un mappage des États avec les régions de vente. Avant de créer cette table, il vous faudra créer un ensemble de données BigQuery.

Attribuer un schéma au sujet Pub/Sub

L'attribution d'un schéma vous permet d'exécuter des requêtes SQL sur les données de votre sujet Pub/Sub. Dataflow SQL attend des messages sérialisés au format JSON dans les sujets Pub/Sub.

Pour attribuer un schéma aux transactions de l'exemple de sujet Pub/Sub :

  1. Créez un fichier texte et nommez-le transactions_schema.yaml. Copiez le schéma suivant et collez-le dans transactions_schema.yaml.

      - column: event_timestamp
        description: Pub/Sub event timestamp
        mode: REQUIRED
        type: TIMESTAMP
      - column: tr_time_str
        description: Transaction time string
        mode: NULLABLE
        type: STRING
      - column: first_name
        description: First name
        mode: NULLABLE
        type: STRING
      - column: last_name
        description: Last name
        mode: NULLABLE
        type: STRING
      - column: city
        description: City
        mode: NULLABLE
        type: STRING
      - column: state
        description: State
        mode: NULLABLE
        type: STRING
      - column: product
        description: Product
        mode: NULLABLE
        type: STRING
      - column: amount
        description: Amount of transaction
        mode: NULLABLE
        type: FLOAT
    
  2. Attribuez le schéma à l'aide de Google Cloud CLI.

    a. Mettez à jour gcloud CLI à l'aide de la commande suivante. Assurez-vous que gcloud CLI est de version 242.0.0 ou ultérieure.

      gcloud components update
    

    b. Exécutez la commande suivante dans une fenêtre de ligne de commande. Remplacez project-id par votre ID de projet et path-to-file par le chemin d'accès à votre fichier transactions_schema.yaml.

      gcloud data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml
    

    Pour plus d'informations sur les paramètres de la commande et sur les formats de fichiers de schéma autorisés, consultez la page de documentation gcloud data-catalog entries update.

    c. (Facultatif) Vérifiez que le schéma est bien attribué au sujet Pub/Sub transactions. Remplacez project-id par l'ID de votre projet :

      gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
    

Trouver des sources Pub/Sub

L’interface utilisateur Dataflow SQL permet de rechercher des objets sources de données Pub/Sub pour chaque projet auquel vous avez accès. Il n'est donc pas nécessaire de mémoriser leurs noms complets.

Pour l'exemple de ce tutoriel, accédez à l'éditeur SQL Dataflow et recherchez le sujet Pub/Sub transactions que vous avez créé :

  1. Accédez à SQL Workspace.

  2. Dans le panneau Éditeur SQL de Dataflow, recherchez projectid=project-id transactions dans la barre de recherche. Remplacez project-id par l'ID de votre projet.

    Panneau de recherche Data Catalog dans l'espace de travail Dataflow SQL.

Afficher le schéma

  1. Dans le panneau Éditeur Dataflow SQL de l'interface utilisateur Dataflow SQL, cliquez sur Transactions ou recherchez un sujet Pub/Sub en saisissant projectid=project-id system=cloud_pubsub, puis sélectionnez le sujet.
  2. Sous Schéma, vous pouvez observer le schéma attribué au sujet Pub/Sub.

    Schéma attribué au sujet, ce qui inclut la liste des noms de champs et leurs descriptions.

Créer une requête SQL

L'interface utilisateur Dataflow SQL permet de créer des requêtes SQL pour exécuter les tâches Dataflow.

La requête SQL suivante est une requête d'enrichissement des données. Elle ajoute un champ supplémentaire (sales_region) au flux d’événements Pub/Sub (transactions) à l'aide d'une table BigQuery (us_state_salesregions) qui met en correspondance les États et les zones de vente.

Copiez la requête SQL suivante et collez-la dans l'éditeur de requête. Remplacez project-id par l'ID de votre projet.

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Lorsque vous saisissez une requête dans l'interface utilisateur Dataflow SQL, l'outil de validation des requêtes vérifie la syntaxe de la requête. Une icône en forme de coche verte s'affiche lorsque la requête est valide. Si la requête n'est pas valide, une icône en forme de point d'exclamation rouge s'affiche. Dans le cas où la syntaxe de votre requête n'est pas valide, cliquez sur l'icône de l'outil de validation pour obtenir des informations sur les problèmes à résoudre.

La capture d'écran suivante montre l'affichage d'une requête valide dans l'éditeur de requête. L'icône de validation est une coche verte.

Espace de travail Dataflow SQL avec la requête du tutoriel visible dans l'éditeur

Créer une tâche Dataflow pour exécuter une requête SQL

Pour exécuter votre requête SQL, créez une tâche Dataflow depuis l'interface utilisateur Dataflow SQL.

  1. Au-dessus de l'Éditeur de requête, cliquez sur Créer une tâche.

  2. Dans le panneau Créer une tâche Dataflow qui s'affiche :

    • Pour Destination, sélectionnez BigQuery.
    • Dans le champ ID de l'ensemble de données, sélectionnez dataflow_sql_tutorial.
    • Pour le Nom de la table, saisissez sales.
    Créer un formulaire de tâche Dataflow SQL
  3. Facultatif : Dataflow choisit automatiquement les paramètres optimaux pour votre tâche Dataflow SQL, mais vous pouvez développer le menu Paramètres facultatifs pour spécifier manuellement les options de pipeline suivantes :

    • Nombre maximal de nœuds de calcul
    • Zone
    • Adresse e-mail du compte de service
    • Type de machine
    • Tests supplémentaires
    • Configuration des adresses IP des nœuds de calcul
    • Réseau
    • Sous-réseau
  4. Cliquez sur Create (Créer). Le démarrage de la tâche Dataflow prend quelques minutes.

Afficher la tâche Dataflow

Dataflow transforme la requête SQL en pipeline Apache Beam. Cliquez sur Afficher la tâche pour ouvrir l'interface utilisateur Web de Dataflow, où vous pouvez voir une représentation graphique de votre pipeline.

Pipeline de la requête SQL affichée dans l'interface utilisateur Web de Dataflow.

Pour afficher la répartition des transformations se produisant dans le pipeline, cliquez sur les cases. Par exemple, si vous cliquez sur la première case de la représentation graphique, nommée Exécuter la requête SQL, un graphique s'affiche montrant les opérations effectuées en arrière-plan.

Les deux premières cases représentent les deux entrées jointes : le sujet Pub/Sub (transactions) et la table BigQuery (us_state_salesregions).

L'écriture du résultat d'une jointure de deux entrées s'effectue en 25 secondes.

Pour afficher la table de sortie qui contient les résultats de la tâche, accédez à l'interface utilisateur de BigQuery. Dans le panneau Explorateur de votre projet, cliquez sur l'ensemble de données dataflow_sql_tutorial que vous avez créé. Ensuite, cliquez sur la table de sortie sales. L'onglet Preview (Prévisualiser) affiche le contenu de la table de sortie.

Le tableau d'aperçu dfsqltable_sales contient des colonnes pour tr_time_str, first_name, last_name, city, state, product, amount, and sales_region.

Afficher les anciennes tâches et modifier les requêtes

L'interface utilisateur Dataflow stocke les tâches et les requêtes passées dans la page Tâches de Dataflow.

Vous pouvez utiliser la liste de l'historique des tâches pour afficher les requêtes SQL précédentes. Vous pouvez, par exemple, modifier la requête pour agréger les ventes par région toutes les 15 secondes. Accédez à la page Tâches pour accéder à la tâche en cours d'exécution que vous avez démarrée en début de tutoriel, copier la requête SQL et exécuter une autre tâche avec une requête modifiée.

  1. Sur la page Tâches de Dataflow, cliquez sur la tâche que vous souhaitez modifier.

  2. Sur la page Informations sur la tâche, dans le panneau Informations sur la tâche, sous Options du pipeline, localisez la requête SQL. Recherchez la ligne correspondant à queryString.

    L'option de pipeline de tâche nommée queryString
  3. Copiez la requête SQL suivante et collez-la dans l'éditeur Dataflow SQL de l'espace de travail SQL afin d'ajouter les fenêtres de bascule. Remplacez project-id par l'ID de votre projet.

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  4. Cliquez sur Créer une tâche pour créer une tâche avec la requête modifiée.

Nettoyer

Pour éviter que les ressources utilisées dans ce tutoriel soient facturées sur votre compte de facturation Cloud, procédez comme suit :

  1. Arrêtez le script de publication transactions_injector.py s'il est encore en cours d'exécution.

  2. Arrêtez vos tâches Dataflow. Accédez à l'interface utilisateur Web de Dataflow dans la console Google Cloud.

    Accéder à l'interface utilisateur Web de Dataflow

    Pour chaque tâche créée lors de ce tutoriel, procédez comme suit :

    1. Cliquez sur le nom de la tâche.

    2. Sur la page Informations sur la tâche, cliquez sur Arrêter. La boîte de dialogue Stop Job (Arrêter la tâche) affiche les options d'arrêt de la tâche.

    3. Sélectionnez Annuler.

    4. Cliquez sur Stop job (Arrêter la tâche). Cela permet de stopper toutes les ingestions et tous les traitements de données dans les meilleurs délais possibles. La commande Cancel (Annuler) interrompt immédiatement le traitement de données. Cela peut donc entraîner la perte des données en cours de transfert. L'arrêt d'une tâche peut prendre plusieurs minutes.

  3. Supprimez l'ensemble de données BigQuery. Accédez à l'UI Web de BigQuery dans la console Google Cloud.

    Accéder à l'UI Web de BigQuery

    1. Dans la section Ressources du panneau Explorateur, cliquez sur l'ensemble de données dataflow_sql_tutorial que vous avez créé.

    2. Dans le panneau des détails, cliquez sur Supprimer. Une boîte de dialogue de confirmation s'affiche.

    3. Dans la boîte de dialogue Supprimer l'ensemble de données, confirmez la commande de suppression en saisissant delete, puis cliquez sur Supprimer.

  4. Supprimez le sujet Pub/Sub. Accédez à la page des sujets Pub/Sub dans la console Google Cloud.

    Accéder à la page des sujets Pub/Sub

    1. Sélectionnez le sujet transactions.

    2. Cliquez sur Delete (Supprimer) pour supprimer définitivement le sujet. Une boîte de dialogue de confirmation s'affiche.

    3. Dans la boîte de dialogue Supprimer le sujet, confirmez la commande de suppression en saisissant delete, puis cliquez sur Supprimer.

    4. Accédez à la page Abonnements Pub/Sub.

    5. Sélectionnez les abonnements restants à transactions. Si les tâches ne sont plus en cours d'exécution, il n'y a peut-être aucun abonnement présent.

    6. Cliquez sur Delete (Supprimer) pour supprimer définitivement l'abonnement. Dans la boîte de dialogue de confirmation, cliquez sur Supprimer.

  5. Supprimez le bucket de préproduction Dataflow dans Cloud Storage. Accédez à la page Buckets de Cloud Storage dans la console Google Cloud.

    Accéder à la page "Buckets"

    1. Sélectionnez le bucket de préproduction Dataflow.

    2. Cliquez sur Delete (Supprimer) pour supprimer définitivement le bucket. Une boîte de dialogue de confirmation s'affiche.

    3. Dans la boîte de dialogue Supprimer le bucket, confirmez la commande de suppression en saisissant DELETE, puis cliquez sur Supprimer.

Étape suivante