Aguardar usando callbacks

Os callbacks permitem que as execuções de fluxo de trabalho aguardem outro serviço para fazer uma solicitação ao endpoint de callback. Essa solicitação retoma a execução do fluxo de trabalho.

Com os callbacks, é possível sinalizar para o fluxo de trabalho que evento especificado ocorreu e aguardar esse evento sem a necessidade de pesquisa. Por exemplo, é possível criar um fluxo de trabalho que notifique quando um produto está em estoque novamente ou quando um item é enviado, ou que pause para permitir uma interação humana, como revisar um pedido ou validar uma tradução.

Nesta página, mostramos como criar um fluxo de trabalho compatível com um endpoint de callback e que aguarda solicitações HTTP de processos externos chegarem a esse endpoint. Você também pode aguardar eventos usando callbacks e gatilhos do Eventarc.

Os callbacks exigem duas funções integradas da biblioteca padrão:

Criar um endpoint que receba uma solicitação de callback

Crie um endpoint de callback que recebe solicitações HTTP para chegar a esse endpoint.

  1. Siga as etapas para criar um novo fluxo de trabalho ou escolha um fluxo de trabalho atual para atualizar, mas não o implante ainda.
  2. Na definição do fluxo de trabalho, adicione uma etapa para criar um endpoint de callback:

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "METHOD"
            result: callback_details
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "METHOD"
              },
              "result": "callback_details"
            }
          }
        ]
          

    Substitua METHOD pelo método HTTP esperado, um dentre GET, HEAD, POST, PUT, DELETE, OPTIONS ou PATCH O padrão é POST.

    O resultado é um mapa, callback_details, com um campo url que armazena o URL do endpoint criado.

    O endpoint do callback já está pronto para receber solicitações com o método HTTP especificado. O URL do endpoint criado pode ser usado para acionar o callback de um processo externo ao fluxo de trabalho. Por exemplo, transmitindo o URL para uma função do Cloud Run.

  3. Na definição do fluxo de trabalho, adicione uma etapa para aguardar uma solicitação de callback:

    YAML

        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: TIMEOUT
            result: callback_request
        

    JSON

        [
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": TIMEOUT
              },
              "result": "callback_request"
            }
          }
        ]
          

    Substitua TIMEOUT pelo número máximo de segundos que o fluxo de trabalho precisa aguardar uma solicitação. O padrão é 43200 (12 horas). Se o tempo decorrido antes do recebimento de uma solicitação, será gerado um TimeoutError.

    Há uma duração máxima de execução. Para mais informações, consulte o limite de solicitações.

    O mapa callback_details da etapa anterior create_callback é transmitido como um argumento.

  4. Implante seu fluxo de trabalho para concluir a criação ou atualização.

    Quando uma solicitação é recebida, todos os detalhes dela são armazenados no mapa callback_request. Assim, você tem acesso a toda a solicitação HTTP, incluindo o cabeçalho, o corpo e um mapa query para qualquer parâmetro de consulta. Exemplo:

    YAML

        http_request:
          body:
          headers: {...}
          method: GET
          query: {}
          url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
        received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667
        type: HTTP
        

    JSON

        {
           "http_request":{
              "body":null,
              "headers":{
                 ...
              },
              "method":"GET",
              "query":{
              },
              "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
           },
           "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667",
           "type":"HTTP"
        }
          

    Se o corpo do HTTP for em texto ou JSON, o Workflows tentará decodificar o corpo. Caso contrário, serão retornados bytes brutos.

Autorizar solicitações para o endpoint de callback

Para enviar uma solicitação a um endpoint de callback, os serviços do Google Cloud, como o Cloud Run e as funções do Cloud Run, bem como serviços de terceiros, precisam ser autorizados para fazer isso pelas permissões adequadas do Identity and Access Management (IAM), especificamente workflows.callbacks.send (incluído na função Invocador de fluxos de trabalho).

Fazer uma solicitação direta

A maneira mais simples de criar credenciais de curta duração para uma conta de serviço é fazendo uma solicitação direta. Duas identidades estão envolvidas nesse fluxo: o autor da chamada e a conta de serviço para quem a credencial é criada. A chamada para o fluxo de trabalho básico desta página é um exemplo de solicitação direta. Para mais informações, consulte Usar o IAM para controlar o acesso e Permissões de solicitação direta.

Gerar um token de acesso do OAuth 2.0

Para autorizar um aplicativo a chamar o endpoint de callback, gere um token de acesso do OAuth 2.0 para a conta de serviço associada ao fluxo de trabalho. Supondo que você tenha as permissões necessárias (para os papéis Workflows Editor ou Workflows Admin e Service Account Token Creator), também é possível gerar um token executando o método generateAccessToken.

Se a solicitação generateAccessToken tiver êxito, o corpo da resposta retornada terá um token de acesso do OAuth 2.0 e um prazo de validade. (Por padrão, os tokens de acesso do OAuth 2.0 são válidos por no máximo uma hora.) Exemplo:

  {
  "accessToken": "eyJ0eXAi...NiJ9",
  "expireTime": "2020-04-07T15:01:23.045123456Z"
  }
O código accessToken pode ser usado em uma chamada curl no URL do endpoint de callback, como nos exemplos a seguir:
  curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
  curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL

Gerar um token do OAuth para uma função do Cloud Run

Se você invocar um callback de uma função do Cloud Run usando a mesma conta de serviço do fluxo de trabalho e no mesmo projeto, será possível gerar um token de acesso do OAuth na própria função. Exemplo:

const {GoogleAuth} = require('google-auth-library');
const auth = new GoogleAuth();
const token = await auth.getAccessToken();
console.log("Token", token);

try {
  const resp = await fetch(url, {
      method: 'POST',
      headers: {
          'accept': 'application/json',
          'content-type': 'application/json',
          'authorization': `Bearer ${token}`
      },
      body: JSON.stringify({ approved })
  });
  console.log("Response = ", JSON.stringify(resp));

  const result = await resp.json();
  console.log("Outcome = ", JSON.stringify(result));

Para mais contexto, consulte o tutorial sobre Como criar um fluxo de trabalho human-in-the-loop usando callbacks.

Solicitar acesso off-line

Os tokens de acesso expiram periodicamente e se tornam credenciais inválidas para uma solicitação de API relacionada. É possível atualizar um token de acesso sem solicitar a permissão do usuário se você solicitou acesso off-line aos escopos associados ao token. A solicitação de acesso off-line é um requisito para qualquer aplicativo que precise acessar uma API Google quando o usuário não estiver presente. Para mais informações, consulte Como atualizar um token de acesso (off-line).

Invocar um fluxo de trabalho exatamente uma vez usando callbacks

Os callbacks são totalmente idempotentes, o que significa que você pode tentar novamente um callback se ele falhar sem produzir resultados inesperados ou efeitos colaterais.

Depois de criar um endpoint de callback, o URL fica pronto para receber solicitações de entrada e geralmente é retornado para um autor da chamada antes que a chamada correspondente para await_callback seja feita. No entanto, se o URL do callback ainda não tiver sido recebido quando a etapa await_callback for executada, a execução do fluxo de trabalho será bloqueada até que o endpoint seja recebido (ou um tempo limite ocorra). Depois de recebida, a execução do fluxo de trabalho é retomada e o callback é processado.

Depois de executar a etapa create_callback_endpoint e criar um endpoint de callback, um único slot de callback fica disponível para o fluxo de trabalho. Quando uma solicitação de callback é recebida, esse slot é preenchido com o payload do callback até que o callback possa ser processado. Quando a etapa await_callback é executada, o callback é processado, o slot é esvaziado e disponibilizado para outro callback. Em seguida, você pode reutilizar o endpoint de callback e chamar await_callback novamente.

Se await_callback for chamado apenas uma vez, mas uma segunda callback for recebida, um dos seguintes cenários ocorrerá e um código de status HTTP apropriado será retornado:

  • O HTTP 429: Too Many Requests indica que o primeiro callback foi recebido com sucesso, mas não foi processado. Ele ainda está aguardando o processamento. O segundo callback é rejeitado pelo fluxo de trabalho.

  • O HTTP 200: Success indica que o primeiro callback foi recebido com sucesso e uma resposta foi retornada. O segundo callback é armazenado e pode nunca ser processado, a menos que await_callback seja chamado uma segunda vez. Se o fluxo de trabalho for encerrado antes disso, a segunda solicitação de callback nunca será processada e será descartada.

  • O HTTP 404: Page Not Found indica que o fluxo de trabalho não está mais em execução. O primeiro callback foi processado e o fluxo de trabalho foi concluído ou falhou. Para determinar isso, você precisa consultar o estado de execução do fluxo de trabalho.

Callbacks paralelos

Quando as etapas são executadas em paralelo e um callback é criado por uma linha de execução mãe e aguardado em etapas filhas, o mesmo padrão descrito anteriormente é seguido.

No exemplo a seguir, quando a etapa create_callback_endpoint é executada, um slot de callback é criado. Cada chamada subsequente para await_callback abre um novo slot de callback. Dez callbacks podem ser feitos simultaneamente, se todas as linhas de execução estiverem em execução e aguardando antes que uma solicitação de callback seja feita. Outros callbacks poderiam ser feitos, mas seriam armazenados e nunca processados.

YAML

  - createCallbackInParent:
    call: events.create_callback_endpoint
    args:
      http_callback_method: "POST"
    result: callback_details
  - parallelStep:
    parallel:
        for:
            range: [1, 10]
            value: loopValue
            steps:
              - waitForCallbackInChild:
                  call: events.await_callback
                  args:
                      callback: ${callback_details}

JSON

  [
    {
      "createCallbackInParent": {
        "call": "events.create_callback_endpoint",
        "args": {
          "http_callback_method": "POST"
        },
        "result": "callback_details"
      }
    },
    {
      "parallelStep": {
        "parallel": {
          "for": {
            "range": [
              1,
              10
            ],
            "value": "loopValue",
            "steps": [
              {
                "waitForCallbackInChild": {
                  "call": "events.await_callback",
                  "args": {
                    "callback": "${callback_details}"
                  }
                }
              }
            ]
          }
        }
      }
    }
  ]

Os callbacks são processados na mesma ordem em que cada chamada é feita por um ramo para await_callback. No entanto, a ordem de execução das ramificações não é determinística e pode chegar a um resultado usando vários caminhos. Para mais informações, consulte Etapas paralelas.

Testar um fluxo de trabalho básico de callback

É possível criar um fluxo de trabalho básico e testar a chamada para o endpoint de callback desse fluxo de trabalho usando curl. É necessário ter as permissões necessárias Workflows Editor ou Workflows Admin para o projeto do fluxo de trabalho.

  1. Crie e implante o fluxo de trabalho a seguir e execute-o.

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: 3600
            result: callback_request
        - print_callback_request:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
        - return_callback_result:
            return: ${callback_request.http_request}
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": 3600
              },
              "result": "callback_request"
            }
          },
          {
            "print_callback_request": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          },
          {
            "return_callback_result": {
              "return": "${callback_request.http_request}"
            }
          }
        ]
          

    Depois de executar o fluxo de trabalho, o estado dele será ACTIVE até que a solicitação de callback seja recebida ou até que o tempo limite expire.

  2. Confirme o estado de execução e recupere o URL do callback:

    Console

    1. No console do Google Cloud, acesse a página Fluxos de trabalho:

      Acessar fluxos de trabalho
    2. Clique no nome do fluxo de trabalho que você acabou de executar.

      O estado da execução do fluxo de trabalho é exibido.

    3. Clique na guia Registros.
    4. Procure uma entrada de registro semelhante a esta:

      Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
    5. Copie o URL de callback a ser usado no próximo comando.

    gcloud

    1. Primeiro, recupere o ID de execução:
      gcloud logging read "Listening for callbacks" --freshness=DURATION
      Substitua DURATION por um período apropriado para limitar as entradas de registro retornadas (se você executou o fluxo de trabalho várias vezes).

      Por exemplo, --freshness=t10m retorna entradas de registro de até 10 minutos. Para mais detalhes, consulte gcloud topic datetimes.

      O ID de execução é retornado. Observe que o URL de callback também é retornado no campo textPayload. Copie os dois valores para usar nas etapas a seguir.

    2. Execute este comando:
      gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
      O estado da execução do fluxo de trabalho é retornado.
  3. Agora é possível chamar o endpoint de callback usando um comando curl:
    curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL

    Para um endpoint POST, é necessário usar um cabeçalho de representação Content-Type. Exemplo:

    curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL

    Substitua CALLBACK_URL pelo URL que você copiou na etapa anterior.

  4. No console do Google Cloud ou usando a Google Cloud CLI, confirme se o estado da execução do fluxo de trabalho agora é SUCCEEDED.
  5. Procure a entrada de registro com um textPayload semelhante a este:
    Received {"body":null,"headers":...

Amostras

Estas amostras demonstram a sintaxe.

Detectar erros de tempo limite

Esta amostra complementa a anterior porque detecta qualquer erro de tempo limite e grava-os no registro do sistema.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 3600
                result: callback_request
            except:
                as: e
                steps:
                    - log_error:
                        call: sys.log
                        args:
                            severity: "ERROR"
                            text: ${"Received error " + e.message}
                        next: end
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "callback_request"
              },
              "except": {
                "as": "e",
                "steps": [
                  {
                    "log_error": {
                      "call": "sys.log",
                      "args": {
                        "severity": "ERROR",
                        "text": "${\"Received error \" + e.message}"
                      },
                      "next": "end"
                    }
                  }
                ]
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      }
    }
      

Aguardar um callback em um loop de repetição

Esta amostra modifica a anterior implementando uma etapa de repetição. Usando um predicado de repetição personalizado, o fluxo de trabalho registra um aviso quando ocorre um tempo limite e, em seguida, repete a aguarda o endpoint de callback até cinco vezes. Se a cota de novas tentativas esgotar antes de o callback ser recebido, o erro de tempo limite final fará com que o fluxo de trabalho falhe.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 60.0
                result: callback_request
            retry:
                predicate: ${log_timeout}
                max_retries: 5
                backoff:
                    initial_delay: 1
                    max_delay: 10
                    multiplier: 2
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    log_timeout:
        params: [e]
        steps:
          - when_to_repeat:
              switch:
                - condition: ${"TimeoutError" in e.tags}
                  steps:
                      - log_error_and_retry:
                          call: sys.log
                          args:
                              severity: "WARNING"
                              text: "Timed out waiting for callback, retrying"
                      - exit_predicate:
                          return: true
          - otherwise:
              return: false
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 60
                },
                "result": "callback_request"
              },
              "retry": {
                "predicate": "${log_timeout}",
                "max_retries": 5,
                "backoff": {
                  "initial_delay": 1,
                  "max_delay": 10,
                  "multiplier": 2
                }
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      },
      "log_timeout": {
        "params": [
          "e"
        ],
        "steps": [
          {
            "when_to_repeat": {
              "switch": [
                {
                  "condition": "${\"TimeoutError\" in e.tags}",
                  "steps": [
                    {
                      "log_error_and_retry": {
                        "call": "sys.log",
                        "args": {
                          "severity": "WARNING",
                          "text": "Timed out waiting for callback, retrying"
                        }
                      }
                    },
                    {
                      "exit_predicate": {
                        "return": true
                      }
                    }
                  ]
                }
              ]
            }
          },
          {
            "otherwise": {
              "return": false
            }
          }
        ]
      }
    }
      

A seguir