Os callbacks permitem que as execuções de fluxos de trabalho aguardem que outro serviço faça um pedido ao ponto final de callback. Esse pedido retoma a execução do fluxo de trabalho.
Com os callbacks, pode sinalizar ao seu fluxo de trabalho que ocorreu um evento especificado e aguardar esse evento sem sondagem. Por exemplo, pode criar um fluxo de trabalho que lhe envia uma notificação quando um produto volta a estar em stock ou quando um artigo foi enviado, ou que aguarda para permitir a interação humana, como rever uma encomenda ou validar uma tradução.
Esta página mostra-lhe como criar um fluxo de trabalho que suporta um ponto final de retorno de chamada e que aguarda a chegada de pedidos HTTP de processos externos a esse ponto final. Também pode aguardar eventos através de callbacks e acionadores do Eventarc.
As chamadas de retorno requerem a utilização de duas funções incorporadas da biblioteca padrão:
events.create_callback_endpoint
—Cria um ponto final de retorno que espera o método HTTP especificadoevents.await_callback
—Aguarda a receção de uma chamada de resposta no ponto final especificado
Crie um ponto final que receba um pedido de retorno de chamada
Crie um ponto final de retorno de chamada que possa receber pedidos HTTP para chegar a esse ponto final.
- Siga os passos para criar um novo fluxo de trabalho ou escolha um fluxo de trabalho existente para atualizar, mas ainda não o implemente.
- Na definição do fluxo de trabalho, adicione um passo para criar um ponto final de retorno de chamada:
YAML
- create_callback: call: events.create_callback_endpoint args: http_callback_method: "METHOD" result: callback_details
JSON
[ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "METHOD" }, "result": "callback_details" } } ]
Substitua
METHOD
pelo método HTTP esperado, um deGET
,HEAD
,POST
,PUT
,DELETE
,OPTIONS
ouPATCH
. A predefinição éPOST
.O resultado é um mapa,
callback_details
, com um campourl
que armazena o URL do ponto final criado.O ponto final de callback está agora pronto para receber pedidos recebidos com o método HTTP especificado. O URL do ponto final criado pode ser usado para acionar o callback a partir de um processo externo ao fluxo de trabalho; por exemplo, transmitindo o URL a uma função do Cloud Run.
- Na definição do fluxo de trabalho, adicione um passo para aguardar um pedido de retorno de chamada:
YAML
- await_callback: call: events.await_callback args: callback: ${callback_details} timeout: TIMEOUT result: callback_request
JSON
[ { "await_callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": TIMEOUT }, "result": "callback_request" } } ]
Substitua
TIMEOUT
pelo número máximo de segundos que o fluxo de trabalho deve aguardar por um pedido. A predefinição é 43200 (12 horas). Se o tempo expirar antes de ser recebido um pedido, é gerado umTimeoutError
.Tenha em atenção que existe uma duração de execução máxima. Para mais informações, consulte o limite de pedidos.
O mapa
callback_details
do passo anteriorcreate_callback
é transmitido como um argumento. - Implemente o fluxo de trabalho para concluir a criação ou a atualização.
Quando é recebido um pedido, todos os detalhes do pedido são armazenados no mapa
callback_request
. Em seguida, tem acesso a todo o pedido HTTP, incluindo o respetivo cabeçalho, corpo e um mapa para quaisquer parâmetros de consulta.query
Por exemplo:YAML
http_request: body: headers: {...} method: GET query: {} url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2" received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667 type: HTTP
JSON
{ "http_request":{ "body":null, "headers":{ ... }, "method":"GET", "query":{ }, "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2" }, "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667", "type":"HTTP" }
Se o corpo HTTP for texto ou JSON, o Workflows tenta descodificar o corpo; caso contrário, são devolvidos bytes não processados.
Autorize pedidos ao ponto final de callback
Para enviar um pedido a um ponto final de callback, Google Cloud os serviços como
o Cloud Run e as funções do Cloud Run, bem como os serviços
de terceiros, têm de estar autorizados a fazê-lo através das autorizações
de gestão de identidade e de acesso (IAM) adequadas; especificamente, workflows.callbacks.send
(incluídas na função Workflows Invoker).
Faça um pedido direto
A forma mais simples de criar credenciais de curta duração para uma conta de serviço é fazer um pedido direto. Existem duas identidades envolvidas neste fluxo: o autor da chamada e a conta de serviço para a qual a credencial é criada. A chamada para o fluxo de trabalho básico nesta página é um exemplo de um pedido direto. Para mais informações, consulte os artigos Use a IAM para controlar o acesso e Autorizações de pedidos diretos.
Gere uma chave de acesso OAuth 2.0
Para autorizar uma aplicação a chamar o ponto final de callback, pode gerar um token de acesso OAuth 2.0 para a conta de serviço associada ao fluxo de trabalho.
Partindo do princípio de que tem as autorizações necessárias (para as funções Workflows Editor
ou Workflows Admin
e Service Account Token Creator
), também pode gerar um token por si próprio executando o método generateAccessToken
.
Se o pedido generateAccessToken
for bem-sucedido, o corpo da resposta devolvido contém uma chave de acesso OAuth 2.0 e um prazo de validade. (Por predefinição, as chaves de acesso de OAuth 2.0 são válidas durante 1 hora no máximo.) Por
exemplo:
{ "accessToken": "eyJ0eXAi...NiJ9", "expireTime": "2020-04-07T15:01:23.045123456Z" }
accessToken
pode ser usado numa chamada curl para o URL do ponto final de retorno de chamada, como nos seguintes exemplos:
curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL
Gere um token OAuth para uma função do Cloud Run
Se estiver a invocar um callback a partir de uma função do Cloud Run com a mesma conta de serviço que o fluxo de trabalho e no mesmo projeto, pode gerar um token de acesso OAuth na própria função. Por exemplo:
Para mais contexto, consulte o tutorial sobre como criar um fluxo de trabalho com intervenção humana através de callbacks.
Peça acesso offline
As chaves de acesso expiram periodicamente e tornam-se credenciais inválidas para um pedido de API relacionado. Pode atualizar um token de acesso sem pedir permissão ao utilizador se tiver pedido acesso offline aos âmbitos associados ao token. Pedir acesso offline é um requisito para qualquer aplicação que precise de aceder a uma API Google quando o utilizador não está presente. Para mais informações, consulte o artigo Atualizar um token de acesso (acesso offline).
Invocar um fluxo de trabalho exatamente uma vez através de callbacks
Os callbacks são totalmente idempotentes, o que significa que pode tentar novamente um callback se falhar sem produzir resultados inesperados ou efeitos secundários.
Depois de criar um ponto final de retorno, o URL está pronto para receber pedidos
recebidos e é normalmente devolvido a um autor da chamada antes de a chamada correspondente para
await_callback
ser feita. No entanto, se o URL de retorno de chamada ainda não tiver sido recebido quando o passo await_callback
for executado, a execução do fluxo de trabalho é bloqueada até que o ponto final seja recebido (ou ocorra um limite de tempo). Após a receção, a execução do fluxo de trabalho é retomada e o callback é processado.
Depois de executar o passo create_callback_endpoint
e criar um ponto final de chamada de retorno, fica disponível um único espaço de chamada de retorno para o fluxo de trabalho. Quando é recebido um pedido de retorno de chamada, este espaço é preenchido com a carga útil do retorno de chamada até que o retorno de chamada possa ser processado. Quando o passo await_callback
é executado, o callback é processado e o espaço é esvaziado e disponibilizado para outro callback. Em seguida, pode reutilizar o ponto final de retorno de chamada e ligar await_callback
novamente.
Se await_callback
for chamado apenas uma vez, mas for recebido um segundo callback, ocorre um dos seguintes cenários e é devolvido um código de estado HTTP adequado:
O HTTP
429: Too Many Requests
indica que o primeiro callback foi recebido com êxito, mas não foi processado. Continua a aguardar o processamento. O segundo retorno é rejeitado pelo fluxo de trabalho.O HTTP
200: Success
indica que o primeiro callback foi recebido com êxito e foi devolvida uma resposta. A segunda chamada de retorno é armazenada e pode nunca ser processada, a menos queawait_callback
seja chamada uma segunda vez. Se o fluxo de trabalho terminar antes disso, o segundo pedido de retorno de chamada nunca é processado e é rejeitado.HTTP
404: Page Not Found
indica que o fluxo de trabalho já não está em execução. A primeira chamada de retorno foi processada e o fluxo de trabalho foi concluído, ou o fluxo de trabalho falhou. Para determinar isto, tem de consultar o estado de execução do fluxo de trabalho.
Chamadas de retorno paralelas
Quando os passos são executados em paralelo e um callback é criado por um thread principal e aguardado nos passos secundários, segue-se o mesmo padrão descrito anteriormente.
No exemplo seguinte, quando o passo create_callback_endpoint
é executado, é criado um espaço de preenchimento de callback. Cada chamada subsequente para await_callback
abre um novo espaço de retorno de chamada. É possível fazer dez chamadas de retorno em simultâneo se todos os threads estiverem em execução e a aguardar antes de fazer um pedido de chamada de retorno. Podem ser feitas chamadas de retorno adicionais, mas são armazenadas e nunca processadas.
YAML
- createCallbackInParent: call: events.create_callback_endpoint args: http_callback_method: "POST" result: callback_details - parallelStep: parallel: for: range: [1, 10] value: loopValue steps: - waitForCallbackInChild: call: events.await_callback args: callback: ${callback_details}
JSON
[ { "createCallbackInParent": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "POST" }, "result": "callback_details" } }, { "parallelStep": { "parallel": { "for": { "range": [ 1, 10 ], "value": "loopValue", "steps": [ { "waitForCallbackInChild": { "call": "events.await_callback", "args": { "callback": "${callback_details}" } } } ] } } } } ]
Tenha em atenção que os retornos de chamada são processados pela mesma ordem em que cada chamada é feita por uma sucursal para await_callback
. No entanto, a ordem de execução das ramificações não é determinística e pode chegar a um resultado através de vários caminhos. Para mais
informações, consulte
Passos paralelos.
Experimente um fluxo de trabalho de chamada de resposta básico
Pode criar um fluxo de trabalho básico e, em seguida, testar a chamada para o ponto final de retorno de chamada desse fluxo de trabalho usando o curl. Tem de ter as autorizações Workflows Editor
ou Workflows Admin
necessárias para o projeto no qual o fluxo de trabalho reside.
-
Crie e implemente o seguinte fluxo de trabalho e, de seguida,
execute-o.
YAML
- create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: callback_request - print_callback_request: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)} - return_callback_result: return: ${callback_request.http_request}
JSON
[ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "callback_request" } }, { "print_callback_request": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}" } } }, { "return_callback_result": { "return": "${callback_request.http_request}" } } ]
Após a execução do fluxo de trabalho, o estado da execução do fluxo de trabalho é
ACTIVE
até que o pedido de retorno de chamada seja recebido ou o tempo limite expire. - Confirme o estado de execução e obtenha o URL de retorno:
Consola
-
Na Google Cloud consola, aceda à página Fluxos de trabalho:
Aceda a Fluxos de trabalho -
Clique no nome do fluxo de trabalho que acabou de executar.
É apresentado o estado da execução do fluxo de trabalho.
- Clique no separador Registos.
Procure uma entrada de registo semelhante à seguinte:
Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
- Copie o URL de retorno de chamada para usar no comando seguinte.
gcloud
- Primeiro, obtenha o ID de execução:
Substituagcloud logging read "Listening for callbacks" --freshness=DURATION
DURATION
por uma quantidade adequada de tempo para limitar as entradas do registo devolvidas (se tiver executado o fluxo de trabalho várias vezes).Por exemplo,
--freshness=t10m
devolve entradas do registo com uma antiguidade máxima de 10 minutos. Para obter mais detalhes, consultegcloud topic datetimes
.O ID de execução é devolvido. Tenha em atenção que o URL de retorno também é devolvido no campo
textPayload
. Copie ambos os valores para usar nos passos seguintes. - Execute o seguinte comando:
É devolvido o estado da execução do fluxo de trabalho.gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
-
- Agora, pode chamar o ponto final de retorno de chamada através de um comando curl:
curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL
Tenha em atenção que, para um ponto final
POST
, tem de usar um cabeçalho de representaçãoContent-Type
. Por exemplo:curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL
Substitua
CALLBACK_URL
pelo URL que copiou no passo anterior. - Através da Google Cloud consola ou da CLI do Google Cloud,
confirme que o estado da execução do fluxo de trabalho é agora
SUCCEEDED
. - Procure a entrada de registo com o
textPayload
devolvido semelhante ao seguinte:Received {"body":null,"headers":...
Amostras
Estes exemplos demonstram a sintaxe.
Detete erros de limite de tempo
Este exemplo adiciona ao exemplo anterior a captura de erros de limite de tempo e a escrita dos erros no registo do sistema.
YAML
main: steps: - create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: try: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: callback_request except: as: e steps: - log_error: call: sys.log args: severity: "ERROR" text: ${"Received error " + e.message} next: end - print_callback_result: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)}
JSON
{ "main": { "steps": [ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "try": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "callback_request" }, "except": { "as": "e", "steps": [ { "log_error": { "call": "sys.log", "args": { "severity": "ERROR", "text": "${\"Received error \" + e.message}" }, "next": "end" } } ] } } }, { "print_callback_result": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}" } } } ] } }
Aguardar uma chamada de retorno num ciclo de novas tentativas
Este exemplo modifica o exemplo anterior implementando um passo de repetição. Usando um predicado de repetição personalizado, o fluxo de trabalho regista um aviso quando ocorre um limite de tempo e, em seguida, tenta novamente a espera no ponto final de retorno de chamada até cinco vezes. Se a quota de novas tentativas for esgotada antes de receber a chamada de retorno, o erro de limite de tempo final faz com que o fluxo de trabalho falhe.
YAML
main: steps: - create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: try: call: events.await_callback args: callback: ${callback_details} timeout: 60.0 result: callback_request retry: predicate: ${log_timeout} max_retries: 5 backoff: initial_delay: 1 max_delay: 10 multiplier: 2 - print_callback_result: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)} log_timeout: params: [e] steps: - when_to_repeat: switch: - condition: ${"TimeoutError" in e.tags} steps: - log_error_and_retry: call: sys.log args: severity: "WARNING" text: "Timed out waiting for callback, retrying" - exit_predicate: return: true - otherwise: return: false
JSON
{ "main": { "steps": [ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "try": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 60 }, "result": "callback_request" }, "retry": { "predicate": "${log_timeout}", "max_retries": 5, "backoff": { "initial_delay": 1, "max_delay": 10, "multiplier": 2 } } } }, { "print_callback_result": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}" } } } ] }, "log_timeout": { "params": [ "e" ], "steps": [ { "when_to_repeat": { "switch": [ { "condition": "${\"TimeoutError\" in e.tags}", "steps": [ { "log_error_and_retry": { "call": "sys.log", "args": { "severity": "WARNING", "text": "Timed out waiting for callback, retrying" } } }, { "exit_predicate": { "return": true } } ] } ] } }, { "otherwise": { "return": false } } ] } }