Workflowschritte parallel ausführen

Parallele Schritte können die Gesamtausführungszeit für einen Workflow reduzieren, da mehrere blockierende Aufrufe gleichzeitig ausgeführt werden.

Das Blockieren von Aufrufen wie sleep, HTTP-Aufrufe und callbacks kann einige Zeit in Anspruch nehmen, von Millisekunden bis Tage. Parallele Schritte sollen bei solchen gleichzeitigen Vorgängen mit langer Ausführungszeit hilfreich sein. Wenn ein Workflow mehrere voneinander unabhängige blockierende Aufrufe ausführen muss, können Sie durch die Verwendung von parallelen Zweigen die Gesamtausführungszeit reduzieren. Dazu werden die Aufrufe zur gleichen Zeit gestartet und auf den Abschluss aller Zweige gewartet.

Wenn Ihr Workflow beispielsweise Kundendaten aus mehreren unabhängigen Systemen abrufen muss, bevor Sie fortfahren, ermöglichen parallele Zweige gleichzeitige API-Anfragen. Wenn es fünf Systeme gibt und jedes System zwei Sekunden zum Antworten benötigt, kann die sequenzielle Ausführung in einem Workflow mindestens 10 Sekunden in Anspruch nehmen. Eine parallele Ausführung kann nur zwei Sekunden in Anspruch nehmen.

Parallelen Schritt erstellen

Erstellen Sie einen parallel-Schritt, um einen Teil Ihres Workflows zu definieren, in dem zwei oder mehr Schritte gleichzeitig ausgeführt werden können.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Ersetzen Sie Folgendes:

  • PARALLEL_STEP_NAME: der Name des parallelen Schritts.
  • POLICY (optional): Legt die Aktion fest, die andere Zweige ausführen, wenn eine unbehandelte Ausnahme auftritt. Die Standardrichtlinie continueAll führt zu keinen weiteren Maßnahmen und alle anderen Zweige werden versuchen, ausgeführt zu werden. Derzeit wird nur die Richtlinie „continueAll“ unterstützt.
  • VARIABLE_A, VARIABLE_B usw.: Eine Liste von beschreibbaren Variablen mit übergeordnetem Bereich, die Zuweisungen im parallelen Schritt zulassen. Weitere Informationen finden Sie unter Freigegebene Variablen.
  • CONCURRENCY_LIMIT (optional): Die maximale Anzahl von Zweigen und Iterationen, die innerhalb einer einzelnen Workflowausführung gleichzeitig ausgeführt werden können, bevor weitere Zweige und Iterationen in die Warteschlange gestellt werden. Dies gilt nur für einen einzelnen parallel-Schritt und wird nicht kaskadiert. Muss eine positive Ganzzahl sein und kann entweder ein Literalwert oder ein Ausdruck sein. Weitere Informationen finden Sie unter Limits für die Gleichzeitigkeit.
  • BRANCHES_OR_FOR: Verwenden Sie entweder branches oder for für einen der folgenden Werte:
    • Zweige, die gleichzeitig ausgeführt werden können.
    • Eine Schleife, in der Iterationen gleichzeitig ausgeführt werden können.

Wichtige Hinweise:

  • Parallele Zweige und Iterationen können in beliebiger Reihenfolge und bei jeder Ausführung in einer anderen Reihenfolge ausgeführt werden.
  • Parallele Schritte können andere, verschachtelte parallele Schritte bis zur Tiefenbeschränkung umfassen. Siehe Kontingente und Limits.
  • Weitere Informationen finden Sie auf der Seite zur Syntaxreferenz für parallele Schritte.

Experimentelle Funktion durch parallelen Schritt ersetzen

Wenn Sie experimental.executions.map zur Unterstützung der parallelen Arbeit verwenden, können Sie Ihren Workflow migrieren, um stattdessen parallele Schritte zu verwenden und normale for-Schleifen parallel auszuführen. Beispiele finden Sie unter Experimentelle Funktion durch parallelen Schritt ersetzen.

Beispiele

Diese Beispiele veranschaulichen die Syntax.

Vorgänge parallel ausführen (mit Zweigen)

Wenn Ihr Workflow mehrere und unterschiedliche Gruppen von Schritten enthält, die gleichzeitig ausgeführt werden können, kann das Platzieren in parallelen Zweigen die für die Durchführung dieser Schritte erforderliche Gesamtzeit verringern.

Im folgenden Beispiel wird eine Nutzer-ID als Argument an den Workflow übergeben und Daten werden parallel von zwei verschiedenen Diensten abgerufen. Mit gemeinsam genutzten Variablen können Werte in die Zweige geschrieben und nach Abschluss der Zweige gelesen werden:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Elemente parallel verarbeiten (mit einer parallelen Schleife)

Wenn Sie für jedes Element in einer Liste dieselbe Aktion ausführen müssen, können Sie die Ausführung mithilfe einer parallelen Schleife schneller abschließen. Mit einer parallelen Schleife können mehrere Schleifeniterationen parallel ausgeführt werden. Beachten Sie, dass Iterationen im Gegensatz zu regulären for Loops in beliebiger Reihenfolge ausgeführt werden können.

Im folgenden Beispiel wird eine Reihe von Nutzerbenachrichtigungen in einer parallelen for-Schleife verarbeitet:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Daten mithilfe einer parallelen Schleife aggregieren

Sie können eine Reihe von Elementen verarbeiten, während Sie Daten aus den für jedes Element ausgeführten Vorgängen erfassen. Sie können beispielsweise die IDs erstellter Elemente verfolgen oder eine Liste der fehlerhaften Elemente verwalten.

Im folgenden Beispiel geben 10 separate Abfragen an ein öffentliches BigQuery-Dataset jeweils die Anzahl der Wörter in einem Dokument oder eine Gruppe von Dokumenten zurück. Mit einer gemeinsam genutzten Variablen kann die Anzahl der Wörter akkumuliert und nach allen Iterationen gelesen werden. Nachdem die Anzahl der Wörter in allen Dokumenten berechnet wurde, gibt der Workflow die Gesamtsumme zurück.

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # numWords is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

Nächste Schritte