In dieser Anleitung erstellen Sie eine Dataflow-Streamingpipeline, die E-Commerce-Daten aus Pub/Sub-Themen und -Abos transformiert und die Daten in BigQuery und Bigtable ausgibt. Für diese Anleitung ist Gradle erforderlich.
Die Anleitung bietet eine durchgängige E-Commerce-Beispielanwendung, die Daten aus einem Webstore an BigQuery und Bigtable streamt. Die Beispielanwendung veranschaulicht gängige Anwendungsfälle und Best Practices für die Implementierung von Streamingdatenanalysen und KI in Echtzeit. In dieser Anleitung erfahren Sie, wie Sie dynamisch auf Kundenaktionen reagieren, um Ereignisse in Echtzeit zu analysieren und auf sie zu reagieren. In dieser Anleitung erfahren Sie, wie Sie Ereignisdaten speichern, analysieren und visualisieren, um mehr über das Kundenverhalten zu erfahren.
Die Beispielanwendung steht auf GitHub zur Verfügung. Zum Ausführen dieser Anleitung mit Terraform folgen Sie den Schritten in der Beispielanwendung auf GitHub.
Ziele
- Die Validierung eingehender Daten und Korrekturen, falls sie möglich sind.
- Die Analyse von Clickstream-Daten, um die Anzahl von Aufrufen pro Produkt in einem bestimmten Zeitraum festzustellen. Speichern Sie diese Informationen in einem Speicher mit niedriger Latenz. Die Anwendung kann dann die Daten verwenden, um Kunden auf der Website Nachrichten vom Typ Anzahl von Nutzern, dieses Produkt angesehen haben zu präsentieren.
Verwenden Sie Transaktionsdaten, um die Inventarreihenfolge festzulegen:
- Analysieren Sie Transaktionsdaten, um die Gesamtzahl von Verkäufen für jeden Artikel sowohl im Geschäft als auch weltweit für einen bestimmten Zeitraum zu berechnen.
- Analysieren Sie Inventardaten, um das eingehende Inventar für jeden Artikel zu berechnen.
- Übergeben Sie diese Daten kontinuierlich an Inventarsysteme, damit sie für Entscheidungen zum Inventarkauf verwendet werden können.
Die Validierung eingehender Daten und Korrekturen, falls sie möglich sind. Schreiben Sie nicht korrigierbare Daten zur weiteren Analyse und Verarbeitung in eine Dead-Letter-Warteschlange. Erstellen Sie einen Messwert, der den Prozentsatz der eingehenden Daten darstellt, die an die Dead-Letter-Warteschlange für Monitoring und Benachrichtigungen gesendet werden.
Verarbeiten Sie alle eingehenden Daten in einem Standardformat und speichern Sie sie in einem Data Warehouse, um sie für Analysen und Visualisierungen zu verwenden.
Denormalisieren Sie Transaktionsdaten für Ladenverkäufe, sodass sie Informationen wie Breiten- und Längengrad des Geschäftsstandorts enthalten können. Stellen Sie die Speicherinformationen über eine sich langsam ändernde Tabelle in BigQuery bereit, wobei Sie die Geschäfts-ID als Schlüssel verwenden.
Daten
Die Anwendung verarbeitet die folgenden Datentypen:
- Clickstream-Daten, die von Onlinesystemen an Pub/Sub gesendet werden
- Transaktionsdaten, die von lokalen oder SaaS-Systemen (Software as a Service) an Pub/Sub gesendet werden
- Lagerdaten, die von lokalen oder SaaS-Systemen an Pub/Sub gesendet werden
Aufgabenmuster
Die Anwendung enthält die folgenden Aufgabenmuster, die häufig mit Pipelines verwendet werden, die mit dem Apache Beam SDK für Java erstellt wurden:
- Über Apache Beam-Schemas mit strukturierten Daten arbeiten
JsonToRow
zum Konvertieren von JSON-Daten- Die
AutoValue
Codegenerator zum Generieren von einfachen alten Java-Objekten (POJOs) - Nicht verarbeitbare Daten zur weiteren Analyse in einer Warteschlange bereitstellen
- Transformationen der seriellen Datenvalidierung
DoFn.StartBundle
für Mikro-Batch-Aufrufe von externen Diensten- Nebeneingabemuster
Kosten
In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:
- BigQuery
- Bigtable
- Cloud Scheduler
- Compute Engine
- Dataflow
- Pub/Sub
Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.
Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.
Hinweise
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Erstellen Sie ein nutzerverwaltetes Worker-Dienstkonto für Ihre neue Pipeline und weisen Sie dem Dienstkonto die erforderlichen Rollen zu.
Führen Sie den Befehl
gcloud iam service-accounts create
aus, um das Dienstkonto zu erstellen.gcloud iam service-accounts create retailpipeline \ --description="Retail app data pipeline worker service account" \ --display-name="Retail app data pipeline access"
Weisen Sie dem Dienstkonto Rollen zu. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
Ersetzen Sie
SERVICE_ACCOUNT_ROLE
durch jede einzelne Rolle.Gewähren Sie Ihrem Google-Konto eine Rolle, mit der Sie Zugriffstokens für das Dienstkonto erstellen können:
gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
- Laden Sie bei Bedarf Gradle herunter und installieren Sie es.
Beispielquellen und -senken erstellen
In diesem Abschnitt wird Folgendes erstellt:
- Ein Cloud Storage-Bucket, der als temporärer Speicherort verwendet werden soll
- Datenquellen mit Pub/Sub streamen
- Datasets zum Laden der Daten in BigQuery
- Eine Bigtable-Instanz
Cloud Storage-Bucket erstellen
Erstellen Sie zuerst einen Cloud Storage-Bucket. Dieser Bucket wird von der Dataflow-Pipeline als temporärer Speicherort verwendet.
Führen Sie den Befehl gcloud storage buckets create
aus:
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
Ersetzen Sie Folgendes:
- BUCKET_NAME: Ein Name für Ihren Cloud Storage-Bucket, der den Anforderungen für Bucket-Namen entspricht. Cloud Storage-Bucket-Namen müssen global einmalig sein.
- LOCATION: der Speicherort für den Bucket.
Pub/Sub-Themen und -Abos erstellen
Erstellen Sie vier Pub/Sub-Themen und erstellen Sie dann drei Abos.
Führen Sie zum Erstellen der Themen den Befehl gcloud pubsub topics create
für jedes Thema einmal aus. Informationen zum Benennen eines Abos finden Sie unter Richtlinien für die Benennung eines Themas oder Abos.
gcloud pubsub topics create TOPIC_NAME
Ersetzen Sie TOPIC_NAME durch die folgenden Werte und führen Sie den Befehl viermal einmal für jedes Thema aus:
Clickstream-inbound
Transactions-inbound
Inventory-inbound
Inventory-outbound
Führen Sie den Befehl gcloud pubsub subscriptions create
für jedes Abo einmal aus, um ein Abo für Ihr Thema zu erstellen:
Ein
Clickstream-inbound-sub
-Abo erstellen:gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
Ein
Transactions-inbound-sub
-Abo erstellen:gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
Ein
Inventory-inbound-sub
-Abo erstellen:gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
BigQuery-Datasets und -Tabelle erstellen
Erstellen Sie ein BigQuery-Dataset und eine partitionierte Tabelle mit dem entsprechenden Schema für Ihr Pub/Sub-Thema.
Verwenden Sie den Befehl
bq mk
, um das erste Dataset zu erstellen.bq --location=US mk \ PROJECT_ID:Retail_Store
Erstellen Sie das zweite Dataset.
bq --location=US mk \ PROJECT_ID:Retail_Store_Aggregations
Verwenden Sie die SQL-Anweisung CREATE TABLE, um eine Tabelle mit einem Schema und Testdaten zu erstellen. Die Testdaten haben einen Speicher mit dem ID-Wert
1
. Das Nebeneingabemuster für langsame Aktualisierungen verwendet diese Tabelle.bq query --use_legacy_sql=false \ 'CREATE TABLE Retail_Store.Store_Locations ( id INT64, city STRING, state STRING, zip INT64 ); INSERT INTO Retail_Store.Store_Locations VALUES (1, "a_city", "a_state",00000);'
Bigtable-Instanz und -Tabelle erstellen
Bigtable-Instanz und -Tabelle erstellen Weitere Informationen zum Erstellen von Bigtable-Instanzen finden Sie unter Instanz erstellen.
Führen Sie bei Bedarf den folgenden Befehl aus, um die
cbt
-Befehlszeile zu installieren:gcloud components install cbt
Verwenden Sie den Befehl
bigtable instances create
, um eine Instanz zu erstellen:gcloud bigtable instances create aggregate-tables \ --display-name=aggregate-tables \ --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
Ersetzen Sie CLUSTER_ZONE durch die Zone, in der der Cluster ausgeführt wird.
Zum Erstellen einer permanente Tabelle verwenden Sie den Befehl
cbt createtable
.cbt -instance=aggregate-tables createtable PageView5MinAggregates
Verwenden Sie den folgenden Befehl, um eine Spaltenfamilie der Tabelle hinzuzufügen:
cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
Pipeline ausführen
Führen Sie mit Gradle eine Streaming-Pipeline aus. Informationen zum von der Pipeline verwendeten Java-Code finden Sie unter RetailDataProcessingPipeline.java.
Klonen Sie das GitHub-Repository mit dem Befehl
git clone
:git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
Wechseln Sie in das Anwendungsverzeichnis:
cd dataflow-sample-applications/retail/retail-java-applications
Führen Sie zum Testen der Pipeline in der Shell oder dem Terminal den folgenden Befehl mit Gradle aus:
./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
Führen Sie mit Gradle den folgenden Befehl aus, um die Pipeline auszuführen:
./gradlew tasks executeOnDataflow -Dexec.args=" \ --project=PROJECT_ID \ --tempLocation=gs://BUCKET_NAME/temp/ \ --runner=DataflowRunner \ --region=REGION \ --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \ --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \ --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \ --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \ --dataWarehouseOutputProject=PROJECT_ID"
Sehen Sie sich den Pipeline-Quellcode auf GitHub an.
Cloud Scheduler-Jobs erstellen und ausführen
Erstellen und führen Sie drei Cloud Scheduler-Jobs aus, einen, der Clickstream-Daten veröffentlicht, einen für Inventardaten und einen für Transaktionsdaten. Mit diesem Schritt werden Beispieldaten für die Pipeline generiert.
Verwenden Sie den Befehl
gcloud scheduler jobs create
, um einen Cloud Scheduler-Job für diese Anleitung zu erstellen. Mit diesem Schritt wird ein Publisher für Clickstream-Daten erstellt, der eine Nachricht pro Minute veröffentlicht.gcloud scheduler jobs create pubsub clickstream \ --schedule="* * * * *" \ --location=LOCATION \ --topic="Clickstream-inbound" \ --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
Verwenden Sie den Befehl
gcloud scheduler jobs run
, um den Cloud Scheduler-Job zu starten.gcloud scheduler jobs run --location=LOCATION clickstream
Einen weiteren ähnlichen Publisher für Inventardaten erstellen und ausführen, die alle zwei Minuten eine Nachricht veröffentlicht.
gcloud scheduler jobs create pubsub inventory \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Inventory-inbound" \ --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
Starten Sie den zweiten Cloud Scheduler-Job.
gcloud scheduler jobs run --location=LOCATION inventory
Erstellen und führen Sie einen dritten Publisher für Transaktionsdaten aus, die alle zwei Minuten eine Nachricht veröffentlichen.
gcloud scheduler jobs create pubsub transactions \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Transactions-inbound" \ --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
Starten Sie den dritten Cloud Scheduler-Job.
gcloud scheduler jobs run --location=LOCATION transactions
Ergebnisse ansehen
Sehen Sie sich die Daten an, die in Ihren BigQuery-Tabellen geschrieben wurden. Prüfen Sie die Ergebnisse in BigQuery mit den folgenden Abfragen: Während diese Pipeline ausgeführt wird, können Sie jede Minute neue Zeilen an die BigQuery-Tabellen anhängen.
Möglicherweise müssen Sie warten, bis die Tabellen mit Daten gefüllt sind.
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'
Bereinigen
Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.
Projekt löschen
Am einfachsten können Sie weitere Kosten vermeiden, wenn Sie das Google Cloud-Projekt löschen, das Sie für die Anleitung erstellt haben.
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Einzelne Ressourcen löschen
Wenn Sie das Projekt wiederverwenden möchten, löschen Sie die für die Anleitung erstellten Ressourcen.
Google Cloud-Projektressourcen bereinigen
Verwenden Sie den Befehl
gcloud scheduler jobs delete
, um die Cloud Scheduler-Jobs zu löschen.gcloud scheduler jobs delete transactions --location=LOCATION
gcloud scheduler jobs delete inventory --location=LOCATION
gcloud scheduler jobs delete clickstream --location=LOCATION
Verwenden Sie zum Löschen der Pub/Sub-Abos und -Themen die Befehle
gcloud pubsub subscriptions delete
undgcloud pubsub topics delete
.gcloud pubsub subscriptions delete SUBSCRIPTION_NAME gcloud pubsub topics delete TOPIC_NAME
Verwenden Sie zum Löschen der BigQuery-Tabelle den Befehl
bq rm
.bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
BigQuery-Datasets löschen. Das Dataset allein verursacht keine Gebühren.
bq rm -r -f -d PROJECT_ID:Retail_Store
bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
Verwenden Sie zum Löschen der Bigtable-Instanz den Befehl
cbt deleteinstance
. Für den Bucket fallen keine Gebühren an.cbt deleteinstance aggregate-tables
Verwenden Sie den Befehl
gcloud storage rm
, um den Cloud Storage-Bucket zu löschen. Für den Bucket fallen keine Gebühren an.gcloud storage rm gs://BUCKET_NAME --recursive
Anmeldedaten entfernen
Widerrufen Sie die Rollen, die Sie dem nutzerverwalteten Worker-Dienstkonto zugewiesen haben. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \ --role=ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
Nächste Schritte
- Beispielanwendung auf GitHub ansehen
- Blogpost zu Beam-Mustern mit Clickstream-Verarbeitung von Google Tag Manager-Daten
- Verwendung von Pub/Sub zum Erstellen und Verwenden von Themen sowie zum Verwenden von Abos
- Datasets mit BigQuery erstellen
- Referenzarchitekturen, Diagramme und Best Practices zu Google Cloud kennenlernen. Weitere Informationen zu Cloud Architecture Center