JavaScript 用户定义的函数 (UDF) 是一种单条消息转换 (SMT)。UDF 提供了一种灵活的方式,可在 Pub/Sub 中实现自定义转换逻辑,类似于 BigQuery JavaScript UDF。
UDF 接受一条消息作为输入,对输入执行定义的操作,并返回处理结果。
UDF 具有以下关键属性:
- 函数名称:所提供代码中 Pub/Sub 应用于消息的 JavaScript 函数的名称。 
- 代码:用于定义转换逻辑的 JavaScript 代码。 此代码必须包含具有以下签名的函数: - /** * Transforms a Pub/Sub message. * @return {(Object<string, (string | Object<string, string>)>|* null)} - To * filter a message, return `null`. To transform a message, return a map with * the following keys: * - (required) 'data' : {string} * - (optional) 'attributes' : {Object<string, string>} * Returning empty `attributes` will remove all attributes from the message. * * @param {(Object<string, (string | Object<string, string>)>} - Pub/Sub * message. Keys: * - (required) 'data' : {string} * - (required) 'attributes' : {Object<string, string>} * * @param {Object<string, any>} metadata - Pub/Sub message metadata. * Keys: * - (optional) 'message_id' : {string} * - (optional) 'publish_time': {string} YYYY-MM-DDTHH:MM:SSZ format * - (optional) 'ordering_key': {string} */ function <function_name>(message, metadata) { // Perform custom transformation logic return message; // to filter a message instead, return `null` }
输入
- message实参:表示 Pub/Sub 消息的 JavaScript 对象。它包含以下属性:- data:(- String,必需)消息载荷。
- attributes:(- Object<String, String>,可选)表示消息属性的键值对映射。
 
- metadata实参:一个 JavaScript 对象,包含有关 Pub/Sub 消息的不可变元数据:
输出
- 如需转换消息,请修改 - message.data和- message.attributes的内容,然后返回更改后的- message对象。
- 如需过滤消息,请返回 - null。
输入 / 输出要求
- 如果 UDF 转换消息载荷,则载荷输入和输出必须是 UTF-8 编码的字符串。
- 如果 UDF 不转换消息载荷,则载荷可以使用任何编码。
- 属性键值对必须是采用 UTF-8 编码的字符串。
UDF 如何转换消息
对消息运行 UDF 的结果可以是以下任一项:
- UDF 会转换消息。 
- UDF 返回 - null。- 主题 SMT:Pub/Sub 会向发布者返回成功消息,并在响应中包含过滤后消息的消息 ID。 Pub/Sub 不会存储消息,也不会将其发送给任何订阅者。 
- 订阅 SMT:Pub/Sub 确认消息已递送,但不会将消息发送给订阅者。 
 
- UDF 抛出错误。 - 主题 SMT:Pub/Sub 会向发布者返回错误,并且无法发布任何消息。 
- 订阅 SMT:Pub/Sub 否定确认消息。 
 
限制
Pub/Sub 会对 UDF 强制执行资源限制,以确保高效的转换操作。限制包括:
- 每个 UDF 的代码上限为 20 KB
- 每条消息的执行时间上限为 500 毫秒
- 仅支持 ECMAScript 标准内置函数
- 未调用外部 API
- 不导入外部库
UDF 示例
以下是一些用于发布和订阅的 UDF 示例。您可以在 UDF 库中找到其他示例。
函数:将表示周几的整数转换为相应的字符串
将以下 UDF 添加到主题或订阅后,在发布或传送消息期间会发生以下变化:
- Pub/Sub 将该函数应用于消息。如果消息没有 JSON 载荷,UDF 会抛出错误。 
- 该 UDF 会查找名为 - dayOfWeek的字段,如果该字段的值是介于 0 到 6 之间的数字,则将其转换为相应的一周中的某一天,例如- Monday。如果该字段不存在或者相应数字不在 0 到 6 的范围内,则代码会将- dayOfWeek字段设置为- Unknown。
- UDF 将修改后的载荷序列化回消息中。 
- Pub/Sub 会将更新后的消息传递给流水线中的下一步。 
function intToString(message, metadata) {
  const data = JSON.parse(message.data);
  switch(`data["dayOfWeek"]`) {
    case 0:
      data["dayOfWeek"] = "Sunday";
      break;
    case 1:
      data["dayOfWeek"] = "Monday";
      break;
    case 2:
      data["dayOfWeek"] = "Tuesday";
      break;
    case 3:
      data["dayOfWeek"] = "Wednesday";
      break;
    case 4:
      data["dayOfWeek"] = "Thursday";
      break;
    case 5:
      data["dayOfWeek"] = "Friday";
      break;
    case 6:
      data["dayOfWeek"] = "Saturday";
      break;
    default:
      data["dayOfWeek"] = "Unknown";
  }
  message.data = JSON.stringify(data);
  return message;
}
功能:遮盖社会保障号
将以下 UDF 添加到主题或订阅后,在发布或传送消息期间会发生以下变化:
- Pub/Sub 将该函数应用于消息。如果消息没有 JSON 载荷,UDF 会抛出错误。 
- 该 UDF 会从消息载荷中移除字段 - ssn(如果存在)。
- UDF 将修改后的载荷序列化回消息中。 
- Pub/Sub 会将更新后的消息传递给流水线中的下一步。 
function redactSSN(message, metadata) {
  const data = JSON.parse(message.data);
  delete data['ssn'];
  message.data = JSON.stringify(data);
  return message;
}
功能:过滤掉特定消息并自动确认
将以下 UDF 添加到主题或订阅后,在发布或传送消息期间会发生以下变化:
- Pub/Sub 将该函数应用于消息。如果消息没有 JSON 载荷,UDF 会抛出错误。 
- 该 UDF 会检查载荷是否包含名为 - region的字段。
- 如果 - region字段的值不是- US,该函数会返回 null,从而导致 Pub/Sub 过滤相应消息。
- 如果 - region字段的值为- US,Pub/Sub 会将原始消息传递到流水线中的下一步。
function filterForUSRegion(message, metadata) {
  const data = JSON.parse(message.data);
  if (data["region"] !== "US") {
    return null;
  }
  return message;
}
功能:验证消息内容,确保金额不大于 100
将以下 UDF 添加到主题或订阅后,在发布或传送消息期间会发生以下变化:
- Pub/Sub 将该函数应用于消息。如果消息没有 JSON 载荷,UDF 会抛出错误。 
- UDF 会检查消息是否包含名为 - amount的字段。
- 如果 - amount字段的值大于- 100,则该函数会抛出错误。
- 如果 - amount字段的值不大于- 100,则该函数会返回原始消息。
- 然后,Pub/Sub 会将消息标记为失败,或者将原始消息传递到流水线的下一步。 
function validateAmount(message, metadata) {
  const data = JSON.parse(message.data);
  if (data["amount"] > 100) {
    throw new Error("Amount is invalid");
  }
  return message;
}
后续步骤
- 在 UDF 库中探索其他示例