Ejecuta pasos de flujo de trabajo en paralelo

Los pasos en paralelo pueden reducir el tiempo de ejecución total de un flujo de trabajo realizando varias llamadas de bloqueo al mismo tiempo.

Las llamadas de bloqueo, como sleep, llamadas HTTP y callbacks, pueden tardar desde milisegundos hasta días. Los pasos en paralelo están diseñados para ayudar con esas operaciones simultáneas de larga duración. Si un flujo de trabajo debe realizar varias llamadas de bloqueo que son independientes entre sí, el uso de ramas paralelas puede reducir el tiempo de ejecución total, ya que inicia las llamadas al mismo tiempo y espera a que se completen todas.

Por ejemplo, si tu flujo de trabajo debe recuperar datos del cliente de varios sistemas independientes antes de continuar, las ramas paralelas permiten solicitudes a la API simultáneas. Si hay cinco sistemas y cada uno tarda dos segundos en responder, realizar los pasos de forma secuencial en un flujo de trabajo podría tardar al menos 10 segundos, mientras que hacerlos en paralelo podría tardar tan solo dos.

Crea un paso en paralelo

Crea un paso parallel para definir una parte de tu flujo de trabajo en la que dos o más pasos se puedan ejecutar de forma simultánea.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Reemplaza lo siguiente:

  • PARALLEL_STEP_NAME: Es el nombre del paso en paralelo.
  • POLICY (opcional): Determina la acción que realizarán otras ramas cuando se produzca una excepción no controlada. La política predeterminada, continueAll, no genera ninguna otra acción, y se intentará ejecutar todas las demás ramas. Ten en cuenta que continueAll es la única política compatible actualmente.
  • VARIABLE_A, VARIABLE_B, etcétera: Es una lista de variables escribibles con alcance superior que permiten asignaciones dentro del paso en paralelo. Para obtener más información, consulta Variables compartidas.
  • CONCURRENCY_LIMIT (opcional): Es la cantidad máxima de ramas e iteraciones que se pueden ejecutar de forma simultánea dentro de una sola ejecución de flujo de trabajo antes de que se pongan en cola más ramas e iteraciones para esperar. Esto se aplica solo a un solo paso parallel y no se aplica en cascada. Debe ser un número entero positivo y puede ser un valor literal o una expresión. Para obtener más información, consulta Límites de simultaneidad.
  • BRANCHES_OR_FOR: Usa branches o for para indicar una de las siguientes opciones:
    • Ramas que se pueden ejecutar de forma simultánea.
    • Un bucle en el que las iteraciones pueden ejecutarse de forma simultánea.

Ten en cuenta lo siguiente:

  • Las ramas y las iteraciones en paralelo pueden ejecutarse en cualquier orden y pueden ejecutarse en un orden diferente con cada ejecución.
  • Los pasos en paralelo pueden incluir otros pasos en paralelo anidados hasta el límite de profundidad. Consulta Cuotas y límites.
  • Para obtener más detalles, consulta la página de referencia de sintaxis de los pasos en paralelo.

Reemplaza la función experimental por un paso en paralelo

Si usas experimental.executions.map para admitir el trabajo en paralelo, puedes migrar tu flujo de trabajo para usar pasos en paralelo y ejecutar bucles for ordinarios en paralelo. Para ver ejemplos, consulta Cómo reemplazar la función experimental por un paso en paralelo.

Muestras

En estos ejemplos, se muestra la sintaxis.

Cómo realizar operaciones en paralelo (con ramas)

Si tu flujo de trabajo tiene varios conjuntos de pasos diferentes que se pueden ejecutar al mismo tiempo, colocarlos en ramas paralelas puede disminuir el tiempo total necesario para completar esos pasos.

En el siguiente ejemplo, se pasa un ID de usuario como argumento al flujo de trabajo y se recuperan datos en paralelo desde dos servicios diferentes. Las variables compartidas permiten escribir valores en las ramas y leerlos después de que se completen:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Procesa elementos en paralelo (con un bucle paralelo)

Si necesitas realizar la misma acción para cada elemento de una lista, puedes completar la ejecución más rápido con un bucle en paralelo. Un bucle paralelo permite que se realicen varias iteraciones de bucle en paralelo. Ten en cuenta que, a diferencia de los bucle for normales, las iteraciones se pueden realizar en cualquier orden.

En el siguiente ejemplo, se procesa un conjunto de notificaciones del usuario en un bucle for en paralelo:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Agrega datos (con un bucle en paralelo)

Puedes procesar un conjunto de elementos mientras recopilas datos de las operaciones que se realizan en cada uno de ellos. Por ejemplo, es posible que desees hacer un seguimiento de los IDs de los elementos creados o mantener una lista de elementos con errores.

En el siguiente ejemplo, 10 consultas independientes a un conjunto de datos público de BigQuery muestran la cantidad de palabras en un documento o conjunto de documentos. Una variable compartida permite que el recuento de las palabras se acumule y se lea después de que se completen todas las iteraciones. Después de calcular la cantidad de palabras en todos los documentos, el flujo de trabajo muestra el total.

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

¿Qué sigue?