Joindre un flux de données à l'aide de Dataflow SQL

Ce tutoriel explique comment joindre un flux de données de Pub/Sub aux données d'une table BigQuery à l'aide de Dataflow SQL.

Objectifs

Dans ce tutoriel, vous allez :

  • écrire une requête Dataflow SQL pour joindre un flux de données Pub/Sub aux données d'une table BigQuery ;
  • déployer une tâche Dataflow à partir de l'interface utilisateur Dataflow SQL.

Coûts

Ce tutoriel utilise des composants facturables de Google Cloud, dont :

  • Dataflow
  • Cloud Storage
  • Pub/Sub

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Avant de commencer

  1. Connectez-vous à votre compte Google.

    Si vous n'en possédez pas déjà un, vous devez en créer un.

  2. Dans Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Cloud.

    Accéder à la page de sélection du projet

  3. Vérifiez que la facturation est activée pour votre projet Google Cloud. Découvrez comment vérifier que la facturation est activée pour votre projet.

  4. Activer les API Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, and Cloud Resource Manager .

    Activer les API

  5. Configurez l'authentification :
    1. Dans Cloud Console, accédez à la page Créer une clé de compte de service.

      Accéder à la page Créer une clé de compte de service
    2. Dans la liste Compte de service, sélectionnez Nouveau compte de service.
    3. Dans le champ Nom du compte de service, saisissez un nom.
    4. Dans la liste Rôle, sélectionnez Projet > Propriétaire.

      Remarque : Le champ Rôle autorise votre compte de service à accéder aux ressources. Vous pourrez afficher et modifier ce champ ultérieurement à l'aide de Cloud Console. Si vous développez une application de production, spécifiez des autorisations plus précises que Projet > Propriétaire. Pour plus d'informations, consultez la page Attribuer des rôles aux comptes de service.
    5. Cliquez sur Créer. Un fichier JSON contenant votre clé est téléchargé sur votre ordinateur.
  6. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS pour qu'elle pointe vers le chemin du fichier JSON contenant la clé de votre compte de service. Cette variable ne s'applique qu'à la session de shell actuelle. Par conséquent, si vous ouvrez une nouvelle session, vous devez de nouveau la définir.

  7. Installez et initialisez le SDK Cloud. Choisissez l'une des options d'installation. Il vous faudra peut-être définir la propriété project sur le projet utilisé pour ce tutoriel.
  8. Accédez à l'UI Web de BigQuery dans Cloud Console. Le projet le plus récemment utilisé s'ouvre. Pour changer de projet, cliquez sur le nom du projet dans la partie supérieure de l'interface utilisateur Web de BigQuery, et cherchez le projet que vous souhaitez utiliser.
    Accéder à l'UI Web de BigQuery

Passer à l'interface utilisateur Dataflow SQL

Dans l'interface utilisateur Web de BigQuery, effectuez les étapes suivantes afin de passer à l'interface utilisateur Dataflow.

  1. Cliquez sur More (Plus) et choisissez Query Settings (Paramètres de requête) dans le menu déroulant.

  2. Dans le menu Query Settings (Paramètres de la requête) qui s'ouvre à droite, sélectionnez Dataflow engine (Moteur Dataflow).

  3. Si les API Dataflow et Data Catalog ne sont pas activées pour votre projet, il vous sera demandé de les activer. Cliquez sur Enable APIs (Activer les API). L'activation des API Dataflow et Data Catalog peut prendre quelques minutes.

  4. Lorsque l'activation des API est terminée, cliquez sur Save (Enregistrer).

Créer des sources pour l'exemple présenté dans le tutoriel

Si vous souhaitez suivre l'exemple présenté dans ce tutoriel, créez les sources suivantes afin de les utiliser au cours des différentes étapes du tutoriel.

  • Un sujet Pub/Sub appelé transactions - Flux de données de transaction provenant d'un abonnement à un sujet Pub/Sub. Les données de chaque transaction comprennent des informations telles que le produit acheté, le prix d'achat, le lieu d'achat (ville et État). Après avoir créé le sujet Pub/Sub, créez un script qui publie des messages sur le sujet. Ce script sera exécuté au cours d'une étape ultérieure du tutoriel.
  • Une table BigQuery appelée us_state_salesregions - Table fournissant un mappage des États avec les régions de vente. Avant de créer cette table, il vous faudra créer un ensemble de données BigQuery.

Trouver des sources Pub/Sub

L’interface utilisateur Dataflow SQL permet de rechercher des objets sources de données Pub/Sub pour chaque projet auquel vous avez accès. Il n'est donc pas nécessaire de mémoriser leurs noms complets.

Pour l'exemple de ce tutoriel, ajoutez le sujet Pub/Sub transactions que vous avez créé :

  1. Dans le panneau de navigation, à gauche, cliquez sur le menu déroulant Add data (Ajouter des données) et sélectionnez Cloud Dataflow sources (Sources Cloud Dataflow).

  2. Dans le panneau Add Cloud Dataflow source (Ajouter une source Cloud Dataflow) qui s'ouvre sur la droite, sélectionnez Pub/Sub topics (Sujets Pub/Sub). Dans le champ de recherche, recherchez transactions. Sélectionnez le sujet et cliquez sur Add (Ajouter).

Attribuer un schéma au sujet Pub/Sub

L'attribution d'un schéma vous permet d'exécuter des requêtes SQL sur les données de votre sujet Pub/Sub. Dataflow SQL attend des messages sérialisés au format JSON dans les sujets Pub/Sub. La compatibilité avec d'autres formats, tel que Avro, sera ajoutée ultérieurement.

Après avoir ajouté l'exemple de sujet Pub/Sub en tant que source Dataflow, procédez comme suit pour attribuer un schéma au sujet dans l'interface utilisateur Dataflow SQL :

  1. Sélectionnez le sujet dans le panneau Resources (Ressources).

  2. Dans l'onglet Schema (Schéma), cliquez sur Edit schema (Modifier le schéma). Le panneau latéral Schema (Schéma) s'ouvre sur la droite.

    Panneau latéral permettant d'ajouter ou de modifier un schéma

  3. Cliquez sur le bouton Edit as text (Modifier sous forme de texte), puis collez le schéma suivant dans l'éditeur. Ensuite, cliquez sur Submit (Envoyer).

    [
      {
          "description": "Pub/Sub event timestamp",
          "name": "event_timestamp",
          "mode": "REQUIRED",
          "type": "TIMESTAMP"
      },
      {
          "description": "Transaction time string",
          "name": "tr_time_str",
          "type": "STRING"
      },
      {
          "description": "First name",
          "name": "first_name",
          "type": "STRING"
      },
      {
          "description": "Last name",
          "name": "last_name",
          "type": "STRING"
      },
      {
          "description": "City",
          "name": "city",
          "type": "STRING"
      },
      {
          "description": "State",
          "name": "state",
          "type": "STRING"
      },
      {
          "description": "Product",
          "name": "product",
          "type": "STRING"
      },
      {
          "description": "Amount of transaction",
          "name": "amount",
          "type": "FLOAT64"
      }
    ]
    
  4. (Facultatif) Cliquez sur Preview topic (Aperçu du sujet) pour examiner le contenu de vos messages et vérifier qu'ils correspondent au schéma que vous avez défini.

    Le bouton

Afficher le schéma

  1. Dans le panneau de navigation de l'interface utilisateur Dataflow SQL, à gauche, cliquez sur Cloud Dataflow sources (Sources Cloud Dataflow).
  2. Cliquez sur Pub/Sub topics (Sujets Pub/Sub).
  3. Cliquez sur transactions.
  4. Sous Schema (Schéma), vous pouvez observer le schéma attribué au sujet Pub/Sub transactions.

Créer une requête SQL

L'interface utilisateur Dataflow SQL permet de créer des requêtes SQL pour exécuter les tâches Dataflow.

La requête SQL suivante est une requête d'enrichissement des données. Elle ajoute un champ supplémentaire (sales_region) au flux d’événements Pub/Sub (transactions) à l'aide d'une table BigQuery (us_state_salesregions) qui met en correspondance les États et les zones de vente.

Copiez la requête suivante et collez-la dans l'éditeur de requête. Remplacez project-id par votre ID de projet.

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Lorsque vous saisissez une requête dans l'interface utilisateur Dataflow SQL, l'outil de validation des requêtes vérifie la syntaxe de la requête. Une icône en forme de coche verte s'affiche lorsque la requête est valide. Si la requête n'est pas valide, une icône en forme de point d'exclamation rouge s'affiche. Dans le cas où la syntaxe de votre requête n'est pas valide, cliquez sur l'icône de l'outil de validation pour obtenir des informations sur les problèmes à résoudre.

La capture d'écran suivante montre l'affichage d'une requête valide dans l'éditeur de requête. L'icône de validation est une coche verte.

Saisissez votre requête dans l'éditeur.

Créer une tâche Dataflow pour exécuter une requête SQL

Pour exécuter votre requête SQL, créez une tâche Dataflow depuis l'interface utilisateur Dataflow SQL.

  1. En dessous de l'éditeur de requête, cliquez sur Create Dataflow job (Créer une tâche Dataflow).

  2. Dans le panneau Create Dataflow job (Créer une tâche Dataflow) qui s'affiche à droite, remplacez la valeur par défaut Table name (Nom de la table) par dfsqltable_sales.

  3. (Facultatif) Dataflow choisit automatiquement les paramètres optimaux pour votre tâche Dataflow SQL, mais vous pouvez développer le menu Paramètres facultatifs pour spécifier manuellement les options de pipeline suivantes :

    • Nombre maximal de nœuds de calcul
    • Zone
    • Adresse e-mail du compte de service
    • Type de machine
    • Tests supplémentaires
    • Configuration des adresses IP des nœuds de calcul
    • Réseau
    • Sous-réseau
  4. Cliquez sur Créer. Le démarrage de la tâche Dataflow prend quelques minutes.

  5. Le panneau des résultats de la requête apparaît dans l'interface utilisateur. Pour retourner ultérieurement au panneau des résultats de la requête d'une tâche, cherchez la tâche dans le panneau Historique des tâches et utilisez le bouton Open query in editor (Ouvrir la requête dans l'éditeur) comme décrit dans la section Afficher les tâches Dataflow et les résultats.

  6. Sous Job information (Informations sur la tâche), cliquez sur le lien Job ID (ID de la tâche). Un nouvel onglet de navigation s'ouvre dans l'interface utilisateur Web de Dataflow. Il affiche la page contenant les détails de la tâche Dataflow.

Afficher les tâches Dataflow et les résultats

Dataflow transforme la requête SQL en pipeline Apache Beam. Dans le nouvel onglet présentant l'interface utilisateur Web de Dataflow, vous pouvez voir une représentation graphique de votre pipeline.

Cliquez sur les cases pour visualiser les détails sur les transformations du pipeline. Par exemple, si vous cliquez sur la case du haut de la représentation graphique, nommée Run SQL Query (Exécuter la requête SQL), un graphique s'affiche montrant les opérations effectuées en arrière-plan.

Les deux cases du haut représentent les deux entrées jointes : le sujet Pub/Sub (transactions) et la table BigQuery (us_state_salesregions).

Pour visualiser la table de sortie qui contient les résultats de la tâche, revenez à l'onglet affichant l'interface utilisateur Dataflow SQL. Dans le panneau de navigation de gauche, sous "Project" (Projet), cliquez sur le nom de l'ensemble de données dataflow_sql_dataset que vous avez créé. Ensuite, cliquez sur le nom de la table de sortie (dfsqltable_sales). L'onglet Preview (Prévisualiser) affiche le contenu de la table de sortie.

Afficher les anciennes tâches et modifier les requêtes

L'interface utilisateur Dataflow stocke les anciennes tâches et les requêtes dans le panneau Job history (Historique des tâches). Les tâches sont présentées selon la date du démarrage de la tâche. La liste des tâches affiche d'abord les jours pour lesquels il y a des tâches en cours d'exécution. La liste affiche ensuite les jours pour lesquels il n'y a aucune tâche en cours d'exécution.

Dans l'historique des tâches, vous pouvez modifier vos anciennes requêtes SQL et exécuter de nouvelles tâches Dataflow. Vous pouvez, par exemple, modifier la requête pour agréger les ventes par région, toutes les 15 secondes. À l'aide du panneau Job history (Historique des tâches), accédez à la tâche en cours d'exécution démarrée en début de tutoriel, modifiez la requête SQL et exécutez une autre tâche avec la requête ainsi modifiée.

  1. Dans le panneau de navigation de gauche, cliquez sur Job history (Historique des tâches).

  2. Sous Job history (Historique des tâches), cliquez sur Cloud Dataflow. Toutes les anciennes tâches du projet s'affichent.

  3. Cliquez sur la tâche à modifier. Cliquez sur Open in query editor (Ouvrir dans l'éditeur de requête).

  4. Modifiez la requête SQL dans Query editor (Éditeur de requête) pour ajouter le type de fenêtrage tumbling windows. Remplacez project-id par votre ID de projet si vous copiez la requête suivante :

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  5. Sous Query editor (Éditeur de requête), cliquez sur Create Cloud Dataflow job (Créer une tâche Cloud Dataflow) pour créer une tâche avec la requête modifiée.

Nettoyer

Pour éviter que les ressources utilisées dans ce tutoriel soient facturées sur votre compte Google Cloud, procédez comme suit :

  1. Arrêtez le script de publication transactions_injector.py s'il est encore en cours d'exécution.

  2. Arrêtez vos tâches Dataflow. Accédez à l'interface utilisateur Web de Dataflow dans Cloud Console.

    Accéder à l'interface utilisateur Web de Dataflow

    Pour chaque tâche créée lors de ce tutoriel, procédez comme suit :

    1. Cliquez sur le nom de la tâche.

    2. Dans le panneau Job summary (Résumé de la tâche) de la tâche, cliquez sur Stop job (Arrêter la tâche). La boîte de dialogue Stop Job (Arrêter la tâche) affiche les options d'arrêt de la tâche.

    3. Cliquez sur Cancel (Annuler).

    4. Cliquez sur Stop job (Arrêter la tâche). Cela permet de stopper toutes les ingestions et tous les traitements de données dans les meilleurs délais possibles. La commande Cancel (Annuler) interrompt immédiatement le traitement de données. Cela peut donc entraîner la perte des données en cours de transfert. L'arrêt d'une tâche peut prendre plusieurs minutes.

  3. Supprimez l'ensemble de données BigQuery. Accédez à l'UI Web de BigQuery dans Cloud Console.

    Accéder à l'UI Web de BigQuery

    1. Dans le panneau de navigation, dans la section Resources (Ressources), cliquez sur l'ensemble de données dataflow_sql_dataset que vous avez créé.

    2. Dans le panneau de détails, sur le côté droit, cliquez sur Delete dataset (Supprimer l'ensemble de données). Cette action supprime l'ensemble de données, la table et toutes les données.

    3. Dans la boîte de dialogue Supprimer l'ensemble de données, confirmez la commande de suppression en saisissant le nom de votre ensemble de données (dataflow_sql_dataset), puis cliquez sur Supprimer.

  4. Supprimez le sujet Pub/Sub. Accédez à la page "Sujets Pub/Sub" dans Cloud Console.

    Accéder à la page des sujets Pub/Sub

    1. Cochez la case à côté du sujet transactions.

    2. Cliquez sur Supprimer pour supprimer définitivement le sujet.

    3. Accédez à la page Abonnements Pub/Sub.

    4. Cochez la case à côté des abonnements restants au sujet transactions. Si les tâches ne sont plus en cours d'exécution, il n'y a peut-être aucun abonnement présent.

    5. Cliquez sur Delete (Supprimer) pour supprimer définitivement l'abonnement.

  5. Supprimez le bucket de préproduction Dataflow dans Cloud Storage. Accédez au navigateur Cloud Storage dans Cloud Console.

    Accéder au navigateur Cloud Storage

    1. Cochez la case à côté du bucket de préproduction Dataflow.

    2. Cliquez sur Delete (Supprimer) pour supprimer définitivement le bucket.

Étape suivante