Aguardar usando callbacks

Os callbacks permitem que as execuções de fluxo de trabalho aguardem outro serviço para fazer uma solicitação ao endpoint de callback. Essa solicitação retoma a execução do fluxo de trabalho.

Com os callbacks, é possível sinalizar para o fluxo de trabalho que evento especificado ocorreu e aguardar esse evento sem a necessidade de pesquisa. Por exemplo, é possível criar um fluxo de trabalho que notifica quando um produto volta ao estoque ou quando um item é enviado ou que aguarda a interação humana, como a revisão de um pedido ou a validação de uma tradução.

Nesta página, mostramos como criar um fluxo de trabalho compatível com um endpoint de callback e que aguarda solicitações HTTP de processos externos chegarem a esse endpoint. Você também pode aguardar eventos usando callbacks e gatilhos do Eventarc.

Os callbacks exigem duas funções integradas da biblioteca padrão:

Criar um endpoint que receba uma solicitação de callback

Crie um endpoint de callback que recebe solicitações HTTP para chegar a esse endpoint.

  1. Siga as etapas para criar um novo fluxo de trabalho ou escolha um fluxo de trabalho atual para atualizar, mas não o implante ainda.
  2. Na definição do fluxo de trabalho, adicione uma etapa para criar um endpoint de callback:

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "METHOD"
            result: callback_details
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "METHOD"
              },
              "result": "callback_details"
            }
          }
        ]
          

    Substitua METHOD pelo método HTTP esperado, um dentre GET, HEAD, POST, PUT, DELETE, OPTIONS ou PATCH O padrão é POST.

    O resultado é um mapa, callback_details, com um campo url que armazena o URL do endpoint criado.

    O endpoint do callback já está pronto para receber solicitações com o método HTTP especificado. O URL do endpoint criado pode ser usado para acionar o callback de um processo externo ao fluxo de trabalho, por exemplo, transmitindo o URL para uma função do Cloud.

  3. Na definição do fluxo de trabalho, adicione uma etapa para aguardar uma solicitação de callback:

    YAML

        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: TIMEOUT
            result: callback_request
        

    JSON

        [
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": TIMEOUT
              },
              "result": "callback_request"
            }
          }
        ]
          

    Substitua TIMEOUT pelo número máximo de segundos que o fluxo de trabalho precisa aguardar uma solicitação. O padrão é 43200 (12 horas). Se o tempo decorrido antes do recebimento de uma solicitação, será gerado um TimeoutError.

    Há uma duração máxima de execução. Para mais informações, consulte o limite de solicitações.

    O mapa callback_details da etapa anterior create_callback é transmitido como um argumento.

  4. Implante seu fluxo de trabalho para concluir a criação ou atualização.

    Quando uma solicitação é recebida, todos os detalhes dela são armazenados no mapa callback_request. Assim, você tem acesso a toda a solicitação HTTP, incluindo o cabeçalho, o corpo e um mapa query para qualquer parâmetro de consulta. Exemplo:

    YAML

        http_request:
          body:
          headers: {...}
          method: GET
          query: {}
          url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
        received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667
        type: HTTP
        

    JSON

        {
           "http_request":{
              "body":null,
              "headers":{
                 ...
              },
              "method":"GET",
              "query":{
              },
              "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
           },
           "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667",
           "type":"HTTP"
        }
          

    Se o corpo do HTTP for em texto ou JSON, o Workflows tentará decodificar o corpo. Caso contrário, serão retornados bytes brutos.

Autorizar solicitações para o endpoint de callback

Para enviar uma solicitação a um endpoint de callback, os serviços do Google Cloud, como Cloud Run e Cloud Functions, bem como serviços de terceiros, precisam ser autorizados a fazer isso com as permissões adequadas do Identity and Access Management (IAM), especificamente workflows.callbacks.send (incluído no papel Invocador do Workflows).

Fazer uma solicitação direta

A maneira mais simples de criar credenciais de curta duração para uma conta de serviço é fazendo uma solicitação direta. Duas identidades estão envolvidas nesse fluxo: o autor da chamada e a conta de serviço para quem a credencial é criada. A chamada para o fluxo de trabalho básico desta página é um exemplo de solicitação direta. Para mais informações, consulte Usar o IAM para controlar o acesso e Permissões de solicitação direta.

Gerar um token de acesso do OAuth 2.0

Para autorizar um aplicativo a chamar o endpoint de callback, gere um token de acesso do OAuth 2.0 para a conta de serviço associada ao fluxo de trabalho. Supondo que você tenha as permissões necessárias (para os papéis Workflows Editor ou Workflows Admin e Service Account Token Creator), também é possível gerar um token executando o método generateAccessToken.

Se a solicitação generateAccessToken tiver êxito, o corpo da resposta retornada terá um token de acesso do OAuth 2.0 e um prazo de validade. (Por padrão, os tokens de acesso do OAuth 2.0 são válidos por no máximo uma hora.) Exemplo:

  {
  "accessToken": "eyJ0eXAi...NiJ9",
  "expireTime": "2020-04-07T15:01:23.045123456Z"
  }
O código accessToken pode ser usado em uma chamada curl para o URL do endpoint de callback, como nos exemplos a seguir:
  curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
  curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL

Gerar um token do OAuth para uma função do Cloud

Se você invocar um callback de uma função do Cloud usando a mesma conta de serviço do fluxo de trabalho e no mesmo projeto, será possível gerar um token de acesso do OAuth na própria função. Exemplo:

const {GoogleAuth} = require('google-auth-library');
const auth = new GoogleAuth();
const token = await auth.getAccessToken();
console.log("Token", token);

try {
  const resp = await fetch(url, {
      method: 'POST',
      headers: {
          'accept': 'application/json',
          'content-type': 'application/json',
          'authorization': `Bearer ${token}`
      },
      body: JSON.stringify({ approved })
  });
  console.log("Response = ", JSON.stringify(resp));

  const result = await resp.json();
  console.log("Outcome = ", JSON.stringify(result));

Para mais contexto, consulte o tutorial sobre Como criar um fluxo de trabalho human-in-the-loop usando callbacks.

Solicitar acesso off-line

Os tokens de acesso expiram periodicamente e se tornam credenciais inválidas para uma solicitação de API relacionada. É possível atualizar um token de acesso sem solicitar a permissão do usuário se você solicitou acesso off-line aos escopos associados ao token. A solicitação de acesso off-line é um requisito para qualquer aplicativo que precise acessar uma API Google quando o usuário não estiver presente. Para mais informações, consulte Como atualizar um token de acesso (off-line).

Invocar um fluxo de trabalho exatamente uma vez usando callbacks

Os callbacks são totalmente idempotentes, ou seja, você pode repetir um callback se ele falhar, sem produzir resultados ou efeitos colaterais inesperados.

Depois de criar um endpoint de callback, o URL estará pronto para receber solicitações recebidas e geralmente será retornado ao autor da chamada antes que a chamada correspondente para await_callback seja feita. No entanto, se o URL de callback ainda não tiver sido recebido quando a etapa await_callback for executada, a execução do fluxo de trabalho será bloqueada até que o endpoint seja recebido (ou até atingir o tempo limite). Depois de recebido, a execução do fluxo de trabalho é retomada e o callback é processado.

Depois de executar a etapa create_callback_endpoint e criar um endpoint de callback, um único slot de callback ficará disponível para o fluxo de trabalho. Quando uma solicitação de callback é recebida, esse slot é preenchido com o payload de callback até que o callback possa ser processado. Quando a etapa await_callback é executada, o callback é processado, e o slot é esvaziado e disponibilizado para outro callback. Em seguida, reutilize o endpoint de callback e chame await_callback novamente.

Se await_callback for chamado apenas uma vez, mas um segundo callback for recebido, um dos seguintes cenários vai ocorrer e um código de status HTTP apropriado será retornado:

  • O HTTP 429: Too Many Requests indica que o primeiro callback foi recebido, mas não foi processado. Ele permanece aguardando o processamento. O segundo callback é rejeitado pelo fluxo de trabalho.

  • O HTTP 200: Success indica que o primeiro callback foi recebido e que uma resposta foi retornada. O segundo callback é armazenado e pode nunca ser processado, a menos que await_callback seja chamado uma segunda vez. Se o fluxo de trabalho terminar antes disso, a segunda solicitação de callback nunca será processada e será descartada.

  • O HTTP 404: Page Not Found indica que o fluxo de trabalho não está mais em execução. O primeiro callback foi processado e o fluxo de trabalho foi concluído ou o fluxo de trabalho falhou. Para determinar isso, você precisará consultar o estado de execução do fluxo de trabalho.

Callbacks paralelos

Quando as etapas são executadas em paralelo e um callback é criado por uma linha de execução pai e esperado nas etapas filhas, o mesmo padrão descrito anteriormente é seguido.

No exemplo a seguir, quando a etapa create_callback_endpoint é executada, um slot de callback é criado. Cada chamada subsequente para await_callback abre um novo slot de callback. Dez callbacks podem ser feitos simultaneamente, se todas as linhas de execução estiverem em execução e aguardando antes que uma solicitação de callback seja feita. Outros callbacks poderiam ser feitos, mas seriam armazenados e nunca processados.

YAML

  - createCallbackInParent:
    call: events.create_callback_endpoint
    args:
      http_callback_method: "POST"
    result: callback_details
  - parallelStep:
    parallel:
        for:
            range: [1, 10]
            value: loopValue
            steps:
              - waitForCallbackInChild:
                  call: events.await_callback
                  args:
                      callback: ${callback_details}

JSON

  [
    {
      "createCallbackInParent": {
        "call": "events.create_callback_endpoint",
        "args": {
          "http_callback_method": "POST"
        },
        "result": "callback_details"
      }
    },
    {
      "parallelStep": {
        "parallel": {
          "for": {
            "range": [
              1,
              10
            ],
            "value": "loopValue",
            "steps": [
              {
                "waitForCallbackInChild": {
                  "call": "events.await_callback",
                  "args": {
                    "callback": "${callback_details}"
                  }
                }
              }
            ]
          }
        }
      }
    }
  ]

Os callbacks são processados na mesma ordem em que cada chamada é feita por uma ramificação para await_callback. No entanto, a ordem de execução das ramificações não é determinista e pode chegar a um resultado usando vários caminhos. Para mais informações, consulte Etapas paralelas.

Testar um fluxo de trabalho básico de callback

É possível criar um fluxo de trabalho básico e testar a chamada para o endpoint de callback desse fluxo de trabalho usando curl. É necessário ter as permissões necessárias Workflows Editor ou Workflows Admin para o projeto do fluxo de trabalho.

  1. Crie e implante o fluxo de trabalho a seguir e execute.

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: 3600
            result: callback_request
        - print_callback_request:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
        - return_callback_result:
            return: ${callback_request.http_request}
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": 3600
              },
              "result": "callback_request"
            }
          },
          {
            "print_callback_request": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          },
          {
            "return_callback_result": {
              "return": "${callback_request.http_request}"
            }
          }
        ]
          

    Depois de executar o fluxo de trabalho, o estado de execução dele será ACTIVE até que a solicitação de callback seja recebida ou até que o tempo limite expire.

  2. Confirme o estado de execução e recupere o URL do callback:

    Console

    1. No console do Google Cloud, acesse a página Fluxos de trabalho:

      Acessar o Workflows
    2. Clique no nome do fluxo de trabalho que você acabou de executar.

      O estado da execução do fluxo de trabalho é exibido.

    3. Clique na guia Registros.
    4. Procure uma entrada de registro semelhante a esta:

      Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
      
    5. Copie o URL de callback a ser usado no próximo comando.

    gcloud

    1. Primeiro, recupere o ID de execução:
      gcloud logging read "Listening for callbacks" --freshness=DURATION
      
      Substitua DURATION por um período adequado para limitar as entradas de registro retornadas (se você tiver executado o fluxo de trabalho várias vezes).

      Por exemplo, --freshness=t10m retorna entradas de registro de até 10 minutos. Para mais detalhes, consulte gcloud topic datetimes.

      O ID de execução é retornado. Observe que o URL de callback também é retornado no campo textPayload. Copie os dois valores para usar nas etapas a seguir.

    2. Execute o seguinte comando:
      gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
      
      O estado de execução do fluxo de trabalho é retornado.
  3. Agora é possível chamar o endpoint de callback usando um comando curl:
    curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL
    

    Observe que, para um endpoint POST, você precisa usar um cabeçalho de representação Content-Type. Exemplo:

    curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL
    

    Substitua CALLBACK_URL pelo URL que você copiou na etapa anterior.

  4. Por meio do console do Google Cloud ou usando a Google Cloud CLI, confirme se o estado da execução do fluxo de trabalho agora é SUCCEEDED.
  5. Procure a entrada de registro com o textPayload retornado semelhante a este:
    Received {"body":null,"headers":...
    

Amostras

Estas amostras demonstram a sintaxe.

Detectar erros de tempo limite

Esta amostra complementa a anterior porque detecta qualquer erro de tempo limite e grava-os no registro do sistema.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 3600
                result: callback_request
            except:
                as: e
                steps:
                    - log_error:
                        call: sys.log
                        args:
                            severity: "ERROR"
                            text: ${"Received error " + e.message}
                        next: end
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "callback_request"
              },
              "except": {
                "as": "e",
                "steps": [
                  {
                    "log_error": {
                      "call": "sys.log",
                      "args": {
                        "severity": "ERROR",
                        "text": "${\"Received error \" + e.message}"
                      },
                      "next": "end"
                    }
                  }
                ]
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      }
    }
      

Aguardar um callback em um loop de repetição

Esta amostra modifica a anterior implementando uma etapa de repetição. Usando um predicado de repetição personalizado, o fluxo de trabalho registra um aviso quando ocorre um tempo limite e, em seguida, repete a aguarda o endpoint de callback até cinco vezes. Se a cota de novas tentativas esgotar antes de o callback ser recebido, o erro de tempo limite final fará com que o fluxo de trabalho falhe.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 60.0
                result: callback_request
            retry:
                predicate: ${log_timeout}
                max_retries: 5
                backoff:
                    initial_delay: 1
                    max_delay: 10
                    multiplier: 2
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    log_timeout:
        params: [e]
        steps:
          - when_to_repeat:
              switch:
                - condition: ${"TimeoutError" in e.tags}
                  steps:
                      - log_error_and_retry:
                          call: sys.log
                          args:
                              severity: "WARNING"
                              text: "Timed out waiting for callback, retrying"
                      - exit_predicate:
                          return: true
          - otherwise:
              return: false
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 60
                },
                "result": "callback_request"
              },
              "retry": {
                "predicate": "${log_timeout}",
                "max_retries": 5,
                "backoff": {
                  "initial_delay": 1,
                  "max_delay": 10,
                  "multiplier": 2
                }
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      },
      "log_timeout": {
        "params": [
          "e"
        ],
        "steps": [
          {
            "when_to_repeat": {
              "switch": [
                {
                  "condition": "${\"TimeoutError\" in e.tags}",
                  "steps": [
                    {
                      "log_error_and_retry": {
                        "call": "sys.log",
                        "args": {
                          "severity": "WARNING",
                          "text": "Timed out waiting for callback, retrying"
                        }
                      }
                    },
                    {
                      "exit_predicate": {
                        "return": true
                      }
                    }
                  ]
                }
              ]
            }
          },
          {
            "otherwise": {
              "return": false
            }
          }
        ]
      }
    }
      

A seguir