Exécuter les étapes du workflow en parallèle

Les étapes parallèles peuvent réduire le temps total d'exécution d'un workflow de effectuer plusieurs appels bloquants ; en même temps.

bloquer des appels tels que la mise en veille ; Appels HTTP les rappels peuvent prendre du temps, de millisecondes à plusieurs jours. Les étapes parallèles ont pour but de faciliter les opérations de longue durée. Si un workflow doit effectuer plusieurs appels bloquants sont indépendantes les unes des autres. L'utilisation de branches parallèles peut donc réduire le total le temps d'exécution en lançant les appels en même temps et en attendant pour qu'ils le terminent.

Par exemple, si votre flux de travail doit récupérer les données client de plusieurs systèmes indépendants avant de continuer, les branches parallèles permettent des connexions Requêtes API. S'il y a cinq systèmes et que chacun prend deux secondes pour répondre, l'exécution séquentielle des étapes d'un workflow pouvait prendre au moins 10 secondes. les exécuter en parallèle peut prendre à peine deux.

Créer une étape parallèle

Créez une étape parallel pour définir une partie de votre workflow dans laquelle deux étapes ou plus étapes peuvent s'exécuter simultanément.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Remplacez les éléments suivants :

  • PARALLEL_STEP_NAME: nom de l'étape parallèle.
  • POLICY (facultatif): détermine l'action branches lorsqu'une exception non gérée se produit. La stratégie par défaut, continueAll, n'entraîne aucune autre action, et toutes les autres branches s'exécuter. Notez que continueAll est la seule règle actuellement acceptée.
  • VARIABLE_A, VARIABLE_B, etc. on: liste de variables accessibles en écriture avec un champ d'application parent qui autorisent les attributions lors de l'étape parallèle. Pour en savoir plus, consultez Variables partagées.
  • CONCURRENCY_LIMIT (facultatif): le nombre maximal de branches et itérations pouvant s'exécuter simultanément dans un même workflow avant que d'autres branches et itérations soient mises en file d'attente. Ce ne s'applique qu'à une seule étape parallel et n'est pas en cascade. Doit être un entier positif pouvant être une valeur littérale ou une expression. Pour détails, consultez Limites de simultanéité.
  • BRANCHES_OR_FOR: utilisez branches ou for pour indiquent l'un des éléments suivants: <ph type="x-smartling-placeholder">
      </ph>
    • Branches pouvant s'exécuter simultanément
    • Boucle dans laquelle des itérations peuvent s'exécuter simultanément.

Veuillez noter les points suivants :

  • Les branches et itérations parallèles peuvent s'exécuter dans n'importe quel ordre, un ordre différent à chaque exécution.
  • Les étapes parallèles peuvent inclure d'autres étapes parallèles imbriquées jusqu'à la limite de profondeur. Consultez la page Quotas et limites pour en savoir plus.
  • Pour en savoir plus, consultez la page de référence sur la syntaxe pour étapes parallèles.

Remplacer la fonction expérimentale par une étape parallèle

Si vous utilisez experimental.executions.map pour effectuer des tâches parallèles, vous pouvez migrer votre workflow de façon à utiliser des étapes parallèles, en exécutant for boucles en parallèle. Pour obtenir des exemples, consultez Remplacer la fonction expérimentale par une étape parallèle.

Exemples

Ces exemples illustrent la syntaxe.

Effectuer des opérations en parallèle (à l'aide de branches)

Si votre workflow comporte plusieurs ensembles d'étapes pouvant être exécutés en même temps, les placer dans des branches parallèles peut réduire le temps total nécessaires pour effectuer ces étapes.

Dans l'exemple suivant, un ID utilisateur est transmis en tant qu'argument au workflow et les données sont récupérées en parallèle à partir de deux services différents. Variables partagées permettre l'écriture de valeurs dans les branches et la lecture après les branches terminer:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Traiter les éléments en parallèle (à l'aide d'une boucle parallèle)

Si vous devez effectuer la même action pour chaque élément d'une liste, vous pouvez effectuer l'exécution plus rapidement à l'aide d'une boucle parallèle. Une boucle parallèle permet plusieurs itérations de boucle à effectuer en parallèle. Notez que, contrairement des boucles for classiques, les itérations peuvent être effectuées dans n'importe quel ordre.

Dans l'exemple suivant, un ensemble de notifications utilisateur est traité dans un boucle for parallèle:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Agréger des données (à l'aide d'une boucle parallèle)

Vous pouvez traiter un ensemble d'éléments tout en collectant des données à partir des opérations effectuées sur chaque élément. Par exemple, vous pouvez effectuer le suivi des identifiants des ou gérer une liste d'éléments comportant des erreurs.

Dans l'exemple suivant, 10 requêtes distinctes sont envoyées à une instance jeu de données renvoient chacune le nombre de mots dans un document, ou un ensemble de documents. A variable partagée permet d'accumuler le nombre de mots et d'être lu après toutes les itérations terminé. Après avoir calculé le nombre de mots dans tous les documents, le le workflow renvoie le total.

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

Étape suivante