Exécuter les étapes du workflow en parallèle

Des étapes parallèles peuvent réduire le temps d'exécution total d'un workflow en effectuant plusieurs appels bloquants en même temps.

Le blocage des appels tels que sleep, appels HTTP et callbacks peut prendre du temps, de quelques millisecondes à quelques jours. Les étapes parallèles sont destinées à faciliter ces opérations simultanées de longue durée. Si un workflow doit effectuer plusieurs appels bloquants indépendants les uns des autres, l'utilisation de branches parallèles peut réduire la durée d'exécution totale en démarrant les appels en même temps et en attendant qu'ils se terminent tous.

Par exemple, si votre workflow doit récupérer des données client à partir de plusieurs systèmes indépendants avant de continuer, les branches parallèles autorisent les requêtes API simultanées. S'il existe cinq systèmes et que chacun met deux secondes à répondre, l'exécution séquentielle des étapes d'un workflow peut prendre au moins 10 secondes. Leur exécution en parallèle peut prendre à peine deux secondes.

Créer une étape parallèle

Créez une étape parallel pour définir une partie de votre workflow dans laquelle deux étapes ou plus peuvent s'exécuter simultanément.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Remplacez les éléments suivants :

  • PARALLEL_STEP_NAME: nom de l'étape parallèle.
  • POLICY (facultatif): détermine l'action effectuée par les autres branches lorsqu'une exception non gérée se produit. La règle par défaut, continueAll, n'entraîne aucune autre action et toutes les autres branches tentent de s'exécuter. Notez que continueAll est la seule règle actuellement compatible.
  • VARIABLE_A, VARIABLE_B, etc. : liste de variables accessibles en écriture avec un champ d'application parent autorisant les attributions au cours de l'étape parallèle. Pour en savoir plus, consultez la section Variables partagées.
  • CONCURRENCY_LIMIT (facultatif): nombre maximal de branches et d'itérations pouvant s'exécuter simultanément dans une même exécution de workflow avant que d'autres branches et itérations ne soient mises en file d'attente. Cela ne s'applique qu'à une seule étape parallel et n'est pas répercutée en cascade. Doit être un entier positif et peut être une valeur littérale ou une expression. Pour en savoir plus, consultez la section Limites de simultanéité.
  • BRANCHES_OR_FOR: utilisez branches ou for pour indiquer l'un des éléments suivants :
    • Branches pouvant s'exécuter simultanément.
    • Boucle dans laquelle des itérations peuvent s'exécuter simultanément.

Veuillez noter les points suivants :

  • Les branches et itérations parallèles peuvent s'exécuter dans n'importe quel ordre, dans un ordre différent à chaque exécution.
  • Les étapes parallèles peuvent inclure d'autres étapes parallèles imbriquées, jusqu'à la limite de profondeur. Consultez la page Quotas et limites pour en savoir plus.
  • Pour en savoir plus, consultez la page de référence de la syntaxe pour connaître les étapes parallèles.

Remplacer la fonction expérimentale par une étape parallèle

Si vous utilisez experimental.executions.map pour gérer le travail en parallèle, vous pouvez migrer votre workflow afin d'utiliser des étapes parallèles à la place, en exécutant les boucles for ordinaires en parallèle. Pour obtenir des exemples, consultez la section Remplacer la fonction expérimentale par une étape parallèle.

Exemples

Ces exemples illustrent la syntaxe.

Effectuer des opérations en parallèle (à l'aide de branches)

Si votre workflow comporte plusieurs ensembles d'étapes différents pouvant être exécutés en même temps, le fait de les placer dans des branches parallèles peut réduire le temps total nécessaire pour effectuer ces étapes.

Dans l'exemple suivant, un ID utilisateur est transmis en tant qu'argument au workflow, et les données sont récupérées en parallèle à partir de deux services différents. Les variables partagées permettent d'écrire des valeurs dans les branches et de les lire une fois les branches terminées:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Traiter des éléments en parallèle (à l'aide d'une boucle parallèle)

Si vous devez effectuer la même action pour chaque élément d'une liste, vous pouvez terminer l'exécution plus rapidement en utilisant une boucle parallèle. Une boucle parallèle permet d'effectuer plusieurs itérations de boucle en parallèle. Notez que, contrairement aux boucles For standards, les itérations peuvent être effectuées dans n'importe quel ordre.

Dans l'exemple suivant, un ensemble de notifications utilisateur est traité dans une boucle for parallèle:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Agréger des données (à l'aide d'une boucle parallèle)

Vous pouvez traiter un ensemble d'éléments tout en collectant des données à partir des opérations effectuées sur chacun d'eux. Par exemple, vous pouvez suivre les ID des éléments créés ou conserver une liste des éléments comportant des erreurs.

Dans l'exemple suivant, 10 requêtes distinctes envoyées à un ensemble de données BigQuery public renvoient chacune le nombre de mots d'un document ou d'un ensemble de documents. Une variable partagée permet d'accumuler le nombre de mots et d'être lu une fois toutes les itérations terminées. Après avoir calculé le nombre de mots dans tous les documents, le workflow renvoie le total.

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # numWords is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

Étapes suivantes