Ejecuta los pasos del flujo de trabajo en paralelo

Los pasos paralelos pueden reducir el tiempo total de ejecución de un flujo de trabajo Realizar varias llamadas de bloqueo al mismo tiempo.

El bloqueo de llamadas, como sleep, Llamadas HTTP las devoluciones de llamada pueden llevar tiempo, desde milisegundos a días. Los pasos paralelos están destinados a ayudar con esas operaciones de larga duración. Si un flujo de trabajo debe realizar varias llamadas de bloqueo que son independientes entre sí, el uso de ramas paralelas puede reducir el total tiempo de ejecución iniciando las llamadas al mismo tiempo y esperando todos que las completen.

Por ejemplo, si tu flujo de trabajo debe recuperar datos de clientes de varias sistemas independientes antes de continuar, las ramas paralelas permiten que Solicitudes a la API. Si hay cinco sistemas y cada uno tarda dos segundos en responder, realizar los pasos de forma secuencial en un flujo de trabajo podría tardar al menos 10 segundos; realizarlas en paralelo podría tardar tan solo dos.

Crea un paso paralelo

Crea un paso parallel para definir una parte de tu flujo de trabajo en la que dos o más los pasos pueden ejecutarse simultáneamente.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Reemplaza lo siguiente:

  • PARALLEL_STEP_NAME: Es el nombre del paso paralelo.
  • POLICY (opcional): Determina la acción que otra que tomarán las ramas cuando ocurra una excepción no controlada. La política predeterminada, continueAll, no generará ninguna otra acción, y todas las demás ramas tendrán un intento de ejecución. Ten en cuenta que continueAll es la única política compatible por el momento.
  • VARIABLE_A, VARIABLE_B, etc. on: una lista de variables que admiten escritura con alcance superior que permiten asignaciones dentro del paso paralelo. Para obtener más información, consulta Variables compartidas.
  • CONCURRENCY_LIMIT (opcional): La cantidad máxima de iteraciones y ramas que pueden ejecutarse simultáneamente dentro de un solo flujo de trabajo ejecución antes de que más iteraciones y ramas queden en cola para esperar. Esta se aplica a un solo paso parallel y no se aplica en cascada. Debe ser un un número entero positivo y puede ser un valor literal o una expresión. Para en detalle, consulta Límites de simultaneidad.
  • BRANCHES_OR_FOR: Usa branches o for para indicar una de las siguientes opciones:
    • Ramas que pueden ejecutarse simultáneamente
    • Un bucle en el que las iteraciones pueden ejecutarse de forma simultánea.

Ten en cuenta lo siguiente:

  • Las iteraciones y ramas paralelas pueden ejecutarse en cualquier orden y pueden ejecutarse en un orden diferente con cada ejecución.
  • Los pasos paralelos pueden incluir otros pasos paralelos anidados hasta el límite de profundidad. Consulta Cuotas y límites.
  • Para obtener más detalles, consulta la página de referencia de sintaxis para pasos paralelos.

Reemplaza la función experimental por un paso paralelo

Si usas experimental.executions.map para admitir el trabajo paralelo, puedes migrar tu flujo de trabajo para usar pasos paralelos, ejecutando tareas de for se repiten en paralelo. Para ver ejemplos, consulta Reemplaza la función experimental por un paso paralelo.

Muestras

En estos ejemplos, se demuestra la sintaxis.

Ejecuta operaciones en paralelo (con ramas)

Si tu flujo de trabajo tiene varios conjuntos de pasos diferentes que se pueden ejecutar al mismo tiempo, colocarlas en ramas paralelas puede disminuir el tiempo total necesarios para completar esos pasos.

En el siguiente ejemplo, se pasa un ID de usuario como argumento al flujo de trabajo y los datos se recuperan en paralelo desde dos servicios diferentes. Variables compartidas permiten escribir valores en las ramas y leerlos después de estas completa:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Procesa elementos en paralelo (con un bucle paralelo)

Si necesitas realizar la misma acción para cada elemento de una lista, puedes completar la ejecución más rápido con un bucle paralelo. Un bucle paralelo permite varias iteraciones de bucle que se realizarán en paralelo. Ten en cuenta que, a diferencia de regulares bucles for, las iteraciones pueden realizar en cualquier orden.

En el siguiente ejemplo, se procesa un conjunto de notificaciones de usuario en un bucle for paralelo:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Datos agregados (con un bucle paralelo)

Puedes procesar un conjunto de elementos mientras recopilas datos de las operaciones en cada elemento. Por ejemplo, tal vez quieras hacer un seguimiento de los IDs artículos o mantener una lista de artículos con errores.

En el siguiente ejemplo, se muestran 10 consultas distintas a BigQuery pública conjunto de datos muestra la cantidad de palabras en un documento o conjunto de documentos. R variable compartida permite que el recuento de palabras se acumule y se lea después de todas las iteraciones que se completó. Después de calcular el número de palabras en todos los documentos, la el flujo de trabajo devuelve el total.

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

¿Qué sigue?