某些 Google 提供的 Dataflow 模板支持用户定义的函数 (UDF)。借助 UDF,您可以扩展模板的功能,而无需修改模板代码。
概览
如需创建 UDF,请编写 JavaScript 函数或 Python 函数(具体取决于模板)。将 UDF 代码文件存储在 Cloud Storage 中,并以模板参数的形式指定位置。对于每个输入元素,模板都会调用您的函数。函数会转换元素或执行其他自定义逻辑,并将结果返回给模板。
例如,您可以使用 UDF 执行以下操作:
- 重新设置输入数据的格式,以匹配目标架构。
- 隐去敏感数据。
- 从输出中过滤某些元素。
UDF 函数的输入是单个数据元素,序列化为 JSON 字符串。函数会返回一个序列化 JSON 字符串作为输出。数据格式取决于模板。例如,在 Pub/Sub Subscription to BigQuery 模板中,输入是序列化为 JSON 对象的 Pub/Sub 消息数据,而输出是一个表示 BigQuery 表行的序列化 JSON 对象。如需了解详情,请参阅每个模板对应的文档。
使用 UDF 运行模板
如需使用 UDF 运行模板,您需要将 JavaScript 文件的 Cloud Storage 位置和函数名称指定为模板参数。
对于某些 Google 提供的模板,您还可以直接在 Google Cloud 控制台中创建 UDF,如下所示:
在 Google Cloud 控制台中,转到 Dataflow 页面。
点击 add_box基于模板创建作业。
选择 Google 提供的要运行的模板。
展开可选参数。如果模板支持 UDF,则该模板具有一个 UDF 的 Cloud Storage 位置参数和另一个函数名称参数。
点击模板参数旁边的创建 UDF。
在选择或创建用户定义的函数 (UDF) 面板中:
- 输入文件名。示例:
my_udf.js
。 - 选择 Cloud Storage 文件夹。示例:
gs://your-bucket/your-folder
。 - 使用内嵌代码编辑器编写函数。该编辑器预先填充了样板代码供您参考。
点击创建 UDF。
Google Cloud 控制台会保存 UDF 文件并填充 Cloud Storage 位置。
在相应字段中输入函数的名称。
- 输入文件名。示例:
编写 JavaScript UDF
以下代码展示了您可以从中启动的无操作 JavaScript UDF:
/*
* @param {string} inJson input JSON message (stringified)
* @return {?string} outJson output JSON message (stringified)
*/
function process(inJson) {
const obj = JSON.parse(inJson);
// Example data transformations:
// Add a field: obj.newField = 1;
// Modify a field: obj.existingField = '';
// Filter a record: return null;
return JSON.stringify(obj);
}
JavaScript 代码在 Nashorn JavaScript 引擎上运行。我们建议您先在 Nashorn 引擎上测试 UDF,然后再部署它。Nashorn 引擎与 JavaScript 的 Node.js 实现不完全匹配。常见的问题是使用 console.log()
或 Number.isNaN()
,两者都未在 Nashorn 引擎中定义。
您可以使用 Cloud Shell(其中预安装了 JDK 11)在 Nashorn 引擎上测试 UDF。请在交互模式下启动 Nashorn,如下所示:
jjs --language=es6
在 Nashorn 交互式 Shell 中,执行以下步骤:
- 调用
load
来加载 UDF JavaScript 文件。 - 根据流水线的预期消息定义输入 JSON 对象。
- 使用
JSON.stringify
函数将输入序列化为 JSON 字符串。 - 调用 UDF 函数以处理 JSON 字符串。
- 调用
JSON.parse
对输出进行反序列化。 - 验证结果。
示例:
> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)
编写 Python UDF
以下代码展示了您可以从中启动的无操作 Python UDF:
import json
def process(value):
# Load the JSON string into a dictionary.
data = json.loads(value)
# Transform the data in some way.
data['new_field'] = 'new_value'
# Serialize the data back to JSON.
return json.dumps(data)
Python UDF 支持 Python 和 Apache Beam 的标准依赖项软件包。它们不能使用第三方软件包。
错误处理
通常,如果在 UDF 执行期间发生错误,则错误会写入死信位置。具体细节取决于模板。例如,Pub/Sub Subscription to BigQuery 模板会创建一个 _error_records
表并在其中写入错误。运行时 UDF 错误可能是因语法错误或未捕获的异常导致的。如需检查是否存在语法错误,请在本地测试 UDF。
您可以通过编程方式针对不应处理的元素抛出异常。在此示例中,该元素会写入死信位置(如果模板支持死信位置)。如需查看展示此方法的示例,请参阅路由事件。
实际使用示例
本部分根据真实的应用场景介绍了 UDF 的一些常见模式。
丰富事件
可以使用 UDF 通过新字段来丰富事件,以获取更多情境信息。
示例:
function process(inJson) {
const data = JSON.parse(inJson);
// Add new field to track data source
data.source = "source1";
return JSON.stringify(data);
}
转换事件
可以使用 UDF 来转换整个事件格式,具体取决于目标位置要求的格式。
以下示例会将 Cloud Logging 日志条目 (LogEntry
) 还原为原始日志字符串(如果有)。(根据日志源,原始日志字符串有时会填充在 textPayload
字段中。)您可以使用此模式以原始格式发送原始日志,而不是发送 Cloud Logging 中的整个 LogEntry
。
function process(inJson) {
const data = JSON.parse(inJson);
if (data.textPayload) {
return data.textPayload; // Return string value, and skip JSON.stringify
}
return JSON.stringify(obj);
}
隐去或移除事件数据
可以使用 UDF 来隐去或移除事件的一部分。
以下示例会通过替换字段名称 sensitiveField
的值来隐去该字段名称,并完全移除名为 redundantField
的字段。
function process(inJson) {
const data = JSON.parse(inJson);
// Normalize existing field values
data.source = (data.source && data.source.toLowerCase()) || "unknown";
// Redact existing field values
if (data.sensitiveField) {
data.sensitiveField = "REDACTED";
}
// Remove existing fields
if (data.redundantField) {
delete(data.redundantField);
}
return JSON.stringify(data);
}
路由事件
可以使用 UDF 将事件路由到下游接收器中的不同目标位置。
以下示例会根据 Pub/Sub to Splunk 模板将每个事件路由到正确的 Splunk 索引。该示例会调用用户定义的本地函数以将事件映射到索引。
function process(inJson) {
const obj = JSON.parse(inJson);
// Set index programmatically for data segregation in Splunk
obj._metadata = {
index: splunkIndexLookup(obj)
}
return JSON.stringify(obj);
}
以下示例会将无法识别的事件路由到死信队列(假设模板支持死信队列)。(例如,请参阅 Pub/Sub to JDBC 模板。)您可以使用此模式过滤掉意外条目,然后再写入目标位置。
function process(inJson) {
const data = JSON.parse(inJson);
// Route unrecognized events to the deadletter topic
if (!data.hasOwnProperty('severity')) {
throw new Error("Unrecognized event. eventId='" + data.Id + "'");
}
return JSON.stringify(data);
过滤事件
可以使用 UDF 来过滤输出中不需要或无法识别的事件。
以下示例会删除 data.severity
等于 "DEBUG"
的事件。
function process(inJson) {
const data = JSON.parse(inJson);
// Drop events with certain field values
if (data.severity == "DEBUG") {
return null;
}
return JSON.stringify(data);
}
后续步骤
- Google 提供的模板
- 构建和运行 Flex 模板
- 运行经典模板
- 使用 UDF 扩展 Dataflow 模板(博文)
- 示例 UDF (GitHub)