워크플로 단계 병렬 실행

병렬 단계에서 여러 차단 호출을 동시에 수행하면 워크플로의 총 실행 시간을 줄일 수 있습니다.

sleep, HTTP 호출, 콜백 등의 차단 호출에는 밀리초 단위에서 일 단위까지 시간이 걸릴 수 있습니다. 병렬 단계의 용도는 이러한 동시 장기 실행 작업을 지원하는 것입니다. 워크플로에서 서로 독립된 여러 차단 호출을 수행해야 하는 경우 병렬 브랜치를 사용하면 호출을 동시에 시작하고 모든 호출이 완료되기를 기다리므로 총 실행 시간을 줄일 수 있습니다.

예를 들어 워크플로가 진행되기 전에 여러 독립 시스템에서 고객 데이터를 검색해야 하는 경우 병렬 브랜치로 API 요청을 동시에 수행할 수 있습니다. 시스템이 5개 있고 각 시스템의 응답 시간이 2초라면 워크플로에서 단계를 순차적으로 수행하는 데 최소 10초가 걸리지만, 단계를 병렬로 수행하면 2초만 걸릴 수 있습니다.

병렬 단계 만들기

parallel 단계를 만들어 두 개 이상의 단계가 동시에 실행될 수 있는 워크플로 부분을 정의합니다.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

다음을 바꿉니다.

  • PARALLEL_STEP_NAME: 병렬 단계의 이름입니다.
  • POLICY(선택사항): 미처리 예외가 발생할 때 다른 브랜치에서 수행할 작업을 결정합니다. 기본 정책인 continueAll은 추가 조치를 취하지 않으며 다른 모든 브랜치에서 실행을 시도합니다. 현재는 continueAll 정책만 지원됩니다.
  • VARIABLE_A, VARIABLE_B 등: 병렬 단계 내에서 할당을 허용하는 상위 범위가 포함된 쓰기 가능한 변수의 목록입니다. 자세한 내용은 공유 변수를 참조하세요.
  • CONCURRENCY_LIMIT(선택사항): 추가 브랜치 및 반복이 큐에 추가될 때까지 단일 워크플로 실행 내에서 동시에 실행될 수 있는 최대 브랜치 및 반복 횟수입니다. 이는 단일 parallel 단계에만 적용되며 하위 단계로는 적용되지 않습니다. 양의 정수여야 하며 리터럴 값 또는 표현식일 수 있습니다. 자세한 내용은 동시 실행 제한을 참조하세요.
  • BRANCHES_OR_FOR: branches 또는 for를 사용하여 다음 중 하나를 나타냅니다.
    • 동시에 실행될 수 있는 브랜치
    • 반복이 동시에 실행될 수 있는 루프

다음에 유의하세요.

  • 병렬 브랜치와 반복은 순서에 관계없이 실행될 수 있으며 실행할 때마다 실행 순서가 달라질 수 있습니다.
  • 병렬 단계에는 깊이 한도까지 다른 중첩 병렬 단계가 포함될 수 있습니다. 할당량 및 한도를 참조하세요.
  • 자세한 내용은 병렬 단계의 문법 참조 페이지를 참조하세요.

실험용 함수를 병렬 단계로 교체

experimental.executions.map을 사용하여 병렬 작업을 지원하는 경우 병렬 단계를 대신 사용하도록 워크플로를 마이그레이션하여 일반적인 for 루프를 병렬로 실행할 수 있습니다. 실험용 함수를 병렬 단계로 교체의 예시를 참조하세요.

샘플

이러한 샘플은 문법을 보여줍니다.

병렬로 작업 수행(브랜치 사용)

워크플로에 동시에 실행 가능한 서로 다른 여러 단계 집합이 있는 경우, 이러한 집합을 병렬 브랜치에 배치하여 단계를 완료하는 데 필요한 총 시간을 줄일 수 있습니다.

다음 예시에서는 사용자 ID를 워크플로에 인수로 전달하고 서로 다른 두 서비스에서 데이터를 병렬로 검색합니다. 공유 변수를 사용하면 브랜치에서 값을 기록하고 브랜치가 완료된 후에 읽을 수 있습니다.

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

병렬로 항목 처리(병렬 루프 사용)

목록의 각 항목에 동일한 작업을 수행해야 하는 경우 병렬 루프를 사용하여 실행을 더 빠르게 완료할 수 있습니다. 병렬 루프를 사용하면 여러 루프 반복을 동시에 수행할 수 있습니다. 일반적인 for 루프와 달리 반복은 순서에 관계없이 수행할 수 있습니다.

다음 예시에서는 병렬 for 루프에서 사용자 알림 집합을 처리합니다.

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

합산 데이터(병렬 루프 사용)

항목 집합을 처리하면서 각 항목에 수행된 작업의 데이터를 수집할 수 있습니다. 예를 들어 생성된 항목의 ID를 추적하거나 오류가 있는 항목 목록을 유지할 수 있습니다.

다음 예시에서는 공개 BigQuery 데이터 세트에 대한 개별 쿼리 10개가 문서 또는 문서 집합의 단어 수를 각각 반환합니다. 공유 변수를 사용하면 단어 수를 누산하고 모든 반복이 완료된 후에 읽을 수 있습니다. 워크플로는 모든 문서의 단어 수를 계산한 후 합계를 반환합니다.

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

다음 단계