并行步骤可以同时执行多个阻塞调用,从而减少工作流的总执行时间。
sleep、HTTP 调用和callbacks等阻塞调用可能需要几秒到数天的时间。并行步骤旨在协助执行此类长时间运行的并发操作。如果某个工作流必须执行多个相互独立的阻塞调用,则使用并行分支可以同时启动这些调用并等待所有调用完成,从而减少总执行时间。
例如,如果您的工作流必须先从多个独立系统中检索客户数据,然后才能继续,则并行分支允许并发 API 请求。如果有 5 个系统,每个系统需要 2 秒响应,则在工作流中依序执行这些步骤可能需要至少 10 秒;并行执行这些步骤只需要 2 秒。
创建并行步骤
创建 parallel
步骤,定义可同时执行两个或多个步骤的工作流部分。
YAML
- PARALLEL_STEP_NAME: parallel: exception_policy: POLICY shared: [VARIABLE_A, VARIABLE_B, ...] concurrency_limit: CONCURRENCY_LIMIT BRANCHES_OR_FOR: ...
JSON
[ { "PARALLEL_STEP_NAME": { "parallel": { "exception_policy": "POLICY", "shared": [ "VARIABLE_A", "VARIABLE_B", ... ], "concurrency_limit": "CONCURRENCY_LIMIT", "BRANCHES_OR_FOR": ... } } } ]
请替换以下内容:
PARALLEL_STEP_NAME
:并行步骤的名称。POLICY
(可选):确定在发生未处理的异常时其他分支将执行的操作。默认政策continueAll
不会产生进一步的操作,所有其他分支都将尝试运行。请注意,continueAll
是目前唯一支持的政策。VARIABLE_A
、VARIABLE_B
等:具有父作用域的可写变量列表,允许进行并行步骤中赋值。如需了解详情,请参阅共享变量。CONCURRENCY_LIMIT
(可选):在将其他分支和迭代排队等待之前,可以在单个工作流执行中并发执行的分支和迭代的最大数量。这仅适用于单个parallel
步骤,不级联。必须是正整数,可以是字面量值或表达式。如需了解详情,请参阅并发限制。BRANCHES_OR_FOR
:使用branches
或for
表示以下其中一项:- 可以同时运行的分支。
- 迭代可以并发运行的循环。
请注意以下几点:
用并行步骤替换实验函数
如果您使用 experimental.executions.map
支持并行工作,则可以迁移工作流以改用并行步骤,从而并行执行普通的 for
循环。如需查看示例,请参阅将实验函数替换为并行步骤。
示例
这些示例演示了语法。
并行执行操作(使用分支)
如果您的工作流有多组不同的步骤可以同时执行,则将它们放在并行分支中可以减少完成这些步骤所需的总时间。
在以下示例中,用户 ID 作为参数传递给工作流,并从两个不同的服务并行检索数据。共享变量允许在分支中写入值,并在分支完成后读取:
YAML
JSON
并行处理项(使用并行循环)
如果您需要对列表中的每一项执行相同的操作,可以使用并行循环更快速地完成执行。并行循环允许并行执行多次循环迭代。请注意,与常规的 for 循环不同,迭代可按任何顺序执行。
在以下示例中,一组用户通知在并行 for
循环中进行处理:
YAML
JSON
汇总数据(使用并行循环)
您可以处理一组数据,同时从对各项内容执行的操作中收集数据。例如,您可能想要跟踪所创建项的 ID,或者维护存在错误的项的列表。
在以下示例中,针对公共 BigQuery 数据集的 10 个单独查询均会返回一个文档或一组文档中的字词数。共享变量允许累积单词的计数,并在所有迭代完成后读取该单词的计数。计算所有文档中的字数后,工作流会返回总字数。