Démarrage rapide avec des modèles

Cette page explique comment créer un pipeline de traitement par flux à l'aide d'un modèle Dataflow fourni par Google. Plus précisément, nous utiliserons ici le modèle Sujet Pub/Sub vers BigQuery à titre d'exemple.

Le modèle Sujet Pub/Sub vers BigQuery est un pipeline de streaming qui lit les messages au format JSON d'un sujet Pub/Sub et les écrit dans une table BigQuery.

Avant de commencer

  1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
  2. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

    Accéder au sélecteur de projet

  3. Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier que la facturation est activée pour votre projet.

  4. Activer les API Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub et Cloud Resource Manager .

    Activer les API

  5. Créez un bucket Cloud Storage :
    1. Dans Cloud Console, accédez à la page du navigateur Cloud Storage.

      Accéder à la page du navigateur

    2. Cliquez sur Créer un bucket.
    3. Sur la page Créer un bucket, saisissez les informations concernant votre bucket. Pour passer à l'étape suivante, cliquez sur Continuer.
      • Pour nommer votre bucket, saisissez un nom unique. N'incluez aucune information sensible dans le nom des buckets, car leur espace de noms est global et visible par tous.
      • Pour Choisir l'emplacement de stockage des données, procédez comme suit :
        • Sélectionnez une option de type d'emplacement.
        • Sélectionnez une option Emplacement.
      • Pour Choisir une classe de stockage par défaut pour vos données, sélectionnez les éléments suivants : Standard.
      • Pour le champ Choisir comment contrôler l'accès aux objets, sélectionnez une option de Contrôle des accès.
      • Sous Paramètres avancés (facultatif), choisissez une méthode de chiffrement, une règle de conservation ou des libellés de bucket.
    4. Cliquez sur Create (Créer).

Créer un ensemble de données et une table BigQuery

Créez un ensemble de données et une table BigQuery selon le schéma approprié à votre sujet Pub/Sub à l'aide de Cloud Console.

Dans cet exemple, le nom de l'ensemble de données est taxirides et le nom de la table est realtime. Pour créer cet ensemble de données et cette table, procédez comme suit :

  1. Dans Cloud Console, accédez à la page BigQuery.
    Accéder à BigQuery
  2. Sur le panneau Explorateur, sélectionnez le projet pour lequel vous souhaitez créer l'ensemble de données.
  3. Sur le panneau des informations détaillées, cliquez sur Créer un ensemble de données.
  4. Sur la page Créer un ensemble de données, procédez comme suit :
    1. Dans le champ Dataset ID (ID de l'ensemble de données), saisissez taxirides.
    2. Dans le champ Data location (Emplacement des données), sélectionnez United States (US) (États-Unis). Les ensembles de données publics sont actuellement stockés dans l'emplacement multirégional US. Par souci de simplicité, utilisez le même emplacement pour votre ensemble de données.
  5. Ne modifiez aucun autre paramètre par défaut et cliquez sur Créer un ensemble de données.
  6. Sur le panneau Explorateur, développez votre projet.
  7. À côté de votre ensemble de données, cliquez sur l'icône d'affichage des actions , puis sur Ouvrir.
  8. Sur le panneau d'informations détaillées, cliquez sur Créer une table.
  9. Sur la page Créer une table :
    1. Dans la section Source, sous Créer une table à partir de, sélectionnez Table vide.
    2. Pour le champ Nom de la table de la section Destination, saisissez realtime.
    3. Dans la section Schéma, cliquez sur le bouton Modifier en tant que texte et collez la définition de schéma suivante dans la zone de texte :
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. Dans la section Paramètres de partitionnement et de clustering, sous Partitionnement, sélectionnez le champ horodatage.
  10. Ne modifiez aucun autre paramètre par défaut et cliquez sur Créer une table.

Exécuter le pipeline

Exécutez un pipeline de traitement par flux à l'aide du modèle Sujet Pub/Sub vers BigQuery fourni par Google.

  1. Dans Cloud Console, accédez à la page Tâches Dataflow.
    Accéder à la page Tâches Dataflow
  2. Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).
  3. Saisissez un nom de tâche pour votre tâche Dataflow.
  4. Sous Modèle Dataflow, sélectionnez le modèle Sujet Pub/Sub vers BigQuery.
  5. Dans le champ Sujet Pub/Sub d'entrée, saisissez projects/pubsub-public-data/topics/taxirides-realtime. Le pipeline reçoit des données entrantes à partir du sujet d'entrée.

    Il s'agit d'un sujet public basé sur l'ensemble de données ouvert de la NYC Taxi & Limousine Commission, agrémenté d'informations de routage supplémentaires à l'aide de l'API Google Maps Directions et d'horodatages interpolés pour simuler un scénario en temps réel. Voici un exemple de message de cet article au format JSON :

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  6. Dans le champ BigQuery output table (Table de sortie BigQuery), saisissez PROJECT_ID:taxirides.realtime. Remplacez PROJECT_ID par l'ID du projet dans lequel vous avez créé l'ensemble de données BigQuery.
  7. Dans le champ Emplacement temporaire, saisissez gs://BUCKET_NAME/temp/. Remplacez BUCKET_NAME par le nom de votre bucket Cloud Storage. temp est un dossier de ce bucket destiné au stockage des fichiers temporaires, comme la tâche de pipeline par étapes.
  8. Cliquez sur Run Job (Exécuter la tâche).
  9. Affichez les données écrites dans BigQuery. Accédez à la page BigQuery.
    Accédez à BigQuery

    Vous pouvez envoyer des requêtes à l'aide du langage SQL standard. Par exemple, la requête suivante sélectionne toutes les lignes ajoutées au cours des dernières 24 heures :

    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    Remplacez PROJECT_ID par l'ID du projet dans lequel vous avez créé l'ensemble de données BigQuery.

Nettoyer

Pour éviter que les ressources utilisées dans ce guide démarrage rapide soient facturées sur votre compte Google Cloud :

  1. Dans Cloud Console, accédez à la page Tâches Dataflow.
    Accéder à Tâches Dataflow
    1. Sélectionnez votre tâche de traitement par flux dans la liste des tâches.
    2. Dans la barre de navigation, cliquez sur Arrêter.
    3. Dans la boîte de dialogue Arrêter la tâche, choisissez l'option Annuler ou Drainer pour votre pipeline, puis cliquez sur Arrêter la tâche.
  2. Dans Cloud Console, accédez à la page BigQuery.
    Accédez à BigQuery
    1. Sur le panneau Explorateur, développez votre projet.
    2. À côté de votre ensemble de données, cliquez sur l'icône "Affichage des actions", puis sur Ouvrir.
    3. Dans le panneau de détails, cliquez sur Supprimer un ensemble de données.
    4. Dans la boîte de dialogue Supprimer l'ensemble de données, confirmez la commande de suppression en saisissant le nom de votre ensemble de données (taxirides), puis en cliquant sur Supprimer.

Étape suivante