Joindre un flux de données à l'aide de Dataflow SQL


Ce tutoriel explique comment joindre un flux de données de Pub/Sub aux données d'une table BigQuery à l'aide de Dataflow SQL.

Objectifs

Dans ce tutoriel, vous allez :

  • écrire une requête Dataflow SQL pour joindre un flux de données Pub/Sub aux données d'une table BigQuery ;
  • déployer une tâche Dataflow à partir de l'interface utilisateur Dataflow SQL.

Coûts

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

  • Dataflow
  • Cloud Storage
  • Pub/Sub
  • Data Catalog

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Avant de commencer

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager and Data Catalog. APIs.

    Enable the APIs

  5. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the Project > Owner role to the service account.

      To grant the role, find the Select a role list, then select Project > Owner.

    6. Click Continue.
    7. Click Done to finish creating the service account.

      Do not close your browser window. You will use it in the next step.

  6. Create a service account key:

    1. In the Google Cloud console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, and then click Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
  7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  9. Make sure that billing is enabled for your Google Cloud project.

  10. Enable the Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager and Data Catalog. APIs.

    Enable the APIs

  11. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the Project > Owner role to the service account.

      To grant the role, find the Select a role list, then select Project > Owner.

    6. Click Continue.
    7. Click Done to finish creating the service account.

      Do not close your browser window. You will use it in the next step.

  12. Create a service account key:

    1. In the Google Cloud console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, and then click Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
  13. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  14. Installer et initialiser gcloud CLI Choisissez l'une des options d'installation. Il vous faudra peut-être définir la propriété project sur le projet utilisé pour ce tutoriel.
  15. Accédez à l'interface utilisateur Web de Dataflow SQL dans la console Google Cloud. Le projet le plus récemment utilisé s'ouvre. Pour changer de projet, cliquez sur le nom du projet dans la partie supérieure de l'interface utilisateur Web de Dataflow SQL, et cherchez le projet que vous souhaitez utiliser.
    Accéder à l'UI Web de Dataflow SQL

Créer des sources pour l'exemple présenté dans le tutoriel

Si vous souhaitez suivre l'exemple présenté dans ce tutoriel, créez les sources suivantes afin de les utiliser au cours des différentes étapes du tutoriel.

  • Un sujet Pub/Sub appelé transactions - Flux de données de transaction provenant d'un abonnement à un sujet Pub/Sub. Les données de chaque transaction comprennent des informations telles que le produit acheté, le prix d'achat, le lieu d'achat (ville et État). Après avoir créé le sujet Pub/Sub, créez un script qui publie des messages sur le sujet. Ce script sera exécuté au cours d'une étape ultérieure du tutoriel.
  • Une table BigQuery appelée us_state_salesregions - Table fournissant un mappage des États avec les régions de vente. Avant de créer cette table, il vous faudra créer un ensemble de données BigQuery.

Attribuer un schéma au sujet Pub/Sub

L'attribution d'un schéma vous permet d'exécuter des requêtes SQL sur les données de votre sujet Pub/Sub. Dataflow SQL attend des messages sérialisés au format JSON dans les sujets Pub/Sub.

Pour attribuer un schéma aux transactions de l'exemple de sujet Pub/Sub :

  1. Créez un fichier texte et nommez-le transactions_schema.yaml. Copiez le schéma suivant et collez-le dans transactions_schema.yaml.

      - column: event_timestamp
        description: Pub/Sub event timestamp
        mode: REQUIRED
        type: TIMESTAMP
      - column: tr_time_str
        description: Transaction time string
        mode: NULLABLE
        type: STRING
      - column: first_name
        description: First name
        mode: NULLABLE
        type: STRING
      - column: last_name
        description: Last name
        mode: NULLABLE
        type: STRING
      - column: city
        description: City
        mode: NULLABLE
        type: STRING
      - column: state
        description: State
        mode: NULLABLE
        type: STRING
      - column: product
        description: Product
        mode: NULLABLE
        type: STRING
      - column: amount
        description: Amount of transaction
        mode: NULLABLE
        type: FLOAT
    
  2. Attribuez le schéma à l'aide de Google Cloud CLI.

    a. Mettez à jour gcloud CLI à l'aide de la commande suivante. Assurez-vous que gcloud CLI est de version 242.0.0 ou ultérieure.

      gcloud components update

    b. Exécutez la commande suivante dans une fenêtre de ligne de commande. Remplacez project-id par votre ID de projet et path-to-file par le chemin d'accès à votre fichier transactions_schema.yaml.

      gcloud data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml

    Pour plus d'informations sur les paramètres de la commande et sur les formats de fichiers de schéma autorisés, consultez la page de documentation gcloud data-catalog entries update.

    c. (Facultatif) Vérifiez que le schéma est bien attribué au sujet Pub/Sub transactions. Remplacez project-id par l'ID de votre projet :

      gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'

Trouver des sources Pub/Sub

L’interface utilisateur Dataflow SQL permet de rechercher des objets sources de données Pub/Sub pour chaque projet auquel vous avez accès. Il n'est donc pas nécessaire de mémoriser leurs noms complets.

Pour l'exemple de ce tutoriel, accédez à l'éditeur SQL Dataflow et recherchez le sujet Pub/Sub transactions que vous avez créé :

  1. Accédez à SQL Workspace.

  2. Dans le panneau Éditeur SQL de Dataflow, recherchez projectid=project-id transactions dans la barre de recherche. Remplacez project-id par l'ID de votre projet.

    Panneau de recherche Data Catalog dans l'espace de travail Dataflow SQL.

Afficher le schéma

  1. Dans le panneau Éditeur Dataflow SQL de l'interface utilisateur Dataflow SQL, cliquez sur Transactions ou recherchez un sujet Pub/Sub en saisissant projectid=project-id system=cloud_pubsub, puis sélectionnez le sujet.
  2. Sous Schéma, vous pouvez observer le schéma attribué au sujet Pub/Sub.

    Schéma attribué au sujet, ce qui inclut la liste des noms de champs et leurs descriptions.

Créer une requête SQL

L'interface utilisateur Dataflow SQL permet de créer des requêtes SQL pour exécuter les tâches Dataflow.

La requête SQL suivante est une requête d'enrichissement des données. Elle ajoute un champ supplémentaire (sales_region) au flux d’événements Pub/Sub (transactions) à l'aide d'une table BigQuery (us_state_salesregions) qui met en correspondance les États et les zones de vente.

Copiez la requête SQL suivante et collez-la dans l'éditeur de requête. Remplacez project-id par l'ID de votre projet.

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Lorsque vous saisissez une requête dans l'interface utilisateur Dataflow SQL, l'outil de validation des requêtes vérifie la syntaxe de la requête. Une icône en forme de coche verte s'affiche lorsque la requête est valide. Si la requête n'est pas valide, une icône en forme de point d'exclamation rouge s'affiche. Dans le cas où la syntaxe de votre requête n'est pas valide, cliquez sur l'icône de l'outil de validation pour obtenir des informations sur les problèmes à résoudre.

La capture d'écran suivante montre l'affichage d'une requête valide dans l'éditeur de requête. L'icône de validation est une coche verte.

Espace de travail Dataflow SQL avec la requête du tutoriel visible dans l'éditeur

Créer une tâche Dataflow pour exécuter une requête SQL

Pour exécuter votre requête SQL, créez une tâche Dataflow depuis l'interface utilisateur Dataflow SQL.

  1. Au-dessus de l'Éditeur de requête, cliquez sur Créer une tâche.

  2. Dans le panneau Créer une tâche Dataflow qui s'affiche :

    • Pour Destination, sélectionnez BigQuery.
    • Dans le champ ID de l'ensemble de données, sélectionnez dataflow_sql_tutorial.
    • Pour le Nom de la table, saisissez sales.
    Créer un formulaire de tâche Dataflow SQL
  3. Facultatif : Dataflow choisit automatiquement les paramètres optimaux pour votre tâche Dataflow SQL, mais vous pouvez développer le menu Paramètres facultatifs pour spécifier manuellement les options de pipeline suivantes :

    • Nombre maximal de nœuds de calcul
    • Zone
    • Adresse e-mail du compte de service
    • Type de machine
    • Tests supplémentaires
    • Configuration des adresses IP des nœuds de calcul
    • Réseau
    • Sous-réseau
  4. Cliquez sur Créer. Le démarrage de la tâche Dataflow prend quelques minutes.

Afficher la tâche Dataflow

Dataflow transforme la requête SQL en pipeline Apache Beam. Cliquez sur Afficher la tâche pour ouvrir l'interface utilisateur Web de Dataflow, où vous pouvez voir une représentation graphique de votre pipeline.

Pipeline de la requête SQL affichée dans l'interface utilisateur Web de Dataflow.

Pour afficher la répartition des transformations se produisant dans le pipeline, cliquez sur les cases. Par exemple, si vous cliquez sur la première case de la représentation graphique, nommée Exécuter la requête SQL, un graphique s'affiche montrant les opérations effectuées en arrière-plan.

Les deux premières cases représentent les deux entrées jointes : le sujet Pub/Sub (transactions) et la table BigQuery (us_state_salesregions).

L'écriture du résultat d'une jointure de deux entrées s'effectue en 25 secondes.

Pour afficher la table de sortie qui contient les résultats de la tâche, accédez à l'interface utilisateur de BigQuery. Dans le panneau Explorateur de votre projet, cliquez sur l'ensemble de données dataflow_sql_tutorial que vous avez créé. Ensuite, cliquez sur la table de sortie sales. L'onglet Preview (Prévisualiser) affiche le contenu de la table de sortie.

Le tableau d'aperçu dfsqltable_sales contient des colonnes pour tr_time_str, first_name, last_name, city, state, product, amount, and sales_region.

Afficher les anciennes tâches et modifier les requêtes

L'interface utilisateur Dataflow stocke les tâches et les requêtes passées dans la page Tâches de Dataflow.

Vous pouvez utiliser la liste de l'historique des tâches pour afficher les requêtes SQL précédentes. Vous pouvez, par exemple, modifier la requête pour agréger les ventes par région toutes les 15 secondes. Accédez à la page Tâches pour accéder à la tâche en cours d'exécution que vous avez démarrée en début de tutoriel, copier la requête SQL et exécuter une autre tâche avec une requête modifiée.

  1. Sur la page Tâches de Dataflow, cliquez sur la tâche que vous souhaitez modifier.

  2. Sur la page Informations sur la tâche, dans le panneau Informations sur la tâche, sous Options du pipeline, localisez la requête SQL. Recherchez la ligne correspondant à queryString.

    L'option de pipeline de tâche nommée queryString
  3. Copiez la requête SQL suivante et collez-la dans l'éditeur Dataflow SQL de l'espace de travail SQL afin d'ajouter les fenêtres de bascule. Remplacez project-id par l'ID de votre projet.

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
  4. Cliquez sur Créer un job pour créer un job avec la requête modifiée.

Effectuer un nettoyage

Pour éviter que les ressources utilisées dans ce tutoriel soient facturées sur votre compte de facturation Cloud, procédez comme suit :

  1. Arrêtez le script de publication transactions_injector.py s'il est encore en cours d'exécution.

  2. Arrêtez vos tâches Dataflow. Accédez à l'interface utilisateur Web de Dataflow dans la console Google Cloud.

    Accéder à l'interface utilisateur Web de Dataflow

    Pour chaque tâche créée lors de ce tutoriel, procédez comme suit :

    1. Cliquez sur le nom de la tâche.

    2. Sur la page Informations sur la tâche, cliquez sur Arrêter. La boîte de dialogue Stop Job (Arrêter la tâche) affiche les options d'arrêt de la tâche.

    3. Sélectionnez Annuler.

    4. Cliquez sur Stop job (Arrêter la tâche). Cela permet de stopper toutes les ingestions et tous les traitements de données dans les meilleurs délais possibles. La commande Cancel (Annuler) interrompt immédiatement le traitement de données. Cela peut donc entraîner la perte des données en cours de transfert. L'arrêt d'une tâche peut prendre plusieurs minutes.

  3. Supprimez l'ensemble de données BigQuery. Accédez à l'UI Web de BigQuery dans la console Google Cloud.

    Accéder à l'UI Web de BigQuery

    1. Dans la section Ressources du panneau Explorateur, cliquez sur l'ensemble de données dataflow_sql_tutorial que vous avez créé.

    2. Dans le panneau des détails, cliquez sur Supprimer. Une boîte de dialogue de confirmation s'affiche.

    3. Dans la boîte de dialogue Supprimer l'ensemble de données, confirmez la commande de suppression en saisissant delete, puis cliquez sur Supprimer.

  4. Supprimez le sujet Pub/Sub. Accédez à la page des sujets Pub/Sub dans la console Google Cloud.

    Accéder à la page des sujets Pub/Sub

    1. Sélectionnez le sujet transactions.

    2. Cliquez sur Delete (Supprimer) pour supprimer définitivement le sujet. Une boîte de dialogue de confirmation s'affiche.

    3. Dans la boîte de dialogue Supprimer le sujet, confirmez la commande de suppression en saisissant delete, puis cliquez sur Supprimer.

    4. Accédez à la page Abonnements Pub/Sub.

    5. Sélectionnez les abonnements restants à transactions. Si les tâches ne sont plus en cours d'exécution, il n'y a peut-être aucun abonnement présent.

    6. Cliquez sur Delete (Supprimer) pour supprimer définitivement l'abonnement. Dans la boîte de dialogue de confirmation, cliquez sur Supprimer.

  5. Supprimez le bucket de préproduction Dataflow dans Cloud Storage. Accédez à la page Buckets de Cloud Storage dans la console Google Cloud.

    Accéder à la page "Buckets"

    1. Sélectionnez le bucket de préproduction Dataflow.

    2. Cliquez sur Delete (Supprimer) pour supprimer définitivement le bucket. Une boîte de dialogue de confirmation s'affiche.

    3. Dans la boîte de dialogue Supprimer le bucket, confirmez la commande de suppression en saisissant DELETE, puis cliquez sur Supprimer.

Étape suivante