Replace experimental function with parallel step

The Workflows experimental function, experimental.executions.map, starts a workflow execution for each corresponding argument, and waits for all of the executions to finish, returning a list where each element is the result of an execution.

If you are using experimental.executions.map to support parallel work, you can migrate your workflow to use parallel steps instead, executing ordinary for loops in parallel.

A parallel step defines a part of your workflow where two or more steps can execute concurrently. A parallel step waits until all the steps defined within it have completed or are interrupted by an unhandled exception; execution then continues. Like experimental.executions.map, the execution order is not guaranteed. For details, see the syntax reference page for parallel steps.

Note that the use of experimental.executions.map or workflows.executions.run requires additional concurrent executions quota. However, when using parallel steps with calls to connectors inlined (see the translation connector example), no additional execution quota is required.

The following examples are intended to assist you when replacing the use of experimental.executions.map with a parallel step.

Translation workflow

Given a source and target language, the following workflow, named translate, uses the Cloud Translation connector to translate some input text and return the result. Note that the Cloud Translation API must be enabled.

YAML

  main:
    params: [args]
    steps:
    - basic_translate:
        call: googleapis.translate.v2.translations.translate
        args:
          body:
            q: ${args.text}
            target: ${args.target}
            format: "text"
            source: ${args.source}
        result: r
    - return_step:
        return: ${r}

JSON

  {
    "main": {
      "params": [
        "args"
      ],
      "steps": [
        {
          "basic_translate": {
            "call": "googleapis.translate.v2.translations.translate",
            "args": {
              "body": {
                "q": "${args.text}",
                "target": "${args.target}",
                "format": "text",
                "source": "${args.source}"
              }
            },
            "result": "r"
          }
        },
        {
          "return_step": {
            "return": "${r}"
          }
        }
      ]
    }
  }

Input for the preceding example:

{
  "text": "Bonjour",
  "target": "en",
  "source": "fr"
}

The output should resemble the following:

{
 "data": {
   "translations": [
     {
       "translatedText": "Hello"
     }
   ]
 }
}

Batch translation workflow using experimental.executions.map

The following workflow translates a batch of texts. For each input, experimental.executions.map executes the translate workflow created previously.

YAML

  main:
     steps:
     - init:
         assign:
         - workflow_id: "translate"
         - texts_to_translate:
             - text: "hello world!"
               source: "en"
               target: "fr"
             - text: "你好 世界!"
               source: "zh-CN"
               target: "en"
             - text: "No hablo español!"
               source: "es"
               target: "en"
     - translate_texts:
         call: experimental.executions.map
         args:
             workflow_id: ${workflow_id}
             arguments: ${texts_to_translate}
         result: translated
     - return:
             return: ${translated}

JSON

  {
    "main": {
      "steps": [
        {
          "init": {
            "assign": [
              {
                "workflow_id": "translate"
              },
              {
                "texts_to_translate": [
                  {
                    "text": "hello world!",
                    "source": "en",
                    "target": "fr"
                  },
                  {
                    "text": "你好 世界!",
                    "source": "zh-CN",
                    "target": "en"
                  },
                  {
                    "text": "No hablo español!",
                    "source": "es",
                    "target": "en"
                  }
                ]
              }
            ]
          }
        },
        {
          "translate_texts": {
            "call": "experimental.executions.map",
            "args": {
              "workflow_id": "${workflow_id}",
              "arguments": "${texts_to_translate}"
            },
            "result": "translated"
          }
        },
        {
          "return": {
            "return": "${translated}"
          }
        }
      ]
    }
  }

The output should resemble the following:

[
  {
    "data": {
      "translations": [
        {
          "translatedText": "Bonjour le monde!"
        }
      ]
    }
  },
  {
    "data": {
      "translations": [
        {
          "translatedText": "Hello world!"
        }
      ]
    }
  },
  {
    "data": {
      "translations": [
        {
          "translatedText": "I don't speak Spanish!"
        }
      ]
    }
  }
]

Replace experimental.executions.map with a for:in loop

Instead of using the experimental function, you can use a parallel for:in loop to translate the text. In the following example, the secondary workflow, translate, can be used as is and the output should remain unchanged. You also have the option of starting other secondary workflow executions in parallel branches.

A shared variable, translated, is used to store the results and is populated with empty strings to enable static array indexing. If ordering is not required, you can use list.concat to append the results. All assignments in parallel steps are atomic.

YAML

main:
  params: []
  steps:
    - init:
        assign:
          - workflow_id: "translate"
          - texts_to_translate:
              - text: "hello world!"
                source: "en"
                target: "fr"
              - text: "你好 世界!"
                source: "zh-CN"
                target: "en"
              - text: "No hablo español!"
                source: "es"
                target: "en"
          - translated: ["", "", ""]  # to write to this variable, you must share it
    - parallel_translate:
        parallel:
          shared: [translated]
          for:
            in: ${texts_to_translate}
            index: i  # optional, use if index is required
            value: arg
            steps:
              - translate:
                  call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                  args:
                    workflow_id: ${workflow_id}
                    argument: ${arg}
                  result: r
              - set_result:
                  assign:
                    - translated[i]: ${r}
    - return:
        return: ${translated}

JSON

{
  "main": {
    "params": [],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "workflow_id": "translate"
            },
            {
              "texts_to_translate": [
                {
                  "text": "hello world!",
                  "source": "en",
                  "target": "fr"
                },
                {
                  "text": "你好 世界!",
                  "source": "zh-CN",
                  "target": "en"
                },
                {
                  "text": "No hablo español!",
                  "source": "es",
                  "target": "en"
                }
              ]
            },
            {
              "translated": [
                "",
                "",
                ""
              ]
            }
          ]
        }
      },
      {
        "parallel_translate": {
          "parallel": {
            "shared": [
              "translated"
            ],
            "for": {
              "in": "${texts_to_translate}",
              "index": "i",
              "value": "arg",
              "steps": [
                {
                  "translate": {
                    "call": "googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run",
                    "args": {
                      "workflow_id": "${workflow_id}",
                      "argument": "${arg}"
                    },
                    "result": "r"
                  }
                },
                {
                  "set_result": {
                    "assign": [
                      {
                        "translated[i]": "${r}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "return": {
          "return": "${translated}"
        }
      }
    ]
  }
}

Replace experimental.executions.map with a for:range loop

Instead of using the experimental function, you can use a parallel for:range loop to translate the text. By using a for:range loop, you can specify the beginning and end of an iteration range. The output should remain unchanged.

YAML

main:
  params: []
  steps:
    - init:
        assign:
          - workflow_id: "translate"
          - texts_to_translate:
              - text: "hello world!"
                source: "en"
                target: "fr"
              - text: "你好 世界!"
                source: "zh-CN"
                target: "en"
              - text: "No hablo español!"
                source: "es"
                target: "en"
          - translated: ["", "", ""]  # to write to this variable, you must share it
    - parallel_translate:
        parallel:
          shared: [translated]
          for:
            range: ${[0, len(texts_to_translate) - 1]}
            value: i
            steps:
              - translate:
                  call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                  args:
                    workflow_id: ${workflow_id}
                    argument: ${texts_to_translate[i]}
                  result: r
              - set_result:
                  assign:
                    - translated[i]: ${r}
    - return:
        return: ${translated}

JSON

{
  "main": {
    "params": [],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "workflow_id": "translate"
            },
            {
              "texts_to_translate": [
                {
                  "text": "hello world!",
                  "source": "en",
                  "target": "fr"
                },
                {
                  "text": "你好 世界!",
                  "source": "zh-CN",
                  "target": "en"
                },
                {
                  "text": "No hablo español!",
                  "source": "es",
                  "target": "en"
                }
              ]
            },
            {
              "translated": [
                "",
                "",
                ""
              ]
            }
          ]
        }
      },
      {
        "parallel_translate": {
          "parallel": {
            "shared": [
              "translated"
            ],
            "for": {
              "range": "${[0, len(texts_to_translate) - 1]}",
              "value": "i",
              "steps": [
                {
                  "translate": {
                    "call": "googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run",
                    "args": {
                      "workflow_id": "${workflow_id}",
                      "argument": "${texts_to_translate[i]}"
                    },
                    "result": "r"
                  }
                },
                {
                  "set_result": {
                    "assign": [
                      {
                        "translated[i]": "${r}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "return": {
          "return": "${translated}"
        }
      }
    ]
  }
}

Inline the workflow source code

If the secondary workflow is relatively brief, you might want to include it directly within the main workflow for easier readability. For example, in the following workflow, the source code for the translate workflow has been inlined.

YAML

main:
  params: [args]
  steps:
    - init:
        assign:
          - workflow_id: "translate"
          - texts_to_translate:
              - text: "hello world!"
                source: "en"
                target: "fr"
              - text: "你好 世界!"
                source: "zh-CN"
                target: "en"
              - text: "No hablo español!"
                source: "es"
                target: "en"
          - translated: ["", "", ""]
    - parallel_translate:
        parallel:
          shared: [translated]  # to write to this variable, you must share it
          for:
            range: ${[0, len(texts_to_translate) - 1]}
            value: i
            steps:
              - basic_translate:
                  call: googleapis.translate.v2.translations.translate
                  args:
                    body:
                      q: ${args.text}
                      target: ${args.target}
                      format: "text"
                      source: ${args.source}
                  result: r
              - set_result:
                  assign:
                    - translated[i]: ${r}
    - return:
        return: ${translated}

JSON

{
  "main": {
    "params": [
      "args"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "workflow_id": "translate"
            },
            {
              "texts_to_translate": [
                {
                  "text": "hello world!",
                  "source": "en",
                  "target": "fr"
                },
                {
                  "text": "你好 世界!",
                  "source": "zh-CN",
                  "target": "en"
                },
                {
                  "text": "No hablo español!",
                  "source": "es",
                  "target": "en"
                }
              ]
            },
            {
              "translated": [
                "",
                "",
                ""
              ]
            }
          ]
        }
      },
      {
        "parallel_translate": {
          "parallel": {
            "shared": [
              "translated"
            ],
            "for": {
              "range": "${[0, len(texts_to_translate) - 1]}",
              "value": "i",
              "steps": [
                {
                  "basic_translate": {
                    "call": "googleapis.translate.v2.translations.translate",
                    "args": {
                      "body": {
                        "q": "${args.text}",
                        "target": "${args.target}",
                        "format": "text",
                        "source": "${args.source}"
                      }
                    },
                    "result": "r"
                  }
                },
                {
                  "set_result": {
                    "assign": [
                      {
                        "translated[i]": "${r}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "return": {
          "return": "${translated}"
        }
      }
    ]
  }
}

What's next