Executar etapas do fluxo de trabalho em paralelo

As etapas paralelas podem reduzir o tempo total de execução de um fluxo de trabalho realizando várias chamadas de bloqueio ao mesmo tempo.

Chamadas de bloqueio, como sleep, chamadas HTTP e chamadas de retorno, podem levar de milissegundos a dias. As etapas paralelas têm como objetivo ajudar com essas operações longas simultâneas. Se um fluxo de trabalho precisar executar várias chamadas de bloqueio que são independentes umas das outras, o uso de ramificações paralelas pode reduzir o tempo total de execução iniciando as chamadas ao mesmo tempo e esperando que todas sejam concluídas.

Por exemplo, se o fluxo de trabalho precisar recuperar dados do cliente de vários sistemas independentes antes de continuar, as ramificações paralelas permitem solicitações de API simultâneas. Se houver cinco sistemas e cada um levar dois segundos para responder, a execução das etapas em sequência em um fluxo de trabalho pode levar pelo menos 10 segundos. A execução em paralelo pode levar apenas dois segundos.

Criar uma etapa paralela

Crie uma etapa parallel para definir uma parte do fluxo de trabalho em que duas ou mais etapas podem ser executadas simultaneamente.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Substitua:

  • PARALLEL_STEP_NAME: o nome da etapa paralela.
  • POLICY (opcional): determina a ação que outras filiais vão realizar quando ocorrer uma exceção não tratada. A política padrão, continueAll, não resulta em nenhuma outra ação, e todas as outras ramificações vão tentar ser executadas. continueAll é a única política compatível no momento.
  • VARIABLE_A, VARIABLE_B e assim por diante: uma lista de variáveis graváveis com escopo pai que permitem atribuições na etapa paralela. Para mais informações, consulte Variáveis compartilhadas.
  • CONCURRENCY_LIMIT (opcional): o número máximo de ramificações e iterações que podem ser executadas simultaneamente em uma única execução de fluxo de trabalho antes que outras ramificações e iterações sejam enfileiradas para aguardar. Isso se aplica apenas a uma única etapa parallel e não é aplicada em cascata. Precisa ser um número inteiro positivo e pode ser um valor literal ou uma expressão. Para detalhes, consulte Limites de simultaneidade.
  • BRANCHES_OR_FOR: use branches ou for para indicar uma das seguintes opções:
    • Ramos que podem ser executados simultaneamente.
    • Um loop em que as iterações podem ser executadas simultaneamente.

Observe o seguinte:

  • As iterações e as ramificações paralelas podem ser executadas em qualquer ordem e podem ser executadas em uma ordem diferente a cada execução.
  • As etapas paralelas podem incluir outras etapas paralelas aninhadas até o limite de profundidade. Consulte Cotas e limites.
  • Para mais detalhes, consulte a página de referência de sintaxe de etapas paralelas.

Substituir a função experimental por uma etapa paralela

Se você estiver usando experimental.executions.map para oferecer suporte a trabalhos paralelos, poderá migrar seu fluxo de trabalho para usar etapas paralelas, executando loops for comuns em paralelo. Para conferir exemplos, consulte Substituir função experimental por etapa paralela.

Amostras

Estas amostras demonstram a sintaxe.

Realizar operações em paralelo (usando ramificações)

Se o fluxo de trabalho tiver vários e diferentes conjuntos de etapas que podem ser executados ao mesmo tempo, colocá-los em filiais paralelas pode diminuir o tempo total necessário para concluir essas etapas.

No exemplo abaixo, um ID do usuário é transmitido como um argumento para o fluxo de trabalho e os dados são recuperados em paralelo de dois serviços diferentes. As variáveis compartilhadas permitem que os valores sejam gravados nas ramificações e lidos após a conclusão delas:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Processar itens em paralelo (usando um loop paralelo)

Se você precisar realizar a mesma ação para cada item de uma lista, poderá concluir a execução mais rapidamente usando um loop paralelo. Um loop paralelo permite que várias iterações de loop sejam realizadas em paralelo. Ao contrário dos loops for comuns, as iterações podem ser realizadas em qualquer ordem.

No exemplo abaixo, um conjunto de notificações do usuário é processado em um loop for paralelo:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Agrupar dados (usando um loop paralelo)

É possível processar um conjunto de itens enquanto coleta dados das operações realizadas em cada item. Por exemplo, você pode acompanhar os IDs dos itens criados ou manter uma lista de itens com erros.

No exemplo a seguir, 10 consultas separadas para um conjunto de dados público do BigQuery retornam o número de palavras em um documento ou conjunto de documentos. Uma variável compartilhada permite que a contagem de palavras seja acumulada e lida após a conclusão de todas as iterações. Depois de calcular o número de palavras em todos os documentos, o fluxo de trabalho retorna o total.

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

A seguir