Executar etapas do fluxo de trabalho em paralelo

Etapas paralelas podem reduzir o tempo total de execução de um fluxo de trabalho por meio da realização de várias chamadas de bloqueio ao mesmo tempo.

O bloqueio de chamadas, como sleep, chamadas HTTP e callbacks, pode levar de milissegundos a dias. As etapas paralelas visam ajudar nessas operações simultâneas de longa duração. Se um fluxo de trabalho precisar realizar várias chamadas de bloqueio independentes umas das outras, o uso de ramificações paralelas poderá reduzir o tempo total de execução iniciando as chamadas ao mesmo tempo e aguardando a conclusão de todas elas.

Por exemplo, se o fluxo de trabalho precisa recuperar dados do cliente de vários sistemas independentes antes de continuar, as ramificações paralelas permitem solicitações de API simultâneas. Se houver cinco sistemas e cada um levar dois segundos para responder, a execução das etapas sequencialmente em um fluxo de trabalho poderá levar pelo menos 10 segundos. A execução em paralelo pode levar apenas dois.

Criar uma etapa paralela

Crie uma etapa parallel para definir uma parte do fluxo de trabalho em que duas ou mais etapas podem ser executadas simultaneamente.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Substitua:

  • PARALLEL_STEP_NAME: o nome da etapa paralela.
  • POLICY (opcional): determina a ação que outras ramificações vão realizar quando ocorrer uma exceção não processada. A política padrão, continueAll, não resulta em mais ações, e todas as outras ramificações tentarão ser executadas. Observe que continueAll é a única política compatível no momento.
  • VARIABLE_A, VARIABLE_B e assim por diante: uma lista de variáveis graváveis com escopo pai que permite atribuições na etapa paralela. Para mais informações, consulte Variáveis compartilhadas.
  • CONCURRENCY_LIMIT (opcional): o número máximo de ramificações e iterações que podem ser executadas simultaneamente em uma única execução de fluxo de trabalho antes que outras ramificações e iterações sejam enfileiradas para aguardar. Isso se aplica a uma única etapa de parallel e não é aplicada em cascata. Precisa ser um número inteiro positivo e pode ser um valor literal ou uma expressão. Para detalhes, consulte Limites de simultaneidade.
  • BRANCHES_OR_FOR: use branches ou for para indicar uma das seguintes opções:
    • Ramificações que podem ser executadas simultaneamente.
    • Um loop em que as iterações podem ser executadas simultaneamente.

Observações:

  • Ramificações e iterações paralelas podem ser executadas em qualquer ordem e em uma ordem diferente a cada execução.
  • Etapas paralelas podem incluir outras etapas paralelas aninhadas até o limite de profundidade. Consulte Cotas e limites.
  • Para mais detalhes, consulte a página de referência de sintaxe para etapas paralelas.

Substituir função experimental por uma etapa paralela

Se você estiver usando experimental.executions.map para oferecer suporte ao trabalho paralelo, poderá migrar seu fluxo de trabalho para usar etapas paralelas, executando loops comuns de for em paralelo. Por exemplo, consulte Substituir a função experimental por uma etapa paralela.

Amostras

Estas amostras demonstram a sintaxe.

Realizar operações em paralelo (usando ramificações)

Se o fluxo de trabalho tem vários conjuntos diferentes de etapas que podem ser executados ao mesmo tempo, colocá-los em ramificações paralelas pode diminuir o tempo total necessário para concluir essas etapas.

No exemplo a seguir, um ID do usuário é transmitido como um argumento para o fluxo de trabalho, e os dados são recuperados em paralelo de dois serviços diferentes. As variáveis compartilhadas permitem que os valores sejam gravados nas ramificações e lidos após a conclusão delas:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Processar itens em paralelo (usando um loop paralelo)

Se você precisar realizar a mesma ação para cada item de uma lista, conclua a execução mais rapidamente usando um loop paralelo. Com um loop paralelo, várias iterações de loop podem ser realizadas em paralelo. Ao contrário dos loops for regulares, as iterações podem ser realizadas em qualquer ordem.

No exemplo a seguir, um conjunto de notificações do usuário é processado em um loop for paralelo:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Agregar dados (usando um loop paralelo)

É possível processar um conjunto de itens enquanto coleta dados das operações realizadas em cada item. Por exemplo, é possível rastrear os IDs dos itens criados ou manter uma lista de itens com erros.

No exemplo a seguir, 10 consultas separadas em um conjunto de dados público do BigQuery retornam o número de palavras em um documento ou conjunto de documentos. Uma variável compartilhada permite que a contagem das palavras acumule e seja lida após a conclusão de todas as iterações. Depois de calcular o número de palavras em todos os documentos, o fluxo de trabalho retorna o total.

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # numWords is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

A seguir