Aguarde através de chamadas de retorno

Os callbacks permitem que as execuções de fluxos de trabalho aguardem que outro serviço faça um pedido ao ponto final de callback. Esse pedido retoma a execução do fluxo de trabalho.

Com os callbacks, pode sinalizar ao seu fluxo de trabalho que ocorreu um evento especificado e aguardar esse evento sem sondagem. Por exemplo, pode criar um fluxo de trabalho que lhe envia uma notificação quando um produto volta a estar em stock ou quando um artigo foi enviado, ou que aguarda para permitir a interação humana, como rever uma encomenda ou validar uma tradução.

Esta página mostra-lhe como criar um fluxo de trabalho que suporta um ponto final de retorno de chamada e que aguarda a chegada de pedidos HTTP de processos externos a esse ponto final. Também pode aguardar eventos através de callbacks e acionadores do Eventarc.

As chamadas de retorno requerem a utilização de duas funções incorporadas da biblioteca padrão:

Crie um ponto final que receba um pedido de retorno de chamada

Crie um ponto final de retorno de chamada que possa receber pedidos HTTP para chegar a esse ponto final.

  1. Siga os passos para criar um novo fluxo de trabalho ou escolha um fluxo de trabalho existente para atualizar, mas ainda não o implemente.
  2. Na definição do fluxo de trabalho, adicione um passo para criar um ponto final de retorno de chamada:

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "METHOD"
            result: callback_details
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "METHOD"
              },
              "result": "callback_details"
            }
          }
        ]
          

    Substitua METHOD pelo método HTTP esperado, um de GET, HEAD, POST, PUT, DELETE, OPTIONS ou PATCH. A predefinição é POST.

    O resultado é um mapa, callback_details, com um campo url que armazena o URL do ponto final criado.

    O ponto final de callback está agora pronto para receber pedidos recebidos com o método HTTP especificado. O URL do ponto final criado pode ser usado para acionar o callback a partir de um processo externo ao fluxo de trabalho; por exemplo, transmitindo o URL a uma função do Cloud Run.

  3. Na definição do fluxo de trabalho, adicione um passo para aguardar um pedido de retorno de chamada:

    YAML

        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: TIMEOUT
            result: callback_request
        

    JSON

        [
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": TIMEOUT
              },
              "result": "callback_request"
            }
          }
        ]
          

    Substitua TIMEOUT pelo número máximo de segundos que o fluxo de trabalho deve aguardar por um pedido. A predefinição é 43200 (12 horas). Se o tempo expirar antes de ser recebido um pedido, é gerado um TimeoutError.

    Tenha em atenção que existe uma duração de execução máxima. Para mais informações, consulte o limite de pedidos.

    O mapa callback_details do passo anterior create_callback é transmitido como um argumento.

  4. Implemente o fluxo de trabalho para concluir a criação ou a atualização.

    Quando é recebido um pedido, todos os detalhes do pedido são armazenados no mapa callback_request. Em seguida, tem acesso a todo o pedido HTTP, incluindo o respetivo cabeçalho, corpo e um mapa para quaisquer parâmetros de consulta.query Por exemplo:

    YAML

        http_request:
          body:
          headers: {...}
          method: GET
          query: {}
          url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
        received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667
        type: HTTP
        

    JSON

        {
           "http_request":{
              "body":null,
              "headers":{
                 ...
              },
              "method":"GET",
              "query":{
              },
              "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
           },
           "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667",
           "type":"HTTP"
        }
          

    Se o corpo HTTP for texto ou JSON, o Workflows tenta descodificar o corpo; caso contrário, são devolvidos bytes não processados.

Autorize pedidos ao ponto final de callback

Para enviar um pedido a um ponto final de callback, Google Cloud os serviços como o Cloud Run e as funções do Cloud Run, bem como os serviços de terceiros, têm de estar autorizados a fazê-lo através das autorizações de gestão de identidade e de acesso (IAM) adequadas; especificamente, workflows.callbacks.send (incluídas na função Workflows Invoker).

Faça um pedido direto

A forma mais simples de criar credenciais de curta duração para uma conta de serviço é fazer um pedido direto. Existem duas identidades envolvidas neste fluxo: o autor da chamada e a conta de serviço para a qual a credencial é criada. A chamada para o fluxo de trabalho básico nesta página é um exemplo de um pedido direto. Para mais informações, consulte os artigos Use a IAM para controlar o acesso e Autorizações de pedidos diretos.

Gere uma chave de acesso OAuth 2.0

Para autorizar uma aplicação a chamar o ponto final de callback, pode gerar um token de acesso OAuth 2.0 para a conta de serviço associada ao fluxo de trabalho. Partindo do princípio de que tem as autorizações necessárias (para as funções Workflows Editor ou Workflows Admin e Service Account Token Creator), também pode gerar um token por si próprio executando o método generateAccessToken.

Se o pedido generateAccessToken for bem-sucedido, o corpo da resposta devolvido contém uma chave de acesso OAuth 2.0 e um prazo de validade. (Por predefinição, as chaves de acesso de OAuth 2.0 são válidas durante 1 hora no máximo.) Por exemplo:

  {
  "accessToken": "eyJ0eXAi...NiJ9",
  "expireTime": "2020-04-07T15:01:23.045123456Z"
  }
O código accessToken pode ser usado numa chamada curl para o URL do ponto final de retorno de chamada, como nos seguintes exemplos:
  curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
  curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL

Gere um token OAuth para uma função do Cloud Run

Se estiver a invocar um callback a partir de uma função do Cloud Run com a mesma conta de serviço que o fluxo de trabalho e no mesmo projeto, pode gerar um token de acesso OAuth na própria função. Por exemplo:

const {GoogleAuth} = require('google-auth-library');
const auth = new GoogleAuth();
const token = await auth.getAccessToken();
console.log("Token", token);

try {
  const resp = await fetch(url, {
      method: 'POST',
      headers: {
          'accept': 'application/json',
          'content-type': 'application/json',
          'authorization': `Bearer ${token}`
      },
      body: JSON.stringify({ approved })
  });
  console.log("Response = ", JSON.stringify(resp));

  const result = await resp.json();
  console.log("Outcome = ", JSON.stringify(result));

Para mais contexto, consulte o tutorial sobre como criar um fluxo de trabalho com intervenção humana através de callbacks.

Peça acesso offline

As chaves de acesso expiram periodicamente e tornam-se credenciais inválidas para um pedido de API relacionado. Pode atualizar um token de acesso sem pedir permissão ao utilizador se tiver pedido acesso offline aos âmbitos associados ao token. Pedir acesso offline é um requisito para qualquer aplicação que precise de aceder a uma API Google quando o utilizador não está presente. Para mais informações, consulte o artigo Atualizar um token de acesso (acesso offline).

Invocar um fluxo de trabalho exatamente uma vez através de callbacks

Os callbacks são totalmente idempotentes, o que significa que pode tentar novamente um callback se falhar sem produzir resultados inesperados ou efeitos secundários.

Depois de criar um ponto final de retorno, o URL está pronto para receber pedidos recebidos e é normalmente devolvido a um autor da chamada antes de a chamada correspondente para await_callback ser feita. No entanto, se o URL de retorno de chamada ainda não tiver sido recebido quando o passo await_callback for executado, a execução do fluxo de trabalho é bloqueada até que o ponto final seja recebido (ou ocorra um limite de tempo). Após a receção, a execução do fluxo de trabalho é retomada e o callback é processado.

Depois de executar o passo create_callback_endpoint e criar um ponto final de chamada de retorno, fica disponível um único espaço de chamada de retorno para o fluxo de trabalho. Quando é recebido um pedido de retorno de chamada, este espaço é preenchido com a carga útil do retorno de chamada até que o retorno de chamada possa ser processado. Quando o passo await_callback é executado, o callback é processado e o espaço é esvaziado e disponibilizado para outro callback. Em seguida, pode reutilizar o ponto final de retorno de chamada e ligar await_callback novamente.

Se await_callback for chamado apenas uma vez, mas for recebido um segundo callback, ocorre um dos seguintes cenários e é devolvido um código de estado HTTP adequado:

  • O HTTP 429: Too Many Requests indica que o primeiro callback foi recebido com êxito, mas não foi processado. Continua a aguardar o processamento. O segundo retorno é rejeitado pelo fluxo de trabalho.

  • O HTTP 200: Success indica que o primeiro callback foi recebido com êxito e foi devolvida uma resposta. A segunda chamada de retorno é armazenada e pode nunca ser processada, a menos que await_callback seja chamada uma segunda vez. Se o fluxo de trabalho terminar antes disso, o segundo pedido de retorno de chamada nunca é processado e é rejeitado.

  • HTTP 404: Page Not Found indica que o fluxo de trabalho já não está em execução. A primeira chamada de retorno foi processada e o fluxo de trabalho foi concluído, ou o fluxo de trabalho falhou. Para determinar isto, tem de consultar o estado de execução do fluxo de trabalho.

Chamadas de retorno paralelas

Quando os passos são executados em paralelo e um callback é criado por um thread principal e aguardado nos passos secundários, segue-se o mesmo padrão descrito anteriormente.

No exemplo seguinte, quando o passo create_callback_endpoint é executado, é criado um espaço de preenchimento de callback. Cada chamada subsequente para await_callback abre um novo espaço de retorno de chamada. É possível fazer dez chamadas de retorno em simultâneo se todos os threads estiverem em execução e a aguardar antes de fazer um pedido de chamada de retorno. Podem ser feitas chamadas de retorno adicionais, mas são armazenadas e nunca processadas.

YAML

  - createCallbackInParent:
    call: events.create_callback_endpoint
    args:
      http_callback_method: "POST"
    result: callback_details
  - parallelStep:
    parallel:
        for:
            range: [1, 10]
            value: loopValue
            steps:
              - waitForCallbackInChild:
                  call: events.await_callback
                  args:
                      callback: ${callback_details}

JSON

  [
    {
      "createCallbackInParent": {
        "call": "events.create_callback_endpoint",
        "args": {
          "http_callback_method": "POST"
        },
        "result": "callback_details"
      }
    },
    {
      "parallelStep": {
        "parallel": {
          "for": {
            "range": [
              1,
              10
            ],
            "value": "loopValue",
            "steps": [
              {
                "waitForCallbackInChild": {
                  "call": "events.await_callback",
                  "args": {
                    "callback": "${callback_details}"
                  }
                }
              }
            ]
          }
        }
      }
    }
  ]

Tenha em atenção que os retornos de chamada são processados pela mesma ordem em que cada chamada é feita por uma sucursal para await_callback. No entanto, a ordem de execução das ramificações não é determinística e pode chegar a um resultado através de vários caminhos. Para mais informações, consulte Passos paralelos.

Experimente um fluxo de trabalho de chamada de resposta básico

Pode criar um fluxo de trabalho básico e, em seguida, testar a chamada para o ponto final de retorno de chamada desse fluxo de trabalho usando o curl. Tem de ter as autorizações Workflows Editor ou Workflows Admin necessárias para o projeto no qual o fluxo de trabalho reside.

  1. Crie e implemente o seguinte fluxo de trabalho e, de seguida, execute-o.

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: 3600
            result: callback_request
        - print_callback_request:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
        - return_callback_result:
            return: ${callback_request.http_request}
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": 3600
              },
              "result": "callback_request"
            }
          },
          {
            "print_callback_request": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          },
          {
            "return_callback_result": {
              "return": "${callback_request.http_request}"
            }
          }
        ]
          

    Após a execução do fluxo de trabalho, o estado da execução do fluxo de trabalho é ACTIVE até que o pedido de retorno de chamada seja recebido ou o tempo limite expire.

  2. Confirme o estado de execução e obtenha o URL de retorno:

    Consola

    1. Na Google Cloud consola, aceda à página Fluxos de trabalho:

      Aceda a Fluxos de trabalho
    2. Clique no nome do fluxo de trabalho que acabou de executar.

      É apresentado o estado da execução do fluxo de trabalho.

    3. Clique no separador Registos.
    4. Procure uma entrada de registo semelhante à seguinte:

      Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
    5. Copie o URL de retorno de chamada para usar no comando seguinte.

    gcloud

    1. Primeiro, obtenha o ID de execução:
      gcloud logging read "Listening for callbacks" --freshness=DURATION
      Substitua DURATION por uma quantidade adequada de tempo para limitar as entradas do registo devolvidas (se tiver executado o fluxo de trabalho várias vezes).

      Por exemplo, --freshness=t10m devolve entradas do registo com uma antiguidade máxima de 10 minutos. Para obter mais detalhes, consulte gcloud topic datetimes.

      O ID de execução é devolvido. Tenha em atenção que o URL de retorno também é devolvido no campo textPayload. Copie ambos os valores para usar nos passos seguintes.

    2. Execute o seguinte comando:
      gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
      É devolvido o estado da execução do fluxo de trabalho.
  3. Agora, pode chamar o ponto final de retorno de chamada através de um comando curl:
    curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL

    Tenha em atenção que, para um ponto final POST, tem de usar um cabeçalho de representação Content-Type. Por exemplo:

    curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL

    Substitua CALLBACK_URL pelo URL que copiou no passo anterior.

  4. Através da Google Cloud consola ou da CLI do Google Cloud, confirme que o estado da execução do fluxo de trabalho é agora SUCCEEDED.
  5. Procure a entrada de registo com o textPayload devolvido semelhante ao seguinte:
    Received {"body":null,"headers":...

Amostras

Estes exemplos demonstram a sintaxe.

Detete erros de limite de tempo

Este exemplo adiciona ao exemplo anterior a captura de erros de limite de tempo e a escrita dos erros no registo do sistema.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 3600
                result: callback_request
            except:
                as: e
                steps:
                    - log_error:
                        call: sys.log
                        args:
                            severity: "ERROR"
                            text: ${"Received error " + e.message}
                        next: end
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "callback_request"
              },
              "except": {
                "as": "e",
                "steps": [
                  {
                    "log_error": {
                      "call": "sys.log",
                      "args": {
                        "severity": "ERROR",
                        "text": "${\"Received error \" + e.message}"
                      },
                      "next": "end"
                    }
                  }
                ]
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      }
    }
      

Aguardar uma chamada de retorno num ciclo de novas tentativas

Este exemplo modifica o exemplo anterior implementando um passo de repetição. Usando um predicado de repetição personalizado, o fluxo de trabalho regista um aviso quando ocorre um limite de tempo e, em seguida, tenta novamente a espera no ponto final de retorno de chamada até cinco vezes. Se a quota de novas tentativas for esgotada antes de receber a chamada de retorno, o erro de limite de tempo final faz com que o fluxo de trabalho falhe.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 60.0
                result: callback_request
            retry:
                predicate: ${log_timeout}
                max_retries: 5
                backoff:
                    initial_delay: 1
                    max_delay: 10
                    multiplier: 2
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    log_timeout:
        params: [e]
        steps:
          - when_to_repeat:
              switch:
                - condition: ${"TimeoutError" in e.tags}
                  steps:
                      - log_error_and_retry:
                          call: sys.log
                          args:
                              severity: "WARNING"
                              text: "Timed out waiting for callback, retrying"
                      - exit_predicate:
                          return: true
          - otherwise:
              return: false
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 60
                },
                "result": "callback_request"
              },
              "retry": {
                "predicate": "${log_timeout}",
                "max_retries": 5,
                "backoff": {
                  "initial_delay": 1,
                  "max_delay": 10,
                  "multiplier": 2
                }
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      },
      "log_timeout": {
        "params": [
          "e"
        ],
        "steps": [
          {
            "when_to_repeat": {
              "switch": [
                {
                  "condition": "${\"TimeoutError\" in e.tags}",
                  "steps": [
                    {
                      "log_error_and_retry": {
                        "call": "sys.log",
                        "args": {
                          "severity": "WARNING",
                          "text": "Timed out waiting for callback, retrying"
                        }
                      }
                    },
                    {
                      "exit_predicate": {
                        "return": true
                      }
                    }
                  ]
                }
              ]
            }
          },
          {
            "otherwise": {
              "return": false
            }
          }
        ]
      }
    }
      

O que se segue?