本页面介绍如何使用 Dataflow SQL 查询数据和写入查询结果。
Dataflow SQL 可以查询以下来源:
- 来自 Pub/Sub 主题的流式数据
- 来自 Cloud Storage 文件集的流式数据和批量数据
- 来自 BigQuery 表的批量数据
Dataflow SQL 可以将查询结果写入以下目标:
Pub/Sub
查询 Pub/Sub 主题
如需使用 Dataflow SQL 查询 Pub/Sub 主题,请完成以下步骤:
添加 Pub/Sub 主题作为 Dataflow 来源。
为 Pub/Sub 主题分配架构。
在 Dataflow SQL 查询中使用 Pub/Sub 主题。
添加 Pub/Sub 主题
您可以使用 BigQuery 网页界面添加 Pub/Sub 主题作为 Dataflow 来源。
在 Google Cloud Console 中,转到 BigQuery 页面,在此页面中,您可以使用 Dataflow SQL。
在导航面板中,点击添加数据下拉列表,然后选择 Cloud Dataflow 来源。
在添加 Cloud Dataflow 来源面板中,选择 Cloud Pub/Sub 主题并搜索相应主题。
以下屏幕截图显示了对
transactions
Pub/Sub 主题的搜索:点击添加。
在添加相应 Pub/Sub 主题作为 Dataflow 来源后,该 Pub/Sub 主题会显示在导航菜单的资源部分中。
如需查找该主题,请展开 Cloud Dataflow 来源 > Cloud Pub/Sub 主题。
分配 Pub/Sub 主题架构
Pub/Sub 主题架构包含以下字段:
event_timestamp
字段。Pub/Sub 事件时间戳用于标识消息的发布时间。时间戳会自动添加到 Pub/Sub 消息中。
Pub/Sub 消息中每个键值对的字段。
例如,消息
{"k1":"v1", "k2":"v2"}
的架构包含两个STRING
字段,分别名为k1
和k2
。
您可以使用 Cloud Console 或 Google Cloud CLI 为 Pub/Sub 主题分配架构。
控制台
如需为 Pub/Sub 主题分配架构,请完成以下步骤:
在资源面板中选择主题。
在架构标签页中,点击修改架构,以打开显示架构字段的架构侧边面板。
点击添加字段以向架构添加字段,或切换以文本形式修改按钮以复制并粘贴整个架构文本。
例如,以下是包含销售交易的 Pub/Sub 主题的架构文本:
[ { "description": "Pub/Sub event timestamp", "name": "event_timestamp", "mode": "REQUIRED", "type": "TIMESTAMP" }, { "description": "Transaction time string", "name": "tr_time_str", "mode": "NULLABLE", "type": "STRING" }, { "description": "First name", "name": "first_name", "mode": "NULLABLE", "type": "STRING" }, { "description": "Last name", "name": "last_name", "mode": "NULLABLE", "type": "STRING" }, { "description": "City", "name": "city", "mode": "NULLABLE", "type": "STRING" }, { "description": "State", "name": "state", "mode": "NULLABLE", "type": "STRING" }, { "description": "Product", "name": "product", "mode": "NULLABLE", "type": "STRING" }, { "description": "Amount of transaction", "name": "amount", "mode": "NULLABLE", "type": "FLOAT64" } ]
点击提交。
(可选)点击预览主题,检查消息的内容并确认其匹配您定义的架构。
gcloud
如需为 Pub/Sub 主题分配架构,请完成以下步骤:
使用架构文本创建一个 JSON 文件。
例如,以下是包含销售交易的 Pub/Sub 主题的架构文本:
[ { "description": "Pub/Sub event timestamp", "column": "event_timestamp", "mode": "REQUIRED", "type": "TIMESTAMP" }, { "description": "Transaction time string", "column": "tr_time_str", "mode": "NULLABLE", "type": "STRING" }, { "description": "First name", "column": "first_name", "mode": "NULLABLE", "type": "STRING" }, { "description": "Last name", "column": "last_name", "mode": "NULLABLE", "type": "STRING" }, { "description": "City", "column": "city", "mode": "NULLABLE", "type": "STRING" }, { "description": "State", "column": "state", "mode": "NULLABLE", "type": "STRING" }, { "description": "Product", "column": "product", "mode": "NULLABLE", "type": "STRING" }, { "description": "Amount of transaction", "column": "amount", "mode": "NULLABLE", "type": "FLOAT64" } ]
使用
gcloud data-catalog entries
命令为 Pub/Sub 主题分配架构:gcloud data-catalog entries update \ --lookup-entry='pubsub.topic.`PROJECT_ID`.`TOPIC_NAME`' \ --schema-from-file=FILE_PATH
请替换以下内容:
PROJECT_ID
:您的项目 IDTOPIC_NAME
:您的 Pub/Sub 主题名称FILE_PATH
:包含架构文本的 JSON 文件的路径
(可选)通过运行以下命令,确认已成功为 Pub/Sub 主题分配架构:
gcloud data-catalog entries lookup \ 'pubsub.topic.`PROJECT_ID`.`TOPIC_NAME`'
使用 Pub/Sub 主题
如需在 Dataflow SQL 查询中引用 Pub/Sub,请使用以下标识符:
pubsub.topic.`PROJECT_ID`.`TOPIC_NAME`
请替换以下内容:
PROJECT_ID
:您的项目 IDTOPIC_NAME
:您的 Pub/Sub 主题名称
例如,以下查询会从项目 dataflow-sql
中的 Dataflow 主题 daily.transactions
选择。
SELECT *
FROM pubsub.topic.`dataflow-sql`.`daily.transactions`
写入 Pub/Sub 主题
您可以使用 Cloud Console 或 Google Cloud CLI 将查询结果写入 Pub/Sub 主题。
控制台
如需将查询结果写入 Pub/Sub 主题,请使用 Dataflow SQL 运行查询:
在 Cloud Console 中,转到 BigQuery 页面,在此页面中,您可以使用 Dataflow SQL。
在查询编辑器中输入 Dataflow SQL 查询。
点击创建 Cloud Dataflow 作业,打开作业选项面板。
在该面板的目标部分中,选择输出类型 > Cloud Pub/Sub 主题。
点击选择 Cloud Pub/Sub 主题,然后选择一个主题。
点击创建。
gcloud
如需将查询结果写入 Pub/Sub 主题,请使用 gcloud dataflow sql query
命令的 --pubsub-topic
标志:
gcloud dataflow sql query \ --job-name=JOB_NAME \ --region=REGION \ --pubsub-project=PROJECT_ID \ --pubsub-topic=TOPIC_NAME \ 'QUERY'
请替换以下内容:
JOB_NAME
:您选择的作业名称REGION
:地区端点(例如us-west1
)PROJECT_ID
:您的项目 IDTOPIC_NAME
:您的 Pub/Sub 主题名称QUERY
:Dataflow SQL 查询
目标 Pub/Sub 主题的架构必须与查询结果的架构匹配。如果目标 Pub/Sub 主题没有架构,则系统会自动分配与查询结果匹配的架构。
Cloud Storage
查询 Cloud Storage 文件集
如需使用 Dataflow SQL 查询 Cloud Storage 文件集,请完成以下步骤:
为 Dataflow SQL 创建 Data Catalog 文件集。
添加 Cloud Storage 文件集作为 Dataflow 来源。
在 Dataflow SQL 查询中使用 Cloud Storage 文件集。
创建 Cloud Storage 文件集
如需创建 Cloud Storage 文件集,请参阅创建条目组和文件集。
Cloud Storage 文件集必须具有架构,并且只能包含没有标题行的 CSV 文件。
添加 Cloud Storage 文件集
您可以使用 Dataflow SQL 添加 Cloud Storage 文件集作为 Dataflow 来源:
在 Cloud Console 中,转到 BigQuery 页面,在此页面中,您可以使用 Dataflow SQL。
在导航面板中,点击添加数据下拉列表,然后选择 Cloud Dataflow 来源。
在添加 Cloud Dataflow 来源面板中,选择 Cloud Storage 文件集并搜索相应主题。
点击添加。
在添加相应 Cloud Storage 文件集作为 Dataflow 来源后,该 Cloud Storage 文件集会显示在导航菜单的资源部分中。
如需查找该文件集,请展开 Cloud Dataflow 来源 > Cloud Storage 主题。
使用 Cloud Storage 文件集
如需在 Dataflow SQL 查询中引用 Cloud Storage 表,请使用以下标识符:
datacatalog.entry.`PROJECT_ID`.REGION.`ENTRY_GROUP`.`FILESET_NAME`
请替换以下内容:
PROJECT_ID
:您的项目 IDREGION
:地区端点(例如us-west1
)ENTRY_GROUP
:Cloud Storage 文件集的条目组FILESET_NAME
:Cloud Storage 文件集的名称
例如,以下查询会从项目 dataflow-sql
和条目组 my-fileset-group
中的 Cloud Storage 文件集 daily.registrations
选择。
SELECT *
FROM datacatalog.entry.`dataflow-sql`.`us-central1`.`my-fileset-group`.`daily.registrations`
BigQuery
查询 BigQuery 表
如需使用 Dataflow SQL 查询 BigQuery 表,请完成以下步骤:
为 Dataflow SQL 创建 BigQuery 表。
在 Dataflow SQL 查询中使用 BigQuery 表。
您无需添加 BigQuery 表作为 Dataflow 来源。
创建 BigQuery 表
如需为 Dataflow SQL 创建 BigQuery 表,请参阅创建具有架构定义的空表。
在查询中使用 BigQuery 表
如需在 Dataflow SQL 查询中引用 BigQuery 表,请使用以下标识符:
bigquery.table.`PROJECT_ID`.`DATASET_NAME`.`TABLE_NAME`
标识符必须遵循 Dataflow SQL 词法结构。使用反引号将包含非字母、数字或下划线字符的标识符括起来。
例如,以下查询会从数据集 dataflow_sql_dataset
和项目 dataflow-sql
中的 BigQuery 表 us_state_salesregions
选择。
SELECT *
FROM bigquery.table.`dataflow-sql`.dataflow_sql_dataset.us_state_salesregions
写入 BigQuery 表
您可以使用 Cloud Console 或 Google Cloud CLI 将查询结果写入 Dataflow SQL 查询。
控制台
如需将查询结果写入 Dataflow SQL 查询,请使用 Dataflow SQL 运行查询:
在 Cloud Console 中,转到 BigQuery 页面,在此页面中,您可以使用 Dataflow SQL。
在查询编辑器中输入 Dataflow SQL 查询。
点击创建 Cloud Dataflow 作业,打开作业选项面板。
在该面板的目标部分中,选择输出类型 > BigQuery。
点击数据集 ID,然后选择已加载的数据集 (Loaded dataset) 或新建数据集。
在表名称字段中,输入目标表。
(可选)选择如何将数据加载到 BigQuery 表中。
- 只写入空白表:(默认)仅当表为空时才写入数据。
- 附加到表:将数据附加到表的末尾。
- 覆盖表:在写入新数据之前清空表中的所有现有数据。
点击创建。
gcloud
如需将查询结果写入 BigQuery 表,请使用 gcloud dataflow sql query
命令的 --bigquery-table
标志:
gcloud dataflow sql query \ --job-name=JOB_NAME \ --region=REGION \ --bigquery-dataset=DATASET_NAME \ --bigquery-table=TABLE_NAME \ 'QUERY'
请替换以下内容:
JOB_NAME
:您选择的作业名称REGION
:地区端点(例如us-west1
)DATASET_NAME
:您的 BigQuery 数据集名称TABLE_NAME
:您的 BigQuery 表名称QUERY
:Dataflow SQL 查询
如需选择将数据写入 BigQuery 表的方式,您可以使用 --bigquery-write-disposition
标志和以下值:
write-empty
:(默认)仅当表为空时才写入数据。write-append
:将数据附加到表的末尾。write-truncate
:在写入新数据之前清空表中的所有现有数据。
gcloud dataflow sql query \ --job-name=JOB_NAME \ --region=REGION \ --bigquery-dataset=DATASET_NAME \ --bigquery-table=TABLE_NAME \ --bigquery-write-disposition=WRITE_MODE 'QUERY'
将 WRITE_MODE
替换为 BigQuery 写入处置值。
目标 BigQuery 表的架构必须与查询结果的架构匹配。如果目标 BigQuery 表没有架构,则系统会自动分配与查询结果匹配的架构。