使用 Cloud Tasks 队列缓冲工作流执行


本教程介绍如何创建可调节工作流执行速率的 Cloud Tasks 队列。

可以同时进行的工作流执行次数存在上限。一旦此配额用尽,并且如果执行积压处于停用状态,或者如果积压的执行作业达到配额,则任何新的执行作业都会失败,并返回 HTTP 429 Too many requests 状态代码。通过使 Cloud Tasks 队列能够以您定义的速率执行子工作流,您可以避免与 Workflows 配额相关的问题,并实现更高的执行速率。

请注意,Cloud Tasks 旨在提供“至少一次”提交;不过,对于来自 Cloud Tasks 的重复请求,Workflows 无法确保只处理一次。

在下图中,父工作流会调用受应用了调度率的 Cloud Tasks 队列监管的子工作流。

父工作流通过 Cloud Tasks 队列调用子工作流的迭代

目标

在此教程中,您将学习以下操作:

  1. 创建充当父工作流和子工作流之间中介的 Cloud Tasks 队列。
  2. 创建并部署可从父工作流接收数据的子工作流。
  3. 创建并部署通过 Cloud Tasks 队列执行子工作流的父工作流。
  4. 运行没有调度速率限制的父工作流,该工作流会调用子工作流的执行。
  5. 对 Cloud Tasks 队列应用调度限制,然后运行父工作流。
  6. 请注意,子工作流的执行速率由 Cloud Tasks 队列定义。

您可以在 Google Cloud 控制台中运行以下命令,也可以在终端或 Cloud Shell 中使用 Google Cloud CLI 运行这些命令。

费用

在本文档中,您将使用 Google Cloud的以下收费组件:

如需根据您的预计使用量来估算费用,请使用价格计算器

新 Google Cloud 用户可能有资格申请免费试用

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

控制台

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Cloud Tasks, Compute Engine, and Workflows APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Cloud Tasks, Compute Engine, and Workflows APIs.

    Enable the APIs

  8. 在 Google Cloud 控制台中,前往 IAM 页面,为 Compute Engine 默认服务账号设置权限。

    转到 IAM

    记下 Compute Engine 默认服务账号,因为您将在本教程中将其与工作流相关联以进行测试。启用或使用包含 Compute Engine 的 Google Cloud 服务后,系统会自动创建此服务账号,其电子邮件地址格式如下:

    PROJECT_NUMBER-compute@developer.gserviceaccount.com

    PROJECT_NUMBER 替换为您的Google Cloud 项目编号。您可以在 Google Cloud 控制台的欢迎页面上找到项目编号。

    对于生产环境,我们强烈建议创建新的服务账号,并为其授予一个或多个 IAM 角色,这些角色包含所需的最小权限并遵循最小权限原则。

  9. 选择 Compute Engine 默认服务账号,然后在该行中点击 修改主账号
  10. 在随即显示的对话框中,点击 添加其他角色,然后添加以下角色:
    1. 选择角色列表中,选择 Workflows > Workflows Invoker,以便该账号有权触发工作流执行。
    2. 选择角色列表中,选择 Cloud Tasks > Cloud Tasks Enqueuer,以便账号拥有创建任务的权限。
  11. 点击保存

gcloud

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Cloud Tasks, Compute Engine, and Workflows APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Cloud Tasks, Compute Engine, and Workflows APIs.

    Enable the APIs

  8. 记下 Compute Engine 默认服务账号,因为您将在本教程中将其与工作流相关联以进行测试。启用或使用包含 Compute Engine 的 Google Cloud 服务后,系统会自动创建此服务账号,其电子邮件地址格式如下:

    PROJECT_NUMBER-compute@developer.gserviceaccount.com

    PROJECT_NUMBER 替换为您的Google Cloud 项目编号。您可以通过运行以下命令找到项目编号:

    gcloud projects describe PROJECT_ID --format='value(projectNumber)'

    对于生产环境,我们强烈建议创建新的服务账号,并为其授予一个或多个 IAM 角色,这些角色包含所需的最小权限并遵循最小权限原则。

  9. 将项目的 Workflows Invoker 角色 (roles/workflows.invoker) 授予 Compute Engine 默认服务账号,以便该账号有权触发您的工作流执行。

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=roles/workflows.invoker

    替换以下内容:

    • PROJECT_ID: Google Cloud 项目 ID
    • PROJECT_NUMBER: Google Cloud 项目编号

  10. 将项目的 Cloud Tasks Enqueuer 角色 (roles/cloudtasks.enqueuer) 授予 Compute Engine 默认服务账号,以便该账号有权创建任务。

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=roles/cloudtasks.enqueuer

创建 Cloud Tasks 队列

创建可在父工作流中使用的 Cloud Tasks 队列,并允许您控制工作流执行速率。

控制台

  1. 在 Google Cloud 控制台中,前往 Cloud Tasks 页面:

    转至 Cloud Tasks

  2. 点击 创建推送队列

  3. 输入队列名称,即 queue-workflow-child

  4. 区域列表中,选择 us-central1(爱荷华)

  5. 点击创建

gcloud

QUEUE=queue-workflow-child
LOCATION=us-central1
gcloud tasks queues create $QUEUE --location=$LOCATION

创建和部署子工作流

子工作流可以接收和处理来自父工作流的数据。创建并部署一个执行以下操作的子工作流:

  • 接收 iteration 作为实参
  • 休眠 10 秒以模拟某些处理
  • 成功执行后返回字符串

控制台

  1. 在 Google Cloud 控制台中,前往 Workflows 页面。

    进入 Workflows

  2. 点击 创建

  3. 输入新工作流的名称 workflow-child

  4. 区域列表中,选择 us-central1(爱荷华)

  5. 服务账号列表中,选择 Compute Engine 默认服务账号

  6. 点击下一步

  7. 在工作流编辑器中,输入工作流的定义:

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - return_message:
            return: ${"Hello world"+iteration}
  8. 点击部署

gcloud

  1. 为工作流创建源代码文件:

    touch workflow-child.yaml
  2. 在文本编辑器中打开源代码文件,然后将以下工作流复制到该文件中。

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - return_message:
            return: ${"Hello world"+iteration}
  3. 部署工作流:

    gcloud workflows deploy workflow-child \
        --source=workflow-child.yaml \
        --location=us-central1 \
        --service-account=PROJECT_NUMBER-compute@developer.gserviceaccount.com

创建和部署父工作流

父工作流使用 for 循环执行子工作流的多个分支。

  1. 复制定义父工作流的源代码:

    main:
      steps:
        - init:
            assign:
              - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              - project_number: ${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}
              - location: ${sys.get_env("GOOGLE_CLOUD_LOCATION")}
              - workflow_child_name: "workflow-child"
              - queue_name: "queue-workflow-child"
        - enqueue_tasks_to_execute_child_workflow:
            for:
              value: iteration
              range: [1, 100]
              steps:
                  - iterate:
                      assign:
                        - data:
                            iteration: ${iteration}
                        - exec:
                            # Encode object to JSON string in expression for workflow argument
                            argument: ${json.encode_to_string(data)}
                  - create_task_to_execute_child_workflow:
                      call: googleapis.cloudtasks.v2.projects.locations.queues.tasks.create
                      args:
                          parent: ${"projects/" + project_id + "/locations/" + location + "/queues/" + queue_name}
                          body:
                            task:
                              httpRequest:
                                body: ${base64.encode(json.encode(exec))}
                                url: ${"https://workflowexecutions.googleapis.com/v1/projects/" + project_id + "/locations/" + location + "/workflows/" + workflow_child_name + "/executions"}
                                oauthToken:
                                  serviceAccountEmail: ${project_number + "-compute@developer.gserviceaccount.com"}

    工作流包括以下部分:

    • 用于分配引用子工作流和 Cloud Tasks 队列名称的常量的映射。如需了解详情,请参阅地图

    • 一种 for 循环,用于以迭代方式调用子工作流。 如需了解详情,请参阅迭代

    • 一种工作流步骤,用于创建大量任务并将其添加到 Cloud Tasks 队列中,以执行子工作流。如需了解详情,请参阅 Cloud Tasks API 连接器

  2. 部署工作流:

    控制台

    1. 在 Google Cloud 控制台中,前往 Workflows 页面:

      进入 Workflows

    2. 点击 创建

    3. 输入新工作流的名称 workflow-parent

    4. 区域列表中,选择 us-central1(爱荷华)

    5. 服务账号列表中,选择 Compute Engine 默认服务账号

    6. 点击下一步

    7. 在工作流编辑器中,粘贴父工作流的定义。

    8. 点击部署

    gcloud

    1. 为工作流创建源代码文件:

      touch workflow-parent.yaml
    2. 在文本编辑器中打开源代码文件,然后粘贴父工作流的定义。

    3. 部署工作流:

      gcloud workflows deploy workflow-parent \
          --source=workflow-parent.yaml \
          --location=us-central1 \
          --service-account=PROJECT_NUMBER-compute@developer.gserviceaccount.com

执行无速率限制的父工作流

执行父工作流,以通过 Cloud Tasks 队列调用子工作流。执行大约需要 10 秒才能完成。

控制台

  1. 在 Google Cloud 控制台中,前往 Workflows 页面:

    进入 Workflows

  2. 工作流页面上,点击 workflow-parent 工作流以转到其详情页面。

  3. 工作流详情页面上,点击 执行

  4. 再次点击执行

  5. 在父工作流运行时,返回到工作流页面,然后点击 workflow-child 工作流以转到其详情页面。

  6. 点击执行标签页。

    您应该会看到子工作流的执行情况,这些执行大约在同一时间运行,类似于以下内容:

    同时运行的子工作流执行的详细信息。

gcloud

  1. 执行工作流:

    gcloud workflows run workflow-parent \
         --location=us-central1
  2. 如需验证工作流执行是否已触发,请列出最后四项执行:

    gcloud workflows executions list workflow-child --limit=4

    由于执行次数 (100) 低于 Workflows 并发限制,因此结果应与以下内容类似。如果您同时提交数千次执行,可能会出现配额问题。

    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/1570d06e-d133-4536-a859-b7b6a1a85524
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:15.093934448Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/82724960-7d92-4961-aa2c-a0f0be46212c
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.903007626Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/598126fb-37f9-45bc-91d8-aea7d795d702
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.698260524Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/d2e9960b-f93f-4df4-a594-3e7e5c2be53f
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.503818840Z
    END_TIME: 

您已创建并部署一个工作流,该工作流会调用子工作流 100 次。

执行具有速率限制的父工作流

对 Cloud Tasks 队列应用每秒一次调度的速率限制,然后执行父工作流。

控制台

  1. 在 Google Cloud 控制台中,前往 Cloud Tasks 页面:

    转至 Cloud Tasks

  2. 点击您创建的 Cloud Tasks 队列 queue-workflow-child,然后点击修改队列

  3. 任务分派速率限制部分中,为最大分派数字段输入 1

  4. 点击保存

  5. 前往 Workflows 页面:

    进入 Workflows

  6. 点击 workflow-parent 工作流以转到其详情页面。

  7. 工作流详情页面上,点击 执行

  8. 再次点击执行

  9. 在父工作流运行时,返回到工作流页面,然后点击 workflow-child 工作流以转到其详情页面。

  10. 点击执行标签页。

    您应该会看到子工作流的执行情况,以每秒一次请求的速度运行,如下所示:

    以每秒请求数执行的子工作流的详细信息。

gcloud

  1. 更新 Cloud Tasks 队列,以应用每秒一次调度的速率限制:

    gcloud tasks queues update $QUEUE \
        --max-dispatches-per-second=1 \
        --location=us-central1
  2. 执行工作流:

    gcloud workflows run workflow-parent \
       --location=us-central1
  3. 如需验证工作流执行是否已触发,请列出最后四项执行:

    gcloud workflows executions list workflow-child --limit=4

    结果应类似于以下内容,每秒执行一次工作流:

    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/becf4957-9fb2-40d9-835d-0ff2dd0c1249
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:24.446361457Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/6c1e7c4b-7ac6-4121-b351-1e2d56d10903
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:23.448213989Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/f2ba5027-af40-4cd3-8cd0-b8033bcc6211
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:22.431485914Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/ecc61ee5-fe87-49eb-8803-89dba929f6c8
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:21.443466369Z
    END_TIME: 

您已成功部署一个工作流,该工作流以每秒一次的调度速率调用子工作流 100 次迭代。

清理

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

删除教程资源

删除在本教程中创建的工作流和 Cloud Tasks 资源:

控制台

  • 如需删除工作流,请按以下步骤操作:

    1. 在 Google Cloud 控制台中,前往 Workflows 页面:

      进入 Workflows

    2. 从工作流列表中,点击工作流以转至其工作流详情页面。

    3. 点击 删除

    4. 输入工作流的名称,然后点击确认

  • 如需删除 Cloud Tasks 队列,请按以下步骤操作:

    1. 在 Google Cloud 控制台中,前往 Cloud Tasks 页面:

      转至 Cloud Tasks

    2. 选择要删除的队列的名称,然后点击删除队列

    3. 确认该操作。

gcloud

  • 如需删除工作流,请运行以下命令:

    gcloud workflows delete workflow-child
    gcloud workflows delete workflow-parent

  • 如需删除 Cloud Tasks 队列,请运行以下命令:

    gcloud tasks queues delete queue-workflow-child

后续步骤