Working with Incoming Data in Cloud Workflows
Olive Power
Practice Team Lead
In this post I will be looking at how we can use Google Workflows to orchestrate tasks and processes. I specifically want to detail how to process data within a Workflow. This is useful when you need to parse incoming data for information relevant to one or more subsequent stages within the Workflow.
Google Workflows allows the orchestration of hosted or custom services like Cloud Functions, Cloud Run, BigQuery or any HTTP-based API, in a defined order of steps.
Therefore, Workflows are useful where we want to control the flow of execution of steps.
Workflows are also useful when we are looking to pass information - like event data, error codes, and response bodies - on to the other orchestrated services.
But how do we capture and examine the incoming data to the Workflow and use it in the Workflow?
Setting the scene
Let's consider a simple online flower store. In this scenario, our online store accepts orders for flowers - name, productID, quantity - and generates an orderID.
To process each order successfully in the system, we want to capture the orderID of an order of flowers, and pass that value on to any other services in our online store process.
The sample order data we are looking to process is a simple one-value array, as follows (the field values here are illustrative):
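```json
{
  "flowers": [
    {
      "name": "roses",
      "productID": "12345",
      "quantity": 10,
      "orderID": "001233"
    }
  ]
}
```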
To achieve this using Google Workflows we have set up the following architecture:
First we will publish our flower order to a Pub/Sub topic. This Pub/Sub topic will then trigger a Workflow, using Eventarc.
Then the input data - our flower order information - in the Pub/Sub message is processed by the Workflow.
This can then be passed as input to other services orchestrated by the Workflow, for example a Cloud Function or a Cloud Run service.
Let's set this up by first creating the Pub/Sub topic, then the Workflow and finally the Eventarc trigger to link them together.
Create the Pub/Sub Topic
First, let's enable the relevant APIs in Cloud Shell:
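```bash
# Enable the services used in this post: Workflows, Eventarc, and Pub/Sub
gcloud services enable \
  workflows.googleapis.com \
  eventarc.googleapis.com \
  pubsub.googleapis.com
```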
Create the pub/sub topic
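```bash
# Create the topic that will receive our flower orders
gcloud pubsub topics create new_pubsub
```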
where new_pubsub is the name of your Pub/Sub topic
Building the Workflow
To create a Workflow, we need a configuration file. This is where all the logic for the Workflow steps and data processing we require is written.
Create a file called workflow.yaml, paste in the following content, and save the file.
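A minimal version of the definition looks like this - the step names and expressions are the ones walked through below, and returning both the full order and the extracted orderID is one reasonable choice for the final step:

```yaml
main:
  params: [event]
  steps:
    - init_vars:
        assign:
          # Global variable that will hold the extracted orderID
          - input: ""
    - decode_pubsub_message:
        assign:
          # Base64-decode the raw Pub/Sub message payload
          - inbound_message: ${base64.decode(event.data.message.data)}
          # Convert the decoded bytes to text and parse the JSON
          - full_msg: ${json.decode(text.decode(inbound_message))}
          # Pull out the orderID of the first (only) order in the array
          - input: ${full_msg.flowers[0].orderID}
    - finish_workflow:
        return:
          order: ${full_msg}
          orderID: ${input}
```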
Understanding the Workflow syntax
The Workflows service receives incoming events and converts them to a JSON object (following the CloudEvents specification). This object is passed to the Workflow execution request as a runtime argument and can be used throughout the Workflow's workspace.
Note the following key steps in our Workflow:
- params - runtime arguments passed to the Workflow as part of the execution request. This names the variable that stores the data - in this case ‘event’.
- init_vars - defines global variables.
- decode_pubsub_message - decodes and parses the incoming event data. This is a key step for our use case and is described below.
- finish_workflow - returns the output.
The data processing logic
This is built into the decode_pubsub_message step in a few stages. Let's go through each one:
Firstly, the ‘inbound_message’ variable is assigned the decoded message data value of the inbound event:
${base64.decode(event.data.message.data)}
The JSON format of the event is documented in the CloudEvents - JSON event format document.
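For orientation, the runtime argument for a Pub/Sub-triggered execution looks roughly like this - abbreviated, with illustrative IDs; the data field here is the base64 encoding of {"flowers": [...]}:

```json
{
  "type": "google.cloud.pubsub.topic.v1.messagePublished",
  "data": {
    "message": {
      "data": "eyJmbG93ZXJzIjogWy4uLl19",
      "messageId": "1234567890",
      "publishTime": "2024-01-01T12:00:00Z"
    },
    "subscription": "projects/my-project/subscriptions/eventarc-sub"
  }
}
```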
Then the ‘full_msg’ variable is assigned the JSON object decoded from the ‘inbound_message’ variable:
${json.decode(text.decode(inbound_message))}
Lastly, the ‘input’ variable is assigned the orderID value that we want in the Workflow. We query the parsed message - full_msg - for the first value in the flowers array - flowers[0] - and then for its orderID value:
${full_msg.flowers[0].orderID}
This can then be used as input to subsequent steps in the Workflow. Here, we simply return it, along with the full decoded order, as the output of the Workflow (${input}).
So let's create the Workflow.
Create the workflow
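```bash
# Deploy the workflow from the YAML definition
# (us-central1 is an example; use whichever region you are working in)
gcloud workflows deploy new-workflow \
  --source=workflow.yaml \
  --location=us-central1
```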
where new-workflow is the workflow name
OK, now that we have looked at the required logic and built our simple architecture, let's see it in action.
Putting it all together
Create a service account used to invoke our workflow:
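Here I use workflow-invoker as an example name; any name works:

```bash
# Create the service account Eventarc will use to invoke the workflow
gcloud iam service-accounts create workflow-invoker
```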
Enable the relevant permissions:
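At a minimum, the service account needs the Workflows Invoker role so that the trigger can start executions:

```bash
# Allow the service account to invoke Workflows executions
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:workflow-invoker@${PROJECT_ID}.iam.gserviceaccount.com" \
  --role="roles/workflows.invoker"
```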
where ${PROJECT_ID} is the ID of your GCP project
Create an Eventarc trigger that will invoke a Workflows execution when we publish a message to the Pub/Sub topic.
Create the Eventarc trigger
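A trigger along these lines wires the topic to the workflow - the trigger name and regions here are examples, and the locations must match the workflow's region:

```bash
gcloud eventarc triggers create new-trigger \
  --location=us-central1 \
  --destination-workflow=new-workflow \
  --destination-workflow-location=us-central1 \
  --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
  --transport-topic=projects/${PROJECT_ID}/topics/new_pubsub \
  --service-account="workflow-invoker@${PROJECT_ID}.iam.gserviceaccount.com"
```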
where ${PROJECT_ID} is the ID of your GCP project
Now we have everything we need, let's run it.
Publish a message to the Pub/Sub topic:
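```bash
# Publish a sample order; the JSON payload mirrors the sample data above
gcloud pubsub topics publish new_pubsub \
  --message='{"flowers": [{"name": "roses", "productID": "12345", "quantity": 10, "orderID": "001233"}]}'
```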
From the console, open Workflows / new-workflow / Executions. Here we can see the successful execution of the Workflow:
We can also see the output of the message in the latest execution by clicking the execution ID link:
Note the output on the right hand side of the panel, showing our total flower order, and the separate orderID (001233).
So, to wrap up: we have passed input data to a Workflow, looked at the logic required to extract the input event data within the Workflow, and finally returned this as output. The ability to share data across all the steps in the Workflow workspace extends the power of Cloud Workflows to orchestrate complex solutions, with end-to-end observability.
Learn more
Interested in finding out more about Workflows? Check out the Google Codelab Triggering Workflows with Eventarc. There are also lots of Workflow code examples on the All Workflows code samples page. Finally, a comprehensive tutorial on building event-driven systems with Workflows and Eventarc is available on YouTube: Build an event-driven orchestration with Eventarc and Workflows.