In questo tutorial crei una pipeline di streaming Dataflow che trasforma i dati di e-commerce da argomenti e iscrizioni Pub/Sub e li esporta in BigQuery e Bigtable. Questo tutorial richiede Gradle.
Il tutorial fornisce un'applicazione di esempio di e-commerce end-to-end che streamma i dati da un negozio web a BigQuery e Bigtable. L'applicazione di esempio illustra casi d'uso comuni e best practice per l'implementazione dell'analisi dei flussi di dati e dell'intelligenza artificiale (IA) in tempo reale. Usa questo tutorial per imparare come rispondere in modo dinamico alle azioni del cliente per analizzare e reagire alle eventi in tempo reale. Questo tutorial descrive come archiviare, analizzare e visualizzare i dati sugli eventi per ottenere informazioni più dettagliate sul comportamento dei clienti.
L'applicazione di esempio è disponibile su GitHub. Per eseguire questo tutorial utilizzando Terraform, segui i passaggi forniti con l'applicazione di esempio su GitHub.
Obiettivi
- Convalida i dati in entrata e applica le correzioni, se possibile.
- Analizza i dati dei flussi di clic per contare il numero di visualizzazioni per prodotto in un determinato periodo di tempo. Archivia queste informazioni in uno spazio di archiviazione a bassa latenza. L'applicazione può quindi utilizzare i dati per fornire il numero di persone che hanno visualizzato questo prodotto i messaggi ai clienti sul sito web.
Utilizza i dati delle transazioni per definire l'ordine di inventario:
- Analizza i dati sulle transazioni per calcolare il numero totale di vendite per ogni elemento, sia per negozio che a livello globale, per un determinato periodo.
- Analizza i dati di inventario per calcolare l'inventario in entrata per ogni articolo.
- Trasmettere costantemente questi dati ai sistemi di inventario in modo che possano essere utilizzate per le decisioni di acquisto dell'inventario.
Convalida i dati in entrata e applica le correzioni, se possibile. Scrivi i dati non correggibili in una coda dead-letter per ulteriori analisi e elaborazione. Crea una metrica che rappresenti la percentuale di dati in entrata che viene inviato alla coda dei messaggi non recapitabili disponibile per il monitoraggio e gli avvisi.
Elabora tutti i dati in entrata in un formato standard e memorizzali in un data warehouse da utilizzare per analisi e visualizzazioni future.
Denormalizza i dati sulle transazioni per le vendite in negozio in modo che possano includere informazioni come la latitudine e la longitudine della sede del negozio. Fornisci le informazioni del datastore attraverso una tabella che cambia lentamente in BigQuery, utilizzando l'ID negozio come chiave.
Dati
L'applicazione elabora i seguenti tipi di dati:
- Dati dei flussi di clic inviati dai sistemi online a Pub/Sub.
- Dati sulle transazioni inviati da on-premise o da Software as a Service (SaaS) in Pub/Sub.
- Dati stock inviati da on-premise o SaaS in Pub/Sub.
Pattern di attività
L'applicazione contiene i seguenti pattern di attività comuni alle pipeline create con SDK Apache Beam per Java:
- Schemi Apache Beam per funzionare con i dati strutturati
JsonToRow
per convertire i dati JSON- Il generatore di codice
AutoValue
per generare POJO (Plain Old Java Object) - Coda di dati non elaborabili per ulteriori analisi
- Trasformazioni di convalida dei dati seriali
DoFn.StartBundle
per effettuare micro-batch delle chiamate a servizi esterni- Pattern di input laterale
Costi
In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:
- BigQuery
- Bigtable
- Cloud Scheduler
- Compute Engine
- Dataflow
- Pub/Sub
Per generare una stima dei costi basata sull'utilizzo previsto,
utilizza il Calcolatore prezzi.
Una volta completate le attività descritte in questo documento, puoi evitare la fatturazione continua eliminando le risorse che hai creato. Per ulteriori informazioni, consulta la pagina Pulizia.
Prima di iniziare
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Crea un account di servizio gestito dall'utente per la nuova pipeline e concedi all'account di servizio i ruoli necessari.
Per creare l'account di servizio, esegui il comando
gcloud iam service-accounts create
:gcloud iam service-accounts create retailpipeline \ --description="Retail app data pipeline worker service account" \ --display-name="Retail app data pipeline access"
Concedi i ruoli all'account di servizio. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
Sostituisci
SERVICE_ACCOUNT_ROLE
con ogni singolo ruolo.Concedi al tuo Account Google un ruolo che ti consenta di creare token di accesso per l'account di servizio:
gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
- Se necessario, scarica e installa Gradle.
Crea origini e sink di esempio
Questa sezione spiega come creare quanto segue:
- Un bucket Cloud Storage da utilizzare come posizione di archiviazione temporanea
- Origini dati in modalità flusso con Pub/Sub
- Set di dati per caricare i dati in BigQuery
- Un'istanza Bigtable
Crea un bucket Cloud Storage
Per iniziare, crea un bucket Cloud Storage. Questo bucket viene utilizzato come posizione di archiviazione temporanea Dataflow.
Utilizza il
comando gcloud storage buckets create
:
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
Sostituisci quanto segue:
- BUCKET_NAME: un nome per il tuo Cloud Storage che soddisfa i requisiti requisiti di denominazione dei bucket. I nomi dei bucket Cloud Storage devono essere univoci a livello globale.
- LOCATION: la posizione del bucket.
Creare argomenti e sottoscrizioni Pub/Sub
Crea quattro argomenti Pub/Sub e poi tre sottoscrizioni.
Per creare gli argomenti, esegui il comando
gcloud pubsub topics create
una volta per ogni argomento. Per informazioni su come assegnare un nome a una sottoscrizione, vedi
Linee guida per denominare un argomento o una sottoscrizione.
gcloud pubsub topics create TOPIC_NAME
Sostituisci TOPIC_NAME con i seguenti valori, eseguendo il comando quattro volte, una volta per ogni argomento:
Clickstream-inbound
Transactions-inbound
Inventory-inbound
Inventory-outbound
Per creare una sottoscrizione all'argomento, esegui il comando
gcloud pubsub subscriptions create
una volta per ogni sottoscrizione:
Crea un abbonamento
Clickstream-inbound-sub
:gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
Crea una sottoscrizione
Transactions-inbound-sub
:gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
Crea una sottoscrizione
Inventory-inbound-sub
:gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
crea set di dati e tabella BigQuery
crea un set di dati BigQuery e tabella partizionata con lo schema appropriato per il tuo argomento Pub/Sub.
Utilizza la
bq mk
per creare il primo set di dati.bq --location=US mk \ PROJECT_ID:Retail_Store
Crea il secondo set di dati.
bq --location=US mk \ PROJECT_ID:Retail_Store_Aggregations
Utilizza l'istruzione SQL CREATE TABLE per creare una tabella con uno schema e dati di test. I dati di test hanno un negozio con un valore ID pari a
1
. Input lato aggiornamento lento pattern utilizza questa tabella.bq query --use_legacy_sql=false \ 'CREATE TABLE Retail_Store.Store_Locations ( id INT64, city STRING, state STRING, zip INT64 ); INSERT INTO Retail_Store.Store_Locations VALUES (1, "a_city", "a_state",00000);'
Creare un'istanza e una tabella Bigtable
Crea un'istanza e una tabella Bigtable. Per ulteriori informazioni sulla creazione di istanze Bigtable, consulta Crea un'istanza.
Se necessario, esegui questo comando per installare l'interfaccia a riga di comando
cbt
:gcloud components install cbt
Utilizza il comando
bigtable instances create
per creare un'istanza:gcloud bigtable instances create aggregate-tables \ --display-name=aggregate-tables \ --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
Sostituisci CLUSTER_ZONE con la zona in cui viene eseguito il cluster.
Utilizza il comando
cbt createtable
per creare una tabella:cbt -instance=aggregate-tables createtable PageView5MinAggregates
Utilizza il seguente comando per aggiungere una famiglia di colonne alla tabella:
cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
esegui la pipeline.
Utilizza Gradle per eseguire una pipeline in modalità flusso. Per visualizzare il codice Java utilizzato dalla pipeline, consulta RetailDataProcessingPipeline.java.
Utilizza il comando
git clone
per clonare il repository GitHub:git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
Passa alla directory dell'applicazione:
cd dataflow-sample-applications/retail/retail-java-applications
Per testare la pipeline, nella shell o nel terminale, esegui il seguente comando utilizzando Gradle:
./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
Per eseguire la pipeline, esegui questo comando utilizzando Gradle:
./gradlew tasks executeOnDataflow -Dexec.args=" \ --project=PROJECT_ID \ --tempLocation=gs://BUCKET_NAME/temp/ \ --runner=DataflowRunner \ --region=REGION \ --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \ --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \ --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \ --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \ --dataWarehouseOutputProject=PROJECT_ID"
Consulta il codice sorgente della pipeline su GitHub.
Crea ed esegui job Cloud Scheduler
Crea ed esegui tre job Cloud Scheduler, uno che pubblica i dati dei clickstream, uno per i dati di inventario e uno per i dati delle transazioni. Questo passaggio genera un campione per la pipeline.
Per creare un job Cloud Scheduler per questo tutorial, utilizza il comando
gcloud scheduler jobs create
. Questo passaggio crea un publisher per i dati dei flussi di clic che pubblica un messaggio al minuto.gcloud scheduler jobs create pubsub clickstream \ --schedule="* * * * *" \ --location=LOCATION \ --topic="Clickstream-inbound" \ --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
Per avviare il job Cloud Scheduler, utilizza la
gcloud scheduler jobs run
.gcloud scheduler jobs run --location=LOCATION clickstream
Crea e pubblica un altro publisher simile per i dati di inventario che pubblica un messaggio ogni due minuti.
gcloud scheduler jobs create pubsub inventory \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Inventory-inbound" \ --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
Avviare il secondo job di Cloud Scheduler.
gcloud scheduler jobs run --location=LOCATION inventory
Crea e pubblica un terzo publisher per i dati sulle transazioni che pubblica un messaggio ogni due minuti.
gcloud scheduler jobs create pubsub transactions \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Transactions-inbound" \ --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
Avviare il terzo job di Cloud Scheduler.
gcloud scheduler jobs run --location=LOCATION transactions
Visualizza i tuoi risultati
Visualizza i dati scritti nelle tabelle BigQuery. Controlla i risultati in a BigQuery eseguendo le seguenti query. Mentre questa pipeline è in esecuzione, puoi vedere nuove righe aggiunte alla di tabelle BigQuery al minuto.
Potresti dover attendere che le tabelle vengano completate con i dati.
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'
Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial, elimina il progetto che contiene le risorse oppure mantieni il progetto ed elimina le singole risorse.
Elimina il progetto
Il modo più semplice per eliminare la fatturazione è eliminare il progetto Google Cloud che hai creato per il tutorial.
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Elimina le singole risorse
Se vuoi riutilizzare il progetto, elimina le risorse che hai creato per il tutorial.
Esegui la pulizia delle risorse del progetto Google Cloud
Per eliminare i job Cloud Scheduler, utilizza
gcloud scheduler jobs delete
.gcloud scheduler jobs delete transactions --location=LOCATION
gcloud scheduler jobs delete inventory --location=LOCATION
gcloud scheduler jobs delete clickstream --location=LOCATION
Per eliminare le sottoscrizioni e gli argomenti Pub/Sub, utilizza
gcloud pubsub subscriptions delete
e aigcloud pubsub topics delete
tramite comandi SQL.gcloud pubsub subscriptions delete SUBSCRIPTION_NAME gcloud pubsub topics delete TOPIC_NAME
Per eliminare la tabella BigQuery, utilizza il comando
bq rm
.bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
Elimina i set di dati BigQuery. Il set di dati da solo non comporta costi.
bq rm -r -f -d PROJECT_ID:Retail_Store
bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
Per eliminare l'istanza Bigtable, utilizza Comando
cbt deleteinstance
. Il bucket da solo non comporta alcun addebito.cbt deleteinstance aggregate-tables
Per eliminare il bucket Cloud Storage, utilizza la classe Comando
gcloud storage rm
. Il bucket da solo non prevede costi.gcloud storage rm gs://BUCKET_NAME --recursive
Revocare le credenziali
Revoca i ruoli che hai concesso all'account di servizio worker gestito dall'utente. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \ --role=ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
Passaggi successivi
- Visualizza l'applicazione di esempio su GitHub.
- Leggi il post del blog correlato Scopri i pattern di Beam con l'elaborazione dei dati di Google Tag Manager dei clickstream.
- Scopri come utilizzare Pub/Sub per creare e utilizzare gli argomenti e per utilizzare le sottoscrizioni.
- Scopri di più sull'utilizzo di BigQuery per creare set di dati.
- Esplora le architetture di riferimento, i diagrammi e le best practice su Google Cloud. Dai un'occhiata al nostro Centro architetture cloud.