Executar etapas do fluxo de trabalho em paralelo

Etapas paralelas podem reduzir o tempo total de execução de um fluxo de trabalho em realizar várias chamadas de bloqueio ao mesmo tempo.

Chamadas de bloqueio, como sleep, chamadas HTTP e chamadas de retorno, podem levar de milissegundos a dias. As etapas paralelas têm como objetivo ajudar com essas operações longas simultâneas. Se um fluxo de trabalho precisar executar várias chamadas de bloqueio que são independentes umas das outras, o uso de ramificações paralelas pode reduzir o tempo total de execução iniciando as chamadas ao mesmo tempo e esperando que todas sejam concluídas.

Por exemplo, se seu fluxo de trabalho precisar recuperar dados de clientes de vários independentes antes de continuar, as ramificações paralelas permitem solicitações de API. Se houver cinco sistemas e cada um levar dois segundos para responder, a execução das etapas sequencialmente em um fluxo de trabalho poderá levar pelo menos 10 segundos. A execução delas em paralelo poderá levar apenas dois segundos.

Criar uma etapa paralela

Crie uma etapa parallel para definir uma parte do fluxo de trabalho em que duas ou mais etapas podem ser executadas simultaneamente.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Substitua:

  • PARALLEL_STEP_NAME: o nome da etapa paralela.
  • POLICY (opcional): determina a ação ramificações ocorrem quando ocorre uma exceção não processada. A política padrão, continueAll, não resulta em mais nenhuma ação, e todas as outras ramificações que tenta executar. No momento, continueAll é a única política compatível.
  • VARIABLE_A, VARIABLE_B e assim por diante: uma lista de variáveis graváveis com escopo pai que permitem atribuições na etapa paralela. Para mais informações, consulte Variáveis compartilhadas.
  • CONCURRENCY_LIMIT (opcional): o número máximo de ramificações e iterações que podem ser executadas simultaneamente em uma única execução de fluxo de trabalho antes que outras ramificações e iterações sejam enfileiradas para aguardar. Isso aplica-se a uma única etapa parallel e não é aplicada em cascata. Precisa ser um número inteiro positivo e pode ser um valor literal ou uma expressão. Para detalhes, consulte Limites de simultaneidade.
  • BRANCHES_OR_FOR: use branches ou for para indicar uma das seguintes opções:
    • Ramos que podem ser executados simultaneamente.
    • Um loop em que as iterações podem ser executadas simultaneamente.

Observe o seguinte:

  • As iterações e as ramificações paralelas podem ser executadas em qualquer ordem e podem ser executadas em uma ordem diferente a cada execução.
  • As etapas paralelas podem incluir outras etapas paralelas aninhadas até o limite de profundidade. Consulte Cotas e limites.
  • Para mais detalhes, consulte a página de referência de sintaxe de etapas paralelas.

Substituir função experimental por etapa paralela

Se você estiver usando experimental.executions.map para oferecer suporte a trabalhos paralelos, poderá migrar seu fluxo de trabalho para usar etapas paralelas, executando loops for comuns em paralelo. Para conferir exemplos, consulte Substituir função experimental por etapa paralela.

Amostras

Estas amostras demonstram a sintaxe.

Realizar operações em paralelo (usando ramificações)

Se o fluxo de trabalho tiver vários e diferentes conjuntos de etapas que podem ser executados ao mesmo tempo, colocá-los em ramificações paralelas pode diminuir o tempo total necessário para concluir essas etapas.

No exemplo a seguir, um ID de usuário é passado como um argumento para o fluxo de trabalho e os dados são recuperados em paralelo por dois serviços diferentes. Variáveis compartilhadas permitem gravar valores nas ramificações e ler após elas completar:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Processar itens em paralelo (usando um loop paralelo)

Se você precisar realizar a mesma ação para cada item de uma lista, poderá concluir a execução mais rapidamente usando um loop paralelo. Um loop paralelo permite várias iterações de loop a serem realizadas em paralelo. Observe que, ao contrário loops regulares, as iterações podem pode ser realizado em qualquer ordem.

No exemplo a seguir, um conjunto de notificações de usuário é processado em um loop for paralelo:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Agregar dados (usando um loop paralelo)

É possível processar um conjunto de itens enquanto coleta dados das operações realizadas em cada item. Por exemplo, você pode acompanhar os IDs dos itens criados ou manter uma lista de itens com erros.

No exemplo a seguir, 10 consultas separadas para uma instância pública do BigQuery conjunto de dados retorna o número de palavras em um documento ou conjunto de documentos. Uma variável compartilhada permite que a contagem de palavras seja acumulada e lida após a conclusão de todas as iterações. Após calcular o número de palavras em todos os documentos, o o fluxo de trabalho retorna o total.

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

A seguir