Workflowschritte parallel ausführen

Parallele Schritte können die Gesamtausführungszeit für einen Workflow reduzieren, indem mehrere blockierende Aufrufe gleichzeitig ausgeführt werden.

Blockierende Aufrufe wie sleep, HTTP-Aufrufe und Callbacks können von Millisekunden bis zu Tagen dauern. Parallele Schritte sollen bei solchen gleichzeitigen, langwierigen Vorgängen helfen. Wenn in einem Workflow mehrere blockierende Aufrufe ausgeführt werden müssen, die unabhängig voneinander sind, kann die Gesamtausführungszeit durch parallele Verzweigungen reduziert werden. Dazu werden die Aufrufe gleichzeitig gestartet und es wird gewartet, bis alle abgeschlossen sind.

Wenn in Ihrem Workflow beispielsweise Kundendaten aus mehreren unabhängigen Systemen abgerufen werden müssen, bevor fortgefahren werden kann, ermöglichen parallele Verzweigungen gleichzeitige API-Anfragen. Wenn es fünf Systeme gibt und jedes zwei Sekunden für die Antwort benötigt, kann die Ausführung der Schritte in einem Workflow mindestens 10 Sekunden dauern. Wenn sie parallel ausgeführt werden, kann das nur zwei Sekunden dauern.

Parallelen Schritt erstellen

Erstellen Sie einen parallel-Schritt, um einen Teil Ihres Workflows zu definieren, in dem zwei oder mehr Schritte gleichzeitig ausgeführt werden können.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Ersetzen Sie Folgendes:

  • PARALLEL_STEP_NAME: der Name des parallelen Schritts.
  • POLICY (optional): Bestimmt die Aktion, die andere Verzweigungen ausführen, wenn eine nicht behandelte Ausnahme auftritt. Die Standardrichtlinie continueAll führt zu keiner weiteren Aktion und alle anderen Verzweigungen werden ausgeführt. Derzeit wird nur continueAll unterstützt.
  • VARIABLE_A, VARIABLE_B usw.: eine Liste von beschreibbaren Variablen mit übergeordnetem Gültigkeitsbereich, die Zuweisungen innerhalb des parallelen Schritts zulassen Weitere Informationen finden Sie unter Gemeinsam genutzte Variablen.
  • CONCURRENCY_LIMIT (optional): Die maximale Anzahl von Verzweigungen und Iterationen, die innerhalb einer einzelnen Workflowausführung gleichzeitig ausgeführt werden können, bevor weitere Verzweigungen und Iterationen in die Warteschlange gestellt werden. Dies gilt nur für einen einzelnen parallel-Schritt und wird nicht kaskadiert. Muss eine positive Ganzzahl sein und kann entweder ein Literalwert oder ein Ausdruck sein. Weitere Informationen finden Sie unter Grenzwerte für die Parallelität.
  • BRANCHES_OR_FOR: Verwenden Sie entweder branches oder for, um eine der folgenden Optionen anzugeben:
    • Verzweigungen, die gleichzeitig ausgeführt werden können.
    • Eine Schleife, in der Iterationen gleichzeitig ausgeführt werden können.

Wichtige Hinweise:

  • Parallele Verzweigungen und Iterationen können in beliebiger Reihenfolge ausgeführt werden und bei jeder Ausführung in einer anderen Reihenfolge.
  • Parallele Schritte können andere, verschachtelte parallele Schritte bis zur Tiefeneinschränkung enthalten. Siehe Kontingente und Limits.
  • Weitere Informationen finden Sie auf der Syntaxreferenzseite für parallele Schritte.

Experimentelle Funktion durch parallelen Schritt ersetzen

Wenn Sie experimental.executions.map für parallele Arbeit verwenden, können Sie Ihren Workflow migrieren, um stattdessen parallele Schritte zu verwenden, bei denen normale for-Schleifen parallel ausgeführt werden. Beispiele finden Sie unter Experimentelle Funktion durch parallelen Schritt ersetzen.

Beispiele

Diese Beispiele veranschaulichen die Syntax.

Vorgänge parallel ausführen (mit Verzweigungen)

Wenn Ihr Workflow mehrere und unterschiedliche Schritte enthält, die gleichzeitig ausgeführt werden können, kann die Gesamtzeit für die Ausführung dieser Schritte durch das Platzieren in parallelen Verzweigungen verkürzt werden.

Im folgenden Beispiel wird eine Nutzer-ID als Argument an den Workflow übergeben und Daten werden parallel aus zwei verschiedenen Diensten abgerufen. Mit gemeinsamen Variablen können Werte in die Verzweigungen geschrieben und nach Abschluss der Verzweigungen gelesen werden:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Elemente parallel verarbeiten (mit einer parallelen Schleife)

Wenn Sie dieselbe Aktion für jeden Artikel in einer Liste ausführen müssen, können Sie die Ausführung mit einer parallelen Schleife schneller abschließen. Mit einer parallelen Schleife können mehrere Schleifeniterationen parallel ausgeführt werden. Im Gegensatz zu normalen For-Schleifen können Iterationen in beliebiger Reihenfolge ausgeführt werden.

Im folgenden Beispiel werden mehrere Nutzerbenachrichtigungen in einer parallelen for-Schleife verarbeitet:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Daten mithilfe einer parallelen Schleife aggregieren

Sie können mehrere Elemente verarbeiten und dabei Daten aus den Vorgängen erfassen, die auf den einzelnen Elementen ausgeführt werden. So können Sie beispielsweise die IDs der erstellten Elemente im Blick behalten oder eine Liste mit Elementen mit Fehlern führen.

Im folgenden Beispiel geben 10 separate Abfragen an ein öffentliches BigQuery-Dataset jeweils die Anzahl der Wörter in einem Dokument oder einer Gruppe von Dokumenten zurück. Mit einer freigegebenen Variablen kann die Anzahl der Wörter nach Abschluss aller Iterationen summiert und gelesen werden. Nachdem die Anzahl der Wörter in allen Dokumenten berechnet wurde, gibt der Workflow die Gesamtzahl zurück.

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

Nächste Schritte