并行运行多个 BigQuery 作业


BigQuery 托管许多公共数据集,公众可以查询这些数据集。在本教程中,您将创建一个并行运行多个 BigQuery 查询作业的工作流,这表明与逐个连续运行作业相比,性能有所提升。

目标

在此教程中,您将学习以下操作:

  1. 维基百科公共数据集运行查询,以确定特定月份中浏览次数最多的影视内容。
  2. 部署和执行连续运行多个 BigQuery 查询作业的工作流。
  3. 部署并执行一个工作流,该工作流使用并行迭代运行 BigQuery 作业,并且普通 for 循环并行执行。

您可以在 Google Cloud 控制台中运行以下命令,也可以在终端或 Cloud Shell 中使用 Google Cloud CLI 来运行以下命令。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

控制台

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 启用 Workflows API。

    启用 API

  5. 创建服务帐号:

    1. 在 Google Cloud 控制台中,转到创建服务帐号页面。

      转到“创建服务帐号”
    2. 选择您的项目。
    3. 服务帐号名称字段中,输入一个名称。Google Cloud 控制台会根据此名称填充服务帐号 ID 字段。

      服务帐号说明字段中,输入说明。例如,Service account for quickstart

    4. 点击创建并继续
    5. 向服务帐号授予以下角色: BigQuery > BigQuery Job User, Logging > Logs Writer.

      如需授予角色,请找到选择角色列表,然后选择相应角色。

      如需授予其他角色,请点击 添加其他角色,然后添加其他各个角色。

    6. 点击继续
    7. 点击完成以完成服务帐号的创建过程。

  6. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  7. 确保您的 Google Cloud 项目已启用结算功能

  8. 启用 Workflows API。

    启用 API

  9. 创建服务帐号:

    1. 在 Google Cloud 控制台中,转到创建服务帐号页面。

      转到“创建服务帐号”
    2. 选择您的项目。
    3. 服务帐号名称字段中,输入一个名称。Google Cloud 控制台会根据此名称填充服务帐号 ID 字段。

      服务帐号说明字段中,输入说明。例如,Service account for quickstart

    4. 点击创建并继续
    5. 向服务帐号授予以下角色: BigQuery > BigQuery Job User, Logging > Logs Writer.

      如需授予角色,请找到选择角色列表,然后选择相应角色。

      如需授予其他角色,请点击 添加其他角色,然后添加其他各个角色。

    6. 点击继续
    7. 点击完成以完成服务帐号的创建过程。

gcloud

  1. 登录您的 Google 账号。

    如果您还没有 Google 账号,请注册新账号

  2. 安装 Google Cloud CLI。
  3. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  4. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. Enable the Workflows API:

    gcloud services enable workflows.googleapis.com
  7. 设置身份验证:

    1. 创建服务帐号:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      SERVICE_ACCOUNT_NAME 替换为服务帐号的名称。

    2. 向服务帐号授予角色。对以下每个 IAM 角色运行以下命令一次:roles/bigquery.jobUser, roles/logging.logWriter

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      请替换以下内容:

      • SERVICE_ACCOUNT_NAME:服务帐号的名称
      • PROJECT_ID:您在其中创建服务帐号的项目的 ID
      • ROLE:要授予的角色
  8. 安装 Google Cloud CLI。
  9. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  10. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  11. 确保您的 Google Cloud 项目已启用结算功能

  12. Enable the Workflows API:

    gcloud services enable workflows.googleapis.com
  13. 设置身份验证:

    1. 创建服务帐号:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      SERVICE_ACCOUNT_NAME 替换为服务帐号的名称。

    2. 向服务帐号授予角色。对以下每个 IAM 角色运行以下命令一次:roles/bigquery.jobUser, roles/logging.logWriter

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      请替换以下内容:

      • SERVICE_ACCOUNT_NAME:服务帐号的名称
      • PROJECT_ID:您在其中创建服务帐号的项目的 ID
      • ROLE:要授予的角色

运行 BigQuery 查询作业

在 BigQuery 中,您可以运行交互式(按需)查询作业。 如需了解详情,请参阅运行交互式查询作业和批量查询作业

控制台

  1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    转到 BigQuery

  2. 查询编辑器文本区域中输入以下 BigQuery SQL 查询:

    SELECT TITLE, SUM(views)
    FROM `bigquery-samples.wikipedia_pageviews.201207h`
    GROUP BY TITLE
    ORDER BY SUM(views) DESC
    LIMIT 100
    
  3. 点击运行

bq

在终端中,输入以下 bq query 命令,以使用标准 SQL 语法运行交互式查询:

bq query \
--use_legacy_sql=false \
'SELECT
  TITLE, SUM(views)
FROM
  `bigquery-samples.wikipedia_pageviews.201207h`
GROUP BY
  TITLE
ORDER BY
  SUM(views) DESC
LIMIT 100'

此查询会运行一个查询,该查询返回特定月份浏览次数最多的前 100 个维基百科标题,并将输出写入临时表中。

请注意查询运行所需的时间。

部署依次运行多个查询的工作流

工作流定义由使用 Workflows 语法描述的一系列步骤组成。创建工作流后,可以对其进行部署,使其可供执行。部署步骤还会验证源文件能否执行。

以下工作流定义了使用 Workflows BigQuery 连接器运行查询的五个表的列表。这些查询会逐个连续运行,并将每个表中查看次数最多的标题保存到结果映射中。

控制台

  1. 在 Google Cloud 控制台中,转到 Workflows 页面:

    进入 Workflows

  2. 点击创建

  3. 输入新工作流的名称,例如 workflow-serial-bqjobs

  4. 选择适当的区域;例如 us-central1

  5. 选择您之前创建的服务帐号

    您应该已经向服务帐号授予了 BigQuery > BigQuery Job UserLogging > Logs Writer IAM 角色。

  6. 点击下一步

  7. 在工作流编辑器中,为工作流输入以下定义:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. 点击部署

gcloud

  1. 打开终端并为工作流创建源代码文件:

    touch workflow-serial-bqjobs.yaml
    
  2. 将以下工作流复制到源代码文件:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy workflow-serial-bqjobs \
       --source=workflow-serial-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.IAM.GSERVICEACCOUNT.COM
    

    MY_SERVICE_ACCOUNT@MY_PROJECT.IAM.GSERVICEACCOUNT.COM 替换为您之前创建的服务帐号的电子邮件地址。

    您应该已向服务帐号授予 roles/bigquery.jobUserroles/logging.logWriter IAM 角色。

执行工作流并连续运行多个查询

执行某个工作流会运行与该工作流关联的当前工作流定义。

控制台

  1. 在 Google Cloud 控制台中,转到 Workflows 页面:

    进入 Workflows

  2. 工作流页面上,选择 workflow-serial-bqjobs 工作流以转到其详情页面。

  3. 工作流详细信息页面上,选择 执行

  4. 再次点击执行

  5. 输出窗格中查看工作流的结果。

gcloud

  1. 打开终端。

  2. 执行工作流:

     gcloud workflows run workflow-serial-bqjob

工作流执行时间应大约为先前运行时间的 1 分钟或 5 倍。结果将包含每个表,类似于以下内容:

{
  "201201h": {
    "title": "Special:Search",
    "views": "14591339"
  },
  "201202h": {
    "title": "Special:Search",
    "views": "132765420"
  },
  "201203h": {
    "title": "Special:Search",
    "views": "123316818"
  },
  "201204h": {
    "title": "Special:Search",
    "views": "116830614"
  },
  "201205h": {
    "title": "Special:Search",
    "views": "131357063"
  }
}

部署和执行并行运行多个查询的工作流

您可以通过进行一些更改来并行运行查询,而不是依序运行五个查询:

 - runQueries:
    parallel:
        shared: [results]
        for:
            value: table
            in: ${tables}
  • parallel 步骤允许 for 循环的每次迭代并行运行。
  • results 变量声明为 shared,这样便可由分支写入,且每个分支的结果可以附加到其中。

控制台

  1. 在 Google Cloud 控制台中,转到 Workflows 页面:

    进入 Workflows

  2. 点击创建

  3. 输入新工作流的名称,例如 workflow-parallel-bqjobs

  4. 选择适当的区域;例如 us-central1

  5. 选择您之前创建的服务帐号

  6. 点击下一步

  7. 在工作流编辑器中,为工作流输入以下定义:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. 点击部署

  9. 工作流详细信息页面上,选择 执行

  10. 再次点击执行

  11. 输出窗格中查看工作流的结果。

gcloud

  1. 打开终端并为工作流创建源代码文件:

    touch workflow-parallel-bqjobs.yaml
    
  2. 将以下工作流复制到源代码文件:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy workflow-parallell-bqjobs \
       --source=workflow-parallel-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.IAM.GSERVICEACCOUNT.COM
    

    MY_SERVICE_ACCOUNT@MY_PROJECT.IAM.GSERVICEACCOUNT.COM 替换为您之前创建的服务帐号的电子邮件地址。

  4. 执行工作流:

     gcloud workflows run workflow-serial-bqjob

结果与上述输出类似,但工作流的执行时间应该约为 20 秒或更短!

清理

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

若要避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除教程资源

删除本教程中创建的工作流:

gcloud workflows delete WORKFLOW_NAME

后续步骤