日志解析概览
本文档简要介绍了 Google Security Operations 如何将原始日志解析为统一数据模型 (UDM) 格式。
Google Security Operations 可以接收源自以下提取的日志数据 来源:
- Google Security Operations 转发器
- Google Security Operations API Feed
- Google Security Operations Ingestion API
- 第三方技术合作伙伴
一般来说,客户会将数据作为原始原始日志发送。Google Security Operations 独有 使用 LogType 识别生成日志的设备。LogType 用于标识这两者:
- 生成日志的供应商和设备,例如思科防火墙 Linux DHCP 服务器,即 Bro DNS。
- 该解析器将原始日志转换为结构化统一数据模型 (UDM)。 解析器与 LogType 之间是一对一的关系。每个解析器都会转换单个 LogType 收到的数据。
Google Security Operations 提供了一组默认解析器,用于读取原始日志并使用原始日志中的数据生成结构化 UDM 记录。Google Security Operations 会维护这些解析器。客户还可以通过创建特定于客户的解析器来定义自定义数据映射说明。与您的 Google 安全运营团队联系 了解有关创建客户专用解析器的信息。
解析器包含数据映射指令。它定义了如何将数据从原始原始日志映射到 UDM 数据结构中的一个或多个字段。
如果没有解析错误,Google Security Operations 会使用原始日志中的数据创建 UDM 结构化记录。将原始日志转换为 UDM 记录的过程如下: 称为标准化。
默认解析器可能会映射原始日志中核心值的子集。通常,这些核心字段对于在 Google Security Operations 中提供安全数据分析至关重要。未映射的值会保留在原始日志中,但不会 存储在 UDM 记录中。
客户还可以使用 Ingestion API 以结构化统一数据模型 (UDM) 格式发送数据。
自定义提取数据的解析方式
Google Security Operations 提供以下功能,可让客户对传入的原始日志数据自定义数据解析。
- 特定于客户的解析器:客户为满足其特定要求的特定日志类型创建自定义解析器配置。客户专用解析器会替换特定 LogType 的默认解析器。如需了解如何创建特定于客户的解析器,请与您的 Google Security Operations 代表联系。
- 解析器扩展程序:客户可以在 作为默认解析器配置的补充每个客户都可以创建自己的一组自定义映射说明。这些映射说明定义了如何提取和转换额外的字段,将其从原始原始日志转换为 UDM 字段。解析器扩展不会替换 默认或特定于客户的解析器。
使用 Squid Web 代理日志的示例
本部分提供了一个 Squid 网络代理日志示例,并介绍了如何将值映射到 UDM 记录。有关 UDM 架构中所有字段的说明, 请参阅统一数据模型字段列表。
示例 Squid 网站代理日志包含以空格分隔的值。每个记录代表一个事件,并存储以下数据:时间戳、时长、客户端、结果代码/结果状态、传输的字节数、请求方法、网址、用户、层次结构代码和内容类型。在此示例中,以下字段 提取并映射到一条 UDM 记录:时间、客户端、结果状态、字节 请求方法和网址
1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg
比较这两种结构时,请注意 UDM 记录中仅包含原始日志数据的一部分。某些字段是必填字段,其他字段 都是选填的此外,UDM 记录中只有部分部分包含数据。如果解析器没有将原始日志中的数据映射到 UDM 则您不会在 Google Security Operations 中看到 UDM 记录的相应部分。
metadata
部分存储了事件时间戳。请注意,该值已从 EPOCH 格式转换为 RFC 3339 格式。此转换是可选的。时间戳可以是
以 EPOCH 格式存储,进行预处理以将秒和秒和
转换为单独的字段。
metadata.event_type
字段存储值 NETWORK_HTTP
,这是一个枚举值
用于标识事件类型。metadata.event_type
的值决定了
哪些是必填字段和选填的 UDM 字段。product_name
和 vendor_name
值包含对记录原始日志的设备的简单易懂的说明。
UDM 事件记录中的 metadata.event_type
与 log_type 不同
使用 Ingestion API 提取数据时定义。这两个属性存储的信息不同。
network
部分包含原始日志事件中的值。请注意,
原始日志中的状态值是从“result”
代码/状态”字段,然后再写入 UDM 记录。仅 result_code
已包含在 UDM 记录中。
principal
部分存储原始日志中的客户端信息。target
部分会同时存储完全限定网址和 IP 地址。
security_result
部分会存储一个枚举值,用于表示原始日志中记录的操作。
这是采用 JSON 格式的 UDM 记录。请注意,仅包含数据的部分会被纳入。src
、observer
、intermediary
、about
和 extensions
部分均不包含在内。
{
"metadata": {
"event_timestamp": "2020-04-28T07:40:48.129Z",
"event_type": "NETWORK_HTTP",
"product_name": "Squid Proxy",
"vendor_name": "Squid"
},
"principal": {
"ip": "192.168.23.4"
},
"target": {
"url": "www.google.com/images/sunlogo.png",
"ip": "203.0.113.52"
},
"network": {
"http": {
"method": "GET",
"response_code": 200,
"received_bytes": 904
}
},
"security_result": {
"action": "UNKNOWN_ACTION"
}
}
解析器说明中的步骤
解析器中的数据映射指令遵循一种常见模式,如 如下:
- 解析原始日志并提取数据。
- 处理提取的数据。这包括使用条件逻辑 有选择地解析值、转换数据类型、替换 值、转换为大写或小写等。
- 为 UDM 字段分配值。
- 将映射的 UDM 记录输出到 @output 键。
解析原始日志并提取数据
设置过滤条件语句
filter
语句是该解析说明集中的第一个语句。所有其他解析指令都包含在 filter
语句中。
filter {
}
初始化用于存储提取值的变量
在 filter
语句中,初始化解析器将用来存储从日志中提取的值的中间变量。
每次解析单个日志时都会使用这些变量。每个中间变量中的值将在解析说明的后面部分设置为一个或多个 UDM 字段。
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
从日志中提取各个值
Google Security Operations 提供了一组基于 Logstash 的过滤器,用于从原始日志文件中提取字段。根据日志的格式,您可以使用一个或多个 提取过滤条件,从日志中提取所有数据。如果字符串为:
- 原生 JSON,解析器语法类似于支持 JSON 格式日志的 JSON 过滤器。不支持嵌套 JSON。
- XML 格式,则解析器语法类似于 XML 过滤器,该过滤器支持 XML 格式的日志。
- 键值对,解析器语法类似于 Kv 过滤条件,支持键值对格式的消息。
- CSV 格式,解析器语法类似于支持 CSV 格式消息的 Csv 过滤器。
- 对于其他所有格式,解析器的语法与 使用 GROK 内置模式的 GROK 过滤器。 此方法使用正则表达式风格的提取说明。
Google Security Operations 提供每个过滤器中可用功能的一部分。Google Security Operations 还提供不支持的自定义数据映射语法 。请参阅解析器语法参考文档 了解支持的功能和自定义函数。
继续以 Squid 网络代理日志示例为例,以下数据提取 指令包含 Logstash Grok 语法和常规 表达式。
以下提取语句将值存储在以下中间 变量:
when
srcip
action
returnCode
size
method
username
url
tgtip
此示例语句还使用 overwrite
关键字将提取的值存储在每个变量中。如果提取过程返回错误,on_error
语句将 not_valid_log
设置为 True
。
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
处理和转换提取值
Google Security Operations 利用 Logstash mutate 过滤条件插件功能,可对从原始日志中提取的值进行操作。Google Security Operations 提供插件可用的部分功能。 如需查看功能说明,请参阅解析器语法 自定义函数,例如:
- 将值转换为其他数据类型
- 替换字符串中的值
- 合并两个数组或将字符串附加到数组。字符串值包括 转换为数组。
- 转换为小写或大写
本部分提供了基于 Squid Web 代理日志的数据转换示例。 。
转换事件时间戳
所有存储为 UDM 记录的事件都必须具有事件时间戳。本示例
检查是否从日志中提取了数据值。然后,它会使用
Grok 日期函数
以将该值与 UNIX
时间格式相匹配。
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
转换 username
值
以下示例语句会转换 username
变量中的值
转换为小写形式。
mutate {
lowercase => [ "username"]
}
转换 action
值
以下示例会对 action
中间变量中的值进行求值,并将该值更改为 ALLOW、BLOCK 或 UNKNOWN_ACTION,这些值是 security_result.action
UDM 字段的有效值。security_result.action
UDM 字段是一种枚举类型,仅存储特定值。
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
转换目标 IP 地址
以下示例会检查 tgtip
中间变量中的值。如果找到,系统会使用预定义的 Grok 模式将该值与 IP 地址模式进行匹配。如果在将值与 IP 地址模式进行匹配时出现错误,on_error
函数会将 not_valid_tgtip
属性设置为 True
。如果匹配成功,则不会设置 not_valid_tgtip
属性。
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
更改 returnCode 和大小的数据类型
以下示例将 size
变量中的值转换为 uinteger
,并将 returnCode
变量中的值转换为 integer
。这是
因为 size
变量将保存到
network.received_bytes
UDM 字段,用于存储 int64
数据类型。通过
returnCode
变量将保存到 network.http.response_code
UDM 字段
用于存储 int32
数据类型。
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
在事件中为 UDM 字段分配值
提取并预处理值后,将它们分配给 UDM 中的字段 事件记录。您可以为 UDM 字段同时指定提取的值和静态值。
如果您填充了 event.disambiguation_key
,请确保此字段对于
为给定日志生成的每个事件。如果两个不同的事件具有
同一个 disambiguation_key
,则会导致
系统。
本部分中的解析器示例基于 Squid Web 代理日志示例 。
保存事件时间戳
必须为每条 UDM 事件记录设置 metadata.event_timestamp
值
UDM 字段。以下示例将从日志中提取的事件时间戳保存到
@timestamp
内置变量。Google Security Operations 会将此保存到
metadata.event_timestamp
UDM 字段。
mutate {
rename => {
"when" => "timestamp"
}
}
设置事件类型
必须为每条 UDM 事件记录设置 metadata.event_type
UDM 的值
字段。此字段是枚举类型。此字段的值决定了必须填充哪些其他 UDM 字段才能保存 UDM 记录。如果任何必填字段都不包含有效数据,解析和标准化流程将会失败。
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
使用 replace
语句保存 username
和 method
值
username
和 method
中间字段中的值是字符串。以下示例会检查是否存在有效值,如果存在,则将 username
值存储到 principal.user.userid
UDM 字段,并将 method
值存储到 network.http.method
UDM 字段。
if [username] not in [ "-" ,"" ] {
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
}
if [method] != "" {
mutate {
replace => {
"event.idm.read_only_udm.network.http.method" => "%{method}"
}
}
}
将 action
保存到 security_result.action
UDM 字段
在上一部分中,action
中间变量中的值是
求值并转换为 security_result.action
UDM 字段的一个标准值。
security_result
和 action
UDM 字段都存储项数组,
也就是说,在保存
值。
首先,将转换后的值保存到中间 security_result.action
字段。security_result
字段是 action
字段的父字段。
mutate {
merge => {
"security_result.action" => "action"
}
}
接下来,将中间 security_result.action
中间字段保存到 security_result
UDM 字段。security_result
UDM 字段用于存储
项,因此该值将附加到此字段。
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
使用 merge
语句存储目标 IP 地址和来源 IP 地址
将以下值存储到 UDM 事件记录中:
- 将
srcip
中间变量的值传递给principal.ip
UDM 字段。 - 将
tgtip
中间变量的值传递给target.ip
UDM 字段。
principal.ip
和 target.ip
UDM 字段都存储项数组,因此
值会附加到每个字段。
以下示例展示了保存这些值的不同方法。
在转换步骤期间,系统使用预定义的 Grok 模式将 tgtip
中间变量与 IP 地址进行匹配。以下示例语句用于检查 not_valid_tgtip
属性是否为 true,表示 tgtip
值无法与 IP 地址模式匹配。如果为 false,则会将 tgtip
值保存到 target.ip
UDM 字段。
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
未转换 srcip
中间变量。以下声明
检查某个值是否从原始日志中提取,如果是,则保存
principal.ip
UDM 字段的值。
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
使用 rename
语句保存 url
、returnCode
和 size
以下示例语句使用 rename
语句存储以下值。
url
变量已保存到target.url
UDM 字段。- 保存到
network.http.response_code
UDM 字段的returnCode
中间变量。 - 保存到
network.received_bytes
UDM 字段的size
中间变量。
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
}
将 UDM 记录绑定到输出
数据映射指令中的最后一条语句会将处理后的数据输出到 UDM 事件记录。
mutate {
merge => {
"@output" => "event"
}
}
完整的解析器代码
以下是完整的解析器代码示例。说明的顺序与本文档前面的部分不同,但会产生相同的输出。
filter {
# initialize variables
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
# Extract fields from the raw log.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
# Parse event timestamp
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
# Save the value in "when" to the event timestamp
mutate {
rename => {
"when" => "timestamp"
}
}
# Transform and save username
if [username] not in [ "-" ,"" ] {
mutate {
lowercase => [ "username"]
}
}
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
# save transformed value to an intermediary field
mutate {
merge => {
"security_result.action" => "action"
}
}
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
# check for presence of target ip. Extract and store target IP address.
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
# store target IP address
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
}
# convert the returnCode and size to integer data type
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
# save url, returnCode, and size
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
# set the event type to NETWORK_HTTP
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
# validate and set source IP address
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
# save event to @output
mutate {
merge => {
"@output" => "event"
}
}
} #end of filter