Créer un pipeline de streaming d'e-commerce


Dans ce tutoriel, vous allez créer un pipeline de flux de données Dataflow qui transforme les données d'e-commerce provenant de sujets et d'abonnements Pub/Sub, et génère les données de sortie dans BigQuery et Bigtable. Ce tutoriel nécessite Gradle.

Le tutoriel fournit un exemple d'application d'e-commerce de bout en bout qui diffuse les données de la boutique en ligne vers BigQuery et Bigtable. L'exemple d'application illustre des cas d'utilisation courants et des bonnes pratiques pour mettre en œuvre l'analyse de flux de données en continu et l'intelligence artificielle (IA) en temps réel. Utilisez ce tutoriel pour apprendre à répondre de manière dynamique aux actions des clients afin d'analyser les événements et d'y réagir en temps réel. Ce tutoriel explique comment stocker, analyser et visualiser les données des événements afin d'obtenir des insights sur le comportement des clients.

L'exemple d'application est disponible sur GitHub. Pour exécuter ce tutoriel à l'aide de Terraform, suivez les étapes fournies avec l'exemple d'application sur GitHub.

Objectifs

  • Valider les données entrantes et apporter des corrections, si possible.
  • Analyser les données de flux de clics pour obtenir le nombre de vues par produit sur une période donnée. Stockez ces informations dans un magasin à faible latence. L'application peut ensuite utiliser ces données pour fournir des messages sur le nombre de personnes ayant consulté ce produit aux clients sur le site Web.
  • Utiliser les données de transaction pour optimiser l'ordre de l'inventaire :

    • Analyser les données de transaction pour calculer le nombre total de ventes pour chaque article, à la fois par magasin et à l'échelle mondiale, sur une période donnée.
    • Analyser les données d'inventaire pour calculer l'inventaire entrant pour chaque élément.
    • Transmettre en continu ces données aux systèmes d'inventaire afin de pouvoir les utiliser pour la prise de décisions d'achats.
  • Valider les données entrantes et apporter des corrections, si possible. Écrire toutes les données non rectifiables dans une file d'attente de lettres mortes à des fins d'analyse et de traitement supplémentaires. Créer une métrique représentant le pourcentage de données entrantes envoyées à la file d'attente de lettres mortes et mettre cette métrique à disposition des outils de surveillance et de génération d'alertes.

  • Traiter toutes les données entrantes dans un format standard et les stocker dans un entrepôt de données afin de les utiliser ultérieurement à des fins d'analyse et de visualisation.

  • Dénormaliser les données de transaction pour les ventes en magasin afin qu'elles puissent inclure des informations telles que la latitude et la longitude de l'emplacement du magasin. Diffuser les informations sur le magasin via une table à évolution lente dans BigQuery, en utilisant l'ID du magasin en tant que clé.

Données

L'application traite les types de données suivants :

  • Données de flux de clics envoyées par des systèmes en ligne à Pub/Sub
  • Données de transaction envoyées par des systèmes sur site ou SaaS (Software as a Service) à Pub/Sub
  • Données sur les stocks envoyées par des systèmes sur site ou SaaS à Pub/Sub

Modèles de tâche

L'application contient les modèles de tâches suivants, communs aux pipelines créés avec le SDK Apache Beam pour Java:

Coûts

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

  • BigQuery
  • Bigtable
  • Cloud Scheduler
  • Compute Engine
  • Dataflow
  • Pub/Sub

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Une fois que vous avez terminé les tâches décrites dans ce document, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez la section Effectuer un nettoyage.

Avant de commencer

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Créez un compte de service de nœud de calcul géré par l'utilisateur pour votre nouveau pipeline et attribuez à ce compte les rôles nécessaires.

    1. Pour créer le compte de service, exécutez la commande gcloud iam service-accounts create :

      gcloud iam service-accounts create retailpipeline \
          --description="Retail app data pipeline worker service account" \
          --display-name="Retail app data pipeline access"
    2. Attribuez des rôles au compte de service. Exécutez la commande ci-dessous une fois pour chacun des rôles IAM suivants :

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/pubsub.editor
      • roles/bigquery.dataEditor
      • roles/bigtable.admin
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Remplacez SERVICE_ACCOUNT_ROLE par chaque rôle individuel.

    3. Attribuez à votre compte Google un rôle qui vous permet de créer des jetons d'accès pour le compte de service :

      gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
  17. Si nécessaire, téléchargez et installez Gradle.

Créer les exemples de sources et de récepteurs

Cette section explique comment créer les éléments suivants :

  • Un bucket Cloud Storage à utiliser comme emplacement de stockage temporaire
  • Des sources de données en flux à l'aide de Pub/Sub
  • Des ensembles de données pour charger les données dans BigQuery
  • Instance Bigtable

Créer un bucket Cloud Storage

Commencez par créer un bucket Cloud Storage. Ce bucket sert d'emplacement de stockage temporaire pour le pipeline Dataflow.

Exécutez la commande gcloud storage buckets create :

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Remplacez les éléments suivants :

Créer les sujets et les abonnements Pub/Sub

Créez quatre sujets Pub/Sub, puis créez trois abonnements.

Pour créer vos sujets, exécutez la commande gcloud pubsub topics create une fois pour chaque sujet. Pour savoir comment nommer un abonnement, consultez la section Consignes de dénomination d'un sujet ou d'un abonnement.

gcloud pubsub topics create TOPIC_NAME

Remplacez TOPIC_NAME par les valeurs suivantes, en exécutant la commande quatre fois, soit une fois pour chaque sujet :

  • Clickstream-inbound
  • Transactions-inbound
  • Inventory-inbound
  • Inventory-outbound

Pour créer un abonnement associé à votre sujet, exécutez la commande gcloud pubsub subscriptions create une fois pour chaque abonnement :

  1. Créez l'abonnement Clickstream-inbound-sub :

    gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
    
  2. Créez l'abonnement Transactions-inbound-sub :

    gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
    
  3. Créez l'abonnement Inventory-inbound-sub :

    gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
    

Créer les ensembles de données et la table BigQuery

Créez un ensemble de données BigQuery et une table partitionnée avec le schéma approprié pour votre sujet Pub/Sub.

  1. Utilisez la commande bq mk pour créer le premier ensemble de données.

    bq --location=US mk \
    PROJECT_ID:Retail_Store
    
  2. Créez le deuxième ensemble de données.

    bq --location=US mk \
    PROJECT_ID:Retail_Store_Aggregations
    
  3. Utilisez l'instruction SQL CREATE TABLE pour créer une table avec un schéma et des données de test. Les données de test contiennent un magasin dont l'identifiant est 1. Le modèle d'entrée secondaire à mise à jour lente utilise cette table.

    bq query --use_legacy_sql=false \
      'CREATE TABLE
        Retail_Store.Store_Locations
        (
          id INT64,
          city STRING,
          state STRING,
          zip INT64
        );
      INSERT INTO Retail_Store.Store_Locations
      VALUES (1, "a_city", "a_state",00000);'
    

Créer l'instance et la table Bigtable

Créez l'instance et la table Bigtable. Pour en savoir plus sur la création d'instances Bigtable, consultez la section Créer une instance.

  1. Si nécessaire, exécutez la commande suivante pour installer la CLI cbt :

    gcloud components install cbt
    
  2. Utilisez la commande bigtable instances create pour créer une instance :

    gcloud bigtable instances create aggregate-tables \
        --display-name=aggregate-tables \
        --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
    

    Remplacez CLUSTER_ZONE par la zone dans laquelle s'exécute le cluster.

  3. Utilisez la commande cbt createtable pour créer une table permanente :

    cbt -instance=aggregate-tables createtable PageView5MinAggregates
    
  4. Utilisez la commande suivante pour ajouter une famille de colonnes à la table :

    cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
    

Exécuter le pipeline

Utilisez Gradle pour exécuter le pipeline de traitement par flux. Pour afficher le code Java utilisé par le pipeline, consultez la section RetailDataProcessingPipeline.java.

  1. Exécutez la commande git clone pour cloner le dépôt GitHub :

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Accédez au répertoire de l'application :

    cd dataflow-sample-applications/retail/retail-java-applications
    
  3. Pour tester le pipeline, dans votre shell ou votre terminal, exécutez la commande suivante avec Gradle :

    ./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
    
  4. Pour exécuter le pipeline, exécutez la commande suivante avec Gradle :

    ./gradlew tasks executeOnDataflow -Dexec.args=" \
    --project=PROJECT_ID \
    --tempLocation=gs://BUCKET_NAME/temp/ \
    --runner=DataflowRunner \
    --region=REGION \
    --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \
    --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \
    --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \
    --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \
    --dataWarehouseOutputProject=PROJECT_ID"
    

Vous pouvez consulter le code source du pipeline sur GitHub.

Créer et exécuter les tâches Cloud Scheduler

Créez et exécutez trois tâches Cloud Scheduler : une qui publie les données du flux de clics, une pour les données d'inventaire et une pour les données de transaction. Cette étape génère des données d'exemple pour le pipeline.

  1. Pour créer une tâche Cloud Scheduler pour ce tutoriel, utilisez la commande gcloud scheduler jobs create. Cette étape permet de créer un éditeur pour les données du flux de clics, qui publie un message par minute.

    gcloud scheduler jobs create pubsub clickstream \
      --schedule="* * * * *" \
      --location=LOCATION \
      --topic="Clickstream-inbound" \
      --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
    
  2. Pour démarrer la tâche Cloud Scheduler, utilisez la commande gcloud scheduler jobs run.

    gcloud scheduler jobs run --location=LOCATION clickstream
    
  3. Créez et exécutez un autre éditeur similaire pour les données d'inventaire, qui publie un message toutes les deux minutes.

    gcloud scheduler jobs create pubsub inventory \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Inventory-inbound" \
      --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
    
  4. Démarrez la deuxième tâche Cloud Scheduler.

    gcloud scheduler jobs run --location=LOCATION inventory
    
  5. Créez et exécutez un troisième éditeur pour les données de transaction, qui publie un message toutes les deux minutes.

    gcloud scheduler jobs create pubsub transactions \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Transactions-inbound" \
      --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
    
  6. Démarrez la troisième tâche Cloud Scheduler.

    gcloud scheduler jobs run --location=LOCATION transactions
    

Afficher les résultats

Affichez les données écrites dans vos tables BigQuery. Vérifiez les résultats dans BigQuery en exécutant les requêtes suivantes. Pendant l'exécution de ce pipeline, de nouvelles lignes sont ajoutées chaque minute aux tables BigQuery.

Vous devrez peut-être attendre que les tables soient remplies avec des données.

bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'

Effectuer un nettoyage

Pour éviter que les ressources utilisées lors de ce tutoriel soient facturées sur votre compte Google Cloud, supprimez le projet contenant les ressources, ou conservez le projet et supprimez les ressources individuelles.

Supprimer le projet

Le moyen le plus simple d'éviter la facturation consiste à supprimer le projet Google Cloud que vous avez créé pour le tutoriel.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Supprimer les ressources individuelles

Si vous souhaitez réutiliser le projet, supprimez les ressources que vous avez créées pour le tutoriel.

Nettoyer les ressources du projet Google Cloud

  1. Pour supprimer les jobs Cloud Scheduler, utilisez la commande gcloud scheduler jobs delete.

     gcloud scheduler jobs delete transactions --location=LOCATION
    
     gcloud scheduler jobs delete inventory --location=LOCATION
    
     gcloud scheduler jobs delete clickstream --location=LOCATION
    
  2. Pour supprimer les abonnements et les sujets Pub/Sub, utilisez les commandes gcloud pubsub subscriptions delete et gcloud pubsub topics delete.

    gcloud pubsub subscriptions delete SUBSCRIPTION_NAME
    gcloud pubsub topics delete TOPIC_NAME
    
  3. Pour supprimer la table BigQuery, utilisez la commande bq rm.

    bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
    
  4. Supprimer les ensembles de données BigQuery L'ensemble de données seul ne génère aucuns frais.

    bq rm -r -f -d PROJECT_ID:Retail_Store
    
    bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
    
  5. Pour supprimer l'instance Bigtable, utilisez la commande cbt deleteinstance. Le bucket seul ne génère aucuns frais.

    cbt deleteinstance aggregate-tables
    
  6. Pour supprimer le bucket Cloud Storage, utilisez la commande gcloud storage rm. Le bucket seul ne génère aucuns frais.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Révoquer les identifiants

  1. Révoquez les rôles que vous avez accordés au compte de service de nœud de calcul géré par l'utilisateur. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    • roles/bigtable.admin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \
        --role=ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Étapes suivantes