Espera con devoluciones de llamada

Las devoluciones de llamada permiten que las ejecuciones de flujos de trabajo esperen a que otro servicio realice una una solicitud al extremo de devolución de llamada; esta solicitud reanude la ejecución de la en el flujo de trabajo.

Con las devoluciones de llamada, puedes indicar al flujo de trabajo que un evento específico y esperar a que ese evento no tenga sondeo. Por ejemplo, puedes cree un flujo de trabajo que notifique cuando un producto vuelva a estar en stock o cuando se envió el artículo; o que espera para permitir la interacción humana, como la revisión de una o validar una traducción.

En esta página, se muestra cómo crear un flujo de trabajo que admita un extremo de devolución de llamada. y que espera que las solicitudes HTTP de los procesos externos lleguen extremo. También puedes Esperar eventos mediante devoluciones de llamada y activadores de Eventarc

Las devoluciones de llamada requieren el uso de dos funciones estándar integradas de la biblioteca:

Crea un extremo que reciba una solicitud de devolución de llamada

Crear un extremo de devolución de llamada que pueda recibir solicitudes HTTP para llegar a ese extremo.

  1. Sigue los pasos para crear el proyecto. un flujo de trabajo nuevo o elegir uno existente para actualizar pero aún no lo implementas.
  2. En la definición del flujo de trabajo, agrega el siguiente paso para crear un extremo de devolución de llamada:

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "METHOD"
            result: callback_details
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "METHOD"
              },
              "result": "callback_details"
            }
          }
        ]
          

    Reemplaza METHOD por el método HTTP esperado, uno de GET, HEAD, POST, PUT, DELETE, OPTIONS o PATCH. El el valor predeterminado es POST.

    El resultado es un mapa, callback_details, con una Campo url que almacena la URL del extremo creado

    El extremo de devolución de llamada ahora está listo para recibir solicitudes entrantes con el método HTTP especificado. Se puede usar la URL del extremo creado para activar la devolución de llamada desde un proceso externo al flujo de trabajo; por ejemplo, al pasar la URL a una Cloud Function.

  3. En la definición del flujo de trabajo, agrega el siguiente paso para esperar una solicitud de devolución de llamada:

    YAML

        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: TIMEOUT
            result: callback_request
        

    JSON

        [
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": TIMEOUT
              },
              "result": "callback_request"
            }
          }
        ]
          

    Reemplaza TIMEOUT por la cantidad máxima de elementos segundos que el flujo de trabajo debe esperar una solicitud. El valor predeterminado es 43200 (12 horas). Si el tiempo transcurre antes de que se reciba una solicitud, se Se genera TimeoutError.

    Ten en cuenta que hay una duración máxima de ejecución. Para obtener más información, consulta el límite de solicitudes.

    El mapa de callback_details anterior El paso create_callback se pasa como argumento.

  4. Implementa tu flujo de trabajo para terminar de crearlo o actualizarlo.

    Cuando se recibe una solicitud, todos los detalles de la solicitud se almacenan en el mapa de callback_request. Luego, tienes acceso a toda HTTP(S), incluidos su encabezado, cuerpo y un mapa de query para cualquier parámetro de consulta. Por ejemplo:

    YAML

        http_request:
          body:
          headers: {...}
          method: GET
          query: {}
          url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
        received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667
        type: HTTP
        

    JSON

        {
           "http_request":{
              "body":null,
              "headers":{
                 ...
              },
              "method":"GET",
              "query":{
              },
              "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
           },
           "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667",
           "type":"HTTP"
        }
          

    Si el cuerpo HTTP es texto o JSON, Workflows intentará decodificar el cuerpo. De lo contrario, se mostrarán bytes sin procesar.

Autoriza solicitudes al extremo de devolución de llamada

Para enviar una solicitud a un extremo de devolución de llamada, los servicios de Google Cloud como Cloud Run y Cloud Functions, además de herramientas de servicios, debe estar autorizada para ello y contar con la información permisos de Identity and Access Management (IAM) específicamente, workflows.callbacks.send (se incluye en el rol de invocador de flujos de trabajo).

Cómo realizar una solicitud directa

La forma más sencilla de crear credenciales de corta duración para una cuenta de servicio es hacer una solicitud directa. Hay dos identidades involucradas en este flujo: el emisor, y la cuenta de servicio para la que se creó la credencial. La llamada a la el flujo de trabajo básico de esta página es un ejemplo de una solicitud directa. Para obtener más información, consulta Usa IAM para controlar el acceso y Permisos de solicitud directa.

Genera un token de acceso de OAuth 2.0

Para autorizar a una aplicación a llamar al extremo de devolución de llamada, puedes generar un Token de acceso de OAuth 2.0 para la cuenta de servicio asociada con el flujo de trabajo. Si tienes los permisos necesarios (para Workflows Editor o Workflows Admin y Service Account Token Creator), también puedes generar un token tú mismo ejecutar el método generateAccessToken

Si la solicitud generateAccessToken tiene éxito, el del cuerpo de la respuesta contiene un token de acceso de OAuth 2.0 y una fecha y hora de vencimiento. (Por de forma predeterminada, los tokens de acceso de OAuth 2.0 son válidos durante un máximo de 1 hora). Por ejemplo:

  {
  "accessToken": "eyJ0eXAi...NiJ9",
  "expireTime": "2020-04-07T15:01:23.045123456Z"
  }
El código accessToken se puede usar en una llamada curl al extremo de devolución de llamada como en los siguientes ejemplos:
  curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
  curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL

Genera un token de OAuth para una Cloud Function

Si invocas una devolución de llamada desde una Cloud Function con la misma de servicio como el flujo de trabajo y, en el mismo proyecto, puedes generar una Token de acceso de OAuth en la función. Por ejemplo:

const {GoogleAuth} = require('google-auth-library');
const auth = new GoogleAuth();
const token = await auth.getAccessToken();
console.log("Token", token);

try {
  const resp = await fetch(url, {
      method: 'POST',
      headers: {
          'accept': 'application/json',
          'content-type': 'application/json',
          'authorization': `Bearer ${token}`
      },
      body: JSON.stringify({ approved })
  });
  console.log("Response = ", JSON.stringify(resp));

  const result = await resp.json();
  console.log("Outcome = ", JSON.stringify(result));

Para obtener más contexto, consulta el instructivo sobre Crear un flujo de trabajo con interacción humana mediante devoluciones de llamada.

Solicitar acceso sin conexión

Los tokens de acceso caducan periódicamente y se convierten en credenciales no válidas para una cuenta solicitud a la API. Puedes actualizar un token de acceso sin pedirle al usuario permiso si solicitaste acceso sin conexión a los alcances asociados con el token. Solicitar acceso sin conexión es un requisito para cualquier aplicación que necesite para acceder a una API de Google cuando el usuario no está presente. Para obtener más información, consulta Actualiza un token de acceso (acceso sin conexión).

Invoca un flujo de trabajo exactamente una vez mediante devoluciones de llamada

Las devoluciones de llamada son completamente idempotentes, lo que significa que puedes reintenta una devolución de llamada si falla. sin producir resultados inesperados ni efectos secundarios inesperados.

Después de crear un extremo de devolución de llamada, la URL está lista para recibir entradas de estado y, por lo general, se devuelve a un llamador antes de la llamada correspondiente Se creó await_callback. Sin embargo, si todavía no se cuando se ejecuta el paso await_callback, la ejecución del flujo de trabajo se hasta que se reciba el extremo (o se agote el tiempo de espera). Una vez recibidas, las se reanuda la ejecución del flujo de trabajo y se procesa la devolución de llamada.

Después de ejecutar el paso create_callback_endpoint y crear una devolución de llamada hay una única ranura de devolución de llamada disponible para el flujo de trabajo. Cuando se recibe una devolución de llamada una solicitud, este espacio se llena con la carga útil de devolución de llamada hasta que y procesa la devolución de llamada. Cuando se ejecuta el paso await_callback, el se procesa la devolución de llamada, y el espacio se vacía y queda disponible para otra devolución de llamada. Luego, puedes volver a usar el extremo de devolución de llamada y llamar a await_callback. de nuevo.

Si se llama a await_callback solo una vez, pero se recibe una segunda devolución de llamada, se genera una de las siguientes situaciones y se aplica un código de estado HTTP devuelto:

  • HTTP 429: Too Many Requests indica que se recibió la primera devolución de llamada correctamente, pero no se procesó; aún queda a la espera de ser procesada. El el flujo de trabajo rechaza la segunda devolución de llamada.

  • HTTP 200: Success indica que se recibió la primera devolución de llamada correctamente y se devolvió una respuesta. La segunda devolución de llamada se almacena Es posible que nunca se procese, a menos que se llame a await_callback por segunda vez. Si el botón finaliza antes de que eso suceda, la segunda solicitud de devolución de llamada nunca se se procesan y se descartan.

  • HTTP 404: Page Not Found indica que el flujo de trabajo ya no se está ejecutando. Se procesó la primera devolución de llamada y se completó el flujo de trabajo, o el flujo de trabajo falló. Para determinar esto, deberás consultar el flujo de trabajo el estado de ejecución.

Devoluciones de llamada paralelas

Cuando los pasos se ejecutan en paralelo y un elemento superior crea una devolución de llamada subproceso y se espera en los pasos secundarios, el mismo patrón que se describió anteriormente sigue.

En el siguiente ejemplo, cuando se ejecuta el paso create_callback_endpoint, se crea una ranura de devolución de llamada. Cada llamada posterior a await_callback abre un nuevo espacio de devolución de llamada. Se pueden realizar diez devoluciones de llamada simultáneamente, si todos los subprocesos se en ejecución y esperando antes de realizar una solicitud de devolución de llamada. Devoluciones de llamada adicionales se crearían, pero se almacenarán y nunca se procesarían.

YAML

  - createCallbackInParent:
    call: events.create_callback_endpoint
    args:
      http_callback_method: "POST"
    result: callback_details
  - parallelStep:
    parallel:
        for:
            range: [1, 10]
            value: loopValue
            steps:
              - waitForCallbackInChild:
                  call: events.await_callback
                  args:
                      callback: ${callback_details}

JSON

  [
    {
      "createCallbackInParent": {
        "call": "events.create_callback_endpoint",
        "args": {
          "http_callback_method": "POST"
        },
        "result": "callback_details"
      }
    },
    {
      "parallelStep": {
        "parallel": {
          "for": {
            "range": [
              1,
              10
            ],
            "value": "loopValue",
            "steps": [
              {
                "waitForCallbackInChild": {
                  "call": "events.await_callback",
                  "args": {
                    "callback": "${callback_details}"
                  }
                }
              }
            ]
          }
        }
      }
    }
  ]

Ten en cuenta que las devoluciones de llamada se procesan en el mismo orden en que cada llamada la realiza un rama a await_callback. Sin embargo, el orden de ejecución de las ramas es no deterministas y pueden llegar a un resultado mediante diversas rutas. Para ver más información, consulta Pasos paralelos.

Prueba un flujo de trabajo básico de devolución de llamada

Puedes crear un flujo de trabajo básico y, luego, probar la llamada a los servicios de devolución de llamada con curl. Debes tener los Workflows Editor o Los permisos Workflows Admin para el proyecto en el que reside el flujo de trabajo.

  1. Crea e implementa el siguiente flujo de trabajo y, luego, ejecutarlo.

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: 3600
            result: callback_request
        - print_callback_request:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
        - return_callback_result:
            return: ${callback_request.http_request}
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": 3600
              },
              "result": "callback_request"
            }
          },
          {
            "print_callback_request": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          },
          {
            "return_callback_result": {
              "return": "${callback_request.http_request}"
            }
          }
        ]
          

    Después de ejecutar el flujo de trabajo, su estado es ACTIVE hasta que se reciba la solicitud de devolución de llamada o se agote el tiempo de espera el tiempo transcurrido.

  2. Confirma el estado de ejecución y recupera la URL de devolución de llamada:

    Console

    1. En la consola de Google Cloud, ve a Página Flujos de trabajo:

      Ir a Workflows
    2. Haz clic en el nombre del flujo de trabajo que acabas de ejecutar.

      Se muestra el estado de la ejecución del flujo de trabajo.

    3. Haz clic en la pestaña Registros.
    4. Busca una entrada de registro similar a la que se muestra a continuación:

      Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
      
    5. Copia la URL de devolución de llamada para usarla en el siguiente comando.

    gcloud

    1. Primero, recupera el ID de ejecución:
      gcloud logging read "Listening for callbacks" --freshness=DURATION
      
      Reemplaza DURATION por un importe adecuado de tiempo para limitar las entradas de registro devueltas (si ejecutaste el flujo de trabajo varias veces).

      Por ejemplo, --freshness=t10m muestra entradas de registro. que no tengan más de 10 minutos de antigüedad. Para obtener más información, consulta gcloud topic datetimes

      Se muestra el ID de ejecución. Ten en cuenta que la URL de devolución de llamada también que se muestra en el campo textPayload. Copiar ambos valores para su uso en los siguientes pasos.

    2. Ejecuta el siguiente comando:
      gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
      
      Se muestra el estado de la ejecución del flujo de trabajo.
  3. Ahora puedes llamar al extremo de devolución de llamada con un comando curl:
    curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL
    

    Ten en cuenta que para un extremo POST, debes usar un Encabezado de representación Content-Type. Por ejemplo:

    curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL
    

    Reemplaza CALLBACK_URL por la URL que copiaste en el archivo paso anterior.

  4. A través de la consola de Google Cloud o de Google Cloud CLI, confirma que el estado de la ejecución del flujo de trabajo ahora sea SUCCEEDED.
  5. Busca la entrada de registro con el textPayload devuelto. que se ve de la siguiente manera:
    Received {"body":null,"headers":...
    

Muestras

En estos ejemplos, se demuestra la sintaxis.

Detecta errores de tiempo de espera

Esta muestra se suma a la anterior mediante la captura de errores de tiempo de espera. escribir los errores en el registro del sistema.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 3600
                result: callback_request
            except:
                as: e
                steps:
                    - log_error:
                        call: sys.log
                        args:
                            severity: "ERROR"
                            text: ${"Received error " + e.message}
                        next: end
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "callback_request"
              },
              "except": {
                "as": "e",
                "steps": [
                  {
                    "log_error": {
                      "call": "sys.log",
                      "args": {
                        "severity": "ERROR",
                        "text": "${\"Received error \" + e.message}"
                      },
                      "next": "end"
                    }
                  }
                ]
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      }
    }
      

Espera una devolución de llamada en un bucle de reintentos

Esta muestra modifica la muestra anterior implementando un paso de reintento. Con un predicado de reintento personalizado, el flujo de trabajo registra una advertencia y vuelve a intentar la espera en el extremo de devolución de llamada hasta cinco veces. Si la cuota de reintentos se agota antes de que se reciba la devolución de llamada, la respuesta tiempo de espera hace que el flujo de trabajo falle.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 60.0
                result: callback_request
            retry:
                predicate: ${log_timeout}
                max_retries: 5
                backoff:
                    initial_delay: 1
                    max_delay: 10
                    multiplier: 2
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    log_timeout:
        params: [e]
        steps:
          - when_to_repeat:
              switch:
                - condition: ${"TimeoutError" in e.tags}
                  steps:
                      - log_error_and_retry:
                          call: sys.log
                          args:
                              severity: "WARNING"
                              text: "Timed out waiting for callback, retrying"
                      - exit_predicate:
                          return: true
          - otherwise:
              return: false
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 60
                },
                "result": "callback_request"
              },
              "retry": {
                "predicate": "${log_timeout}",
                "max_retries": 5,
                "backoff": {
                  "initial_delay": 1,
                  "max_delay": 10,
                  "multiplier": 2
                }
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      },
      "log_timeout": {
        "params": [
          "e"
        ],
        "steps": [
          {
            "when_to_repeat": {
              "switch": [
                {
                  "condition": "${\"TimeoutError\" in e.tags}",
                  "steps": [
                    {
                      "log_error_and_retry": {
                        "call": "sys.log",
                        "args": {
                          "severity": "WARNING",
                          "text": "Timed out waiting for callback, retrying"
                        }
                      }
                    },
                    {
                      "exit_predicate": {
                        "return": true
                      }
                    }
                  ]
                }
              ]
            }
          },
          {
            "otherwise": {
              "return": false
            }
          }
        ]
      }
    }
      

¿Qué sigue?