编写解析器时的提示和问题排查

支持以下语言:

本文档介绍了您在编写解析器代码时可能遇到的问题。

在编写解析器代码时,如果解析指令无法按预期运行,您可能会遇到错误。以下情况可能会生成错误:

  • Grok 模式失败
  • renamereplace 操作失败
  • 解析器代码中存在语法错误

解析器代码中的常见做法

以下各部分介绍了最佳实践、提示和解决方案 以帮助排查问题

避免在变量名称中使用英文句点或连字符

在变量名称中使用连字符和英文句点可能会导致意外行为,通常是在执行 merge 操作以将值存储在 UDM 字段中时。您可能还会遇到间歇性解析问题。

例如,请勿使用以下变量名称:

  • my.variable.result
  • my-variable-result

请改用以下变量名称:my_variable_result

请勿使用具有特殊含义的字词作为变量名称

某些字词(例如 eventtimestamp)在解析器代码中可能具有特殊含义。

字符串 event 通常用于表示单个 UDM 记录,并在 @output 语句中使用。如果日志消息包含名为 event 的字段,或者您定义了名为 event 的中间变量,并且解析器代码在 @output 语句中使用了 event 字样,您将收到一条关于名称冲突的错误消息。

将中间变量重命名为其他名称,或在 UDM 字段名称和 @output 语句中使用 event1 作为前缀。

timestamp 表示原始原始日志的创建时间戳。在此中间变量中设置的值会保存到 metadata.event_timestamp UDM 字段。术语 @timestamp 表示 以及解析原始日志以创建 UDM 记录的时间。

以下示例将 metadata.event_timestamp UDM 字段设置为日期 和解析原始日志的时间。

 # Save the log parse date and time to the timestamp variable
  mutate {
     rename => {
       "@timestamp" => "timestamp"
     }
   }

以下示例将 metadata.event_timestamp UDM 字段设置为日期, 从原始原始日志中提取的时间,并存储在 when 中间 变量。

   # Save the event timestamp to timestamp variable
   mutate {
     rename => {
       "when" => "timestamp"
     }
   }

请勿将以下术语用作变量:

  • collectiontimestamp
  • createtimestamp
  • 事件
  • filename
  • 消息
  • 命名空间
  • 输出
  • onerrorcount
  • 时间戳
  • timezone

将每个数据值存储在单独的 UDM 字段中

请勿在一个 UDM 字段中存储多个字段,方法是使用 分隔符。下面给出了一个示例:

"principal.user.first_name" => "first:%{first_name},last:%{last_name}"

而是将每个值存储在单独的 UDM 字段中。

"principal.user.first_name" => "%{first_name}"
"principal.user.last_name" => "%{last_name}"

在代码中使用空格而非制表符

不要在解析器代码中使用制表符。请仅使用空格,每次缩进 2 个空格。

不要在一次操作中执行多个合并操作

如果您在一次操作中合并多个字段,则可能会导致 结果不一致。而是将 merge 语句放入单独的操作中。

例如,替换以下示例:

mutate {
  merge => {
      "security_result.category_details" => "category_details"
      "security_result.category_details" => "super_category_details"
  }
}

如下所示:

mutate {
  merge => {
    "security_result.category_details" => "category_details"
  }
}

mutate {
  merge => {
    "security_result.category_details" => "super_category_details"
  }
}

选择 ifif else 条件表达式

如果您要测试的条件值只能有一个匹配项,请使用 if else 条件语句。这种方法效率稍高。但是,在某些情况下,测试的值可能与 多次使用多种不同的 if 语句,并对语句进行排序 从最普遍的情况到最具体的情况。

选择一组具有代表性的日志文件来测试解析器更改

最佳实践是使用各种格式的原始日志示例测试解析器代码。这样,您就可以找到解析器可能需要处理的唯一日志或极端情况。

向解析器代码添加描述性注释

向解析器代码添加注释,以说明该语句为何重要, 与语句的作用完全不同此注释对维护解析器的任何人都有帮助 按照流程操作下面给出了一个示例:

# only assign a Namespace if the source address is RFC 1918 or Loopback IP address
if [jsonPayload][id][orig_h] =~ /^(127(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(10(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(192\.168(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)|(172\.(?:1[6-9]|2\d|3[0-1])(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)/ {
  mutate {
    replace => {
      "event1.idm.read_only_udm.principal.namespace" => "%{resource.labels.project_id}"
    }
  }
}

尽早初始化中间变量

在从原始原始日志中提取值之前,请初始化中间步骤 用于存储测试值的变量。

这样可以防止返回指示中间变量的错误 不存在。

以下语句将 product 变量中的值分配给 metadata.product_name UDM 字段。

mutate{
  replace => {
    "event1.idm.read_only_udm.metadata.product_name" => "%{product}"
  }
}

如果 product 变量不存在,您会收到以下错误:

"generic::invalid_argument: pipeline failed: filter mutate (4) failed: replace failure: field \"event1.idm.read_only_udm.metadata.product_name\": source field \"product\": field not set"

您可以添加 on_error 语句来捕获错误。下面给出了一个示例:

mutate{
  replace => {
    "event1.idm.read_only_udm.metadata.product_name" => "%{product}"
    }
  on_error => "_error_does_not_exist"
  }

上述示例语句成功将解析错误捕获到一个名为 _error_does_not_exist 的布尔值中间变量中。它不会 使您能够在条件语句中使用 product 变量,例如 if。 下面给出了一个示例:

if [product] != "" {
  mutate{
    replace => {
      "event1.idm.read_only_udm.metadata.product_name" => "%{product}"
    }
  }
  on_error => "_error_does_not_exist"
}

上面的示例返回以下错误,因为 if 条件子句 不支持 on_error 语句:

"generic::invalid_argument: pipeline failed: filter conditional (4) failed: failed to evaluate expression: generic::invalid_argument: "product" not found in state data"

要解决此问题,请单独添加一个语句块,用于初始化 变量,然后再执行提取过滤器(jsoncsvxmlkvgrok)语句。 下面给出了一个示例。

filter {
  # Initialize intermediate variables for any field you will use for a conditional check
  mutate {
    replace => {
      "timestamp" => ""
      "does_not_exist" => ""
    }
  }

  # load the logs fields from the message field
  json {
    source         => "message"
    array_function => "split_columns"
    on_error       => "_not_json"
  }
}

更新后的解析器代码段使用 条件语句来检查字段是否存在。此外,on_error 语句会处理可能遇到的错误。

将 SHA-256 转换为 base64

以下示例会提取 SHA-256 值,将其编码为 base64,将编码后的数据转换为十六进制字符串,然后将特定字段替换为提取和处理后的值。

if [Sha256] != "" 
{
  base64
  {
  encoding => "RawStandard"
  source => "Sha256"
  target => "base64_sha256"
  on_error => "base64_message_error"
  }
  mutate
  {
    convert =>
    {
      "base64_sha256" => "bytestohex"
    }
    on_error => "already_a_string"
  }
  mutate
  {
    replace => 
  {
     "event.idm.read_only_udm.network.tls.client.certificate.sha256" => "%{base64_sha256}"
     "event.idm.read_only_udm.target.resource.name" => "%{Sha256}"
  }
  }
}

处理解析器语句中的错误

传入日志采用非预期的日志格式或 包含格式有误的数据。

您可以构建解析器来处理这些错误。最佳实践是将 on_error 处理脚本添加到提取过滤器,然后测试中间变量,然后再继续执行解析器逻辑的下一部分。

以下示例将 json 提取过滤器与 on_error 搭配使用 语句设置 _not_json 布尔值变量。如果 _not_json 设置为 true,则表示传入的日志条目不是有效的 JSON 格式,并且日志条目未成功解析。如果 _not_json 变量为 false,则表明传入的日志条目采用有效的 JSON 格式。

 # load the incoming log from the default message field
  json {
    source         => "message"
    array_function => "split_columns"
    on_error       => "_not_json"
  }

您还可以测试字段的格式是否正确。以下示例 检查 _not_json 是否设置为 true,这表明日志不在 预期格式。

 # Test that the received log matches the expected format
  if [_not_json] {
    drop { tag => "TAG_MALFORMED_MESSAGE" }
  } else {
    # timestamp is always expected
    if [timestamp] != "" {

      # ...additional parser logic goes here …

    } else {

      # if the timestamp field does not exist, it's not a log source
      drop { tag => "TAG_UNSUPPORTED" }
    }
  }

这样可以确保,如果日志的格式不正确,系统不会在提取指定日志类型的日志时解析失败。

drop 过滤条件与 tag 变量结合使用,以便在 BigQuery 中的提取指标表

  • TAG_UNSUPPORTED
  • TAG_MALFORMED_ENCODING
  • TAG_MALFORMED_MESSAGE
  • TAG_NO_SECURITY_VALUE

drop 过滤器会阻止解析器处理原始日志、标准化字段和创建 UDM 记录。原始原始日志仍会被提取到 Google Security Operations 中 可以在 Google Security Operations 中使用原始日志搜索进行搜索。

传递给 tag 变量的值存储在 drop_reason_code 的字段中的 注入指标表。您可以对该表运行临时查询,如下所示:

SELECT
  log_type,
  drop_reason_code,
  COUNT(drop_reason_code) AS count
FROM `datalake.ingestion_metrics`
GROUP BY 1,2
ORDER BY 1 ASC

排查验证错误

构建解析器时,您可能会遇到与验证相关的错误, 例如,未在 UDM 记录中设置必填字段。错误可能如下所示:

Error: generic::unknown: invalid event 0: LOG_PARSING_GENERATED_INVALID_EVENT: "generic::invalid_argument: udm validation failed: target field is not set"

解析器代码成功执行,但生成的 UDM 记录未成功执行 包含所有必需的 UDM 字段,该字段由设置为 metadata.event_type 的值定义。通过 以下是可能导致此错误的其他示例:

  • 如果 metadata.event_typeUSER_LOGIN,并且未设置 target.user value UDM 字段。
  • 如果 metadata.event_typeNETWORK_CONNECTION,且 target.hostname未设置 UDM 字段。

如需详细了解 metadata.event_type UDM 字段和必填字段,请参阅 UDM 使用指南

若要排查此类错误,一种方法是先将静态值设置为 UDM 字段。定义所需的所有 UDM 字段后,请检查原始原始日志,了解要解析并保存到 UDM 记录的值。如果原始原始日志 包含某些字段,那么您可能需要设置默认值。

以下是一个特定于 USER_LOGIN 事件类型的示例模板, 说明了这种方法。

请注意以下事项:

  • 该模板会初始化中间变量,并将每个变量设置为静态字符串。
  • Field Assignment(字段分配)部分下的代码会将中间变量中的值设置为 UDM 字段。

您可以通过添加其他中间变量和 UDM 字段来扩展此代码。 确定必须填充的所有 UDM 字段后,请执行以下操作:

  • Input Configuration 部分下,添加用于从原始原始日志中提取字段并将值设置为中间变量的代码。

  • 日期提取部分下,添加用于提取事件时间戳的代码 转换该日志,并将其设置为中间变量。

  • 根据需要,将每个中间变量中设置的初始化值替换为空字符串。

filter {
 mutate {
   replace => {
     # UDM > Metadata
     "metadata_event_timestamp"    => ""
     "metadata_vendor_name"        => "Example"
     "metadata_product_name"       => "Example SSO"
     "metadata_product_version"    => "1.0"
     "metadata_product_event_type" => "login"
     "metadata_product_log_id"     => "12345678"
     "metadata_description"        => "A user logged in."
     "metadata_event_type"         => "USER_LOGIN"

     # UDM > Principal
     "principal_ip"       => "192.168.2.10"

     # UDM > Target
     "target_application"            => "Example Connect"
     "target_user_user_display_name" => "Mary Smith"
     "target_user_userid"            => "mary@example.com"

     # UDM > Extensions
     "auth_type"          => "SSO"
     "auth_mechanism"     => "USERNAME_PASSWORD"

     # UDM > Security Results
     "securityResult_action"         => "ALLOW"
     "security_result.severity"       => "LOW"

   }
 }

 # ------------ Input Configuration  --------------
  # Extract values from the message using one of the extraction filters: json, kv, grok

 # ------------ Date Extract  --------------
 # If the  date {} function is not used, the default is the normalization process time

  # ------------ Field Assignment  --------------
  # UDM Metadata
  mutate {
    replace => {
      "event1.idm.read_only_udm.metadata.vendor_name"        =>  "%{metadata_vendor_name}"
      "event1.idm.read_only_udm.metadata.product_name"       =>  "%{metadata_product_name}"
      "event1.idm.read_only_udm.metadata.product_version"    =>  "%{metadata_product_version}"
      "event1.idm.read_only_udm.metadata.product_event_type" =>  "%{metadata_product_event_type}"
      "event1.idm.read_only_udm.metadata.product_log_id"     =>  "%{metadata_product_log_id}"
      "event1.idm.read_only_udm.metadata.description"        =>  "%{metadata_description}"
      "event1.idm.read_only_udm.metadata.event_type"         =>  "%{metadata_event_type}"
    }
  }

  # Set the UDM > auth fields
  mutate {
    replace => {
      "event1.idm.read_only_udm.extensions.auth.type"        => "%{auth_type}"
    }
    merge => {
      "event1.idm.read_only_udm.extensions.auth.mechanism"   => "auth_mechanism"
    }
  }

  # Set the UDM > principal fields
  mutate {
    merge => {
      "event1.idm.read_only_udm.principal.ip"                => "principal_ip"
    }
  }

  # Set the UDM > target fields
  mutate {
    replace => {
      "event1.idm.read_only_udm.target.user.userid"             =>  "%{target_user_userid}"
      "event1.idm.read_only_udm.target.user.user_display_name"  =>  "%{target_user_user_display_name}"
      "event1.idm.read_only_udm.target.application"             =>  "%{target_application}"
    }
  }

  # Set the UDM > security_results fields
  mutate {
    merge => {
      "security_result.action" => "securityResult_action"
    }
  }

  # Set the security result
  mutate {
    merge => {
      "event1.idm.read_only_udm.security_result" => "security_result"
    }
  }

 # ------------ Output the event  --------------
  mutate {
    merge => {
      "@output" => "event1"
    }
  }

}

使用 Grok 函数解析非结构化文本

使用 Grok 函数从非结构化文本中提取值时,您可以使用预定义的 Grok 模式和正则表达式语句。Grok 模式可让代码更易于阅读。如果正则表达式不包含缩写字符(例如 \w\s),您可以直接将语句复制并粘贴到解析器代码中。

由于 Grok 模式是语句中的额外抽象层,因此当您遇到错误时,它们可能会使问题排查变得更加复杂。下面是一个示例 包含预定义 Grok 模式和正则表达式的 Grok 函数。

grok {
  match => {
    "message" => [
      "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
    ]
  }
}

不使用 Grok 模式的提取语句的性能可能会更好。例如,以下示例只需不到一半的处理步骤即可匹配。对于可能数量庞大的日志源,这是一个重要的考虑因素。

了解 RE2 和 PCRE 正则表达式之间的区别

Google Security Operations 解析器使用 RE2 作为正则表达式引擎。如果您熟悉 PCRE 语法, 会发现差异示例如下:

以下是 PCRE 语句:(?<_custom_field>\w+)\s

以下是解析器代码的 RE2 语句:(?P<_custom_field>\\w+)\\s

请务必对转义字符进行转义。

Google 安全运营团队会以 JSON 编码格式存储传入的原始日志数据。这是为了 请确保看似为正则表达式简写的字符串 会被解释为字面量字符串。例如,\t 会被解释为字面量字符串,而不是制表符。

以下示例是原始原始日志和 JSON 编码格式日志。请注意,每个反斜杠字符前面添加了转义字符 用字词“entry”括起来。

以下是原始的原始日志:

field=\entry\

以下是转换为 JSON 编码格式的日志:

field=\\entry\\

在解析器代码中使用正则表达式时,如果您只想提取值,则必须添加额外的转义字符。如需匹配原始原始日志中的反斜杠,请在提取语句中使用四个反斜杠。

以下是解析器代码的正则表达式:

^field=\\\\(?P<_value>.*)\\\\$

生成的结果如下。_value 命名组用于存储术语 entry

"_value": "entry"

将标准正则表达式语句移入解析器代码中时,使用转义 提取语句中的正则表达式简写字符。 例如,将 \s 更改为 \\s

在提取语句中进行双重转义时,将正则表达式特殊字符保持不变。例如,\\ 保持不变,为 \\

以下是标准正则表达式:

^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$

以下正则表达式经过修改,可在解析器代码中正常运行。

^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$

下表总结了标准正则表达式在什么情况下必须包含 添加额外的转义字符。

正则表达式 经过修改的解析器代码的正则表达式 变更说明
\s
\\s
简写字符必须进行转义。
\.
\\.
预留字符必须转义。
\\"
\\\"
预留字符必须转义。
\]
\\]
预留字符必须转义。
\|
\\|
必须对预留字符进行转义。
[^\\]+
[^\\\\]+
字符类组中的特殊字符必须 已转义。
\\\\
\\\\
字符类组之外的特殊字符,或 简写字符不需要额外转义。

正则表达式必须包含已命名的捕获组

正则表达式(例如 "^.*$")是有效的 RE2 语法。不过,在解析器代码中,它会失败并显示以下错误:

"ParseLogEntry failed: pipeline failed: filter grok (0) failed: failed to parse data with all match
patterns"

您必须向表达式添加有效的捕获组。如果您使用 Grok 模式,则这些模式默认包含命名的捕获组。使用正则表达式替换项时,请务必添加命名组。

以下是解析器代码中的正则表达式示例:

"^(?P<_catchall>.*$)"

结果如下所示,显示了分配给 _catchall 命名组的文本。

"_catchall": "User \"BOB\" logged on to workstation \"DESKTOP-01\"."

从构建表达式开始,使用一个无限别名命名群组

构建提取语句时,请先选择一个捕获 超乎您的想象然后,一次展开一个字段。

以下示例首先使用与整个消息匹配的命名组 (_catchall)。然后,它会通过匹配文本的其他部分,分步构建表达式。随着每个步骤的执行,名为 _catchall 的组包含的原始文本会越来越少。继续并逐步迭代,逐步达到 匹配邮件,直到您不再需要 _catchall 命名的群组为止。

步骤 解析器代码中的正则表达式 _catchall 命名捕获组的输出
1
"^(?P<_catchall>.*$)"
User \"BOB\" logged on to workstation \"DESKTOP-01\".
2
^User\s\\\"(?P<_catchall>.*$)
BOB\" logged on to workstation \"DESKTOP-01\".
3
^User\s\\\"(?P<_user>.*?)\\\"\s(?P<_catchall>.*$)
logged on to workstation \"DESKTOP-01\".
继续执行,直到表达式与整个文本字符串匹配。

对正则表达式中的缩写字符进行转义

在解析器代码中使用正则表达式时,请务必对正则表达式缩写字符进行转义。以下是文本字符串示例和用于提取第一个字词 This 的标准正则表达式。

  This is a sample log.

以下标准正则表达式会提取第一个单词 This。 但是,在解析器代码中运行此表达式时,结果中缺少字母 s

标准正则表达式 _firstWord 命名捕获组的输出
"^(?P<_firstWord>[^\s]+)\s.*$" "_firstWord": "Thi",

这是因为解析器代码中的正则表达式需要在缩写字符中添加额外的转义字符。在前面的示例中,\s 必须更改为 \\s

修改了解析器代码的正则表达式 _firstWord 命名的捕获组的输出
"^(?P<_firstWord>[^\\s]+)\\s.*$" "_firstWord": "This",

这仅适用于缩写字符,例如 \s\r\t。其他字符(例如 ``)无需进一步转义。

完整示例

本部分将前面的规则作为端到端示例进行介绍。这里有 非结构化文本字符串,以及为解析 字符串。最后是经过修改的正则表达式, 。

以下是原始文本字符串。

User "BOB" logged on to workstation "DESKTOP-01".

以下是用于解析文本字符串的标准 RE2 正则表达式。

^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$

此表达式将提取以下字段。

匹配组 字符位置 文本字符串
完全匹配 0-53
User \"BOB\" logged on to workstation \"DESKTOP-01\".
“_user”群组 7-10
BOB
第 2 组。 13-22
logged on
组“_device” 40-50
DESKTOP-01

这是经过修改的表达式。标准 RE2 正则表达式已修改为在解析器代码中运行。

^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$