快速入门:使用模板

本快速入门介绍了如何使用 Google 提供的 Dataflow 模板创建流处理流水线。具体来说,本快速入门会以 Pub/Sub Topic to BigQuery 模板为例。

Pub/Sub Topic to BigQuery 模板是一种流处理流水线,可从 Cloud Pub/Sub 主题读取 JSON 格式的消息并将其写入 BigQuery 表格中。

准备工作

  1. 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到“项目选择器”

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  4. 启用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Resource Manager API。

    启用 API

  5. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到“项目选择器”

  6. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  7. 启用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Resource Manager API。

    启用 API

  8. 创建 Cloud Storage 存储分区:
    1. 在 Cloud Console 中,转到 Cloud Storage 浏览器页面。

      转到浏览器

    2. 点击创建存储分区
    3. 创建存储分区页面上,输入您的存储分区信息。要转到下一步,请点击继续
      • 指定存储分区的名称中,输入唯一的存储分区名称。请勿在存储分区名称中添加敏感信息,因为存储分区命名空间是全局性的,公开可见。
      • 对于选择数据存储位置,执行以下操作:
        • 选择位置类型选项。
        • 选择位置选项。
      • 对于为数据选择一个默认存储类别,选择以下项: Standard
      • 对于选择如何控制对象的访问权限,请选择访问权限控制选项。
      • 对于高级设置(可选),请指定加密方法保留政策存储分区标签
    4. 点击创建
  9. 在后续部分中根据需要复制以下内容:
    • 您的 Cloud Storage 存储分区名称。
    • 您的 Google Cloud 项目 ID。 如需查找此 ID,请参阅识别项目

创建 BigQuery 数据集和表

使用 Cloud Console,为您的 Pub/Sub 主题创建具有适当架构的 BigQuery 数据集和表。

在此示例中,数据集的名称为 taxirides,表名称为 realtime。如需创建此数据集和表,请按照以下步骤操作:

  1. 在 Cloud Console 中,转到 BigQuery 页面。
    转到 BigQuery
  2. 探索器面板中,点击您要创建数据集的项目旁边的 查看操作,然后点击打开
  3. 在详细信息面板中,点击 创建数据集
  4. 创建数据集面板上,按照以下步骤操作:
    1. 数据集 ID 部分,输入 taxirides
    2. 选择美国 (US) 作为数据位置。公共数据集存储在 US 多区域位置。为简单起见,请将数据集放在同一位置。
  5. 保留其他默认设置,然后点击创建数据集
  6. 探索器面板中,展开您的项目。
  7. 点击 taxirides 数据集旁边的 查看操作,然后点击打开
  8. 在详细信息面板中,点击 创建表
  9. 创建表面板上,按照以下步骤操作:
    1. 来源部分,为基于以下数据创建表选择空表
    2. 目标位置部分的表名称中,输入 realtime
    3. 架构部分,点击以文字形式修改开关,并将以下架构定义粘贴到相应的框中。
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. 分区和集群设置部分中,对于分区,选择时间戳字段。
  10. 保留其他默认设置,然后点击创建表

运行流水线

使用 Google 提供的 Pub/Sub Topic to BigQuery 模板运行流处理流水线。流水线会从该输入主题获取传入数据。

  1. 在 Cloud Console 中,转到 Dataflow 作业页面。
    转到作业
  2. 点击基于模板创建作业
  3. 为 Dataflow 作业输入作业名称
  4. Dataflow 模板下,选择 Pub/Sub Topic to BigQuery 模板。
  5. 对于输入 Pub/Sub 主题,输入以下内容:
    projects/pubsub-public-data/topics/taxirides-realtime

    此公开提供的 Pub/Sub 主题基于纽约市出租车和豪华轿车委员会的开放数据集。以下是来自此主题的 JSON 格式的示例消息:

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  6. 对于 BigQuery 输出表,输入以下内容:
    PROJECT_ID:taxirides.realtime

    PROJECT_ID 替换为创建了 BigQuery 数据集的项目的 ID。

  7. 对于临时位置,输入以下内容:
    gs://BUCKET_NAME/temp/

    BUCKET_NAME 替换为您的 Cloud Storage 存储桶的名称。temp 文件夹用于存储临时文件,如暂存的流水线作业。

  8. 点击运行作业

查看结果

如需查看写入 realtime 表的数据,请按照以下步骤操作:

  1. 在 Cloud Console 中,转到 BigQuery 页面。
    转到 BigQuery
  2. 在查询编辑器中,运行以下查询:
    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    PROJECT_ID 替换为创建了 BigQuery 数据集的项目的 ID。 数据最多可能需要一分钟才会开始显示在表中。

    查询将返回在过去 24 小时内添加到表中的行。您还可以使用标准 SQL 运行查询。

清理

为避免系统因本页中使用的资源向您的 Google Cloud 帐号收取费用,请按照以下步骤操作。

删除项目

避免产生费用的最简单方法是删除您为本快速入门创建的 Google Cloud 项目。

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

逐个删除资源

如果您希望保留本快速入门中使用的 Google Cloud 项目,请逐个删除资源:

  1. 在 Cloud Console 中,转到 Dataflow 作业页面。
    转到作业
  2. 从作业列表中选择您的流处理作业。
  3. 在导航中,点击停止
  4. 停止作业对话框中,取消排空您的流水线,然后点击停止作业
  5. 在 Cloud Console 中,转到 BigQuery 页面。
    转到 BigQuery
  6. 探索器面板中,展开您的项目。
  7. 在要删除的数据集旁边,点击 查看操作,然后点击打开
  8. 在详细信息面板中,点击删除数据集,然后按照相应说明操作。
  9. 在 Cloud Console 中,转到 Cloud Storage 浏览器页面。

    转到浏览器

  10. 点击要删除的存储分区对应的复选框。
  11. 如需删除存储分区,请点击删除,然后按照说明操作。

后续步骤