Os callbacks permitem que as execuções de fluxo de trabalho aguardem outro serviço para fazer uma solicitação ao endpoint de callback. Essa solicitação retoma a execução do fluxo de trabalho.
Com os callbacks, é possível sinalizar para o fluxo de trabalho que evento especificado ocorreu e aguardar esse evento sem a necessidade de pesquisa. Por exemplo, é possível criar um fluxo de trabalho que notifique quando um produto está em estoque novamente ou quando um item é enviado, ou que pause para permitir uma interação humana, como revisar um pedido ou validar uma tradução.
Nesta página, mostramos como criar um fluxo de trabalho compatível com um endpoint de callback e que aguarda solicitações HTTP de processos externos chegarem a esse endpoint. Você também pode aguardar eventos usando callbacks e gatilhos do Eventarc.
Os callbacks exigem duas funções integradas da biblioteca padrão:
events.create_callback_endpoint
: cria um endpoint de callback que aguarda o método HTTP especificadoevents.await_callback
: aguarda um callback ser recebido no endpoint fornecido
Criar um endpoint que receba uma solicitação de callback
Crie um endpoint de callback que recebe solicitações HTTP para chegar a esse endpoint.
- Siga as etapas para criar um novo fluxo de trabalho ou escolha um fluxo de trabalho atual para atualizar, mas não o implante ainda.
- Na definição do fluxo de trabalho, adicione uma etapa para criar um endpoint de callback:
YAML
- create_callback: call: events.create_callback_endpoint args: http_callback_method: "METHOD" result: callback_details
JSON
[ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "METHOD" }, "result": "callback_details" } } ]
Substitua
METHOD
pelo método HTTP esperado, um dentreGET
,HEAD
,POST
,PUT
,DELETE
,OPTIONS
ouPATCH
O padrão éPOST
.O resultado é um mapa,
callback_details
, com um campourl
que armazena o URL do endpoint criado.O endpoint do callback já está pronto para receber solicitações com o método HTTP especificado. O URL do endpoint criado pode ser usado para acionar o callback de um processo externo ao fluxo de trabalho. Por exemplo, transmitindo o URL para uma função do Cloud Run.
- Na definição do fluxo de trabalho, adicione uma etapa para aguardar uma solicitação de callback:
YAML
- await_callback: call: events.await_callback args: callback: ${callback_details} timeout: TIMEOUT result: callback_request
JSON
[ { "await_callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": TIMEOUT }, "result": "callback_request" } } ]
Substitua
TIMEOUT
pelo número máximo de segundos que o fluxo de trabalho precisa aguardar uma solicitação. O padrão é 43200 (12 horas). Se o tempo decorrido antes do recebimento de uma solicitação, será gerado umTimeoutError
.Há uma duração máxima de execução. Para mais informações, consulte o limite de solicitações.
O mapa
callback_details
da etapa anteriorcreate_callback
é transmitido como um argumento. - Implante seu fluxo de trabalho para concluir a criação ou atualização.
Quando uma solicitação é recebida, todos os detalhes dela são armazenados no mapa
callback_request
. Assim, você tem acesso a toda a solicitação HTTP, incluindo o cabeçalho, o corpo e um mapaquery
para qualquer parâmetro de consulta. Exemplo:YAML
http_request: body: headers: {...} method: GET query: {} url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2" received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667 type: HTTP
JSON
{ "http_request":{ "body":null, "headers":{ ... }, "method":"GET", "query":{ }, "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2" }, "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667", "type":"HTTP" }
Se o corpo do HTTP for em texto ou JSON, o Workflows tentará decodificar o corpo. Caso contrário, serão retornados bytes brutos.
Autorizar solicitações para o endpoint de callback
Para enviar uma solicitação a um endpoint de callback, os serviços do Google Cloud, como o
Cloud Run e as funções do Cloud Run, bem como serviços de
terceiros, precisam ser autorizados para fazer isso pelas permissões adequadas do
Identity and Access Management (IAM), especificamente workflows.callbacks.send
(incluído na função Invocador de fluxos de trabalho).
Fazer uma solicitação direta
A maneira mais simples de criar credenciais de curta duração para uma conta de serviço é fazendo uma solicitação direta. Duas identidades estão envolvidas nesse fluxo: o autor da chamada e a conta de serviço para quem a credencial é criada. A chamada para o fluxo de trabalho básico desta página é um exemplo de solicitação direta. Para mais informações, consulte Usar o IAM para controlar o acesso e Permissões de solicitação direta.
Gerar um token de acesso do OAuth 2.0
Para autorizar um aplicativo a chamar o endpoint de callback, gere um
token de acesso do OAuth 2.0 para a conta de serviço associada ao fluxo de trabalho.
Supondo que você tenha as permissões necessárias (para os papéis Workflows Editor
ou
Workflows Admin
e Service Account Token Creator
), também é possível
gerar um token
executando o método generateAccessToken
.
Se a solicitação generateAccessToken
tiver êxito, o corpo da resposta retornada
terá um token de acesso do OAuth 2.0 e um prazo de validade. (Por
padrão, os tokens de acesso do OAuth 2.0 são válidos por no máximo uma hora.) Exemplo:
{ "accessToken": "eyJ0eXAi...NiJ9", "expireTime": "2020-04-07T15:01:23.045123456Z" }
accessToken
pode ser usado em uma chamada curl no URL do endpoint
de callback, como nos exemplos a seguir:
curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL
Gerar um token do OAuth para uma função do Cloud Run
Se você invocar um callback de uma função do Cloud Run usando a mesma conta de serviço do fluxo de trabalho e no mesmo projeto, será possível gerar um token de acesso do OAuth na própria função. Exemplo:
Para mais contexto, consulte o tutorial sobre Como criar um fluxo de trabalho human-in-the-loop usando callbacks.
Solicitar acesso off-line
Os tokens de acesso expiram periodicamente e se tornam credenciais inválidas para uma solicitação de API relacionada. É possível atualizar um token de acesso sem solicitar a permissão do usuário se você solicitou acesso off-line aos escopos associados ao token. A solicitação de acesso off-line é um requisito para qualquer aplicativo que precise acessar uma API Google quando o usuário não estiver presente. Para mais informações, consulte Como atualizar um token de acesso (off-line).
Invocar um fluxo de trabalho exatamente uma vez usando callbacks
Os callbacks são totalmente idempotentes, o que significa que você pode tentar novamente um callback se ele falhar sem produzir resultados inesperados ou efeitos colaterais.
Depois de criar um endpoint de callback, o URL fica pronto para receber solicitações
de entrada e geralmente é retornado para um autor da chamada antes que a chamada correspondente para
await_callback
seja feita. No entanto, se o URL do callback ainda não tiver sido
recebido quando a etapa await_callback
for executada, a execução do fluxo de trabalho será
bloqueada até que o endpoint seja recebido (ou um tempo limite ocorra). Depois de recebida, a
execução do fluxo de trabalho é retomada e o callback é processado.
Depois de executar a etapa create_callback_endpoint
e criar um endpoint de callback, um único slot de callback fica disponível para o fluxo de trabalho. Quando uma solicitação de callback
é recebida, esse slot é preenchido com o payload do callback até que o
callback possa ser processado. Quando a etapa await_callback
é executada, o
callback é processado, o slot é esvaziado e disponibilizado para outro
callback. Em seguida, você pode reutilizar o endpoint de callback e chamar await_callback
novamente.
Se await_callback
for chamado apenas uma vez, mas uma segunda callback for recebida, um
dos seguintes cenários ocorrerá e um código de status HTTP apropriado será
retornado:
O HTTP
429: Too Many Requests
indica que o primeiro callback foi recebido com sucesso, mas não foi processado. Ele ainda está aguardando o processamento. O segundo callback é rejeitado pelo fluxo de trabalho.O HTTP
200: Success
indica que o primeiro callback foi recebido com sucesso e uma resposta foi retornada. O segundo callback é armazenado e pode nunca ser processado, a menos queawait_callback
seja chamado uma segunda vez. Se o fluxo de trabalho for encerrado antes disso, a segunda solicitação de callback nunca será processada e será descartada.O HTTP
404: Page Not Found
indica que o fluxo de trabalho não está mais em execução. O primeiro callback foi processado e o fluxo de trabalho foi concluído ou falhou. Para determinar isso, você precisa consultar o estado de execução do fluxo de trabalho.
Callbacks paralelos
Quando as etapas são executadas em paralelo e um callback é criado por uma linha de execução mãe e aguardado em etapas filhas, o mesmo padrão descrito anteriormente é seguido.
No exemplo a seguir, quando a etapa create_callback_endpoint
é executada,
um slot de callback é criado. Cada chamada subsequente para await_callback
abre um
novo slot de callback. Dez callbacks podem ser feitos simultaneamente, se todas as linhas de execução estiverem
em execução e aguardando antes que uma solicitação de callback seja feita. Outros callbacks
poderiam ser feitos, mas seriam armazenados e nunca processados.
YAML
- createCallbackInParent: call: events.create_callback_endpoint args: http_callback_method: "POST" result: callback_details - parallelStep: parallel: for: range: [1, 10] value: loopValue steps: - waitForCallbackInChild: call: events.await_callback args: callback: ${callback_details}
JSON
[ { "createCallbackInParent": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "POST" }, "result": "callback_details" } }, { "parallelStep": { "parallel": { "for": { "range": [ 1, 10 ], "value": "loopValue", "steps": [ { "waitForCallbackInChild": { "call": "events.await_callback", "args": { "callback": "${callback_details}" } } } ] } } } } ]
Os callbacks são processados na mesma ordem em que cada chamada é feita por um
ramo para await_callback
. No entanto, a ordem de execução das ramificações não é
determinística e pode chegar a um resultado usando vários caminhos. Para mais
informações, consulte
Etapas paralelas.
Testar um fluxo de trabalho básico de callback
É possível criar um fluxo de trabalho básico e testar a chamada para o endpoint de callback
desse fluxo de trabalho usando curl. É necessário ter as permissões necessárias Workflows Editor
ou
Workflows Admin
para o projeto do fluxo de trabalho.
-
Crie e implante o fluxo de trabalho a seguir e
execute-o.
YAML
- create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: callback_request - print_callback_request: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)} - return_callback_result: return: ${callback_request.http_request}
JSON
[ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "callback_request" } }, { "print_callback_request": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}" } } }, { "return_callback_result": { "return": "${callback_request.http_request}" } } ]
Depois de executar o fluxo de trabalho, o estado dele será
ACTIVE
até que a solicitação de callback seja recebida ou até que o tempo limite expire. - Confirme o estado de execução e recupere o URL do callback:
Console
-
No console do Google Cloud, acesse a página Fluxos de trabalho:
Acessar fluxos de trabalho -
Clique no nome do fluxo de trabalho que você acabou de executar.
O estado da execução do fluxo de trabalho é exibido.
- Clique na guia Registros.
Procure uma entrada de registro semelhante a esta:
Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
- Copie o URL de callback a ser usado no próximo comando.
gcloud
- Primeiro, recupere o ID de execução:
Substituagcloud logging read "Listening for callbacks" --freshness=DURATION
DURATION
por um período apropriado para limitar as entradas de registro retornadas (se você executou o fluxo de trabalho várias vezes).Por exemplo,
--freshness=t10m
retorna entradas de registro de até 10 minutos. Para mais detalhes, consultegcloud topic datetimes
.O ID de execução é retornado. Observe que o URL de callback também é retornado no campo
textPayload
. Copie os dois valores para usar nas etapas a seguir. - Execute este comando:
O estado da execução do fluxo de trabalho é retornado.gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
-
- Agora é possível chamar o endpoint de callback usando um comando curl:
curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL
Para um endpoint
POST
, é necessário usar um cabeçalho de representaçãoContent-Type
. Exemplo:curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL
Substitua
CALLBACK_URL
pelo URL que você copiou na etapa anterior. - No console do Google Cloud ou usando a Google Cloud CLI,
confirme se o estado da execução do fluxo de trabalho agora é
SUCCEEDED
. - Procure a entrada de registro com um
textPayload
semelhante a este:Received {"body":null,"headers":...
Amostras
Estas amostras demonstram a sintaxe.
Detectar erros de tempo limite
Esta amostra complementa a anterior porque detecta qualquer erro de tempo limite e grava-os no registro do sistema.
YAML
main: steps: - create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: try: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: callback_request except: as: e steps: - log_error: call: sys.log args: severity: "ERROR" text: ${"Received error " + e.message} next: end - print_callback_result: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)}
JSON
{ "main": { "steps": [ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "try": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "callback_request" }, "except": { "as": "e", "steps": [ { "log_error": { "call": "sys.log", "args": { "severity": "ERROR", "text": "${\"Received error \" + e.message}" }, "next": "end" } } ] } } }, { "print_callback_result": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}" } } } ] } }
Aguardar um callback em um loop de repetição
Esta amostra modifica a anterior implementando uma etapa de repetição. Usando um predicado de repetição personalizado, o fluxo de trabalho registra um aviso quando ocorre um tempo limite e, em seguida, repete a aguarda o endpoint de callback até cinco vezes. Se a cota de novas tentativas esgotar antes de o callback ser recebido, o erro de tempo limite final fará com que o fluxo de trabalho falhe.
YAML
main: steps: - create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: try: call: events.await_callback args: callback: ${callback_details} timeout: 60.0 result: callback_request retry: predicate: ${log_timeout} max_retries: 5 backoff: initial_delay: 1 max_delay: 10 multiplier: 2 - print_callback_result: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)} log_timeout: params: [e] steps: - when_to_repeat: switch: - condition: ${"TimeoutError" in e.tags} steps: - log_error_and_retry: call: sys.log args: severity: "WARNING" text: "Timed out waiting for callback, retrying" - exit_predicate: return: true - otherwise: return: false
JSON
{ "main": { "steps": [ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "try": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 60 }, "result": "callback_request" }, "retry": { "predicate": "${log_timeout}", "max_retries": 5, "backoff": { "initial_delay": 1, "max_delay": 10, "multiplier": 2 } } } }, { "print_callback_result": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}" } } } ] }, "log_timeout": { "params": [ "e" ], "steps": [ { "when_to_repeat": { "switch": [ { "condition": "${\"TimeoutError\" in e.tags}", "steps": [ { "log_error_and_retry": { "call": "sys.log", "args": { "severity": "WARNING", "text": "Timed out waiting for callback, retrying" } } }, { "exit_predicate": { "return": true } } ] } ] } }, { "otherwise": { "return": false } } ] } }