如果 BigQuery 数据集查询返回的结果超出了 Workflows 的内存限制,您可以使用页面令牌逐页读取结果。页面令牌表示结果集中的某个位置,并会在还有更多结果可用时随响应返回。这样,您就可以循环处理结果,每次只处理一页。
BigQuery 托管了许多可供公众查询的公共数据集。以下示例将查询“美国姓名数据”(USA Names)公共数据集,以确定 1910 年至 2013 年间美国最常用的名字。
YAML
# Use a page token to loop through one page of results at a time when
# querying a BigQuery dataset whose results are larger than the Workflows
# memory limit.
# This workflow queries a public dataset to determine the most common
# names in the US between 1910 and 2013.
main:
  params: [input]
  steps:
    # No page token exists before the first getQueryResults request.
    - init:
        assign:
          - pageToken: null
    # Submit the query job; the job resource is stored in `query` so later
    # steps can read its jobReference.
    - startQuery:
        call: googleapis.bigquery.v2.jobs.insert
        args:
          projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          body:
            configuration:
              query:
                useLegacySql: false
                # Remove LIMIT from the query to iterate through all results
                query: >-
                  SELECT name, SUM(number) AS total
                  FROM `bigquery-public-data.usa_names.usa_1910_2013`
                  GROUP BY name
                  ORDER BY total DESC
                  LIMIT 50
        result: query
    # Request up to 10 rows of the job's results, passing the current
    # page token.
    - getPage:
        call: googleapis.bigquery.v2.jobs.getQueryResults
        args:
          projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          jobId: ${query.jobReference.jobId}
          maxResults: 10
          pageToken: ${pageToken}
        result: page
    # Log every row of the current page.
    - processPage:
        for:
          value: row
          in: ${page.rows}
          steps:
            - processRow:
                call: sys.log
                args:
                  data: ${row}
    # If the response carries a non-empty page token, store it and jump
    # back to getPage; otherwise no condition matches and, as this is the
    # last step, the workflow ends.
    - checkIfDone:
        switch:
          - condition: ${"pageToken" in page and page.pageToken != ""}
            assign:
              - pageToken: ${page.pageToken}
            next: getPage
JSON
{
  "main": {
    "params": ["input"],
    "steps": [
      {
        "init": {
          "assign": [
            { "pageToken": null }
          ]
        }
      },
      {
        "startQuery": {
          "call": "googleapis.bigquery.v2.jobs.insert",
          "args": {
            "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
            "body": {
              "configuration": {
                "query": {
                  "useLegacySql": false,
                  "query": "SELECT name, SUM(number) AS total FROM `bigquery-public-data.usa_names.usa_1910_2013` GROUP BY name ORDER BY total DESC LIMIT 50"
                }
              }
            }
          },
          "result": "query"
        }
      },
      {
        "getPage": {
          "call": "googleapis.bigquery.v2.jobs.getQueryResults",
          "args": {
            "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
            "jobId": "${query.jobReference.jobId}",
            "maxResults": 10,
            "pageToken": "${pageToken}"
          },
          "result": "page"
        }
      },
      {
        "processPage": {
          "for": {
            "value": "row",
            "in": "${page.rows}",
            "steps": [
              {
                "processRow": {
                  "call": "sys.log",
                  "args": { "data": "${row}" }
                }
              }
            ]
          }
        }
      },
      {
        "checkIfDone": {
          "switch": [
            {
              "condition": "${\"pageToken\" in page and page.pageToken != \"\"}",
              "assign": [
                { "pageToken": "${page.pageToken}" }
              ],
              "next": "getPage"
            }
          ]
        }
      }
    ]
  }
}