并行运行多个 BigQuery 作业


BigQuery 托管了许多公共数据集,供公众查询。在本教程中,您将创建一个并行运行多个 BigQuery 查询作业的工作流,并证明与依次串行运行作业相比,性能有所提升。

目标

在此教程中,您将学习以下操作:

  1. Wikipedia 公共数据集运行查询 来确定特定月份观看次数最多的影视内容。
  2. 部署并执行一个工作流,该工作流会依次串行运行多个 BigQuery 查询作业。
  3. 部署和执行运行 BigQuery 作业的工作流 使用并行迭代, 以及并行执行普通 for 循环的位置。

您可以在 Google Cloud 控制台中运行以下命令,也可以使用 在终端或 Cloud Shell 中运行 Google Cloud CLI。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

控制台

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. Enable the Workflows API.

    Enable the API

  5. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the following roles to the service account: BigQuery > BigQuery Job User, Logging > Logs Writer.

      To grant a role, find the Select a role list, then select the role.

      To grant additional roles, click Add another role and add each additional role.

    6. Click Continue.
    7. Click Done to finish creating the service account.

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  7. 确保您的 Google Cloud 项目已启用结算功能

  8. Enable the Workflows API.

    Enable the API

  9. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the following roles to the service account: BigQuery > BigQuery Job User, Logging > Logs Writer.

      To grant a role, find the Select a role list, then select the role.

      To grant additional roles, click Add another role and add each additional role.

    6. Click Continue.
    7. Click Done to finish creating the service account.

gcloud

  1. 登录您的 Google 账号。

    如果您还没有 Google 账号,请注册新账号

  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. Enable the Workflows API:

    gcloud services enable workflows.googleapis.com
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/bigquery.jobUser, roles/logging.logWriter:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
  8. Install the Google Cloud CLI.
  9. To initialize the gcloud CLI, run the following command:

    gcloud init
  10. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  11. 确保您的 Google Cloud 项目已启用结算功能

  12. Enable the Workflows API:

    gcloud services enable workflows.googleapis.com
  13. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/bigquery.jobUser, roles/logging.logWriter:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant

运行 BigQuery 查询作业

在 BigQuery 中,您可以运行交互式(按需)查询作业。如需了解详情,请参阅运行交互式查询作业和批量查询作业

控制台

  1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    转到 BigQuery

  2. 查询编辑器文本区域:

    SELECT TITLE, SUM(views)
    FROM `bigquery-samples.wikipedia_pageviews.201207h`
    GROUP BY TITLE
    ORDER BY SUM(views) DESC
    LIMIT 100
    
  3. 点击运行

bq

在终端中,输入以下 bq query 命令以运行 使用标准 SQL 语法的交互式查询:

bq query \
--use_legacy_sql=false \
'SELECT
  TITLE, SUM(views)
FROM
  `bigquery-samples.wikipedia_pageviews.201207h`
GROUP BY
  TITLE
ORDER BY
  SUM(views) DESC
LIMIT 100'

这将运行一个查询,该查询返回前 100 个 并将输出写入临时表中。

记下查询的运行时长。

部署连续运行多个查询的工作流

工作流定义由一系列使用 Workflows 语法描述的步骤组成。创建工作流程后 使其可供执行部署步骤还会验证 源文件可以执行

以下工作流定义了一个要运行查询的表列表,其中包含了五个表 使用 Workflows BigQuery 连接器。 这些查询会依次运行,显示观看次数最多的影视内容, 都会保存到结果映射中。

控制台

  1. 在 Google Cloud 控制台中,转到 Workflows 页面:

    转到 Workflows

  2. 点击创建

  3. 输入新工作流的名称,例如 workflow-serial-bqjobs

  4. 选择适当的区域;例如 us-central1

  5. 选择您之前创建的服务账号

    您应该已向服务账号授予 BigQuery > BigQuery Job UserLogging > Logs Writer IAM 角色。

  6. 点击下一步

  7. 在工作流编辑器中,输入工作流的定义:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. 点击部署

gcloud

  1. 打开终端并为工作流创建源代码文件:

    touch workflow-serial-bqjobs.yaml
  2. 将以下工作流复制到您的源代码文件中:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy workflow-serial-bqjobs \
       --source=workflow-serial-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com

    替换 MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com 替换为您之前创建的服务账号的电子邮件地址。

    您应该已向服务账号授予 roles/bigquery.jobUserroles/logging.logWriter IAM 角色。

执行工作流并连续运行多个查询

执行某个工作流会运行与该工作流关联的当前工作流定义。

控制台

  1. 在 Google Cloud 控制台中,转到 Workflows 页面:

    转到 Workflows

  2. Workflows 页面上,选择 workflow-serial-bqjobs 工作流以转到其详情页面。

  3. 工作流详细信息页面上,选择 执行

  4. 再次点击执行

  5. 输出窗格中查看工作流的结果。

gcloud

  1. 打开终端。

  2. 执行工作流:

     gcloud workflows run workflow-serial-bqjob

工作流执行过程大约需要一分钟或五倍的时间, 。结果将包含每个表,并且看起来类似于 以下:

{
  "201201h": {
    "title": "Special:Search",
    "views": "14591339"
  },
  "201202h": {
    "title": "Special:Search",
    "views": "132765420"
  },
  "201203h": {
    "title": "Special:Search",
    "views": "123316818"
  },
  "201204h": {
    "title": "Special:Search",
    "views": "116830614"
  },
  "201205h": {
    "title": "Special:Search",
    "views": "131357063"
  }
}

部署和执行并行运行多个查询的工作流

您可以通过进行一些更改,以并行方式运行这五个查询,而不是顺序运行:

 - runQueries:
    parallel:
        shared: [results]
        for:
            value: table
            in: ${tables}
  • parallel 步骤允许 for 循环的每个迭代并行运行。
  • results 变量被声明为 shared,这允许它 可由分支写入,并且每个分支的结果都可以附加到该分支。

控制台

  1. 在 Google Cloud 控制台中,转到 Workflows 页面:

    转到 Workflows

  2. 点击创建

  3. 输入新工作流的名称,例如 workflow-parallel-bqjobs

  4. 选择适当的区域;例如 us-central1

  5. 选择您之前创建的服务账号

  6. 点击下一步

  7. 在工作流编辑器中,输入工作流的定义:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. 点击部署

  9. 工作流详细信息页面上,选择 执行

  10. 再次点击执行

  11. 输出窗格中查看工作流的结果。

gcloud

  1. 打开终端并为工作流创建源代码文件:

    touch workflow-parallel-bqjobs.yaml
  2. 将以下工作流复制到您的源代码文件中:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy workflow-parallell-bqjobs \
       --source=workflow-parallel-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com

    MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com 替换为您之前创建的服务账号的电子邮件地址。

  4. 执行工作流:

     gcloud workflows run workflow-parallel-bqjobs

结果将与上一个输出类似,但工作流执行时间应该不超过 20 秒!

清理

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

删除教程资源

删除在本教程中创建的工作流:

gcloud workflows delete WORKFLOW_NAME

后续步骤