使用模板快速入门

本页介绍了如何使用 Google 提供的 Dataflow 模板创建流处理流水线。具体来说,本页会以 Pub/Sub Topic to BigQuery 模板为例。

准备工作

  1. 登录您的 Google 帐号。

    如果您还没有 Google 帐号,请注册新帐号

  2. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到项目选择器页面

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  4. 启用 Cloud Dataflow、Compute Engine、Stackdriver Logging、Cloud Storage、Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager API。

    启用 API

  5. 创建 Cloud Storage 存储分区:
    1. 在 Cloud Console 中,转到 Cloud Storage 浏览器页面。

      转到“Cloud Storage 浏览器”页面

    2. 点击创建存储分区
    3. 创建存储分区对话框中,指定以下特性:
      • 名称:唯一的存储分区名称。请勿在存储分区名称中添加敏感信息,因为存储分区命名空间属于全局性质,并会公开显示。
      • 默认存储类别Standard
      • 存储分区数据的存储位置。
    4. 点击创建

创建主题

  1. 转到网页界面中的“Pub/Sub 主题”页面。
    转到“Pub/Sub 主题”页面
  2. 点击 创建主题

    显示控制台中的“创建主题”对话框的屏幕截图

  3. 主题 ID 字段中,提供一个唯一的主题名称,例如 taxirides-realtime
  4. 点击保存

创建 BigQuery 数据集和表

使用 Cloud Shell 或 Cloud Console 为 Pub/Sub 主题创建具有适当架构的 BigQuery 数据集表格

在此示例中,数据集的名称为 taxirides表格的名称为 realtime

使用 Cloud Shell

使用 Cloud Shell 创建数据集和表格。

  1. 运行以下命令来创建数据集:
    bq mk taxirides
    您的输出应如下所示:
    Dataset “myprojectid:taxirides” successfully created
  2. 运行以下命令来创建表格:
    bq mk \
    --time_partitioning_field timestamp \
    --schema ride_id:string,point_idx:integer,latitude:float,longitude:float,\
    timestamp:timestamp,meter_reading:float,meter_increment:float,ride_status:string,\
    passenger_count:integer -t taxirides.realtime
    您的输出应与以下内容类似:
    Table “myprojectid:taxirides.realtime” successfully created

    对表格进行分区以降低查询费用并提高性能。

使用 Google Cloud Platform Console

使用 Google Cloud Console 创建数据集和表格。

  1. 转到 BigQuery 网页界面。
    转到 BigQuery 网页界面
  2. 在导航窗格中,点击您的项目名称旁边的向下箭头图标,然后点击 Create dataset。输入 taxirides 作为您的数据集 ID。

    BigQuery 界面中的“Create dataset”按钮。

    数据集 ID 在每个项目中都是唯一的。点击问号图标可查看 ID 限制。

  3. 保留其他默认设置,然后点击 OK
  4. 在导航菜单中,将鼠标指针放在您刚刚创建的数据集 ID 上。点击相应 ID 旁边的向下箭头图标,然后点击 Create new table
  5. Source Data 旁边,选择 Create empty table 选项。
  6. Destination Table 下,选择 taxirides 并输入 realtime
  7. Schema 下,选择 Edit as Text,然后输入以下内容:
    ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
    meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
  8. Options 下的“Partitioning type”字段中,选择 Day 选项。
  9. Options 下的“Partitioning field”选择器中,选择 timestamp 列。
  10. 点击 Create Table 按钮。
  11. BigQuery 设置

运行流水线

使用 Google 提供的 Pub/Sub Topic to BigQuery 模板运行流处理流水线。

  1. 转到 Dataflow 网页界面。
    转到 Cloud Dataflow 网页界面
  2. 点击基于模板创建作业
  3. 为 Dataflow 作业输入作业名称
  4. Dataflow 模板下,选择“Pub/Sub Topic to BigQuery”模板。
  5. Pub/Sub 输入主题下,输入 projects/pubsub-public-data/topics/taxirides-realtime。流水线会从该输入主题获取传入数据。
  6. BigQuery 输出表下,输入 myprojectid:taxirides.realtime
  7. 临时位置下,输入 gs://mybucket/temp/。这是用于存储临时文件(如暂存的流水线作业)的子文件夹。
  8. 点击运行作业按钮。
  9. Cloud Dataflow 创建作业
  10. 查看已写入 BigQuery 的数据。转到 BigQuery 网页界面。
    转到 BigQuery 网页界面
    您可以使用标准 SQL 提交查询。例如,以下查询会选择在过去 24 小时内添加的所有行:
    SELECT * FROM `myprojectid.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

清理

为避免系统因本快速入门中使用的资源向您的 Google Cloud 帐号收取费用,请按照以下步骤操作。

  1. 转到 Dataflow 网页界面。
    转到 Cloud Dataflow 网页界面
    1. 您可能需要从 Google Cloud Console 的作业列表中选择流处理作业。
    2. 在导航菜单中,点击取消
    3. 取消对话框中,选择取消排空流水线。
  2. 转到 BigQuery 网页界面。
    转到 BigQuery 网页界面
    1. 在导航菜单中,将鼠标悬停在您创建的 taxirides 数据集上。
    2. 在导航菜单中,点击您的数据集名称旁边的向下箭头图标,然后点击删除数据集
    3. 删除数据集对话框中,输入数据集的名称 (`taxirides`),并点击确定,以确认删除命令。

后续步骤