Os callbacks permitem que as execuções de fluxo de trabalho aguardem outro serviço para fazer uma solicitação ao endpoint de callback. Essa solicitação retoma a execução do fluxo de trabalho.
Com os callbacks, é possível sinalizar para o fluxo de trabalho que evento especificado ocorreu e aguardar esse evento sem a necessidade de pesquisa. Por exemplo, é possível criar um fluxo de trabalho que notifique quando um produto estiver de volta ao estoque ou quando um item for enviado, ou que espera para permitir a interação humana, como revisar um pedido ou validar uma tradução.
Nesta página, mostramos como criar um fluxo de trabalho compatível com um endpoint de callback e que aguarda solicitações HTTP de processos externos chegarem a esse endpoint. Também é possível aguardar eventos usando callbacks e gatilhos do Eventarc.
Os callbacks exigem duas funções integradas da biblioteca padrão:
events.create_callback_endpoint
: cria um endpoint de callback que aguarda o método HTTP especificadoevents.await_callback
: aguarda um callback ser recebido no endpoint fornecido
Criar um endpoint que receba uma solicitação de callback
Crie um endpoint de callback que recebe solicitações HTTP para chegar a esse endpoint.
- Siga as etapas para criar um novo fluxo de trabalho ou escolha um fluxo atual para atualizar, mas não o implante ainda.
- Na definição do fluxo de trabalho, adicione uma etapa para criar um endpoint de callback:
YAML
- create_callback: call: events.create_callback_endpoint args: http_callback_method: "METHOD" result: callback_details
JSON
[ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "METHOD" }, "result": "callback_details" } } ]
Substitua
METHOD
pelo método HTTP esperado, um dentreGET
,HEAD
,POST
,PUT
,DELETE
,OPTIONS
ouPATCH
O padrão éPOST
.O resultado é um mapa,
callback_details
, com um campourl
que armazena o URL do endpoint criado.O endpoint do callback já está pronto para receber solicitações com o método HTTP especificado. O URL do endpoint criado pode ser usado para acionar o callback de um processo externo ao fluxo de trabalho, por exemplo, transmitindo o URL para uma função do Cloud.
- Na definição do fluxo de trabalho, adicione uma etapa para aguardar uma solicitação de callback:
YAML
- await_callback: call: events.await_callback args: callback: ${callback_details} timeout: TIMEOUT result: callback_request
JSON
[ { "await_callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": TIMEOUT }, "result": "callback_request" } } ]
Substitua
TIMEOUT
pelo número máximo de segundos que o fluxo de trabalho precisa aguardar uma solicitação. O padrão é 43200 (12 horas). Se o tempo decorrido antes do recebimento de uma solicitação, será gerado umTimeoutError
.Há uma duração máxima de execução. Para mais informações, consulte o limite de solicitações.
O mapa
callback_details
da etapa anteriorcreate_callback
é transmitido como um argumento. - Implante seu fluxo de trabalho para concluir a criação ou atualização.
Quando uma solicitação é recebida, todos os detalhes dela são armazenados no mapa
callback_request
. Assim, você tem acesso a toda a solicitação HTTP, incluindo o cabeçalho, o corpo e um mapaquery
para qualquer parâmetro de consulta. Exemplo:YAML
http_request: body: headers: {...} method: GET query: {} url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2" received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667 type: HTTP
JSON
{ "http_request":{ "body":null, "headers":{ ... }, "method":"GET", "query":{ }, "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2" }, "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667", "type":"HTTP" }
Se o corpo do HTTP for em texto ou JSON, o Workflows tentará decodificar o corpo. Caso contrário, serão retornados bytes brutos.
Autorizar solicitações para o endpoint de callback
Para enviar uma solicitação a um endpoint de callback, os serviços do Google Cloud, como
o Cloud Run e o Cloud Functions, bem como serviços de
terceiros, precisam ser autorizados a fazer isso com as permissões
adequadas do Identity and Access Management (IAM), especificamente, workflows.callbacks.send
(incluída no papel de Invocador do Workflows).
Fazer uma solicitação direta
A maneira mais simples de criar credenciais de curta duração para uma conta de serviço é fazendo uma solicitação direta. Duas identidades estão envolvidas nesse fluxo: o autor da chamada e a conta de serviço para quem a credencial é criada. A chamada para o fluxo de trabalho básico desta página é um exemplo de solicitação direta. Para mais informações, consulte Usar o IAM para controlar o acesso e Permissões de solicitação direta.
Gerar um token de acesso do OAuth 2.0
Para autorizar um aplicativo a chamar o endpoint de callback, gere um
token de acesso do OAuth 2.0 para a conta de serviço associada ao fluxo de trabalho.
Supondo que você tenha as permissões necessárias (para os papéis Workflows Editor
ou
Workflows Admin
e Service Account Token Creator
), também é possível
gerar um token
executando o método generateAccessToken
.
Se a solicitação generateAccessToken
tiver êxito, o corpo da resposta retornada
terá um token de acesso do OAuth 2.0 e um prazo de validade. (Por
padrão, os tokens de acesso do OAuth 2.0 são válidos por no máximo uma hora.) Exemplo:
{ "accessToken": "eyJ0eXAi...NiJ9", "expireTime": "2020-04-07T15:01:23.045123456Z" }
accessToken
pode ser usado em uma chamada curl para o URL do endpoint de callback, como nos exemplos a seguir:
curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL
Gerar um token do OAuth para uma função do Cloud
Se você invocar um callback de uma função do Cloud usando a mesma conta de serviço do fluxo de trabalho e no mesmo projeto, será possível gerar um token de acesso do OAuth na própria função. Exemplo:
Para mais contexto, consulte o tutorial sobre Como criar um fluxo de trabalho human-in-the-loop usando callbacks.
Solicitar acesso off-line
Os tokens de acesso expiram periodicamente e se tornam credenciais inválidas para uma solicitação de API relacionada. É possível atualizar um token de acesso sem solicitar a permissão do usuário se você solicitou acesso off-line aos escopos associados ao token. A solicitação de acesso off-line é um requisito para qualquer aplicativo que precise acessar uma API Google quando o usuário não estiver presente. Para mais informações, consulte Como atualizar um token de acesso (off-line).
Testar um fluxo de trabalho básico de callback
É possível criar um fluxo de trabalho básico e testar a chamada para o endpoint de callback
desse fluxo de trabalho usando curl. É necessário ter as permissões necessárias Workflows Editor
ou
Workflows Admin
para o projeto do fluxo de trabalho.
-
Crie e implante o fluxo de trabalho a seguir e
execute.
YAML
- create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: callback_request - print_callback_request: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)} - return_callback_result: return: ${callback_request.http_request}
JSON
[ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "callback_request" } }, { "print_callback_request": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}" } } }, { "return_callback_result": { "return": "${callback_request.http_request}" } } ]
Depois de executar o fluxo de trabalho, o estado dele será
ACTIVE
até que a solicitação de callback seja recebida ou o tempo limite seja atingido. - Confirme o estado de execução e recupere o URL do callback:
Console
-
No console do Google Cloud, acesse a página Fluxos de trabalho:
Acessar o Workflows -
Clique no nome do fluxo de trabalho que você acabou de executar.
O estado da execução do fluxo de trabalho é exibido.
- Clique na guia Registros.
Procure uma entrada de registro semelhante a esta:
Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
- Copie o URL de callback a ser usado no próximo comando.
gcloud
- Primeiro, recupere o ID de execução:
gcloud logging read "Listening for callbacks" --freshness=DURATION
SubstituaDURATION
por um período apropriado para limitar as entradas de registro retornadas (se você executou o fluxo de trabalho várias vezes).Por exemplo,
--freshness=t10m
retorna entradas de registro de até 10 minutos. Para mais detalhes, consultegcloud topic datetimes
.O ID de execução é retornado. Observe que o URL de callback também é retornado no campo
textPayload
. Copie os dois valores para usar nas etapas a seguir. - Execute este comando:
-
- Agora é possível chamar o endpoint de callback usando um comando curl:
curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL
Observe que, para um endpoint
POST
, você precisa usar um cabeçalho de representaçãoContent-Type
. Exemplo:curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL
Substitua
CALLBACK_URL
pelo URL que você copiou na etapa anterior. - Com o console do Google Cloud ou a Google Cloud CLI, confirme se o estado da execução do fluxo de trabalho agora é
SUCCEEDED
. - Procure a entrada de registro com um
textPayload
semelhante a este:Received {"body":null,"headers":...
Amostras
Estas amostras demonstram a sintaxe.
Detectar erros de tempo limite
Esta amostra complementa a anterior porque detecta qualquer erro de tempo limite e grava-os no registro do sistema.
YAML
main: steps: - create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: try: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: callback_request except: as: e steps: - log_error: call: sys.log args: severity: "ERROR" text: ${"Received error " + e.message} next: end - print_callback_result: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)}
JSON
{ "main": { "steps": [ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "try": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "callback_request" }, "except": { "as": "e", "steps": [ { "log_error": { "call": "sys.log", "args": { "severity": "ERROR", "text": "${\"Received error \" + e.message}" }, "next": "end" } } ] } } }, { "print_callback_result": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}" } } } ] } }
Aguardar um callback em um loop de repetição
Esta amostra modifica a anterior implementando uma etapa de repetição. Usando um predicado de repetição personalizado, o fluxo de trabalho registra um aviso quando ocorre um tempo limite e, em seguida, repete a aguarda o endpoint de callback até cinco vezes. Se a cota de novas tentativas esgotar antes de o callback ser recebido, o erro de tempo limite final fará com que o fluxo de trabalho falhe.
YAML
main: steps: - create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: try: call: events.await_callback args: callback: ${callback_details} timeout: 60.0 result: callback_request retry: predicate: ${log_timeout} max_retries: 5 backoff: initial_delay: 1 max_delay: 10 multiplier: 2 - print_callback_result: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)} log_timeout: params: [e] steps: - when_to_repeat: switch: - condition: ${"TimeoutError" in e.tags} steps: - log_error_and_retry: call: sys.log args: severity: "WARNING" text: "Timed out waiting for callback, retrying" - exit_predicate: return: true - otherwise: return: false
JSON
{ "main": { "steps": [ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "try": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 60 }, "result": "callback_request" }, "retry": { "predicate": "${log_timeout}", "max_retries": 5, "backoff": { "initial_delay": 1, "max_delay": 10, "multiplier": 2 } } } }, { "print_callback_result": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}" } } } ] }, "log_timeout": { "params": [ "e" ], "steps": [ { "when_to_repeat": { "switch": [ { "condition": "${\"TimeoutError\" in e.tags}", "steps": [ { "log_error_and_retry": { "call": "sys.log", "args": { "severity": "WARNING", "text": "Timed out waiting for callback, retrying" } } }, { "exit_predicate": { "return": true } } ] } ] } }, { "otherwise": { "return": false } } ] } }