日志解析概览

支持以下语言:

本文档简要介绍了 Google Security Operations 如何将原始日志解析为统一数据模型 (UDM) 格式。

Google Security Operations 可以接收源自以下提取的日志数据 来源:

  • Google Security Operations 转发器
  • Google Security Operations API Feed
  • Google Security Operations Ingestion API
  • 第三方技术合作伙伴

一般来说,客户会将数据作为原始原始日志发送。Google Security Operations 独有 使用 LogType 识别生成日志的设备。LogType 用于标识这两者:

  • 生成日志的供应商和设备,例如思科防火墙 Linux DHCP 服务器,即 Bro DNS。
  • 该解析器将原始日志转换为结构化统一数据模型 (UDM)。 解析器与 LogType 之间是一对一的关系。每个解析器都会转换单个 LogType 收到的数据。

Google Security Operations 提供了一组默认解析器,用于读取原始日志并使用原始日志中的数据生成结构化 UDM 记录。Google Security Operations 会维护这些解析器。客户还可以通过创建特定于客户的解析器来定义自定义数据映射说明。与您的 Google 安全运营团队联系 了解有关创建客户专用解析器的信息。

注入和标准化工作流

解析器包含数据映射指令。它定义了如何将数据从原始原始日志映射到 UDM 数据结构中的一个或多个字段。

如果没有解析错误,Google Security Operations 会使用原始日志中的数据创建 UDM 结构化记录。将原始日志转换为 UDM 记录的过程如下: 称为标准化

默认解析器可能会映射原始日志中核心值的子集。通常,这些核心字段对于在 Google Security Operations 中提供安全数据分析至关重要。未映射的值会保留在原始日志中,但不会 存储在 UDM 记录中。

客户还可以使用 Ingestion API 以结构化统一数据模型 (UDM) 格式发送数据。

自定义提取数据的解析方式

Google Security Operations 提供以下功能,可让客户对传入的原始日志数据自定义数据解析。

  • 特定于客户的解析器:客户为满足其特定要求的特定日志类型创建自定义解析器配置。客户专用解析器会替换特定 LogType 的默认解析器。如需了解如何创建特定于客户的解析器,请与您的 Google Security Operations 代表联系。
  • 解析器扩展程序:客户可以在 作为默认解析器配置的补充每个客户都可以创建自己的一组自定义映射说明。这些映射说明定义了如何提取和转换额外的字段,将其从原始原始日志转换为 UDM 字段。解析器扩展不会替换 默认或特定于客户的解析器。

使用 Squid Web 代理日志的示例

本部分提供了一个 Squid 网络代理日志示例,并介绍了如何将值映射到 UDM 记录。有关 UDM 架构中所有字段的说明, 请参阅统一数据模型字段列表

示例 Squid 网站代理日志包含以空格分隔的值。每个记录代表一个事件,并存储以下数据:时间戳、时长、客户端、结果代码/结果状态、传输的字节数、请求方法、网址、用户、层次结构代码和内容类型。在此示例中,以下字段 提取并映射到一条 UDM 记录:时间、客户端、结果状态、字节 请求方法和网址

1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg

示例 squid Web 代理

比较这两种结构时,请注意 UDM 记录中仅包含原始日志数据的一部分。某些字段是必填字段,其他字段 都是选填的此外,UDM 记录中只有部分部分包含数据。如果解析器没有将原始日志中的数据映射到 UDM 则您不会在 Google Security Operations 中看到 UDM 记录的相应部分。

映射到 UDM 的日志值

metadata 部分存储了事件时间戳。请注意,该值已从 EPOCH 格式转换为 RFC 3339 格式。此转换是可选的。时间戳可以是 以 EPOCH 格式存储,进行预处理以将秒和秒和 转换为单独的字段。

metadata.event_type 字段存储值 NETWORK_HTTP,这是一个枚举值 用于标识事件类型。metadata.event_type 的值决定了 哪些是必填字段和选填的 UDM 字段。product_namevendor_name 值包含对记录原始日志的设备的简单易懂的说明。

UDM 事件记录中的 metadata.event_type 与 log_type 不同 使用 Ingestion API 提取数据时定义。这两个属性存储的信息不同。

network 部分包含原始日志事件中的值。请注意, 原始日志中的状态值是从“result” 代码/状态”字段,然后再写入 UDM 记录。仅 result_code 已包含在 UDM 记录中。

映射到 UDM 的日志值

principal 部分存储原始日志中的客户端信息。target 部分会同时存储完全限定网址和 IP 地址。

security_result 部分会存储一个枚举值,用于表示原始日志中记录的操作。

这是采用 JSON 格式的 UDM 记录。请注意,仅包含数据的部分会被纳入。srcobserverintermediaryaboutextensions 部分均不包含在内。

{
        "metadata": {
            "event_timestamp": "2020-04-28T07:40:48.129Z",
            "event_type": "NETWORK_HTTP",
            "product_name": "Squid Proxy",
            "vendor_name": "Squid"
        },
        "principal": {
            "ip": "192.168.23.4"
        },
        "target": {
            "url": "www.google.com/images/sunlogo.png",
            "ip": "203.0.113.52"
        },
        "network": {
            "http": {
                "method": "GET",
                "response_code": 200,
                "received_bytes": 904
            }
        },
        "security_result": {
            "action": "UNKNOWN_ACTION"
        }
}

解析器说明中的步骤

解析器中的数据映射指令遵循一种常见模式,如 如下:

  1. 解析原始日志并提取数据。
  2. 处理提取的数据。这包括使用条件逻辑 有选择地解析值、转换数据类型、替换 值、转换为大写或小写等。
  3. 为 UDM 字段分配值。
  4. 将映射的 UDM 记录输出到 @output 键。

解析原始日志并提取数据

设置过滤条件语句

filter 语句是该解析说明集中的第一个语句。所有其他解析指令都包含在 filter 语句中。

filter {

}

初始化用于存储提取值的变量

filter 语句中,初始化解析器将用来存储从日志中提取的值的中间变量。

每次解析单个日志时都会使用这些变量。每个中间变量中的值将在解析说明的后面部分设置为一个或多个 UDM 字段。

  mutate {
    replace => {
      "event.idm.read_only_udm.metadata.product_name" => "Webproxy"
      "event.idm.read_only_udm.metadata.vendor_name" => "Squid"
      "not_valid_log" => "false"
      "when" => ""
      "srcip" => ""
      "action" => ""
      "username" => ""
      "url" => ""
      "tgtip" => ""
      "method" => ""
    }
  }

从日志中提取各个值

Google Security Operations 提供了一组基于 Logstash 的过滤器,用于从原始日志文件中提取字段。根据日志的格式,您可以使用一个或多个 提取过滤条件,从日志中提取所有数据。如果字符串为:

  • 原生 JSON,解析器语法类似于支持 JSON 格式日志的 JSON 过滤器。不支持嵌套 JSON。
  • XML 格式,则解析器语法类似于 XML 过滤器,该过滤器支持 XML 格式的日志。
  • 键值对,解析器语法类似于 Kv 过滤条件,支持键值对格式的消息。
  • CSV 格式,解析器语法类似于支持 CSV 格式消息的 Csv 过滤器
  • 对于其他所有格式,解析器的语法与 使用 GROK 内置模式GROK 过滤器。 此方法使用正则表达式风格的提取说明。

Google Security Operations 提供每个过滤器中可用功能的一部分。Google Security Operations 还提供不支持的自定义数据映射语法 。请参阅解析器语法参考文档 了解支持的功能和自定义函数。

继续以 Squid 网络代理日志示例为例,以下数据提取 指令包含 Logstash Grok 语法和常规 表达式。

以下提取语句将值存储在以下中间 变量:

  • when
  • srcip
  • action
  • returnCode
  • size
  • method
  • username
  • url
  • tgtip

此示例语句还使用 overwrite 关键字将提取的值存储在每个变量中。如果提取过程返回错误,on_error 语句将 not_valid_log 设置为 True

grok {
   match => {
     "message" => [
       "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
     ]
   }
   overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
   on_error => "not_valid_log"
}

处理和转换提取值

Google Security Operations 利用 Logstash mutate 过滤条件插件功能,可对从原始日志中提取的值进行操作。Google Security Operations 提供插件可用的部分功能。 如需查看功能说明,请参阅解析器语法 自定义函数,例如:

  • 将值转换为其他数据类型
  • 替换字符串中的值
  • 合并两个数组或将字符串附加到数组。字符串值包括 转换为数组。
  • 转换为小写或大写

本部分提供了基于 Squid Web 代理日志的数据转换示例。 。

转换事件时间戳

所有存储为 UDM 记录的事件都必须具有事件时间戳。本示例 检查是否从日志中提取了数据值。然后,它会使用 Grok 日期函数 以将该值与 UNIX 时间格式相匹配。

if [when] != "" {
  date {
    match => [
      "when", "UNIX"
    ]
   }
 }

转换 username

以下示例语句会转换 username 变量中的值 转换为小写形式。

mutate {
   lowercase => [ "username"]
   }

转换 action

以下示例会对 action 中间变量中的值进行求值,并将该值更改为 ALLOW、BLOCK 或 UNKNOWN_ACTION,这些值是 security_result.action UDM 字段的有效值。security_result.action UDM 字段是一种枚举类型,仅存储特定值。

if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
      mutate {
        replace => {
          "action" => "BLOCK"
        }
      }
   } else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
     mutate {
        replace => {
          "action" => "ALLOW"
        }
     }
   } else {
      mutate {
        replace => {
          "action" => "UNKNOWN_ACTION" }
      }
   }

转换目标 IP 地址

以下示例会检查 tgtip 中间变量中的值。如果找到,系统会使用预定义的 Grok 模式将该值与 IP 地址模式进行匹配。如果在将值与 IP 地址模式进行匹配时出现错误,on_error 函数会将 not_valid_tgtip 属性设置为 True。如果匹配成功,则不会设置 not_valid_tgtip 属性。

if [tgtip] not in [ "","-" ] {
   grok {
     match => {
       "tgtip" => [ "%{IP:tgtip}" ]
     }
     overwrite => ["tgtip"]
     on_error => "not_valid_tgtip"
   }

更改 returnCode 和大小的数据类型

以下示例将 size 变量中的值转换为 uinteger,并将 returnCode 变量中的值转换为 integer。这是 因为 size 变量将保存到 network.received_bytes UDM 字段,用于存储 int64 数据类型。通过 returnCode 变量将保存到 network.http.response_code UDM 字段 用于存储 int32 数据类型。

mutate {
  convert => {
    "returnCode" => "integer"
    "size" => "uinteger"
  }
}

在事件中为 UDM 字段分配值

提取并预处理值后,将它们分配给 UDM 中的字段 事件记录。您可以为 UDM 字段同时指定提取的值和静态值。

如果您填充了 event.disambiguation_key,请确保此字段对于 为给定日志生成的每个事件。如果两个不同的事件具有 同一个 disambiguation_key,则会导致 系统。

本部分中的解析器示例基于 Squid Web 代理日志示例 。

保存事件时间戳

必须为每条 UDM 事件记录设置 metadata.event_timestamp 值 UDM 字段。以下示例将从日志中提取的事件时间戳保存到 @timestamp 内置变量。Google Security Operations 会将此保存到 metadata.event_timestamp UDM 字段。

mutate {
  rename => {
    "when" => "timestamp"
  }
}

设置事件类型

必须为每条 UDM 事件记录设置 metadata.event_type UDM 的值 字段。此字段是枚举类型。此字段的值决定了必须填充哪些其他 UDM 字段才能保存 UDM 记录。如果任何必填字段都不包含有效数据,解析和标准化流程将会失败。

replace => {
    "event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
   }
}

使用 replace 语句保存 usernamemethod

usernamemethod 中间字段中的值是字符串。以下示例会检查是否存在有效值,如果存在,则将 username 值存储到 principal.user.userid UDM 字段,并将 method 值存储到 network.http.method UDM 字段。

if [username] not in [ "-" ,"" ] {
  mutate {
    replace => {
      "event.idm.read_only_udm.principal.user.userid" => "%{username}"
    }
  }
}

if [method] != "" {
  mutate {
    replace => {
      "event.idm.read_only_udm.network.http.method" => "%{method}"
    }
  }
}

action 保存到 security_result.action UDM 字段

在上一部分中,action 中间变量中的值是 求值并转换为 security_result.action UDM 字段的一个标准值。

security_resultaction UDM 字段都存储项数组, 也就是说,在保存 值。

首先,将转换后的值保存到中间 security_result.action 字段。security_result 字段是 action 字段的父字段。

mutate {
   merge => {
     "security_result.action" => "action"
   }
}

接下来,将中间 security_result.action 中间字段保存到 security_result UDM 字段。security_result UDM 字段用于存储 项,因此该值将附加到此字段。

# save the security_result field
mutate {
  merge => {
    "event.idm.read_only_udm.security_result" => "security_result"
  }
}

使用 merge 语句存储目标 IP 地址和来源 IP 地址

将以下值存储到 UDM 事件记录中:

  • srcip 中间变量的值传递给 principal.ip UDM 字段。
  • tgtip 中间变量的值传递给 target.ip UDM 字段。

principal.iptarget.ip UDM 字段都存储项数组,因此 值会附加到每个字段。

以下示例展示了保存这些值的不同方法。 在转换步骤期间,系统使用预定义的 Grok 模式将 tgtip 中间变量与 IP 地址进行匹配。以下示例语句用于检查 not_valid_tgtip 属性是否为 true,表示 tgtip 值无法与 IP 地址模式匹配。如果为 false,则会将 tgtip 值保存到 target.ip UDM 字段。

if ![not_valid_tgtip] {
  mutate {
    merge => {
      "event.idm.read_only_udm.target.ip" => "tgtip"
    }
  }
 }

未转换 srcip 中间变量。以下声明 检查某个值是否从原始日志中提取,如果是,则保存 principal.ip UDM 字段的值。

if [srcip] != "" {
  mutate {
    merge => {
      "event.idm.read_only_udm.principal.ip" => "srcip"
    }
  }
}

使用 rename 语句保存 urlreturnCodesize

以下示例语句使用 rename 语句存储以下值。

  • url 变量已保存到 target.url UDM 字段。
  • 保存到 network.http.response_code UDM 字段的 returnCode 中间变量。
  • 保存到 network.received_bytes UDM 字段的 size 中间变量。
mutate {
  rename => {
     "url" => "event.idm.read_only_udm.target.url"
     "returnCode" => "event.idm.read_only_udm.network.http.response_code"
     "size" => "event.idm.read_only_udm.network.received_bytes"
  }
}

将 UDM 记录绑定到输出

数据映射指令中的最后一条语句会将处理后的数据输出到 UDM 事件记录。

mutate {
    merge => {
      "@output" => "event"
    }
  }

完整的解析器代码

以下是完整的解析器代码示例。说明的顺序与本文档前面的部分不同,但会产生相同的输出。

filter {

# initialize variables
  mutate {
    replace => {
      "event.idm.read_only_udm.metadata.product_name" => "Webproxy"
      "event.idm.read_only_udm.metadata.vendor_name" => "Squid"
      "not_valid_log" => "false"
      "when" => ""
      "srcip" => ""
      "action" => ""
      "username" => ""
      "url" => ""
      "tgtip" => ""
      "method" => ""
    }
  }

  # Extract fields from the raw log.
    grok {
      match => {
        "message" => [
          "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
        ]
      }
      overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
      on_error => "not_valid_log"
    }

  # Parse event timestamp
  if [when] != "" {
    date {
      match => [
        "when", "UNIX"
      ]
     }
   }

   # Save the value in "when" to the event timestamp
   mutate {
     rename => {
       "when" => "timestamp"
     }
   }

   # Transform and save username
   if [username] not in [ "-" ,"" ] {
     mutate {
       lowercase => [ "username"]
        }
      }
     mutate {
       replace => {
         "event.idm.read_only_udm.principal.user.userid" => "%{username}"
       }
     }


if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
      mutate {
        replace => {
          "action" => "BLOCK"
        }
      }
   } else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
     mutate {
        replace => {
          "action" => "ALLOW"
        }
     }
   } else {
      mutate {
        replace => {
          "action" => "UNKNOWN_ACTION" }
      }
   }

  # save transformed value to an intermediary field
   mutate {
      merge => {
        "security_result.action" => "action"
      }
   }

    # save the security_result field
    mutate {
      merge => {
        "event.idm.read_only_udm.security_result" => "security_result"
      }
    }

   # check for presence of target ip. Extract and store target IP address.
   if [tgtip] not in [ "","-" ] {
     grok {
       match => {
         "tgtip" => [ "%{IP:tgtip}" ]
       }
       overwrite => ["tgtip"]
       on_error => "not_valid_tgtip"
     }

     # store  target IP address
     if ![not_valid_tgtip] {
       mutate {
         merge => {
           "event.idm.read_only_udm.target.ip" => "tgtip"
         }
       }
     }
   }

   # convert  the returnCode and size  to integer data type
   mutate {
     convert => {
       "returnCode" => "integer"
       "size" => "uinteger"
     }
   }

   # save  url, returnCode, and size
   mutate {
     rename => {
        "url" => "event.idm.read_only_udm.target.url"
        "returnCode" => "event.idm.read_only_udm.network.http.response_code"
        "size" => "event.idm.read_only_udm.network.received_bytes"
     }

     # set the event type to NETWORK_HTTP
     replace => {
        "event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
     }
   }

   # validate and set source IP address
   if [srcip] != "" {
     mutate {
       merge => {
         "event.idm.read_only_udm.principal.ip" => "srcip"
       }
     }
   }

  # save  event to @output
   mutate {
     merge => {
       "@output" => "event"
     }
   }

} #end of filter