使用 Cloud Tasks 队列缓冲工作流执行


本教程介绍如何创建 Cloud Tasks 队列, 可以调节工作流的执行速率

同时可以发生的有效工作流执行的数量上限。此配额用尽后, 执行积压消息 或者,如果已达到 执行失败,并返回 HTTP 429 Too many requests 状态代码。通过启用 使用 Cloud Tasks 队列以您指定的速率执行子工作流 您可以避免 Workflows 配额相关问题,并实现 提高执行速率

请注意,Cloud Tasks 会提供“至少一次”提交;不过,Workflows 无法确保对来自 Cloud Tasks 的重复请求进行精确一次处理。

在下图中,父级工作流会调用子工作流,这些子工作流由应用了调度速率的 Cloud Tasks 队列进行监管。

父级工作流通过 Cloud Tasks 队列调用子工作流的迭代

目标

在此教程中,您将学习以下操作:

  1. 创建充当中间方的 Cloud Tasks 队列 如何在父级工作流与子级工作流之间建立适当的差异。
  2. 创建和部署从父级接收数据的子级工作流 工作流。
  3. 创建和部署执行子工作流的父工作流 通过 Cloud Tasks 队列传输。
  4. 运行没有调度速率限制的父级工作流,该限制会调用 子工作流的执行情况。
  5. 对 Cloud Tasks 队列应用调度限制,然后运行父级工作流。
  6. 您会发现子工作流的执行速率是通过 Cloud Tasks 队列。

您可以在 Google Cloud 控制台中运行以下命令,也可以在终端或 Cloud Shell 中使用 Google Cloud CLI 运行这些命令。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

控制台

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Cloud Tasks, Compute Engine, and Workflows APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Cloud Tasks, Compute Engine, and Workflows APIs.

    Enable the APIs

  8. 在 Google Cloud 控制台中,前往 IAM 页面进行设置 为 Compute Engine 默认服务账号。

    转到 IAM

    记下 Compute Engine 默认服务账号,因为您将把它与本教程中的工作流相关联以进行测试。此服务账号会自动 是在启用或使用 Google Cloud 服务后创建的, 并使用以下电子邮件格式:

    PROJECT_NUMBER-compute@developer.gserviceaccount.com

    PROJECT_NUMBER 替换为您的 Google Cloud 项目编号。您可以在 Google Cloud 控制台的欢迎页面上找到项目编号。

    对于生产环境,我们强烈建议创建新的服务账号,并为其授予一个或多个 IAM 角色,这些角色包含所需的最小权限并遵循最小权限原则。

  9. 选择 Compute Engine 默认服务账号,然后在该行中点击 修改主账号
  10. 在随即显示的对话框中,点击 Add another role(添加其他角色),然后添加以下角色:
    1. 选择角色列表中,依次选择 Workflows > Workflows Invoker,以便该账号有权触发工作流执行。
    2. 选择角色列表中,依次选择 Cloud Tasks > Cloud Tasks Enqueuer,以便该账号有权创建任务。
  11. 点击保存

gcloud

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Cloud Tasks, Compute Engine, and Workflows APIs:

    gcloud services enable cloudtasks.googleapis.com compute.googleapis.com workflows.googleapis.com
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Make sure that billing is enabled for your Google Cloud project.

  11. Enable the Cloud Tasks, Compute Engine, and Workflows APIs:

    gcloud services enable cloudtasks.googleapis.com compute.googleapis.com workflows.googleapis.com
  12. 记下 Compute Engine 默认服务账号,因为您将把它与本教程中的工作流相关联以进行测试。启用或使用包含 Compute Engine 的 Google Cloud 服务后,系统会自动创建此服务账号,其电子邮件地址格式如下:

    PROJECT_NUMBER-compute@developer.gserviceaccount.com

    PROJECT_NUMBER 替换为您的 Google Cloud 项目编号。您可以通过以下方式查找项目编号: 运行以下命令:

    gcloud projects describe PROJECT_ID --format='value(projectNumber)'

    对于生产环境,我们强烈建议创建新的服务账号,并为其授予一个或多个 IAM 角色,这些角色包含所需的最小权限并遵循最小权限原则。

  13. 将项目的 Workflows Invoker 角色 (roles/workflows.invoker) 授予 Compute Engine 默认服务账号,以便该账号有权触发您的工作流执行。

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=roles/workflows.invoker

    替换以下内容:

    • PROJECT_ID:Google Cloud 项目 ID
    • PROJECT_NUMBER:Google Cloud 项目编号

  14. 将项目的 Cloud Tasks Enqueuer 角色 (roles/cloudtasks.enqueuer) 授予 Compute Engine 默认服务账号,以便该账号有权创建任务。

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=roles/cloudtasks.enqueuer

创建 Cloud Tasks 队列

创建一个 Cloud Tasks 队列,以便在父级工作流中使用,并通过该队列来调节工作流执行速率。

控制台

  1. 在 Google Cloud 控制台中,前往 Cloud Tasks 页面:

    转至 Cloud Tasks

  2. 点击 创建推送队列

  3. 输入队列名称 queue-workflow-child

  4. 区域列表中,选择 us-central1(爱荷华)

  5. 点击创建

gcloud

QUEUE=queue-workflow-child
LOCATION=us-central1
gcloud tasks queues create $QUEUE --location=$LOCATION

创建和部署子工作流

子工作流可以接收和处理来自父工作流的数据。创建并 部署执行以下操作的子工作流:

  • 接收 iteration 作为实参
  • 休眠 10 秒以模拟某些处理
  • 成功执行后返回一个字符串

控制台

  1. 在 Google Cloud 控制台中,前往工作流页面。

    进入 Workflows

  2. 点击 创建

  3. 为新工作流输入名称 workflow-child

  4. 区域列表中,选择 us-central1(爱荷华)

  5. 服务账号列表中,选择 Compute Engine 默认服务账号

  6. 点击下一步

  7. 在工作流编辑器中,输入工作流的定义:

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - return_message:
            return: ${"Hello world"+iteration}
  8. 点击部署

gcloud

  1. 为工作流创建源代码文件:

    touch workflow-child.yaml
  2. 在文本编辑器中打开源代码文件,然后将以下工作流复制到该文件中。

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - return_message:
            return: ${"Hello world"+iteration}
  3. 部署工作流:

    gcloud workflows deploy workflow-child \
        --source=workflow-child.yaml \
        --location=us-central1 \
        --service-account=PROJECT_NUMBER-compute@developer.gserviceaccount.com

创建和部署父级工作流

父级工作流使用 for 循环来执行子工作流的多个分支。

  1. 复制用于定义父级工作流的源代码:

    main:
      steps:
        - init:
            assign:
              - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              - project_number: ${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}
              - location: ${sys.get_env("GOOGLE_CLOUD_LOCATION")}
              - workflow_child_name: "workflow-child"
              - queue_name: "queue-workflow-child"
        - enqueue_tasks_to_execute_child_workflow:
            for:
              value: iteration
              range: [1, 100]
              steps:
                  - iterate:
                      assign:
                        - data:
                            iteration: ${iteration}
                        - exec:
                            # Encode object to JSON string in expression for workflow argument
                            argument: ${json.encode_to_string(data)}
                  - create_task_to_execute_child_workflow:
                      call: googleapis.cloudtasks.v2.projects.locations.queues.tasks.create
                      args:
                          parent: ${"projects/" + project_id + "/locations/" + location + "/queues/" + queue_name}
                          body:
                            task:
                              httpRequest:
                                body: ${base64.encode(json.encode(exec))}
                                url: ${"https://workflowexecutions.googleapis.com/v1/projects/" + project_id + "/locations/" + location + "/workflows/" + workflow_child_name + "/executions"}
                                oauthToken:
                                  serviceAccountEmail: ${project_number + "-compute@developer.gserviceaccount.com"}

    该工作流包含以下部分:

    • 用于分配引用子工作流和 Cloud Tasks 队列名称的常量的映射。如需了解详情,请参阅 Google 地图

    • 执行以迭代方式调用子工作流的 for 循环。如需了解详情,请参阅 迭代

    • 一个工作流程步骤,可创建大量任务并将其添加到 用于执行子工作流的 Cloud Tasks 队列。有关 请参阅 Cloud Tasks API 连接器

  2. 部署工作流:

    控制台

    1. 在 Google Cloud 控制台中,前往 Workflows 页面:

      进入 Workflows

    2. 点击 创建

    3. 为新工作流输入名称 workflow-parent

    4. 区域列表中,选择 us-central1(爱荷华)

    5. 服务账号列表中,选择 Compute Engine 默认服务账号

    6. 点击下一步

    7. 在工作流编辑器中,粘贴父级工作流的定义。

    8. 点击部署

    gcloud

    1. 为您的工作流创建源代码文件:

      touch workflow-parent.yaml
    2. 在文本编辑器中打开源代码文件,然后粘贴相应定义 父级工作流程

    3. 部署工作流:

      gcloud workflows deploy workflow-parent \
          --source=workflow-parent.yaml \
          --location=us-central1 \
          --service-account=PROJECT_NUMBER-compute@developer.gserviceaccount.com

执行没有速率限制的父级工作流

执行父级工作流,即可通过 Cloud Tasks 队列。执行操作大约需要 10 秒钟才能完成。

控制台

  1. 在 Google Cloud 控制台中,前往 Workflows 页面:

    进入 Workflows

  2. Workflows 页面上,点击 workflow-parent 工作流以转到其详情页面。

  3. 工作流详情页面上,点击 执行

  4. 再次点击执行

  5. 在父级工作流运行时,返回到 工作流页面,然后点击 workflow-child 工作流,转到其详情页面。

  6. 点击执行标签页。

    您应该会看到子工作流的执行,其运行情况大致相同, 类似如下代码:

    大约同时运行的子工作流执行的详细信息。

gcloud

  1. 执行工作流:

    gcloud workflows run workflow-parent \
         --location=us-central1
  2. 如需验证工作流执行是否已触发,请列出最后四项执行:

    gcloud workflows executions list workflow-child --limit=4

    由于执行次数(100 次)低于工作流并发数上限,因此结果应与以下内容类似。如果您提交 成千上万次执行

    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/1570d06e-d133-4536-a859-b7b6a1a85524
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:15.093934448Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/82724960-7d92-4961-aa2c-a0f0be46212c
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.903007626Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/598126fb-37f9-45bc-91d8-aea7d795d702
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.698260524Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/d2e9960b-f93f-4df4-a594-3e7e5c2be53f
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.503818840Z
    END_TIME: 

您已创建并部署了一个工作流,该工作流会调用子工作流的 100 次迭代。

执行具有速率限制的父级工作流

将每秒调度一次的速率限制应用于 Cloud Tasks 队列,然后执行父级工作流。

控制台

  1. 在 Google Cloud 控制台中,前往 Cloud Tasks 页面:

    转至 Cloud Tasks

  2. 点击 queue-workflow-child(您创建的 Cloud Tasks 队列),然后点击修改队列

  3. 任务调度速率限制部分, Max 调度(调度次数上限)字段,类型为 1

  4. 点击保存

  5. 前往 Workflows 页面:

    进入 Workflows

  6. 点击 workflow-parent 工作流以转到其详情页面。

  7. 工作流详情页面上,点击 执行

  8. 再次点击执行

  9. 在父级工作流运行时,返回到 工作流页面,然后点击 workflow-child 工作流,转到其详情页面。

  10. 点击执行标签页。

    您应该会看到子工作流的执行情况(针对一个请求运行) 类似于以下内容:

    每秒执行请求的子工作流的详细信息。

gcloud

  1. 更新 Cloud Tasks 队列,以应用每秒调度一次的速率限制:

    gcloud tasks queues update $QUEUE \
        --max-dispatches-per-second=1 \
        --location=us-central1
  2. 执行工作流:

    gcloud workflows run workflow-parent \
       --location=us-central1
  3. 如需验证工作流执行是否已触发,请列出最后四项执行:

    gcloud workflows executions list workflow-child --limit=4

    结果应类似于以下内容,每个工作流执行一个工作流 秒:

    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/becf4957-9fb2-40d9-835d-0ff2dd0c1249
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:24.446361457Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/6c1e7c4b-7ac6-4121-b351-1e2d56d10903
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:23.448213989Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/f2ba5027-af40-4cd3-8cd0-b8033bcc6211
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:22.431485914Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/ecc61ee5-fe87-49eb-8803-89dba929f6c8
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:21.443466369Z
    END_TIME: 

您已成功部署了一个工作流,该工作流会调用 的调度速率为每秒一次的子工作流。

清理

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

删除教程资源

删除在本教程中创建的工作流和 Cloud Tasks 资源:

控制台

  • 如需删除工作流,请按以下步骤操作:

    1. 在 Google Cloud 控制台中,前往 Workflows 页面:

      进入 Workflows

    2. 从工作流列表中,点击工作流以转至其工作流详情页面。

    3. 点击 删除

    4. 输入工作流的名称,然后点击确认

  • 如需删除 Cloud Tasks 队列,请按以下步骤操作:

    1. 在 Google Cloud 控制台中,前往 Cloud Tasks 页面:

      转至 Cloud Tasks

    2. 选择要删除的队列的名称,然后点击 删除队列

    3. 确认该操作。

gcloud

  • 如需删除工作流,请运行以下命令:

    gcloud workflows delete workflow-child
    gcloud workflows delete workflow-parent

  • 如需删除 Cloud Tasks 队列,请运行以下命令:

    gcloud tasks queues delete queue-workflow-child

后续步骤