创建电子商务流处理流水线


在本教程中,您将创建一个 Dataflow 流处理流水线,该流水线会转换来自 Pub/Sub 主题和订阅的电子商务数据,并将数据输出到 BigQuery 和 Bigtable。本教程需要使用 Gradle

本教程提供了一个端到端的电子商务示例应用,该应用可将来自网上商店的数据流式传输到 BigQuery 和 Bigtable。 示例应用展示了实现流式数据分析和实时人工智能 (AI) 的常见使用场景和最佳实践。使用本教程学习如何动态响应客户操作,以便实时分析和应对事件。本教程介绍了如何存储、分析和直观呈现事件数据,以深入了解客户行为。

示例应用可在 GitHub 上找到。如需使用 Terraform 运行本教程,请按照 GitHub 上的示例应用提供的步骤操作。

目标

  • 验证传入的数据,并尽可能对其应用更正。
  • 分析点击流数据以保持每个产品在指定时间段内的查看次数计数。请将此信息存储在低延时存储区中。 然后,应用可以使用这些数据向网站上的客户提供“该产品的查看人数”消息。
  • 使用交易数据告知库存排序:

    • 分析交易数据,以按全球的商店来计算每个商品在指定时间段内的总销售量。
    • 分析库存数据以计算每个商品的传入的库存。
    • 将此数据连续传递给库存系统,以便可以使用它做出库存购买决策。
  • 验证传入的数据,并尽可能对其应用更正。 将所有不可更正的数据写入死信队列,以进行其他分析和处理。创建一个指标,表示发送到死信队列以进行监控和提醒的传入数据百分比。

  • 将所有传入数据处理为标准格式,并将其存储在数据仓库中以供将来的分析和可视化使用。

  • 对门店销售的交易数据进行反规范化,以便可以包含商店位置的纬度和经度等信息。将商店 ID 用作键,在 BigQuery 中通过缓慢变化的表提供商店信息。

数据

该应用会处理以下类型的数据:

  • 在线系统发送到 Pub/Sub 的点击流数据。
  • 本地系统或软件即服务 (SaaS) 系统发送到 Pub/Sub 的交易数据。
  • 本地系统或 SaaS 系统发送到 Pub/Sub 的库存数据。

任务模式

该应用包含使用 Java 版 Apache Beam SDK 构建的流水线中常见的以下任务模式:

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

  • BigQuery
  • Bigtable
  • Cloud Scheduler
  • Compute Engine
  • Dataflow
  • Pub/Sub

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 安装 Google Cloud CLI。
  3. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  4. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  7. 为您的 Google 账号创建本地身份验证凭据:

    gcloud auth application-default login
  8. 向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • EMAIL_ADDRESS 替换为您的电子邮件地址。
    • ROLE 替换为每个角色。
  9. 安装 Google Cloud CLI。
  10. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  11. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  12. 确保您的 Google Cloud 项目已启用结算功能

  13. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  14. 为您的 Google 账号创建本地身份验证凭据:

    gcloud auth application-default login
  15. 向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • EMAIL_ADDRESS 替换为您的电子邮件地址。
    • ROLE 替换为每个角色。
  16. 为新流水线创建用户管理的工作器服务账号,并向服务账号授予必要的角色。

    1. 如需创建服务账号,请运行 gcloud iam service-accounts create 命令。

      gcloud iam service-accounts create retailpipeline \
          --description="Retail app data pipeline worker service account" \
          --display-name="Retail app data pipeline access"
    2. 向服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/pubsub.editor
      • roles/bigquery.dataEditor
      • roles/bigtable.admin
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      SERVICE_ACCOUNT_ROLE 替换为每个角色。

    3. 为您的 Google 账号授予一个可让您为服务账号创建访问令牌的角色:

      gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
  17. 如果需要,请下载并安装 Gradle

创建示例来源和接收器

本部分介绍如何创建以下内容:

  • 用作临时存储位置的 Cloud Storage 存储桶
  • 使用 Pub/Sub 的流式数据源
  • 用于将数据加载到 BigQuery 中的数据集
  • Bigtable 实例

创建 Cloud Storage 存储桶

首先创建一个 Cloud Storage 存储桶。此存储桶由 Dataflow 流水线用作临时存储位置。

使用 gcloud storage buckets create 命令

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

替换以下内容:

  • BUCKET_NAME:符合存储桶命名要求的 Cloud Storage 存储桶的名称。Cloud Storage 存储桶名称必须是全局唯一的。
  • LOCATION:存储桶的位置

创建 Pub/Sub 主题和订阅

创建四个 Pub/Sub 主题,然后创建三个订阅。

如要创建主题,请为每个主题运行一次 gcloud pubsub topics create 命令。如需了解如何命名订阅,请参阅主题或订阅命名指南

gcloud pubsub topics create TOPIC_NAME

TOPIC_NAME 替换为以下值,对每个主题运行该命令一次,即总共四次:

  • Clickstream-inbound
  • Transactions-inbound
  • Inventory-inbound
  • Inventory-outbound

如需创建对主题的订阅,请为每个订阅运行一次 gcloud pubsub subscriptions create 命令:

  1. 创建 Clickstream-inbound-sub 订阅:

    gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
    
  2. 创建 Transactions-inbound-sub 订阅:

    gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
    
  3. 创建 Inventory-inbound-sub 订阅:

    gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
    

创建 BigQuery 数据集和表

为 Pub/Sub 主题创建具有适当架构的 BigQuery 数据集和分区表

  1. 使用 bq mk 命令创建第一个数据集。

    bq --location=US mk \
    PROJECT_ID:Retail_Store
    
  2. 创建第二个数据集。

    bq --location=US mk \
    PROJECT_ID:Retail_Store_Aggregations
    
  3. 使用 CREATE TABLE SQL 语句创建具有架构和测试数据的表。测试数据有一个 ID 值为 1 的存储区。缓慢更新的旁路输入模式使用此表。

    bq query --use_legacy_sql=false \
      'CREATE TABLE
        Retail_Store.Store_Locations
        (
          id INT64,
          city STRING,
          state STRING,
          zip INT64
        );
      INSERT INTO Retail_Store.Store_Locations
      VALUES (1, "a_city", "a_state",00000);'
    

创建 Bigtable 实例和表

创建 Bigtable 实例和表。如需详细了解如何创建 Bigtable 实例,请参阅创建实例

  1. 如果需要,请运行以下命令安装 cbt CLI

    gcloud components install cbt
    
  2. 使用 bigtable instances create 命令创建一个实例:

    gcloud bigtable instances create aggregate-tables \
        --display-name=aggregate-tables \
        --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
    

    CLUSTER_ZONE 替换为集群在其中运行的可用区

  3. 使用 cbt createtable 命令创建表:

    cbt -instance=aggregate-tables createtable PageView5MinAggregates
    
  4. 使用以下命令将列族添加到表中:

    cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
    

运行流水线

使用 Gradle 运行流处理流水线。如需查看流水线使用的 Java 代码,请参阅 RetailDataProcessingPipeline.java

  1. 使用 git clone 命令克隆 GitHub 代码库:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. 切换到应用目录:

    cd dataflow-sample-applications/retail/retail-java-applications
    
  3. 如需测试流水线,请在 shell 或终端中使用 Gradle 运行以下命令:

    ./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
    
  4. 如需运行流水线,请使用 Gradle 运行以下命令:

    ./gradlew tasks executeOnDataflow -Dexec.args=" \
    --project=PROJECT_ID \
    --tempLocation=gs://BUCKET_NAME/temp/ \
    --runner=DataflowRunner \
    --region=REGION \
    --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \
    --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \
    --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \
    --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \
    --dataWarehouseOutputProject=PROJECT_ID"
    

请参阅 GitHub 上的流水线源代码

创建并运行 Cloud Scheduler 作业

创建并运行三个 Cloud Scheduler 作业,一个发布点击流数据,一个用于库存数据,另一个用于交易数据。此步骤为流水线生成示例数据。

  1. 如需为本教程创建 Cloud Scheduler 作业,请使用 gcloud scheduler jobs create 命令。此步骤会创建一个点击流数据发布方,该发布方每分钟发布一条消息。

    gcloud scheduler jobs create pubsub clickstream \
      --schedule="* * * * *" \
      --location=LOCATION \
      --topic="Clickstream-inbound" \
      --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
    
  2. 如需启动 Cloud Scheduler 作业,请使用 gcloud scheduler jobs run 命令。

    gcloud scheduler jobs run --location=LOCATION clickstream
    
  3. 为库存数据创建并运行另一个类似的发布方,该发布方每两分钟发布一条消息。

    gcloud scheduler jobs create pubsub inventory \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Inventory-inbound" \
      --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
    
  4. 启动第二个 Cloud Scheduler 作业。

    gcloud scheduler jobs run --location=LOCATION inventory
    
  5. 为交易数据创建并运行第三个发布方,该发布方每两分钟发布一条消息。

    gcloud scheduler jobs create pubsub transactions \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Transactions-inbound" \
      --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
    
  6. 启动第三个 Cloud Scheduler 作业。

    gcloud scheduler jobs run --location=LOCATION transactions
    

查看结果

查看写入 BigQuery 表的数据。通过运行以下查询来查看 BigQuery 中的结果。在此流水线的运行期间,您可以看到 BigQuery 表中每分钟都在附加新行。

您可能需要等待表填充数据。

bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'

清理

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的 Google Cloud 项目。

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

逐个删除资源

如果您希望重复使用该项目,请删除为本教程创建的资源。

清理 Google Cloud 项目资源

  1. 如需删除 Cloud Scheduler 作业,请使用 gcloud scheduler jobs delete 命令。

     gcloud scheduler jobs delete transactions --location=LOCATION
    
     gcloud scheduler jobs delete inventory --location=LOCATION
    
     gcloud scheduler jobs delete clickstream --location=LOCATION
    
  2. 如需删除 Pub/Sub 订阅和主题,请使用 gcloud pubsub subscriptions deletegcloud pubsub topics delete 命令。

    gcloud pubsub subscriptions delete SUBSCRIPTION_NAME
    gcloud pubsub topics delete TOPIC_NAME
    
  3. 如需删除 BigQuery 表,请使用 bq rm 命令。

    bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
    
  4. 删除 BigQuery 数据集。单独的数据集不会产生任何费用。

    bq rm -r -f -d PROJECT_ID:Retail_Store
    
    bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
    
  5. 如需删除 Bigtable 实例,请使用 cbt deleteinstance 命令。单独的存储桶不会产生任何费用。

    cbt deleteinstance aggregate-tables
    
  6. 如需删除 Cloud Storage 存储桶,请使用 gcloud storage rm 命令。单独的存储桶不会产生任何费用。

    gcloud storage rm gs://BUCKET_NAME --recursive
    

撤销凭据

  1. 撤消您授予用户管理的工作器服务账号的角色。对以下每个 IAM 角色运行以下命令一次:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    • roles/bigtable.admin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \
        --role=ROLE
  2. 可选:撤消您创建的身份验证凭据,并删除本地凭据文件。

    gcloud auth application-default revoke
  3. 可选:从 gcloud CLI 撤消凭据。

    gcloud auth revoke

后续步骤