Workflowschritte parallel ausführen

Parallele Schritte können die Gesamtausführungszeit für einen Workflow reduzieren, indem Mehrere blockierende Anrufe gleichzeitig.

Anrufe wie Ruhemodus, HTTP-Aufrufe und Callbacks können Zeit in Anspruch nehmen, von Millisekunden bis Tage. Parallele Schritte sollen bei solchen gleichzeitigen lang andauernden Vorgängen. Wenn ein Workflow mehrere blockierende Aufrufe ausführen muss, unabhängig voneinander sind, kann die Verwendung paralleler Zweige die Gesamtzahl indem Sie die Aufrufe zur gleichen Zeit starten und auf alle zu erledigen.

Wenn in Ihrem Workflow beispielsweise Kundendaten aus mehreren unabhängigen Systemen abgerufen werden müssen, bevor fortgefahren werden kann, ermöglichen parallele Verzweigungen gleichzeitige API-Anfragen. Wenn es fünf Systeme gibt und jedes zwei Sekunden benötigt, um zu antworten, die sequenzielle Ausführung der Schritte in einem Workflow könnte mindestens 10 Sekunden dauern; können nur zwei Aktionen gleichzeitig in Anspruch genommen werden.

Parallelen Schritt erstellen

Erstellen Sie einen parallel-Schritt, um einen Teil Ihres Workflows zu definieren, in dem mindestens Schritte gleichzeitig ausgeführt werden können.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Ersetzen Sie Folgendes:

  • PARALLEL_STEP_NAME: der Name des parallelen Schritts.
  • POLICY (optional): Legt die Aktion für „Sonstiges“ fest. Zweige dauern, wenn eine unbehandelte Ausnahme auftritt. Die Standardrichtlinie, continueAll führt zu keiner weiteren Aktion. Alle anderen Zweige werden ausgeführt werden soll. Hinweis: continueAll ist derzeit die einzige Richtlinie, die unterstützt wird.
  • VARIABLE_A, VARIABLE_B usw. on: eine Liste der beschreibbaren Variablen mit übergeordnetem Bereich, die Zuweisungen ermöglichen in den parallelen Schritt ein. Weitere Informationen finden Sie unter Freigegebene Variablen.
  • CONCURRENCY_LIMIT (optional): Die maximale Anzahl von Zweige und Iterationen, die gleichzeitig in einem einzelnen Workflow ausgeführt werden können Ausführung, bevor weitere Zweige und Iterationen in die Warteschlange gestellt werden. Dies gilt nur für einen einzelnen parallel-Schritt und wird nicht kaskadiert. Muss eine positive Ganzzahl sein und kann entweder ein Literalwert oder ein Ausdruck sein. Für finden Sie unter Beschränkungen für die Nebenläufigkeit.
  • BRANCHES_OR_FOR: Verwenden Sie entweder branches oder for, um eine der folgenden Optionen anzugeben:
    • Verzweigungen, die gleichzeitig ausgeführt werden können.
    • Eine Schleife, in der Iterationen gleichzeitig ausgeführt werden können.

Wichtige Hinweise:

  • Parallele Zweige und Iterationen können in beliebiger Reihenfolge ausgeführt werden. bei jeder Ausführung in unterschiedlicher Reihenfolge anordnen.
  • Parallele Schritte können andere verschachtelte parallele Schritte bis zum Tiefenlimit umfassen. Siehe Kontingente und Limits.
  • Weitere Informationen finden Sie auf der Syntaxreferenzseite zu parallele Schritte.

Experimentelle Funktion durch parallelen Schritt ersetzen

Wenn Sie experimental.executions.map für parallele Arbeit verwenden, können Sie Ihren Workflow migrieren, um stattdessen parallele Schritte zu verwenden, bei denen normale for-Schleifen parallel ausgeführt werden. Beispiele finden Sie unter Experimentelle Funktion durch parallelen Schritt ersetzen

Beispiele

Diese Beispiele veranschaulichen die Syntax.

Vorgänge parallel ausführen (mit Verzweigungen)

Wenn Ihr Workflow mehrere und unterschiedliche Schritte enthält, die gleichzeitig ausgeführt werden können, kann die Gesamtzeit für die Ausführung dieser Schritte durch das Platzieren in parallelen Verzweigungen verkürzt werden.

Im folgenden Beispiel wird eine Nutzer-ID als Argument an den Workflow übergeben und Daten werden parallel aus zwei verschiedenen Diensten abgerufen. Gemeinsam genutzte Variablen Zulassen, dass Werte in den Zweigen geschrieben und nach den Zweigen gelesen werden Abgeschlossen:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Elemente parallel verarbeiten (mit einer parallelen Schleife)

Wenn Sie dieselbe Aktion für jeden Artikel in einer Liste ausführen müssen, können Sie die Ausführung mit einer parallelen Schleife schneller abschließen. Eine parallele Schleife ermöglicht mehrere Schleifeniterationen parallel ausführen. Im Gegensatz zu regulär für Schleifen, können Iterationen in beliebiger Reihenfolge durchgeführt werden.

Im folgenden Beispiel werden Nutzerbenachrichtigungen in einem Parallele for-Schleife:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Daten mithilfe einer parallelen Schleife aggregieren

Sie können eine Reihe von Elementen verarbeiten, während Sie Daten aus den Vorgängen erfassen. die für jedes Element ausgeführt werden. So können Sie beispielsweise die IDs der erstellten Elemente im Blick behalten oder eine Liste mit Elementen mit Fehlern führen.

Im folgenden Beispiel werden 10 separate Abfragen an eine öffentliche BigQuery- Dataset gibt jeweils die Anzahl der Wörter in einem Dokument oder einer Gruppe von Dokumenten zurück. Mit einer freigegebenen Variablen kann die Anzahl der Wörter nach Abschluss aller Iterationen summiert und gelesen werden. Nachdem wir die Anzahl der Wörter in allen Dokumenten berechnet haben, Workflow gibt die Gesamtsumme zurück.

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

Nächste Schritte