ワークフローのステップを並列実行する

並列ステップによって、複数のブロッキング呼び出しを同時に実行することで、ワークフローの合計実行時間を短縮できます。

スリープHTTP 呼び出しコールバックなどのブロッキング呼び出しには、ミリ秒から数日かかることがあります。並列ステップは、このような長時間実行オペレーションの同時実行を支援することを目的としています。ワークフローが互いに独立して複数のブロック呼び出しを実行する必要がある場合、並列ブランチを使用すると、呼び出しを同時に開始してすべての呼び出しが完了するまで待機することで合計実行時間を短縮できます。

たとえば、ワークフローを続行する前に複数の独立したシステムから顧客データを取得する必要がある場合、並列ブランチを使用すると、API リクエストを同時に実行できます。5 つのシステムがあり、それぞれが応答するのに 2 秒かかる場合、ワークフローでステップを順次実行すると少なくとも 10 秒かかり、それらを並列に実行するとわずか 2 秒ですむことがあります。

並列ステップを作成する

parallel ステップを作成して、2 つ以上のステップを同時に実行できるワークフローの一部を定義します。

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

以下を置き換えます。

  • PARALLEL_STEP_NAME: 並列ステップの名前。
  • POLICY(省略可): 未処理の例外が発生したときに他のブランチが実行するアクションを決定します。デフォルトのポリシー continueAll では、それ以上のアクションは行われず、他のすべてのブランチが実行を試みます。現時点でサポートされているポリシーは continueAll のみです。
  • VARIABLE_AVARIABLE_B など: 並列ステップ内での割り当てを許可する親スコープを持つ書き込み可能な変数のリスト。詳細については、共有変数をご覧ください。
  • CONCURRENCY_LIMIT(省略可): 以降のブランチとイテレーションのクエリを待機する前に、1 つのワークフローで同時に実行できる、ブランチとイテレーションの最大数。これは単一の parallel ステップにのみ適用され、カスケードされません。正の整数である必要があり、リテラル値または式のいずれかを使用できます。詳細については、同時実行の制限をご覧ください。
  • BRANCHES_OR_FOR: branches または for を使用して、次のいずれかを指定します。
    • 同時に実行できるブランチ。
    • イテレーションが同時に実行されるループ。

次の点にご注意ください。

  • 並列ブランチと反復処理は任意の順序で実行できます。また、実行ごとに異なる順序で実行される場合があります。
  • 並列ステップには、深さの上限まで他のネストされた並列ステップを含めることができます。 割り当てと上限をご覧ください。
  • 詳細については、並列ステップの構文リファレンス ページをご覧ください。

試験運用版の関数を並列ステップに置き換える

experimental.executions.map を使用して並列処理をサポートしている場合は、ワークフローを移行して並列ステップを使用し通常の for ループを並列実行するように設定できます。例については、試験運用版の関数を並列ステップに置き換えるをご覧ください。

サンプル

ここでは、構文の例を示します。

並列でオペレーションを行う(ブランチを使用)

ご自身のワークフローに、同時実行できる複数かつ異なるステップがある場合、並列ブランチに配置することで、それらのステップの完了に必要な合計時間を短縮できます。

次の例では、ユーザー ID が引数としてワークフローに渡され、データが 2 つの異なるサービスから並行して取得されます。 共有変数を使用すると、ブランチに値を書き込み、ブランチの完了後に値を読み取れるようになります。

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

項目を並列処理する(並列ループを使用)

リスト内の各アイテムに対して同じアクションを行う必要がある場合、並列ループを使用すると、より高速に実行を完了できます。並列ループでは、複数のループのイテレーションを並行して実行できます。通常の for loop とは異なり、イテレーションは任意の順序で実行できます。

次の例では、一連のユーザー通知が並列 for ループで処理されます。

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

データを集計する(並列ループを使用)

それぞれのアイテムに対して実行されるオペレーションからデータを収集しながら、アイテムのセットを処理できます。たとえば、作成されたアイテムの ID の追跡や、エラーのあるアイテムのリスト管理を行えます。

次の例では、一般公開 BigQuery データセットに対する 10 個のクエリがそれぞれ、1 つのドキュメント内またはドキュメント セット内の単語の数を返します。共有変数を使用すると、単語のカウント数を蓄積し、すべてのイテレーションが完了した後に読み取ることができます。すべてのドキュメントにわたる単語数を計算した後、ワークフローによって合計が返されます。

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

次のステップ