使用 Dataflow 模板创建流处理流水线

本快速入门介绍如何使用 Google 提供的 Dataflow 模板创建流处理流水线。具体来说,本快速入门会以 Pub/Sub to BigQuery 模板为例。

Pub/Sub to BigQuery 模板是一种流处理流水线,可从 Pub/Sub 主题读取 JSON 格式的消息并将其写入 BigQuery 表中。


如需在 Google Cloud 控制台中直接遵循有关此任务的分步指导,请点击操作演示

操作演示


准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. 确保您的 Google Cloud 项目已启用结算功能

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Enable the APIs

  8. 创建 Cloud Storage 存储桶:
    1. In the Google Cloud console, go to the Cloud Storage Buckets page.

      Go to Buckets page

    2. Click Create bucket.
    3. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
      • For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
      • For Choose where to store your data, do the following:
        • Select a Location type option.
        • Select a Location option.
      • For Choose a default storage class for your data, select the following: Standard.
      • For Choose how to control access to objects, select an Access control option.
      • For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
    4. Click Create.
  9. 在后续部分中根据需要复制以下内容:
    • 您的 Cloud Storage 存储桶名称。
    • 您的 Google Cloud 项目 ID。

      如需查找此 ID,请参阅识别项目
  10. 为完成本快速入门中的步骤,您的用户账号必须具有 Dataflow Admin 角色Service Account User 角色。Compute Engine 默认服务账号必须具有 Dataflow Worker 角色。如需在 Google Cloud 控制台中添加所需的角色,请执行以下操作:

    1. 转到 IAM 页面。
      转到 IAM
    2. 选择您的项目。
    3. 在用户账号所在的行中,点击 修改主账号,然后点击 添加其他角色
    4. 在下拉列表中,选择 Dataflow Admin 角色。
    5. Service Account User 角色重复上述步骤,然后点击保存
    6. 在 Compute Engine 默认服务账号所在的行中,点击 修改主账号,然后点击 添加其他角色
    7. 在下拉列表中,选择 Dataflow Worker 角色。
    8. Pub/Sub EditorBigQuery Data Editor 角色重复上述步骤,然后点击保存

      如需详细了解如何授予角色,请参阅使用控制台授予 IAM 角色

  11. 默认情况下,每个新项目起初都有一个默认网络。 如果您的项目的默认网络已停用或者已被删除,则您需要在自己的用户账号具备 Compute Network User 角色 (roles/compute.networkUser) 的项目中拥有网络。

创建 BigQuery 数据集和表

使用 Google Cloud 控制台,为您的 Pub/Sub 主题创建具有适当架构的 BigQuery 数据集和表。

在此示例中,数据集的名称为 taxirides,表的名称为 realtime。如需创建此数据集和表,请按照以下步骤操作:

  1. 转到 BigQuery 页面。
    前往 BigQuery
  2. 探索器面板中,点击您要创建数据集的项目旁边的 查看操作,然后点击创建数据集
  3. 创建数据集面板上,按照以下步骤操作:
    1. 数据集 ID 部分,输入 taxirides。 每个 Google Cloud 项目的数据集 ID 都是唯一的。
    2. 对于位置类型,请选择多区域,然后选择美国(美国的多个区域)。公共数据集存储在 US 多区域位置。为简单起见,请将数据集放在同一位置。
    3. 保留其他默认设置,然后点击创建数据集
  4. 探索器面板中,展开您的项目。
  5. taxirides 数据集旁边,点击 查看操作,然后点击创建表
  6. 创建表面板上,按照以下步骤操作:
    1. 来源部分,为基于以下数据创建表选择空表
    2. 目标位置部分的中,输入 realtime
    3. 架构部分,点击以文字形式修改开关,并将以下架构定义粘贴到相应的框中:
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. 分区和集群设置部分中,对于分区,选择时间戳字段。
  7. 保留其他默认设置,然后点击创建表

运行流水线

使用 Google 提供的 Pub/Sub to BigQuery 模板运行流处理流水线。流水线会从该输入主题获取传入数据。

  1. 转到 Dataflow 作业页面。
    转到作业
  2. 点击从模板创建作业
  3. 输入 taxi-data 作为 Dataflow 作业的名称
  4. 对于 Dataflow 模板,选择 Pub/Sub to BigQuery 模板。
  5. 对于 BigQuery 输出表,输入以下内容:
    PROJECT_ID:taxirides.realtime

    PROJECT_ID 替换为创建了 BigQuery 数据集的项目的 ID。

  6. 展开可选参数
  7. 输入 Pub/Sub 主题部分,点击手动输入主题
  8. 在对话框中,为主题名称输入以下内容,然后点击保存
    projects/pubsub-public-data/topics/taxirides-realtime

    此公开提供的 Pub/Sub 主题基于纽约市出租车和豪华轿车委员会的开放数据集。以下是来自此主题的 JSON 格式的示例消息:

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  9. 临时位置部分,输入以下内容:
    gs://BUCKET_NAME/temp/

    BUCKET_NAME 替换为您的 Cloud Storage 存储桶的名称。temp 文件夹用于存储临时文件,如暂存的流水线作业。

  10. 如果您的项目没有默认网络,请输入网络子网。如需了解详情,请参阅指定网络和子网
  11. 点击运行作业

查看结果

如需查看写入 realtime 表的数据,请按照以下步骤操作:

  1. 转到 BigQuery 页面。

    前往 BigQuery

  2. 点击 编写新查询。随即会打开一个新的编辑器标签页。

    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    PROJECT_ID 替换为创建了 BigQuery 数据集的项目的 ID。数据最多可能需要一分钟才会开始显示在表中。

  3. 点击运行

    查询将返回在过去 24 小时内添加到表中的行。您还可以使用标准 SQL 运行查询。

清理

为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。

删除项目

避免产生费用的最简单方法是删除您为本快速入门创建的 Google Cloud 项目。

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

逐个删除资源

如果您希望保留本快速入门中使用的 Google Cloud 项目,请逐个删除资源:

  1. 转到 Dataflow 作业页面。
    转到作业
  2. 从作业列表中选择您的流处理作业。
  3. 在导航中,点击停止
  4. 停止作业对话框中,取消排空您的流水线,然后点击停止作业
  5. 转到 BigQuery 页面。
    前往 BigQuery
  6. 探索器面板中,展开您的项目。
  7. 在要删除的数据集旁边,点击 查看操作,然后点击打开
  8. 在详细信息面板中,点击删除数据集,然后按照相应说明操作。
  9. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  10. Click the checkbox for the bucket that you want to delete.
  11. To delete the bucket, click Delete, and then follow the instructions.

后续步骤