試験運用版の関数を並列ステップに置き換える

Workflows の試験運用版関数である experimental.executions.map は、対応する引数ごとにワークフローの実行を開始し、すべての実行が完了するまで待機して実行の結果として、各要素が記載されたリストを返します。

experimental.executions.map を使用して並列処理をサポートしている場合は、ワークフローを移行して並列ステップを使用し通常の for ループを並列実行するように設定できます。

parallel ステップは、2 つ以上のステップを同時に実行できるワークフローの一部を定義します。parallel ステップは、その中で定義されたすべてのステップが完了するか、未処理の例外によって中断されるまで待機してから、実行が続行されます。experimental.executions.map と同様に、実行順序は保証されません。詳細については、並列ステップの構文リファレンス ページをご覧ください。

experimental.executions.map または workflows.executions.run を使用するには、追加の同時実行割り当てが必要です。ただし、インライン化されたコネクタの呼び出しで並列ステップを使用する場合は(変換コネクタの例を参照)、追加の実行割り当ては必要ありません。

次の例は、experimental.executions.map の使用を parallel ステップに置き換える場合に活用できることを想定しています。

翻訳ワークフロー

ソース言語とターゲット言語を指定すると、次の translate という名前のワークフローが Cloud Translation コネクタを使用して、入力テキストを翻訳し、結果を返します。Cloud Translation API が有効になっている必要があります。

YAML

  main:
    params: [args]
    steps:
    - basic_translate:
        call: googleapis.translate.v2.translations.translate
        args:
          body:
            q: ${args.text}
            target: ${args.target}
            format: "text"
            source: ${args.source}
        result: r
    - return_step:
        return: ${r}

JSON

  {
    "main": {
      "params": [
        "args"
      ],
      "steps": [
        {
          "basic_translate": {
            "call": "googleapis.translate.v2.translations.translate",
            "args": {
              "body": {
                "q": "${args.text}",
                "target": "${args.target}",
                "format": "text",
                "source": "${args.source}"
              }
            },
            "result": "r"
          }
        },
        {
          "return_step": {
            "return": "${r}"
          }
        }
      ]
    }
  }

前の例の入力:

{
  "text": "Bonjour",
  "target": "en",
  "source": "fr"
}

出力は次のようになります。

{
 "data": {
   "translations": [
     {
       "translatedText": "Hello"
     }
   ]
 }
}

experimental.executions.map を使用した一括翻訳ワークフロー

次のワークフローでは、テキストのバッチが翻訳されます。experimental.executions.map は、入力ごとに、以前に作成した translate ワークフローを実行します。

YAML

  main:
     steps:
     - init:
         assign:
         - workflow_id: "translate"
         - texts_to_translate:
             - text: "hello world!"
               source: "en"
               target: "fr"
             - text: "你好 世界!"
               source: "zh-CN"
               target: "en"
             - text: "No hablo español!"
               source: "es"
               target: "en"
     - translate_texts:
         call: experimental.executions.map
         args:
             workflow_id: ${workflow_id}
             arguments: ${texts_to_translate}
         result: translated
     - return:
             return: ${translated}

JSON

  {
    "main": {
      "steps": [
        {
          "init": {
            "assign": [
              {
                "workflow_id": "translate"
              },
              {
                "texts_to_translate": [
                  {
                    "text": "hello world!",
                    "source": "en",
                    "target": "fr"
                  },
                  {
                    "text": "你好 世界!",
                    "source": "zh-CN",
                    "target": "en"
                  },
                  {
                    "text": "No hablo español!",
                    "source": "es",
                    "target": "en"
                  }
                ]
              }
            ]
          }
        },
        {
          "translate_texts": {
            "call": "experimental.executions.map",
            "args": {
              "workflow_id": "${workflow_id}",
              "arguments": "${texts_to_translate}"
            },
            "result": "translated"
          }
        },
        {
          "return": {
            "return": "${translated}"
          }
        }
      ]
    }
  }

出力は次のようになります。

[
  {
    "data": {
      "translations": [
        {
          "translatedText": "Bonjour le monde!"
        }
      ]
    }
  },
  {
    "data": {
      "translations": [
        {
          "translatedText": "Hello world!"
        }
      ]
    }
  },
  {
    "data": {
      "translations": [
        {
          "translatedText": "I don't speak Spanish!"
        }
      ]
    }
  }
]

experimental.executions.mapfor:in ループに置き換える

試験運用版の関数を使用する代わりに、並列の for:in ループを使用してテキストを翻訳できます。次の例では、セカンダリ ワークフロー translate をそのまま使用でき、出力は変更されません。並列ブランチで他のセカンダリ ワークフローの実行を開始することもできます。

共有変数 translated は結果を保存するために使用され、静的配列インデックスを有効にするために空の文字列が入力されます。順序付けが不要な場合は、list.concat を使用して結果を追加できます。並列ステップのすべての割り当てはアトミックです。

YAML

main:
  params: []
  steps:
    - init:
        assign:
          - workflow_id: "translate"
          - texts_to_translate:
              - text: "hello world!"
                source: "en"
                target: "fr"
              - text: "你好 世界!"
                source: "zh-CN"
                target: "en"
              - text: "No hablo español!"
                source: "es"
                target: "en"
          - translated: ["", "", ""]  # to write to this variable, you must share it
    - parallel_translate:
        parallel:
          shared: [translated]
          for:
            in: ${texts_to_translate}
            index: i  # optional, use if index is required
            value: arg
            steps:
              - translate:
                  call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                  args:
                    workflow_id: ${workflow_id}
                    argument: ${arg}
                  result: r
              - set_result:
                  assign:
                    - translated[i]: ${r}
    - return:
        return: ${translated}

JSON

{
  "main": {
    "params": [],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "workflow_id": "translate"
            },
            {
              "texts_to_translate": [
                {
                  "text": "hello world!",
                  "source": "en",
                  "target": "fr"
                },
                {
                  "text": "你好 世界!",
                  "source": "zh-CN",
                  "target": "en"
                },
                {
                  "text": "No hablo español!",
                  "source": "es",
                  "target": "en"
                }
              ]
            },
            {
              "translated": [
                "",
                "",
                ""
              ]
            }
          ]
        }
      },
      {
        "parallel_translate": {
          "parallel": {
            "shared": [
              "translated"
            ],
            "for": {
              "in": "${texts_to_translate}",
              "index": "i",
              "value": "arg",
              "steps": [
                {
                  "translate": {
                    "call": "googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run",
                    "args": {
                      "workflow_id": "${workflow_id}",
                      "argument": "${arg}"
                    },
                    "result": "r"
                  }
                },
                {
                  "set_result": {
                    "assign": [
                      {
                        "translated[i]": "${r}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "return": {
          "return": "${translated}"
        }
      }
    ]
  }
}

experimental.executions.mapfor:range ループに置き換える

試験運用版の関数を使用する代わりに、並列の for:range ループを使用してテキストを翻訳できます。for:range ループを使用することで、イテレーション範囲の開始と終了を指定できます。出力は変更されません。

YAML

main:
  params: []
  steps:
    - init:
        assign:
          - workflow_id: "translate"
          - texts_to_translate:
              - text: "hello world!"
                source: "en"
                target: "fr"
              - text: "你好 世界!"
                source: "zh-CN"
                target: "en"
              - text: "No hablo español!"
                source: "es"
                target: "en"
          - translated: ["", "", ""]  # to write to this variable, you must share it
    - parallel_translate:
        parallel:
          shared: [translated]
          for:
            range: ${[0, len(texts_to_translate) - 1]}
            value: i
            steps:
              - translate:
                  call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                  args:
                    workflow_id: ${workflow_id}
                    argument: ${texts_to_translate[i]}
                  result: r
              - set_result:
                  assign:
                    - translated[i]: ${r}
    - return:
        return: ${translated}

JSON

{
  "main": {
    "params": [],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "workflow_id": "translate"
            },
            {
              "texts_to_translate": [
                {
                  "text": "hello world!",
                  "source": "en",
                  "target": "fr"
                },
                {
                  "text": "你好 世界!",
                  "source": "zh-CN",
                  "target": "en"
                },
                {
                  "text": "No hablo español!",
                  "source": "es",
                  "target": "en"
                }
              ]
            },
            {
              "translated": [
                "",
                "",
                ""
              ]
            }
          ]
        }
      },
      {
        "parallel_translate": {
          "parallel": {
            "shared": [
              "translated"
            ],
            "for": {
              "range": "${[0, len(texts_to_translate) - 1]}",
              "value": "i",
              "steps": [
                {
                  "translate": {
                    "call": "googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run",
                    "args": {
                      "workflow_id": "${workflow_id}",
                      "argument": "${texts_to_translate[i]}"
                    },
                    "result": "r"
                  }
                },
                {
                  "set_result": {
                    "assign": [
                      {
                        "translated[i]": "${r}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "return": {
          "return": "${translated}"
        }
      }
    ]
  }
}

ワークフローのソースコードをインライン化する

セカンダリ ワークフローが比較的短い場合は、読みやすくするために、直接メイン ワークフローに含めることをおすすめします。たとえば、次のワークフローでは、translate ワークフローのソースコードがインライン化されています。

YAML

main:
  params: [args]
  steps:
    - init:
        assign:
          - workflow_id: "translate"
          - texts_to_translate:
              - text: "hello world!"
                source: "en"
                target: "fr"
              - text: "你好 世界!"
                source: "zh-CN"
                target: "en"
              - text: "No hablo español!"
                source: "es"
                target: "en"
          - translated: ["", "", ""]
    - parallel_translate:
        parallel:
          shared: [translated]  # to write to this variable, you must share it
          for:
            range: ${[0, len(texts_to_translate) - 1]}
            value: i
            steps:
              - basic_translate:
                  call: googleapis.translate.v2.translations.translate
                  args:
                    body:
                      q: ${args.text}
                      target: ${args.target}
                      format: "text"
                      source: ${args.source}
                  result: r
              - set_result:
                  assign:
                    - translated[i]: ${r}
    - return:
        return: ${translated}

JSON

{
  "main": {
    "params": [
      "args"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "workflow_id": "translate"
            },
            {
              "texts_to_translate": [
                {
                  "text": "hello world!",
                  "source": "en",
                  "target": "fr"
                },
                {
                  "text": "你好 世界!",
                  "source": "zh-CN",
                  "target": "en"
                },
                {
                  "text": "No hablo español!",
                  "source": "es",
                  "target": "en"
                }
              ]
            },
            {
              "translated": [
                "",
                "",
                ""
              ]
            }
          ]
        }
      },
      {
        "parallel_translate": {
          "parallel": {
            "shared": [
              "translated"
            ],
            "for": {
              "range": "${[0, len(texts_to_translate) - 1]}",
              "value": "i",
              "steps": [
                {
                  "basic_translate": {
                    "call": "googleapis.translate.v2.translations.translate",
                    "args": {
                      "body": {
                        "q": "${args.text}",
                        "target": "${args.target}",
                        "format": "text",
                        "source": "${args.source}"
                      }
                    },
                    "result": "r"
                  }
                },
                {
                  "set_result": {
                    "assign": [
                      {
                        "translated[i]": "${r}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "return": {
          "return": "${translated}"
        }
      }
    ]
  }
}

次のステップ