使用 Dataflow SQL 联接流式数据


本教程介绍如何使用 Dataflow SQL 将来自 Pub/Sub 的数据流与 BigQuery 表中的数据联接起来。

目标

在本教程中,您将执行以下操作:

  • 编写 Dataflow SQL 查询,该查询将 Pub/Sub 流式数据与 BigQuery 表中的数据联接起来。
  • 从 Dataflow SQL 界面部署 Dataflow 作业。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

  • Dataflow
  • Cloud Storage
  • Pub/Sub
  • Data Catalog

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 启用 Cloud Dataflow、Compute Engine、Logging、Cloud Storage、Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Resource Manager 和 Data Catalog。 API。

    启用 API

  5. 创建服务帐号:

    1. 在 Google Cloud 控制台中,转到创建服务帐号页面。

      转到“创建服务帐号”
    2. 选择您的项目。
    3. 服务帐号名称字段中,输入一个名称。Google Cloud 控制台会根据此名称填充服务帐号 ID 字段。

      服务帐号说明字段中,输入说明。例如,Service account for quickstart

    4. 点击创建并继续
    5. Project > Owner 角色授予服务帐号。

      如需授予该角色,请找到选择角色列表,然后选择 Project > Owner

    6. 点击继续
    7. 点击完成以完成服务帐号的创建过程。

      不要关闭浏览器窗口。您将在下一步骤中用到它。

  6. 创建服务帐号密钥:

    1. 在 Google Cloud 控制台中,点击您创建的服务帐号的电子邮件地址。
    2. 点击密钥
    3. 点击添加密钥,然后点击创建新密钥
    4. 点击创建。JSON 密钥文件将下载到您的计算机上。
    5. 点击关闭
  7. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含凭据的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。

  8. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  9. 确保您的 Google Cloud 项目已启用结算功能

  10. 启用 Cloud Dataflow、Compute Engine、Logging、Cloud Storage、Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Resource Manager 和 Data Catalog。 API。

    启用 API

  11. 创建服务帐号:

    1. 在 Google Cloud 控制台中,转到创建服务帐号页面。

      转到“创建服务帐号”
    2. 选择您的项目。
    3. 服务帐号名称字段中,输入一个名称。Google Cloud 控制台会根据此名称填充服务帐号 ID 字段。

      服务帐号说明字段中,输入说明。例如,Service account for quickstart

    4. 点击创建并继续
    5. Project > Owner 角色授予服务帐号。

      如需授予该角色,请找到选择角色列表,然后选择 Project > Owner

    6. 点击继续
    7. 点击完成以完成服务帐号的创建过程。

      不要关闭浏览器窗口。您将在下一步骤中用到它。

  12. 创建服务帐号密钥:

    1. 在 Google Cloud 控制台中,点击您创建的服务帐号的电子邮件地址。
    2. 点击密钥
    3. 点击添加密钥,然后点击创建新密钥
    4. 点击创建。JSON 密钥文件将下载到您的计算机上。
    5. 点击关闭
  13. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含凭据的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。

  14. 安装并初始化 gcloud CLI。请选择一个安装选项。 您可能需要project 属性设置为要用于本演示的项目。
  15. 转到 Google Cloud 控制台中的 Dataflow SQL 网页界面。这将打开您最近访问的项目。如需切换到其他项目,请点击 BigQuery SQL 网页界面顶部的项目名称,然后搜索要使用的项目。
    转到 Dataflow SQL 网页界面

创建示例来源

如果您希望按照本教程中提供的示例进行操作,请创建以下来源并在本教程的步骤中使用。

  • Pub/Sub 主题 transactions - 通过订阅 Pub/Sub 主题收到的交易信息数据流。每笔交易的数据包括购买的产品、售价以及发生购买交易的城市和州等信息。创建 Pub/Sub 主题后,您可以创建一个将消息发布到主题的脚本。您将在本教程的后续部分中运行此脚本。
  • BigQuery 表格 us_state_salesregions - 提供州与销售区域对应关系的表格。在创建此表之前,您需要创建 BigQuery 数据集。

为您的 Pub/Sub 主题分配架构

通过分配架构,您可以对 Pub/Sub 主题数据运行 SQL 查询。当前,Dataflow SQL 要求将 Pub/Sub 主题中的消息序列化为 JSON 格式。

如需为示例 Pub/Sub 主题 transactions 分配架构,请执行以下操作:

  1. 创建一个文本文件并将其命名为 transactions_schema.yaml。将以下架构文本复制并粘贴到 transactions_schema.yaml

      - column: event_timestamp
        description: Pub/Sub event timestamp
        mode: REQUIRED
        type: TIMESTAMP
      - column: tr_time_str
        description: Transaction time string
        mode: NULLABLE
        type: STRING
      - column: first_name
        description: First name
        mode: NULLABLE
        type: STRING
      - column: last_name
        description: Last name
        mode: NULLABLE
        type: STRING
      - column: city
        description: City
        mode: NULLABLE
        type: STRING
      - column: state
        description: State
        mode: NULLABLE
        type: STRING
      - column: product
        description: Product
        mode: NULLABLE
        type: STRING
      - column: amount
        description: Amount of transaction
        mode: NULLABLE
        type: FLOAT
    
  2. 使用 Google Cloud CLI 分配架构。

    a.使用以下命令更新 gcloud CLI。确保 gcloud CLI 的版本为 242.0.0 或更高版本。

      gcloud components update
    

    b. 在命令行窗口中运行以下命令。将 project-id 替换为您的项目 ID,将 path-to-file 替换为 transactions_schema.yaml 文件的路径。

      gcloud data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml
    

    如需详细了解命令的参数和允许使用的架构文件格式,请参阅 gcloud data-catalog entries update 的文档页面。

    c.确认您的架构已成功分配给 transactions Pub/Sub 主题。请将 project-id 替换为您的项目 ID。

      gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
    

查找 Pub/Sub 来源

使用 Dataflow SQL 界面,您可以找到自己有权访问的任何项目的 Pub/Sub 数据源对象,因此您不必记住这些对象的完整名称。

对于本教程中的示例,前往 Dataflow SQL 编辑器并搜索您创建的 transactions Pub/Sub 主题:

  1. 前往 SQL 工作区

  2. Dataflow SQL 编辑器面板的搜索栏中,搜索 projectid=project-id transactions。请将 project-id 替换为您的项目 ID。

    Dataflow SQL 工作区中的 Data Catalog 搜索面板。

查看架构

  1. 在 Dataflow SQL 界面的 Dataflow SQL 编辑器面板中,点击事务或者输入 projectid=project-id system=cloud_pubsub 来搜索 Pub/Sub 主题,然后选择主题。
  2. 架构下,您可以查看分配给 Pub/Sub 主题的架构。

    分配给主题的架构,包括字段名称及其说明的列表。

创建 SQL 查询

您可以在 Dataflow SQL 界面上创建运行 Dataflow 作业的 SQL 查询。

以下 SQL 查询为数据扩充查询。该查询会向 Pub/Sub 事件流 (transactions) 额外添加一个字段 sales_region,该字段使用州与销售区域对应关系 BigQuery 表 (us_state_salesregions)。

复制以下 SQL 查询并粘贴到查询编辑器中。将 project-id 替换为您的项目 ID。

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
  ON tr.state = sr.state_code

在 Dataflow SQL 界面中输入查询时,查询验证器会验证查询语法。如果查询有效,会显示一个绿色对勾标记图标。如果查询无效,则会显示一个红色感叹号图标。如果查询语法无效,您可以点击验证器图标,查看有关您需要解决的问题的相关信息。

以下屏幕截图展示了查询编辑器中的有效查询。验证器显示绿色对勾标记。

Dataflow SQL 工作区,其中包含编辑器中可见教程的查询。

创建 Dataflow 作业以运行 SQL 查询

如需运行 SQL 查询,请通过 Dataflow SQL 界面创建 Dataflow 作业。

  1. 查询编辑器中,点击创建作业

  2. 在打开的创建 Dataflow 作业面板中:

    • 对于目标位置,选择 BigQuery
    • 对于数据集 ID,选择 dataflow_sql_tutorial
    • 表名称部分,输入 sales
    创建 Dataflow SQL 作业表单。
  3. 可选:Dataflow 会自动选择最适合您的 Dataflow SQL 作业的设置,但您可以展开可选参数菜单,手动指定以下流水线选项

    • 工作器数量上限
    • 地区
    • 服务账号电子邮件地址
    • 机器类型
    • 其他实验
    • 工作器 IP 地址配置
    • 网络
    • 子网
  4. 点击创建。Dataflow 作业需要几分钟才能开始运行。

查看 Dataflow 作业

Dataflow 将您的 SQL 查询转换为 Apache Beam 流水线。点击查看作业以打开 Dataflow 网页界面,您可以在其中查看流水线的图形表示。

Dataflow 网页界面中显示的来自 SQL 查询的流水线。

如需查看流水线中发生的转换详情,请点击这些框。例如,如果您点击图形表示中标有运行 SQL 查询的第一个框,则会出现一个图形,显示后台进行的操作。

最前面的两个框表示您联接的两个输入:Pub/Sub 主题 transactions 和 BigQuery 表 us_state_salesregions

包含两个输入的联接的写入输出在 25 秒内完成。

如需查看包含作业结果的输出表,请前往 BigQuery 界面。在探索器面板的项目中,点击您创建的 dataflow_sql_tutorial 数据集。然后,点击输出表 sales预览标签页会显示该输出表的内容。

sales 预览表包含 tr_time_str、first_name、last_name、city、state、product、amount、sales_region 列。

查看历史作业并修改查询

Dataflow 界面在 Dataflow 作业页面中存储历史作业和查询。

您可以使用作业历史记录列表查看之前的 SQL 查询。例如,您希望修改查询,每 15 秒按销售区域汇总销售额。您可以通过作业页面访问您在本教程前面部分启动并正在运行的作业,复制 SQL 查询,并使用修改后的查询运行另一个作业。

  1. 在 Dataflow 作业页面中,点击要修改的作业。

  2. 作业详细信息页面的作业信息面板中,在流水线选项下找到 SQL 查询。查找 queryString 所在的行。

    名为 queryString 的作业流水线选项。
  3. 将以下 SQL 查询复制并粘贴到 SQL 工作区Dataflow SQL 编辑器中,以添加翻转窗口。 请将 project-id 替换为您的项目 ID。

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  4. 点击创建作业以使用修改后的查询创建新作业。

清除数据

为避免因本教程中使用的资源导致您的 Cloud Billing 账号产生费用,请执行以下操作:

  1. 如果 transactions_injector.py 发布脚本仍在运行,请将其停止。

  2. 停止正在运行的 Dataflow 作业。转到 Google Cloud 控制台中的 Dataflow 网页界面。

    转到 Dataflow 网页界面

    针对您按照本演示创建的每个作业,执行以下步骤:

    1. 点击作业的名称。

    2. 作业详情页面上,点击停止。系统会显示停止作业对话框,其中包含停止作业的选项:

    3. 选择取消

    4. 点击停止作业。该服务将尽可能快地停止所有数据提取和处理操作。由于使用取消选项会立即停止处理过程,因此您可能会丢失所有“传输中”数据。停止作业可能需要几分钟。

  3. 删除 BigQuery 数据集。转到 Google Cloud 控制台中的 BigQuery 网页界面。

    转到 BigQuery 网页界面

    1. 探索器面板的资源部分中,点击您创建的 dataflow_sql_tutorial 数据集。

    2. 在详细信息面板中,点击删除。此时会打开一个确认对话框。

    3. 删除数据集对话框中,输入 delete 以确认删除命令,然后点击删除

  4. 删除 Pub/Sub 主题。转到 Google Cloud 控制台中的 Pub/Sub 主题页面。

    转到 Pub/Sub 主题页面

    1. 选择 transactions 主题。

    2. 点击删除以永久删除该主题。 此时会打开一个确认对话框。

    3. 删除主题对话框中,输入 delete 以确认删除命令,然后点击删除

    4. 转到 Pub/Sub 订阅页面

    5. 选择要 transactions 的任何剩余订阅。如果您的作业不再运行,则可能不会有任何订阅。

    6. 点击删除以永久删除该订阅。 在确认对话框中点击删除

  5. 删除 Cloud Storage 中的 Dataflow 暂存存储分区。进入 Google Cloud 控制台中的 Cloud Storage 存储桶页面。

    进入“存储桶”

    1. 选择 Dataflow 暂存存储桶。

    2. 点击删除以永久删除该存储分区。 此时会打开一个确认对话框。

    3. 删除存储桶对话框中,输入 DELETE 以确认删除命令,然后点击删除

后续步骤