Ejecutar pasos de flujo de trabajo en paralelo

Los pasos paralelos pueden reducir el tiempo total de ejecución de un flujo de trabajo realizando varias llamadas de bloqueo al mismo tiempo.

Las llamadas de bloqueo, como sleep, las llamadas HTTP y las retrollamadas, pueden tardar desde milisegundos hasta días. Los pasos paralelos están diseñados para ayudar con este tipo de operaciones simultáneas de larga duración. Si un flujo de trabajo debe realizar varias llamadas de bloqueo que sean independientes entre sí, el uso de ramas paralelas puede reducir el tiempo total de ejecución iniciando las llamadas al mismo tiempo y esperando a que se completen todas.

Por ejemplo, si tu flujo de trabajo debe obtener datos de clientes de varios sistemas independientes antes de continuar, las ramas paralelas permiten hacer solicitudes de API simultáneas. Si hay cinco sistemas y cada uno tarda dos segundos en responder, realizar los pasos de forma secuencial en un flujo de trabajo podría llevar al menos 10 segundos, mientras que si se realizan en paralelo, podrían tardar solo dos.

Crear un paso paralelo

Crea un paso parallel para definir una parte de tu flujo de trabajo en la que se puedan ejecutar dos o más pasos simultáneamente.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Haz los cambios siguientes:

  • PARALLEL_STEP_NAME: el nombre del paso paralelo.
  • POLICY (opcional): determina la acción que llevarán a cabo otras ramas cuando se produzca una excepción no controlada. La política predeterminada, continueAll, no conlleva ninguna otra acción y se intentarán ejecutar todas las demás ramas. Ten en cuenta que continueAll es la única política que se admite actualmente.
  • VARIABLE_A, VARIABLE_B, etc.: una lista de variables de escritura con ámbito principal que permiten asignaciones en el paso paralelo. Para obtener más información, consulta Variables compartidas.
  • CONCURRENCY_LIMIT (opcional): número máximo de ramas e iteraciones que se pueden ejecutar simultáneamente en una sola ejecución de flujo de trabajo antes de que se pongan en cola otras ramas e iteraciones. Esto solo se aplica a un único paso parallel y no se propaga. Debe ser un número entero positivo y puede ser un valor literal o una expresión. Para obtener más información, consulta Límites de simultaneidad.
  • BRANCHES_OR_FOR: usa branches o for para indicar una de las siguientes opciones:
    • Ramas que se pueden ejecutar simultáneamente.
    • Un bucle en el que las iteraciones se pueden ejecutar simultáneamente.

Ten en cuenta lo siguiente:

  • Las ramas e iteraciones paralelas se pueden ejecutar en cualquier orden y pueden hacerlo en un orden diferente en cada ejecución.
  • Los pasos paralelos pueden incluir otros pasos paralelos anidados hasta el límite de profundidad. Consulta Cuotas y límites.
  • Para obtener más información, consulta la página de referencia de la sintaxis de los pasos paralelos.

Sustituir una función experimental por un paso paralelo

Si usas experimental.executions.map para admitir el trabajo en paralelo, puedes migrar tu flujo de trabajo para que use pasos paralelos y ejecute bucles for normales en paralelo. Para ver ejemplos, consulta Sustituir una función experimental por un paso paralelo.

Ejemplos

En estos ejemplos se muestra la sintaxis.

Realizar operaciones en paralelo (con ramificaciones)

Si tu flujo de trabajo tiene varios conjuntos de pasos diferentes que se pueden ejecutar al mismo tiempo, colocarlos en ramas paralelas puede reducir el tiempo total necesario para completarlos.

En el siguiente ejemplo, se transfiere un ID de usuario como argumento al flujo de trabajo y se recuperan datos en paralelo de dos servicios diferentes. Las variables compartidas permiten escribir valores en las ramas y leerlos después de que se completen las ramas:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Procesar elementos en paralelo (con un bucle paralelo)

Si necesitas realizar la misma acción para cada elemento de una lista, puedes completar la ejecución más rápidamente usando un bucle paralelo. Un bucle paralelo permite que se realicen varias iteraciones de bucle en paralelo. Ten en cuenta que, a diferencia de los bucles for normales, las iteraciones se pueden realizar en cualquier orden.

En el siguiente ejemplo, se procesa un conjunto de notificaciones de usuario en un bucle for paralelo:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Agrega datos (con un bucle paralelo)

Puedes procesar un conjunto de elementos mientras recoges datos de las operaciones realizadas en cada elemento. Por ejemplo, puede que quieras registrar los IDs de los elementos creados o mantener una lista de elementos con errores.

En el siguiente ejemplo, 10 consultas independientes a un conjunto de datos público de BigQuery devuelven el número de palabras de un documento o de un conjunto de documentos. Una variable compartida permite que el recuento de palabras se acumule y se lea una vez que se hayan completado todas las iteraciones. Después de calcular el número de palabras de todos los documentos, el flujo de trabajo devuelve el total.

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

Siguientes pasos