使用 Dataflow 模板创建流处理流水线
本快速入门介绍如何使用 Google 提供的 Dataflow 模板创建流处理流水线。具体来说,本快速入门会以 Pub/Sub to BigQuery 模板为例。
Pub/Sub to BigQuery 模板是一种流处理流水线,可从 Pub/Sub 主题读取 JSON 格式的消息并将其写入 BigQuery 表中。
如需在 Google Cloud 控制台中直接遵循有关此任务的分步指导,请点击操作演示:
准备工作
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.
- 创建 Cloud Storage 存储桶:
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create bucket.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
- For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
-
For Choose where to store your data, do the following:
- Select a Location type option.
- Select a Location option.
- For Choose a default storage class for your data, select the following: Standard.
- For Choose how to control access to objects, select an Access control option.
- For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
- Click Create.
- 在后续部分中根据需要复制以下内容:
- 您的 Cloud Storage 存储桶名称。
- 您的 Google Cloud 项目 ID。
如需查找此 ID,请参阅识别项目。
为完成本快速入门中的步骤,您的用户账号必须具有 Dataflow Admin 角色和 Service Account User 角色。Compute Engine 默认服务账号必须具有 Dataflow Worker 角色。如需在 Google Cloud 控制台中添加所需的角色,请执行以下操作:
- 转到 IAM 页面。
转到 IAM - 选择您的项目。
- 在用户账号所在的行中,点击 修改主账号,然后点击 添加其他角色。
- 点击 添加其他角色,然后在下拉列表中选择 Dataflow Admin。
- 点击 添加其他角色,然后在下拉列表中选择 Service Account User。
- 点击保存。
- 在 Compute Engine 默认服务账号所在的行中,点击 修改主账号。
- 点击 添加其他角色,然后在下拉列表中选择 Dataflow Worker。
- 点击 添加其他角色,然后在下拉列表中选择 Pub/Sub Editor。
- 点击 添加其他角色,然后在下拉列表中选择 BigQuery Data Editor。
点击保存。
如需详细了解如何授予角色,请参阅使用控制台授予 IAM 角色。
- 转到 IAM 页面。
- 默认情况下,每个新项目起初都有一个默认网络。如果您的项目的默认网络已停用或者已被删除,则您需要在自己的用户账号具备 Compute Network User 角色 (
roles/compute.networkUser
) 的项目中拥有网络。
创建 BigQuery 数据集和表
使用 Google Cloud 控制台,为您的 Pub/Sub 主题创建具有适当架构的 BigQuery 数据集和表。
在此示例中,数据集的名称为 taxirides
,表的名称为 realtime
。如需创建此数据集和表,请按照以下步骤操作:
- 转到 BigQuery 页面。
前往 BigQuery - 在探索器面板中,点击您要创建数据集的项目旁边的 查看操作,然后点击创建数据集。
- 在创建数据集面板上,按照以下步骤操作:
- 在数据集 ID 部分,输入
taxirides
。 每个 Google Cloud 项目的数据集 ID 都是唯一的。 - 对于位置类型,请选择多区域,然后选择美国(美国的多个区域)。公共数据集存储在
US
多区域位置。为简单起见,请将数据集放在同一位置。 - 保留其他默认设置,然后点击创建数据集
- 在
探索器 面板中,展开您的项目。 - 在
taxirides
数据集旁边,点击 查看操作,然后点击创建表。 - 在创建表面板上,按照以下步骤操作:
- 在来源部分,为基于以下数据创建表选择空表。
- 在目标位置部分的表中,输入
realtime
。 - 在架构部分,点击以文字形式修改开关,并将以下架构定义粘贴到相应的框中:
ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp, meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
- 在分区和集群设置部分中,对于分区,选择时间戳字段。
- 保留其他默认设置,然后点击创建表。
运行流水线
使用 Google 提供的 Pub/Sub to BigQuery 模板运行流处理流水线。流水线会从该输入主题获取传入数据。
- 转到 Dataflow 作业页面。
转到作业 - 点击
从模板创建作业 。 - 输入
taxi-data
作为 Dataflow 作业的名称。 - 对于 Dataflow 模板,选择 Pub/Sub to BigQuery 模板。
- 对于 BigQuery 输出表,输入以下内容:
PROJECT_ID:taxirides.realtime
将
PROJECT_ID
替换为创建了 BigQuery 数据集的项目的 ID。 - 展开可选参数。
- 在输入 Pub/Sub 主题部分,点击手动输入主题。
- 在对话框中,为主题名称输入以下内容,然后点击保存:
projects/pubsub-public-data/topics/taxirides-realtime
此公开提供的 Pub/Sub 主题基于纽约市出租车和豪华轿车委员会的开放数据集。以下是来自此主题的 JSON 格式的示例消息:
{ "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e", "point_idx": 217, "latitude": 40.75399, "longitude": -73.96302, "timestamp": "2021-03-08T02:29:09.66644-05:00", "meter_reading": 6.293821, "meter_increment": 0.029003782, "ride_status": "enroute", "passenger_count": 1 }
- 在临时位置部分,输入以下内容:
gs://BUCKET_NAME/temp/
将
BUCKET_NAME
替换为您的 Cloud Storage 存储桶的名称。temp
文件夹用于存储临时文件,如暂存的流水线作业。 - 如果您的项目没有默认网络,请输入网络和子网。如需了解详情,请参阅指定网络和子网。
- 点击运行作业。
查看结果
如需查看写入realtime
表的数据,请按照以下步骤操作:
转到 BigQuery 页面。
点击
编写新查询。随即会打开一个新的编辑器标签页。SELECT * FROM `PROJECT_ID.taxirides.realtime` WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY) LIMIT 1000
将
PROJECT_ID
替换为创建了 BigQuery 数据集的项目的 ID。数据最多可能需要一分钟才会开始显示在表中。点击运行。
查询将返回在过去 24 小时内添加到表中的行。您还可以使用标准 SQL 运行查询。
清理
为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。
删除项目
避免产生费用的最简单方法是删除您为本快速入门创建的 Google Cloud 项目。- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
逐个删除资源
如果您希望保留本快速入门中使用的 Google Cloud 项目,请逐个删除资源:
- 转到 Dataflow 作业页面。
转到作业 - 从作业列表中选择您的流处理作业。
- 在导航中,点击停止。
- 在停止作业对话框中,取消或排空您的流水线,然后点击停止作业。
- 转到 BigQuery 页面。
前往 BigQuery - 在探索器面板中,展开您的项目。
- 在要删除的数据集旁边,点击 查看操作,然后点击打开。
- 在详细信息面板中,点击删除数据集,然后按照相应说明操作。
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.