用平行步骤替换实验函数

Workflows 实验函数 experimental.executions.map 会为每个参数启动工作流执行,并等待所有执行完成,同时返回一个列表,其中每个元素都是执行结果。

如果您使用 experimental.executions.map 支持并行工作,则可以迁移工作流以改用并行步骤,从而并行执行普通 for 循环

parallel 步骤定义工作流的一部分,其中两个或多个步骤可以并发执行。parallel 步骤会等到其中定义的所有步骤都完成或被未处理的异常中断时执行,然后继续执行。与 experimental.executions.map 一样,无法保证执行顺序。如需了解详情,请参阅并行步骤的语法参考页面。

请注意,使用 experimental.executions.mapworkflows.executions.run 需要额外的并发执行配额。但是,如果并行步骤中内嵌了对连接器的调用(请参阅转换连接器示例),则无需额外的执行配额。

以下示例旨在帮助您将 experimental.executions.map 的用法替换为 parallel 步骤。

翻译工作流程

在给定源语言和目标语言的情况下,以下名为 translate 的工作流使用 Cloud Translation 连接器翻译一些输入文本并返回结果。请注意,必须启用 Cloud Translation API。

YAML

  main:
    params: [args]
    steps:
    - basic_translate:
        call: googleapis.translate.v2.translations.translate
        args:
          body:
            q: ${args.text}
            target: ${args.target}
            format: "text"
            source: ${args.source}
        result: r
    - return_step:
        return: ${r}

JSON

  {
    "main": {
      "params": [
        "args"
      ],
      "steps": [
        {
          "basic_translate": {
            "call": "googleapis.translate.v2.translations.translate",
            "args": {
              "body": {
                "q": "${args.text}",
                "target": "${args.target}",
                "format": "text",
                "source": "${args.source}"
              }
            },
            "result": "r"
          }
        },
        {
          "return_step": {
            "return": "${r}"
          }
        }
      ]
    }
  }

上述示例的输入:

{
  "text": "Bonjour",
  "target": "en",
  "source": "fr"
}

您应该会看到类似如下所示的输出:

{
 "data": {
   "translations": [
     {
       "translatedText": "Hello"
     }
   ]
 }
}

使用 experimental.executions.map 的批量翻译工作流

以下工作流可翻译一批文本。对于每个输入,experimental.executions.map 会执行之前创建的 translate 工作流。

YAML

  main:
     steps:
     - init:
         assign:
         - workflow_id: "translate"
         - texts_to_translate:
             - text: "hello world!"
               source: "en"
               target: "fr"
             - text: "你好 世界!"
               source: "zh-CN"
               target: "en"
             - text: "No hablo español!"
               source: "es"
               target: "en"
     - translate_texts:
         call: experimental.executions.map
         args:
             workflow_id: ${workflow_id}
             arguments: ${texts_to_translate}
         result: translated
     - return:
             return: ${translated}

JSON

  {
    "main": {
      "steps": [
        {
          "init": {
            "assign": [
              {
                "workflow_id": "translate"
              },
              {
                "texts_to_translate": [
                  {
                    "text": "hello world!",
                    "source": "en",
                    "target": "fr"
                  },
                  {
                    "text": "你好 世界!",
                    "source": "zh-CN",
                    "target": "en"
                  },
                  {
                    "text": "No hablo español!",
                    "source": "es",
                    "target": "en"
                  }
                ]
              }
            ]
          }
        },
        {
          "translate_texts": {
            "call": "experimental.executions.map",
            "args": {
              "workflow_id": "${workflow_id}",
              "arguments": "${texts_to_translate}"
            },
            "result": "translated"
          }
        },
        {
          "return": {
            "return": "${translated}"
          }
        }
      ]
    }
  }

您应该会看到类似如下所示的输出:

[
  {
    "data": {
      "translations": [
        {
          "translatedText": "Bonjour le monde!"
        }
      ]
    }
  },
  {
    "data": {
      "translations": [
        {
          "translatedText": "Hello world!"
        }
      ]
    }
  },
  {
    "data": {
      "translations": [
        {
          "translatedText": "I don't speak Spanish!"
        }
      ]
    }
  }
]

experimental.executions.map 替换为 for:in 循环

您可以不使用实验性函数,而是使用并行 for:in 循环来翻译文本。在以下示例中,辅助工作流 translate 可以按原样使用,并且输出应保持不变。您还可以选择在并行分支中启动其他辅助工作流执行。

共享变量 translated 用于存储结果,并填充有空字符串以启用静态数组索引。如果不需要排序,您可以使用 list.concat 附加结果。并行步骤中的所有赋值都是原子化的。

YAML

main:
  params: []
  steps:
    - init:
        assign:
          - workflow_id: "translate"
          - texts_to_translate:
              - text: "hello world!"
                source: "en"
                target: "fr"
              - text: "你好 世界!"
                source: "zh-CN"
                target: "en"
              - text: "No hablo español!"
                source: "es"
                target: "en"
          - translated: ["", "", ""]  # to write to this variable, you must share it
    - parallel_translate:
        parallel:
          shared: [translated]
          for:
            in: ${texts_to_translate}
            index: i  # optional, use if index is required
            value: arg
            steps:
              - translate:
                  call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                  args:
                    workflow_id: ${workflow_id}
                    argument: ${arg}
                  result: r
              - set_result:
                  assign:
                    - translated[i]: ${r}
    - return:
        return: ${translated}

JSON

{
  "main": {
    "params": [],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "workflow_id": "translate"
            },
            {
              "texts_to_translate": [
                {
                  "text": "hello world!",
                  "source": "en",
                  "target": "fr"
                },
                {
                  "text": "你好 世界!",
                  "source": "zh-CN",
                  "target": "en"
                },
                {
                  "text": "No hablo español!",
                  "source": "es",
                  "target": "en"
                }
              ]
            },
            {
              "translated": [
                "",
                "",
                ""
              ]
            }
          ]
        }
      },
      {
        "parallel_translate": {
          "parallel": {
            "shared": [
              "translated"
            ],
            "for": {
              "in": "${texts_to_translate}",
              "index": "i",
              "value": "arg",
              "steps": [
                {
                  "translate": {
                    "call": "googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run",
                    "args": {
                      "workflow_id": "${workflow_id}",
                      "argument": "${arg}"
                    },
                    "result": "r"
                  }
                },
                {
                  "set_result": {
                    "assign": [
                      {
                        "translated[i]": "${r}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "return": {
          "return": "${translated}"
        }
      }
    ]
  }
}

experimental.executions.map 替换为 for:range 循环

您可以不使用实验性函数,而是使用并行的 for:range 循环来翻译文本。通过使用 for:range 循环,您可以指定迭代范围的开始和结束。输出应保持不变。

YAML

main:
  params: []
  steps:
    - init:
        assign:
          - workflow_id: "translate"
          - texts_to_translate:
              - text: "hello world!"
                source: "en"
                target: "fr"
              - text: "你好 世界!"
                source: "zh-CN"
                target: "en"
              - text: "No hablo español!"
                source: "es"
                target: "en"
          - translated: ["", "", ""]  # to write to this variable, you must share it
    - parallel_translate:
        parallel:
          shared: [translated]
          for:
            range: ${[0, len(texts_to_translate) - 1]}
            value: i
            steps:
              - translate:
                  call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                  args:
                    workflow_id: ${workflow_id}
                    argument: ${texts_to_translate[i]}
                  result: r
              - set_result:
                  assign:
                    - translated[i]: ${r}
    - return:
        return: ${translated}

JSON

{
  "main": {
    "params": [],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "workflow_id": "translate"
            },
            {
              "texts_to_translate": [
                {
                  "text": "hello world!",
                  "source": "en",
                  "target": "fr"
                },
                {
                  "text": "你好 世界!",
                  "source": "zh-CN",
                  "target": "en"
                },
                {
                  "text": "No hablo español!",
                  "source": "es",
                  "target": "en"
                }
              ]
            },
            {
              "translated": [
                "",
                "",
                ""
              ]
            }
          ]
        }
      },
      {
        "parallel_translate": {
          "parallel": {
            "shared": [
              "translated"
            ],
            "for": {
              "range": "${[0, len(texts_to_translate) - 1]}",
              "value": "i",
              "steps": [
                {
                  "translate": {
                    "call": "googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run",
                    "args": {
                      "workflow_id": "${workflow_id}",
                      "argument": "${texts_to_translate[i]}"
                    },
                    "result": "r"
                  }
                },
                {
                  "set_result": {
                    "assign": [
                      {
                        "translated[i]": "${r}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "return": {
          "return": "${translated}"
        }
      }
    ]
  }
}

内嵌工作流源代码

如果辅助工作流相对简短,您可能希望将其直接包含在主工作流中,以提高可读性。例如,在以下工作流中,translate 工作流的源代码已内嵌。

YAML

main:
  params: [args]
  steps:
    - init:
        assign:
          - workflow_id: "translate"
          - texts_to_translate:
              - text: "hello world!"
                source: "en"
                target: "fr"
              - text: "你好 世界!"
                source: "zh-CN"
                target: "en"
              - text: "No hablo español!"
                source: "es"
                target: "en"
          - translated: ["", "", ""]
    - parallel_translate:
        parallel:
          shared: [translated]  # to write to this variable, you must share it
          for:
            range: ${[0, len(texts_to_translate) - 1]}
            value: i
            steps:
              - basic_translate:
                  call: googleapis.translate.v2.translations.translate
                  args:
                    body:
                      q: ${args.text}
                      target: ${args.target}
                      format: "text"
                      source: ${args.source}
                  result: r
              - set_result:
                  assign:
                    - translated[i]: ${r}
    - return:
        return: ${translated}

JSON

{
  "main": {
    "params": [
      "args"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "workflow_id": "translate"
            },
            {
              "texts_to_translate": [
                {
                  "text": "hello world!",
                  "source": "en",
                  "target": "fr"
                },
                {
                  "text": "你好 世界!",
                  "source": "zh-CN",
                  "target": "en"
                },
                {
                  "text": "No hablo español!",
                  "source": "es",
                  "target": "en"
                }
              ]
            },
            {
              "translated": [
                "",
                "",
                ""
              ]
            }
          ]
        }
      },
      {
        "parallel_translate": {
          "parallel": {
            "shared": [
              "translated"
            ],
            "for": {
              "range": "${[0, len(texts_to_translate) - 1]}",
              "value": "i",
              "steps": [
                {
                  "basic_translate": {
                    "call": "googleapis.translate.v2.translations.translate",
                    "args": {
                      "body": {
                        "q": "${args.text}",
                        "target": "${args.target}",
                        "format": "text",
                        "source": "${args.source}"
                      }
                    },
                    "result": "r"
                  }
                },
                {
                  "set_result": {
                    "assign": [
                      {
                        "translated[i]": "${r}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "return": {
          "return": "${translated}"
        }
      }
    ]
  }
}

后续步骤