本页面介绍了如何使用 Dataflow SQL 的 gcloud
命令行工具创建 Dataflow 作业。Dataflow 作业将 Dataflow SQL 查询的结果写入 BigQuery 数据集中的表格。
准备工作
-
登录您的 Google 帐号。
如果您还没有 Google 帐号,请注册新帐号。
-
在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。
-
确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能。
- 启用 Dataflow、Compute Engine、Logging、Cloud Storage、Cloud Storage JSON、BigQuery、Pub/Sub、Resource Manager 和 Data Catalog API。
- 安装并初始化 Cloud SDK。
创建 BigQuery 数据集
创建名为 taxirides
的 BigQuery 数据集。
bq mk taxirides
查询 Pub/Sub 主题
查询公共 Pub/Sub 主题 taxirides-realtime
以获取每 10 秒钟搭车乘客的人数。
gcloud beta dataflow sql query \
--job-name=dataflow-sql-quickstart \
--region=us-central1 \
--bigquery-dataset=taxirides \
--bigquery-table=passengers_per_minute \
'SELECT
TUMBLE_START("INTERVAL 10 SECOND") as period_start,
SUM(passenger_count) AS pickup_count,
FROM pubsub.topic.`pubsub-public-data`.`taxirides-realtime`
WHERE
ride_status = "pickup"
GROUP BY
TUMBLE(event_timestamp, "INTERVAL 10 SECOND")'
查看查询结果
确认 Dataflow 作业正在运行。
转到 Dataflow 监控界面。
在作业列表中,点击 dataflow-sql-quickstart。
在作业信息面板中,确认作业状态字段设置为正在运行。
启动作业可能需要几分钟时间。在作业启动之前,作业状态设置为已加入队列。
在作业图标签页中,确认每个步骤都运行至少 1 秒。
作业启动后,这些步骤可能需要几分钟才能开始运行。
从
passengers_per_minute
表中返回最繁忙的间隔。bq query \ 'SELECT * FROM taxirides.passengers_per_minute ORDER BY pickup_count DESC LIMIT 5'
清理
为避免系统因本快速入门中使用的资源向您的 Google Cloud 帐号收取费用,请按照以下步骤操作。
删除
taxirides
数据集。运行
bq rm
命令:bq rm taxirides
如需确认,请输入
y
。
取消 Dataflow 作业。
转到 Dataflow 监控界面。
在作业列表中,点击 dataflow-sql-quickstart。
点击停止 > 取消 > 停止作业。
后续步骤
- 详细了解如何使用 Dataflow SQL。
- 阅读使用 Dataflow SQL 联接流式数据教程。
- 了解如何在 Dataflow SQL 查询中使用数据来源和目的地。
- 探索 Dataflow SQL 的
gcloud
命令行工具。