Attendi con i callback

I callback consentono alle esecuzioni del flusso di lavoro di attendere che un altro servizio effettui un una richiesta all'endpoint di callback; che la richiesta riprende l'esecuzione nel tuo flusso di lavoro.

Con i callback, puoi segnalare al tuo flusso di lavoro che un evento specifico è si è verificato e attendere l'evento senza eseguire il polling. Ad esempio, puoi creare un flusso di lavoro che ti avvisa quando un prodotto è di nuovo disponibile o quando l'articolo è stato spedito; o che attende per consentire l'interazione umana, come la revisione di un ordinare o convalidare una traduzione.

Questa pagina mostra come creare un flusso di lavoro che supporti un endpoint di callback, e attende l'arrivo di richieste HTTP da processi esterni endpoint. Puoi anche attendere eventi utilizzando callback e trigger Eventarc.

I callback richiedono l'utilizzo di due funzioni integrate della libreria standard:

Crea un endpoint che riceve una richiesta di callback

Crea un endpoint di callback che possa ricevere richieste HTTP di arrivo endpoint.

  1. Segui i passaggi per creare un nuovo flusso di lavoro o sceglierne uno esistente aggiorna ma non eseguirne ancora il deployment.
  2. Nella definizione del flusso di lavoro, aggiungi un passaggio per creare un endpoint di callback:

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "METHOD"
            result: callback_details
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "METHOD"
              },
              "result": "callback_details"
            }
          }
        ]
          

    Sostituisci METHOD con il metodo HTTP previsto, uno di GET, HEAD, POST, PUT, DELETE, OPTIONS o PATCH. La Il valore predefinito è POST.

    Il risultato è una mappa, callback_details, con un Campo url in cui è archiviato l'URL dell'endpoint creato.

    L'endpoint di callback è ora pronto a ricevere le richieste in entrata con metodo HTTP specificato. L'URL dell'endpoint creato può essere utilizzato per attivare il callback da un processo esterno al flusso di lavoro; ad esempio passando l'URL a una Cloud Function.

  3. Nella definizione del flusso di lavoro, aggiungi un passaggio di attesa di una richiesta di callback:

    YAML

        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: TIMEOUT
            result: callback_request
        

    JSON

        [
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": TIMEOUT
              },
              "result": "callback_request"
            }
          }
        ]
          

    Sostituisci TIMEOUT con il numero massimo di secondi in cui il flusso di lavoro deve attendere una richiesta. Il valore predefinito è 43200 (12 ore). Se trascorre un determinato periodo di tempo prima della ricezione di una richiesta, TimeoutError è stato sollevato.

    Tieni presente che esiste una durata massima di esecuzione. Per ulteriori informazioni, vedi il limite per le richieste.

    La mappa callback_details della precedente Viene passato create_callback passaggio come argomento.

  4. Esegui il deployment del flusso di lavoro per completarne la creazione o l'aggiornamento.

    Quando viene ricevuta una richiesta, tutti i dettagli della richiesta vengono archiviati in la mappa di callback_request. Puoi quindi accedere all'intera richiesta HTTP, inclusi intestazione, corpo e una mappa query per parametri di ricerca. Ad esempio:

    YAML

        http_request:
          body:
          headers: {...}
          method: GET
          query: {}
          url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
        received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667
        type: HTTP
        

    JSON

        {
           "http_request":{
              "body":null,
              "headers":{
                 ...
              },
              "method":"GET",
              "query":{
              },
              "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
           },
           "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667",
           "type":"HTTP"
        }
          

    Se il corpo del messaggio HTTP è testo o JSON, Workflows tenterà di decodificare il corpo; altrimenti vengono restituiti byte non elaborati.

Autorizza le richieste all'endpoint di callback

Per inviare una richiesta a un endpoint di callback, i servizi Google Cloud come Cloud Run e Cloud Functions, nonché servizi devono essere autorizzati a farlo disponendo dei autorizzazioni IAM (Identity and Access Management); in particolare workflows.callbacks.send (incluso nel ruolo Invoker di Workflows).

Fai una richiesta diretta

Il modo più semplice per creare credenziali di breve durata per un account di servizio è invia una richiesta diretta. In questo flusso sono coinvolte due identità: il chiamante e l'account di servizio per il quale è stata creata la credenziale. La chiamata al flusso di lavoro di base in questa pagina è un esempio di richiesta diretta. Per ulteriori informazioni, vedi Utilizzare IAM per controllare l'accesso e Autorizzazioni per le richieste dirette.

Generare un token di accesso OAuth 2.0

Per autorizzare un'applicazione a chiamare l'endpoint di callback, puoi generare un Token di accesso OAuth 2.0 per l'account di servizio associato al flusso di lavoro. Se disponi delle autorizzazioni necessarie (per Workflows Editor o Workflows Admin e Service Account Token Creator), puoi anche generare autonomamente un token con il metodo generateAccessToken.

Se la richiesta generateAccessToken ha esito positivo, il il corpo della risposta contiene un token di accesso OAuth 2.0 e una data di scadenza. (di per impostazione predefinita, i token di accesso OAuth 2.0 sono validi per un massimo di un'ora. Per esempio:

  {
  "accessToken": "eyJ0eXAi...NiJ9",
  "expireTime": "2020-04-07T15:01:23.045123456Z"
  }
Il codice accessToken può quindi essere utilizzato in una chiamata curl all'endpoint di callback URL come nei seguenti esempi:
  curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
  curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL

Genera un token OAuth per una Cloud Function

Se stai richiamando un callback da una Cloud Function utilizzando lo stesso di account di servizio come flusso di lavoro e nello stesso progetto, puoi generare Token di accesso OAuth nella funzione stessa. Ad esempio:

const {GoogleAuth} = require('google-auth-library');
const auth = new GoogleAuth();
const token = await auth.getAccessToken();
console.log("Token", token);

try {
  const resp = await fetch(url, {
      method: 'POST',
      headers: {
          'accept': 'application/json',
          'content-type': 'application/json',
          'authorization': `Bearer ${token}`
      },
      body: JSON.stringify({ approved })
  });
  console.log("Response = ", JSON.stringify(resp));

  const result = await resp.json();
  console.log("Outcome = ", JSON.stringify(result));

Per saperne di più, guarda il tutorial Creazione di un flusso di lavoro human-in-the-loop utilizzando i callback.

Richiedi accesso offline

I token di accesso scadono periodicamente e diventano credenziali non valide per un richiesta API. Puoi aggiornare un token di accesso senza chiedere all'utente di se hai richiesto l'accesso offline agli ambiti associati di accesso. La richiesta dell'accesso offline è un requisito per qualsiasi applicazione che deve per accedere a un'API di Google in assenza dell'utente. Per ulteriori informazioni, vedi Aggiornare un token di accesso (accesso offline).

Richiama un flusso di lavoro esattamente una volta utilizzando i callback

I callback sono completamente idempotenti, il che significa che puoi riprova a eseguire un callback se l'operazione non riesce senza produrre risultati o effetti collaterali imprevisti.

Dopo aver creato un endpoint di callback, l'URL è pronto per ricevere in entrata e in genere viene restituito al chiamante prima della chiamata corrispondente await_callback completato. Tuttavia, se l'URL di callback deve essere ancora quando viene eseguito il passaggio await_callback, l'esecuzione del flusso di lavoro viene è bloccato finché l'endpoint non viene ricevuto (o si verifica un timeout). Una volta ricevuto, l'esecuzione del flusso di lavoro riprende e il callback viene elaborato.

Dopo aver eseguito il passaggio create_callback_endpoint e creato un callback è disponibile un singolo slot di callback per il flusso di lavoro. Quando viene richiamato , lo slot viene riempito con il payload di callback fino il callback. Quando viene eseguito il passaggio await_callback, il callback viene elaborato e lo spazio viene svuotato e reso disponibile per un altro di Google. Puoi quindi riutilizzare l'endpoint di callback e chiamare await_callback di nuovo.

Se await_callback viene chiamato soltanto una volta, ma viene richiamato un secondo momento, si verifica uno dei seguenti scenari e viene generato un codice di stato HTTP appropriato restituito:

  • HTTP 429: Too Many Requests indica che è stato ricevuto il primo callback correttamente, ma non sono stati elaborati; rimane in attesa di essere elaborata. La viene rifiutato dal flusso di lavoro.

  • HTTP 200: Success indica che è stato ricevuto il primo callback ed è stata restituita una risposta. Il secondo callback viene memorizzato potrebbe non essere mai elaborato a meno che await_callback non venga richiamato una seconda volta. Se il flusso di lavoro termina prima che ciò accada, la seconda richiesta di callback non viene mai vengono elaborati e vengono eliminati.

  • HTTP 404: Page Not Found indica che il flusso di lavoro non è più in esecuzione. È stato elaborato il primo callback e il flusso di lavoro è stato completato oppure flusso di lavoro non riuscito. Per determinarlo, devi eseguire una query sul flusso di lavoro stato di esecuzione.

Callback paralleli

Quando i passaggi vengono eseguiti in parallelo e viene creato un callback da un publisher principale thread e in attesa nei passaggi figlio, lo stesso pattern descritto in precedenza viene seguito.

Nell'esempio seguente, quando viene eseguito il passaggio create_callback_endpoint, viene creato uno spazio di callback. Ogni chiamata successiva a await_callback apre una nuovo spazio di callback. È possibile effettuare dieci callback contemporaneamente, se tutti i thread in esecuzione e in attesa prima che venga effettuata una richiesta di callback. Callback aggiuntivi ma che verrebbero archiviati e non verranno mai elaborati.

YAML

  - createCallbackInParent:
    call: events.create_callback_endpoint
    args:
      http_callback_method: "POST"
    result: callback_details
  - parallelStep:
    parallel:
        for:
            range: [1, 10]
            value: loopValue
            steps:
              - waitForCallbackInChild:
                  call: events.await_callback
                  args:
                      callback: ${callback_details}

JSON

  [
    {
      "createCallbackInParent": {
        "call": "events.create_callback_endpoint",
        "args": {
          "http_callback_method": "POST"
        },
        "result": "callback_details"
      }
    },
    {
      "parallelStep": {
        "parallel": {
          "for": {
            "range": [
              1,
              10
            ],
            "value": "loopValue",
            "steps": [
              {
                "waitForCallbackInChild": {
                  "call": "events.await_callback",
                  "args": {
                    "callback": "${callback_details}"
                  }
                }
              }
            ]
          }
        }
      }
    }
  ]

Tieni presente che i callback vengono elaborati nello stesso ordine in cui ogni chiamata viene effettuata da un ramo a await_callback. Tuttavia, l'ordine di esecuzione dei rami è non deterministici e possono arrivare a un risultato utilizzando vari percorsi. Per ulteriori informazioni le informazioni, vedi Passaggi paralleli.

Prova un flusso di lavoro di base per il callback

Puoi creare un flusso di lavoro di base e quindi testare la chiamata al relativo utilizzando curl. Devi disporre dei Workflows Editor o Autorizzazioni Workflows Admin per il progetto in cui si trova il flusso di lavoro.

  1. Crea ed esegui il deployment del flusso di lavoro seguente e poi eseguirla.

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: 3600
            result: callback_request
        - print_callback_request:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
        - return_callback_result:
            return: ${callback_request.http_request}
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": 3600
              },
              "result": "callback_request"
            }
          },
          {
            "print_callback_request": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          },
          {
            "return_callback_result": {
              "return": "${callback_request.http_request}"
            }
          }
        ]
          

    Dopo aver eseguito il flusso di lavoro, lo stato dell'esecuzione viene ACTIVE alla ricezione della richiesta di callback o al timeout trascorrano.

  2. Conferma lo stato di esecuzione e recupera l'URL di callback:
    .

    Console

    1. Nella console Google Cloud, vai alla Pagina Flussi di lavoro:

      Vai a Workflows .
    2. Fai clic sul nome del flusso di lavoro appena eseguito.

      Viene visualizzato lo stato dell'esecuzione del flusso di lavoro.

    3. Fai clic sulla scheda Log.
    4. Cerca una voce di log simile alla seguente:

      Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
      
    5. Copia l'URL di callback da utilizzare nel comando successivo.

    gcloud

    1. Innanzitutto, recupera l'ID esecuzione:
      gcloud logging read "Listening for callbacks" --freshness=DURATION
      
      Sostituisci DURATION con una quantità appropriata di tempo per limitare le voci di log restituite (se hai eseguito il flusso di lavoro più volte).

      Ad esempio, --freshness=t10m restituisce voci di log che non siano più vecchie di 10 minuti. Per maggiori dettagli, vedi gcloud topic datetimes.

      Viene restituito l'ID esecuzione. Tieni presente che anche l'URL di callback restituito nel campo textPayload. Copia entrambi i valori da utilizzare nei passaggi successivi.

    2. Esegui questo comando:
      gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
      
      Viene restituito lo stato dell'esecuzione del flusso di lavoro.
  3. Ora puoi chiamare l'endpoint di callback utilizzando un comando curl:
    curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL
    

    Tieni presente che per un endpoint POST, devi utilizzare una Intestazione della rappresentazione Content-Type. Ad esempio:

    curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL
    

    Sostituisci CALLBACK_URL con l'URL che hai copiato nella passaggio precedente.

  4. Tramite la console Google Cloud o Google Cloud CLI, conferma che lo stato dell'esecuzione del flusso di lavoro è ora SUCCEEDED.
  5. Cerca la voce di log con il valore textPayload restituito. è simile al seguente:
    Received {"body":null,"headers":...
    

Esempi

Questi esempi dimostrano la sintassi.

Rileva errori di timeout

Questo esempio si aggiunge a quello precedente rilevando eventuali errori di timeout e scrivendo gli errori nel log di sistema.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 3600
                result: callback_request
            except:
                as: e
                steps:
                    - log_error:
                        call: sys.log
                        args:
                            severity: "ERROR"
                            text: ${"Received error " + e.message}
                        next: end
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "callback_request"
              },
              "except": {
                "as": "e",
                "steps": [
                  {
                    "log_error": {
                      "call": "sys.log",
                      "args": {
                        "severity": "ERROR",
                        "text": "${\"Received error \" + e.message}"
                      },
                      "next": "end"
                    }
                  }
                ]
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      }
    }
      

Attendi un callback in un loop di nuovi tentativi

Questo esempio modifica l'esempio precedente implementando un nuovo passaggio. L'utilizzo di un predicato personalizzato per un nuovo tentativo, il flusso di lavoro registra un avviso quando si verifica un timeout e poi riprova ad attendere fino a cinque volte sull'endpoint di callback. Se la quota di nuovi tentativi è esaurita prima che venga ricevuto il callback, un errore di timeout causa la mancata riuscita del flusso di lavoro.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 60.0
                result: callback_request
            retry:
                predicate: ${log_timeout}
                max_retries: 5
                backoff:
                    initial_delay: 1
                    max_delay: 10
                    multiplier: 2
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    log_timeout:
        params: [e]
        steps:
          - when_to_repeat:
              switch:
                - condition: ${"TimeoutError" in e.tags}
                  steps:
                      - log_error_and_retry:
                          call: sys.log
                          args:
                              severity: "WARNING"
                              text: "Timed out waiting for callback, retrying"
                      - exit_predicate:
                          return: true
          - otherwise:
              return: false
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 60
                },
                "result": "callback_request"
              },
              "retry": {
                "predicate": "${log_timeout}",
                "max_retries": 5,
                "backoff": {
                  "initial_delay": 1,
                  "max_delay": 10,
                  "multiplier": 2
                }
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      },
      "log_timeout": {
        "params": [
          "e"
        ],
        "steps": [
          {
            "when_to_repeat": {
              "switch": [
                {
                  "condition": "${\"TimeoutError\" in e.tags}",
                  "steps": [
                    {
                      "log_error_and_retry": {
                        "call": "sys.log",
                        "args": {
                          "severity": "WARNING",
                          "text": "Timed out waiting for callback, retrying"
                        }
                      }
                    },
                    {
                      "exit_predicate": {
                        "return": true
                      }
                    }
                  ]
                }
              ]
            }
          },
          {
            "otherwise": {
              "return": false
            }
          }
        ]
      }
    }
      

Passaggi successivi