Esegui i passaggi del flusso di lavoro in parallelo

I passaggi paralleli possono ridurre il tempo di esecuzione totale di un flusso di lavoro eseguendo più chiamate di blocco contemporaneamente.

Il blocco di chiamate come sleep, chiamate HTTP e callback può richiedere tempo, da millisecondi a giorni. I passaggi paralleli hanno lo scopo di assistere in operazioni simultanee di lunga durata. Se un flusso di lavoro deve eseguire più chiamate di blocco indipendenti l'una dall'altra, l'utilizzo di rami paralleli può ridurre il tempo di esecuzione totale avviando le chiamate contemporaneamente e attendendo il completamento di tutte.

Ad esempio, se il tuo flusso di lavoro deve recuperare i dati dei clienti da diversi sistemi indipendenti prima di continuare, i rami paralleli consentono richieste API simultanee. Se ci sono cinque sistemi e ognuno impiega due secondi per rispondere, l'esecuzione sequenziale dei passaggi in un flusso di lavoro potrebbe richiedere almeno 10 secondi, mentre l'esecuzione in parallelo potrebbe richiederne solo due.

Creare un passaggio parallelo

Crea un passaggio parallel per definire una parte del flusso di lavoro in cui due o più passaggi possono essere eseguiti contemporaneamente.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Sostituisci quanto segue:

  • PARALLEL_STEP_NAME: il nome del passaggio parallelo.
  • POLICY (facoltativo): determina l'azione che gli altri rami intraprenderanno quando si verifica un'eccezione non gestita. Il criterio predefinito, continueAll, non comporta ulteriori azioni e tutti gli altri rami tentano di essere eseguiti. Tieni presente che continueAll è l'unica policy attualmente supportata.
  • VARIABLE_A, VARIABLE_B e così via: un elenco di variabili scrivibili con ambito principale che consentono assegnazioni all'interno del passaggio parallelo. Per ulteriori informazioni, vedi Variabili condivise.
  • CONCURRENCY_LIMIT (facoltativo): il numero massimo di rami e iterazioni che possono essere eseguiti contemporaneamente all'interno di una singola esecuzione del flusso di lavoro prima che altri rami e iterazioni vengano messi in coda in attesa. Questo si applica solo a un singolo passaggio parallel e non viene applicato a cascata. Deve essere un numero intero positivo e può essere un valore letterale o un'espressione. Per maggiori dettagli, consulta Limiti di concorrenza.
  • BRANCHES_OR_FOR: utilizza branches o for per indicare una delle seguenti opzioni:
    • Branch che possono essere eseguiti contemporaneamente.
    • Un ciclo in cui le iterazioni possono essere eseguite contemporaneamente.

Tieni presente quanto segue:

  • I rami e le iterazioni paralleli possono essere eseguiti in qualsiasi ordine e potrebbero essere eseguiti in un ordine diverso a ogni esecuzione.
  • I passaggi paralleli possono includere altri passaggi paralleli nidificati fino al limite di profondità. Consulta la sezione Quote e limiti.
  • Per maggiori dettagli, consulta la pagina di riferimento della sintassi per i passaggi paralleli.

Sostituisci la funzione sperimentale con un passaggio parallelo

Se utilizzi experimental.executions.map per supportare il lavoro parallelo, puoi migrare il flusso di lavoro per utilizzare passaggi paralleli, eseguendo cicli for ordinari in parallelo. Per esempi, vedi Sostituisci la funzione sperimentale con un passaggio parallelo.

Esempi

Questi esempi mostrano la sintassi.

Eseguire operazioni in parallelo (utilizzando i rami)

Se il tuo flusso di lavoro ha più set di passaggi diversi che possono essere eseguiti contemporaneamente, inserirli in rami paralleli può ridurre il tempo totale necessario per completare questi passaggi.

Nell'esempio seguente, un ID utente viene passato come argomento al flusso di lavoro e i dati vengono recuperati in parallelo da due servizi diversi. Le variabili condivise consentono di scrivere valori nei rami e di leggerli dopo il completamento dei rami:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Elaborare gli elementi in parallelo (utilizzando un loop parallelo)

Se devi eseguire la stessa azione per ogni elemento di un elenco, puoi completare l'esecuzione più rapidamente utilizzando un ciclo parallelo. Un ciclo parallelo consente di eseguire più iterazioni del ciclo in parallelo. Tieni presente che, a differenza dei normali cicli for, le iterazioni possono essere eseguite in qualsiasi ordine.

Nell'esempio seguente, un insieme di notifiche utente viene elaborato in un ciclo for parallelo:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Aggrega i dati (utilizzando un loop parallelo)

Puoi elaborare un insieme di elementi raccogliendo i dati delle operazioni eseguite su ciascun elemento. Ad esempio, potresti voler monitorare gli ID degli elementi creati o mantenere un elenco di elementi con errori.

Nell'esempio seguente, 10 query separate a un set di dati pubblico BigQuery restituiscono ciascuna il numero di parole in un documento o in un insieme di documenti. Una variabile condivisa consente di accumulare il conteggio delle parole e di leggerlo al termine di tutte le iterazioni. Dopo aver calcolato il numero di parole in tutti i documenti, il flusso di lavoro restituisce il totale.

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

Passaggi successivi