Esperar mediante retrollamadas

Las retrollamadas permiten que las ejecuciones de flujos de trabajo esperen a que otro servicio envíe una solicitud al endpoint de retrollamada. Esta solicitud reanuda la ejecución del flujo de trabajo.

Con las retrollamadas, puedes indicar a tu flujo de trabajo que se ha producido un evento específico y esperar a que se produzca ese evento sin sondeo. Por ejemplo, puedes crear un flujo de trabajo que te avise cuando un producto vuelva a estar disponible o cuando se haya enviado un artículo, o que espere para permitir la interacción humana, como revisar un pedido o validar una traducción.

En esta página se explica cómo crear un flujo de trabajo que admita un endpoint de retrollamada y que espere a que lleguen solicitudes HTTP de procesos externos a ese endpoint. También puedes esperar eventos mediante devoluciones de llamada y activadores de Eventarc.

Las retrollamadas requieren el uso de dos funciones integradas de la biblioteca estándar:

Crear un endpoint que reciba una solicitud de retrollamada

Crea un endpoint de retrollamada que pueda recibir solicitudes HTTP para llegar a ese endpoint.

  1. Sigue los pasos para crear un flujo de trabajo o elige uno que ya tengas para actualizarlo (pero no lo implementes todavía).
  2. En la definición del flujo de trabajo, añade un paso para crear un endpoint de retrollamada:

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "METHOD"
            result: callback_details
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "METHOD"
              },
              "result": "callback_details"
            }
          }
        ]
          

    Sustituye METHOD por el método HTTP esperado, que puede ser GET, HEAD, POST, PUT, DELETE, OPTIONS o PATCH. El valor predeterminado es POST.

    El resultado es un mapa, callback_details, con un campo url que almacena la URL del endpoint creado.

    El endpoint de retrollamada ya puede recibir solicitudes entrantes con el método HTTP especificado. La URL del endpoint creado se puede usar para activar la retrollamada desde un proceso externo al flujo de trabajo. Por ejemplo, se puede transferir la URL a una función de Cloud Run.

  3. En la definición del flujo de trabajo, añade un paso para esperar una solicitud de retrollamada:

    YAML

        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: TIMEOUT
            result: callback_request
        

    JSON

        [
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": TIMEOUT
              },
              "result": "callback_request"
            }
          }
        ]
          

    Sustituye TIMEOUT por el número máximo de segundos que debe esperar el flujo de trabajo para recibir una solicitud. El valor predeterminado es 43200 (12 horas). Si transcurre el tiempo antes de que se reciba una solicitud, se genera un error TimeoutError.

    Ten en cuenta que hay una duración máxima de ejecución. Para obtener más información, consulta el límite de solicitudes.

    El mapa callback_details del paso create_callback anterior se transfiere como argumento.

  4. Implementa el flujo de trabajo para terminar de crearlo o actualizarlo.

    Cuando se recibe una solicitud, todos los detalles de la solicitud se almacenan en el mapa callback_request. De esta forma, tendrá acceso a toda la solicitud HTTP, incluidos el encabezado, el cuerpo y un mapa query para los parámetros de consulta. Por ejemplo:

    YAML

        http_request:
          body:
          headers: {...}
          method: GET
          query: {}
          url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
        received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667
        type: HTTP
        

    JSON

        {
           "http_request":{
              "body":null,
              "headers":{
                 ...
              },
              "method":"GET",
              "query":{
              },
              "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
           },
           "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667",
           "type":"HTTP"
        }
          

    Si el cuerpo HTTP es texto o JSON, Workflows intentará decodificarlo. De lo contrario, se devolverán bytes sin procesar.

Autorizar solicitudes al endpoint de retrollamada

Para enviar una solicitud a un endpoint de retrollamada, los Google Cloud servicios como Cloud Run y las funciones de Cloud Run, así como los servicios de terceros, deben tener autorización para hacerlo. Para ello, deben tener los permisos de Gestión de Identidades y Accesos (IAM) adecuados, concretamente, workflows.callbacks.send (incluidos en el rol Invocador de Workflows).

.

Hacer una solicitud directa

La forma más sencilla de crear credenciales de duración reducida para una cuenta de servicio es hacer una solicitud directa. En este flujo intervienen dos identidades: la persona que llama y la cuenta de servicio para la que se crea la credencial. La llamada al flujo de trabajo básico de esta página es un ejemplo de solicitud directa. Para obtener más información, consulta Usar la gestión de identidades y accesos para controlar el acceso y Permisos de solicitud directa.

Generar un token de acceso OAuth 2.0

Para autorizar una aplicación para que llame al endpoint de retrollamada, puedes generar un token de acceso de OAuth 2.0 para la cuenta de servicio asociada al flujo de trabajo. Si tienes los permisos necesarios (para los roles Workflows Editor o Workflows Admin y Service Account Token Creator), también puedes generar un token por tu cuenta ejecutando el método generateAccessToken.

Si la solicitud generateAccessToken se realiza correctamente, el cuerpo de la respuesta devuelta contiene un token de acceso de OAuth 2.0 y un tiempo de vencimiento. De forma predeterminada, los tokens de acceso de OAuth 2.0 son válidos durante 1 hora como máximo. Por ejemplo:

  {
  "accessToken": "eyJ0eXAi...NiJ9",
  "expireTime": "2020-04-07T15:01:23.045123456Z"
  }
El código accessToken se puede usar en una llamada curl a la URL del endpoint de retrollamada, como en los siguientes ejemplos:
  curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
  curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL

Generar un token de OAuth para una función de Cloud Run

Si invocas una retrollamada desde una función de Cloud Run con la misma cuenta de servicio que el flujo de trabajo y en el mismo proyecto, puedes generar un token de acceso de OAuth en la propia función. Por ejemplo:

const {GoogleAuth} = require('google-auth-library');
const auth = new GoogleAuth();
const token = await auth.getAccessToken();
console.log("Token", token);

try {
  const resp = await fetch(url, {
      method: 'POST',
      headers: {
          'accept': 'application/json',
          'content-type': 'application/json',
          'authorization': `Bearer ${token}`
      },
      body: JSON.stringify({ approved })
  });
  console.log("Response = ", JSON.stringify(resp));

  const result = await resp.json();
  console.log("Outcome = ", JSON.stringify(result));

Para obtener más contexto, consulta el tutorial sobre cómo crear un flujo de trabajo con intervención humana mediante retrollamadas.

Solicitar acceso sin conexión

Los tokens de acceso caducan periódicamente y se convierten en credenciales no válidas para una solicitud de API relacionada. Puedes actualizar un token de acceso sin pedir permiso al usuario si has solicitado acceso sin conexión a los ámbitos asociados al token. Solicitar acceso sin conexión es un requisito para cualquier aplicación que necesite acceder a una API de Google cuando el usuario no esté presente. Para obtener más información, consulta Actualizar un token de acceso (acceso sin conexión).

Invocar un flujo de trabajo exactamente una vez mediante retrollamadas

Las retrollamadas son totalmente idempotentes, lo que significa que puedes volver a intentar una retrollamada si falla sin producir resultados inesperados ni efectos secundarios.

Una vez que hayas creado un endpoint de retrollamada, la URL estará lista para recibir solicitudes entrantes y, por lo general, se devolverá a la persona que llama antes de que se realice la llamada correspondiente a await_callback. Sin embargo, si la URL de retrollamada aún no se ha recibido cuando se ejecuta el paso await_callback, la ejecución del flujo de trabajo se bloqueará hasta que se reciba el endpoint (o se agote el tiempo de espera). Una vez recibida, se reanuda la ejecución del flujo de trabajo y se procesa la retrollamada.

Después de ejecutar el paso create_callback_endpoint y crear un endpoint de retrollamada, el flujo de trabajo tendrá una sola ranura de retrollamada disponible. Cuando se recibe una solicitud de retrollamada, este espacio se rellena con la carga útil de la retrollamada hasta que se pueda procesar. Cuando se ejecuta el paso await_callback, se procesa la devolución de llamada y el espacio se vacía y se pone a disposición de otra devolución de llamada. Después, puedes volver a usar el endpoint de retrollamada y llamar a await_callback.

Si se llama a await_callback solo una vez, pero se recibe una segunda retrollamada, se produce una de las siguientes situaciones y se devuelve un código de estado HTTP adecuado:

  • El código HTTP 429: Too Many Requests indica que la primera retrollamada se ha recibido correctamente, pero no se ha procesado. Sigue esperando a que se procese. El flujo de trabajo rechaza la segunda devolución de llamada.

  • HTTP 200: Success indica que la primera devolución de llamada se ha recibido correctamente y se ha devuelto una respuesta. La segunda retrollamada se almacena y es posible que nunca se procese a menos que se llame a await_callback una segunda vez. Si el flujo de trabajo finaliza antes de que eso ocurra, la segunda solicitud de retrollamada nunca se procesa y se descarta.

  • HTTP 404: Page Not Found indica que el flujo de trabajo ya no se está ejecutando. Se ha procesado la primera retrollamada y el flujo de trabajo se ha completado, o bien el flujo de trabajo ha fallado. Para determinarlo, tendrás que consultar el estado de ejecución del flujo de trabajo.

Retrollamadas paralelas

Cuando los pasos se ejecutan en paralelo y un subproceso principal crea una retrollamada que se espera en los pasos secundarios, se sigue el mismo patrón que se ha descrito anteriormente.

En el siguiente ejemplo, cuando se ejecuta el paso create_callback_endpoint, se crea un espacio de retrollamada. Cada llamada posterior a await_callback abre un nuevo espacio de retrollamada. Se pueden hacer diez retrollamadas simultáneamente si todos los subprocesos se están ejecutando y esperando antes de que se haga una solicitud de retrollamada. Se podrían hacer devoluciones de llamada adicionales, pero se almacenarían y nunca se procesarían.

YAML

  - createCallbackInParent:
    call: events.create_callback_endpoint
    args:
      http_callback_method: "POST"
    result: callback_details
  - parallelStep:
    parallel:
        for:
            range: [1, 10]
            value: loopValue
            steps:
              - waitForCallbackInChild:
                  call: events.await_callback
                  args:
                      callback: ${callback_details}

JSON

  [
    {
      "createCallbackInParent": {
        "call": "events.create_callback_endpoint",
        "args": {
          "http_callback_method": "POST"
        },
        "result": "callback_details"
      }
    },
    {
      "parallelStep": {
        "parallel": {
          "for": {
            "range": [
              1,
              10
            ],
            "value": "loopValue",
            "steps": [
              {
                "waitForCallbackInChild": {
                  "call": "events.await_callback",
                  "args": {
                    "callback": "${callback_details}"
                  }
                }
              }
            ]
          }
        }
      }
    }
  ]

Ten en cuenta que las retrollamadas se procesan en el mismo orden en que cada llamada se realiza por una rama a await_callback. Sin embargo, el orden de ejecución de las ramificaciones no es determinista y puede llegar a un resultado siguiendo varias rutas. Para obtener más información, consulta los pasos paralelos.

Probar un flujo de trabajo de devolución de llamada básico

Puedes crear un flujo de trabajo básico y, a continuación, probar la llamada al endpoint de retrollamada de ese flujo de trabajo mediante curl. Debes tener los permisos Workflows Editor o Workflows Admin necesarios para el proyecto en el que se encuentra el flujo de trabajo.

  1. Crea y despliega el siguiente flujo de trabajo y, a continuación, ejecútalo.

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: 3600
            result: callback_request
        - print_callback_request:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
        - return_callback_result:
            return: ${callback_request.http_request}
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": 3600
              },
              "result": "callback_request"
            }
          },
          {
            "print_callback_request": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          },
          {
            "return_callback_result": {
              "return": "${callback_request.http_request}"
            }
          }
        ]
          

    Después de ejecutar el flujo de trabajo, el estado de la ejecución del flujo de trabajo es ACTIVE hasta que se recibe la solicitud de retrollamada o se agota el tiempo de espera.

  2. Confirma el estado de ejecución y recupera la URL de retrollamada:

    Consola

    1. En la Google Cloud consola, ve a la página Flujos de trabajo:

      Ve a Workflows (Flujos de trabajo)
    2. Haz clic en el nombre del flujo de trabajo que acabas de ejecutar.

      Se muestra el estado de la ejecución del flujo de trabajo.

    3. Haz clic en la pestaña Registros.
    4. Busca una entrada de registro similar a la siguiente:

      Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
    5. Copia la URL de retrollamada para usarla en el siguiente comando.

    gcloud

    1. Primero, obtén el ID de ejecución:
      gcloud logging read "Listening for callbacks" --freshness=DURATION
      Sustituye DURATION por un periodo de tiempo adecuado para limitar las entradas de registro devueltas (si has ejecutado el flujo de trabajo varias veces).

      Por ejemplo, --freshness=t10m devuelve las entradas de registro que no tengan más de 10 minutos. Para obtener más información, consulta gcloud topic datetimes.

      Se devuelve el ID de ejecución. Ten en cuenta que la URL de retrollamada también se devuelve en el campo textPayload. Copia ambos valores para usarlos en los pasos siguientes.

    2. Ejecuta el siguiente comando:
      gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
      Se devuelve el estado de la ejecución del flujo de trabajo.
  3. Ahora puedes llamar al endpoint de retrollamada con un comando curl:
    curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL

    Ten en cuenta que, para un endpoint POST, debes usar un encabezado de representación Content-Type. Por ejemplo:

    curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL

    Sustituye CALLBACK_URL por la URL que has copiado en el paso anterior.

  4. En la consola de Google Cloud o con Google Cloud CLI, confirma que el estado de la ejecución del flujo de trabajo es ahora SUCCEEDED.
  5. Busca la entrada de registro con el textPayload que se parezca a lo siguiente:
    Received {"body":null,"headers":...

Ejemplos

En estos ejemplos se muestra la sintaxis.

Detectar errores de tiempo de espera

En este ejemplo se añade al anterior la captura de errores de tiempo de espera y la escritura de los errores en el registro del sistema.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 3600
                result: callback_request
            except:
                as: e
                steps:
                    - log_error:
                        call: sys.log
                        args:
                            severity: "ERROR"
                            text: ${"Received error " + e.message}
                        next: end
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "callback_request"
              },
              "except": {
                "as": "e",
                "steps": [
                  {
                    "log_error": {
                      "call": "sys.log",
                      "args": {
                        "severity": "ERROR",
                        "text": "${\"Received error \" + e.message}"
                      },
                      "next": "end"
                    }
                  }
                ]
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      }
    }
      

Esperar una devolución de llamada en un bucle de reintento

En este ejemplo se modifica el anterior implementando un paso de reintento. Si se usa un predicado de reintento personalizado, el flujo de trabajo registra una advertencia cuando se produce un tiempo de espera y, a continuación, reintenta la espera en el endpoint de retrollamada hasta cinco veces. Si se agota la cuota de reintentos antes de recibir la retrollamada, el error de tiempo de espera final provoca que el flujo de trabajo falle.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 60.0
                result: callback_request
            retry:
                predicate: ${log_timeout}
                max_retries: 5
                backoff:
                    initial_delay: 1
                    max_delay: 10
                    multiplier: 2
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    log_timeout:
        params: [e]
        steps:
          - when_to_repeat:
              switch:
                - condition: ${"TimeoutError" in e.tags}
                  steps:
                      - log_error_and_retry:
                          call: sys.log
                          args:
                              severity: "WARNING"
                              text: "Timed out waiting for callback, retrying"
                      - exit_predicate:
                          return: true
          - otherwise:
              return: false
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 60
                },
                "result": "callback_request"
              },
              "retry": {
                "predicate": "${log_timeout}",
                "max_retries": 5,
                "backoff": {
                  "initial_delay": 1,
                  "max_delay": 10,
                  "multiplier": 2
                }
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      },
      "log_timeout": {
        "params": [
          "e"
        ],
        "steps": [
          {
            "when_to_repeat": {
              "switch": [
                {
                  "condition": "${\"TimeoutError\" in e.tags}",
                  "steps": [
                    {
                      "log_error_and_retry": {
                        "call": "sys.log",
                        "args": {
                          "severity": "WARNING",
                          "text": "Timed out waiting for callback, retrying"
                        }
                      }
                    },
                    {
                      "exit_predicate": {
                        "return": true
                      }
                    }
                  ]
                }
              ]
            }
          },
          {
            "otherwise": {
              "return": false
            }
          }
        ]
      }
    }
      

Siguientes pasos