在本教程中,您将创建一个 Dataflow 流处理流水线,该流水线会转换来自 Pub/Sub 主题和订阅的电子商务数据,并将数据输出到 BigQuery 和 Bigtable。本教程需要使用 Gradle。
本教程提供了一个端到端的电子商务示例应用,该应用可将来自网上商店的数据流式传输到 BigQuery 和 Bigtable。 示例应用展示了实现流式数据分析和实时人工智能 (AI) 的常见使用场景和最佳实践。使用本教程学习如何动态响应客户操作,以便实时分析和应对事件。本教程介绍了如何存储、分析和直观呈现事件数据,以深入了解客户行为。
示例应用可在 GitHub 上找到。如需使用 Terraform 运行本教程,请按照 GitHub 上的示例应用提供的步骤操作。
目标
- 验证传入的数据,并尽可能对其应用更正。
- 分析点击流数据以保持每个产品在指定时间段内的查看次数计数。请将此信息存储在低延时存储区中。 然后,应用可以使用这些数据向网站上的客户提供“该产品的查看人数”消息。
使用交易数据告知库存排序:
- 分析交易数据,以按全球的商店来计算每个商品在指定时间段内的总销售量。
- 分析库存数据以计算每个商品的传入的库存。
- 将此数据连续传递给库存系统,以便可以使用它做出库存购买决策。
验证传入的数据,并尽可能对其应用更正。 将所有不可更正的数据写入死信队列,以进行其他分析和处理。创建一个指标,表示发送到死信队列以进行监控和提醒的传入数据百分比。
将所有传入数据处理为标准格式,并将其存储在数据仓库中以供将来的分析和可视化使用。
对门店销售的交易数据进行反规范化,以便可以包含商店位置的纬度和经度等信息。将商店 ID 用作键,在 BigQuery 中通过缓慢变化的表提供商店信息。
数据
该应用会处理以下类型的数据:
- 在线系统发送到 Pub/Sub 的点击流数据。
- 本地系统或软件即服务 (SaaS) 系统发送到 Pub/Sub 的交易数据。
- 本地系统或 SaaS 系统发送到 Pub/Sub 的库存数据。
任务模式
该应用包含使用 Java 版 Apache Beam SDK 构建的流水线中常见的以下任务模式:
费用
在本文档中,您将使用 Google Cloud 的以下收费组件:
- BigQuery
- Bigtable
- Cloud Scheduler
- Compute Engine
- Dataflow
- Pub/Sub
您可使用价格计算器根据您的预计使用情况来估算费用。
完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理。
准备工作
- 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
- 安装 Google Cloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
-
-
创建 Google Cloud 项目:
gcloud projects create PROJECT_ID
将
PROJECT_ID
替换为您要创建的 Google Cloud 项目的名称。 -
选择您创建的 Google Cloud 项目:
gcloud config set project PROJECT_ID
将
PROJECT_ID
替换为您的 Google Cloud 项目 名称。
-
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
为您的 Google 账号创建本地身份验证凭据:
gcloud auth application-default login
-
向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- 将
PROJECT_ID
替换为您的项目 ID。 - 将
EMAIL_ADDRESS
替换为您的电子邮件地址。 - 将
ROLE
替换为每个角色。
- 将
- 安装 Google Cloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
-
-
创建 Google Cloud 项目:
gcloud projects create PROJECT_ID
将
PROJECT_ID
替换为您要创建的 Google Cloud 项目的名称。 -
选择您创建的 Google Cloud 项目:
gcloud config set project PROJECT_ID
将
PROJECT_ID
替换为您的 Google Cloud 项目 名称。
-
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
为您的 Google 账号创建本地身份验证凭据:
gcloud auth application-default login
-
向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- 将
PROJECT_ID
替换为您的项目 ID。 - 将
EMAIL_ADDRESS
替换为您的电子邮件地址。 - 将
ROLE
替换为每个角色。
- 将
为新流水线创建用户管理的工作器服务账号,并向服务账号授予必要的角色。
如需创建服务账号,请运行
gcloud iam service-accounts create
命令。gcloud iam service-accounts create retailpipeline \ --description="Retail app data pipeline worker service account" \ --display-name="Retail app data pipeline access"
向服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
将
SERVICE_ACCOUNT_ROLE
替换为每个角色。为您的 Google 账号授予一个可让您为服务账号创建访问令牌的角色:
gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
- 如果需要,请下载并安装 Gradle。
创建示例来源和接收器
本部分介绍如何创建以下内容:
- 用作临时存储位置的 Cloud Storage 存储桶
- 使用 Pub/Sub 的流式数据源
- 用于将数据加载到 BigQuery 中的数据集
- Bigtable 实例
创建 Cloud Storage 存储桶
首先创建一个 Cloud Storage 存储桶。此存储桶由 Dataflow 流水线用作临时存储位置。
使用 gcloud storage buckets create
命令:
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
替换以下内容:
创建 Pub/Sub 主题和订阅
创建四个 Pub/Sub 主题,然后创建三个订阅。
如要创建主题,请为每个主题运行一次 gcloud pubsub topics create
命令。如需了解如何命名订阅,请参阅主题或订阅命名指南。
gcloud pubsub topics create TOPIC_NAME
将 TOPIC_NAME 替换为以下值,对每个主题运行该命令一次,即总共四次:
Clickstream-inbound
Transactions-inbound
Inventory-inbound
Inventory-outbound
如需创建对主题的订阅,请为每个订阅运行一次 gcloud pubsub subscriptions create
命令:
创建
Clickstream-inbound-sub
订阅:gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
创建
Transactions-inbound-sub
订阅:gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
创建
Inventory-inbound-sub
订阅:gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
创建 BigQuery 数据集和表
为 Pub/Sub 主题创建具有适当架构的 BigQuery 数据集和分区表。
使用
bq mk
命令创建第一个数据集。bq --location=US mk \ PROJECT_ID:Retail_Store
创建第二个数据集。
bq --location=US mk \ PROJECT_ID:Retail_Store_Aggregations
使用 CREATE TABLE SQL 语句创建具有架构和测试数据的表。测试数据有一个 ID 值为
1
的存储区。缓慢更新的旁路输入模式使用此表。bq query --use_legacy_sql=false \ 'CREATE TABLE Retail_Store.Store_Locations ( id INT64, city STRING, state STRING, zip INT64 ); INSERT INTO Retail_Store.Store_Locations VALUES (1, "a_city", "a_state",00000);'
创建 Bigtable 实例和表
创建 Bigtable 实例和表。如需详细了解如何创建 Bigtable 实例,请参阅创建实例。
如果需要,请运行以下命令安装
cbt
CLI:gcloud components install cbt
使用
bigtable instances create
命令创建一个实例:gcloud bigtable instances create aggregate-tables \ --display-name=aggregate-tables \ --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
将 CLUSTER_ZONE 替换为集群在其中运行的可用区。
使用
cbt createtable
命令创建表:cbt -instance=aggregate-tables createtable PageView5MinAggregates
使用以下命令将列族添加到表中:
cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
运行流水线
使用 Gradle 运行流处理流水线。如需查看流水线使用的 Java 代码,请参阅 RetailDataProcessingPipeline.java。
使用
git clone
命令克隆 GitHub 代码库:git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
切换到应用目录:
cd dataflow-sample-applications/retail/retail-java-applications
如需测试流水线,请在 shell 或终端中使用 Gradle 运行以下命令:
./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
如需运行流水线,请使用 Gradle 运行以下命令:
./gradlew tasks executeOnDataflow -Dexec.args=" \ --project=PROJECT_ID \ --tempLocation=gs://BUCKET_NAME/temp/ \ --runner=DataflowRunner \ --region=REGION \ --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \ --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \ --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \ --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \ --dataWarehouseOutputProject=PROJECT_ID"
请参阅 GitHub 上的流水线源代码。
创建并运行 Cloud Scheduler 作业
创建并运行三个 Cloud Scheduler 作业,一个发布点击流数据,一个用于库存数据,另一个用于交易数据。此步骤为流水线生成示例数据。
如需为本教程创建 Cloud Scheduler 作业,请使用
gcloud scheduler jobs create
命令。此步骤会创建一个点击流数据发布方,该发布方每分钟发布一条消息。gcloud scheduler jobs create pubsub clickstream \ --schedule="* * * * *" \ --location=LOCATION \ --topic="Clickstream-inbound" \ --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
如需启动 Cloud Scheduler 作业,请使用
gcloud scheduler jobs run
命令。gcloud scheduler jobs run --location=LOCATION clickstream
为库存数据创建并运行另一个类似的发布方,该发布方每两分钟发布一条消息。
gcloud scheduler jobs create pubsub inventory \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Inventory-inbound" \ --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
启动第二个 Cloud Scheduler 作业。
gcloud scheduler jobs run --location=LOCATION inventory
为交易数据创建并运行第三个发布方,该发布方每两分钟发布一条消息。
gcloud scheduler jobs create pubsub transactions \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Transactions-inbound" \ --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
启动第三个 Cloud Scheduler 作业。
gcloud scheduler jobs run --location=LOCATION transactions
查看结果
查看写入 BigQuery 表的数据。通过运行以下查询来查看 BigQuery 中的结果。在此流水线的运行期间,您可以看到 BigQuery 表中每分钟都在附加新行。
您可能需要等待表填充数据。
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'
清理
为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。
删除项目
为了避免产生费用,最简单的方法是删除您为本教程创建的 Google Cloud 项目。
- 在 Google Cloud 控制台中,进入管理资源页面。
- 在项目列表中,选择要删除的项目,然后点击删除。
- 在对话框中输入项目 ID,然后点击关闭以删除项目。
逐个删除资源
如果您希望重复使用该项目,请删除为本教程创建的资源。
清理 Google Cloud 项目资源
如需删除 Cloud Scheduler 作业,请使用
gcloud scheduler jobs delete
命令。gcloud scheduler jobs delete transactions --location=LOCATION
gcloud scheduler jobs delete inventory --location=LOCATION
gcloud scheduler jobs delete clickstream --location=LOCATION
如需删除 Pub/Sub 订阅和主题,请使用
gcloud pubsub subscriptions delete
和gcloud pubsub topics delete
命令。gcloud pubsub subscriptions delete SUBSCRIPTION_NAME gcloud pubsub topics delete TOPIC_NAME
如需删除 BigQuery 表,请使用
bq rm
命令。bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
删除 BigQuery 数据集。单独的数据集不会产生任何费用。
bq rm -r -f -d PROJECT_ID:Retail_Store
bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
如需删除 Bigtable 实例,请使用
cbt deleteinstance
命令。单独的存储桶不会产生任何费用。cbt deleteinstance aggregate-tables
如需删除 Cloud Storage 存储桶,请使用
gcloud storage rm
命令。单独的存储桶不会产生任何费用。gcloud storage rm gs://BUCKET_NAME --recursive
撤销凭据
撤消您授予用户管理的工作器服务账号的角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \ --role=ROLE
-
可选:撤消您创建的身份验证凭据,并删除本地凭据文件。
gcloud auth application-default revoke
-
可选:从 gcloud CLI 撤消凭据。
gcloud auth revoke
后续步骤
- 查看 GitHub 上的示例应用。
- 阅读相关博文:通过 Google 跟踪代码管理器数据的点击流处理了解 Beam 模式。
- 了解如何使用 Pub/Sub 创建和使用主题以及使用订阅。
- 了解如何使用 BigQuery 创建数据集。
- 探索有关 Google Cloud 的参考架构、图表和最佳实践。查看我们的 Cloud Architecture Center。