并行步骤可以通过同时执行多个阻塞调用来缩短工作流的总执行时间。
阻塞调用,例如 sleep、 HTTP 调用以及 回调可能需要一定的时间 从几毫秒到几天。并行步骤旨在协助执行此类并发长时间运行的操作。如果工作流必须执行多次阻塞调用 彼此独立,使用并行分支可以减少 同时启动多个调用,并等待所有 完成这些任务
例如,如果您的工作流必须先从多个独立系统检索客户数据,然后才能继续,并行分支可支持并发 API 请求。如果有 5 个系统,每个系统的响应时间为 2 秒,那么在工作流中按顺序执行这些步骤可能需要至少 10 秒;并行执行这些步骤可能只需要 2 秒。
创建并行步骤
创建 parallel
步骤,以定义工作流的某个部分,其中两个或更多步骤可以并发执行。
YAML
- PARALLEL_STEP_NAME: parallel: exception_policy: POLICY shared: [VARIABLE_A, VARIABLE_B, ...] concurrency_limit: CONCURRENCY_LIMIT BRANCHES_OR_FOR: ...
JSON
[ { "PARALLEL_STEP_NAME": { "parallel": { "exception_policy": "POLICY", "shared": [ "VARIABLE_A", "VARIABLE_B", ... ], "concurrency_limit": "CONCURRENCY_LIMIT", "BRANCHES_OR_FOR": ... } } } ]
替换以下内容:
PARALLEL_STEP_NAME
:并行步骤的名称。POLICY
(可选):确定其他操作 分支将发生未处理的异常。默认政策continueAll
,不会产生进一步的操作,所有其他分支都会 错误。请注意,continueAll
是目前唯一受支持的政策。VARIABLE_A
、VARIABLE_B
等 on:父作用域允许赋值的可写变量列表 并行执行的步骤。如需了解详情,请参阅共享变量。CONCURRENCY_LIMIT
(可选):在将其他分支和迭代加入队列等待之前,单个工作流执行过程中可以并发执行的分支和迭代的数量上限。这仅适用于单个parallel
步骤,不会级联。必须是正整数,可以是字面量值或表达式。对于 请参阅 并发限制。BRANCHES_OR_FOR
:使用branches
或for
指示以下某项:- 可以同时运行的分支。
- 迭代可以并发运行的循环。
请注意以下几点:
用并行步骤替换实验函数
如果您使用 experimental.executions.map
来支持并行工作,则可以改为迁移工作流以使用并行步骤,并并行执行常规 for
循环。如需查看示例,请参阅将实验性函数替换为并行步骤。
示例
这些示例演示了语法。
并行执行操作(使用分支)
如果您的工作流包含多组不同的步骤 同时,将它们放在并行分支中可以减少总时间 完成这些步骤所需的资源。
在以下示例中,系统会将用户 ID 作为参数传递给工作流,并从两项不同的服务并行检索数据。共享变量允许在分支中写入值,并在分支完成后读取:
YAML
JSON
并行处理项(使用并行循环)
如果您需要对列表中的每一项执行相同的操作,可以完成 使用并行循环更快地完成执行。并行循环允许并行执行多个循环迭代。请注意,不同于 常规 for 循环,迭代可以 可按任意顺序执行
在以下示例中,系统会以单个 ID 的形式处理一组用户通知,
并行 for
循环:
YAML
JSON
汇总数据(使用并行循环)
您可以处理一组项,同时从对每项执行的操作中收集数据。例如,您可能想跟踪 或维护出错项的列表。
在以下示例中,对一个公共 BigQuery 数据集进行的 10 次单独查询分别返回文档或文档集中的字数。借助共享变量,您可以累积字词数,并在所有迭代完成后读取该数值。在计算完所有文档的字数之后, 就会返回总数