对 BigQuery 结果集进行分页

如果您尝试检索的 BigQuery 数据集查询结果大于 Workflows 内存限制,则可以使用页面令牌逐页浏览结果。页面令牌表示结果集中的位置,并会在有更多结果可用时返回。这样,您就可以一次循环浏览一页结果。

BigQuery 托管了许多公共数据集,供公众查询。在以下示例中,您将查询美国姓名数据公共数据集,以确定 1910 年至 2013 年间美国人最常用的姓名。

YAML

# Use a page token to loop through a page of results at a time when
# querying a BigQuery dataset larger than the Workflows memory limit
# This workflow queries a public dataset to determine the most common
# names in the US between 1910 and 2013
main:
  params: [input]
  steps:
    - init:
        assign:
          - pageToken: null
    - startQuery:
        call: googleapis.bigquery.v2.jobs.insert
        args:
          projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          body:
            configuration:
              query:
                useLegacySql: false
                # Remove LIMIT from the query to iterate through all results
                query: SELECT name, SUM(number) AS total FROM `bigquery-public-data.usa_names.usa_1910_2013` GROUP BY name ORDER BY total DESC LIMIT 50
        result: query
    - getPage:
        call: googleapis.bigquery.v2.jobs.getQueryResults
        args:
          projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          jobId: ${query.jobReference.jobId}
          maxResults: 10
          pageToken: ${pageToken}
        result: page
    - processPage:
        for:
          value: row
          in: ${page.rows}
          steps:
            - processRow:
                call: sys.log
                args:
                  data: ${row}
    - checkIfDone:
        switch:
          - condition: ${"pageToken" in page and page.pageToken != ""}
            assign:
              - pageToken: ${page.pageToken}
            next: getPage

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "pageToken": null
            }
          ]
        }
      },
      {
        "startQuery": {
          "call": "googleapis.bigquery.v2.jobs.insert",
          "args": {
            "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
            "body": {
              "configuration": {
                "query": {
                  "useLegacySql": false,
                  "query": "SELECT name, SUM(number) AS total FROM `bigquery-public-data.usa_names.usa_1910_2013` GROUP BY name ORDER BY total DESC LIMIT 50"
                }
              }
            }
          },
          "result": "query"
        }
      },
      {
        "getPage": {
          "call": "googleapis.bigquery.v2.jobs.getQueryResults",
          "args": {
            "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
            "jobId": "${query.jobReference.jobId}",
            "maxResults": 10,
            "pageToken": "${pageToken}"
          },
          "result": "page"
        }
      },
      {
        "processPage": {
          "for": {
            "value": "row",
            "in": "${page.rows}",
            "steps": [
              {
                "processRow": {
                  "call": "sys.log",
                  "args": {
                    "data": "${row}"
                  }
                }
              }
            ]
          }
        }
      },
      {
        "checkIfDone": {
          "switch": [
            {
              "condition": "${\"pageToken\" in page and page.pageToken != \"\"}",
              "assign": [
                {
                  "pageToken": "${page.pageToken}"
                }
              ],
              "next": "getPage"
            }
          ]
        }
      }
    ]
  }
}

后续步骤