Attendre à l'aide de rappels

Les rappels permettent aux exécutions de workflow d'attendre qu'un autre service envoie une requête au point de terminaison de rappel. Cette requête reprend l'exécution du workflow.

Grâce aux rappels, vous pouvez signaler à votre workflow qu'un événement spécifié s'est produit et attendre cet événement sans interroger. Par exemple, vous pouvez créer un workflow qui vous avertit lorsqu'un produit est de nouveau en stock ou lorsqu'un article a été expédié, ou qui attend une interaction humaine telle que la révision d'une commande ou la validation d'une traduction.

Cette page explique comment créer un workflow compatible avec un point de terminaison de rappel et qui attend que les requêtes HTTP provenant de processus externes arrivent à ce point de terminaison. Vous pouvez également attendre des événements à l'aide de rappels et de déclencheurs Eventarc.

Les rappels nécessitent l'utilisation de deux fonctions intégrées à la bibliothèque standard:

Créer un point de terminaison qui reçoit une requête de rappel

Créez un point de terminaison de rappel pouvant recevoir des requêtes HTTP pour y parvenir.

  1. Suivez les étapes pour créer un workflow, ou sélectionnez un workflow existant à mettre à jour sans le déployer.
  2. Dans la définition du workflow, ajoutez une étape permettant de créer un point de terminaison de rappel:

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "METHOD"
            result: callback_details
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "METHOD"
              },
              "result": "callback_details"
            }
          }
        ]
          

    Remplacez METHOD par la méthode HTTP attendue, parmi les suivantes : GET, HEAD, POST, PUT, DELETE, OPTIONS ou PATCH. La valeur par défaut est POST.

    Le résultat est une carte, callback_details, avec un champ url qui stocke l'URL du point de terminaison créé.

    Le point de terminaison de rappel est maintenant prêt à recevoir les requêtes entrantes avec la méthode HTTP spécifiée. L'URL du point de terminaison créé peut être utilisée pour déclencher le rappel à partir d'un processus externe au workflow ; par exemple, en transmettant l'URL à une fonction Cloud.

  3. Dans la définition du workflow, ajoutez une étape pour attendre une requête de rappel:

    YAML

        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: TIMEOUT
            result: callback_request
        

    JSON

        [
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": TIMEOUT
              },
              "result": "callback_request"
            }
          }
        ]
          

    Remplacez TIMEOUT par le nombre maximal de secondes pendant lesquelles le workflow doit attendre une requête. La valeur par défaut est de 43 200 (12 heures). Si le temps s'écoule avant la réception d'une requête, une exception TimeoutError est générée.

    Notez qu'il existe une durée d'exécution maximale. Pour en savoir plus, consultez la limite du nombre de requêtes.

    La carte callback_details de l'étape create_callback précédente est transmise en tant qu'argument.

  4. Déployez votre workflow pour terminer sa création ou sa mise à jour.

    Lorsqu'une requête est reçue, tous les détails de la requête sont stockés dans la carte callback_request. Vous avez ensuite accès à l'intégralité de la requête HTTP, y compris son en-tête, son corps et une query de mappage pour tous les paramètres de requête. Exemple :

    YAML

        http_request:
          body:
          headers: {...}
          method: GET
          query: {}
          url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
        received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667
        type: HTTP
        

    JSON

        {
           "http_request":{
              "body":null,
              "headers":{
                 ...
              },
              "method":"GET",
              "query":{
              },
              "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
           },
           "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667",
           "type":"HTTP"
        }
          

    Si le corps HTTP est en texte ou en JSON, les workflows tentent de décoder le corps. Sinon, les octets bruts sont renvoyés.

Autoriser les requêtes sur le point de terminaison de rappel

Pour envoyer une requête à un point de terminaison de rappel, les services Google Cloud tels que Cloud Run et Cloud Functions, ainsi que les services tiers, doivent être autorisés à le faire avec les autorisations IAM appropriées, en particulier workflows.callbacks.send (inclus dans le rôle Demandeur de workflows).

Envoyer une requête directe

Le moyen le plus simple de créer des identifiants éphémères pour un compte de service consiste à effectuer une requête directe. Deux identités sont impliquées dans ce flux : l'auteur de l'appel et le compte de service pour lequel l'identifiant est créé. L'appel du workflow de base sur cette page est un exemple de requête directe. Pour en savoir plus, consultez Contrôler les accès à l'aide d'IAM et Autorisations liées aux requêtes directes.

Générer un jeton d'accès OAuth 2.0

Pour autoriser une application à appeler le point de terminaison de rappel, vous pouvez générer un jeton d'accès OAuth 2.0 pour le compte de service associé au workflow. Si vous disposez des autorisations requises (pour les rôles Workflows Editor, Workflows Admin et Service Account Token Creator), vous pouvez également générer un jeton vous-même en exécutant la commande. Méthode generateAccessToken.

Si la requête generateAccessToken aboutit, le corps de la réponse renvoyée contient un jeton d'accès OAuth 2.0 et une heure d'expiration. (Par défaut, les jetons d'accès OAuth 2.0 sont valides pendant une durée maximale d'une heure.) Exemple :

  {
  "accessToken": "eyJ0eXAi...NiJ9",
  "expireTime": "2020-04-07T15:01:23.045123456Z"
  }
Le code accessToken peut ensuite être utilisé dans un appel curl à l'URL du point de terminaison de rappel, comme dans les exemples suivants:
  curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
  curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL

Générer un jeton OAuth pour une fonction Cloud

Si vous appelez un rappel depuis une fonction Cloud Functions en utilisant le même compte de service que le workflow et dans le même projet, vous pouvez générer un jeton d'accès OAuth dans la fonction elle-même. Exemple :

const {GoogleAuth} = require('google-auth-library');
const auth = new GoogleAuth();
const token = await auth.getAccessToken();
console.log("Token", token);

try {
  const resp = await fetch(url, {
      method: 'POST',
      headers: {
          'accept': 'application/json',
          'content-type': 'application/json',
          'authorization': `Bearer ${token}`
      },
      body: JSON.stringify({ approved })
  });
  console.log("Response = ", JSON.stringify(resp));

  const result = await resp.json();
  console.log("Outcome = ", JSON.stringify(result));

Pour davantage de contexte, reportez-vous au tutoriel Créer un workflow avec intervention humaine à l'aide de rappels.

Demander un accès hors connexion

Les jetons d'accès expirent régulièrement et deviennent des identifiants non valides pour une requête API associée. Vous pouvez actualiser un jeton d'accès sans demander l'autorisation à l'utilisateur si vous avez demandé un accès hors connexion aux champs d'application associés au jeton. Une requête d'accès hors connexion est requise pour toute application devant accéder à une API Google en l'absence d'utilisateur. Pour en savoir plus, consultez la section Actualiser un jeton d'accès (accès hors connexion).

Tester un workflow de rappel de base

Vous pouvez créer un workflow de base, puis tester l'appel du point de terminaison de rappel de ce workflow à l'aide de curl. Vous devez disposer des autorisations Workflows Editor ou Workflows Admin nécessaires pour le projet dans lequel réside le workflow.

  1. Créez et déployez le workflow suivant, puis execute.

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: 3600
            result: callback_request
        - print_callback_request:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
        - return_callback_result:
            return: ${callback_request.http_request}
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": 3600
              },
              "result": "callback_request"
            }
          },
          {
            "print_callback_request": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          },
          {
            "return_callback_result": {
              "return": "${callback_request.http_request}"
            }
          }
        ]
          

    Une fois le workflow exécuté, son état d'exécution est ACTIVE jusqu'à ce que la demande de rappel soit reçue ou que le délai d'inactivité soit écoulé.

  2. Confirmez l'état d'exécution et récupérez l'URL de rappel:

    Console

    1. Dans la console Google Cloud, accédez à la page Workflows:

      Accéder à Workflows
    2. Cliquez sur le nom du workflow que vous venez d'exécuter.

      L'état d'exécution du workflow s'affiche.

    3. Cliquez sur l'onglet Journaux.
    4. Recherchez une entrée de journal semblable à ceci :

      Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
      
    5. Copiez l'URL de rappel à utiliser dans la commande suivante.

    gcloud

    1. Commencez par récupérer l'ID d'exécution:
      gcloud logging read "Listening for callbacks" --freshness=DURATION
      
      Remplacez DURATION par la durée appropriée pour limiter le nombre d'entrées de journal renvoyées (si vous avez exécuté le workflow plusieurs fois).

      Par exemple, --freshness=t10m renvoie les entrées de journal qui ne datent pas de plus de 10 minutes. Pour en savoir plus, consultez gcloud topic datetimes.

      L'ID d'exécution est renvoyé. Notez que l'URL de rappel est également renvoyée dans le champ textPayload. Copiez les deux valeurs à utiliser dans les étapes suivantes.

    2. Exécutez la commande ci-dessous.
      
                    L'état d'exécution du workflow est renvoyé.
                
  3. Vous pouvez maintenant appeler le point de terminaison de rappel à l'aide d'une commande curl:
    curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL
    

    Notez que pour un point de terminaison POST, vous devez utiliser un en-tête de représentation Content-Type. Exemple :

    curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL
    

    Remplacez CALLBACK_URL par l'URL que vous avez copiée à l'étape précédente.

  4. Dans la console Google Cloud ou à l'aide de la Google Cloud CLI, vérifiez que l'état d'exécution du workflow est maintenant SUCCEEDED.
  5. Recherchez l'entrée de journal avec le textPayload renvoyé qui ressemble à ceci:
    Received {"body":null,"headers":...
    

Exemples

Ces exemples illustrent la syntaxe.

Repérer les erreurs de délai d'attente

Cet exemple s'ajoute à l'exemple précédent en capturant les erreurs d'expiration de délai et en les écrivant dans le journal système.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 3600
                result: callback_request
            except:
                as: e
                steps:
                    - log_error:
                        call: sys.log
                        args:
                            severity: "ERROR"
                            text: ${"Received error " + e.message}
                        next: end
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "callback_request"
              },
              "except": {
                "as": "e",
                "steps": [
                  {
                    "log_error": {
                      "call": "sys.log",
                      "args": {
                        "severity": "ERROR",
                        "text": "${\"Received error \" + e.message}"
                      },
                      "next": "end"
                    }
                  }
                ]
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      }
    }
      

Attendre un rappel dans une boucle de nouvelle tentative

Cet exemple modifie l'exemple précédent en mettant en œuvre une étape de nouvelle tentative. À l'aide d'un prédicat de nouvelle tentative personnalisé, le workflow enregistre un avertissement lorsqu'un délai d'expiration se produit, puis effectue une nouvelle tentative d'attente sur le point de terminaison de rappel, jusqu'à cinq fois. Si le quota de nouvelles tentatives est épuisé avant la réception du rappel, l'erreur de délai avant expiration finale entraîne l'échec du workflow.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 60.0
                result: callback_request
            retry:
                predicate: ${log_timeout}
                max_retries: 5
                backoff:
                    initial_delay: 1
                    max_delay: 10
                    multiplier: 2
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    log_timeout:
        params: [e]
        steps:
          - when_to_repeat:
              switch:
                - condition: ${"TimeoutError" in e.tags}
                  steps:
                      - log_error_and_retry:
                          call: sys.log
                          args:
                              severity: "WARNING"
                              text: "Timed out waiting for callback, retrying"
                      - exit_predicate:
                          return: true
          - otherwise:
              return: false
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 60
                },
                "result": "callback_request"
              },
              "retry": {
                "predicate": "${log_timeout}",
                "max_retries": 5,
                "backoff": {
                  "initial_delay": 1,
                  "max_delay": 10,
                  "multiplier": 2
                }
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      },
      "log_timeout": {
        "params": [
          "e"
        ],
        "steps": [
          {
            "when_to_repeat": {
              "switch": [
                {
                  "condition": "${\"TimeoutError\" in e.tags}",
                  "steps": [
                    {
                      "log_error_and_retry": {
                        "call": "sys.log",
                        "args": {
                          "severity": "WARNING",
                          "text": "Timed out waiting for callback, retrying"
                        }
                      }
                    },
                    {
                      "exit_predicate": {
                        "return": true
                      }
                    }
                  ]
                }
              ]
            }
          },
          {
            "otherwise": {
              "return": false
            }
          }
        ]
      }
    }
      

Étapes suivantes