Espera con devoluciones de llamada

Las devoluciones de llamada permiten que las ejecuciones de flujos de trabajo esperen a que otro servicio realice una solicitud al extremo de devolución de llamada. Esa solicitud reanuda la ejecución del flujo de trabajo.

Con las devoluciones de llamada, puedes indicarle a tu flujo de trabajo que se produjo un evento especificado y esperar ese evento sin sondeo. Por ejemplo, puedes crear un flujo de trabajo que te notifique cuando un producto vuelva a estar en stock o cuando se envíe un artículo, o que espere para permitir la interacción humana, como revisar un pedido o validar una traducción.

En esta página, se muestra cómo crear un flujo de trabajo que admita un extremo de devolución de llamada y que espere a que lleguen a ese extremo solicitudes HTTP de procesos externos. También puedes esperar eventos con devoluciones de llamada y activadores de Eventarc.

Las devoluciones de llamada requieren el uso de dos funciones integradas de la biblioteca estándar:

Crea un extremo que reciba una solicitud de devolución de llamada

Crea un extremo de devolución de llamada que pueda recibir solicitudes HTTP para llegar a ese extremo.

  1. Sigue los pasos para crear un flujo de trabajo nuevo o elige uno existente para actualizarlo, pero no lo implementes aún.
  2. En la definición del flujo de trabajo, agrega un paso para crear un extremo de devolución de llamada:

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "METHOD"
            result: callback_details
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "METHOD"
              },
              "result": "callback_details"
            }
          }
        ]
          

    Reemplaza METHOD por el método HTTP esperado, uno de GET, HEAD, POST, PUT, DELETE, OPTIONS o PATCH. El valor predeterminado es POST.

    El resultado es un mapa, callback_details, con un campo url que almacena la URL del extremo creado.

    El extremo de devolución de llamada ahora está listo para recibir solicitudes entrantes con el método HTTP especificado. La URL del extremo creado se puede usar para activar la devolución de llamada desde un proceso externo al flujo de trabajo, por ejemplo, pasando la URL a una función de Cloud Run.

  3. En la definición del flujo de trabajo, agrega un paso para esperar una solicitud de devolución de llamada:

    YAML

        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: TIMEOUT
            result: callback_request
        

    JSON

        [
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": TIMEOUT
              },
              "result": "callback_request"
            }
          }
        ]
          

    Reemplaza TIMEOUT por la cantidad máxima de segundos que el flujo de trabajo debe esperar una solicitud. El valor predeterminado es 43200 (12 horas). Si el tiempo transcurre antes de que se reciba una solicitud, se genera un TimeoutError.

    Ten en cuenta que hay una duración máxima de ejecución. Para obtener más información, consulta el límite de solicitudes.

    El mapa callback_details del paso create_callback anterior se pasa como un argumento.

  4. Implementa tu flujo de trabajo para terminar de crearlo o actualizarlo.

    Cuando se recibe una solicitud, todos sus detalles se almacenan en el mapa callback_request. Luego, tendrás acceso a toda la solicitud HTTP, incluidos su encabezado, cuerpo y un mapa query para cualquier parámetro de consulta. Por ejemplo:

    YAML

        http_request:
          body:
          headers: {...}
          method: GET
          query: {}
          url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
        received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667
        type: HTTP
        

    JSON

        {
           "http_request":{
              "body":null,
              "headers":{
                 ...
              },
              "method":"GET",
              "query":{
              },
              "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
           },
           "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667",
           "type":"HTTP"
        }
          

    Si el cuerpo HTTP es texto o JSON, Workflows intentará decodificarlo. De lo contrario, se devolverán bytes sin procesar.

Autoriza solicitudes al extremo de devolución de llamada

Para enviar una solicitud a un extremo de devolución de llamada,los servicios de Google Cloud como Cloud Run y Cloud Run Functions, así como los servicios de terceros, deben estar autorizados para hacerlo con los permisos adecuados de Identity and Access Management (IAM); específicamente, workflows.callbacks.send(incluido en el rol de invocador de Workflows).

Cómo realizar una solicitud directa

La forma más sencilla de crear credenciales de corta duración para una cuenta de servicio es realizar una solicitud directa. En este flujo, participan dos identidades: el emisor y la cuenta de servicio para la que se creó la credencial. La llamada al flujo de trabajo básico en esta página es un ejemplo de una solicitud directa. Para obtener más información, consulta Usa IAM para controlar el acceso y Permisos de solicitud directa.

Genera un token de acceso de OAuth 2.0

Para autorizar una aplicación a llamar al extremo de devolución de llamada, puedes generar un token de acceso de OAuth 2.0 para la cuenta de servicio asociada al flujo de trabajo. Si suponemos que tienes los permisos necesarios (para los roles Workflows Editor o Workflows Admin y Service Account Token Creator), también puedes generar un token por tu cuenta ejecutando el método generateAccessToken.

Si la solicitud generateAccessToken se ejecuta correctamente, el cuerpo de la respuesta que se devuelve contendrá un token de acceso de OAuth 2.0 y un tiempo de caducidad. (De forma predeterminada, los tokens de acceso de OAuth 2.0 son válidos durante un máximo de 1 hora). Por ejemplo:

  {
  "accessToken": "eyJ0eXAi...NiJ9",
  "expireTime": "2020-04-07T15:01:23.045123456Z"
  }
Luego, el código accessToken se puede usar en una llamada a curl a la URL del extremo de devolución de llamada, como en los siguientes ejemplos:
  curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
  curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL

Genera un token de OAuth para una función de Cloud Run

Si invocas una devolución de llamada desde una función de Cloud Run con la misma cuenta de servicio que el flujo de trabajo y en el mismo proyecto, puedes generar un token de acceso de OAuth en la función. Por ejemplo:

const {GoogleAuth} = require('google-auth-library');
const auth = new GoogleAuth();
const token = await auth.getAccessToken();
console.log("Token", token);

try {
  const resp = await fetch(url, {
      method: 'POST',
      headers: {
          'accept': 'application/json',
          'content-type': 'application/json',
          'authorization': `Bearer ${token}`
      },
      body: JSON.stringify({ approved })
  });
  console.log("Response = ", JSON.stringify(resp));

  const result = await resp.json();
  console.log("Outcome = ", JSON.stringify(result));

Para obtener más información, consulta el instructivo sobre cómo crear un flujo de trabajo con participación humana mediante devoluciones de llamada.

Cómo solicitar acceso sin conexión

Los tokens de acceso vencen periódicamente y se convierten en credenciales no válidas para una solicitud de API relacionada. Puedes actualizar un token de acceso sin solicitarle permiso al usuario si solicitaste acceso sin conexión a los alcances asociados con el token. Solicitar acceso sin conexión es un requisito para cualquier aplicación que necesite acceder a una API de Google cuando el usuario no esté presente. Para obtener más información, consulta Actualiza un token de acceso (acceso sin conexión).

Cómo invocar un flujo de trabajo exactamente una vez con devoluciones de llamada

Las devoluciones de llamada son completamente idempotentes, lo que significa que puedes volver a intentarlas si fallan sin producir resultados inesperados ni efectos secundarios.

Después de crear un extremo de devolución de llamada, la URL está lista para recibir solicitudes entrantes y, por lo general, se devuelve a un llamador antes de que se realice la llamada correspondiente a await_callback. Sin embargo, si aún no se recibió la URL de devolución de llamada cuando se ejecuta el paso await_callback, se bloquea la ejecución del flujo de trabajo hasta que se recibe el extremo (o se produce un tiempo de espera). Una vez que se recibe, se reanuda la ejecución del flujo de trabajo y se procesa la devolución de llamada.

Después de ejecutar el paso create_callback_endpoint y crear un extremo de devolución de llamada, habrá una sola ranura de devolución de llamada disponible para el flujo de trabajo. Cuando se recibe una solicitud de devolución de llamada, este segmento se completa con la carga útil de la devolución de llamada hasta que se pueda procesar la devolución de llamada. Cuando se ejecuta el paso await_callback, se procesa la devolución de llamada, y la ranura se vacía y queda disponible para otra devolución de llamada. Luego, puedes volver a usar el extremo de devolución de llamada y llamar a await_callback.

Si se llama a await_callback solo una vez, pero se recibe una segunda devolución de llamada, se produce una de las siguientes situaciones y se devuelve un código de estado HTTP adecuado:

  • El HTTP 429: Too Many Requests indica que la primera devolución de llamada se recibió correctamente, pero no se procesó; sigue esperando a que se procese. El flujo de trabajo rechaza la segunda devolución de llamada.

  • El código HTTP 200: Success indica que se recibió la primera devolución de llamada correctamente y se devolvió una respuesta. La segunda devolución de llamada se almacena y es posible que nunca se procese, a menos que se llame a await_callback por segunda vez. Si el flujo de trabajo finaliza antes de que eso suceda, la segunda solicitud de devolución de llamada nunca se procesa y se descarta.

  • El código HTTP 404: Page Not Found indica que el flujo de trabajo ya no se está ejecutando. Se procesó la primera devolución de llamada y se completó el flujo de trabajo, o bien falló el flujo de trabajo. Para determinar esto, deberás consultar el estado de ejecución del flujo de trabajo.

Devoluciones de llamada paralelas

Cuando los pasos se ejecutan en paralelo y un subproceso principal crea una devolución de llamada y espera en los pasos secundarios, se sigue el mismo patrón que se describió anteriormente.

En el siguiente ejemplo, cuando se ejecuta el paso create_callback_endpoint, se crea una ranura de devolución de llamada. Cada llamada subsiguiente a await_callback abre un nuevo espacio de devolución de llamada. Se pueden realizar diez devoluciones de llamada de forma simultánea si todos los subprocesos se ejecutan y esperan antes de que se realice una solicitud de devolución de llamada. Se podrían realizar devoluciones de llamada adicionales, pero se almacenarían y nunca se procesarían.

YAML

  - createCallbackInParent:
    call: events.create_callback_endpoint
    args:
      http_callback_method: "POST"
    result: callback_details
  - parallelStep:
    parallel:
        for:
            range: [1, 10]
            value: loopValue
            steps:
              - waitForCallbackInChild:
                  call: events.await_callback
                  args:
                      callback: ${callback_details}

JSON

  [
    {
      "createCallbackInParent": {
        "call": "events.create_callback_endpoint",
        "args": {
          "http_callback_method": "POST"
        },
        "result": "callback_details"
      }
    },
    {
      "parallelStep": {
        "parallel": {
          "for": {
            "range": [
              1,
              10
            ],
            "value": "loopValue",
            "steps": [
              {
                "waitForCallbackInChild": {
                  "call": "events.await_callback",
                  "args": {
                    "callback": "${callback_details}"
                  }
                }
              }
            ]
          }
        }
      }
    }
  ]

Ten en cuenta que las devoluciones de llamada se procesan en el mismo orden en que cada rama realiza la llamada a await_callback. Sin embargo, el orden de ejecución de las ramas no es determinístico y puede llegar a un resultado a través de varias rutas. Para obtener más información, consulta Pasos paralelos.

Prueba un flujo de trabajo de devolución de llamada básico

Puedes crear un flujo de trabajo básico y, luego, probar la llamada al extremo de devolución de llamada de ese flujo de trabajo con curl. Debes tener los permisos Workflows Editor o Workflows Admin necesarios para el proyecto en el que reside el flujo de trabajo.

  1. Crea e implementa el siguiente flujo de trabajo y, luego, ejecútalo.

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: 3600
            result: callback_request
        - print_callback_request:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
        - return_callback_result:
            return: ${callback_request.http_request}
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": 3600
              },
              "result": "callback_request"
            }
          },
          {
            "print_callback_request": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          },
          {
            "return_callback_result": {
              "return": "${callback_request.http_request}"
            }
          }
        ]
          

    Después de ejecutar el flujo de trabajo, el estado de la ejecución del flujo de trabajo es ACTIVE hasta que se recibe la solicitud de devolución de llamada o se agota el tiempo de espera.

  2. Confirma el estado de ejecución y recupera la URL de devolución de llamada:

    Console

    1. En la consola de Google Cloud , ve a la página Workflows:

      Ir a Workflows
    2. Haz clic en el nombre del flujo de trabajo que acabas de ejecutar.

      Se muestra el estado de la ejecución del flujo de trabajo.

    3. Haz clic en la pestaña Registros.
    4. Busca una entrada de registro similar a la que se muestra a continuación:

      Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
    5. Copia la URL de devolución de llamada para usarla en el siguiente comando.

    gcloud

    1. Primero, recupera el ID de ejecución:
      gcloud logging read "Listening for callbacks" --freshness=DURATION
      Reemplaza DURATION por un período adecuado para limitar las entradas de registro que se muestran (si ejecutaste el flujo de trabajo varias veces).

      Por ejemplo, --freshness=t10m devuelve las entradas de registro que no tienen más de 10 minutos. Para obtener más información, consulta gcloud topic datetimes.

      Se devuelve el ID de ejecución. Ten en cuenta que la URL de devolución de llamada también se devuelve en el campo textPayload. Copia ambos valores para usarlos en los siguientes pasos.

    2. Ejecuta el siguiente comando:
      gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
      Se devuelve el estado de la ejecución del flujo de trabajo.
  3. Ahora puedes llamar al extremo de devolución de llamada con un comando curl:
    curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL

    Ten en cuenta que, para un extremo POST, debes usar un encabezado de representación Content-Type. Por ejemplo:

    curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL

    Reemplaza CALLBACK_URL por la URL que copiaste en el paso anterior.

  4. A través de la consola de Google Cloud o con Google Cloud CLI, confirma que el estado de la ejecución del flujo de trabajo ahora es SUCCEEDED.
  5. Busca la entrada de registro con el textPayload devuelto que se parezca a lo siguiente:
    Received {"body":null,"headers":...

Muestras

En estos ejemplos, se muestra la sintaxis.

Cómo detectar errores de tiempo de espera

Este ejemplo se basa en el anterior y detecta cualquier error de tiempo de espera, y escribe los errores en el registro del sistema.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 3600
                result: callback_request
            except:
                as: e
                steps:
                    - log_error:
                        call: sys.log
                        args:
                            severity: "ERROR"
                            text: ${"Received error " + e.message}
                        next: end
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "callback_request"
              },
              "except": {
                "as": "e",
                "steps": [
                  {
                    "log_error": {
                      "call": "sys.log",
                      "args": {
                        "severity": "ERROR",
                        "text": "${\"Received error \" + e.message}"
                      },
                      "next": "end"
                    }
                  }
                ]
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      }
    }
      

Espera una devolución de llamada en un bucle de reintentos

En este ejemplo, se modifica el anterior implementando un paso de reintento. Con un predicado de reintento personalizado, el flujo de trabajo registra una advertencia cuando se produce un tiempo de espera y, luego, reintenta la espera en el extremo de devolución de llamada hasta cinco veces. Si se agota la cuota de reintentos antes de que se reciba la devolución de llamada, el error de tiempo de espera final hace que falle el flujo de trabajo.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 60.0
                result: callback_request
            retry:
                predicate: ${log_timeout}
                max_retries: 5
                backoff:
                    initial_delay: 1
                    max_delay: 10
                    multiplier: 2
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    log_timeout:
        params: [e]
        steps:
          - when_to_repeat:
              switch:
                - condition: ${"TimeoutError" in e.tags}
                  steps:
                      - log_error_and_retry:
                          call: sys.log
                          args:
                              severity: "WARNING"
                              text: "Timed out waiting for callback, retrying"
                      - exit_predicate:
                          return: true
          - otherwise:
              return: false
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 60
                },
                "result": "callback_request"
              },
              "retry": {
                "predicate": "${log_timeout}",
                "max_retries": 5,
                "backoff": {
                  "initial_delay": 1,
                  "max_delay": 10,
                  "multiplier": 2
                }
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      },
      "log_timeout": {
        "params": [
          "e"
        ],
        "steps": [
          {
            "when_to_repeat": {
              "switch": [
                {
                  "condition": "${\"TimeoutError\" in e.tags}",
                  "steps": [
                    {
                      "log_error_and_retry": {
                        "call": "sys.log",
                        "args": {
                          "severity": "WARNING",
                          "text": "Timed out waiting for callback, retrying"
                        }
                      }
                    },
                    {
                      "exit_predicate": {
                        "return": true
                      }
                    }
                  ]
                }
              ]
            }
          },
          {
            "otherwise": {
              "return": false
            }
          }
        ]
      }
    }
      

¿Qué sigue?