Les rappels permettent aux exécutions de workflow d'attendre qu'un autre service envoie une requête au point de terminaison de rappel. Cette requête reprend l'exécution du workflow.
Grâce aux rappels, vous pouvez signaler à votre workflow qu'un événement spécifié s'est produit et attendre cet événement sans interroger. Par exemple, vous pouvez créer un flux de travail qui vous avertit lorsqu'un produit est de nouveau en stock ou qu'un article a été expédié ; ou qui attend qu'une interaction humaine soit autorisée, comme l'examen d'une ou la validation d'une traduction.
Cette page explique comment créer un workflow compatible avec un point de terminaison de rappel et qui attend que les requêtes HTTP provenant de processus externes arrivent à ce point de terminaison. Vous pouvez également Attendre les événements à l'aide de rappels et de déclencheurs Eventarc
Les rappels nécessitent l'utilisation de deux fonctions intégrées à la bibliothèque standard:
events.create_callback_endpoint
: crée un point de terminaison de rappel qui attend la méthode HTTP spécifiée.events.await_callback
: attend la réception d'un rappel au point de terminaison donné.
Créer un point de terminaison qui reçoit une requête de rappel
Créez un point de terminaison de rappel pouvant recevoir des requêtes HTTP pour y parvenir.
- Suivez les étapes de création. un nouveau flux de travail, ou choisir un flux de travail existant pour mise à jour mais ne la déployez pas encore.
- Dans la définition du workflow, ajoutez une étape permettant de créer un point de terminaison de rappel:
YAML
- create_callback: call: events.create_callback_endpoint args: http_callback_method: "METHOD" result: callback_details
JSON
[ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "METHOD" }, "result": "callback_details" } } ]
Remplacez
METHOD
par la méthode HTTP attendue, parmi les suivantes :GET
,HEAD
,POST
,PUT
,DELETE
,OPTIONS
ouPATCH
. La valeur par défaut estPOST
.Le résultat est une carte,
callback_details
, avec un champurl
qui stocke l'URL du point de terminaison créé.Le point de terminaison de rappel est maintenant prêt à recevoir les requêtes entrantes avec la méthode HTTP spécifiée. L'URL du point de terminaison créé peut être utilisée pour déclencher le rappel à partir d'un processus externe au workflow ; par exemple, en transmettant l'URL à une fonction Cloud.
- Dans la définition du workflow, ajoutez une étape pour attendre une requête de rappel:
YAML
- await_callback: call: events.await_callback args: callback: ${callback_details} timeout: TIMEOUT result: callback_request
JSON
[ { "await_callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": TIMEOUT }, "result": "callback_request" } } ]
Remplacez
TIMEOUT
par le nombre maximal de secondes pendant lesquelles le workflow doit attendre une requête. La valeur par défaut est de 43 200 (12 heures). Si le temps s'écoule avant la réception d'une requête, une exceptionTimeoutError
est générée.Notez qu'il existe une durée d'exécution maximale. Pour en savoir plus, consultez la limite du nombre de requêtes.
La carte
callback_details
de l'étapecreate_callback
précédente est transmise en tant qu'argument. - Déployez votre workflow pour terminer sa création ou sa mise à jour.
Lorsqu'une requête est reçue, tous les détails de la requête sont stockés dans la carte
callback_request
. Vous avez ensuite accès à l'intégralité de la requête HTTP, y compris son en-tête, son corps et unequery
de mappage pour tous les paramètres de requête. Exemple :YAML
http_request: body: headers: {...} method: GET query: {} url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2" received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667 type: HTTP
JSON
{ "http_request":{ "body":null, "headers":{ ... }, "method":"GET", "query":{ }, "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2" }, "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667", "type":"HTTP" }
Si le corps HTTP est en texte ou en JSON, les workflows tentent de décoder le corps. Sinon, les octets bruts sont renvoyés.
Autoriser les requêtes sur le point de terminaison de rappel
Pour envoyer une requête à un point de terminaison de rappel, les services Google Cloud tels que
Cloud Run et Cloud Functions, ainsi que
Google Cloud, doit être autorisé à le faire par le biais de
Autorisations IAM (Identity and Access Management) en particulier workflows.callbacks.send
(inclus dans le rôle Demandeur de workflows).
Envoyer une requête directe
Le moyen le plus simple de créer des identifiants éphémères pour un compte de service consiste à effectuer une requête directe. Deux identités sont impliquées dans ce flux : l'auteur de l'appel et le compte de service pour lequel l'identifiant est créé. L'appel du workflow de base sur cette page est un exemple de requête directe. Pour en savoir plus, consultez Utiliser IAM pour contrôler les accès et Autorisations de requête directe.
Générer un jeton d'accès OAuth 2.0
Pour autoriser une application à appeler le point de terminaison de rappel, vous pouvez générer un jeton d'accès OAuth 2.0 pour le compte de service associé au workflow.
Si vous disposez des autorisations requises (pour les rôles Workflows Editor
, Workflows Admin
et Service Account Token Creator
), vous pouvez également générer un jeton vous-même en exécutant la commande. Méthode generateAccessToken
.
Si la requête generateAccessToken
aboutit, le corps de la réponse renvoyée contient un jeton d'accès OAuth 2.0 et une heure d'expiration. (Par défaut, les jetons d'accès OAuth 2.0 sont valides pendant une durée maximale d'une heure.) Exemple :
{ "accessToken": "eyJ0eXAi...NiJ9", "expireTime": "2020-04-07T15:01:23.045123456Z" }
accessToken
peut ensuite être utilisé dans un appel curl au point de terminaison de rappel
URL comme dans les exemples suivants:
curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL
Générer un jeton OAuth pour une fonction Cloud
Si vous appelez un rappel depuis une fonction Cloud Functions en utilisant le même compte de service que le workflow et dans le même projet, vous pouvez générer un jeton d'accès OAuth dans la fonction elle-même. Exemple :
Pour davantage de contexte, reportez-vous au tutoriel Créer un workflow avec intervention humaine à l'aide de rappels.
Demander un accès hors connexion
Les jetons d'accès expirent régulièrement et deviennent des identifiants non valides pour une requête API associée. Vous pouvez actualiser un jeton d'accès sans demander l'autorisation à l'utilisateur si vous avez demandé un accès hors connexion aux champs d'application associés au jeton. Une requête d'accès hors connexion est requise pour toute application devant accéder à une API Google en l'absence d'utilisateur. Pour en savoir plus, consultez la section Actualiser un jeton d'accès (accès hors connexion).
Appeler un workflow une seule fois à l'aide de rappels
Les rappels sont entièrement idempotents, ce qui signifie que vous pouvez réessayer un rappel en cas d'échec sans produire de résultats ou d'effets secondaires inattendus.
Une fois que vous avez créé un point de terminaison de rappel, l'URL est prête à recevoir des
et est généralement renvoyée à un appelant avant l'appel correspondant
await_callback
est créé. Toutefois, si l'URL de rappel n'a pas encore été
reçu lors de l'exécution de l'étape await_callback
, l'exécution du workflow est
bloqués jusqu'à la réception du point de terminaison (ou la fin du délai d'inactivité). Une fois le fichier reçu,
l'exécution du workflow reprend et le rappel est traité.
Après avoir exécuté l'étape create_callback_endpoint
et créé un rappel
un seul emplacement de rappel est disponible pour le workflow. Lorsqu'un rappel
de rappel, ce créneau est rempli avec la charge utile de rappel jusqu'à ce que le
peut être traité. Lorsque l'étape await_callback
est exécutée, le
est traité, et l'emplacement est vidé et mis à la disposition d'un autre
. Vous pouvez ensuite réutiliser le point de terminaison de rappel et appeler await_callback
à nouveau.
Si await_callback
n'est appelé qu'une seule fois, mais qu'un deuxième rappel est reçu, un
des scénarios suivants se produisent et un code d'état HTTP approprié est
renvoyé:
HTTP
429: Too Many Requests
indique que le premier rappel a été reçu avec succès, mais n'a pas été traité ; il reste en attente de traitement. La le deuxième rappel est rejeté par le workflow.HTTP
200: Success
indique que le premier rappel a été reçu avec succès et une réponse a été renvoyée. Le deuxième rappel est stocké et risque de ne jamais être traité, sauf siawait_callback
est appelé une deuxième fois. Si le se termine avant que cela ne se produise, la deuxième demande de rappel n'est jamais traitées et supprimées.Le code HTTP
404: Page Not Found
indique que le workflow n'est plus en cours d'exécution. Soit le premier rappel a été traité et le workflow est terminé, soit le workflow a échoué. Pour le déterminer, vous devez interroger le workflow de l'état d'exécution.
Rappels parallèles
Lorsque les étapes sont exécutées en parallèle et qu'un rappel est créé par un parent et attendu dans les étapes enfants, le même schéma que celui décrit précédemment est suivie.
Dans l'exemple suivant, lorsque l'étape create_callback_endpoint
est exécutée,
un emplacement de rappel est créé. Chaque appel suivant à await_callback
ouvre un
un nouvel emplacement de rappel. Dix rappels peuvent être effectués simultanément, si tous les threads sont
en cours d'exécution et en attente avant
qu'une demande de rappel ne soit effectuée. Rappels supplémentaires
pourraient être effectuées, mais elles seraient
stockées et jamais traitées.
YAML
- createCallbackInParent: call: events.create_callback_endpoint args: http_callback_method: "POST" result: callback_details - parallelStep: parallel: for: range: [1, 10] value: loopValue steps: - waitForCallbackInChild: call: events.await_callback args: callback: ${callback_details}
JSON
[ { "createCallbackInParent": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "POST" }, "result": "callback_details" } }, { "parallelStep": { "parallel": { "for": { "range": [ 1, 10 ], "value": "loopValue", "steps": [ { "waitForCallbackInChild": { "call": "events.await_callback", "args": { "callback": "${callback_details}" } } } ] } } } } ]
Notez que les rappels sont traités dans le même ordre que celui dans lequel chaque appel est effectué
branche vers await_callback
. Cependant, l'ordre d'exécution des branches est
non déterministe et peut parvenir à un
résultat en utilisant différentes voies. Pour plus
pour en savoir plus, consultez
Étapes parallèles
Tester un workflow de rappel de base
Vous pouvez créer un workflow de base, puis tester l'appel du point de terminaison de rappel de ce workflow à l'aide de curl. Vous devez disposer des autorisations Workflows Editor
ou Workflows Admin
nécessaires pour le projet dans lequel réside le workflow.
- Créez et déployez le workflow suivant, puis exécutez-le.
YAML
- create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: callback_request - print_callback_request: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)} - return_callback_result: return: ${callback_request.http_request}
JSON
[ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "callback_request" } }, { "print_callback_request": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}" } } }, { "return_callback_result": { "return": "${callback_request.http_request}" } } ]
Après l'exécution du workflow, l'état de son exécution est le suivant :
ACTIVE
jusqu'à la réception de la demande de rappel ou jusqu'à la fin du délai d'inactivité s'écoule. - Confirmez l'état d'exécution et récupérez l'URL de rappel:
Console
-
Dans la console Google Cloud, accédez à Page Workflows:
Accéder à "Workflows" -
Cliquez sur le nom du workflow que vous venez d'exécuter.
L'état d'exécution du workflow s'affiche.
- Cliquez sur l'onglet Logs (Journaux).
Recherchez une entrée de journal semblable à ceci :
Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
- Copiez l'URL de rappel à utiliser dans la commande suivante.
gcloud
- Commencez par récupérer l'ID d'exécution:
gcloud logging read "Listening for callbacks" --freshness=DURATION
RemplacezDURATION
par un montant approprié. pour limiter le nombre d'entrées de journal renvoyées (si vous avez exécuté la le workflow de ML à plusieurs reprises).Par exemple,
--freshness=t10m
renvoie les entrées de journal qui ne datent pas de plus de 10 minutes. Pour en savoir plus, consultezgcloud topic datetimes
.L'ID d'exécution est renvoyé. Notez que l'URL de rappel est également renvoyée dans le champ
textPayload
. Copiez les deux valeurs à utiliser dans les étapes suivantes. - Exécutez la commande suivante:
gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
L'état de l'exécution du workflow est renvoyé.
-
- Vous pouvez maintenant appeler le point de terminaison du rappel à l'aide d'une commande curl:
curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL
Notez que pour un point de terminaison
POST
, vous devez utiliser un En-tête de représentationContent-Type
. Exemple :curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL
Remplacez
CALLBACK_URL
par l'URL que vous avez copiée à l'étape précédente. - Via la console Google Cloud ou la Google Cloud CLI,
Vérifiez que l'état d'exécution du workflow est maintenant
SUCCEEDED
. - Recherchez l'entrée de journal avec le
textPayload
renvoyé. qui se présente comme suit:Received {"body":null,"headers":...
Exemples
Ces exemples illustrent la syntaxe.
Repérer les erreurs de délai d'attente
Cet exemple s'ajoute à l'exemple précédent en capturant les erreurs d'expiration de délai et en les écrivant dans le journal système.
YAML
main: steps: - create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: try: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: callback_request except: as: e steps: - log_error: call: sys.log args: severity: "ERROR" text: ${"Received error " + e.message} next: end - print_callback_result: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)}
JSON
{ "main": { "steps": [ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "try": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "callback_request" }, "except": { "as": "e", "steps": [ { "log_error": { "call": "sys.log", "args": { "severity": "ERROR", "text": "${\"Received error \" + e.message}" }, "next": "end" } } ] } } }, { "print_callback_result": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}" } } } ] } }
Attendre un rappel dans une boucle de nouvelle tentative
Cet exemple modifie l'exemple précédent en mettant en œuvre une étape de nouvelle tentative. À l'aide d'un prédicat de nouvelle tentative personnalisé, le workflow enregistre un avertissement lorsqu'un délai d'expiration se produit, puis effectue une nouvelle tentative d'attente sur le point de terminaison de rappel, jusqu'à cinq fois. Si le quota de nouvelles tentatives est épuisé avant la réception du rappel, l'erreur de délai avant expiration finale entraîne l'échec du workflow.
YAML
main: steps: - create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: try: call: events.await_callback args: callback: ${callback_details} timeout: 60.0 result: callback_request retry: predicate: ${log_timeout} max_retries: 5 backoff: initial_delay: 1 max_delay: 10 multiplier: 2 - print_callback_result: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)} log_timeout: params: [e] steps: - when_to_repeat: switch: - condition: ${"TimeoutError" in e.tags} steps: - log_error_and_retry: call: sys.log args: severity: "WARNING" text: "Timed out waiting for callback, retrying" - exit_predicate: return: true - otherwise: return: false
JSON
{ "main": { "steps": [ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "try": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 60 }, "result": "callback_request" }, "retry": { "predicate": "${log_timeout}", "max_retries": 5, "backoff": { "initial_delay": 1, "max_delay": 10, "multiplier": 2 } } } }, { "print_callback_result": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}" } } } ] }, "log_timeout": { "params": [ "e" ], "steps": [ { "when_to_repeat": { "switch": [ { "condition": "${\"TimeoutError\" in e.tags}", "steps": [ { "log_error_and_retry": { "call": "sys.log", "args": { "severity": "WARNING", "text": "Timed out waiting for callback, retrying" } } }, { "exit_predicate": { "return": true } } ] } ] } }, { "otherwise": { "return": false } } ] } }