Las devoluciones de llamada permiten que las ejecuciones de flujos de trabajo esperen a que otro servicio realice una solicitud al extremo de devolución de llamada. Esa solicitud reanuda la ejecución del flujo de trabajo.
Con las devoluciones de llamada, puedes indicarle a tu flujo de trabajo que se produjo un evento especificado y esperar a ese evento sin sondear. Por ejemplo, puedes crear un flujo de trabajo que te notifique cuando un producto vuelva a estar en stock o cuando se haya enviado un artículo, o que espere para permitir la interacción humana, como revisar un pedido o validar una traducción.
En esta página, se muestra cómo crear un flujo de trabajo que admita un extremo de devolución de llamada y que espere a que lleguen solicitudes HTTP de procesos externos a ese extremo. También puedes Esperar eventos mediante devoluciones de llamada y activadores de Eventarc
Las devoluciones de llamada requieren el uso de dos funciones estándar integradas de la biblioteca:
events.create_callback_endpoint
: Crea un extremo de devolución de llamada que espera el método HTTP especificadoevents.await_callback
: Esperas para que se reciba una devolución de llamada en el extremo determinado
Crea un extremo que reciba una solicitud de devolución de llamada
Crear un extremo de devolución de llamada que pueda recibir solicitudes HTTP para llegar a ese extremo.
- Sigue los pasos para crear el proyecto. un flujo de trabajo nuevo o elegir uno existente para actualizar pero aún no lo implementas.
- En la definición del flujo de trabajo, agrega un paso para crear un extremo de devolución de llamada:
YAML
- create_callback: call: events.create_callback_endpoint args: http_callback_method: "METHOD" result: callback_details
JSON
[ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "METHOD" }, "result": "callback_details" } } ]
Reemplaza
METHOD
por el método HTTP esperado, uno deGET
,HEAD
,POST
,PUT
,DELETE
,OPTIONS
oPATCH
. El valor predeterminado esPOST
.El resultado es un mapa,
callback_details
, con un campourl
que almacena la URL del extremo creado.El extremo de devolución de llamada ahora está listo para recibir solicitudes entrantes con el método HTTP especificado. Se puede usar la URL del extremo creado para activar la devolución de llamada desde un proceso externo al flujo de trabajo; por ejemplo, al pasar la URL a una función de Cloud Run.
- En la definición del flujo de trabajo, agrega el siguiente paso para esperar una solicitud de devolución de llamada:
YAML
- await_callback: call: events.await_callback args: callback: ${callback_details} timeout: TIMEOUT result: callback_request
JSON
[ { "await_callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": TIMEOUT }, "result": "callback_request" } } ]
Reemplaza
TIMEOUT
por la cantidad máxima de segundos que el flujo de trabajo debe esperar por una solicitud. El valor predeterminado es 43200 (12 horas). Si transcurre el tiempo antes de que se reciba una solicitud, se genera unTimeoutError
.Ten en cuenta que hay una duración máxima de ejecución. Para obtener más información, consulta el límite de solicitudes.
El mapa
callback_details
del pasocreate_callback
anterior se pasa como argumento. - Implementa tu flujo de trabajo para terminar de crearlo o actualizarlo.
Cuando se recibe una solicitud, todos los detalles de la solicitud se almacenan en el mapa de
callback_request
. Luego, tienes acceso a toda HTTP(S), incluidos su encabezado, cuerpo y un mapa dequery
para cualquier parámetro de consulta. Por ejemplo:YAML
http_request: body: headers: {...} method: GET query: {} url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2" received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667 type: HTTP
JSON
{ "http_request":{ "body":null, "headers":{ ... }, "method":"GET", "query":{ }, "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2" }, "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667", "type":"HTTP" }
Si el cuerpo HTTP es texto o JSON, Workflows intentará decodificarlo. De lo contrario, se mostrarán bytes sin procesar.
Autoriza solicitudes al extremo de devolución de llamada
Para enviar una solicitud a un extremo de devolución de llamada, los servicios de Google Cloud como
Cloud Run y Cloud Run, y apps de terceros
servicios, debe estar autorizada para ello y contar con la información
permisos de Identity and Access Management (IAM) específicamente, workflows.callbacks.send
(se incluye en el rol de invocador de flujos de trabajo).
Cómo realizar una solicitud directa
La forma más sencilla de crear credenciales de corta duración para una cuenta de servicio es realizar una solicitud directa. Hay dos identidades involucradas en este flujo: el emisor, y la cuenta de servicio para la que se creó la credencial. La llamada a la el flujo de trabajo básico de esta página es un ejemplo de una solicitud directa. Para obtener más información, consulta Usa IAM para controlar el acceso y Permisos de solicitud directa.
Genera un token de acceso de OAuth 2.0
Para autorizar que una aplicación llame al extremo de devolución de llamada, puedes generar un
token de acceso de OAuth 2.0 para la cuenta de servicio asociada con el flujo de trabajo.
Si tienes los permisos necesarios (para Workflows Editor
o
Workflows Admin
y Service Account Token Creator
), también puedes
generar un token tú mismo
ejecutar el método generateAccessToken
Si la solicitud generateAccessToken
tiene éxito, el cuerpo de la respuesta que se muestra contendrá un token de acceso de OAuth 2.0 y un tiempo de caducidad. (De forma predeterminada, los tokens de acceso de OAuth 2.0 son válidos durante un máximo de 1 hora). Por ejemplo:
{ "accessToken": "eyJ0eXAi...NiJ9", "expireTime": "2020-04-07T15:01:23.045123456Z" }
accessToken
se puede usar en una llamada curl al extremo de devolución de llamada
como en los siguientes ejemplos:
curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL
Genera un token de OAuth para una función de Cloud Run
Si invocas una devolución de llamada desde una función de Cloud Run con el mismo cuenta de servicio como el flujo de trabajo y, en el mismo proyecto, puedes generar una Token de acceso de OAuth en la función. Por ejemplo:
Para obtener más información, consulta el instructivo sobre cómo crear un flujo de trabajo con un ser humano en el ciclo mediante devoluciones de llamada.
Cómo solicitar acceso sin conexión
Los tokens de acceso vencen periódicamente y se convierten en credenciales no válidas para una solicitud de API relacionada. Puedes actualizar un token de acceso sin pedirle al usuario permiso si solicitaste acceso sin conexión a los alcances asociados con el token. Solicitar acceso sin conexión es un requisito para cualquier aplicación que necesite para acceder a una API de Google cuando el usuario no está presente. Para obtener más información, consulta Actualiza un token de acceso (acceso sin conexión).
Invoca un flujo de trabajo exactamente una vez con devoluciones de llamada
Las devoluciones de llamada son completamente idempotentes, lo que significa que puedes reintenta una devolución de llamada si falla. sin producir resultados inesperados ni efectos secundarios inesperados.
Después de crear un extremo de devolución de llamada, la URL está lista para recibir solicitudes entrantes y, por lo general, se muestra a un emisor antes de que se realice la llamada correspondiente a await_callback
. Sin embargo, si todavía no se
cuando se ejecuta el paso await_callback
, la ejecución del flujo de trabajo se
hasta que se reciba el extremo (o se agote el tiempo de espera). Una vez recibidas, las
se reanuda la ejecución del flujo de trabajo y se procesa la devolución de llamada.
Después de ejecutar el paso create_callback_endpoint
y crear una devolución de llamada
hay una única ranura de devolución de llamada disponible para el flujo de trabajo. Cuando se recibe una solicitud de devolución de llamada, este espacio se completa con la carga útil de devolución de llamada hasta que se puede procesar. Cuando se ejecuta el paso await_callback
, se procesa la devolución de llamada, se vacía el espacio y se pone a disposición de otra devolución de llamada. Luego, puedes volver a usar el extremo de devolución de llamada y llamar a await_callback
nuevamente.
Si se llama a await_callback
solo una vez, pero se recibe una segunda devolución de llamada, se produce una de las siguientes situaciones y se muestra un código de estado HTTP adecuado:
HTTP
429: Too Many Requests
indica que se recibió correctamente la primera devolución de llamada, pero que aún no se procesó; sigue esperando que se procese. El el flujo de trabajo rechaza la segunda devolución de llamada.HTTP
200: Success
indica que se recibió correctamente la primera devolución de llamada y se mostró una respuesta. La segunda devolución de llamada se almacena Es posible que nunca se procese, a menos que se llame aawait_callback
por segunda vez. Si el botón finaliza antes de que eso suceda, la segunda solicitud de devolución de llamada nunca se se procesan y se descartan.HTTP
404: Page Not Found
indica que el flujo de trabajo ya no se está ejecutando. Se procesó la primera devolución de llamada y se completó el flujo de trabajo, o el flujo de trabajo falló. Para determinar esto, deberás consultar el estado de ejecución del flujo de trabajo.
Devoluciones de llamada paralelas
Cuando los pasos se ejecutan en paralelo y un subproceso superior crea una devolución de llamada que se espera en los pasos secundarios, se sigue el mismo patrón que se describió anteriormente.
En el siguiente ejemplo, cuando se ejecuta el paso create_callback_endpoint
, se crea un espacio de devolución de llamada. Cada llamada posterior a await_callback
abre un
nuevo espacio de devolución de llamada. Se pueden realizar diez devoluciones de llamada simultáneamente si todos los subprocesos se
en ejecución y esperando antes de realizar una solicitud de devolución de llamada. Devoluciones de llamada adicionales
se crearían, pero se almacenarán y nunca se procesarían.
YAML
- createCallbackInParent: call: events.create_callback_endpoint args: http_callback_method: "POST" result: callback_details - parallelStep: parallel: for: range: [1, 10] value: loopValue steps: - waitForCallbackInChild: call: events.await_callback args: callback: ${callback_details}
JSON
[ { "createCallbackInParent": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "POST" }, "result": "callback_details" } }, { "parallelStep": { "parallel": { "for": { "range": [ 1, 10 ], "value": "loopValue", "steps": [ { "waitForCallbackInChild": { "call": "events.await_callback", "args": { "callback": "${callback_details}" } } } ] } } } } ]
Ten en cuenta que las devoluciones de llamada se procesan en el mismo orden en que cada llamada la realiza un
rama a await_callback
. Sin embargo, el orden de ejecución de las ramas no es determinista y puede llegar a un resultado mediante varias rutas. Para obtener más información, consulta Pasos en paralelo.
Prueba un flujo de trabajo básico de devolución de llamada
Puedes crear un flujo de trabajo básico y, luego, probar la llamada al flujo de trabajo
de devolución de llamada con curl. Debes tener los Workflows Editor
o
Los permisos Workflows Admin
para el proyecto en el que reside el flujo de trabajo.
-
Crea e implementa el siguiente flujo de trabajo y, luego,
ejecutarlo.
YAML
- create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: callback_request - print_callback_request: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)} - return_callback_result: return: ${callback_request.http_request}
JSON
[ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "callback_request" } }, { "print_callback_request": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}" } } }, { "return_callback_result": { "return": "${callback_request.http_request}" } } ]
Después de ejecutar el flujo de trabajo, el estado de la ejecución del flujo de trabajo es
ACTIVE
hasta que se recibe la solicitud de devolución de llamada o transcurre el tiempo de espera. - Confirma el estado de ejecución y recupera la URL de devolución de llamada:
Console
-
En la consola de Google Cloud, ve a la página Flujos de trabajo:
Ir a Workflows -
Haz clic en el nombre del flujo de trabajo que acabas de ejecutar.
Se muestra el estado de la ejecución del flujo de trabajo.
- Haz clic en la pestaña Registros.
Busca una entrada de registro similar a la que se muestra a continuación:
Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
- Copia la URL de devolución de llamada para usarla en el siguiente comando.
gcloud
- Primero, recupera el ID de ejecución:
Reemplazagcloud logging read "Listening for callbacks" --freshness=DURATION
DURATION
por una cantidad adecuada de tiempo para limitar las entradas de registro que se muestran (si ejecutaste el flujo de trabajo varias veces).Por ejemplo,
--freshness=t10m
muestra entradas de registro. que no tengan más de 10 minutos de antigüedad. Para obtener más información, consultagcloud topic datetimes
.Se muestra el ID de ejecución. Ten en cuenta que la URL de devolución de llamada también se muestra en el campo
textPayload
. Copiar ambos valores para su uso en los siguientes pasos. - Ejecuta el siguiente comando:
Se muestra el estado de la ejecución del flujo de trabajo.gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
-
- Ahora puedes llamar al extremo de devolución de llamada con un comando curl:
curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL
Ten en cuenta que, para un extremo
POST
, debes usar un encabezado de representaciónContent-Type
. Por ejemplo:curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL
Reemplaza
CALLBACK_URL
por la URL que copiaste en el paso anterior. - A través de la consola de Google Cloud o Google Cloud CLI, confirma que el estado de la ejecución del flujo de trabajo ahora sea
SUCCEEDED
. - Busca la entrada de registro con el
textPayload
devuelto. que se ve de la siguiente manera:Received {"body":null,"headers":...
Muestras
En estos ejemplos, se muestra la sintaxis.
Cómo detectar errores de tiempo de espera
Este ejemplo se suma al anterior, ya que detecta cualquier error de tiempo de espera y los escribe en el registro del sistema.
YAML
main: steps: - create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: try: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: callback_request except: as: e steps: - log_error: call: sys.log args: severity: "ERROR" text: ${"Received error " + e.message} next: end - print_callback_result: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)}
JSON
{ "main": { "steps": [ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "try": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "callback_request" }, "except": { "as": "e", "steps": [ { "log_error": { "call": "sys.log", "args": { "severity": "ERROR", "text": "${\"Received error \" + e.message}" }, "next": "end" } } ] } } }, { "print_callback_result": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}" } } } ] } }
Espera una devolución de llamada en un bucle de reintentos
Esta muestra modifica la muestra anterior implementando un paso de reintento. Con un predicado de reintento personalizado, el flujo de trabajo registra una advertencia y vuelve a intentar la espera en el extremo de devolución de llamada hasta cinco veces. Si la cuota de reintentos se agota antes de que se reciba la devolución de llamada, la respuesta tiempo de espera hace que el flujo de trabajo falle.
YAML
main: steps: - create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: try: call: events.await_callback args: callback: ${callback_details} timeout: 60.0 result: callback_request retry: predicate: ${log_timeout} max_retries: 5 backoff: initial_delay: 1 max_delay: 10 multiplier: 2 - print_callback_result: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)} log_timeout: params: [e] steps: - when_to_repeat: switch: - condition: ${"TimeoutError" in e.tags} steps: - log_error_and_retry: call: sys.log args: severity: "WARNING" text: "Timed out waiting for callback, retrying" - exit_predicate: return: true - otherwise: return: false
JSON
{ "main": { "steps": [ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "try": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 60 }, "result": "callback_request" }, "retry": { "predicate": "${log_timeout}", "max_retries": 5, "backoff": { "initial_delay": 1, "max_delay": 10, "multiplier": 2 } } } }, { "print_callback_result": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}" } } } ] }, "log_timeout": { "params": [ "e" ], "steps": [ { "when_to_repeat": { "switch": [ { "condition": "${\"TimeoutError\" in e.tags}", "steps": [ { "log_error_and_retry": { "call": "sys.log", "args": { "severity": "WARNING", "text": "Timed out waiting for callback, retrying" } } }, { "exit_predicate": { "return": true } } ] } ] } }, { "otherwise": { "return": false } } ] } }