Aguardar usando callbacks

Os callbacks permitem que as execuções de fluxo de trabalho aguardem outro serviço para fazer uma solicitação ao endpoint de callback. Essa solicitação retoma a execução do fluxo de trabalho.

Com os callbacks, é possível sinalizar para o fluxo de trabalho que evento especificado ocorreu e aguardar esse evento sem a necessidade de pesquisa. Por exemplo, é possível criar um fluxo de trabalho que notifique quando um produto estiver de volta ao estoque ou quando um item for enviado, ou que espera para permitir a interação humana, como revisar um pedido ou validar uma tradução.

Nesta página, mostramos como criar um fluxo de trabalho compatível com um endpoint de callback e que aguarda solicitações HTTP de processos externos chegarem a esse endpoint. Também é possível aguardar eventos usando callbacks e gatilhos do Eventarc.

Os callbacks exigem duas funções integradas da biblioteca padrão:

Criar um endpoint que receba uma solicitação de callback

Crie um endpoint de callback que recebe solicitações HTTP para chegar a esse endpoint.

  1. Siga as etapas para criar um novo fluxo de trabalho ou escolha um fluxo atual para atualizar, mas não o implante ainda.
  2. Na definição do fluxo de trabalho, adicione uma etapa para criar um endpoint de callback:

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "METHOD"
            result: callback_details
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "METHOD"
              },
              "result": "callback_details"
            }
          }
        ]
          

    Substitua METHOD pelo método HTTP esperado, um dentre GET, HEAD, POST, PUT, DELETE, OPTIONS ou PATCH O padrão é POST.

    O resultado é um mapa, callback_details, com um campo url que armazena o URL do endpoint criado.

    O endpoint do callback já está pronto para receber solicitações com o método HTTP especificado. O URL do endpoint criado pode ser usado para acionar o callback de um processo externo ao fluxo de trabalho, por exemplo, transmitindo o URL para uma função do Cloud.

  3. Na definição do fluxo de trabalho, adicione uma etapa para aguardar uma solicitação de callback:

    YAML

        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: TIMEOUT
            result: callback_request
        

    JSON

        [
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": TIMEOUT
              },
              "result": "callback_request"
            }
          }
        ]
          

    Substitua TIMEOUT pelo número máximo de segundos que o fluxo de trabalho precisa aguardar uma solicitação. O padrão é 43200 (12 horas). Se o tempo decorrido antes do recebimento de uma solicitação, será gerado um TimeoutError.

    Há uma duração máxima de execução. Para mais informações, consulte o limite de solicitações.

    O mapa callback_details da etapa anterior create_callback é transmitido como um argumento.

  4. Implante seu fluxo de trabalho para concluir a criação ou atualização.

    Quando uma solicitação é recebida, todos os detalhes dela são armazenados no mapa callback_request. Assim, você tem acesso a toda a solicitação HTTP, incluindo o cabeçalho, o corpo e um mapa query para qualquer parâmetro de consulta. Exemplo:

    YAML

        http_request:
          body:
          headers: {...}
          method: GET
          query: {}
          url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
        received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667
        type: HTTP
        

    JSON

        {
           "http_request":{
              "body":null,
              "headers":{
                 ...
              },
              "method":"GET",
              "query":{
              },
              "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
           },
           "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667",
           "type":"HTTP"
        }
          

    Se o corpo do HTTP for em texto ou JSON, o Workflows tentará decodificar o corpo. Caso contrário, serão retornados bytes brutos.

Autorizar solicitações para o endpoint de callback

Para enviar uma solicitação a um endpoint de callback, os serviços do Google Cloud, como o Cloud Run e o Cloud Functions, bem como serviços de terceiros, precisam ser autorizados a fazer isso com as permissões adequadas do Identity and Access Management (IAM), especificamente, workflows.callbacks.send (incluída no papel de Invocador do Workflows).

Fazer uma solicitação direta

A maneira mais simples de criar credenciais de curta duração para uma conta de serviço é fazendo uma solicitação direta. Duas identidades estão envolvidas nesse fluxo: o autor da chamada e a conta de serviço para quem a credencial é criada. A chamada para o fluxo de trabalho básico desta página é um exemplo de solicitação direta. Para mais informações, consulte Usar o IAM para controlar o acesso e Permissões de solicitação direta.

Gerar um token de acesso do OAuth 2.0

Para autorizar um aplicativo a chamar o endpoint de callback, gere um token de acesso do OAuth 2.0 para a conta de serviço associada ao fluxo de trabalho. Supondo que você tenha as permissões necessárias (para os papéis Workflows Editor ou Workflows Admin e Service Account Token Creator), também é possível gerar um token executando o método generateAccessToken.

Se a solicitação generateAccessToken tiver êxito, o corpo da resposta retornada terá um token de acesso do OAuth 2.0 e um prazo de validade. (Por padrão, os tokens de acesso do OAuth 2.0 são válidos por no máximo uma hora.) Exemplo:

  {
  "accessToken": "eyJ0eXAi...NiJ9",
  "expireTime": "2020-04-07T15:01:23.045123456Z"
  }
O código accessToken pode ser usado em uma chamada curl para o URL do endpoint de callback, como nos exemplos a seguir:
  curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
  curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL

Gerar um token do OAuth para uma função do Cloud

Se você invocar um callback de uma função do Cloud usando a mesma conta de serviço do fluxo de trabalho e no mesmo projeto, será possível gerar um token de acesso do OAuth na própria função. Exemplo:

const {GoogleAuth} = require('google-auth-library');
const auth = new GoogleAuth();
const token = await auth.getAccessToken();
console.log("Token", token);

try {
  const resp = await fetch(url, {
      method: 'POST',
      headers: {
          'accept': 'application/json',
          'content-type': 'application/json',
          'authorization': `Bearer ${token}`
      },
      body: JSON.stringify({ approved })
  });
  console.log("Response = ", JSON.stringify(resp));

  const result = await resp.json();
  console.log("Outcome = ", JSON.stringify(result));

Para mais contexto, consulte o tutorial sobre Como criar um fluxo de trabalho human-in-the-loop usando callbacks.

Solicitar acesso off-line

Os tokens de acesso expiram periodicamente e se tornam credenciais inválidas para uma solicitação de API relacionada. É possível atualizar um token de acesso sem solicitar a permissão do usuário se você solicitou acesso off-line aos escopos associados ao token. A solicitação de acesso off-line é um requisito para qualquer aplicativo que precise acessar uma API Google quando o usuário não estiver presente. Para mais informações, consulte Como atualizar um token de acesso (off-line).

Testar um fluxo de trabalho básico de callback

É possível criar um fluxo de trabalho básico e testar a chamada para o endpoint de callback desse fluxo de trabalho usando curl. É necessário ter as permissões necessárias Workflows Editor ou Workflows Admin para o projeto do fluxo de trabalho.

  1. Crie e implante o fluxo de trabalho a seguir e execute.

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: 3600
            result: callback_request
        - print_callback_request:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
        - return_callback_result:
            return: ${callback_request.http_request}
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": 3600
              },
              "result": "callback_request"
            }
          },
          {
            "print_callback_request": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          },
          {
            "return_callback_result": {
              "return": "${callback_request.http_request}"
            }
          }
        ]
          

    Depois de executar o fluxo de trabalho, o estado dele será ACTIVE até que a solicitação de callback seja recebida ou o tempo limite seja atingido.

  2. Confirme o estado de execução e recupere o URL do callback:

    Console

    1. No console do Google Cloud, acesse a página Fluxos de trabalho:

      Acessar o Workflows
    2. Clique no nome do fluxo de trabalho que você acabou de executar.

      O estado da execução do fluxo de trabalho é exibido.

    3. Clique na guia Registros.
    4. Procure uma entrada de registro semelhante a esta:

      Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
      
    5. Copie o URL de callback a ser usado no próximo comando.

    gcloud

    1. Primeiro, recupere o ID de execução:
      gcloud logging read "Listening for callbacks" --freshness=DURATION
      
      Substitua DURATION por um período apropriado para limitar as entradas de registro retornadas (se você executou o fluxo de trabalho várias vezes).

      Por exemplo, --freshness=t10m retorna entradas de registro de até 10 minutos. Para mais detalhes, consulte gcloud topic datetimes.

      O ID de execução é retornado. Observe que o URL de callback também é retornado no campo textPayload. Copie os dois valores para usar nas etapas a seguir.

    2. Execute este comando:
      
                    O estado da execução do fluxo de trabalho é retornado.
                
  3. Agora é possível chamar o endpoint de callback usando um comando curl:
    curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL
    

    Observe que, para um endpoint POST, você precisa usar um cabeçalho de representação Content-Type. Exemplo:

    curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL
    

    Substitua CALLBACK_URL pelo URL que você copiou na etapa anterior.

  4. Com o console do Google Cloud ou a Google Cloud CLI, confirme se o estado da execução do fluxo de trabalho agora é SUCCEEDED.
  5. Procure a entrada de registro com um textPayload semelhante a este:
    Received {"body":null,"headers":...
    

Amostras

Estas amostras demonstram a sintaxe.

Detectar erros de tempo limite

Esta amostra complementa a anterior porque detecta qualquer erro de tempo limite e grava-os no registro do sistema.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 3600
                result: callback_request
            except:
                as: e
                steps:
                    - log_error:
                        call: sys.log
                        args:
                            severity: "ERROR"
                            text: ${"Received error " + e.message}
                        next: end
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "callback_request"
              },
              "except": {
                "as": "e",
                "steps": [
                  {
                    "log_error": {
                      "call": "sys.log",
                      "args": {
                        "severity": "ERROR",
                        "text": "${\"Received error \" + e.message}"
                      },
                      "next": "end"
                    }
                  }
                ]
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      }
    }
      

Aguardar um callback em um loop de repetição

Esta amostra modifica a anterior implementando uma etapa de repetição. Usando um predicado de repetição personalizado, o fluxo de trabalho registra um aviso quando ocorre um tempo limite e, em seguida, repete a aguarda o endpoint de callback até cinco vezes. Se a cota de novas tentativas esgotar antes de o callback ser recebido, o erro de tempo limite final fará com que o fluxo de trabalho falhe.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 60.0
                result: callback_request
            retry:
                predicate: ${log_timeout}
                max_retries: 5
                backoff:
                    initial_delay: 1
                    max_delay: 10
                    multiplier: 2
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    log_timeout:
        params: [e]
        steps:
          - when_to_repeat:
              switch:
                - condition: ${"TimeoutError" in e.tags}
                  steps:
                      - log_error_and_retry:
                          call: sys.log
                          args:
                              severity: "WARNING"
                              text: "Timed out waiting for callback, retrying"
                      - exit_predicate:
                          return: true
          - otherwise:
              return: false
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 60
                },
                "result": "callback_request"
              },
              "retry": {
                "predicate": "${log_timeout}",
                "max_retries": 5,
                "backoff": {
                  "initial_delay": 1,
                  "max_delay": 10,
                  "multiplier": 2
                }
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      },
      "log_timeout": {
        "params": [
          "e"
        ],
        "steps": [
          {
            "when_to_repeat": {
              "switch": [
                {
                  "condition": "${\"TimeoutError\" in e.tags}",
                  "steps": [
                    {
                      "log_error_and_retry": {
                        "call": "sys.log",
                        "args": {
                          "severity": "WARNING",
                          "text": "Timed out waiting for callback, retrying"
                        }
                      }
                    },
                    {
                      "exit_predicate": {
                        "return": true
                      }
                    }
                  ]
                }
              ]
            }
          },
          {
            "otherwise": {
              "return": false
            }
          }
        ]
      }
    }
      

A seguir