创建电子商务流处理流水线


在本教程中,您将创建一个 Dataflow 流处理流水线,该流水线会转换来自 Pub/Sub 主题和订阅的电子商务数据,并将数据输出到 BigQuery 和 Bigtable。本教程需要使用 Gradle

本教程提供了一个端到端的电子商务示例应用,该应用可将来自网上商店的数据流式传输到 BigQuery 和 Bigtable。 示例应用展示了实现流式数据分析和实时人工智能 (AI) 的常见使用场景和最佳实践。使用本教程学习如何动态响应客户操作,以便实时分析和应对事件。本教程介绍了如何存储、分析和直观呈现事件数据,以深入了解客户行为。

示例应用可在 GitHub 上找到。如需使用 Terraform 运行本教程,请按照 GitHub 上的示例应用提供的步骤操作。

目标

  • 验证传入的数据,并尽可能对其应用更正。
  • 分析点击流数据以保持每个产品在指定时间段内的查看次数计数。请将此信息存储在低延时存储区中。 然后,应用可以使用这些数据向网站上的客户提供“该产品的查看人数”消息。
  • 使用交易数据告知库存排序:

    • 分析交易数据,以按全球的商店来计算每个商品在指定时间段内的总销售量。
    • 分析库存数据以计算每个商品的传入的库存。
    • 将此数据连续传递给库存系统,以便可以使用它做出库存购买决策。
  • 验证传入的数据,并尽可能对其应用更正。 将所有不可更正的数据写入死信队列,以进行其他分析和处理。创建一个指标,表示发送到死信队列以进行监控和提醒的传入数据百分比。

  • 将所有传入数据处理为标准格式,并将其存储在数据仓库中以供将来的分析和可视化使用。

  • 对门店销售的交易数据进行反规范化,以便可以包含商店位置的纬度和经度等信息。将商店 ID 用作键,在 BigQuery 中通过缓慢变化的表提供商店信息。

数据

该应用会处理以下类型的数据:

  • 在线系统发送到 Pub/Sub 的点击流数据。
  • 本地系统或软件即服务 (SaaS) 系统发送到 Pub/Sub 的交易数据。
  • 本地系统或 SaaS 系统发送到 Pub/Sub 的库存数据。

任务模式

该应用包含使用 Java 版 Apache Beam SDK 构建的流水线中常见的以下任务模式:

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

  • BigQuery
  • Bigtable
  • Cloud Scheduler
  • Compute Engine
  • Dataflow
  • Pub/Sub

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. 确保您的 Google Cloud 项目已启用结算功能

  13. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. 为新流水线创建用户管理的工作器服务账号,并向服务账号授予必要的角色。

    1. 如需创建服务账号,请运行 gcloud iam service-accounts create 命令。

      gcloud iam service-accounts create retailpipeline \
          --description="Retail app data pipeline worker service account" \
          --display-name="Retail app data pipeline access"
    2. 向服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/pubsub.editor
      • roles/bigquery.dataEditor
      • roles/bigtable.admin
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      SERVICE_ACCOUNT_ROLE 替换为每个角色。

    3. 为您的 Google 账号授予一个可让您为服务账号创建访问令牌的角色:

      gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
  17. 如果需要,请下载并安装 Gradle

创建示例来源和接收器

本部分介绍如何创建以下内容:

  • 用作临时存储位置的 Cloud Storage 存储桶
  • 使用 Pub/Sub 的流式数据源
  • 用于将数据加载到 BigQuery 中的数据集
  • Bigtable 实例

创建 Cloud Storage 存储桶

首先创建一个 Cloud Storage 存储桶。此存储桶由 Dataflow 流水线用作临时存储位置。

使用 gcloud storage buckets create 命令

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

替换以下内容:

  • BUCKET_NAME:符合存储桶命名要求的 Cloud Storage 存储桶的名称。Cloud Storage 存储桶名称必须是全局唯一的。
  • LOCATION:存储桶的位置

创建 Pub/Sub 主题和订阅

创建四个 Pub/Sub 主题,然后创建三个订阅。

如要创建主题,请为每个主题运行一次 gcloud pubsub topics create 命令。如需了解如何命名订阅,请参阅主题或订阅命名指南

gcloud pubsub topics create TOPIC_NAME

TOPIC_NAME 替换为以下值,对每个主题运行该命令一次,即总共四次:

  • Clickstream-inbound
  • Transactions-inbound
  • Inventory-inbound
  • Inventory-outbound

如需创建对主题的订阅,请为每个订阅运行一次 gcloud pubsub subscriptions create 命令:

  1. 创建 Clickstream-inbound-sub 订阅:

    gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
    
  2. 创建 Transactions-inbound-sub 订阅:

    gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
    
  3. 创建 Inventory-inbound-sub 订阅:

    gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
    

创建 BigQuery 数据集和表

为 Pub/Sub 主题创建具有适当架构的 BigQuery 数据集和分区表

  1. 使用 bq mk 命令创建第一个数据集。

    bq --location=US mk \
    PROJECT_ID:Retail_Store
    
  2. 创建第二个数据集。

    bq --location=US mk \
    PROJECT_ID:Retail_Store_Aggregations
    
  3. 使用 CREATE TABLE SQL 语句创建具有架构和测试数据的表。测试数据有一个 ID 值为 1 的存储区。缓慢更新的旁路输入模式使用此表。

    bq query --use_legacy_sql=false \
      'CREATE TABLE
        Retail_Store.Store_Locations
        (
          id INT64,
          city STRING,
          state STRING,
          zip INT64
        );
      INSERT INTO Retail_Store.Store_Locations
      VALUES (1, "a_city", "a_state",00000);'
    

创建 Bigtable 实例和表

创建 Bigtable 实例和表。如需详细了解如何创建 Bigtable 实例,请参阅创建实例

  1. 如果需要,请运行以下命令安装 cbt CLI

    gcloud components install cbt
    
  2. 使用 bigtable instances create 命令创建一个实例:

    gcloud bigtable instances create aggregate-tables \
        --display-name=aggregate-tables \
        --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
    

    CLUSTER_ZONE 替换为集群在其中运行的可用区

  3. 使用 cbt createtable 命令创建表:

    cbt -instance=aggregate-tables createtable PageView5MinAggregates
    
  4. 使用以下命令将列族添加到表中:

    cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
    

运行流水线

使用 Gradle 运行流处理流水线。如需查看流水线使用的 Java 代码,请参阅 RetailDataProcessingPipeline.java

  1. 使用 git clone 命令克隆 GitHub 代码库:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. 切换到应用目录:

    cd dataflow-sample-applications/retail/retail-java-applications
    
  3. 如需测试流水线,请在 shell 或终端中使用 Gradle 运行以下命令:

    ./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
    
  4. 如需运行流水线,请使用 Gradle 运行以下命令:

    ./gradlew tasks executeOnDataflow -Dexec.args=" \
    --project=PROJECT_ID \
    --tempLocation=gs://BUCKET_NAME/temp/ \
    --runner=DataflowRunner \
    --region=REGION \
    --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \
    --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \
    --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \
    --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \
    --dataWarehouseOutputProject=PROJECT_ID"
    

请参阅 GitHub 上的流水线源代码

创建并运行 Cloud Scheduler 作业

创建并运行三个 Cloud Scheduler 作业,一个发布点击流数据,一个用于库存数据,另一个用于交易数据。此步骤为流水线生成示例数据。

  1. 如需为本教程创建 Cloud Scheduler 作业,请使用 gcloud scheduler jobs create 命令。此步骤会创建一个点击流数据发布方,该发布方每分钟发布一条消息。

    gcloud scheduler jobs create pubsub clickstream \
      --schedule="* * * * *" \
      --location=LOCATION \
      --topic="Clickstream-inbound" \
      --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
    
  2. 如需启动 Cloud Scheduler 作业,请使用 gcloud scheduler jobs run 命令。

    gcloud scheduler jobs run --location=LOCATION clickstream
    
  3. 为库存数据创建并运行另一个类似的发布方,该发布方每两分钟发布一条消息。

    gcloud scheduler jobs create pubsub inventory \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Inventory-inbound" \
      --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
    
  4. 启动第二个 Cloud Scheduler 作业。

    gcloud scheduler jobs run --location=LOCATION inventory
    
  5. 为交易数据创建并运行第三个发布方,该发布方每两分钟发布一条消息。

    gcloud scheduler jobs create pubsub transactions \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Transactions-inbound" \
      --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
    
  6. 启动第三个 Cloud Scheduler 作业。

    gcloud scheduler jobs run --location=LOCATION transactions
    

查看结果

查看写入 BigQuery 表的数据。通过运行以下查询来查看 BigQuery 中的结果。在此流水线的运行期间,您可以看到 BigQuery 表中每分钟都在附加新行。

您可能需要等待表填充数据。

bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'

清理

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的 Google Cloud 项目。

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

逐个删除资源

如果您希望重复使用该项目,请删除为本教程创建的资源。

清理 Google Cloud 项目资源

  1. 如需删除 Cloud Scheduler 作业,请使用 gcloud scheduler jobs delete 命令。

     gcloud scheduler jobs delete transactions --location=LOCATION
    
     gcloud scheduler jobs delete inventory --location=LOCATION
    
     gcloud scheduler jobs delete clickstream --location=LOCATION
    
  2. 如需删除 Pub/Sub 订阅和主题,请使用 gcloud pubsub subscriptions deletegcloud pubsub topics delete 命令。

    gcloud pubsub subscriptions delete SUBSCRIPTION_NAME
    gcloud pubsub topics delete TOPIC_NAME
    
  3. 如需删除 BigQuery 表,请使用 bq rm 命令。

    bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
    
  4. 删除 BigQuery 数据集。单独的数据集不会产生任何费用。

    bq rm -r -f -d PROJECT_ID:Retail_Store
    
    bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
    
  5. 如需删除 Bigtable 实例,请使用 cbt deleteinstance 命令。单独的存储桶不会产生任何费用。

    cbt deleteinstance aggregate-tables
    
  6. 如需删除 Cloud Storage 存储桶,请使用 gcloud storage rm 命令。单独的存储桶不会产生任何费用。

    gcloud storage rm gs://BUCKET_NAME --recursive
    

撤销凭据

  1. 撤消您授予用户管理的工作器服务账号的角色。对以下每个 IAM 角色运行以下命令一次:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    • roles/bigtable.admin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \
        --role=ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

后续步骤