Attendi utilizzando i callback

I callback consentono alle esecuzioni del flusso di lavoro di attendere che un altro servizio effettui una richiesta all'endpoint di callback; questa richiesta riprende l'esecuzione del flusso di lavoro.

Con i callback, puoi segnalare al tuo flusso di lavoro che si è verificato un evento specificato e attendere su quell'evento senza polling. Ad esempio, puoi creare un flusso di lavoro che ti avvisa quando un prodotto è di nuovo disponibile o quando un articolo è stato spedito, oppure che attende di consentire l'interazione umana come la revisione di un ordine o la convalida di una traduzione.

Questa pagina mostra come creare un flusso di lavoro che supporta un endpoint di callback e che attende che le richieste HTTP da processi esterni arrivino a quell'endpoint. Puoi anche attendere gli eventi utilizzando callback e trigger Eventarc.

I callback richiedono l'utilizzo di due funzioni integrate nella libreria standard:

Creare un endpoint che riceve una richiesta di callback

Creare un endpoint di callback che può ricevere richieste HTTP per arrivare a quell'endpoint.

  1. Segui i passaggi per creare un nuovo flusso di lavoro o scegli un flusso di lavoro esistente da aggiornare ma non eseguirne ancora il deployment.
  2. Nella definizione del flusso di lavoro, aggiungi un passaggio per creare un endpoint di callback:

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "METHOD"
            result: callback_details
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "METHOD"
              },
              "result": "callback_details"
            }
          }
        ]
          

    Sostituisci METHOD con il metodo HTTP previsto, uno di GET, HEAD, POST, PUT, DELETE, OPTIONS o PATCH. Il valore predefinito è POST.

    Il risultato è una mappa, callback_details, con un campo url che memorizza l'URL dell'endpoint creato.

    L'endpoint di callback è ora pronto a ricevere le richieste in entrata con il metodo HTTP specificato. L'URL dell'endpoint creato può essere utilizzato per attivare il callback da un processo esterno al flusso di lavoro, ad esempio passando l'URL a una Cloud Function.

  3. Nella definizione del flusso di lavoro, aggiungi un passaggio di attesa per una richiesta di callback:

    YAML

        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: TIMEOUT
            result: callback_request
        

    JSON

        [
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": TIMEOUT
              },
              "result": "callback_request"
            }
          }
        ]
          

    Sostituisci TIMEOUT con il numero massimo di secondi che il flusso di lavoro deve attendere per una richiesta. Il valore predefinito è 43200 (12 ore). Se trascorre il tempo prima della ricezione di una richiesta, viene generato un valore TimeoutError.

    Tieni presente che esiste una durata massima di esecuzione. Per maggiori informazioni, consulta il limite di richieste.

    La mappa callback_details del passaggio create_callback precedente viene passata come argomento.

  4. Esegui il deployment del tuo flusso di lavoro per completarne la creazione o l'aggiornamento.

    Quando viene ricevuta una richiesta, tutti i dettagli della richiesta vengono archiviati nella mappa callback_request. Avrai quindi accesso all'intera richiesta HTTP, inclusi intestazione, corpo e una mappa query per qualsiasi parametri di ricerca. Ad esempio:

    YAML

        http_request:
          body:
          headers: {...}
          method: GET
          query: {}
          url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
        received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667
        type: HTTP
        

    JSON

        {
           "http_request":{
              "body":null,
              "headers":{
                 ...
              },
              "method":"GET",
              "query":{
              },
              "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
           },
           "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667",
           "type":"HTTP"
        }
          

    Se il corpo HTTP è testo o JSON, Workflows tenterà di decodificarlo, altrimenti vengono restituiti byte non elaborati.

Autorizza le richieste all'endpoint di callback

Per inviare una richiesta a un endpoint di callback, i servizi Google Cloud come Cloud Run e Cloud Functions, nonché i servizi di terze parti, devono essere autorizzati a farlo disporre delle autorizzazioni IAM (Identity and Access Management) appropriate, in particolare workflows.callbacks.send (incluso nel ruolo Invoker di flussi di lavoro).

Fai una richiesta diretta

Il modo più semplice per creare credenziali di breve durata per un account di servizio è effettuare una richiesta diretta. In questo flusso sono coinvolte due identità: il chiamante e l'account di servizio per cui viene creata la credenziale. La chiamata al flusso di lavoro di base in questa pagina è un esempio di richiesta diretta. Per maggiori informazioni, consulta Utilizzare IAM per controllare l'accesso e Autorizzazioni per le richieste dirette.

Generare un token di accesso OAuth 2.0

Per autorizzare un'applicazione a chiamare l'endpoint di callback, puoi generare un token di accesso OAuth 2.0 per l'account di servizio associato al flusso di lavoro. Supponendo che tu disponga delle autorizzazioni necessarie (per i ruoli Workflows Editor, Workflows Admin e Service Account Token Creator), puoi anche generare un token autonomamente eseguendo il metodo generateAccessToken.

Se la richiesta generateAccessToken ha esito positivo, il corpo della risposta restituita contiene un token di accesso OAuth 2.0 e una data di scadenza. Per impostazione predefinita, i token di accesso OAuth 2.0 sono validi per un massimo di un'ora. Ad esempio:

  {
  "accessToken": "eyJ0eXAi...NiJ9",
  "expireTime": "2020-04-07T15:01:23.045123456Z"
  }
Il codice accessToken può quindi essere utilizzato in una chiamata curl all'URL dell'endpoint di callback come nei seguenti esempi:
  curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
  curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL

Generare un token OAuth per una Cloud Function

Se stai richiamando un callback da una Cloud Function utilizzando lo stesso account di servizio del flusso di lavoro e nello stesso progetto, puoi generare un token di accesso OAuth nella funzione stessa. Ad esempio:

const {GoogleAuth} = require('google-auth-library');
const auth = new GoogleAuth();
const token = await auth.getAccessToken();
console.log("Token", token);

try {
  const resp = await fetch(url, {
      method: 'POST',
      headers: {
          'accept': 'application/json',
          'content-type': 'application/json',
          'authorization': `Bearer ${token}`
      },
      body: JSON.stringify({ approved })
  });
  console.log("Response = ", JSON.stringify(resp));

  const result = await resp.json();
  console.log("Outcome = ", JSON.stringify(result));

Per maggiori informazioni, consulta il tutorial sulla creazione di un flusso di lavoro human-in-the-loop utilizzando i callback.

Richiedi accesso offline

I token di accesso scadono periodicamente e diventano credenziali non valide per una richiesta API correlata. Puoi aggiornare un token di accesso senza chiedere all'utente l'autorizzazione se hai richiesto l'accesso offline agli ambiti associati al token. La richiesta dell'accesso offline è un requisito per qualsiasi applicazione che deve accedere a un'API di Google quando l'utente non è presente. Per maggiori informazioni, consulta Aggiornare un token di accesso (accesso offline).

Prova un flusso di lavoro di base per le chiamate

Puoi creare un flusso di lavoro di base e quindi testare la chiamata all'endpoint di callback di quel flusso di lavoro utilizzando curl. Devi disporre delle autorizzazioni Workflows Editor o Workflows Admin necessarie per il progetto in cui si trova il flusso di lavoro.

  1. Crea ed esegui il deployment del flusso di lavoro seguente, quindi execute.

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: 3600
            result: callback_request
        - print_callback_request:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
        - return_callback_result:
            return: ${callback_request.http_request}
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": 3600
              },
              "result": "callback_request"
            }
          },
          {
            "print_callback_request": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          },
          {
            "return_callback_result": {
              "return": "${callback_request.http_request}"
            }
          }
        ]
          

    Dopo aver eseguito il flusso di lavoro, lo stato di esecuzione è ACTIVE fino alla ricezione della richiesta di callback o alla scadenza del timeout.

  2. Conferma lo stato di esecuzione e recupera l'URL di callback:

    Console

    1. Nella console Google Cloud, vai alla pagina Flussi di lavoro:

      Vai a Workflows
    2. Fai clic sul nome del flusso di lavoro che hai appena eseguito.

      Viene visualizzato lo stato dell'esecuzione del flusso di lavoro.

    3. Fai clic sulla scheda Log.
    4. Cerca una voce di log simile alla seguente:

      Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
      
    5. Copia l'URL di callback per utilizzarlo nel comando successivo.

    gcloud

    1. Innanzitutto, recupera l'ID esecuzione:
      gcloud logging read "Listening for callbacks" --freshness=DURATION
      
      Sostituisci DURATION con una quantità di tempo appropriata per limitare le voci di log restituite (se hai eseguito il flusso di lavoro più volte).

      Ad esempio, --freshness=t10m restituisce voci di log non antecedenti ai 10 minuti. Per maggiori dettagli, consulta gcloud topic datetimes.

      Viene restituito l'ID esecuzione. Tieni presente che l'URL di callback viene restituito anche nel campo textPayload. Copia entrambi i valori per utilizzarli nei passaggi seguenti.

    2. Esegui questo comando:
      
                    Viene restituito lo stato dell'esecuzione del flusso di lavoro.
                
  3. Ora puoi chiamare l'endpoint di callback utilizzando un comando curl:
    curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL
    

    Tieni presente che per un endpoint POST, devi utilizzare un'intestazione di rappresentazione Content-Type. Ad esempio:

    curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL
    

    Sostituisci CALLBACK_URL con l'URL che hai copiato nel passaggio precedente.

  4. Tramite la console Google Cloud o utilizzando Google Cloud CLI, verifica che lo stato dell'esecuzione del flusso di lavoro sia ora SUCCEEDED.
  5. Cerca la voce di log con il valore textPayload restituito simile alla seguente:
    Received {"body":null,"headers":...
    

Esempi

Questi esempi mostrano la sintassi.

Errori di timeout delle intercettazioni

Questo esempio si aggiunge all'esempio precedente rilevando eventuali errori di timeout e scrivendoli nel log di sistema.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 3600
                result: callback_request
            except:
                as: e
                steps:
                    - log_error:
                        call: sys.log
                        args:
                            severity: "ERROR"
                            text: ${"Received error " + e.message}
                        next: end
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "callback_request"
              },
              "except": {
                "as": "e",
                "steps": [
                  {
                    "log_error": {
                      "call": "sys.log",
                      "args": {
                        "severity": "ERROR",
                        "text": "${\"Received error \" + e.message}"
                      },
                      "next": "end"
                    }
                  }
                ]
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      }
    }
      

Attendi in un loop di nuovi tentativi

Questo esempio modifica il campione precedente implementando un passaggio per riprovare. Utilizzando un predicato personalizzato per i tentativi, il flusso di lavoro registra un avviso quando si verifica un timeout e poi riprova l'attesa sull'endpoint di callback, fino a cinque volte. Se la quota per i nuovi tentativi è esaurita prima che venga ricevuto il callback, l'errore di timeout finale determina l'esito negativo del flusso di lavoro.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 60.0
                result: callback_request
            retry:
                predicate: ${log_timeout}
                max_retries: 5
                backoff:
                    initial_delay: 1
                    max_delay: 10
                    multiplier: 2
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    log_timeout:
        params: [e]
        steps:
          - when_to_repeat:
              switch:
                - condition: ${"TimeoutError" in e.tags}
                  steps:
                      - log_error_and_retry:
                          call: sys.log
                          args:
                              severity: "WARNING"
                              text: "Timed out waiting for callback, retrying"
                      - exit_predicate:
                          return: true
          - otherwise:
              return: false
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 60
                },
                "result": "callback_request"
              },
              "retry": {
                "predicate": "${log_timeout}",
                "max_retries": 5,
                "backoff": {
                  "initial_delay": 1,
                  "max_delay": 10,
                  "multiplier": 2
                }
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      },
      "log_timeout": {
        "params": [
          "e"
        ],
        "steps": [
          {
            "when_to_repeat": {
              "switch": [
                {
                  "condition": "${\"TimeoutError\" in e.tags}",
                  "steps": [
                    {
                      "log_error_and_retry": {
                        "call": "sys.log",
                        "args": {
                          "severity": "WARNING",
                          "text": "Timed out waiting for callback, retrying"
                        }
                      }
                    },
                    {
                      "exit_predicate": {
                        "return": true
                      }
                    }
                  ]
                }
              ]
            }
          },
          {
            "otherwise": {
              "return": false
            }
          }
        ]
      }
    }
      

Passaggi successivi