日志解析概览
本文档简要介绍了 Google 安全运维团队如何将原始日志解析为 Unified Data Model (UDM) 格式。
Google Security Operations 可以接收来自以下提取来源的日志数据:
- Google Security Operations 转发器
- Google Security Operations API Feed
- Google Security Operations Ingestion API
- 第三方技术合作伙伴
通常,客户会以原始原始日志的形式发送数据。Google Security Operations 使用 LogType 唯一标识生成日志的设备。LogType 用于标识这两者:
- 生成日志的供应商和设备,例如 Cisco 防火墙、Linux DHCP 服务器或 Bro DNS。
- 解析器将原始日志转换为结构化的统一数据模型 (UDM)。解析器与 LogType 之间是一对一的关系。每个解析器都会转换单个 LogType 收到的数据。
Google Security Operations 提供了一组默认解析器,用于读取原始日志并使用原始日志中的数据生成结构化 UDM 记录。Google Security Operations 会维护这些解析器。客户还可以通过创建特定于客户的解析器来定义自定义数据映射说明。如需了解如何创建特定于客户的解析器,请与您的 Google Security Operations 代表联系。
解析器包含数据映射说明。它定义了如何将数据从原始原始日志映射到 UDM 数据结构中的一个或多个字段。
如果没有解析错误,Google Security Operations 会使用原始日志中的数据创建 UDM 结构化记录。将原始日志转换为 UDM 记录的过程称为标准化。
默认解析器可能会映射原始日志中的部分核心值。通常,这些核心字段对于在 Google Security Operations 中提供安全数据分析至关重要。未映射的值会保留在原始日志中,但不会存储在 UDM 记录中。
客户还可以使用 Ingestion API 以结构化统一数据模型 (UDM) 格式发送数据。
自定义提取数据的解析方式
Google Security Operations 提供以下功能,可让客户对传入的原始日志数据自定义数据解析。
- 特定于客户的解析器:客户为满足其特定要求的特定日志类型创建自定义解析器配置。客户专用解析器会替换特定 LogType 的默认解析器。如需了解如何创建特定于客户的解析器,请与您的 Google Security Operations 代表联系。
- 解析器扩展:除了默认的解析器配置之外,客户还可以添加自定义映射说明。每个客户都可以创建自己的一组唯一的自定义映射说明。这些映射说明定义了如何提取和转换其他字段,将原始原始日志转换为 UDM 字段。解析器扩展程序不会替换默认解析器或特定于客户的解析器。
使用 Squid 网站代理日志的示例
本部分提供了一个 Squid 网络代理日志示例,并介绍了如何将值映射到 UDM 记录。如需了解 UDM 架构中的所有字段的说明,请参阅统一数据模型字段列表。
示例 Squid 网站代理日志包含以空格分隔的值。每个记录代表一个事件,并存储以下数据:时间戳、时长、客户端、结果代码/结果状态、传输的字节数、请求方法、网址、用户、层次结构代码和内容类型。在此示例中,系统会提取以下字段并将其映射到 UDM 记录:时间、客户端、结果状态、字节数、请求方法和网址。
1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg
比较这两种结构时,请注意 UDM 记录中仅包含原始日志数据的一部分。某些字段是必填字段,其他字段为选填字段。此外,UDM 记录中只有部分部分包含数据。如果解析器未将原始日志中的数据映射到 UDM 记录,则您在 Google Security Operations 中不会看到 UDM 记录的该部分。
metadata
部分存储事件时间戳。请注意,该值已从 EPOCH 格式转换为 RFC 3339 格式。此转换是可选的。时间戳可以存储为 EPOCH 格式,并通过预处理将秒和毫秒部分分隔到单独的字段中。
metadata.event_type
字段存储值 NETWORK_HTTP
,该值是一个枚举值,用于标识事件类型。metadata.event_type
的值决定了哪些其他 UDM 字段是必填字段,哪些是可选字段。product_name
和 vendor_name
值包含对记录原始日志的设备的简单易懂的说明。
UDM 事件记录中的 metadata.event_type
与使用 Ingestion API 提取数据时定义的 log_type 不同。这两个属性存储的信息不同。
network
部分包含原始日志事件中的值。请注意,在此示例中,系统会先从“result code/status”字段解析原始日志中的状态值,然后再将其写入 UDM 记录。UDM 记录中仅包含 result_code。
principal
部分存储原始日志中的客户端信息。target
部分会同时存储完全限定网址和 IP 地址。
security_result
部分存储一个枚举值,用于表示原始日志中记录的操作。
这是采用 JSON 格式的 UDM 记录。请注意,仅包含包含数据的部分。不包括 src
、observer
、intermediary
、about
和 extensions
部分。
{
"metadata": {
"event_timestamp": "2020-04-28T07:40:48.129Z",
"event_type": "NETWORK_HTTP",
"product_name": "Squid Proxy",
"vendor_name": "Squid"
},
"principal": {
"ip": "192.168.23.4"
},
"target": {
"url": "www.google.com/images/sunlogo.png",
"ip": "203.0.113.52"
},
"network": {
"http": {
"method": "GET",
"response_code": 200,
"received_bytes": 904
}
},
"security_result": {
"action": "UNKNOWN_ACTION"
}
}
解析器说明中的步骤
解析器中的数据映射说明遵循常见模式,如下所示:
- 解析原始日志并提取数据。
- 操控提取的数据。这包括使用条件逻辑选择性地解析值、转换数据类型、替换值中的子字符串、转换为大写或小写等。
- 为 UDM 字段分配值。
- 将映射的 UDM 记录输出到 @output 键。
解析原始日志并提取数据
设置过滤器语句
filter
语句是该解析说明集中的第一个语句。所有其他解析说明都包含在 filter
语句中。
filter {
}
初始化用于存储提取值的变量
在 filter
语句中,初始化解析器将用来存储从日志中提取的值的中间变量。
系统会在每次解析单个日志时使用这些变量。每个中间变量中的值将在解析说明的后面部分设置为一个或多个 UDM 字段。
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
从日志中提取各个值
Google Security Operations 提供了一组基于 Logstash 的过滤器,用于从原始日志文件中提取字段。根据日志的格式,您可以使用一个或多个提取过滤条件从日志中提取所有数据。如果字符串为:
- 原生 JSON,解析器语法类似于支持 JSON 格式日志的 JSON 过滤器。不支持嵌套 JSON。
- XML 格式,解析器语法类似于支持 XML 格式日志的 XML 过滤器。
- 键值对,解析器语法类似于支持键值格式消息的 Kv 过滤器。
- CSV 格式,解析器语法类似于支持 CSV 格式消息的 Csv 过滤器。
- 所有其他格式,解析器语法类似于使用 GROK 内置模式的 GROK 过滤器。此方法使用正则表达式风格的提取说明。
Google Security Operations 提供每个过滤器中可用功能的一部分。Google Security Operations 还提供过滤器中不提供的自定义数据映射语法。如需了解支持的功能和自定义函数,请参阅解析器语法参考文档。
继续使用 Squid Web 代理日志示例,以下数据提取说明包含 Logstash Grok 语法和正则表达式的组合。
以下提取语句会将值存储在以下中间变量中:
when
srcip
action
returnCode
size
method
username
url
tgtip
此示例语句还使用 overwrite
关键字将提取的值存储在每个变量中。如果提取过程返回错误,则 on_error
语句会将 not_valid_log
设置为 True
。
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
操控和转换提取的值
Google Security Operations 利用 Logstash mutate 过滤条件插件功能,可对从原始日志中提取的值进行操作。Google Security Operations 提供插件提供的部分功能。如需了解支持的功能和自定义函数(例如:解析器语法),请参阅:
- 将值转换为其他数据类型
- 替换字符串中的值
- 合并两个数组或将字符串附加到数组。字符串值会在合并之前转换为数组。
- 转换为小写或大写
本部分提供了基于前面介绍的 Squid 网络代理日志的数据转换示例。
转换事件时间戳
所有存储为 UDM 记录的事件都必须具有事件时间戳。此示例用于检查是否从日志中提取了数据值。然后,它使用 Grok 日期函数将值与 UNIX
时间格式进行匹配。
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
转换 username
值
以下示例语句会将 username
变量中的值转换为小写形式。
mutate {
lowercase => [ "username"]
}
转换 action
值
以下示例会对 action
中间变量中的值进行求值,并将该值更改为 ALLOW、BLOCK 或 UNKNOWN_ACTION,这些值是 security_result.action
UDM 字段的有效值。security_result.action
UDM 字段是一种枚举类型,仅存储特定值。
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
转换目标 IP 地址
以下示例会检查 tgtip
中间变量中的值。如果找到,系统会使用预定义的 Grok 模式将该值与 IP 地址模式进行匹配。如果在将值与 IP 地址模式进行匹配时出现错误,on_error
函数会将 not_valid_tgtip
属性设置为 True
。如果匹配成功,则不会设置 not_valid_tgtip
属性。
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
更改 returnCode 和 size 的数据类型
以下示例将 size
变量中的值转换为 uinteger
,并将 returnCode
变量中的值转换为 integer
。这是必需的,因为 size
变量将保存到存储 int64
数据类型的 network.received_bytes
UDM 字段。returnCode
变量将保存到存储 int32
数据类型的 network.http.response_code
UDM 字段。
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
为事件中的 UDM 字段分配值
提取和预处理值后,将其分配给 UDM 事件记录中的字段。您可以为 UDM 字段同时指定提取的值和静态值。
如果您填充 event.disambiguation_key
,请确保此字段对于为给定日志生成的每个事件都是唯一的。如果两个不同的事件具有相同的 disambiguation_key
,则会导致系统出现意外行为。
本部分中的解析器示例基于上面的 Squid Web 代理日志示例。
保存事件时间戳
每个 UDM 事件记录都必须为 metadata.event_timestamp
UDM 字段设置值。以下示例会将从日志中提取的事件时间戳保存到 @timestamp
内置变量。Google Security Operations 默认会将其保存到 metadata.event_timestamp
UDM 字段。
mutate {
rename => {
"when" => "timestamp"
}
}
设置事件类型
每个 UDM 事件记录都必须为 metadata.event_type
UDM 字段设置值。此字段是枚举类型。此字段的值决定了必须填充哪些其他 UDM 字段才能保存 UDM 记录。如果任何必填字段都不包含有效数据,解析和标准化流程将会失败。
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
使用 replace
语句保存 username
和 method
值
username
和 method
中间字段中的值为字符串。以下示例会检查是否存在有效值,如果存在,则将 username
值存储到 principal.user.userid
UDM 字段,并将 method
值存储到 network.http.method
UDM 字段。
if [username] not in [ "-" ,"" ] {
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
}
if [method] != "" {
mutate {
replace => {
"event.idm.read_only_udm.network.http.method" => "%{method}"
}
}
}
将 action
保存到 security_result.action
UDM 字段
在上一部分中,系统会对 action
中间变量的值进行求值,并将其转换为 security_result.action
UDM 字段的标准值之一。
security_result
和 action
UDM 字段都存储一组项,这意味着您在保存此值时必须采用略有不同的方法。
首先,将转换后的值保存到中间 security_result.action
字段。security_result
字段是 action
字段的父级。
mutate {
merge => {
"security_result.action" => "action"
}
}
接下来,将中间 security_result.action
中间字段保存到 security_result
UDM 字段。security_result
UDM 字段存储项的数组,因此值会附加到此字段。
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
使用 merge
语句存储目标 IP 地址和来源 IP 地址
将以下值存储到 UDM 事件记录中:
- 将
srcip
中间变量的值传递给principal.ip
UDM 字段。 - 将
tgtip
中间变量的值传递给target.ip
UDM 字段。
principal.ip
和 target.ip
UDM 字段都存储一个项数组,因此值会附加到每个字段。
以下示例展示了保存这些值的不同方法。
在转换步骤期间,系统使用预定义的 Grok 模式将 tgtip
中间变量与 IP 地址进行匹配。以下示例语句用于检查 not_valid_tgtip
属性是否为 true,表示 tgtip
值无法与 IP 地址模式匹配。如果为 false,则会将 tgtip
值保存到 target.ip
UDM 字段。
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
未转换 srcip
中间变量。以下语句会检查是否从原始日志中提取了值,如果是,则将值保存到 principal.ip
UDM 字段。
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
使用 rename
语句保存 url
、returnCode
和 size
以下示例语句使用 rename
语句存储以下值。
- 保存到
target.url
UDM 字段的url
变量。 - 保存到
network.http.response_code
UDM 字段的returnCode
中间变量。 - 保存到
network.received_bytes
UDM 字段的size
中间变量。
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
}
将 UDM 记录绑定到输出
数据映射指令中的最后一条语句会将处理后的数据输出到 UDM 事件记录。
mutate {
merge => {
"@output" => "event"
}
}
完整的解析器代码
以下是完整的解析器代码示例。说明的顺序与本文档前面部分的顺序不同,但会产生相同的输出。
filter {
# initialize variables
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
# Extract fields from the raw log.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
# Parse event timestamp
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
# Save the value in "when" to the event timestamp
mutate {
rename => {
"when" => "timestamp"
}
}
# Transform and save username
if [username] not in [ "-" ,"" ] {
mutate {
lowercase => [ "username"]
}
}
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
# save transformed value to an intermediary field
mutate {
merge => {
"security_result.action" => "action"
}
}
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
# check for presence of target ip. Extract and store target IP address.
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
# store target IP address
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
}
# convert the returnCode and size to integer data type
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
# save url, returnCode, and size
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
# set the event type to NETWORK_HTTP
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
# validate and set source IP address
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
# save event to @output
mutate {
merge => {
"@output" => "event"
}
}
} #end of filter