In questo tutorial creerai una pipeline di inserimento flussi di Dataflow che trasforma i dati di e-commerce da argomenti e sottoscrizioni Pub/Sub e restituisce i dati in BigQuery e Bigtable. Questo tutorial richiede Gradle.
Il tutorial fornisce un'applicazione di esempio per l'e-commerce end-to-end che trasmette i flussi di dati da un webstore a BigQuery e Bigtable. L'applicazione di esempio illustra casi d'uso comuni e best practice per implementare l'analisi dei flussi di dati e l'intelligenza artificiale (AI) in tempo reale. Utilizza questo tutorial per imparare a rispondere in modo dinamico alle azioni dei clienti per analizzare e reagire agli eventi in tempo reale. Questo tutorial descrive come archiviare, analizzare e visualizzare i dati sugli eventi per ottenere più informazioni sul comportamento dei clienti.
L'applicazione di esempio è disponibile su GitHub. Per eseguire questo tutorial con Terraform, segui i passaggi forniti con l'applicazione di esempio su GitHub.
Obiettivi
- Convalida i dati in arrivo e, ove possibile, applica le correzioni.
- Analizza i dati dei flussi di clic per tenere un conteggio del numero di visualizzazioni per prodotto in un determinato periodo di tempo. Archivia queste informazioni in un archivio a bassa latenza. L'applicazione può quindi utilizzare i dati per fornire ai clienti messaggi sul numero di persone che hanno visualizzato questo prodotto sul sito web.
Utilizza i dati delle transazioni per effettuare l'ordine dell'inventario:
- Analizza i dati delle transazioni per calcolare il numero totale di vendite per ogni articolo, sia per negozio che a livello globale, in un determinato periodo.
- Analizza i dati dell'inventario per calcolare l'inventario in arrivo per ogni articolo.
- Trasmetti in modo continuativo questi dati ai sistemi di inventario, in modo che possano essere utilizzati per le decisioni di acquisto dell'inventario.
Convalida i dati in arrivo e, ove possibile, applica le correzioni. Scrivi dati non correggibili in una coda di messaggi non recapitabili per ulteriori analisi ed elaborazioni. Crea una metrica che rappresenti la percentuale di dati in entrata che vengono inviati alla coda messaggi non recapitabili disponibile per il monitoraggio e gli avvisi.
Elabora tutti i dati in entrata in un formato standard e archiviali in un data warehouse da utilizzare per analisi e visualizzazioni future.
Denormalizzare i dati sulle transazioni per le vendite in negozio in modo che possano includere informazioni come la latitudine e la longitudine della sede del negozio. Fornisci le informazioni sul negozio tramite una tabella che cambia lentamente in BigQuery, utilizzando l'ID negozio come chiave.
Dati
L'applicazione elabora i seguenti tipi di dati:
- Dati del flusso di clic inviati dai sistemi online a Pub/Sub.
- Dati sulle transazioni inviati da sistemi on-premise o da sistemi SaaS (Software as a Service) a Pub/Sub.
- Dati di stock inviati da sistemi on-premise o SaaS a Pub/Sub.
Pattern delle attività
L'applicazione contiene i seguenti pattern di attività comuni alle pipeline create con l'SDK Apache Beam per Java:
- Schema di Apache Beam per il funzionamento con i dati strutturati
JsonToRow
per convertire i dati JSON- Il generatore di codice
AutoValue
per generare POJO (Semplici Oggetti Java) - Coda dei dati non elaborabili per ulteriori analisi
- Trasformazioni della convalida dei dati in serie
DoFn.StartBundle
per chiamate in micro-batch a servizi esterni- Pattern di input laterale
Costi
In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:
- BigQuery
- Bigtable
- Cloud Scheduler
- Compute Engine
- Dataflow
- Pub/Sub
Per generare una stima dei costi in base all'utilizzo previsto,
utilizza il Calcolatore prezzi.
Una volta completate le attività descritte in questo documento, puoi evitare la fatturazione continua eliminando le risorse che hai creato. Per ulteriori informazioni, consulta la pagina Pulizia.
Prima di iniziare
- Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
- Installa Google Cloud CLI.
-
Per initialize gcloud CLI, esegui questo comando:
gcloud init
-
Crea o seleziona un progetto Google Cloud.
-
Crea un progetto Google Cloud:
gcloud projects create PROJECT_ID
Sostituisci
PROJECT_ID
con un nome per il progetto Google Cloud che stai creando. -
Seleziona il progetto Google Cloud che hai creato:
gcloud config set project PROJECT_ID
Sostituisci
PROJECT_ID
con il nome del tuo progetto Google Cloud.
-
-
Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.
-
Abilita le API Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler.
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Crea credenziali di autenticazione locali per il tuo Account Google:
gcloud auth application-default login
-
Concedi i ruoli al tuo Account Google. Esegui questo comando una volta per ciascuno dei seguenti ruoli IAM:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Sostituisci
PROJECT_ID
con l'ID progetto. - Sostituisci
EMAIL_ADDRESS
con il tuo indirizzo email. - Sostituisci
ROLE
con ogni singolo ruolo.
- Sostituisci
- Installa Google Cloud CLI.
-
Per initialize gcloud CLI, esegui questo comando:
gcloud init
-
Crea o seleziona un progetto Google Cloud.
-
Crea un progetto Google Cloud:
gcloud projects create PROJECT_ID
Sostituisci
PROJECT_ID
con un nome per il progetto Google Cloud che stai creando. -
Seleziona il progetto Google Cloud che hai creato:
gcloud config set project PROJECT_ID
Sostituisci
PROJECT_ID
con il nome del tuo progetto Google Cloud.
-
-
Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.
-
Abilita le API Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler.
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Crea credenziali di autenticazione locali per il tuo Account Google:
gcloud auth application-default login
-
Concedi i ruoli al tuo Account Google. Esegui questo comando una volta per ciascuno dei seguenti ruoli IAM:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Sostituisci
PROJECT_ID
con l'ID progetto. - Sostituisci
EMAIL_ADDRESS
con il tuo indirizzo email. - Sostituisci
ROLE
con ogni singolo ruolo.
- Sostituisci
Crea un account di servizio worker gestito dall'utente per la nuova pipeline e concedi i ruoli necessari all'account di servizio.
Per creare l'account di servizio, esegui il comando
gcloud iam service-accounts create
:gcloud iam service-accounts create retailpipeline \ --description="Retail app data pipeline worker service account" \ --display-name="Retail app data pipeline access"
Concedi ruoli all'account di servizio. Esegui il comando seguente una volta per ognuno dei seguenti ruoli IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
Sostituisci
SERVICE_ACCOUNT_ROLE
con ogni singolo ruolo.Concedi al tuo Account Google un ruolo che ti consente di creare token di accesso per l'account di servizio:
gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
- Se necessario, scarica e installa Gradle.
Crea le origini e i sink di esempio
In questa sezione viene spiegato come creare i seguenti elementi:
- Un bucket Cloud Storage da utilizzare come posizione di archiviazione temporanea
- Streaming delle origini dati con Pub/Sub
- Set di dati per caricare i dati in BigQuery
- Un'istanza Bigtable
crea un bucket Cloud Storage
Per iniziare, crea un bucket Cloud Storage. Questo bucket viene utilizzato come località di archiviazione temporanea dalla pipeline Dataflow.
Utilizza il comando gcloud storage buckets create
:
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
Sostituisci quanto segue:
- BUCKET_NAME: un nome per il bucket Cloud Storage che soddisfa i requisiti di denominazione dei bucket. I nomi dei bucket Cloud Storage devono essere univoci a livello globale.
- LOCATION: la località del bucket.
crea argomenti e sottoscrizioni Pub/Sub
Creare quattro argomenti Pub/Sub e quindi tre sottoscrizioni.
Per creare gli argomenti, esegui il comando gcloud pubsub topics create
una volta per ogni argomento. Per informazioni su come assegnare un nome a una sottoscrizione, consulta le linee guida per assegnare un nome a un argomento o a una sottoscrizione.
gcloud pubsub topics create TOPIC_NAME
Sostituisci TOPIC_NAME con i valori seguenti, eseguendo il comando quattro volte, una volta per ogni argomento:
Clickstream-inbound
Transactions-inbound
Inventory-inbound
Inventory-outbound
Per creare una sottoscrizione all'argomento, esegui il comando gcloud pubsub subscriptions create
una volta per ogni sottoscrizione:
Crea una sottoscrizione
Clickstream-inbound-sub
:gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
Crea una sottoscrizione
Transactions-inbound-sub
:gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
Crea una sottoscrizione
Inventory-inbound-sub
:gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
Crea set di dati e tabelle BigQuery
Crea un set di dati BigQuery e una tabella partizionata con lo schema appropriato per il tuo argomento Pub/Sub.
Usa il comando
bq mk
per creare il primo set di dati.bq --location=US mk \ PROJECT_ID:Retail_Store
Crea il secondo set di dati.
bq --location=US mk \ PROJECT_ID:Retail_Store_Aggregations
Utilizza l'istruzione SQL CREATE TABLE per creare una tabella con uno schema e dati di test. I dati di test hanno un archivio con un valore ID di
1
. Il pattern di input lato aggiornamento lento utilizza questa tabella.bq query --use_legacy_sql=false \ 'CREATE TABLE Retail_Store.Store_Locations ( id INT64, city STRING, state STRING, zip INT64 ); INSERT INTO Retail_Store.Store_Locations VALUES (1, "a_city", "a_state",00000);'
crea un'istanza e una tabella Bigtable
Creare un'istanza e una tabella Bigtable. Per ulteriori informazioni sulla creazione di istanze Bigtable, consulta Creazione di un'istanza.
Se necessario, esegui questo comando per installare l'interfaccia a riga di comando
cbt
:gcloud components install cbt
Utilizza il comando
bigtable instances create
per creare un'istanza:gcloud bigtable instances create aggregate-tables \ --display-name=aggregate-tables \ --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
Sostituisci CLUSTER_ZONE con la zona in cui viene eseguito il cluster.
Utilizza il comando
cbt createtable
per creare una tabella:cbt -instance=aggregate-tables createtable PageView5MinAggregates
Utilizza il seguente comando per aggiungere una famiglia di colonne alla tabella:
cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
esegui la pipeline.
Utilizza Gradle per eseguire una pipeline in modalità flusso. Per visualizzare il codice Java utilizzato dalla pipeline, consulta RetailDataProcessingPipeline.java.
Usa il comando
git clone
per clonare il repository GitHub:git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
Passa alla directory delle applicazioni:
cd dataflow-sample-applications/retail/retail-java-applications
Per testare la pipeline, nella shell o nel terminale esegui questo comando utilizzando Gradle:
./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
Per eseguire la pipeline, esegui il comando seguente utilizzando Gradle:
./gradlew tasks executeOnDataflow -Dexec.args=" \ --project=PROJECT_ID \ --tempLocation=gs://BUCKET_NAME/temp/ \ --runner=DataflowRunner \ --region=REGION \ --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \ --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \ --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \ --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \ --dataWarehouseOutputProject=PROJECT_ID"
Consulta il codice sorgente della pipeline su GitHub.
Crea ed esegui job Cloud Scheduler
Creare ed eseguire tre job Cloud Scheduler: uno per la pubblicazione di dati di clickstream, uno per i dati di inventario e uno per i dati delle transazioni. Questo passaggio genera dati di esempio per la pipeline.
Per creare un job Cloud Scheduler per questo tutorial, utilizza il comando
gcloud scheduler jobs create
. Questo passaggio crea un publisher per i dati dei flussi di clic che pubblicano un messaggio al minuto.gcloud scheduler jobs create pubsub clickstream \ --schedule="* * * * *" \ --location=LOCATION \ --topic="Clickstream-inbound" \ --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
Per avviare il job Cloud Scheduler, utilizza il comando
gcloud scheduler jobs run
.gcloud scheduler jobs run --location=LOCATION clickstream
Creare ed eseguire un altro publisher simile per i dati di inventario che pubblichi un messaggio ogni due minuti.
gcloud scheduler jobs create pubsub inventory \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Inventory-inbound" \ --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
Avvia il secondo job Cloud Scheduler.
gcloud scheduler jobs run --location=LOCATION inventory
Creare ed eseguire un terzo editore per i dati delle transazioni che pubblica un messaggio ogni due minuti.
gcloud scheduler jobs create pubsub transactions \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Transactions-inbound" \ --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
Avvia il terzo job di Cloud Scheduler.
gcloud scheduler jobs run --location=LOCATION transactions
Visualizza i tuoi risultati
Visualizza i dati scritti nelle tue tabelle BigQuery. Controlla i risultati in BigQuery eseguendo le seguenti query. Mentre questa pipeline è in esecuzione, puoi vedere ogni minuto nuove righe aggiunte alle tabelle BigQuery.
Potresti dover attendere che le tabelle vengano compilate con i dati.
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'
Esegui la pulizia
Per evitare che al tuo Account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial, elimina il progetto che contiene le risorse oppure mantieni il progetto ed elimina le singole risorse.
Elimina il progetto
Il modo più semplice per eliminare la fatturazione è eliminare il progetto Google Cloud che hai creato per il tutorial.
- Nella console Google Cloud, vai alla pagina Gestisci risorse.
- Nell'elenco dei progetti, seleziona il progetto che vuoi eliminare, quindi fai clic su Elimina.
- Nella finestra di dialogo, digita l'ID del progetto e fai clic su Chiudi per eliminare il progetto.
Elimina le singole risorse
Se vuoi riutilizzare il progetto, elimina le risorse che hai creato per il tutorial.
Esegui la pulizia delle risorse di progetto Google Cloud
Per eliminare i job Cloud Scheduler, utilizza il comando
gcloud scheduler jobs delete
.gcloud scheduler jobs delete transactions --location=LOCATION
gcloud scheduler jobs delete inventory --location=LOCATION
gcloud scheduler jobs delete clickstream --location=LOCATION
Per eliminare le sottoscrizioni e gli argomenti Pub/Sub, utilizza i comandi
gcloud pubsub subscriptions delete
egcloud pubsub topics delete
.gcloud pubsub subscriptions delete SUBSCRIPTION_NAME gcloud pubsub topics delete TOPIC_NAME
Per eliminare la tabella BigQuery, utilizza il comando
bq rm
.bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
Elimina i set di dati BigQuery. Il set di dati da solo non prevede alcun addebito.
bq rm -r -f -d PROJECT_ID:Retail_Store
bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
Per eliminare l'istanza Bigtable, utilizza il comando
cbt deleteinstance
. Il bucket da solo non prevede alcun addebito.cbt deleteinstance aggregate-tables
Per eliminare il bucket Cloud Storage, utilizza il comando
gcloud storage rm
. Il bucket da solo non prevede alcun addebito.gcloud storage rm gs://BUCKET_NAME --recursive
Revocare le credenziali
Revoca i ruoli che hai concesso all'account di servizio worker gestito dall'utente. Esegui il comando seguente una volta per ognuno dei seguenti ruoli IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \ --role=ROLE
-
Facoltativo: revoca le credenziali di autenticazione che hai creato ed elimina il file delle credenziali locale.
gcloud auth application-default revoke
-
Facoltativo: revoca le credenziali dallgcloud CLI.
gcloud auth revoke
Passaggi successivi
- Visualizza l'applicazione di esempio su GitHub.
- Leggi il post del blog correlato Scopri i pattern Beam con l'elaborazione clickstream dei dati di Google Tag Manager.
- Scopri come utilizzare Pub/Sub per creare e utilizzare argomenti e utilizzare le sottoscrizioni.
- Scopri come utilizzare BigQuery per creare set di dati.
- Esplora le architetture di riferimento, i diagrammi e le best practice su Google Cloud. Visita il nostro Cloud Architecture Center.