Executar etapas do fluxo de trabalho em paralelo

Etapas paralelas podem reduzir o tempo total de execução de um fluxo de trabalho em realizar várias chamadas de bloqueio ao mesmo tempo.

Chamadas de bloqueio como sleep, Chamadas HTTP callbacks podem levar algum tempo, milissegundos para dias. O objetivo das etapas paralelas é ajudar nesses casos operações de longa duração. Se um fluxo de trabalho precisar realizar várias chamadas de bloqueio que sejam independentes umas das outras, o uso de ramificações paralelas pode reduzir o ambiente de execução iniciando as chamadas ao mesmo tempo e aguardando todos os para serem concluídas.

Por exemplo, se seu fluxo de trabalho precisar recuperar dados de clientes de vários independentes antes de continuar, as ramificações paralelas permitem solicitações de API. Se houver cinco sistemas e cada um levar dois segundos para responder, a execução das etapas sequencialmente em um fluxo de trabalho pode levar pelo menos 10 segundos; para executá-las em paralelo pode levar apenas duas.

Criar uma etapa paralela

Crie uma etapa parallel para definir uma parte do seu fluxo de trabalho em que dois ou mais etapas podem ser executadas simultaneamente.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Substitua:

  • PARALLEL_STEP_NAME: o nome da etapa paralela.
  • POLICY (opcional): determina a ação ramificações ocorrem quando ocorre uma exceção não processada. A política padrão, continueAll, não resulta em mais nenhuma ação, e todas as outras ramificações que você tentou executar. No momento, continueAll é a única política compatível.
  • VARIABLE_A, VARIABLE_B e assim por diante on: uma lista de variáveis graváveis com escopo pai que permitem atribuições na etapa paralela. Para mais informações, consulte Variáveis compartilhadas.
  • CONCURRENCY_LIMIT (opcional): o número máximo de ramificações e iterações que podem ser executadas simultaneamente em um único fluxo de trabalho a execução antes que outras ramificações e iterações fiquem na fila. Isso aplica-se a uma única etapa parallel e não é aplicada em cascata. Precisa ser um número inteiro positivo e pode ser um valor literal ou uma expressão. Para detalhes, consulte Limites de simultaneidade.
  • BRANCHES_OR_FOR: use branches ou for para indique uma das opções a seguir:
    • Ramificações que podem ser executadas simultaneamente.
    • Um loop em que as iterações podem ser executadas simultaneamente.

Observe o seguinte:

  • As ramificações e iterações paralelas podem ser executadas em qualquer ordem uma ordem diferente a cada execução.
  • Elas podem incluir outras etapas paralelas aninhadas até o limite de profundidade. Consulte Cotas e limites.
  • Para obter mais detalhes, consulte a página de referência de sintaxe para etapas paralelas.
.

Substituir função experimental por etapa paralela

Se você estiver usando experimental.executions.map para dar suporte ao trabalho paralelo, será possível migrar seu fluxo de trabalho para usar etapas paralelas, executando tarefas comuns for faz uma repetição em paralelo. Para ver exemplos, consulte Substitua a função experimental por uma etapa paralela.

Amostras

Estas amostras demonstram a sintaxe.

Realizar operações em paralelo (usando ramificações)

Caso seu fluxo de trabalho tenha vários conjuntos de etapas diferentes que podem ser executados ao mesmo tempo, colocá-las em ramificações paralelas pode diminuir o tempo total necessárias para concluir essas etapas.

No exemplo a seguir, um ID de usuário é passado como um argumento para o fluxo de trabalho e os dados são recuperados em paralelo por dois serviços diferentes. Variáveis compartilhadas permitem gravar valores nas ramificações e ler após elas completar:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Processar itens em paralelo (usando um loop paralelo)

Se você precisar realizar a mesma ação para cada item em uma lista, poderá concluir a execução mais rapidamente usando um loop paralelo. Um loop paralelo permite várias iterações de loop a serem realizadas em paralelo. Observe que, ao contrário loops regulares, as iterações podem pode ser realizado em qualquer ordem.

No exemplo a seguir, um conjunto de notificações de usuário é processado em um loop for paralelo:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Agregar dados (usando um loop paralelo)

É possível processar um conjunto de itens enquanto coleta dados das operações realizada em cada item. Por exemplo, convém acompanhar os IDs dos grupos itens ou manter uma lista de itens com erros.

No exemplo a seguir, 10 consultas separadas para uma instância pública do BigQuery conjunto de dados retorna o número de palavras em um documento ou conjunto de documentos. Um variável compartilhada permite que a contagem das palavras acumule e seja lida após todas as iterações concluído. Após calcular o número de palavras em todos os documentos, o o fluxo de trabalho retorna o total.

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

A seguir