并行运行多个 BigQuery 作业


BigQuery 托管着许多公共数据集 供公众查询在本教程中,您将创建 并行运行多个 BigQuery 查询作业的工作流, 展现与运行作业相比性能有所提升 一个接一个地

目标

在此教程中,您将学习以下操作:

  1. Wikipedia 公共数据集运行查询 来确定特定月份观看次数最多的影视内容。
  2. 部署和执行运行多个 BigQuery 的工作流 一个接一个地依次查询作业。
  3. 部署和执行运行 BigQuery 作业的工作流 使用并行迭代, 以及并行执行普通 for 循环的位置。

您可以在 Google Cloud 控制台中运行以下命令,也可以使用 在终端或 Cloud Shell 中运行 Google Cloud CLI。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

控制台

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. 确保您的 Google Cloud 项目已启用结算功能

  3. 启用 Workflows API。

    启用 API

  4. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the following roles to the service account: BigQuery > BigQuery Job User, Logging > Logs Writer.

      To grant a role, find the Select a role list, then select the role.

      To grant additional roles, click Add another role and add each additional role.

    6. Click Continue.
    7. Click Done to finish creating the service account.

gcloud

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  4. 确保您的 Google Cloud 项目已启用结算功能

  5. Enable the Workflows API:

    gcloud services enable workflows.googleapis.com
  6. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/bigquery.jobUser, roles/logging.logWriter:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant

运行 BigQuery 查询作业

在 BigQuery 中,您可以运行交互式(按需)查询作业。 如需了解详情,请参阅运行交互式查询作业和批量查询作业

控制台

  1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    转到 BigQuery

  2. 查询编辑器文本区域:

    SELECT TITLE, SUM(views)
    FROM `bigquery-samples.wikipedia_pageviews.201207h`
    GROUP BY TITLE
    ORDER BY SUM(views) DESC
    LIMIT 100
    
  3. 点击运行

bq

在终端中,输入以下 bq query 命令以运行 使用标准 SQL 语法的交互式查询:

bq query \
--use_legacy_sql=false \
'SELECT
  TITLE, SUM(views)
FROM
  `bigquery-samples.wikipedia_pageviews.201207h`
GROUP BY
  TITLE
ORDER BY
  SUM(views) DESC
LIMIT 100'

这将运行一个查询,该查询返回前 100 个 并将输出写入临时表中。

记下查询的运行时长。

部署连续运行多个查询的工作流

工作流程定义由使用 Workflows 语法。创建工作流程后 使其可供执行部署步骤还会验证 源文件可以执行

以下工作流定义了一个要运行查询的表列表,其中包含了五个表 使用 Workflows BigQuery 连接器。 这些查询会依次运行,显示观看次数最多的影视内容, 都会保存到结果映射中。

控制台

  1. 在 Google Cloud 控制台中,转到 Workflows 页面:

    转到 Workflows

  2. 点击创建

  3. 输入新工作流的名称,例如 workflow-serial-bqjobs

  4. 选择适当的区域;例如 us-central1

  5. 选择您之前创建的服务账号

    您应该已经向 BigQuery > BigQuery Job UserLogging > Logs Writer IAM 角色分配给服务 。

  6. 点击下一步

  7. 在工作流编辑器中,为工作流输入以下定义:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. 点击部署

gcloud

  1. 打开终端并为工作流创建源代码文件:

    touch workflow-serial-bqjobs.yaml
    
  2. 将以下工作流复制到源代码文件中:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy workflow-serial-bqjobs \
       --source=workflow-serial-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.IAM.GSERVICEACCOUNT.COM
    

    替换 MY_SERVICE_ACCOUNT@MY_PROJECT.IAM.GSERVICEACCOUNT.COM 替换为您之前创建的服务账号的电子邮件地址。

    您应该已向 roles/bigquery.jobUser 和 此服务的 roles/logging.logWriter 个 IAM 角色 。

执行工作流并连续运行多个查询

执行某个工作流会运行与该工作流关联的当前工作流定义。

控制台

  1. 在 Google Cloud 控制台中,转到 Workflows 页面:

    转到 Workflows

  2. 工作流页面上,选择 workflow-serial-bqjobs 工作流,转到其详情页面。

  3. 工作流详细信息页面上,选择 执行

  4. 再次点击执行

  5. 输出窗格中查看工作流的结果。

gcloud

  1. 打开终端。

  2. 执行工作流:

     gcloud workflows run workflow-serial-bqjob

工作流执行过程大约需要一分钟或五倍的时间, 。结果将包含每个表,并且看起来类似于 以下:

{
  "201201h": {
    "title": "Special:Search",
    "views": "14591339"
  },
  "201202h": {
    "title": "Special:Search",
    "views": "132765420"
  },
  "201203h": {
    "title": "Special:Search",
    "views": "123316818"
  },
  "201204h": {
    "title": "Special:Search",
    "views": "116830614"
  },
  "201205h": {
    "title": "Special:Search",
    "views": "131357063"
  }
}

部署和执行并行运行多个查询的工作流

您不必连续运行五个查询,而是可以在以下位置运行这些查询: 同时进行以下更改:

 - runQueries:
    parallel:
        shared: [results]
        for:
            value: table
            in: ${tables}
  • parallel 步骤允许 for 循环的每次迭代并行运行。
  • results 变量被声明为 shared,这允许它 可由分支写入,并且每个分支的结果都可以附加到分支。

控制台

  1. 在 Google Cloud 控制台中,转到 Workflows 页面:

    转到 Workflows

  2. 点击创建

  3. 输入新工作流的名称,例如 workflow-parallel-bqjobs

  4. 选择适当的区域;例如 us-central1

  5. 选择您之前创建的服务账号

  6. 点击下一步

  7. 在工作流编辑器中,为工作流输入以下定义:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. 点击部署

  9. 工作流详细信息页面上,选择 执行

  10. 再次点击执行

  11. 输出窗格中查看工作流的结果。

gcloud

  1. 打开终端并为工作流创建源代码文件:

    touch workflow-parallel-bqjobs.yaml
    
  2. 将以下工作流复制到源代码文件中:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy workflow-parallell-bqjobs \
       --source=workflow-parallel-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.IAM.GSERVICEACCOUNT.COM
    

    替换 MY_SERVICE_ACCOUNT@MY_PROJECT.IAM.GSERVICEACCOUNT.COM 替换为您之前创建的服务账号的电子邮件地址。

  4. 执行工作流:

     gcloud workflows run workflow-parallel-bqjobs

结果将与之前的输出类似,但工作流执行 用时应该不会超过 20 秒!

清理

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除教程资源

删除在本教程中创建的工作流:

gcloud workflows delete WORKFLOW_NAME

后续步骤