快速入门:使用模板

本快速入门介绍如何使用 Google 提供的 Dataflow 模板创建流处理流水线。具体来说,本快速入门会以 Pub/Sub Topic to BigQuery 模板为例。

Pub/Sub Topic to BigQuery 模板是一种流处理流水线,可从 Cloud Pub/Sub 主题读取 JSON 格式的消息并将其写入 BigQuery 表中。


如需在控制台中直接获取有关此任务的分步指导,请点击操作演示

操作演示


以下部分将引导您完成与点击操作演示相同的步骤。

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 启用 Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager API。

    启用 API

  5. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  6. 确保您的 Google Cloud 项目已启用结算功能

  7. 启用 Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager API。

    启用 API

  8. 创建 Cloud Storage 存储桶:
    1. 在 Google Cloud 控制台中,进入 Cloud Storage 存储桶页面。

      进入“存储桶”页面

    2. 点击创建存储分区
    3. 创建存储分区页面上,输入您的存储分区信息。要转到下一步,请点击继续
      • 指定存储分区的名称中,输入唯一的存储分区名称。请勿在存储分区名称中添加敏感信息,因为存储分区命名空间是全局性的,公开可见。
      • 对于选择数据存储位置,执行以下操作:
        • 选择位置类型选项。
        • 选择位置选项。
      • 对于为数据选择一个默认存储类别,选择以下项: Standard
      • 对于选择如何控制对象的访问权限,请选择访问权限控制选项。
      • 对于高级设置(可选),请指定加密方法保留政策存储分区标签
    4. 点击创建
  9. 在后续部分中根据需要复制以下内容:
    • 您的 Cloud Storage 存储桶名称。
    • 您的 Google Cloud 项目 ID。

      如需查找此 ID,请参阅识别项目
  10. 为完成本快速入门中的步骤,您的用户帐号必须具有 Dataflow Admin 角色Service Account User 角色。Compute Engine 默认服务帐号必须具有 Dataflow Worker 角色。如需在控制台中添加所需的角色,请执行以下操作:

    1. 在控制台中,转到 IAM 页面。
      转到 IAM
    2. 选择您的项目。
    3. 在用户帐号所在的行中,点击 修改主帐号,然后点击 添加其他角色
    4. 在下拉列表中,选择 Dataflow Admin 角色。
    5. Service Account User 角色重复上述步骤,然后点击保存
    6. 在 Compute Engine 默认服务帐号所在的行中,点击 修改主帐号,然后点击 添加其他角色
    7. 在下拉列表中,选择 Dataflow Worker 角色。
    8. Pub/Sub EditorBigQuery Data Editor 角色重复上述步骤,然后点击保存

      如需详细了解如何授予角色,请参阅使用控制台授予 IAM 角色

  11. 除非您选择停用默认网络,否则每个新项目开始都有一个默认网络。如果您的项目没有默认网络,则项目中需要有一个您的用户帐号具有其 Compute Network User 角色 (roles/compute.networkUser) 的网络。

创建 BigQuery 数据集和表

使用控制台为您的 Pub/Sub 主题创建具有适当架构的 BigQuery 数据集和表。

在此示例中,数据集的名称为 taxirides,表的名称为 realtime。如需创建此数据集和表,请按照以下步骤操作:

  1. 在控制台中,转到 BigQuery 页面。
    前往 BigQuery
  2. 探索器面板中,点击您要创建数据集的项目旁边的 查看操作,然后点击创建数据集
  3. 创建数据集面板上,按照以下步骤操作:
    1. 数据集 ID 部分,输入 taxirides
    2. 选择美国 (US) 作为数据位置。公共数据集存储在 US 多区域位置。为简单起见,请将数据集放在同一位置。
    3. 保留其他默认设置,然后点击创建数据集
  4. 探索器面板中,展开您的项目。
  5. 点击 taxirides 数据集旁边的 查看操作,然后点击打开
  6. 在详细信息面板中,点击 创建表
  7. 创建表面板上,按照以下步骤操作:
    1. 来源部分,为基于以下数据创建表选择空表
    2. 目标位置部分的中,输入 realtime
    3. 架构部分,点击以文字形式修改开关,并将以下架构定义粘贴到相应的框中:
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. 分区和集群设置部分中,对于分区,选择时间戳字段。
  8. 保留其他默认设置,然后点击创建表

运行流水线

使用 Google 提供的 Pub/Sub Topic to BigQuery 模板运行流处理流水线。流水线会从该输入主题获取传入数据。

  1. 在控制台中,转到 Dataflow 作业页面。
    转到作业
  2. 点击从模板创建作业
  3. 输入 taxi-data 作为 Dataflow 作业的名称
  4. 对于 Dataflow 模板,选择 Pub/Sub Topic to BigQuery 模板。
  5. 输入 Pub/Sub 主题部分,点击手动输入主题
  6. 在对话框中,为主题名称输入以下内容,然后点击保存
    projects/pubsub-public-data/topics/taxirides-realtime

    此公开提供的 Pub/Sub 主题基于纽约市出租车和豪华轿车委员会的开放数据集。以下是来自此主题的 JSON 格式的示例消息:

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  7. 对于 BigQuery 输出表,输入以下内容:
    PROJECT_ID:taxirides.realtime

    PROJECT_ID 替换为创建了 BigQuery 数据集的项目的 ID。

  8. 对于临时位置,输入以下内容:
    BUCKET_NAME/temp/

    BUCKET_NAME 替换为您的 Cloud Storage 存储桶的名称。temp 文件夹用于存储临时文件,如暂存的流水线作业。

  9. 如果您的项目没有默认网络,请点击显示可选参数,然后输入网络子网。如需了解详情,请参阅指定网络和子网
  10. 点击运行作业

查看结果

如需查看写入 realtime 表的数据,请按照以下步骤操作:

  1. 在控制台中,转到 BigQuery 页面。

    前往 BigQuery

  2. 在查询编辑器中,粘贴以下查询:

    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    PROJECT_ID 替换为创建了 BigQuery 数据集的项目的 ID。数据最多可能需要一分钟才会开始显示在表中。

  3. 点击运行

    查询将返回在过去 24 小时内添加到表中的行。您还可以使用标准 SQL 运行查询。

清理

为避免因本页中使用的资源导致您的 Google Cloud 帐号产生费用,请按照以下步骤操作。

删除项目

避免产生费用的最简单方法是删除您为本快速入门创建的 Google Cloud 项目。

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

逐个删除资源

如果您希望保留本快速入门中使用的 Google Cloud 项目,请逐个删除资源:

  1. 在控制台中,转到 Dataflow 作业页面。
    转到作业
  2. 从作业列表中选择您的流处理作业。
  3. 在导航中,点击停止
  4. 停止作业对话框中,取消排空您的流水线,然后点击停止作业
  5. 在控制台中,转到 BigQuery 页面。
    前往 BigQuery
  6. 探索器面板中,展开您的项目。
  7. 在要删除的数据集旁边,点击 查看操作,然后点击打开
  8. 在详细信息面板中,点击删除数据集,然后按照相应说明操作。
  9. 在 Google Cloud 控制台中,进入 Cloud Storage 存储桶页面。

    进入“存储桶”

  10. 点击要删除的存储分区对应的复选框。
  11. 如需删除存储分区,请点击删除,然后按照说明操作。

后续步骤