为 Dataflow 模板创建用户定义的函数

某些 Google 提供的 Dataflow 模板支持用户定义的函数 (UDF)。借助 UDF,您可以扩展模板的功能,而无需修改模板代码。

概览

如需创建 UDF,请编写 JavaScript 函数或 Python 函数(具体取决于模板)。将 UDF 代码文件存储在 Cloud Storage 中,并以模板参数的形式指定位置。对于每个输入元素,模板都会调用您的函数。函数会转换元素或执行其他自定义逻辑,并将结果返回给模板。

例如,您可以使用 UDF 执行以下操作:

  • 重新设置输入数据的格式,使其与目标架构相匹配。
  • 隐去敏感数据。
  • 从输出中过滤某些元素。

UDF 函数的输入是单个数据元素,序列化为 JSON 字符串。函数会返回一个序列化 JSON 字符串作为输出。数据格式取决于模板。例如,在 Pub/Sub Subscription to BigQuery 模板中,输入是序列化为 JSON 对象的 Pub/Sub 消息数据,而输出是一个表示 BigQuery 表行的序列化 JSON 对象。如需了解详情,请参阅每个模板对应的文档

使用 UDF 运行模板

如需使用 UDF 运行模板,您需要将 JavaScript 文件的 Cloud Storage 位置和函数名称指定为模板参数。

对于某些 Google 提供的模板,您还可以直接在 Google Cloud 控制台中创建 UDF,如下所示:

  1. 在 Google Cloud 控制台中,转到 Dataflow 页面。

    转到 Dataflow 页面

  2. 点击 基于模板创建作业

  3. 选择要运行的 Google 提供的模板。

  4. 展开可选参数。如果模板支持 UDF,则该模板具有一个 UDF 的 Cloud Storage 位置参数和另一个函数名称参数。

  5. 点击模板参数旁边的创建 UDF

  6. 选择或创建用户定义的函数 (UDF) 面板中:

    1. 输入文件名。示例:my_udf.js
    2. 选择 Cloud Storage 文件夹。示例:gs://your-bucket/your-folder
    3. 使用内嵌代码编辑器编写函数。该编辑器预先填充了样板代码供您参考。
    4. 点击创建 UDF

      Google Cloud 控制台会保存 UDF 文件并填充 Cloud Storage 位置。

    5. 在相应字段中输入函数的名称。

编写 JavaScript UDF

以下代码展示了您可以从中启动的无操作 JavaScript UDF:

/*
 * @param {string} inJson input JSON message (stringified)
 * @return {?string} outJson output JSON message (stringified)
 */
function process(inJson) {
  const obj = JSON.parse(inJson);

  // Example data transformations:
  // Add a field: obj.newField = 1;
  // Modify a field: obj.existingField = '';
  // Filter a record: return null;

  return JSON.stringify(obj);
}

JavaScript 代码在 Nashorn JavaScript 引擎上运行。我们建议您先在 Nashorn 引擎上测试 UDF,然后再部署它。Nashorn 引擎与 JavaScript 的 Node.js 实现不完全匹配。常见的问题是使用 console.log()Number.isNaN(),两者都未在 Nashorn 引擎中定义。

您可以使用 Cloud Shell(其中预安装了 JDK 11)在 Nashorn 引擎上测试 UDF。请在交互模式下启动 Nashorn,如下所示:

jjs --language=es6

在 Nashorn 交互式 Shell 中,执行以下步骤:

  1. 调用 load 来加载 UDF JavaScript 文件。
  2. 根据流水线的预期消息定义输入 JSON 对象。
  3. 使用 JSON.stringify 函数将输入序列化为 JSON 字符串。
  4. 调用 UDF 函数来处理 JSON 字符串。
  5. 调用 JSON.parse 对输出进行反序列化。
  6. 验证结果。

示例:

> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)

编写 Python UDF

以下代码展示了您可以从中启动的无操作 Python UDF:

import json
def process(value):
  # Load the JSON string into a dictionary.
  data = json.loads(value)

  # Transform the data in some way.
  data['new_field'] = 'new_value'

  # Serialize the data back to JSON.
  return json.dumps(data)

Python UDF 支持 Python 和 Apache Beam 的标准依赖项软件包。它们不能使用第三方软件包。

错误处理

通常,如果在 UDF 执行期间发生错误,则错误会写入死信位置。具体详情取决于模板。例如,Pub/Sub Subscription to BigQuery 模板会创建一个 _error_records 表并将错误写入其中。运行时 UDF 错误可能是因语法错误或未捕获的异常导致的。如需检查是否存在语法错误,请在本地测试 UDF。

您可以通过编程方式针对不应处理的元素抛出异常。在此示例中,该元素会写入死信位置(如果模板支持死信位置)。如需查看展示此方法的示例,请参阅路由事件

实际使用示例

本部分根据真实的应用场景介绍了 UDF 的一些常见模式。

丰富事件

可以使用 UDF 通过新字段来丰富事件,以获取更多情境信息。

示例:

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Add new field to track data source
  data.source = "source1";
  return JSON.stringify(data);
}

转换事件

可以使用 UDF 来转换整个事件格式,具体取决于目标位置要求的格式。

以下示例会将 Cloud Logging 日志条目 (LogEntry) 还原为原始日志字符串(如果有)。(根据日志源,原始日志字符串有时会填充在 textPayload 字段中。)您可以使用此模式以原始格式发送原始日志,而不是从 Cloud Logging 发送整个 LogEntry

 function process(inJson) {
  const data = JSON.parse(inJson);

  if (data.textPayload) {
    return data.textPayload; // Return string value, and skip JSON.stringify
  }
 return JSON.stringify(obj);
}

隐去或移除事件数据

可以使用 UDF 来隐去或移除事件的一部分。

以下示例会通过替换字段名称 sensitiveField 的值来隐去该字段名称,并完全移除名为 redundantField 的字段。

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Normalize existing field values
  data.source = (data.source && data.source.toLowerCase()) || "unknown";

  // Redact existing field values
  if (data.sensitiveField) {
    data.sensitiveField = "REDACTED";
  }

  // Remove existing fields
  if (data.redundantField) {
    delete(data.redundantField);
  }

  return JSON.stringify(data);
}

路由事件

可以使用 UDF 将事件路由到下游接收器中的不同目标位置。

以下示例会根据 Pub/Sub to Splunk 模板将每个事件路由到正确的 Splunk 索引。该示例会调用用户定义的本地函数以将事件映射到索引。

function process(inJson) {
  const obj = JSON.parse(inJson);
  
  // Set index programmatically for data segregation in Splunk
  obj._metadata = {
    index: splunkIndexLookup(obj)
  }
  return JSON.stringify(obj);
}  

以下示例会将无法识别的事件路由到死信队列(假设模板支持死信队列)。(例如,请参阅 Pub/Sub to JDBC 模板。)您可以使用此模式过滤掉意外条目,然后再写入目标位置。

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Route unrecognized events to the deadletter topic
  if (!data.hasOwnProperty('severity')) {
    throw new Error("Unrecognized event. eventId='" + data.Id + "'");
  }

  return JSON.stringify(data);

过滤事件

可以使用 UDF 来过滤输出中不需要或无法识别的事件。

以下示例会丢弃 data.severity 等于 "DEBUG" 的事件。

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Drop events with certain field values
  if (data.severity == "DEBUG") {
    return null;
  }

  return JSON.stringify(data);
}

后续步骤