Wait using callbacks

Callbacks allow workflow executions to wait for another service to make a request to the callback endpoint; that request resumes the execution of the workflow.

With callbacks, you can signal to your workflow that a specified event has occurred, and wait on that event without polling. For example, you can create a workflow that notifies you when a product is back in stock or when an item has shipped; or that pauses to allow human interaction such as reviewing an order or validating a translation.

This page shows you how to create a workflow that supports a callback endpoint, and that waits for HTTP requests from external processes to arrive at that endpoint.

Callbacks require the use of two standard library built-in functions:

Before you begin

Make sure that you have access to this feature by requesting to have your project added to private previews for Google Cloud Workflows.

Create an endpoint that receives a callback request

Create a callback endpoint that can receive HTTP requests to arrive at that endpoint.

  1. Follow the steps to create a new workflow, or choose an existing workflow to update but do not deploy it yet.
  2. In the workflow's definition, add a step to create a callback endpoint:

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "METHOD"
            result: callback_details
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "METHOD"
              },
              "result": "callback_details"
            }
          }
        ]
          

    Replace METHOD with the expected HTTP method, one of GET, HEAD, POST, PUT, DELETE, OPTIONS, or PATCH. The default is POST.

    The result is a map, callback_details, with an url field that stores the URL of the created endpoint.

    The callback endpoint is now ready to receive incoming requests with the specified HTTP method. The URL of the created endpoint can be used to trigger the callback from a process external to the workflow; for example, by passing the URL to a Cloud Function.

  3. In the workflow's definition, add a step to wait for a callback request:

    YAML

        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: TIMEOUT_LIMIT
            result: callback_request
        

    JSON

        [
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": TIMEOUT_LIMIT
              },
              "result": "callback_request"
            }
          }
        ]
          

    Replace TIMEOUT_LIMIT with the maximum number of seconds that the workflow should wait for a request. The default is 43200 (12 hours). If the time elapses before a request is received, a TimeoutError is raised.

    The callback_details map from the earlier create_callback step is passed as an argument.

  4. Deploy your workflow to finish creating or updating it.

    When a request is received, all the details of the request are stored in the callback_request map. You then have access to the entire HTTP request, including its header, body, and a query map for any query parameters. For example:

    YAML

        http_request:
          body:
          headers: {...}
          method: GET
          query: {}
          url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
        received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667
        type: HTTP
        

    JSON

        {
           "http_request":{
              "body":null,
              "headers":{
                 ...
              },
              "method":"GET",
              "query":{
              },
              "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
           },
           "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667",
           "type":"HTTP"
        }
          

    If the HTTP body is text or JSON, Workflows will attempt to decode the body; otherwise, raw bytes are returned.

Authorize requests to the callback endpoint

To send a request to a callback endpoint, Google Cloud services such as Cloud Run and Cloud Functions, as well as third-party services, must be authorized to do so by having the appropriate Identity and Access Management (IAM) permissions, specifically those of the Workflows Editor or Workflows Admin roles.

Make a direct request

The simplest way to create short-lived credentials for a service account is to make a direct request. Two identities are involved in this flow: the caller, and the service account for whom the credential is created. The call to the basic workflow on this page is an example of a direct request. For more information, see Direct request permissions.

Generate an OAuth 2.0 access token

To authorize an application to call the callback endpoint, you can generate an OAuth 2.0 access token for the service account associated with the workflow. Assuming that you have the required permissions (for the Workflows Editor or Workflows Admin, and Service Account Token Creator roles), you can also generate a token yourself by running the generateAccessToken method.

If the generateAccessToken request is successful, the returned response body contains an OAuth 2.0 access token and an expiration time. (By default, OAuth 2.0 access tokens are valid for a maximum of 1 hour.) For example:

  {
  "accessToken": "eyJ0eXAi...NiJ9",
  "expireTime": "2020-04-07T15:01:23.045123456Z"
  }
  

The accessToken code can then be used in a curl call to the callback endpoint URL:

  curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
  

Generate an OAuth token for a Cloud Function

If you are invoking a callback from a Cloud Function using the same service account as the workflow and in the same project, you can generate an OAuth access token in the function itself. For example:

const {GoogleAuth} = require('google-auth-library');
const auth = new GoogleAuth();
const token = await auth.getAccessToken();
console.log("Token", token);

try {
  const resp = await fetch(url, {
      method: 'POST',
      headers: {
          'accept': 'application/json',
          'content-type': 'application/json',
          'authorization': `Bearer ${token}`
      },
      body: JSON.stringify({ approved })
  });
  console.log("Response = ", JSON.stringify(resp));

  const result = await resp.json();
  console.log("Outcome = ", JSON.stringify(result));

For more context, see the tutorial on Creating a human-in-the-loop workflow using callbacks.

Request offline access

Access tokens periodically expire and become invalid credentials for a related API request. You can refresh an access token without prompting the user for permission if you requested offline access to the scopes associated with the token. Requesting offline access is a requirement for any application that needs to access a Google API when the user is not present. For more information, see Refreshing an access token (offline access).

Try out a basic callback workflow

You can create a basic workflow and then test the call to that workflow's callback endpoint using curl. You must have the necessary Workflows Editor or Workflows Admin permissions for the project in which the workflow resides.

  1. Create and deploy the following workflow and then execute it.

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: 3600
            result: callback_request
        - print_callback_request:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
        - return_callback_result:
            return: ${callback_request.http_request}
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": 3600
              },
              "result": "callback_request"
            }
          },
          {
            "print_callback_request": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          },
          {
            "return_callback_result": {
              "return": "${callback_request.http_request}"
            }
          }
        ]
          
  2. After executing the workflow, the state of the workflow execution is ACTIVE until the callback request is received or the timeout limit is reached. Confirm the execution state and retrieve the callback URL:

    Console

    1. In the Cloud Console, go to the Workflows page:

      Go to Workflows
    2. Click the name of the workflow that you just executed.

      The state of the workflow execution is displayed.

    3. Click the Logs tab.
    4. Look for a log entry similar to the following:

      Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
      
    5. Copy the callback URL to use in the next command.

    gcloud

    1. First, retrieve the execution ID:
      gcloud logging read "Listening for callbacks" --freshness=DURATION
      
      Replace DURATION with an appropriate amount of time to limit the log entries returned (if you have executed the workflow multiple times).

      For example, --freshness=t10m returns log entries that are not older than 10 minutes. For details, see gcloud topic datetimes.

      The execution ID is returned. Note that the callback URL is also returned in the textPayload field. Copy both values to use in the following steps.

    2. Run the following command:
      gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
      
      The state of the workflow execution is returned.
  3. You can now call the callback endpoint using a curl command:
    curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL
    
  4. Through the Google Cloud console or using the gcloud command-line tool, confirm that the state of the workflow execution is now SUCCEEDED.
  5. Look for the log entry with the returned textPayload that resembles the following:
    Received {"body":null,"headers":...
    

Samples

These samples demonstrate the syntax.

Catch timeout errors

This sample adds to the previous sample by catching any timeout errors, and writing the errors to the system log.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 3600
                result: callback_request
            except:
                as: e
                steps:
                    - log_error:
                        call: sys.log
                        args:
                            severity: "ERROR"
                            text: ${"Received error " + e.message}
                        next: end
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "callback_request"
              },
              "except": {
                "as": "e",
                "steps": [
                  {
                    "log_error": {
                      "call": "sys.log",
                      "args": {
                        "severity": "ERROR",
                        "text": "${\"Received error \" + e.message}"
                      },
                      "next": "end"
                    }
                  }
                ]
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      }
    }
      

Wait on a callback in a retry loop

This sample modifies the previous sample by implementing a retry step. Using a custom retry predicate, the workflow logs a warning when a timeout occurs and then retries the wait on the callback endpoint, up to five times. If the retry quota is exhausted before the callback is received, the final timeout error causes the workflow to fail.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 60.0
                result: callback_request
            retry:
                predicate: ${log_timeout}
                max_retries: 5
                backoff:
                    initial_delay: 1
                    max_delay: 10
                    multiplier: 2
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    log_timeout:
        params: [e]
        steps:
          - when_to_repeat:
              switch:
                - condition: ${"TimeoutError" in e.tags}
                  steps:
                      - log_error_and_retry:
                          call: sys.log
                          args:
                              severity: "WARNING"
                              text: "Timed out waiting for callback, retrying"
                      - exit_predicate:
                          return: true
          - otherwise:
              return: false
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 60
                },
                "result": "callback_request"
              },
              "retry": {
                "predicate": "${log_timeout}",
                "max_retries": 5,
                "backoff": {
                  "initial_delay": 1,
                  "max_delay": 10,
                  "multiplier": 2
                }
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      },
      "log_timeout": {
        "params": [
          "e"
        ],
        "steps": [
          {
            "when_to_repeat": {
              "switch": [
                {
                  "condition": "${\"TimeoutError\" in e.tags}",
                  "steps": [
                    {
                      "log_error_and_retry": {
                        "call": "sys.log",
                        "args": {
                          "severity": "WARNING",
                          "text": "Timed out waiting for callback, retrying"
                        }
                      }
                    },
                    {
                      "exit_predicate": {
                        "return": true
                      }
                    }
                  ]
                }
              ]
            }
          },
          {
            "otherwise": {
              "return": false
            }
          }
        ]
      }
    }
      

What's next