Attendere utilizzando i callback

I callback consentono alle esecuzioni del flusso di lavoro di attendere che un altro servizio invii una richiesta all'endpoint di callback; questa richiesta riprende l'esecuzione del flusso di lavoro.

Con i callback, puoi segnalare al tuo flusso di lavoro che si è verificato un evento specifico e attendere l'evento senza eseguire il polling. Ad esempio, puoi creare un flusso di lavoro che ti avvisa quando un prodotto torna disponibile o quando un articolo è stato spedito oppure che attende l'interazione umana, ad esempio la revisione di un ordine o la convalida di una traduzione.

Questa pagina mostra come creare un flusso di lavoro che supporti un endpoint di callback e che attenda l'arrivo di richieste HTTP da processi esterni a questo endpoint. Puoi anche attendere gli eventi utilizzando i callback e i trigger Eventarc.

I callback richiedono l'utilizzo di due funzioni integrate della libreria standard:

Crea un endpoint che riceve una richiesta di callback

Crea un endpoint di callback che possa ricevere richieste HTTP per raggiungere l'endpoint.

  1. Segui i passaggi per creare un nuovo flusso di lavoro o scegline uno esistente da aggiornare ma non eseguirne ancora il deployment.
  2. Nella definizione del flusso di lavoro, aggiungi un passaggio per creare un endpoint di callback:

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "METHOD"
            result: callback_details
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "METHOD"
              },
              "result": "callback_details"
            }
          }
        ]
          

    Sostituisci METHOD con il metodo HTTP previsto, uno tra GET, HEAD, POST, PUT, DELETE, OPTIONS o PATCH. Il valore predefinito è POST.

    Il risultato è una mappa, callback_details, con un campo url che memorizza l'URL dell'endpoint creato.

    L'endpoint di callback è ora pronto a ricevere richieste in entrata con il metodo HTTP specificato. L'URL dell'endpoint creato può essere utilizzato per attivare il callback da un processo esterno al flusso di lavoro, ad esempio passando l'URL a una funzione Cloud Run.

  3. Nella definizione del flusso di lavoro, aggiungi un passaggio per attendere una richiesta di richiamata:

    YAML

        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: TIMEOUT
            result: callback_request
        

    JSON

        [
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": TIMEOUT
              },
              "result": "callback_request"
            }
          }
        ]
          

    Sostituisci TIMEOUT con il numero massimo di secondi per cui il flusso di lavoro deve attendere una richiesta. Il valore predefinito è 43200 (12 ore). Se il tempo trascorre prima della ricezione di una richiesta, viene generato un TimeoutError.

    Tieni presente che esiste una durata massima di esecuzione. Per ulteriori informazioni, consulta il limite di richieste.

    La mappa callback_details del passaggio create_callback precedente viene passata come argomento.

  4. Esegui il deployment del workflow per completare la creazione o l'aggiornamento.

    Quando viene ricevuta una richiesta, tutti i dettagli vengono memorizzati nella mappa callback_request. A questo punto hai accesso all'intera richiesta HTTP, inclusi intestazione, corpo e una mappa query per qualsiasparametri di ricercary. Ad esempio:

    YAML

        http_request:
          body:
          headers: {...}
          method: GET
          query: {}
          url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
        received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667
        type: HTTP
        

    JSON

        {
           "http_request":{
              "body":null,
              "headers":{
                 ...
              },
              "method":"GET",
              "query":{
              },
              "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
           },
           "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667",
           "type":"HTTP"
        }
          

    Se il corpo HTTP è testo o JSON, Workflows tenterà di decodificarlo; in caso contrario, vengono restituiti byte non elaborati.

Autorizza le richieste all'endpoint di callback

Per inviare una richiesta a un endpoint di callback,i servizi Google Cloud come Cloud Run e Cloud Run Functions, nonché i servizi di terze parti, devono essere autorizzati a farlo disponendo delle autorizzazioni Identity and Access Management (IAM) appropriate; in particolare, workflows.callbacks.send(incluse nel ruolo Invoker di Workflows).

Inviare una richiesta diretta

Il modo più semplice per creare credenziali di breve durata per un account di servizio è inviare una richiesta diretta. In questo flusso sono coinvolte due identità: il chiamante e ilaccount di serviziot per cui viene creata la credenziale. La chiamata al flusso di lavoro di base in questa pagina è un esempio di richiesta diretta. Per ulteriori informazioni, consulta Utilizzare IAM per controllare l'accesso e Autorizzazioni per le richieste dirette.

Generare un token di accesso OAuth 2.0

Per autorizzare un'applicazione a chiamare l'endpoint di callback, puoi generare un token di accesso OAuth 2.0 per il account di servizio associato al flusso di lavoro. Supponendo che tu disponga delle autorizzazioni richieste (per i ruoli Workflows Editor o Workflows Admin e Service Account Token Creator), puoi anche generare autonomamente un token eseguendo il metodo generateAccessToken.

Se la richiesta generateAccessToken ha esito positivo, il corpo della risposta restituito contiene un token di accesso OAuth 2.0 e un tempo di scadenza. (Per impostazione predefinita, i token di accesso OAuth 2.0 sono validi per un massimo di un'ora.) Ad esempio:

  {
  "accessToken": "eyJ0eXAi...NiJ9",
  "expireTime": "2020-04-07T15:01:23.045123456Z"
  }
Il codice accessToken può essere utilizzato in una chiamata curl all'URL dell'endpoint di callback come negli esempi seguenti:
  curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
  curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL

Genera un token OAuth per una funzione Cloud Run

Se richiami un callback da una funzione Cloud Run utilizzando lo stesso account di servizio del flusso di lavoro e nello stesso progetto, puoi generare un token di accesso OAuth nella funzione stessa. Ad esempio:

const {GoogleAuth} = require('google-auth-library');
const auth = new GoogleAuth();
const token = await auth.getAccessToken();
console.log("Token", token);

try {
  const resp = await fetch(url, {
      method: 'POST',
      headers: {
          'accept': 'application/json',
          'content-type': 'application/json',
          'authorization': `Bearer ${token}`
      },
      body: JSON.stringify({ approved })
  });
  console.log("Response = ", JSON.stringify(resp));

  const result = await resp.json();
  console.log("Outcome = ", JSON.stringify(result));

Per maggiori informazioni, consulta il tutorial su Creazione di un flusso di lavoro human-in-the-loop utilizzando i callback.

Richiedere l'accesso offline

I token di accesso scadono periodicamente e diventano credenziali non valide per una richiesta API correlata. Puoi aggiornare un token di accesso senza chiedere all'utente l'autorizzazione se hai richiesto l'accesso offline agli ambiti associati al token. La richiesta di accesso offline è un requisito per qualsiasi applicazione che deve accedere a un'API Google quando l'utente non è presente. Per maggiori informazioni, vedi Aggiornare un token di accesso (accesso offline).

Richiamare un flusso di lavoro esattamente una volta utilizzando i callback

I callback sono completamente idempotenti, il che significa che puoi riprovare un callback se non va a buon fine senza produrre risultati imprevisti o effetti collaterali.

Dopo aver creato un endpoint di callback, l'URL è pronto a ricevere richieste in entrata e in genere viene restituito a un chiamante prima che venga effettuata la chiamata corrispondente a await_callback. Tuttavia, se l'URL di callback non è ancora stato ricevuto quando viene eseguito il passaggio await_callback, l'esecuzione del flusso di lavoro viene bloccata finché non viene ricevuto l'endpoint (o si verifica un timeout). Una volta ricevuto, l'esecuzione del flusso di lavoro riprende e il callback viene elaborato.

Dopo aver eseguito il passaggio create_callback_endpoint e creato un endpoint di callback, è disponibile un singolo slot di callback per il flusso di lavoro. Quando viene ricevuta una richiesta di callback, questo slot viene riempito con il payload di callback finché il callback non può essere elaborato. Quando viene eseguito il passaggio await_callback, la callback viene elaborata e lo slot viene svuotato e reso disponibile per un'altra callback. Puoi quindi riutilizzare l'endpoint di callback e chiamare di nuovo await_callback.

Se await_callback viene chiamato una sola volta, ma viene ricevuta una seconda callback, si verifica uno dei seguenti scenari e viene restituito un codice di stato HTTP appropriato:

  • HTTP 429: Too Many Requests indica che il primo callback è stato ricevuto correttamente, ma non è stato elaborato; è ancora in attesa di elaborazione. Il secondo callback viene rifiutato dal workflow.

  • HTTP 200: Success indica che il primo callback è stato ricevuto correttamente ed è stata restituita una risposta. Il secondo callback viene memorizzato e potrebbe non essere mai elaborato a meno che await_callback non venga chiamato una seconda volta. Se il flusso di lavoro termina prima che ciò avvenga, la seconda richiesta di richiamata non viene mai elaborata e viene eliminata.

  • HTTP 404: Page Not Found indica che il flusso di lavoro non è più in esecuzione. Il primo callback è stato elaborato e il flusso di lavoro è stato completato oppure il flusso di lavoro non è riuscito. Per determinarlo, devi eseguire una query sullo stato di esecuzione del flusso di lavoro.

Callback paralleli

Quando i passaggi vengono eseguiti in parallelo e una callback viene creata da un thread principale e attesa nei passaggi secondari, viene seguito lo stesso pattern descritto in precedenza.

Nell'esempio seguente, quando viene eseguito il passaggio create_callback_endpoint, viene creato uno slot di callback. Ogni chiamata successiva a await_callback apre un nuovo slot di richiamata. È possibile effettuare dieci callback contemporaneamente, se tutti i thread sono in esecuzione e in attesa prima di effettuare una richiesta di callback. Potrebbero essere effettuate ulteriori richiamate, ma verrebbero memorizzate e mai elaborate.

YAML

  - createCallbackInParent:
    call: events.create_callback_endpoint
    args:
      http_callback_method: "POST"
    result: callback_details
  - parallelStep:
    parallel:
        for:
            range: [1, 10]
            value: loopValue
            steps:
              - waitForCallbackInChild:
                  call: events.await_callback
                  args:
                      callback: ${callback_details}

JSON

  [
    {
      "createCallbackInParent": {
        "call": "events.create_callback_endpoint",
        "args": {
          "http_callback_method": "POST"
        },
        "result": "callback_details"
      }
    },
    {
      "parallelStep": {
        "parallel": {
          "for": {
            "range": [
              1,
              10
            ],
            "value": "loopValue",
            "steps": [
              {
                "waitForCallbackInChild": {
                  "call": "events.await_callback",
                  "args": {
                    "callback": "${callback_details}"
                  }
                }
              }
            ]
          }
        }
      }
    }
  ]

Tieni presente che i callback vengono elaborati nello stesso ordine in cui ogni chiamata viene effettuata da una filiale a await_callback. Tuttavia, l'ordine di esecuzione dei rami è non deterministico e può arrivare a un risultato utilizzando vari percorsi. Per ulteriori informazioni, consulta la sezione Passaggi paralleli.

Prova un flusso di lavoro di richiamata di base

Puoi creare un flusso di lavoro di base e poi testare la chiamata all'endpoint di callback di questo flusso di lavoro utilizzando curl. Devi disporre delle autorizzazioni Workflows Editor o Workflows Admin necessarie per il progetto in cui si trova il flusso di lavoro.

  1. Crea ed esegui il deployment del seguente flusso di lavoro, quindi eseguilo.

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: 3600
            result: callback_request
        - print_callback_request:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
        - return_callback_result:
            return: ${callback_request.http_request}
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": 3600
              },
              "result": "callback_request"
            }
          },
          {
            "print_callback_request": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          },
          {
            "return_callback_result": {
              "return": "${callback_request.http_request}"
            }
          }
        ]
          

    Dopo l'esecuzione del flusso di lavoro, lo stato dell'esecuzione del flusso di lavoro è ACTIVE fino alla ricezione della richiesta di callback o allo scadere del timeout.

  2. Conferma lo stato di esecuzione e recupera l'URL di callback:

    Console

    1. Nella console Google Cloud , vai alla pagina Workflows:

      Vai a Workflows
    2. Fai clic sul nome del flusso di lavoro che hai appena eseguito.

      Viene visualizzato lo stato dell'esecuzione del flusso di lavoro.

    3. Fai clic sulla scheda Log.
    4. Cerca una voce di log simile alla seguente:

      Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
    5. Copia l'URL di callback da utilizzare nel comando successivo.

    gcloud

    1. Innanzitutto, recupera l'ID esecuzione:
      gcloud logging read "Listening for callbacks" --freshness=DURATION
      Sostituisci DURATION con un periodo di tempo appropriato per limitare le voci di log restituite (se hai eseguito il flusso di lavoro più volte).

      Ad esempio, --freshness=t10m restituisce voci di log non più vecchie di 10 minuti. Per maggiori dettagli, vedi gcloud topic datetimes.

      Viene restituito l'ID esecuzione. Tieni presente che l'URL di callback viene restituito anche nel campo textPayload. Copia entrambi i valori da utilizzare nei passaggi successivi.

    2. Esegui questo comando:
      gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
      Viene restituito lo stato dell'esecuzione del flusso di lavoro.
  3. Ora puoi chiamare l'endpoint di callback utilizzando un comando curl:
    curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL

    Tieni presente che per un endpoint POST devi utilizzare un'intestazione di rappresentazione Content-Type. Ad esempio:

    curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL

    Sostituisci CALLBACK_URL con l'URL che hai copiato nel passaggio precedente.

  4. Tramite la console Google Cloud o utilizzando Google Cloud CLI, verifica che lo stato dell'esecuzione del flusso di lavoro sia ora SUCCEEDED.
  5. Cerca la voce di log con il textPayload restituito simile al seguente:
    Received {"body":null,"headers":...

Esempi

Questi esempi mostrano la sintassi.

Rilevare gli errori di timeout

Questo esempio si aggiunge a quello precedente rilevando eventuali errori di timeout e scrivendo gli errori nel log di sistema.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 3600
                result: callback_request
            except:
                as: e
                steps:
                    - log_error:
                        call: sys.log
                        args:
                            severity: "ERROR"
                            text: ${"Received error " + e.message}
                        next: end
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "callback_request"
              },
              "except": {
                "as": "e",
                "steps": [
                  {
                    "log_error": {
                      "call": "sys.log",
                      "args": {
                        "severity": "ERROR",
                        "text": "${\"Received error \" + e.message}"
                      },
                      "next": "end"
                    }
                  }
                ]
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      }
    }
      

Attendi un callback in un ciclo di nuovi tentativi

Questo esempio modifica l'esempio precedente implementando un passaggio di ripetizione. Utilizzando un predicato di nuovi tentativi personalizzato, il flusso di lavoro registra un avviso quando si verifica un timeout e poi riprova l'attesa sull'endpoint di callback fino a cinque volte. Se la quota di nuovi tentativi viene esaurita prima della ricezione del callback, l'errore di timeout finale causa l'interruzione del workflow.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 60.0
                result: callback_request
            retry:
                predicate: ${log_timeout}
                max_retries: 5
                backoff:
                    initial_delay: 1
                    max_delay: 10
                    multiplier: 2
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    log_timeout:
        params: [e]
        steps:
          - when_to_repeat:
              switch:
                - condition: ${"TimeoutError" in e.tags}
                  steps:
                      - log_error_and_retry:
                          call: sys.log
                          args:
                              severity: "WARNING"
                              text: "Timed out waiting for callback, retrying"
                      - exit_predicate:
                          return: true
          - otherwise:
              return: false
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 60
                },
                "result": "callback_request"
              },
              "retry": {
                "predicate": "${log_timeout}",
                "max_retries": 5,
                "backoff": {
                  "initial_delay": 1,
                  "max_delay": 10,
                  "multiplier": 2
                }
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      },
      "log_timeout": {
        "params": [
          "e"
        ],
        "steps": [
          {
            "when_to_repeat": {
              "switch": [
                {
                  "condition": "${\"TimeoutError\" in e.tags}",
                  "steps": [
                    {
                      "log_error_and_retry": {
                        "call": "sys.log",
                        "args": {
                          "severity": "WARNING",
                          "text": "Timed out waiting for callback, retrying"
                        }
                      }
                    },
                    {
                      "exit_predicate": {
                        "return": true
                      }
                    }
                  ]
                }
              ]
            }
          },
          {
            "otherwise": {
              "return": false
            }
          }
        ]
      }
    }
      

Passaggi successivi