Mit Callbacks warten

Callbacks ermöglichen es Workflow-Ausführungen, zu warten, bis ein anderer Dienst eine Anfrage an den Callback-Endpunkt sendet. Diese Anfrage setzt die Ausführung des Workflows fort.

Mit Callbacks können Sie Ihrem Workflow signalisieren, dass ein bestimmtes Ereignis aufgetreten ist, und auf dieses Ereignis ohne Abfrage warten. So können Sie zum Beispiel können Sie einen Workflow erstellen, der Sie benachrichtigt, wenn ein Produkt wieder auf Lager ist Artikel wurde versendet; oder darauf wartet, dass menschliche Interaktionen wie das Überprüfen eines eine Übersetzung zu bestellen oder zu überprüfen.

Auf dieser Seite erfahren Sie, wie Sie einen Workflow erstellen, der einen Callback-Endpunkt unterstützt und wartet, bis HTTP-Anfragen von externen Prozessen an diesem Endpunkt ankommen. Sie können auch mit Callbacks und Eventarc-Triggern auf Ereignisse warten.

Callbacks erfordern die Verwendung von zwei integrierten Funktionen für Standardbibliotheken:

Endpunkt erstellen, der eine Callback-Anfrage empfängt

Erstellen Sie einen Callback-Endpunkt, der HTTP-Anfragen erhalten kann, die am Endpunkt ankommen.

  1. Folgen Sie der Anleitung zum Erstellen Workflow erstellen oder einen bestehenden Workflow auswählen, Update aber noch nicht bereitstellen.
  2. Fügen Sie der Definition des Workflows einen Schritt hinzu, um einen Callback-Endpunkt zu erstellen:

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "METHOD"
            result: callback_details
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "METHOD"
              },
              "result": "callback_details"
            }
          }
        ]
          

    Ersetzen Sie METHOD durch die erwartete HTTP-Methode, entweder GET, HEAD, POST, PUT, DELETE, OPTIONS oder PATCH. Der Standardwert ist POST.

    Das Ergebnis ist eine Zuordnung callback_details mit einem Feld url, das die URL des erstellten Endpunkts speichert.

    Der Callback-Endpunkt kann jetzt eingehende Anfragen mit der angegebenen HTTP-Methode empfangen. Die URL des erstellten Endpunkts kann verwendet werden, um den Callback von einem Prozess außerhalb des Workflows auszulösen. Dazu übergeben Sie beispielsweise die URL an eine Cloud Functions-Funktion.

  3. Fügen Sie in der Definition des Workflows einen Schritt hinzu, um auf eine Callback-Anfrage zu warten:

    YAML

        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: TIMEOUT
            result: callback_request
        

    JSON

        [
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": TIMEOUT
              },
              "result": "callback_request"
            }
          }
        ]
          

    Ersetzen Sie TIMEOUT durch die maximale Anzahl von Sekunden, die der Workflow auf eine Anfrage warten soll. Der Standardwert ist 43.200 (12 Stunden). Wenn diese Zeit vor dem Empfang einer Anfrage verstrichen ist, wird ein TimeoutError ausgelöst.

    Beachten Sie, dass es eine maximale Ausführungsdauer gibt. Weitere Informationen finden Sie unter Das Anfragelimit.

    Die callback_details-Karte aus der früheren Der Schritt create_callback wird als Argument übergeben.

  4. Stellen Sie Ihren Workflow bereit, um ihn zu erstellen oder zu aktualisieren.

    Wenn eine Anfrage empfangen wird, werden alle Details der Anfrage in der Zuordnung callback_request gespeichert. Anschließend haben Sie Zugriff auf die gesamte HTTP-Anfrage, einschließlich Header, Text und eine query-Zuordnung für alle Abfrageparameter. Beispiel:

    YAML

        http_request:
          body:
          headers: {...}
          method: GET
          query: {}
          url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
        received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667
        type: HTTP
        

    JSON

        {
           "http_request":{
              "body":null,
              "headers":{
                 ...
              },
              "method":"GET",
              "query":{
              },
              "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
           },
           "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667",
           "type":"HTTP"
        }
          

    Wenn der HTTP-Text ein Text oder JSON ist, versucht Workflows, den Text zu decodieren. Andernfalls werden Rohbyte zurückgegeben.

Anfragen an den Callback-Endpunkt autorisieren

Zum Senden einer Anfrage an einen Callback-Endpunkt werden Google Cloud-Dienste wie Cloud Run und Cloud Functions sowie Drittanbieter müssen über die entsprechenden Berechtigungen zur Identitäts- und Zugriffsverwaltung (Identity and Access Management, IAM), und zwar workflows.callbacks.send (in der Rolle Workflows-Aufrufer enthalten).

Direkte Anfrage stellen

Am einfachsten erstellen Sie kurzlebige Anmeldedaten für ein Dienstkonto, indem Sie eine direkte Anfrage stellen. An diesem Ablauf sind zwei Identitäten beteiligt: der Aufrufer und das Dienstkonto, für das die Anmeldedaten erstellt werden. Der Aufruf des grundlegenden Workflows auf dieser Seite ist ein Beispiel für eine direkte Anfrage. Weitere Informationen finden Sie unter IAM zur Zugriffssteuerung verwenden und Berechtigungen für direkte Anfragen:

OAuth 2.0-Zugriffstoken generieren

Zum Autorisieren einer Anwendung zum Aufrufen des Callback-Endpunkts können Sie ein OAuth 2.0-Zugriffstoken für das mit dem Workflow verknüpfte Dienstkonto generieren. Wenn Sie die erforderlichen Berechtigungen (für die Rollen Workflows Editor oder Workflows Admin und Service Account Token Creator) haben, können Sie selbst ein Token generieren. Führen Sie dazu die generateAccessToken-Methode aus.

Wenn die Anfrage generateAccessToken erfolgreich ist, enthält der zurückgegebene Antworttext ein OAuth 2.0-Zugriffstoken und eine Ablaufzeit. Standardmäßig sind OAuth 2.0-Zugriffstokens maximal eine Stunde lang gültig. Beispiel:

  {
  "accessToken": "eyJ0eXAi...NiJ9",
  "expireTime": "2020-04-07T15:01:23.045123456Z"
  }
Der accessToken-Code kann dann in einem curl-Aufruf an den Callback-Endpunkt verwendet werden URL wie in den folgenden Beispielen gezeigt:
  curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
  curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL

OAuth-Token für eine Cloud Functions-Funktion generieren

Wenn Sie einen Callback von einer Cloud Functions-Funktion aufrufen, die dasselbe Dienstkonto wie der Workflow im selben Projekt verwendet, können Sie ein OAuth-Zugriffstoken in der Funktion selbst generieren. Beispiel:

const {GoogleAuth} = require('google-auth-library');
const auth = new GoogleAuth();
const token = await auth.getAccessToken();
console.log("Token", token);

try {
  const resp = await fetch(url, {
      method: 'POST',
      headers: {
          'accept': 'application/json',
          'content-type': 'application/json',
          'authorization': `Bearer ${token}`
      },
      body: JSON.stringify({ approved })
  });
  console.log("Response = ", JSON.stringify(resp));

  const result = await resp.json();
  console.log("Outcome = ", JSON.stringify(result));

Weitere Informationen finden Sie in der Anleitung Human-in-the-Loop-Workflow mit Callbacks erstellen.

Offlinezugriff anfordern

Zugriffstokens laufen regelmäßig ab und werden zu ungültigen Anmeldedaten für eine zugehörige API-Anfrage. Sie können ein Zugriffstoken aktualisieren, ohne den Nutzer um eine Berechtigung zu bitten, wenn Sie den Offlinezugriff auf die mit dem Token verknüpften Bereiche angefordert haben. Das Anfordern des Offlinezugriffs ist eine Voraussetzung für jede Anwendung, die auf eine Google API zugreifen muss, wenn der Nutzer nicht vorhanden ist. Weitere Informationen finden Sie unter Zugriffstoken aktualisieren (Offlinezugriff).

Workflow genau einmal mithilfe von Callbacks aufrufen

Callbacks sind vollständig idempotent, was bedeutet, dass Sie Einen Callback wiederholen, wenn er fehlschlägt ohne unerwartete Ergebnisse oder Nebenwirkungen zu verursachen.

Nachdem du einen Callback-Endpunkt erstellt hast, kann die URL eingehende -Anfragen und wird normalerweise vor dem entsprechenden Aufruf an einen Aufrufer zurückgegeben. await_callback wurde erstellt. Wenn die Callback-URL jedoch noch nicht wenn der Schritt await_callback ausgeführt wird, erfolgt die Workflowausführung blockiert, bis der Endpunkt empfangen wird (oder eine Zeitüberschreitung auftritt). Nach dem Erhalt Die Workflowausführung wird fortgesetzt und der Callback wird verarbeitet.

Nachdem Sie den Schritt create_callback_endpoint ausgeführt und einen Callback erstellt haben Endpunkt, steht dem Workflow ein einziger Callback-Slot zur Verfügung. Wenn ein Rückruf -Anfrage empfangen, wird dieser Slot mit der Callback-Nutzlast gefüllt, bis der Callback verarbeitet werden kann. Wenn der Schritt await_callback ausgeführt wird, verarbeitet, und die Anzeigenfläche wird geleert und für eine andere Callback des Nutzers an. Du kannst dann den Callback-Endpunkt wiederverwenden und await_callback aufrufen noch einmal.

Wenn await_callback nur einmal aufgerufen wird, aber ein zweiter Callback eingeht, wird einer der folgenden Szenarien eintritt, und ein entsprechender HTTP-Statuscode ist zurückgegeben:

  • HTTP 429: Too Many Requests gibt an, dass der erste Callback empfangen wurde aber noch nicht verarbeitet wurde, noch verarbeitet werden muss. Die der zweite Callback vom Workflow abgelehnt wird.

  • HTTP 200: Success gibt an, dass der erste Callback empfangen wurde und eine Antwort wurde zurückgegeben. Der zweite Callback wird gespeichert und wird möglicherweise nie verarbeitet, es sei denn, await_callback wird ein zweites Mal aufgerufen. Wenn die bevor dieser eintritt, wird die zweite Rückrufanforderung nie verarbeitet und verworfen.

  • HTTP 404: Page Not Found gibt an, dass der Workflow nicht mehr ausgeführt wird. Entweder wurde der erste Callback verarbeitet und der Workflow ist abgeschlossen, oder der Workflow fehlgeschlagen. Dazu müssen Sie den Workflow abfragen Ausführungsstatus.

Parallele Callbacks

Wenn Schritte parallel ausgeführt werden und ein Callback von einem übergeordneten Element erstellt wird -Thread und wurde in untergeordneten Schritten gewartet, das gleiche Muster wie zuvor beschrieben. wird befolgt.

Wenn im folgenden Beispiel der Schritt create_callback_endpoint ausgeführt wird, wird eine Callback-Fläche erstellt. Bei jedem nachfolgenden Aufruf von await_callback wird ein Callback-Slot angezeigt. Zehn Callbacks können gleichzeitig ausgeführt werden, wenn alle Threads ausgeführt und wartet, bevor eine Rückrufanforderung gesendet wird. Zusätzliche Callbacks gemacht, aber gespeichert und nie verarbeitet werden.

YAML

  - createCallbackInParent:
    call: events.create_callback_endpoint
    args:
      http_callback_method: "POST"
    result: callback_details
  - parallelStep:
    parallel:
        for:
            range: [1, 10]
            value: loopValue
            steps:
              - waitForCallbackInChild:
                  call: events.await_callback
                  args:
                      callback: ${callback_details}

JSON

  [
    {
      "createCallbackInParent": {
        "call": "events.create_callback_endpoint",
        "args": {
          "http_callback_method": "POST"
        },
        "result": "callback_details"
      }
    },
    {
      "parallelStep": {
        "parallel": {
          "for": {
            "range": [
              1,
              10
            ],
            "value": "loopValue",
            "steps": [
              {
                "waitForCallbackInChild": {
                  "call": "events.await_callback",
                  "args": {
                    "callback": "${callback_details}"
                  }
                }
              }
            ]
          }
        }
      }
    }
  ]

Beachten Sie, dass Callbacks in der gleichen Reihenfolge verarbeitet werden wie jeder Aufruf von einem Zweig zu await_callback. Die Ausführungsreihenfolge der Zweige ist jedoch nicht deterministisch und kann über verschiedene Pfade zu einem Ergebnis gelangen. Weitere Informationen finden Sie unter Parallele Schritte:

Einfachen Callback-Workflow testen

Sie können einen grundlegenden Workflow erstellen und dann den Aufruf des Aufrufs der Callback-Endpunkt mithilfe von "curl" an. Sie müssen die erforderlichen Workflows Editor- oder Workflows Admin-Berechtigungen für das Projekt, in dem sich der Workflow befindet.

  1. <ph type="x-smartling-placeholder"></ph> Erstellen Sie den folgenden Workflow und stellen Sie ihn bereit. ausführen.

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: 3600
            result: callback_request
        - print_callback_request:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
        - return_callback_result:
            return: ${callback_request.http_request}
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": 3600
              },
              "result": "callback_request"
            }
          },
          {
            "print_callback_request": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          },
          {
            "return_callback_result": {
              "return": "${callback_request.http_request}"
            }
          }
        ]
          

    Nach der Ausführung des Workflows lautet der Status der Workflowausführung: ACTIVE, bis die Rückrufanfrage eingeht oder das Zeitlimit überschritten wurde an.

  2. Prüfen Sie den Ausführungsstatus und rufen Sie die Callback-URL ab:
    <ph type="x-smartling-placeholder"></ph> <ph type="x-smartling-placeholder">
    </ph>

    Console

    1. Wechseln Sie in der Google Cloud Console zur Seite Workflows:

      Zur Seite "Workflows"
    2. Klicken Sie auf den Namen des gerade ausgeführten Workflows.

      Der Status der Workflow-Ausführung wird angezeigt.

    3. Klicken Sie auf den Tab Logs.
    4. Suchen Sie nach einem Logeintrag wie dem Folgenden:

      Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
      
    5. Kopieren Sie die Callback-URL, die im nächsten Befehl verwendet werden soll.

    gcloud

    1. Rufen Sie zuerst die Ausführungs-ID ab:
      gcloud logging read "Listening for callbacks" --freshness=DURATION
      
      Ersetzen Sie DURATION durch einen geeigneten Betrag. die Anzahl der zurückgegebenen Logeinträge zu begrenzen (wenn Sie den Arbeitsablauf mehrmals ausgeführt werden.

      --freshness=t10m gibt beispielsweise Logeinträge zurück. die nicht älter als 10 Minuten sind. Weitere Informationen finden Sie unter gcloud topic datetimes.

      Die Ausführungs-ID wird zurückgegeben. Die Callback-URL wird ebenfalls im Feld textPayload zurückgegeben. Kopieren Sie beide Werte, um sie in den folgenden Schritten zu verwenden.

    2. Führen Sie den folgenden Befehl aus:
      gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
      
      Der Status der Workflowausführung wird zurückgegeben.
  3. Sie können den Callback-Endpunkt jetzt mit einem curl-Befehl aufrufen:
    curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL
    

    Beachten Sie, dass Sie für einen POST-Endpunkt einen Content-Type-Darstellungsheader. Beispiel:

    curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL
    

    Ersetzen Sie CALLBACK_URL durch die URL, die Sie im vorherigen Schritt kopiert haben.

  4. Über die Google Cloud Console oder die Google Cloud CLI Bestätigen Sie, dass der Status der Workflowausführung jetzt SUCCEEDED ist.
  5. Suchen Sie nach dem Logeintrag mit dem zurückgegebenen textPayload das in etwa so aussieht:
    Received {"body":null,"headers":...
    

Beispiele

Diese Beispiele veranschaulichen die Syntax.

Zeitüberschreitungsfehler abfangen

Dieses Beispiel erweitert das vorherige Beispiel, indem Zeitüberschreitungsfehler abgefangen und die Fehler in das Systemlog geschrieben werden.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 3600
                result: callback_request
            except:
                as: e
                steps:
                    - log_error:
                        call: sys.log
                        args:
                            severity: "ERROR"
                            text: ${"Received error " + e.message}
                        next: end
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "callback_request"
              },
              "except": {
                "as": "e",
                "steps": [
                  {
                    "log_error": {
                      "call": "sys.log",
                      "args": {
                        "severity": "ERROR",
                        "text": "${\"Received error \" + e.message}"
                      },
                      "next": "end"
                    }
                  }
                ]
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      }
    }
      

Auf einen Callback in einer Wiederholungsschleife warten

In diesem Beispiel wird das vorherige Beispiel durch Implementieren eines Wiederholungsschritts geändert. Mit einem benutzerdefinierten Wiederholungsprädikat protokolliert der Workflow eine Warnung, wenn ein Zeitlimit auftritt, und wiederholt die Wartezeit auf dem Callback-Endpunkt bis zu fünfmal. Wenn das Wiederholungskontingent aufgebraucht ist, bevor der Callback empfangen wird, führt der letzte Zeitüberschreitungsfehler dazu, dass der Workflow fehlschlägt.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 60.0
                result: callback_request
            retry:
                predicate: ${log_timeout}
                max_retries: 5
                backoff:
                    initial_delay: 1
                    max_delay: 10
                    multiplier: 2
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    log_timeout:
        params: [e]
        steps:
          - when_to_repeat:
              switch:
                - condition: ${"TimeoutError" in e.tags}
                  steps:
                      - log_error_and_retry:
                          call: sys.log
                          args:
                              severity: "WARNING"
                              text: "Timed out waiting for callback, retrying"
                      - exit_predicate:
                          return: true
          - otherwise:
              return: false
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 60
                },
                "result": "callback_request"
              },
              "retry": {
                "predicate": "${log_timeout}",
                "max_retries": 5,
                "backoff": {
                  "initial_delay": 1,
                  "max_delay": 10,
                  "multiplier": 2
                }
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      },
      "log_timeout": {
        "params": [
          "e"
        ],
        "steps": [
          {
            "when_to_repeat": {
              "switch": [
                {
                  "condition": "${\"TimeoutError\" in e.tags}",
                  "steps": [
                    {
                      "log_error_and_retry": {
                        "call": "sys.log",
                        "args": {
                          "severity": "WARNING",
                          "text": "Timed out waiting for callback, retrying"
                        }
                      }
                    },
                    {
                      "exit_predicate": {
                        "return": true
                      }
                    }
                  ]
                }
              ]
            }
          },
          {
            "otherwise": {
              "return": false
            }
          }
        ]
      }
    }
      

Nächste Schritte