Workflowschritte parallel ausführen

Parallele Schritte können die Gesamtausführungszeit für einen Workflow reduzieren, indem Mehrere blockierende Anrufe gleichzeitig.

Anrufe wie Ruhemodus, HTTP-Aufrufe und Callbacks können Zeit in Anspruch nehmen, von Millisekunden bis Tage. Parallele Schritte sollen bei solchen gleichzeitigen lang andauernden Vorgängen. Wenn ein Workflow mehrere blockierende Aufrufe ausführen muss, unabhängig voneinander sind, kann die Verwendung paralleler Zweige die Gesamtzahl indem Sie die Aufrufe zur gleichen Zeit starten und auf alle zu erledigen.

Wenn Ihr Workflow beispielsweise Kundendaten von mehreren unabhängige Systeme, bevor der Vorgang fortgesetzt wird, ermöglichen parallele Zweige gleichzeitige API-Anfragen. Wenn es fünf Systeme gibt und jedes zwei Sekunden benötigt, um zu antworten, die sequenzielle Ausführung der Schritte in einem Workflow könnte mindestens 10 Sekunden dauern; können in nur zwei Schritten gleichzeitig durchgeführt werden.

Parallelen Schritt erstellen

Erstellen Sie einen parallel-Schritt, um einen Teil Ihres Workflows zu definieren, in dem mindestens Schritte gleichzeitig ausgeführt werden können.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Ersetzen Sie Folgendes:

  • PARALLEL_STEP_NAME: der Name des parallelen Schritts.
  • POLICY (optional): Legt die Aktion für „Sonstiges“ fest. Zweige dauern, wenn eine unbehandelte Ausnahme auftritt. Die Standardrichtlinie, continueAll führt zu keiner weiteren Aktion. Alle anderen Zweige werden ausgeführt werden soll. Hinweis: continueAll ist derzeit die einzige Richtlinie, die unterstützt wird.
  • VARIABLE_A, VARIABLE_B usw. on: eine Liste der beschreibbaren Variablen mit übergeordnetem Bereich, die Zuweisungen ermöglichen in den parallelen Schritt ein. Weitere Informationen finden Sie unter Gemeinsam genutzte Variablen:
  • CONCURRENCY_LIMIT (optional): Die maximale Anzahl von Zweige und Iterationen, die gleichzeitig in einem einzelnen Workflow ausgeführt werden können Ausführung, bevor weitere Zweige und Iterationen in die Warteschlange gestellt werden. Dieses gilt nur für einen einzelnen parallel-Schritt und erfolgt nicht kaskadierend. Muss ein positive Ganzzahl und kann entweder ein Literalwert oder ein Ausdruck sein. Für finden Sie unter Beschränkungen für die Gleichzeitigkeit
  • BRANCHES_OR_FOR: Verwenden Sie entweder branches oder for, um Wählen Sie eine der folgenden Optionen aus:
    • Zweige, die gleichzeitig ausgeführt werden können.
    • Eine Schleife, in der Iterationen gleichzeitig ausgeführt werden können.

Wichtige Hinweise:

  • Parallele Zweige und Iterationen können in beliebiger Reihenfolge ausgeführt werden. bei jeder Ausführung in unterschiedlicher Reihenfolge anordnen.
  • Parallele Schritte können andere verschachtelte parallele Schritte bis zum Tiefenlimit umfassen. Siehe Kontingente und Limits.
  • Weitere Informationen finden Sie auf der Syntaxreferenzseite zu parallele Schritte.

Experimentelle Funktion durch parallelen Schritt ersetzen

Wenn Sie experimental.executions.map verwenden, um paralleles Arbeiten zu unterstützen, können Sie Ihren Workflow auf parallele Schritte umstellen, for wird parallel wiederholt. Beispiele finden Sie unter Experimentelle Funktion durch parallelen Schritt ersetzen

Beispiele

Diese Beispiele veranschaulichen die Syntax.

Vorgänge parallel ausführen (mit Zweigen)

Wenn Ihr Workflow mehrere verschiedene Gruppen von Schritten umfasst, die ausgeführt werden können Gleichzeitig kann eine Platzierung in parallelen Zweigen die Gesamtzeit verringern, die für diese Schritte erforderlich sind.

Im folgenden Beispiel wird eine User-ID als Argument an den Workflow übergeben und Daten parallel von zwei verschiedenen Diensten abgerufen. Gemeinsam genutzte Variablen Zulassen, dass Werte in den Zweigen geschrieben und nach den Zweigen gelesen werden Abgeschlossen:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Elemente parallel verarbeiten (mithilfe einer parallelen Schleife)

Wenn Sie für jedes Element in einer Liste dieselbe Aktion ausführen müssen, können Sie mit einer Parallelschleife schneller ausführen. Eine parallele Schleife ermöglicht mehrere Schleifeniterationen gleichzeitig auszuführen. Im Gegensatz zu regulär für Schleifen, können Iterationen in beliebiger Reihenfolge durchgeführt werden.

Im folgenden Beispiel werden Nutzerbenachrichtigungen in einem Parallele for-Schleife:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Daten aggregieren (mithilfe einer parallelen Schleife)

Sie können eine Reihe von Elementen verarbeiten, während Sie Daten aus den Vorgängen erfassen. die für jedes Element ausgeführt werden. Sie können z. B. die IDs erstellter oder eine Liste mit fehlerhaften Elementen führen.

Im folgenden Beispiel werden 10 separate Abfragen an eine öffentliche BigQuery- Dataset gibt jeweils die Anzahl der Wörter in einem Dokument oder einer Gruppe von Dokumenten zurück. A gemeinsam genutzte Variable ermöglicht die Anzahl der Wörter, die sich angesammelt und nach allen Iterationen gelesen werden abgeschlossen ist. Nachdem wir die Anzahl der Wörter in allen Dokumenten berechnet haben, Workflow gibt die Gesamtsumme zurück.

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

Nächste Schritte