转换收到的事件

您可以使用 CEL 编写转换表达式,从而转换您的事件数据。例如,您可以修改事件载荷,以满足目标位置的特定 API 合约。

请注意,除非您指定了消息绑定,否则事件始终以 二进制内容模式的 HTTP 请求以 CloudEvents 格式传送

设置输入和输出数据格式

除了使用 CEL 编写转换表达式之外,您还可以选择指定传入事件数据的数据格式。这样,Eventarc Advanced 便知道如何解析事件的载荷。您还可以将数据从一种格式转换为另一种格式。

支持以下格式:Avro、JSON 和 Protobuf。如需了解详情,请参阅设置接收事件的格式

转换表达式

转换事件时,您可以通过预定义的 message 对象在 CEL 表达式中以变量的形式访问所有事件属性。这些变量会根据运行时的事件数据填充值。例如:

  • message.id 返回事件的 id 属性
  • message.data 会返回事件载荷的 CEL 值表示
  • message.data.some-key 从事件载荷返回名为 some-key 的字段的内容

message.data 中的字段始终表示为 String 类型,并且值是使用设置输入数据格式时指定的架构从原始事件映射的。

转换表达式应表示包含事件情境属性和事件数据载荷的完整事件。表达式采用 JSON 编写,但支持预定义的 CEL 函数、宏和运算符,以及使用 RE2 的正则表达式。Eventarc Advanced 还支持某些可用于转换事件数据的扩展函数

下面是两个使用 CEL 表达式转换事件数据的示例。如需了解更多用例和示例,请参阅转换示例

示例:设置属性值的格式

以下示例使用正则表达式函数设置 phone_number 属性值的格式。(其他属性已省略。)

  // Input:
  // {
  //   "data":
  //   {
  //     "email_address": "charlie@altostrat.com",
  //     "phone_number": "8005550100",
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //      "email_domain": "altostrat.com",
  //      "phone_number": "(800) 555-0100",
  //      "area_code": "800",
  //      "local_number": "5550100",
  //    }
  // }

  {
    "data":
    {
      "email_domain": re.capture(
                        message.data.email_address,
                        "\\S+@(\\S+)"),

      "phone_number": re.extract(
                        message.data.phone_number,
                        "^(\\d{3})(\\d{3})(\\d{4})", "(\\1) \\2-\\3"
                      ),

    }.merge ( re.captureN(message.data.phone_number,
                        "^(?P\d{3})[\w\-)(]*(?P\d{7})"
                      )
    )
  }

以下是前面的示例中使用的正则表达式函数:

  • re.capture:捕获第一个无名或命名组值。 参数如下:
    • target:应解析的字符串
    • regex:用于捕获值的正则表达式

    返回第一个捕获的组值的字符串。

  • re.captureN:对给定字符串和正则表达式进行完全匹配。参数如下:
    • target:应解析的字符串
    • regex:用于捕获值的正则表达式

    返回包含已命名组(组名称、捕获的字符串)或未命名组(组编号、捕获的字符串)的键值对的映射。

  • re.extract:匹配给定目标字符串中的组值,并重写该字符串。参数如下:
    • target:要解析的字符串
    • regex:用于提取值的正则表达式
    • rewrite:用于指定结果格式的正则表达式

    返回提取的值的字符串,该字符串的格式基于 rewrite 参数。

示例:将数组映射到对象数组

以下示例将一个整数数组映射到一个对象数组。 (其他属性已省略。)

  // Input:
  // {
  //   "data":
  //   {
  //        "product_ids": [1, 2, 3]
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //             "products": [
  //                {
  //                   "name": "apple",
  //                   "price": 70
  //                },
  //                {
  //                    "name": "orange",
  //                    "price":  80
  //                },
  //                {
  //                    "name": "Product(3)",
  //                    "price": 0
  //                },
  //                {
  //                     "name": "apple",
  //                     "price": 70
  //                }
  //            ]
  //    }
  // }

  {
    "data":
    {
      "products":  message.data.product_ids.map(product_id,
              product_id == 1?
              {
                "name": "apple",
                "price": 70
              } :
              product_id == 2?
              {
                "name": "orange",
                "price":  80
              } :
              // Default:
              {
                "name": "Product(" + string(product_id) + ")",
                "price": 0
              }
          )
    }
  }

配置用于转换事件的流水线

您可以在 Google Cloud 控制台中或使用 gcloud CLI 配置用于转换事件数据的流水线。

请注意,每个广告渠道仅支持一个中介。

控制台

  1. 在 Google Cloud 控制台中,前往 Eventarc > 流水线页面。

    前往“流水线”

  2. 您可以创建流水线,如果要更新流水线,请点击流水线的名称。

    请注意,更新流水线可能需要 10 分钟以上。

  3. 流水线详情页面中,点击 修改

  4. 事件中介窗格中,执行以下操作:

    1. 选中应用转换复选框。
    2. 入站格式列表中,选择适用的格式。

      如需了解详情,请参阅设置接收事件的格式

    3. CEL 表达式字段中,使用 JSON 编写转换表达式。支持预定义的 CEL 函数、宏和运算符,以及正则表达式。例如:

      {
      "id": message.id,
      "datacontenttype": "application/json",
      "data": "{ \"scrubbed\": \"true\" }"
      }

      上述示例会执行以下操作:

      • 从原始事件中移除除 id 之外的所有属性
      • datacontenttype 属性设置为 application/json
      • 将事件载荷替换为静态 JSON 字符串
    4. 点击继续

  5. 目标窗格中,执行以下操作:

    1. 出站格式列表中,选择一种格式(如果适用)。

      如需了解详情,请参阅设置接收事件的格式

    2. (可选)应用消息绑定。如需了解详情,请参阅本文档中的定义消息绑定部分。

  6. 点击保存

gcloud

  1. 打开终端。

  2. 您可以创建流水线,也可以使用 gcloud beta eventarc pipelines update 命令更新流水线:

    请注意,更新流水线可能需要 10 分钟以上。

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --mediations=transformation_template= \
    {
      TRANSFORMATION_EXPRESSION
    }

    替换以下内容:

    • PIPELINE_NAME:流水线的 ID 或完全限定名称
    • REGION受支持的 Eventarc Advanced 位置

      或者,您也可以设置 gcloud CLI 位置属性:

      gcloud config set eventarc/location REGION
      
    • TRANSFORMATION_EXPRESSION:使用 JSON 编写的表达式。支持预定义的 CEL 函数、宏和运算符,以及正则表达式。mediations 标志用于应用 transformation_template 键。

    示例:

    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --mediations=transformation_template= \
    {
    "id": message.id,
    "datacontenttype": "application/json",
    "data": "{ \"scrubbed\": \"true\" }"
    }

    上述示例会执行以下操作:

    • 从原始事件中移除除 id 之外的所有属性
    • datacontenttype 属性设置为 application/json
    • 将事件载荷替换为静态 JSON 字符串

扩展函数

Eventarc Advanced 支持以下扩展函数,这些函数可用于转换通过总线接收的事件数据。

函数 说明
denormalize

通过添加冗余数据来反规范化映射或列表,以提高读取性能。生成的映射中的字段名称使用英文句点 (.) 分隔。列表索引会转换为字符串键,从 0 开始。

请注意,由于您无法在 Avro 和 Protobuf 字段名称中使用英文句号 (.),因此请仅使用此函数定位 JSON 数据。

例如:map.() -> map(string, dyn)list() -> map(string, dyn)

merge

联接两个字段并返回合并后的字段。系统会合并名称重复的字段。

例如:message.(message) -> message

removeFields

从事件中移除特定字段。字段名称会解析为路径。英文句点字符 (.) 用作分隔符。

请注意,应使用原始 JSON。如果您将 JSON 序列化,则转换可能会应用于 JSON 字符串,从而导致错误。

例如:message.(list(string)) -> message

setField

使用给定键添加或替换事件的字段。字段名称会解析为路径。英文句点字符 (.) 用作分隔符。

例如:message.(string, dyn) -> message

示例:向事件载荷添加属性,而不会修改其他数据

// Input:
// {
//   "data": 
//   {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX"
//   }
// }
// Output:
// {
//    "data":
//    {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX",
//        "card_type": "credit"
//    }
// }
{
  "data": message.data.merge(
    {
      "card_type": "credit"
    }
  )
}

示例:对事件载荷中的项列表进行非规范化处理

// Input:
//{
//"data": 
//   {
//        "products": [
//          {
//            "number": 021774,
//            "type": "perishable",
//            "price": 2.00
//          },
//          {
//            "number": 95602,
//            "type": "diy",
//            "price": 120.00
//          },
//          {
//            "number": 568302,
//            "type": "toys",
//            "price": 12.00
//          }
//        ]
//   }
//}
//
// Output:
//{
//"data":
//    {
//        "products": {
//            "0.number": 021774,
//            "0.type": "perishable",
//            "0.price": 2.00,
//            "1.number": 95602,
//            "1.type": "diy",
//            "1.price": 120.00,
//            "2.number": 568302,
//            "2.type": "toys",
//            "2.price": 12.00
//          }
//   }
//}
//
//
message.setField("data.products", message.data.products.denormalize())

示例:从事件载荷中移除字段

// Input:
// {
//   "data": 
//   {
//     "payment": {
//       "card_number": "XXXX-XXXX-XXXX-XXXX",
//       "card_type": "credit",
//     }
//   }
// }
// Output:
// {
//   "data":
//   {
//     "payment": {
//       "card_type": "credit"
//     }
//   }
// }
message.removeFields(["data.payment.card_number"])

定义消息绑定

默认情况下,事件始终通过二进制内容模式的 HTTP 请求以 CloudEvents 格式传送到目标位置。您也可以通过定义消息绑定并构造新的 HTTP 请求来覆盖此行为。

其他政策或控件(例如 OAuth 或 OIDC 令牌)引入的任何 HTTP 标头都会保留,并与绑定表达式生成的标头合并。

您可以在 Google Cloud 控制台中配置数据流时或使用 gcloud CLI 时定义消息绑定。

控制台

  1. 在 Google Cloud 控制台中,前往 Eventarc > 流水线页面。

    前往“流水线”

  2. 您可以创建流水线,如果要更新流水线,请点击流水线的名称。

    请注意,更新流水线可能需要 10 分钟以上。

  3. 流水线详情页面中,点击 修改

  4. 目标窗格中,应用消息绑定,这是一个用 JSON 编写的 CEL 表达式。这会生成一个新构建的 HTTP 请求,然后将其发送到流水线的目的地。

    如需了解详情,请参阅本文档中的访问入站消息构建 HTTP 请求部分。

  5. 点击保存

gcloud

  1. 打开终端。

  2. 您可以创建流水线,也可以使用 gcloud beta eventarc pipelines update 命令更新流水线:

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --destinations=http_endpoint_message_binding_template='MESSAGE_BINDING'

    替换以下内容:

    • PIPELINE_NAME:流水线的 ID 或完全限定名称
    • REGION受支持的 Eventarc Advanced 位置

      或者,您也可以设置 gcloud CLI 位置属性:

      gcloud config set eventarc/location REGION
      
    • MESSAGE_BINDING:使用 JSON 编写的 CEL 表达式,会生成一个新构建的 HTTP 请求,然后将该请求发送到流水线的目标位置。

      如需了解详情,请参阅本文档中的访问入站消息构建 HTTP 请求部分。

    示例:

    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment, \
    http_endpoint_message_binding_template='{"headers":{"new-header-key": "new-header-value"}}'

    请注意,如果您使用的是 http_endpoint_message_binding_template 密钥,则还必须设置 http_endpoint_urinetwork_attachment 密钥。

访问收到的消息

您可以使用 CEL 表达式访问传入的 CloudEvents 消息,如下所示:

  • 使用 message.data 值访问入站消息的 data 字段。
  • 使用 message.key 值(其中 key 是属性的名称)访问传入消息的属性。
  • 使用 headers 变量可访问处理链中之前中介添加到 HTTP 请求的任何标头。此变量定义了一个键值对映射,该映射对应于其他 HTTP 标头,而不是初始入站请求的原始标头。

    例如,以下 CEL 表达式可用于通过向之前的流水线中介添加的标头添加其他标头来构建仅包含标头的 HTTP 请求:

    {"headers": headers.merge({"new-header-key": "new-header-value"})}

构建 HTTP 请求

CEL 表达式的结果必须是键值对的映射,其中 headersbody 字段用于构建 HTTP 请求,如下所示。

对于 headers 字段:

  • 如果 CEL 表达式产生了 headers 映射,则其键值对会直接映射到 HTTP 请求标头,并且其值使用相应数据类型的规范字符串编码构建而成。
  • 如果不存在 headers 字段,生成的 HTTP 请求将不包含任何标头。

对于 body 字段:

  • 如果 CEL 表达式产生了 body 字段,则其值会直接映射到 HTTP 请求正文。
  • 如果 body 字段值的类型为 bytesstring,则会直接用作 HTTP 请求正文;否则,系统会将其转换为 JSON 字符串。
  • 如果 body 字段不存在,则生成的 HTTP 请求正文是二进制内容模式下最终 CloudEvents HTTP 消息绑定的正文。

系统会忽略 CEL 表达式的任何其他字段。

扩展函数

Eventarc Advanced 支持以下扩展函数,这些函数可用于在指定消息绑定时转换事件数据。

函数 说明
merge

将传入的 CEL 映射合并到应用了函数的 CEL 映射中。如果两个映射中存在相同的键,或者键的值的类型为 map,则会合并这两个映射;否则,系统会使用传递的映射中的值。

示例:map1.merge(map2) -> map3

toBase64

将 CEL 值转换为 base64 网址编码的字符串。

示例:map.toBase64() -> string

toCloudEventJsonWithPayloadFormat

将消息转换为与 CloudEvents 消息的 JSON 表示形式对应的 CEL 映射,并将 toDestinationPayloadFormat 应用于消息数据。此外,还会将事件的 datacontenttype 设置为指定的出站格式 (output_payload_format_*)。如果未设置出站格式,则使用任何现有的 datacontenttype;否则,系统不会设置 datacontenttype。如果消息不符合 CloudEvents 规范,函数将失败。请注意,如需将数据转换为 JSON 字符串,您可以使用 toJsonString

示例: message.toCloudEventJsonWithPayloadFormat() -> map.toJsonString() -> string

toDestinationPayloadFormat

message.data 转换为指定的出站格式 (output_payload_format_*)。如果未设置出站格式,则会原样返回 message.data

示例:message.data.toDestinationPayloadFormat() -> string or bytes

toJsonString

将 CEL 值转换为 JSON 字符串。

例如:map.toJsonString() -> string

toMap

将 CEL 映射的 CEL 列表转换为单个 CEL 映射。

示例:list(map).toMap() -> map

示例:保留标头、添加新标头、将正文设置为目标格式

gcloud beta eventarc pipelines create my-pipeline \
    --location=us-central1 \
    --input-payload-format-json='{}' \
    --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment,http_endpoint_message_binding_template='{"headers": headers.merge({"content-type":"application/avro"}), "body": message.data.toDestinationPayloadFormat()"}',output_payload_format_avro_schema_definition='{"schema_definition": "{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"},{"name":"account_late","type":"boolean"}]}"}'

后续步骤