Execute passos do fluxo de trabalho em paralelo

Os passos paralelos podem reduzir o tempo de execução total de um fluxo de trabalho através da execução de várias chamadas de bloqueio ao mesmo tempo.

O bloqueio de chamadas como sleep, chamadas HTTP e callbacks pode demorar desde milissegundos a dias. Os passos paralelos destinam-se a ajudar com essas operações de longa duração simultâneas. Se um fluxo de trabalho tiver de fazer várias chamadas de bloqueio independentes entre si, a utilização de ramificações paralelas pode reduzir o tempo de execução total iniciando as chamadas ao mesmo tempo e aguardando a conclusão de todas elas.

Por exemplo, se o seu fluxo de trabalho tiver de obter dados de clientes de vários sistemas independentes antes de continuar, as ramificações paralelas permitem pedidos de API simultâneos. Se existirem cinco sistemas e cada um demorar dois segundos a responder, a execução dos passos sequencialmente num fluxo de trabalho pode demorar, pelo menos, 10 segundos; a execução em paralelo pode demorar apenas dois segundos.

Crie um passo paralelo

Crie um passo parallel para definir uma parte do seu fluxo de trabalho onde dois ou mais passos podem ser executados em simultâneo.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Substitua o seguinte:

  • PARALLEL_STEP_NAME: o nome do passo paralelo.
  • POLICY (opcional): determina a ação que outros ramos vão tomar quando ocorrer uma exceção não processada. A política predefinida, continueAll, não resulta em nenhuma ação adicional, e todos os outros ramos tentam ser executados. Tenha em atenção que continueAll é a única política atualmente suportada.
  • VARIABLE_A, VARIABLE_B e assim sucessivamente: uma lista de variáveis graváveis com âmbito principal que permitem atribuições no passo paralelo. Para mais informações, consulte o artigo Variáveis partilhadas.
  • CONCURRENCY_LIMIT (opcional): o número máximo de ramificações e iterações que podem ser executadas em simultâneo numa única execução do fluxo de trabalho antes de serem colocadas em fila de espera mais ramificações e iterações. Isto aplica-se apenas a um único passo parallel e não se propaga. Tem de ser um número inteiro positivo e pode ser um valor literal ou uma expressão. Para ver detalhes, consulte Limites de concorrência.
  • BRANCHES_OR_FOR: use branches ou for para indicar uma das seguintes opções:
    • Ramificações que podem ser executadas em simultâneo.
    • Um ciclo em que as iterações podem ser executadas em simultâneo.

Tenha em conta o seguinte:

  • As ramificações paralelas e as iterações podem ser executadas em qualquer ordem e podem ser executadas numa ordem diferente com cada execução.
  • Os passos paralelos podem incluir outros passos paralelos aninhados até ao limite de profundidade. Consulte as quotas e os limites.
  • Para mais detalhes, consulte a página de referência da sintaxe para passos paralelos.

Substitua a função experimental pelo passo paralelo

Se estiver a usar experimental.executions.map para suportar o trabalho paralelo, pode migrar o seu fluxo de trabalho para usar passos paralelos. Em alternativa, pode executar ciclos for normais em paralelo. Para ver exemplos, consulte o artigo Substitua a função experimental por um passo paralelo.

Amostras

Estes exemplos demonstram a sintaxe.

Realizar operações em paralelo (usando ramificações)

Se o seu fluxo de trabalho tiver vários conjuntos de passos diferentes que podem ser executados ao mesmo tempo, colocá-los em ramificações paralelas pode diminuir o tempo total necessário para concluir esses passos.

No exemplo seguinte, um ID do utilizador é transmitido como um argumento para o fluxo de trabalho e os dados são obtidos em paralelo a partir de dois serviços diferentes. As variáveis partilhadas permitem que os valores sejam escritos nos ramos e lidos após a conclusão dos ramos:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Processar itens em paralelo (usando um ciclo paralelo)

Se precisar de realizar a mesma ação para cada item numa lista, pode concluir a execução mais rapidamente usando um ciclo paralelo. Um ciclo paralelo permite que sejam realizadas várias iterações do ciclo em paralelo. Tenha em atenção que, ao contrário dos ciclos for normais, as iterações podem ser realizadas em qualquer ordem.

No exemplo seguinte, um conjunto de notificações do utilizador é processado num ciclo for paralelo:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Agregue dados (através de um ciclo paralelo)

Pode processar um conjunto de itens enquanto recolhe dados das operações realizadas em cada item. Por exemplo, pode querer acompanhar os IDs dos itens criados ou manter uma lista de itens com erros.

No exemplo seguinte, 10 consultas separadas a um conjunto de dados público do BigQuery devolvem cada uma o número de palavras num documento ou num conjunto de documentos. Uma variável partilhada permite que a contagem das palavras se acumule e seja lida após a conclusão de todas as iterações. Depois de calcular o número de palavras em todos os documentos, o fluxo de trabalho devolve o total.

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

O que se segue?