Dans ce tutoriel, vous allez créer un pipeline de flux de données Dataflow qui transforme les données d'e-commerce provenant de sujets et d'abonnements Pub/Sub, et génère les données de sortie dans BigQuery et Bigtable. Ce tutoriel nécessite Gradle.
Le tutoriel fournit un exemple d'application d'e-commerce de bout en bout qui diffuse les données de la boutique en ligne vers BigQuery et Bigtable. L'exemple d'application illustre des cas d'utilisation courants et des bonnes pratiques pour mettre en œuvre l'analyse de flux de données en continu et l'intelligence artificielle (IA) en temps réel. Utilisez ce tutoriel pour apprendre à répondre de manière dynamique aux actions des clients afin d'analyser les événements et d'y réagir en temps réel. Ce tutoriel explique comment stocker, analyser et visualiser les données des événements afin d'obtenir des insights sur le comportement des clients.
L'exemple d'application est disponible sur GitHub. Pour exécuter ce tutoriel à l'aide de Terraform, suivez les étapes fournies avec l'exemple d'application sur GitHub.
Objectifs
- Valider les données entrantes et apporter des corrections, si possible.
- Analyser les données de flux de clics pour obtenir le nombre de vues par produit sur une période donnée. Stockez ces informations dans un magasin à faible latence. L'application peut ensuite utiliser ces données pour fournir des messages sur le nombre de personnes ayant consulté ce produit aux clients sur le site Web.
Utiliser les données de transaction pour optimiser l'ordre de l'inventaire :
- Analyser les données de transaction pour calculer le nombre total de ventes pour chaque article, à la fois par magasin et à l'échelle mondiale, sur une période donnée.
- Analyser les données d'inventaire pour calculer l'inventaire entrant pour chaque élément.
- Transmettre en continu ces données aux systèmes d'inventaire afin de pouvoir les utiliser pour la prise de décisions d'achats.
Valider les données entrantes et apporter des corrections, si possible. Écrire toutes les données non rectifiables dans une file d'attente de lettres mortes à des fins d'analyse et de traitement supplémentaires. Créer une métrique représentant le pourcentage de données entrantes envoyées à la file d'attente de lettres mortes et mettre cette métrique à disposition des outils de surveillance et de génération d'alertes.
Traiter toutes les données entrantes dans un format standard et les stocker dans un entrepôt de données afin de les utiliser ultérieurement à des fins d'analyse et de visualisation.
Dénormaliser les données de transaction pour les ventes en magasin afin qu'elles puissent inclure des informations telles que la latitude et la longitude de l'emplacement du magasin. Diffuser les informations sur le magasin via une table à évolution lente dans BigQuery, en utilisant l'ID du magasin en tant que clé.
Données
L'application traite les types de données suivants :
- Données de flux de clics envoyées par des systèmes en ligne à Pub/Sub
- Données de transaction envoyées par des systèmes sur site ou SaaS (Software as a Service) à Pub/Sub
- Données sur les stocks envoyées par des systèmes sur site ou SaaS à Pub/Sub
Modèles de tâche
L'application contient les modèles de tâches suivants, communs aux pipelines créés avec le SDK Apache Beam pour Java:
- Des schémas Apache Beam pour traiter des données structurées
JsonToRow
pour convertir les données JSON- Le générateur de code
AutoValue
pour générer des POJO (Plain Old Java Objects, anciens objets Java standards) - La mise en file d'attente des données impossibles à traiter en vue d'une analyse plus approfondie
- Des transformations de validation des données en série
DoFn.StartBundle
pour traiter les appels vers des services externes par micro-lots- Des modèles d'entrées secondaires
Coûts
Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :
- BigQuery
- Bigtable
- Cloud Scheduler
- Compute Engine
- Dataflow
- Pub/Sub
Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût.
Une fois que vous avez terminé les tâches décrites dans ce document, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez la section Effectuer un nettoyage.
Avant de commencer
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Créez un compte de service de nœud de calcul géré par l'utilisateur pour votre nouveau pipeline et attribuez à ce compte les rôles nécessaires.
Pour créer le compte de service, exécutez la commande
gcloud iam service-accounts create
:gcloud iam service-accounts create retailpipeline \ --description="Retail app data pipeline worker service account" \ --display-name="Retail app data pipeline access"
Attribuez des rôles au compte de service. Exécutez la commande ci-dessous une fois pour chacun des rôles IAM suivants :
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
Remplacez
SERVICE_ACCOUNT_ROLE
par chaque rôle individuel.Attribuez à votre compte Google un rôle qui vous permet de créer des jetons d'accès pour le compte de service :
gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
- Si nécessaire, téléchargez et installez Gradle.
Créer les exemples de sources et de récepteurs
Cette section explique comment créer les éléments suivants :
- Un bucket Cloud Storage à utiliser comme emplacement de stockage temporaire
- Des sources de données en flux à l'aide de Pub/Sub
- Des ensembles de données pour charger les données dans BigQuery
- Instance Bigtable
Créer un bucket Cloud Storage
Commencez par créer un bucket Cloud Storage. Ce bucket sert d'emplacement de stockage temporaire pour le pipeline Dataflow.
Exécutez la commande gcloud storage buckets create
:
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
Remplacez les éléments suivants :
- BUCKET_NAME : nom du bucket Cloud Storage répondant aux exigences de dénomination des buckets. Les noms des buckets Cloud Storage doivent être uniques.
- LOCATION : emplacement du bucket.
Créer les sujets et les abonnements Pub/Sub
Créez quatre sujets Pub/Sub, puis créez trois abonnements.
Pour créer vos sujets, exécutez la commande gcloud pubsub topics create
une fois pour chaque sujet. Pour savoir comment nommer un abonnement, consultez la section Consignes de dénomination d'un sujet ou d'un abonnement.
gcloud pubsub topics create TOPIC_NAME
Remplacez TOPIC_NAME par les valeurs suivantes, en exécutant la commande quatre fois, soit une fois pour chaque sujet :
Clickstream-inbound
Transactions-inbound
Inventory-inbound
Inventory-outbound
Pour créer un abonnement associé à votre sujet, exécutez la commande gcloud pubsub subscriptions create
une fois pour chaque abonnement :
Créez l'abonnement
Clickstream-inbound-sub
:gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
Créez l'abonnement
Transactions-inbound-sub
:gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
Créez l'abonnement
Inventory-inbound-sub
:gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
Créer les ensembles de données et la table BigQuery
Créez un ensemble de données BigQuery et une table partitionnée avec le schéma approprié pour votre sujet Pub/Sub.
Utilisez la commande
bq mk
pour créer le premier ensemble de données.bq --location=US mk \ PROJECT_ID:Retail_Store
Créez le deuxième ensemble de données.
bq --location=US mk \ PROJECT_ID:Retail_Store_Aggregations
Utilisez l'instruction SQL CREATE TABLE pour créer une table avec un schéma et des données de test. Les données de test contiennent un magasin dont l'identifiant est
1
. Le modèle d'entrée secondaire à mise à jour lente utilise cette table.bq query --use_legacy_sql=false \ 'CREATE TABLE Retail_Store.Store_Locations ( id INT64, city STRING, state STRING, zip INT64 ); INSERT INTO Retail_Store.Store_Locations VALUES (1, "a_city", "a_state",00000);'
Créer l'instance et la table Bigtable
Créez l'instance et la table Bigtable. Pour en savoir plus sur la création d'instances Bigtable, consultez la section Créer une instance.
Si nécessaire, exécutez la commande suivante pour installer la CLI
cbt
:gcloud components install cbt
Utilisez la commande
bigtable instances create
pour créer une instance :gcloud bigtable instances create aggregate-tables \ --display-name=aggregate-tables \ --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
Remplacez CLUSTER_ZONE par la zone dans laquelle s'exécute le cluster.
Utilisez la commande
cbt createtable
pour créer une table permanente :cbt -instance=aggregate-tables createtable PageView5MinAggregates
Utilisez la commande suivante pour ajouter une famille de colonnes à la table :
cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
Exécuter le pipeline
Utilisez Gradle pour exécuter le pipeline de traitement par flux. Pour afficher le code Java utilisé par le pipeline, consultez la section RetailDataProcessingPipeline.java.
Exécutez la commande
git clone
pour cloner le dépôt GitHub :git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
Accédez au répertoire de l'application :
cd dataflow-sample-applications/retail/retail-java-applications
Pour tester le pipeline, dans votre shell ou votre terminal, exécutez la commande suivante avec Gradle :
./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
Pour exécuter le pipeline, exécutez la commande suivante avec Gradle :
./gradlew tasks executeOnDataflow -Dexec.args=" \ --project=PROJECT_ID \ --tempLocation=gs://BUCKET_NAME/temp/ \ --runner=DataflowRunner \ --region=REGION \ --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \ --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \ --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \ --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \ --dataWarehouseOutputProject=PROJECT_ID"
Vous pouvez consulter le code source du pipeline sur GitHub.
Créer et exécuter les tâches Cloud Scheduler
Créez et exécutez trois tâches Cloud Scheduler : une qui publie les données du flux de clics, une pour les données d'inventaire et une pour les données de transaction. Cette étape génère des données d'exemple pour le pipeline.
Pour créer une tâche Cloud Scheduler pour ce tutoriel, utilisez la commande
gcloud scheduler jobs create
. Cette étape permet de créer un éditeur pour les données du flux de clics, qui publie un message par minute.gcloud scheduler jobs create pubsub clickstream \ --schedule="* * * * *" \ --location=LOCATION \ --topic="Clickstream-inbound" \ --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
Pour démarrer la tâche Cloud Scheduler, utilisez la commande
gcloud scheduler jobs run
.gcloud scheduler jobs run --location=LOCATION clickstream
Créez et exécutez un autre éditeur similaire pour les données d'inventaire, qui publie un message toutes les deux minutes.
gcloud scheduler jobs create pubsub inventory \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Inventory-inbound" \ --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
Démarrez la deuxième tâche Cloud Scheduler.
gcloud scheduler jobs run --location=LOCATION inventory
Créez et exécutez un troisième éditeur pour les données de transaction, qui publie un message toutes les deux minutes.
gcloud scheduler jobs create pubsub transactions \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Transactions-inbound" \ --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
Démarrez la troisième tâche Cloud Scheduler.
gcloud scheduler jobs run --location=LOCATION transactions
Afficher les résultats
Affichez les données écrites dans vos tables BigQuery. Vérifiez les résultats dans BigQuery en exécutant les requêtes suivantes. Pendant l'exécution de ce pipeline, de nouvelles lignes sont ajoutées chaque minute aux tables BigQuery.
Vous devrez peut-être attendre que les tables soient remplies avec des données.
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'
Effectuer un nettoyage
Pour éviter que les ressources utilisées lors de ce tutoriel soient facturées sur votre compte Google Cloud, supprimez le projet contenant les ressources, ou conservez le projet et supprimez les ressources individuelles.
Supprimer le projet
Le moyen le plus simple d'éviter la facturation consiste à supprimer le projet Google Cloud que vous avez créé pour le tutoriel.
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Supprimer les ressources individuelles
Si vous souhaitez réutiliser le projet, supprimez les ressources que vous avez créées pour le tutoriel.
Nettoyer les ressources du projet Google Cloud
Pour supprimer les jobs Cloud Scheduler, utilisez la commande
gcloud scheduler jobs delete
.gcloud scheduler jobs delete transactions --location=LOCATION
gcloud scheduler jobs delete inventory --location=LOCATION
gcloud scheduler jobs delete clickstream --location=LOCATION
Pour supprimer les abonnements et les sujets Pub/Sub, utilisez les commandes
gcloud pubsub subscriptions delete
etgcloud pubsub topics delete
.gcloud pubsub subscriptions delete SUBSCRIPTION_NAME gcloud pubsub topics delete TOPIC_NAME
Pour supprimer la table BigQuery, utilisez la commande
bq rm
.bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
Supprimer les ensembles de données BigQuery L'ensemble de données seul ne génère aucuns frais.
bq rm -r -f -d PROJECT_ID:Retail_Store
bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
Pour supprimer l'instance Bigtable, utilisez la commande
cbt deleteinstance
. Le bucket seul ne génère aucuns frais.cbt deleteinstance aggregate-tables
Pour supprimer le bucket Cloud Storage, utilisez la commande
gcloud storage rm
. Le bucket seul ne génère aucuns frais.gcloud storage rm gs://BUCKET_NAME --recursive
Révoquer les identifiants
Révoquez les rôles que vous avez accordés au compte de service de nœud de calcul géré par l'utilisateur. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \ --role=ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
Étapes suivantes
- Consultez l'exemple d'application sur GitHub.
- Lisez l'article de blog associé Découvrir les modèles Beam avec le traitement des flux de clics pour les données Google Tag Manager.
- Découvrez comment utiliser Pub/Sub pour créer et utiliser des sujets et utiliser des abonnements.
- Découvrez comment créer des ensembles de données à l'aide de BigQuery.
- Découvrez des architectures de référence, des schémas et des bonnes pratiques concernant Google Cloud. Consultez notre Cloud Architecture Center.