使用 Dataflow SQL

本教程介绍如何使用 SQL 和 Dataflow SQL 界面运行 Dataflow 作业。为了演示这些操作,本教程将通过一个具体示例,逐步介绍如何将 Pub/Sub 的流式数据与 BigQuery 表中的数据联接起来。

目标

在本教程中,您将执行以下操作:

  • 使用 SQL 将 Pub/Sub 流式数据与 BigQuery 表数据联接起来
  • 从 Dataflow SQL 界面部署 Dataflow 作业

费用

本教程使用 Google Cloud 的以下收费组件:

  • Dataflow
  • Cloud Storage
  • Pub/Sub

您可使用价格计算器,根据您的预计使用情况来估算费用。Google Cloud 新用户可能有资格申请免费试用

准备工作

  1. 登录您的 Google 帐号。

    如果您还没有 Google 帐号,请注册新帐号

  2. 在 GCP Console 的项目选择器页面上,选择或创建 GCP 项目。

    转到项目选择器页面

  3. 确保您的 Google Cloud Platform 项目已启用结算功能。 了解如何确认您的项目已启用结算功能

  4. 启用 Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, and Cloud Resource Manager API。

    启用 API

  5. 设置身份验证:
    1. 在 GCP Console 中,转到创建服务帐号密钥页面。

      转到“创建服务帐号密钥”页面
    2. 服务帐号列表中,选择新的服务帐号
    3. 服务帐号名称字段中,输入一个名称。
    4. 角色列表中,选择项目 > 所有者

      注意角色字段为您的服务帐号授予资源访问权限。稍后您可以使用 GCP Console 查看和更改此字段。如果您开发的是正式版应用,请指定比项目 > 所有者更为精细的权限。如需了解详情,请参阅为服务帐号授予角色
    5. 点击创建。包含密钥的 JSON 文件就会下载到计算机。
  6. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含服务帐号密钥的 JSON 文件的文件路径。此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。

  7. 安装并初始化 Cloud SDK。请选择一个安装选项。 您可能需要project 属性设置为要用于本演示的项目。
  8. 转到 Cloud Console 中的 BigQuery 网页界面。这将打开您最近访问的项目。如需切换到其他项目,请点击 BigQuery 网页界面顶部的项目名称,然后搜索要使用的项目。
    转到 BigQuery 网页界面

切换到 Dataflow SQL 界面

BigQuery 网页界面中,您可以按照以下步骤切换到 Dataflow 界面。

  1. 点击更多下拉菜单,然后选择查询设置

  2. 在右侧打开的查询设置菜单中,选择 Dataflow 引擎

  3. 如果您的项目没有启用 Dataflow API 和 Data Catalog API,系统将提示您启用这些 API。 点击启用 API。启用 Dataflow API 和 Data Catalog API 可能需要几分钟时间。

  4. 这些 API 启用完成后,点击保存

创建示例来源

如果您希望按照本教程中提供的示例进行操作,请创建以下来源并在本教程的步骤中使用。

  • Pub/Sub 主题 transactions - 通过订阅 Pub/Sub 主题收到的交易信息数据流。每笔交易的数据包括购买的产品、售价以及发生购买交易的城市和州等信息。创建 Pub/Sub 主题后,您可以创建一个将消息发布到主题的脚本。您将在本教程的后续部分中运行此脚本。
  • BigQuery 表 us_state_salesregions - 提供州与销售区域对应关系的表。在创建此表之前,您需要先创建 BigQuery 数据集。

查找 Pub/Sub 来源

使用 Dataflow SQL 界面,您可以找到自己有权访问的任何项目的 Pub/Sub 数据源对象,因此您不必记住这些对象的完整名称。

对于本教程中的示例,请添加您创建的 transactions Pub/Sub 主题:

  1. 在左侧导航面板中,点击添加数据下拉列表,然后选择 Cloud Dataflow 来源

  2. 在右侧打开的添加 Cloud Dataflow 来源面板中,选择 Pub/Sub 主题。在搜索框中,搜索 transactions。 选择相应主题,然后点击添加

为您的 Pub/Sub 主题分配架构

通过分配架构,您可以对 Pub/Sub 主题数据运行 SQL 查询。Dataflow SQL 要求将 Pub/Sub 主题中的消息序列化为 JSON 格式。日后,我们会增加对其他格式(例如 Avro)的支持。

添加示例 Pub/Sub 主题作为 Dataflow 来源之后,请完成下列步骤以将架构分配给 Dataflow SQL 界面中的主题。

  1. 资源面板中选择主题。

  2. 架构标签页中,点击修改架构架构侧边面板会在页面右侧打开。

    用于添加或修改架构的侧边面板

  3. 切换以文字形式修改按钮,并将以下内嵌架构粘贴到编辑器中。然后点击提交

    [
          {
              "description": "Pub/Sub event timestamp",
              "name": "event_timestamp",
              "mode": "REQUIRED",
              "type": "TIMESTAMP"
          },
          {
              "description": "Transaction time string",
              "name": "tr_time_str",
              "type": "STRING"
          },
          {
              "description": "First name",
              "name": "first_name",
              "type": "STRING"
          },
          {
              "description": "Last name",
              "name": "last_name",
              "type": "STRING"
          },
          {
              "description": "City",
              "name": "city",
              "type": "STRING"
          },
          {
              "description": "State",
              "name": "state",
              "type": "STRING"
          },
          {
              "description": "Product",
              "name": "product",
              "type": "STRING"
          },
          {
              "description": "Amount of transaction",
              "name": "amount",
              "type": "FLOAT64"
          }
        ]
        
  4. (可选)点击预览主题以检查消息的内容并确认其匹配您定义的架构。

    即会打开“预览主题”按钮

查看架构

  1. 在 Dataflow SQL 界面的左侧导航面板中,点击 Cloud Dataflow 来源
  2. 点击 Pub/Sub 主题
  3. 点击事务
  4. 架构部分下,您可以查看分配给 transactions Pub/Sub 主题的架构。

创建 SQL 查询

您可以在 Dataflow SQL 界面上创建运行 Dataflow 作业的 SQL 查询。

以下 SQL 查询为数据扩充查询。该查询会向 Pub/Sub 事件流 (transactions) 额外添加一个字段 sales_region,该字段使用州与销售区域对应关系 BigQuery 表 (us_state_salesregions)。

复制以下 SQL 查询并粘贴到查询编辑器中。将 project-id 替换为您的项目 ID。

    SELECT tr.*, sr.sales_region
    FROM pubsub.topic.`project-id`.transactions as tr
      INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
      ON tr.state = sr.state_code
    

在 Dataflow SQL 界面中输入查询时,查询验证器会验证查询语法。如果查询有效,会显示一个绿色对勾标记图标。如果查询无效,则会显示一个红色感叹号图标。如果查询语法无效,您可以点击验证器图标,查看有关您需要解决的问题的相关信息。

以下屏幕截图展示了查询编辑器中的有效查询。验证器显示绿色对勾标记。

在修改器中输入查询。

创建 Dataflow 作业以运行 SQL 查询

如需运行 SQL 查询,请通过 Dataflow SQL 界面创建 Dataflow 作业。

  1. 查询编辑器下方,点击创建 Dataflow 作业

  2. 在右侧打开的创建 Dataflow 作业面板中,将默认的表名称更改为 dfsqltable_sales

  3. 可选:Dataflow 会自动选择最适合您的 Dataflow SQL 作业的设置,但您可以展开可选参数菜单以手动指定以下流水线选项

    • 工作器数量上限
    • 地区
    • 服务帐号电子邮件地址
    • 机器类型
    • 其他实验
    • 工作器 IP 地址配置
    • 网络
    • 子网
  4. 点击创建。Dataflow 作业将需要几分钟才能开始运行。

  5. 界面中会显示查询结果面板。稍后如需返回作业的查询结果面板,请在作业记录面板中找到该作业,然后使用在编辑器中打开查询按钮(如查看 Dataflow 作业和输出中所示)。

  6. 作业信息部分,点击作业 ID 链接。此时系统会打开一个新的浏览器标签页,其中显示 Dataflow 网页界面中的 Dataflow 作业详情页面。

查看 Dataflow 作业和输出

Dataflow 将您的 SQL 查询转换为 Apache Beam 流水线。在通过新浏览器标签页打开的 Dataflow 网页界面中,您可以看到流水线的图形表示。

您可以点击各个框,查看流水线中各转换的详细信息。例如,如果您点击图形表示中标有运行 SQL 查询的顶部框,则会出现一个图形,显示后台进行的操作。

最上面的两个方框表示您联接的两个输入:Pub/Sub 主题 transactions 和 BigQuery 表 us_state_salesregions

如需查看包含作业结果的输出表,请返回 Dataflow SQL 界面所在的浏览器标签页。在左侧导航面板中,在您的项目下方点击您创建的 dataflow_sql_dataset 数据集。然后点击输出表 dfsqltable_sales预览标签页会显示该输出表的内容。

查看历史作业并修改查询

Dataflow SQL 界面在作业历史记录面板中存储历史作业和查询。作业按照启动日期列出。作业列表首先显示运行了作业的日期,然后显示没有运行作业的日期。

您可以使用作业历史记录列表修改以前的 SQL 查询并运行新的 Dataflow 作业。例如,您希望修改查询,每 15 秒按销售区域汇总销售额。您可以通过作业历史记录面板访问您在本教程前面部分启动并正在运行的作业,更改其 SQL 查询,并使用修改后的查询运行另一个作业。

  1. 在左侧导航面板中,点击作业记录

  2. 作业历史记录部分,点击 Cloud Dataflow。此时界面会显示项目的所有历史作业。

  3. 点击您要修改的作业。然后点击在查询编辑器中打开

  4. 查询编辑器中修改 SQL 查询,以添加翻转窗口。如果复制以下查询,请将 project-id 替换为您的项目 ID。

         SELECT
           sr.sales_region,
           TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
           SUM(tr.amount) as amount
         FROM pubsub.topic.`project-id`.transactions AS tr
           INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
           ON tr.state = sr.state_code
         GROUP BY
           sr.sales_region,
           TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
        
  5. 查询编辑器下方,点击创建 Cloud Dataflow 作业,以使用修改后的查询创建新的作业。

清理

为避免因本教程中使用的资源导致您的 Google Cloud 帐号产生费用,请执行以下操作:

  1. 如果 transactions_injector.py 发布脚本仍在运行,请将其停止。

  2. 停止正在运行的 Dataflow 作业。转到 Cloud Console 中的 Dataflow 网页界面。

    转到 Dataflow 网页界面

    针对您按照本演示创建的每个作业,执行以下步骤:

    1. 点击作业的名称。

    2. 在该作业的作业摘要面板中,点击停止作业。系统会显示停止作业对话框,其中包含停止作业的选项:

    3. 点击取消

    4. 点击停止作业。该服务将尽可能快地停止所有数据提取和处理操作。由于使用取消选项会立即停止处理过程,因此您可能会丢失所有“传输中”数据。停止作业可能需要几分钟。

  3. 删除 BigQuery 数据集。转到 Cloud Console 中的 BigQuery 网页界面。

    转到 BigQuery 网页界面

    1. 在导航面板的资源部分中,点击您创建的 dataflow_sql_dataset 数据集。

    2. 在右侧的详细信息面板中,点击删除数据集。此操作会删除相关数据集、表和所有数据。

    3. 删除数据集对话框中,输入您的数据集的名称 (dataflow_sql_dataset),然后点击删除以确认删除命令。

  4. 删除 Pub/Sub 主题。转到 Cloud Console 中的 Pub/Sub 主题页面。

    转到 Pub/Sub 主题页面

    1. 选中 transactions 主题旁边的复选框。

    2. 点击删除以永久删除该主题。

    3. 转到 Pub/Sub 订阅页面

    4. 选中任何剩余 transactions 订阅旁边的复选框。 如果您的作业不再运行,则可能不会有任何订阅。

    5. 点击删除以永久删除这些订阅。

  5. 删除 Cloud Storage 中的 Dataflow 暂存存储分区。 在 Cloud Console 中转到 Cloud Storage 浏览器。

    转到 Cloud Storage 浏览器

    1. 选中 Dataflow 暂存存储分区旁边的复选框。

    2. 点击删除以永久删除该存储分区。

后续步骤