Créer un pipeline de traitement par flux à l'aide d'un modèle Dataflow

Ce guide de démarrage rapide explique comment créer un pipeline de traitement par flux à l'aide d'un modèle Dataflow fourni par Google. Plus précisément, nous utiliserons ici le modèle Pub/Sub vers BigQuery à titre d'exemple.

Le modèle Pub/Sub vers BigQuery est un pipeline de flux de données capable de lire des messages au format JSON à partir d'un sujet Pub/Sub et de les écrire dans une table BigQuery.


Pour obtenir des instructions détaillées sur cette tâche directement dans la console Google Cloud, cliquez sur Visite guidée :

Visite guidée


Avant de commencer

  1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
  2. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

    Accéder au sélecteur de projet

  3. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  4. Activer les API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager.

    Activer les API

  5. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

    Accéder au sélecteur de projet

  6. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  7. Activer les API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager.

    Activer les API

  8. Créez un bucket Cloud Storage :
    1. Dans la console Google Cloud, accédez à la page Buckets Cloud Storage.

      Accéder à la page "Buckets"

    2. Cliquez sur Créer un bucket.
    3. Sur la page Créer un bucket, saisissez les informations concernant votre bucket. Pour passer à l'étape suivante, cliquez sur Continuer.
      • Pour nommer votre bucket, saisissez un nom unique. N'incluez aucune information sensible dans le nom des buckets, car leur espace de noms est global et visible par tous.
      • Pour Choisir l'emplacement de stockage des données, procédez comme suit :
        • Sélectionnez une option de type d'emplacement.
        • Sélectionnez une option Location (Emplacement).
      • Pour Choisir une classe de stockage par défaut pour vos données, sélectionnez l'option suivante : Standard
      • Pour le champ Choisir comment contrôler l'accès aux objets, sélectionnez une option de Contrôle des accès.
      • Sous Paramètres avancés (facultatif), choisissez une méthode de chiffrement, une règle de conservation ou des libellés de bucket.
    4. Cliquez sur Create (Créer).
  9. Copiez les éléments suivants, car vous en aurez besoin dans une section ultérieure :
    • Le nom de votre bucket Cloud Storage.
    • L'ID de votre projet Google Cloud.

      Pour trouver cet ID, consultez la section Identifier des projets.
  10. Pour suivre la procédure de ce guide de démarrage rapide, votre compte utilisateur doit disposer du rôle Administrateur Dataflow et du rôle Utilisateur du compte de service. Le compte de service Compute Engine par défaut doit disposer du rôle Nœud de calcul Dataflow. Pour ajouter les rôles requis dans la console Google Cloud, procédez comme suit :

    1. Accédez à la page IAM.
      Accéder à IAM
    2. Sélectionnez votre projet.
    3. Sur la ligne contenant votre compte utilisateur, cliquez sur Modifier le compte principal, puis sur Ajouter un autre rôle.
    4. Dans la liste déroulante, sélectionnez le rôle Administrateur Dataflow.
    5. Répétez l'opération pour le rôle Utilisateur du compte de service, puis cliquez sur Enregistrer.
    6. Sur la ligne contenant le compte de service Compute Engine par défaut, cliquez sur Modifier le compte principal, puis sur Ajouter un autre rôle.
    7. Dans la liste déroulante, sélectionnez le rôle Nœud de calcul Dataflow.
    8. Répétez l'opération pour les rôles Éditeur Pub/Sub et Éditeur de données BigQuery, puis cliquez sur Enregistrer.

      Pour en savoir plus sur l'attribution de rôles, consultez la page Attribuer un rôle IAM à l'aide de la console.

  11. Par défaut, chaque nouveau projet démarre avec un réseau par défaut. Si le réseau par défaut de votre projet est désactivé ou a été supprimé, vous devez disposer d'un réseau dans votre projet pour lequel votre compte utilisateur dispose du Rôle d'utilisateur de réseau de Compute (roles/compute.networkUser ).

Créer un ensemble de données et une table BigQuery

Créez un ensemble de données et une table BigQuery selon le schéma approprié à votre sujet Pub/Sub à l'aide de la console Google Cloud.

Dans cet exemple, le nom de l'ensemble de données est taxirides et le nom de la table est realtime. Pour créer cet ensemble de données et cette table, procédez comme suit :

  1. Accédez à la page BigQuery.
    Accéder à BigQuery
  2. Dans le panneau Explorateur, à côté du projet dans lequel vous souhaitez créer l'ensemble de données, cliquez sur Afficher les actions, puis cliquez sur Créer un ensemble de données.
  3. Dans le panneau Créer un ensemble de données, procédez comme suit :
    1. Dans le champ ID de l'ensemble de données, saisissez taxirides. Les ID des ensembles de données sont uniques pour chaque projet Google Cloud.
    2. Pour Type d'emplacement, choisissez Multirégional, puis sélectionnez US (plusieurs régions aux États-Unis). Les ensembles de données publics sont stockés dans l'emplacement multirégional US. Par souci de simplicité, utilisez le même emplacement pour votre ensemble de données.
    3. Conservez les autres paramètres par défaut, puis cliquez sur Créer un ensemble de données.
  4. Dans le panneau Explorateur, développez votre projet.
  5. À côté de votre ensemble de données taxirides, cliquez sur Afficher les actions, puis sur Créer une table.
  6. Dans le panneau Créer une table, procédez comme suit :
    1. Dans la section Source, sous Créer une table à partir de, sélectionnez Table vide.
    2. Pour le champ Table de la section Destination, saisissez realtime.
    3. Dans la section Schéma, cliquez sur le bouton Modifier sous forme de texte et collez la définition de schéma suivante dans la zone :
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. Dans la section Paramètres de partitionnement et de clustering, sous Partitionnement, sélectionnez le champ Code temporel.
  7. Ne modifiez aucun autre paramètre par défaut et cliquez sur Créer une table.

Exécuter le pipeline

Exécutez un pipeline de flux de données à l'aide du modèle Pub/Sub vers BigQuery fourni par Google. Le pipeline reçoit des données entrantes à partir du sujet d'entrée.

  1. Accédez à la page Tâches Dataflow.
    Accéder aux tâches
  2. Cliquez sur Créer une tâche à partir d'un modèle.
  3. Saisissez taxi-data comme nom de la tâche pour votre tâche Dataflow.
  4. Pour Modèle Dataflow, sélectionnez le modèle Pub/Sub vers BigQuery.
  5. Dans le champ Table BigQuery de sortie, saisissez ce qui suit :
    PROJECT_ID:taxirides.realtime

    Remplacez PROJECT_ID par l'ID du projet dans lequel vous avez créé l'ensemble de données BigQuery.

  6. Développez la section Paramètres facultatifs.
  7. Pour le champ Sujet Pub/Sub d'entrée, cliquez sur Saisir le sujet manuellement.
  8. Dans la boîte de dialogue, saisissez les éléments suivants comme Nom du sujet, puis cliquez sur Enregistrer :
    projects/pubsub-public-data/topics/taxirides-realtime

    Ce sujet Pub/Sub disponible publiquement est basé sur l'ensemble de données ouvert de la NYC Taxi & Limousine Commission. Voici un exemple de message de cet article au format JSON :

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  9. Dans le champ Emplacement temporaire, saisissez ce qui suit :
    gs://BUCKET_NAME/temp/

    Remplacez BUCKET_NAME par le nom de votre bucket Cloud Storage. Le dossier temp stocke les fichiers temporaires, comme la tâche de pipeline en préproduction.

  10. Si votre projet ne possède pas de réseau par défaut, saisissez un réseau et un sous-réseau. Pour plus d'informations, consultez la section Spécifier un réseau et un sous-réseau.
  11. Cliquez sur Exécuter la tâche.

Afficher les résultats

Pour afficher les données écrites dans la table realtime, procédez comme suit :

  1. Accédez à la page BigQuery.

    Accéder à BigQuery

  2. Cliquez sur Saisir une nouvelle requête. Un onglet Éditeur s'affiche.

    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    Remplacez PROJECT_ID par l'ID du projet dans lequel vous avez créé l'ensemble de données BigQuery. L'affichage des données dans votre table peut prendre jusqu'à une minute.

  3. Cliquez sur Exécuter.

    La requête renvoie les lignes qui ont été ajoutées à votre table au cours des dernières 24 heures. Vous pouvez également exécuter des requêtes en SQL standard.

Effectuer un nettoyage

Pour éviter que les ressources utilisées sur cette page soient facturées sur votre compte Google Cloud, procédez comme suit :

Supprimer le projet

Le moyen le plus simple d'éviter la facturation consiste à supprimer le projet Google Cloud que vous avez créé dans le cadre de ce guide de démarrage rapide.

  1. Dans la console Google Cloud, accédez à la page Gérer les ressources.

    Accéder à la page Gérer les ressources

  2. Dans la liste des projets, sélectionnez le projet que vous souhaitez supprimer, puis cliquez sur Supprimer.
  3. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

Supprimer les ressources individuelles

Si vous souhaitez conserver le projet Google Cloud que vous avez utilisé dans ce guide de démarrage rapide, supprimez les ressources individuelles :

  1. Accédez à la page Tâches Dataflow.
    Accéder aux tâches
  2. Sélectionnez votre tâche de traitement par flux dans la liste des tâches.
  3. Dans la barre de navigation, cliquez sur Arrêter.
  4. Dans la boîte de dialogue Arrêter la tâche, annulez ou drainez votre pipeline, puis cliquez sur Arrêter la tâche.
  5. Accédez à la page BigQuery.
    Accéder à BigQuery
  6. Sur le panneau Explorateur, développez votre projet.
  7. À côté de l'ensemble de données que vous souhaitez supprimer, cliquez sur Afficher les actions, puis sur Ouvrir.
  8. Dans le panneau des détails, cliquez sur Supprimer l'ensemble de données, puis suivez les instructions.
  9. Dans la console Google Cloud, accédez à la page Buckets de Cloud Storage.

    Accéder à la page "Buckets"

  10. Cochez la case correspondant au bucket que vous souhaitez supprimer.
  11. Pour supprimer le bucket, cliquez sur Supprimer , puis suivez les instructions.

Étape suivante