Executar etapas do fluxo de trabalho em paralelo

As etapas paralelas podem reduzir o tempo total de execução de um fluxo de trabalho realizando várias chamadas de bloqueio ao mesmo tempo.

O bloqueio de chamadas, como sleep, chamadas HTTP e callbacks, pode levar de milissegundos a dias. As etapas paralelas servem para auxiliar nessas operações simultâneas de longa duração. Se um fluxo de trabalho precisar realizar várias chamadas de bloqueio independentes umas das outras, o uso de ramificações paralelas poderá reduzir o tempo total de execução iniciando as chamadas ao mesmo tempo e aguardando a conclusão de todas.

Por exemplo, se o fluxo de trabalho precisa recuperar dados de clientes de vários sistemas independentes antes de continuar, as ramificações paralelas permitem solicitações de API simultâneas. Se houver cinco sistemas e cada um leva dois segundos para responder, a execução das etapas sequencialmente em um fluxo de trabalho pode levar pelo menos 10 segundos. A execução delas em paralelo pode levar apenas dois.

Criar uma etapa paralela

Crie uma etapa parallel para definir uma parte do seu fluxo de trabalho em que duas ou mais etapas possam ser executadas simultaneamente.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Substitua:

  • PARALLEL_STEP_NAME: o nome da etapa paralela.
  • POLICY (opcional): determina a ação que outras ramificações vão realizar quando ocorrer uma exceção não processada. A política padrão, continueAll, não resulta em mais nenhuma ação, e todas as outras ramificações tentarão ser executadas. No momento, continueAll é a única política compatível.
  • VARIABLE_A, VARIABLE_B e assim por diante: uma lista de variáveis graváveis com escopo pai que permitem atribuições na etapa paralela. Para mais informações, consulte Variáveis compartilhadas.
  • CONCURRENCY_LIMIT (opcional): o número máximo de ramificações e iterações que podem ser executadas simultaneamente em uma única execução de fluxo de trabalho antes que outras ramificações e iterações sejam enfileiradas para aguardar. Isso se aplica a uma única etapa parallel e não é distribuído em cascata. Precisa ser um número inteiro positivo e pode ser um valor literal ou uma expressão. Para detalhes, consulte Limites de simultaneidade.
  • BRANCHES_OR_FOR: use branches ou for para indicar uma das seguintes opções:
    • Ramificações que podem ser executadas simultaneamente.
    • Um loop em que as iterações podem ser executadas simultaneamente.

Observações:

  • As ramificações e iterações paralelas podem ser executadas em qualquer ordem e em uma ordem diferente a cada execução.
  • Elas podem incluir outras etapas paralelas aninhadas até o limite de profundidade. Consulte Cotas e limites.
  • Para mais detalhes, consulte a página de referência de sintaxe para etapas paralelas.

Substituir função experimental por etapa paralela

Se você estiver usando experimental.executions.map para dar suporte ao trabalho paralelo, poderá migrar seu fluxo de trabalho para usar etapas paralelas, executando loops for comuns em paralelo. Para exemplos, consulte Substituir a função experimental por uma etapa paralela.

Amostras

Estas amostras demonstram a sintaxe.

Realizar operações em paralelo (usando ramificações)

Se o fluxo de trabalho tiver vários conjuntos de etapas diferentes que possam ser executados ao mesmo tempo, colocá-los em ramificações paralelas pode diminuir o tempo total necessário para concluir essas etapas.

No exemplo a seguir, um ID do usuário é transmitido como um argumento para o fluxo de trabalho, e os dados são recuperados em paralelo de dois serviços diferentes. As variáveis compartilhadas permitem a gravação de valores nas ramificações e a leitura após elas serem concluídas:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Processar itens em paralelo (usando um loop paralelo)

Se você precisar realizar a mesma ação para cada item de uma lista, poderá concluir a execução mais rapidamente usando um loop paralelo. Ela permite que várias iterações de loop sejam realizadas em paralelo. Observe que, ao contrário de loops for normais, as iterações podem ser realizadas em qualquer ordem.

No exemplo a seguir, um conjunto de notificações do usuário é processado em uma repetição for paralela:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Agregar dados (usando um loop paralelo)

É possível processar um conjunto de itens enquanto coleta dados das operações realizadas em cada item. Por exemplo, convém rastrear os IDs dos itens criados ou manter uma lista de itens com erros.

No exemplo a seguir, cada uma de 10 consultas separadas em um conjunto de dados público do BigQuery retorna o número de palavras em um documento ou conjunto de documentos. Uma variável compartilhada permite que a contagem das palavras se acumule e seja lida após a conclusão de todas as iterações. Depois de calcular o número de palavras em todos os documentos, o fluxo de trabalho retorna o total.

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

A seguir