You can transform your event data by writing transformation expressions using CEL. For example, you can modify event payloads to satisfy a destination's specific API contract.
Note that events are always delivered in a CloudEvents format using an HTTP request in binary content mode unless you specify a message binding.
Set the input and output data formats
In addition to writing a transformation expression in CEL, you can optionally specify the data format of the incoming event data. This lets Eventarc Advanced know how to parse the payload of the event. You can also convert the data from one format to another.
The following formats are supported: Avro, JSON, and Protobuf. For more information, see Format received events.
Transformation expressions
When transforming events, all event attributes can be accessed in a CEL expression as variables through a predefined message object. These variables are populated with values based on the event data at runtime. For example:
- message.id returns the id attribute of the event
- message.data returns a CEL value representation of the event payload
- message.data.some-key returns the content of a field named some-key from the event payload
Fields in message.data are always represented as String types, and values are mapped from the original event using the schema specified when setting the input data format.
The transformation expression should express a complete event that includes the event context attributes and the event data payload. Expressions are written in JSON, but predefined CEL functions, macros, and operators, as well as regular expressions using RE2, are supported. Eventarc Advanced also supports certain extension functions that can be used to transform the event data.
The following are two examples of using CEL expressions to transform your event data. For more use cases and examples, see Transformation examples.
Example: Format attribute values
The following example formats phone_number attribute values using regular expression functions. (Other attributes have been omitted.)
    // Input:
    // {
    //   "data":
    //   {
    //     "email_address": "charlie@altostrat.com",
    //     "phone_number": "8005550100",
    //   }
    // }
    // Output:
    // {
    //   "data":
    //   {
    //     "email_domain": "altostrat.com",
    //     "phone_number": "(800) 555-0100",
    //     "area_code": "800",
    //     "local_number": "5550100",
    //   }
    // }

    {
      "data":
      {
        "email_domain": re.capture(
                          message.data.email_address,
                          "\\S+@(\\S+)"),

        "phone_number": re.extract(
                          message.data.phone_number,
                          "^(\\d{3})(\\d{3})(\\d{4})", "(\\1) \\2-\\3"
                        ),
      }.merge(
        re.captureN(
          message.data.phone_number,
          "^(?P<area_code>\\d{3})[\\w\\-)(]*(?P<local_number>\\d{7})"
        )
      )
    }
These are the regular expression functions used in the preceding example:
- re.capture: captures the first unnamed or named group value. Arguments are the following:
  - target: string that should be parsed
  - regex: regular expression used to capture values

  Returns a string of the first captured group value.
- re.captureN: does a full match on the given string and regular expression. Arguments are the following:
  - target: string that should be parsed
  - regex: regular expression used to capture values

  Returns a map with key and value pairs for a named group (group name, captured string) or an unnamed group (group index, captured string).
- re.extract: matches group values from the given target string and rewrites the string. Arguments are the following:
  - target: string that should be parsed
  - regex: regular expression used to extract values
  - rewrite: regular expression for how the result should be formatted

  Returns a string of the extracted values that is formatted based on the rewrite argument.
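To make the behavior of the three functions easier to picture, the following Python sketch approximates them with the standard re module. It is not the Eventarc Advanced implementation: the helper names capture, capture_n, and extract are hypothetical, raising ValueError on a failed match is an assumption, and the named groups area_code and local_number are assumed from the keys shown in the documented output. Python's re syntax also differs from RE2 in some details.

```python
import re


def capture(target: str, regex: str) -> str:
    """Approximates re.capture: return the first captured group value."""
    match = re.search(regex, target)
    if match is None:
        # Assumption: the real error behavior is not described in this document.
        raise ValueError("no match")
    return match.group(1)


def capture_n(target: str, regex: str) -> dict:
    """Approximates re.captureN: map group name (or group index) to its value."""
    pattern = re.compile(regex)
    match = pattern.search(target)
    if match is None:
        raise ValueError("no match")
    # Map group index -> group name for the groups that are named.
    named = {index: name for name, index in pattern.groupindex.items()}
    return {
        named.get(i, str(i)): match.group(i)
        for i in range(1, pattern.groups + 1)
    }


def extract(target: str, regex: str, rewrite: str) -> str:
    """Approximates re.extract: format the captured groups using rewrite."""
    match = re.search(regex, target)
    if match is None:
        raise ValueError("no match")
    return match.expand(rewrite)


data = {"email_address": "charlie@altostrat.com", "phone_number": "8005550100"}

result = {
    "email_domain": capture(data["email_address"], r"\S+@(\S+)"),
    "phone_number": extract(
        data["phone_number"], r"^(\d{3})(\d{3})(\d{4})", r"(\1) \2-\3"
    ),
}
# Equivalent of .merge(re.captureN(...)) in the CEL expression.
result.update(
    capture_n(
        data["phone_number"],
        r"^(?P<area_code>\d{3})[\w\-)(]*(?P<local_number>\d{7})",
    )
)
print(result)
```

The greedy character class between the two named groups matches nothing here, because the second group needs the remaining seven digits.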
Example: Map an array to an array of objects
The following example maps an array of integers into an array of objects. (Other attributes have been omitted.)
    // Input:
    // {
    //   "data":
    //   {
    //     "product_ids": [1, 2, 3]
    //   }
    // }
    // Output:
    // {
    //   "data":
    //   {
    //     "products": [
    //       {
    //         "name": "apple",
    //         "price": 70
    //       },
    //       {
    //         "name": "orange",
    //         "price": 80
    //       },
    //       {
    //         "name": "Product(3)",
    //         "price": 0
    //       }
    //     ]
    //   }
    // }

    {
      "data":
      {
        "products": message.data.product_ids.map(product_id,
          product_id == 1?
            { "name": "apple", "price": 70 } :
          product_id == 2?
            { "name": "orange", "price": 80 } :
          // Default:
            { "name": "Product(" + string(product_id) + ")", "price": 0 }
        )
      }
    }
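For readers more familiar with general-purpose languages, the chained conditional inside map can be read as the following Python sketch. The function names to_product and transform are hypothetical, and the code only mirrors the logic of the CEL expression; it does not run inside a pipeline.

```python
def to_product(product_id: int) -> dict:
    """Mirror the chained ternary: known IDs map to fixed products."""
    if product_id == 1:
        return {"name": "apple", "price": 70}
    if product_id == 2:
        return {"name": "orange", "price": 80}
    # Default branch for any other ID.
    return {"name": "Product(" + str(product_id) + ")", "price": 0}


def transform(message: dict) -> dict:
    """Produce one object per element of product_ids, like the CEL map macro."""
    product_ids = message["data"]["product_ids"]
    return {"data": {"products": [to_product(p) for p in product_ids]}}


print(transform({"data": {"product_ids": [1, 2, 3]}}))
```

Because map produces exactly one output element per input element, three product IDs yield three product objects.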
Configure a pipeline to transform events
You can configure a pipeline to transform event data in the Google Cloud console or by using the gcloud CLI.
Note that only one mediation per pipeline is supported.
Console
In the Google Cloud console, go to the Eventarc > Pipelines page.
You can create a pipeline or, if you are updating a pipeline, click the name of the pipeline.
Note that updating a pipeline might take more than 10 minutes.
In the Pipeline details page, click Edit.
In the Event mediation pane, do the following:
- Select the Apply a transformation checkbox.
In the Inbound format list, select the applicable format.
For more information, see Format received events.
In the CEL expression field, write a transformation expression in JSON. Predefined CEL functions, macros, and operators, as well as regular expressions are supported. For example:
{ "id": message.id, "datacontenttype": "application/json", "data": "{ \"scrubbed\": \"true\" }" }
The preceding example does the following:
- Removes all attributes from the original event except its id
- Sets the datacontenttype attribute to application/json
- Replaces the event payload with a static JSON string
Click Continue.
In the Destination pane, do the following:
If applicable, in the Outbound format list, select a format.
For more information, see Format received events.
Optionally, apply a Message binding. For more information, see the Define a message binding section in this document.
Click Save.
gcloud
Open a terminal.
You can create a pipeline, or you can update a pipeline using the gcloud beta eventarc pipelines update command. Note that updating a pipeline might take more than 10 minutes.

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --mediations=transformation_template= \
        {
        TRANSFORMATION_EXPRESSION
        }
Replace the following:
- PIPELINE_NAME: the ID of the pipeline or a fully qualified name
- REGION: a supported Eventarc Advanced location

  Alternatively, you can set the gcloud CLI location property:

      gcloud config set eventarc/location REGION
- TRANSFORMATION_EXPRESSION: an expression written in JSON. Predefined CEL functions, macros, and operators, as well as regular expressions are supported. A mediations flag is used to apply a transformation_template key.
Example:
    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --mediations=transformation_template= \
        {
        "id": message.id,
        "datacontenttype": "application/json",
        "data": "{ \"scrubbed\": \"true\" }"
        }
The preceding example does the following:
- Removes all attributes from the original event except its id
- Sets the datacontenttype attribute to application/json
- Replaces the event payload with a static JSON string
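The effect of that transformation expression can be pictured with a short Python sketch. The function name scrub and the sample event (its id, source, and type values) are hypothetical; the point is only that the output contains nothing except what the expression lists, because the expression describes the complete outgoing event.

```python
import json


def scrub(message: dict) -> dict:
    """Mirror the example expression: keep id, set the type, replace the payload."""
    return {
        "id": message["id"],
        "datacontenttype": "application/json",
        # The payload is a static JSON string, not a nested object.
        "data": '{ "scrubbed": "true" }',
    }


# Hypothetical inbound event used only for illustration.
event = {
    "id": "abc-123",
    "source": "//example/source",
    "type": "example.event",
    "data": {"secret": "value"},
}

scrubbed = scrub(event)
print(sorted(scrubbed))             # attribute names that survive
print(json.loads(scrubbed["data"]))  # the static payload, parsed
```

Attributes such as source and type are dropped simply because the expression does not mention them.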
Extension functions
Eventarc Advanced supports the following extension functions which can be used to transform the event data received through a bus.
Function | Description |
---|---|
denormalize | Denormalizes a map or a list by adding redundant data to improve read performance. Field names in the resulting map are delimited using a period (.). Note that because you can't use a period (.) in Avro and Protobuf field names, only use this function to target JSON data. |
merge | Joins two fields and returns the combined field. Fields with duplicate names are merged. |
removeFields | Removes specific fields from an event. Field names are resolved as paths. The period character (.) is used as a delimiter. Note that raw JSON is expected. If you marshal the JSON, the transformation might be applied to a JSON string and result in an error. |
setField | Adds or replaces a field of the event with a given key. The field name is resolved as a path. The period character (.) is used as a delimiter. |
Example: Add attribute to event payload without modifying other data
    // Input:
    // {
    //   "data":
    //   {
    //     "credit_card_number": "XXXX-XXXX-XXXX-XXXX"
    //   }
    // }
    // Output:
    // {
    //   "data":
    //   {
    //     "credit_card_number": "XXXX-XXXX-XXXX-XXXX",
    //     "card_type": "credit"
    //   }
    // }

    {
      "data": message.data.merge(
        {
          "card_type": "credit"
        }
      )
    }
Example: Denormalize list of items from event payload
    // Input:
    // {
    //   "data":
    //   {
    //     "products": [
    //       {
    //         "number": 021774,
    //         "type": "perishable",
    //         "price": 2.00
    //       },
    //       {
    //         "number": 95602,
    //         "type": "diy",
    //         "price": 120.00
    //       },
    //       {
    //         "number": 568302,
    //         "type": "toys",
    //         "price": 12.00
    //       }
    //     ]
    //   }
    // }
    //
    // Output:
    // {
    //   "data":
    //   {
    //     "products": {
    //       "0.number": 021774,
    //       "0.type": "perishable",
    //       "0.price": 2.00,
    //       "1.number": 95602,
    //       "1.type": "diy",
    //       "1.price": 120.00,
    //       "2.number": 568302,
    //       "2.type": "toys",
    //       "2.price": 12.00
    //     }
    //   }
    // }

    message.setField("data.products", message.data.products.denormalize())
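The flattening shown in this example can be approximated in Python as follows. This is a sketch under assumptions, not the Eventarc Advanced implementation: the helper names denormalize and set_field are hypothetical, list indexes are assumed to become string key segments starting at 0 (as the documented output suggests), and the handling of empty nested maps or lists is not specified in this document.

```python
import copy


def denormalize(value, prefix: str = "") -> dict:
    """Flatten nested maps and lists into one map with period-delimited keys."""
    items = value.items() if isinstance(value, dict) else enumerate(value)
    flat = {}
    for key, child in items:
        path = f"{prefix}{key}"
        if isinstance(child, (dict, list)):
            flat.update(denormalize(child, path + "."))
        else:
            flat[path] = child
    return flat


def set_field(message: dict, path: str, value) -> dict:
    """Return a copy of message with the field at the period-delimited path set."""
    result = copy.deepcopy(message)
    *parents, leaf = path.split(".")
    node = result
    for part in parents:
        node = node.setdefault(part, {})
    node[leaf] = value
    return result


message = {
    "data": {
        "products": [
            # 21774 stands in for the 021774 of the example; Python integer
            # literals cannot have a leading zero.
            {"number": 21774, "type": "perishable", "price": 2.00},
            {"number": 95602, "type": "diy", "price": 120.00},
        ]
    }
}

out = set_field(message, "data.products", denormalize(message["data"]["products"]))
print(out)
```

The period-delimited keys are ordinary keys of a single flat map, which is why the technique depends on the target format allowing periods in field names.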
Example: Remove field from event payload
    // Input:
    // {
    //   "data":
    //   {
    //     "payment": {
    //       "card_number": "XXXX-XXXX-XXXX-XXXX",
    //       "card_type": "credit",
    //     }
    //   }
    // }
    // Output:
    // {
    //   "data":
    //   {
    //     "payment": {
    //       "card_type": "credit"
    //     }
    //   }
    // }

    message.removeFields(["data.payment.card_number"])
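Path resolution for removeFields can be sketched in Python as shown next. The helper name remove_fields is hypothetical, and silently skipping a path that cannot be resolved is an assumption of this sketch; the documented function might report an error instead, for example when the payload is a marshaled JSON string rather than raw JSON.

```python
import copy


def remove_fields(message: dict, paths: list) -> dict:
    """Return a copy of message without the fields at the period-delimited paths."""
    result = copy.deepcopy(message)
    for path in paths:
        *parents, leaf = path.split(".")
        node = result
        for part in parents:
            node = node.get(part) if isinstance(node, dict) else None
            if node is None:
                break
        # Assumption: unresolved paths are ignored rather than raising an error.
        if isinstance(node, dict):
            node.pop(leaf, None)
    return result


message = {
    "data": {
        "payment": {
            "card_number": "XXXX-XXXX-XXXX-XXXX",
            "card_type": "credit",
        }
    }
}

cleaned = remove_fields(message, ["data.payment.card_number"])
print(cleaned)

# A marshaled payload is just a string, so the path cannot be resolved into it.
marshaled = {"data": '{"payment": {"card_number": "XXXX-XXXX-XXXX-XXXX"}}'}
print(remove_fields(marshaled, ["data.payment.card_number"]) == marshaled)
```

The second call illustrates why raw JSON is expected: once the payload is a string, there are no nested fields for the path to address.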
Define a message binding
By default, events are always delivered to a destination in a CloudEvents format using an HTTP request in binary content mode. Optionally, you can override this behavior by defining a message binding and constructing a new HTTP request.
Any HTTP headers that are introduced by other policies or controls (for example, OAuth or OIDC tokens) are preserved and merged with the headers resulting from the binding expression.
You can define a message binding when configuring a pipeline in the Google Cloud console or by using the gcloud CLI.
Console
In the Google Cloud console, go to the Eventarc > Pipelines page.
You can create a pipeline or, if you are updating a pipeline, click the name of the pipeline.
Note that updating a pipeline might take more than 10 minutes.
In the Pipeline details page, click Edit.
In the Destination pane, apply a Message binding, which is a CEL expression written in JSON. This results in a newly constructed HTTP request which is then sent to the pipeline's destination.
For more information, see the Access inbound messages and Construct HTTP requests sections in this document.
Click Save.
gcloud
Open a terminal.
You can create a pipeline, or you can update a pipeline using the gcloud beta eventarc pipelines update command:

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --destinations=http_endpoint_message_binding_template='MESSAGE_BINDING'
Replace the following:
- PIPELINE_NAME: the ID of the pipeline or a fully qualified name
- REGION: a supported Eventarc Advanced location

  Alternatively, you can set the gcloud CLI location property:

      gcloud config set eventarc/location REGION
- MESSAGE_BINDING: a CEL expression written in JSON that results in a newly constructed HTTP request which is then sent to the pipeline's destination.

  For more information, see the Access inbound messages and Construct HTTP requests sections in this document.
Example:
    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment,http_endpoint_message_binding_template='{"headers":{"new-header-key": "new-header-value"}}'
Note that if you are using an http_endpoint_message_binding_template key, you must also set the http_endpoint_uri and network_attachment keys.
Access inbound messages
You can use a CEL expression to access an inbound CloudEvents message as follows:
- Use the message.data value to access the data field of the inbound message.
- Use the message.key values (where key is the name of the attribute) to access the attributes of the inbound message.
- Use a headers variable to access any headers added to the HTTP request by previous mediations in the processing chain. This variable defines a map of key-value pairs corresponding to the additional HTTP headers and not to the original headers of the initial inbound request.

  For example, the following CEL expression can be used to construct a headers-only HTTP request by adding an additional header to those added in previous pipeline mediations:
{"headers": headers.merge({"new-header-key": "new-header-value"})}
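As a rough illustration of what that expression evaluates to, the following Python sketch merges a new header into a headers map. The function name merge and the header x-added-by-mediation are hypothetical, and letting the passed map win when a key exists in both maps is an assumption made for this sketch only.

```python
def merge(base: dict, passed: dict) -> dict:
    """Return a new map containing the entries of base plus those of passed."""
    merged = dict(base)
    # Assumption: on a key conflict, the value from the passed map is kept.
    merged.update(passed)
    return merged


# Hypothetical header added by an earlier mediation in the pipeline.
headers = {"x-added-by-mediation": "value"}

binding_result = {"headers": merge(headers, {"new-header-key": "new-header-value"})}
print(binding_result)
```

The result has no body field, so the request built from it would carry headers only.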
Construct HTTP requests
The result of the CEL expression must be a map of key-value pairs whose headers and body fields are used to construct the HTTP request as follows.
For headers fields:
- If a headers map exists as a result of the CEL expression, its key-value pairs are directly mapped to the HTTP request headers, and its values are constructed using the canonical string-encoding of the corresponding data type.
- If a headers field doesn't exist, the resulting HTTP request won't contain any headers.
For body fields:
- If a body field exists as a result of the CEL expression, its value is directly mapped to the HTTP request body.
- If the body field value is of type bytes or string, it's used as the HTTP request body as is; otherwise, it's converted to a JSON string.
- If the body field doesn't exist, the resulting HTTP request won't contain a body.
Any other fields as a result of the CEL expression are ignored.
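The rules above can be summarized in a small Python sketch. It is an approximation under assumptions: the function name build_http_request is hypothetical, str() stands in for the canonical string-encoding of header values (the exact encoding per CEL type is not described here), and strings are encoded as UTF-8.

```python
import json
from typing import Dict, Optional, Tuple


def build_http_request(result: dict) -> Tuple[Dict[str, str], Optional[bytes]]:
    """Derive (headers, body) from the map produced by a binding expression."""
    # Headers: every value is turned into a string; no map means no headers.
    headers = {
        key: value if isinstance(value, str) else str(value)
        for key, value in result.get("headers", {}).items()
    }

    # Body: bytes and strings are used as is, anything else becomes JSON.
    body = None
    if "body" in result:
        raw = result["body"]
        if isinstance(raw, bytes):
            body = raw
        elif isinstance(raw, str):
            body = raw.encode("utf-8")
        else:
            body = json.dumps(raw).encode("utf-8")

    # Any other field in result is ignored.
    return headers, body


print(build_http_request({"headers": {"retries": 3}, "body": {"a": 1}, "other": "x"}))
print(build_http_request({"body": "plain text"}))
print(build_http_request({}))
```

An empty result map therefore produces a request with neither headers nor a body, apart from any headers that other policies or controls add.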
Extension functions
Eventarc Advanced supports the following extension functions which can be used to transform the event data when specifying a message binding.
Function | Description |
---|---|
merge | Merges a passed CEL map into the CEL map to which the function is applied. If the same key exists in both maps, or if the key's value is type ... |
toBase64 | Converts a CEL value to a base64 URL-encoded string. |
toCloudEventJsonWithPayloadFormat | Converts a message to a CEL map that corresponds to a JSON representation of a CloudEvents message, and applies toDestinationPayloadFormat to the message data. |
toDestinationPayloadFormat | Converts message.data to the destination payload format. |
toJsonString | Converts a CEL value to a JSON string. |
toMap | Converts a CEL list of CEL maps to a single CEL map. |
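The three simplest conversions in the table can be pictured with the Python sketch below. All helper names are hypothetical, and several details are assumptions rather than documented behavior: that "base64 URL-encoded" means the URL-safe base64 alphabet, that non-bytes values are stringified as UTF-8 before encoding, and that later maps win when toMap encounters a repeated key.

```python
import base64
import json


def to_json_string(value) -> str:
    """Approximates toJsonString: serialize a value as JSON text."""
    return json.dumps(value)


def to_base64(value) -> str:
    """Approximates toBase64, assuming the URL-safe base64 alphabet."""
    raw = value if isinstance(value, bytes) else str(value).encode("utf-8")
    return base64.urlsafe_b64encode(raw).decode("ascii")


def to_map(maps: list) -> dict:
    """Approximates toMap: combine a list of maps into a single map."""
    combined = {}
    for item in maps:
        # Assumption: a later map overrides an earlier one on a repeated key.
        combined.update(item)
    return combined


print(to_json_string({"a": 1}))
print(to_base64("hello"))
print(to_base64(b"\xfb\xff"))
print(to_map([{"a": 1}, {"b": 2}]))
```

The bytes example is chosen because its encoding differs between the standard and URL-safe alphabets, which makes the assumption visible in the output.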
Example: Retain headers, add new header, set body to destination format
    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --input-payload-format-json='{}' \
        --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment,http_endpoint_message_binding_template='{"headers": headers.merge({"content-type":"application/avro"}), "body": message.data.toDestinationPayloadFormat()}',output_payload_format_avro_schema_definition='{"schema_definition": "{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"},{"name":"account_late","type":"boolean"}]}"}'