在本教程中,您将创建一个 Dataflow 流处理流水线,该流水线会转换来自 Pub/Sub 主题和订阅的电子商务数据,并将数据输出到 BigQuery 和 Bigtable。本教程需要使用 Gradle。
本教程提供了一个端到端的电子商务示例应用,该应用可将来自网上商店的数据流式传输到 BigQuery 和 Bigtable。 示例应用展示了实现流式数据分析和实时人工智能 (AI) 的常见使用场景和最佳实践。使用本教程学习如何动态响应客户操作,以便实时分析和应对事件。本教程介绍了如何存储、分析和直观呈现事件数据,以深入了解客户行为。
示例应用可在 GitHub 上找到。如需使用 Terraform 运行本教程,请按照 GitHub 上的示例应用提供的步骤操作。
目标
- 验证传入的数据,并尽可能对其应用更正。
- 分析点击流数据以保持每个产品在指定时间段内的查看次数计数。请将此信息存储在低延时存储区中。 然后,应用可以使用这些数据向网站上的客户提供“该产品的查看人数”消息。
使用交易数据告知库存排序:
- 分析交易数据,以按全球的商店来计算每个商品在指定时间段内的总销售量。
- 分析库存数据以计算每个商品的传入的库存。
- 将此数据连续传递给库存系统,以便可以使用它做出库存购买决策。
验证传入的数据,并尽可能对其应用更正。 将所有不可更正的数据写入死信队列,以进行其他分析和处理。创建一个指标,表示发送到死信队列以进行监控和提醒的传入数据百分比。
将所有传入数据处理为标准格式,并将其存储在数据仓库中以供将来的分析和可视化使用。
对门店销售的交易数据进行反规范化,以便可以包含商店位置的纬度和经度等信息。将商店 ID 用作键,在 BigQuery 中通过缓慢变化的表提供商店信息。
数据
该应用会处理以下类型的数据:
- 在线系统发送到 Pub/Sub 的点击流数据。
- 本地系统或软件即服务 (SaaS) 系统发送到 Pub/Sub 的交易数据。
- 本地系统或 SaaS 系统发送到 Pub/Sub 的库存数据。
任务模式
该应用包含使用 Java 版 Apache Beam SDK 构建的流水线中常见的任务模式:
费用
在本文档中,您将使用 Google Cloud 的以下收费组件:
- BigQuery
- Bigtable
- Cloud Scheduler
- Compute Engine
- Dataflow
- Pub/Sub
您可使用价格计算器根据您的预计使用情况来估算费用。
完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理。
准备工作
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
为新流水线创建用户管理的工作器服务账号,并向服务账号授予必要的角色。
如需创建服务账号,请运行
gcloud iam service-accounts create
命令。gcloud iam service-accounts create retailpipeline \ --description="Retail app data pipeline worker service account" \ --display-name="Retail app data pipeline access"
向服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
将
SERVICE_ACCOUNT_ROLE
替换为每个角色。为您的 Google 账号授予一个可让您为服务账号创建访问令牌的角色:
gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
- 如有需要,请下载并安装 Gradle。
创建示例来源和接收器
本部分介绍如何创建以下内容:
- 用作临时存储位置的 Cloud Storage 存储桶
- 使用 Pub/Sub 的流式数据源
- 用于将数据加载到 BigQuery 中的数据集
- Bigtable 实例
创建 Cloud Storage 存储桶
首先创建一个 Cloud Storage 存储桶。此存储桶由 Dataflow 流水线用作临时存储位置。
使用 gcloud storage buckets create
命令:
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
替换以下内容:
创建 Pub/Sub 主题和订阅
创建四个 Pub/Sub 主题,然后创建三个订阅。
如要创建主题,请为每个主题运行一次 gcloud pubsub topics create
命令。如需了解如何命名订阅,请参阅主题或订阅命名指南。
gcloud pubsub topics create TOPIC_NAME
将 TOPIC_NAME 替换为以下值,对每个主题运行该命令一次,即总共四次:
Clickstream-inbound
Transactions-inbound
Inventory-inbound
Inventory-outbound
如需创建对主题的订阅,请为每个订阅运行一次 gcloud pubsub subscriptions create
命令:
创建
Clickstream-inbound-sub
订阅:gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
创建
Transactions-inbound-sub
订阅:gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
创建
Inventory-inbound-sub
订阅:gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
创建 BigQuery 数据集和表
为 Pub/Sub 主题创建具有适当架构的 BigQuery 数据集和分区表。
使用
bq mk
命令创建第一个数据集。bq --location=US mk \ PROJECT_ID:Retail_Store
创建第二个数据集。
bq --location=US mk \ PROJECT_ID:Retail_Store_Aggregations
使用 CREATE TABLE SQL 语句创建具有架构和测试数据的表。测试数据有一个 ID 值为
1
的存储区。缓慢更新的旁路输入模式使用此表。bq query --use_legacy_sql=false \ 'CREATE TABLE Retail_Store.Store_Locations ( id INT64, city STRING, state STRING, zip INT64 ); INSERT INTO Retail_Store.Store_Locations VALUES (1, "a_city", "a_state",00000);'
创建 Bigtable 实例和表
创建 Bigtable 实例和表。如需详细了解如何创建 Bigtable 实例,请参阅创建实例。
如果需要,请运行以下命令安装
cbt
CLI:gcloud components install cbt
使用
bigtable instances create
命令创建一个实例:gcloud bigtable instances create aggregate-tables \ --display-name=aggregate-tables \ --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
将 CLUSTER_ZONE 替换为集群在其中运行的可用区。
使用
cbt createtable
命令创建表:cbt -instance=aggregate-tables createtable PageView5MinAggregates
使用以下命令将列族添加到表中:
cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
运行流水线
使用 Gradle 运行流处理流水线。如需查看流水线使用的 Java 代码,请参阅 RetailDataProcessingPipeline.java。
使用
git clone
命令克隆 GitHub 代码库:git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
切换到应用目录:
cd dataflow-sample-applications/retail/retail-java-applications
如需测试流水线,请在 shell 或终端中使用 Gradle 运行以下命令:
./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
如需运行流水线,请使用 Gradle 运行以下命令:
./gradlew tasks executeOnDataflow -Dexec.args=" \ --project=PROJECT_ID \ --tempLocation=gs://BUCKET_NAME/temp/ \ --runner=DataflowRunner \ --region=REGION \ --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \ --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \ --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \ --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \ --dataWarehouseOutputProject=PROJECT_ID"
请参阅 GitHub 上的流水线源代码。
创建并运行 Cloud Scheduler 作业
创建并运行三个 Cloud Scheduler 作业,一个发布点击流数据,一个用于库存数据,另一个用于交易数据。此步骤为流水线生成示例数据。
如需为本教程创建 Cloud Scheduler 作业,请使用
gcloud scheduler jobs create
命令。此步骤会创建一个点击流数据发布方,该发布方每分钟发布一条消息。gcloud scheduler jobs create pubsub clickstream \ --schedule="* * * * *" \ --location=LOCATION \ --topic="Clickstream-inbound" \ --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
如需启动 Cloud Scheduler 作业,请使用
gcloud scheduler jobs run
命令。gcloud scheduler jobs run --location=LOCATION clickstream
为库存数据创建并运行另一个类似的发布方,该发布方每两分钟发布一条消息。
gcloud scheduler jobs create pubsub inventory \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Inventory-inbound" \ --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
启动第二个 Cloud Scheduler 作业。
gcloud scheduler jobs run --location=LOCATION inventory
为交易数据创建并运行第三个发布方,该发布方每两分钟发布一条消息。
gcloud scheduler jobs create pubsub transactions \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Transactions-inbound" \ --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
启动第三个 Cloud Scheduler 作业。
gcloud scheduler jobs run --location=LOCATION transactions
查看结果
查看写入 BigQuery 表的数据。通过运行以下查询来查看 BigQuery 中的结果。在此流水线的运行期间,您可以看到 BigQuery 表中每分钟都在附加新行。
您可能需要等待表填充数据。
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'
清理
为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。
删除项目
为了避免产生费用,最简单的方法是删除您为本教程创建的 Google Cloud 项目。
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
逐个删除资源
如果您希望重复使用该项目,请删除为本教程创建的资源。
清理 Google Cloud 项目资源
如需删除 Cloud Scheduler 作业,请使用
gcloud scheduler jobs delete
命令。gcloud scheduler jobs delete transactions --location=LOCATION
gcloud scheduler jobs delete inventory --location=LOCATION
gcloud scheduler jobs delete clickstream --location=LOCATION
如需删除 Pub/Sub 订阅和主题,请使用
gcloud pubsub subscriptions delete
和gcloud pubsub topics delete
命令。gcloud pubsub subscriptions delete SUBSCRIPTION_NAME gcloud pubsub topics delete TOPIC_NAME
如需删除 BigQuery 表,请使用
bq rm
命令。bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
删除 BigQuery 数据集。单独的数据集不会产生任何费用。
bq rm -r -f -d PROJECT_ID:Retail_Store
bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
如需删除 Bigtable 实例,请使用
cbt deleteinstance
命令。单独的存储桶不会产生任何费用。cbt deleteinstance aggregate-tables
如需删除 Cloud Storage 存储桶,请使用
gcloud storage rm
命令。单独的存储桶不会产生任何费用。gcloud storage rm gs://BUCKET_NAME --recursive
撤销凭据
撤消您授予用户管理的工作器服务账号的角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \ --role=ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
后续步骤
- 查看 GitHub 上的示例应用。
- 阅读相关博文:通过 Google 跟踪代码管理器数据的点击流处理了解 Beam 模式。
- 了解如何使用 Pub/Sub 创建和使用主题以及使用订阅。
- 了解如何使用 BigQuery 创建数据集。
- 探索有关 Google Cloud 的参考架构、图表和最佳实践。查看我们的 Cloud 架构中心。