Sugestões e resolução de problemas ao escrever analisadores
Este documento descreve os problemas que pode encontrar quando escreve código de análise sintática.
Ao escrever código de análise, pode encontrar erros quando as instruções de análise não funcionam como esperado. As situações que podem gerar erros incluem o seguinte:
- Um padrão
Grok
falha - Uma operação
rename
oureplace
falha - Erros de sintaxe no código do analisador
Práticas comuns no código do analisador
As secções seguintes descrevem as práticas recomendadas, as sugestões e as soluções para ajudar a resolver problemas.
Evite usar pontos ou travessões nos nomes das variáveis
A utilização de hífenes e pontos nos nomes das variáveis pode causar um comportamento inesperado, geralmente quando realiza operações merge
para armazenar valores em campos de UDM. Também pode encontrar problemas de análise intermitentes.
Por exemplo, não use os seguintes nomes de variáveis:
my.variable.result
my-variable-result
Em alternativa, use o seguinte nome de variável: my_variable_result
.
Não use termos com significado especial como nome de variável
Determinadas palavras, como event
e timestamp
, podem ter um significado especial no código do analisador.
A string event
é frequentemente usada para representar um único registo de UDM e é usada na declaração @output
. Se uma mensagem de registo incluir um campo denominado event
ou se definir uma variável intermédia denominada event
e o código do analisador usar a palavra event
na declaração @output
, recebe uma mensagem de erro sobre um conflito de nomes.
Mude o nome da variável intermédia para outra coisa ou use o termo event1
como prefixo nos nomes dos campos da UDM e na declaração @output
.
A palavra timestamp
representa a data/hora de criação do registo não processado original. Um valor definido nesta variável intermédia é guardado no campo metadata.event_timestamp
UDM. O termo @timestamp
representa a data e a hora em que o registo não processado foi analisado para criar um registo de UDM.
O exemplo seguinte define o campo metadata.event_timestamp
UDM para a data e a hora em que o registo não processado foi analisado.
# Save the log parse date and time to the timestamp variable
mutate {
rename => {
"@timestamp" => "timestamp"
}
}
O exemplo seguinte define o campo metadata.event_timestamp
UDM para a data e a hora extraídas do registo não processado original e armazenadas na variável when
intermédia.
# Save the event timestamp to timestamp variable
mutate {
rename => {
"when" => "timestamp"
}
}
Não use os seguintes termos como variáveis:
- collectiontimestamp
- createtimestamp
- evento
- nome de ficheiro
- mensagem
- espaço de nome
- saída
- onerrorcount
- timestamp
- fuso horário
Armazene cada valor de dados num campo da UDM separado
Não armazene vários campos num único campo de UDM concatenando-os com um delimitador. Segue-se um exemplo:
"principal.user.first_name" => "first:%{first_name},last:%{last_name}"
Em alternativa, armazene cada valor num campo da UDM separado.
"principal.user.first_name" => "%{first_name}"
"principal.user.last_name" => "%{last_name}"
Use espaços em vez de tabulações no código
Não use separadores no código do analisador. Use apenas espaços e avance 2 espaços de cada vez.
Não execute várias ações de união numa única operação
Se unir vários campos numa única operação, isto pode gerar resultados inconsistentes. Em alternativa, coloque as declarações merge
em operações separadas.
Por exemplo, substitua o seguinte exemplo:
mutate {
merge => {
"security_result.category_details" => "category_details"
"security_result.category_details" => "super_category_details"
}
}
Com esta opção:
mutate {
merge => {
"security_result.category_details" => "category_details"
}
}
mutate {
merge => {
"security_result.category_details" => "super_category_details"
}
}
Escolher expressões condicionais if
em vez de if else
Se o valor condicional que está a testar só puder ter uma correspondência,
use a declaração condicional if else
. Esta abordagem é ligeiramente mais
eficiente. No entanto, se tiver um cenário em que o valor testado possa corresponder mais
do que uma vez, use várias declarações if
distintas e ordene as declarações
do caso mais genérico para o caso mais específico.
Escolha um conjunto representativo de ficheiros de registo para testar as alterações do analisador
Uma prática recomendada é testar o código do analisador com exemplos de registos não processados com uma grande variedade de formatos. Isto permite-lhe encontrar registos únicos ou casos extremos que o analisador pode ter de processar.
Adicione comentários descritivos ao código do analisador
Adicione comentários ao código do analisador que expliquem por que motivo a declaração é importante, em vez de o que a declaração faz. O comentário ajuda qualquer pessoa que faça a manutenção do analisador a seguir o fluxo. Segue-se um exemplo:
# only assign a Namespace if the source address is RFC 1918 or Loopback IP address
if [jsonPayload][id][orig_h] =~ /^(127(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(10(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(192\.168(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)|(172\.(?:1[6-9]|2\d|3[0-1])(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)/ {
mutate {
replace => {
"event1.idm.read_only_udm.principal.namespace" => "%{resource.labels.project_id}"
}
}
}
Inicialize as variáveis intermédias antecipadamente
Antes de extrair valores do registo não processado original, inicialize as variáveis intermédias que vão ser usadas para armazenar valores de teste.
Isto evita que seja devolvido um erro a indicar que a variável intermédia não existe.
A seguinte declaração atribui o valor na variável product
ao campo da UDM metadata.product_name
.
mutate{
replace => {
"event1.idm.read_only_udm.metadata.product_name" => "%{product}"
}
}
Se a variável product
não existir, recebe o seguinte erro:
"generic::invalid_argument: pipeline failed: filter mutate (4) failed: replace failure: field \"event1.idm.read_only_udm.metadata.product_name\": source field \"product\": field not set"
Pode adicionar uma declaração on_error
para detetar o erro. Segue-se um exemplo:
mutate{
replace => {
"event1.idm.read_only_udm.metadata.product_name" => "%{product}"
}
on_error => "_error_does_not_exist"
}
A declaração de exemplo anterior deteta com êxito o erro de análise numa variável intermédia booleana, denominada _error_does_not_exist
. Não lhe permite usar a variável product
numa declaração condicional, por exemplo, if
.
Segue-se um exemplo:
if [product] != "" {
mutate{
replace => {
"event1.idm.read_only_udm.metadata.product_name" => "%{product}"
}
}
on_error => "_error_does_not_exist"
}
O exemplo anterior devolve o seguinte erro porque a cláusula condicional if
não suporta declarações on_error
:
"generic::invalid_argument: pipeline failed: filter conditional (4) failed: failed to evaluate expression: generic::invalid_argument: "product" not found in state data"
Para resolver este problema, adicione um bloco de declarações separado que inicialize as variáveis intermédias antes de executar as declarações do filtro de extração (json
, csv
, xml
, kv
ou grok
).
Segue-se um exemplo.
filter {
# Initialize intermediate variables for any field you will use for a conditional check
mutate {
replace => {
"timestamp" => ""
"does_not_exist" => ""
}
}
# load the logs fields from the message field
json {
source => "message"
array_function => "split_columns"
on_error => "_not_json"
}
}
O fragmento atualizado do código do analisador processa os vários cenários através de uma declaração condicional para verificar se o campo existe. Além disso, a declaração
on_error
processa os erros que podem ser encontrados.
Converta SHA-256 em base64
O exemplo seguinte extrai o valor SHA-256, codifica-o em base64, converte os dados codificados numa string hexadecimal e, em seguida, substitui campos específicos pelos valores extraídos e processados.
if [Sha256] != ""
{
base64
{
encoding => "RawStandard"
source => "Sha256"
target => "base64_sha256"
on_error => "base64_message_error"
}
mutate
{
convert =>
{
"base64_sha256" => "bytestohex"
}
on_error => "already_a_string"
}
mutate
{
replace =>
{
"event.idm.read_only_udm.network.tls.client.certificate.sha256" => "%{base64_sha256}"
"event.idm.read_only_udm.target.resource.name" => "%{Sha256}"
}
}
}
Trate erros em declarações do analisador
Não é invulgar que os registos recebidos estejam num formato de registo inesperado ou tenham dados com formatação incorreta.
Pode criar o analisador para processar estes erros. Uma prática recomendada é adicionar controladores on_error
ao filtro de extração e, em seguida, testar a variável intermédia antes de avançar para o segmento seguinte da lógica do analisador.
O exemplo seguinte usa o filtro de extração json
com uma declaração on_error
para definir a variável booleana _not_json
. Se _not_json
estiver definido como true
, significa que a entrada de registo recebida não estava num formato JSON válido e que a entrada de registo não foi analisada com êxito. Se a variável _not_json
for false
,
a entrada de registo recebida estava num formato JSON válido.
# load the incoming log from the default message field
json {
source => "message"
array_function => "split_columns"
on_error => "_not_json"
}
Também pode testar se um campo está no formato correto. O exemplo seguinte
verifica se _not_json
está definido como true
, o que indica que o registo não estava no
formato esperado.
# Test that the received log matches the expected format
if [_not_json] {
drop { tag => "TAG_MALFORMED_MESSAGE" }
} else {
# timestamp is always expected
if [timestamp] != "" {
# ...additional parser logic goes here …
} else {
# if the timestamp field does not exist, it's not a log source
drop { tag => "TAG_UNSUPPORTED" }
}
}
Isto garante que a análise não falha se os registos forem carregados com um formato incorreto para o tipo de registo especificado.
Use o filtro drop
com a variável tag
para que a condição seja capturada na
tabela de métricas de carregamento no BigQuery.
TAG_UNSUPPORTED
TAG_MALFORMED_ENCODING
TAG_MALFORMED_MESSAGE
TAG_NO_SECURITY_VALUE
O filtro drop
impede que o analisador processe o registo não processado, normalize os campos e crie um registo de UDM. O registo não processado original continua a ser carregado para o Google Security Operations
e pode ser pesquisado através da pesquisa de registos não processados no Google SecOps.
O valor transmitido à variável tag
é armazenado no campo drop_reason_code
' na tabela de métricas de carregamento. Pode executar uma consulta ad hoc na tabela semelhante à seguinte:
SELECT
log_type,
drop_reason_code,
COUNT(drop_reason_code) AS count
FROM `datalake.ingestion_metrics`
GROUP BY 1,2
ORDER BY 1 ASC
Resolva problemas de erros de validação
Ao criar um analisador, pode encontrar erros relacionados com a validação, por exemplo, um campo obrigatório não está definido no registo de UDM. O erro pode ter um aspeto semelhante ao seguinte:
Error: generic::unknown: invalid event 0: LOG_PARSING_GENERATED_INVALID_EVENT: "generic::invalid_argument: udm validation failed: target field is not set"
O código do analisador é executado com êxito, mas o registo de UDM gerado não inclui todos os campos de UDM necessários, conforme definido pelo conjunto de valores para o metadata.event_type
. Seguem-se exemplos adicionais que podem causar este erro:
- Se o
metadata.event_type
forUSER_LOGIN
e o campo UDMtarget.user value
não estiver definido. - Se o
metadata.event_type
forNETWORK_CONNECTION
e o campotarget.hostname
UDM não estiver definido.
Para mais informações sobre o campo metadata.event_type
UDM e os campos obrigatórios, consulte o guia de utilização do UDM.
Uma opção para resolver este tipo de erro é começar por definir valores estáticos para os campos da UDM. Depois de definir todos os campos da UDM necessários, examine o registo não processado original para ver que valores analisar e guardar no registo da UDM. Se o registo não processado original não contiver determinados campos, pode ter de definir valores predefinidos.
Segue-se um modelo de exemplo, específico de um tipo de evento USER_LOGIN
, que
ilustra esta abordagem.
Tenha em atenção o seguinte:
- O modelo inicializa as variáveis intermédias e define cada uma como uma string estática.
- O código na secção Atribuição de campos define os valores nas variáveis intermédias para campos de UDM.
Pode expandir este código adicionando variáveis intermédias e campos da UDM adicionais. Depois de identificar todos os campos de UDM que têm de ser preenchidos, faça o seguinte:
Na secção Configuração de entrada, adicione código que extrai campos do registo não processado original e define os valores para as variáveis intermédias.
Na secção Extração de data, adicione código que extrai a data/hora do evento do registo não processado original, transforma-a e define-a para a variável intermédia.
Conforme necessário, substitua o valor inicializado definido em cada variável intermédia por uma string vazia.
filter {
mutate {
replace => {
# UDM > Metadata
"metadata_event_timestamp" => ""
"metadata_vendor_name" => "Example"
"metadata_product_name" => "Example SSO"
"metadata_product_version" => "1.0"
"metadata_product_event_type" => "login"
"metadata_product_log_id" => "12345678"
"metadata_description" => "A user logged in."
"metadata_event_type" => "USER_LOGIN"
# UDM > Principal
"principal_ip" => "192.168.2.10"
# UDM > Target
"target_application" => "Example Connect"
"target_user_user_display_name" => "Mary Smith"
"target_user_userid" => "mary@example.com"
# UDM > Extensions
"auth_type" => "SSO"
"auth_mechanism" => "USERNAME_PASSWORD"
# UDM > Security Results
"securityResult_action" => "ALLOW"
"security_result.severity" => "LOW"
}
}
# ------------ Input Configuration --------------
# Extract values from the message using one of the extraction filters: json, kv, grok
# ------------ Date Extract --------------
# If the date {} function is not used, the default is the normalization process time
# ------------ Field Assignment --------------
# UDM Metadata
mutate {
replace => {
"event1.idm.read_only_udm.metadata.vendor_name" => "%{metadata_vendor_name}"
"event1.idm.read_only_udm.metadata.product_name" => "%{metadata_product_name}"
"event1.idm.read_only_udm.metadata.product_version" => "%{metadata_product_version}"
"event1.idm.read_only_udm.metadata.product_event_type" => "%{metadata_product_event_type}"
"event1.idm.read_only_udm.metadata.product_log_id" => "%{metadata_product_log_id}"
"event1.idm.read_only_udm.metadata.description" => "%{metadata_description}"
"event1.idm.read_only_udm.metadata.event_type" => "%{metadata_event_type}"
}
}
# Set the UDM > auth fields
mutate {
replace => {
"event1.idm.read_only_udm.extensions.auth.type" => "%{auth_type}"
}
merge => {
"event1.idm.read_only_udm.extensions.auth.mechanism" => "auth_mechanism"
}
}
# Set the UDM > principal fields
mutate {
merge => {
"event1.idm.read_only_udm.principal.ip" => "principal_ip"
}
}
# Set the UDM > target fields
mutate {
replace => {
"event1.idm.read_only_udm.target.user.userid" => "%{target_user_userid}"
"event1.idm.read_only_udm.target.user.user_display_name" => "%{target_user_user_display_name}"
"event1.idm.read_only_udm.target.application" => "%{target_application}"
}
}
# Set the UDM > security_results fields
mutate {
merge => {
"security_result.action" => "securityResult_action"
}
}
# Set the security result
mutate {
merge => {
"event1.idm.read_only_udm.security_result" => "security_result"
}
}
# ------------ Output the event --------------
mutate {
merge => {
"@output" => "event1"
}
}
}
Analise texto não estruturado com uma função Grok
Quando usa uma função Grok para extrair valores de texto não estruturado, pode usar padrões Grok predefinidos e declarações de expressões regulares. Compreender padrões
facilita a leitura do código. Se a expressão regular não incluir carateres abreviados (como \w
, \s
), pode copiar e colar a declaração diretamente no código do analisador.
Uma vez que os padrões Grok são uma camada de abstração adicional na declaração, podem tornar a resolução de problemas mais complexa quando encontra um erro. Segue-se um exemplo de uma função Grok que contém padrões Grok predefinidos e expressões regulares.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
}
Uma declaração de extração sem padrões Grok pode ter um melhor desempenho. Por exemplo, o exemplo seguinte requer menos de metade dos passos de tratamento para estabelecer correspondência. Para uma origem de registo com um volume potencialmente elevado, esta é uma consideração importante.
Compreenda as diferenças entre as expressões regulares RE2 e PCRE
Os analisadores do Google SecOps usam o RE2 como o motor de expressões regulares. Se tiver familiaridade com a sintaxe PCRE, pode notar diferenças. Segue-se um exemplo:
Segue-se uma declaração PCRE: (?<_custom_field>\w+)\s
Segue-se uma declaração RE2 para o código do analisador: (?P<_custom_field>\\w+)\\s
Certifique-se de que escapa aos carateres de escape
O Google SecOps armazena os dados de registo não processados recebidos no formato codificado JSON. Isto destina-se a garantir que as strings de carateres que parecem ser abreviaturas de expressões regulares são interpretadas como a string literal. Por exemplo, \t
é interpretado como a string literal, em vez de um caráter de tabulação.
O exemplo seguinte mostra um registo não processado original e o registo de formato codificado JSON.
Repare no caráter de escape adicionado antes de cada caráter de barra invertida
que envolve o termo entry
.
Segue-se o registo não processado original:
field=\entry\
Segue-se o registo convertido para o formato codificado JSON:
field=\\entry\\
Quando usa uma expressão regular no código do analisador, tem de adicionar carateres de escape adicionais se quiser extrair apenas o valor. Para fazer corresponder uma barra invertida no registo não processado original, use quatro barras invertidas na declaração de extração.
Segue-se uma expressão regular para o código do analisador:
^field=\\\\(?P<_value>.*)\\\\$
Segue-se o resultado gerado. O grupo denominado _value
armazena o termo entry
:
"_value": "entry"
Quando move uma declaração de expressão regular padrão para o código do analisador, escape os carateres abreviados de expressão regular na declaração de extração.
Por exemplo, altere \s
para \\s
.
Deixe os carateres especiais de expressão regular inalterados quando tiverem escape duplo na instrução de extração. Por exemplo, \\
permanece inalterado como \\
.
Segue-se uma expressão regular padrão:
^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$
A seguinte expressão regular é modificada para funcionar no código do analisador.
^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$
A tabela seguinte resume quando uma expressão regular padrão tem de incluir carateres de escape adicionais antes de a incluir no código do analisador.
Expressão regular | Expressão regular modificada para o código do analisador | Descrição da alteração |
---|---|---|
\s |
\\s |
Os carateres abreviados têm de ter um escape. |
\. |
\\. |
Os carateres reservados têm de ter escape. |
\\" |
\\\" |
Os carateres reservados têm de ter escape. |
\] |
\\] |
Os carateres reservados têm de ter escape. |
\| |
\\| |
Os carateres reservados têm de ter escape. |
[^\\]+ |
[^\\\\]+ |
Os carateres especiais num grupo de classes de carateres têm de ser interpretados de forma literal. |
\\\\ |
\\\\ |
Os carateres especiais fora de um grupo de classe de carateres ou os carateres abreviados não requerem uma escape adicional. |
As expressões regulares têm de incluir um grupo de captura com nome
Uma expressão regular, como "^.*$"
, é uma sintaxe RE2 válida. No entanto, no código do analisador, ocorre a seguinte falha:
"ParseLogEntry failed: pipeline failed: filter grok (0) failed: failed to parse data with all match
patterns"
Tem de adicionar um grupo de captura válido à expressão. Se usar padrões Grok, estes incluem um grupo de captura com nome por predefinição. Quando usar substituições de expressões regulares, certifique-se de que inclui um grupo com nome.
Segue-se um exemplo de uma expressão regular no código do analisador:
"^(?P<_catchall>.*$)"
Segue-se o resultado, que mostra o texto atribuído ao grupo denominado _catchall
.
"_catchall": "User \"BOB\" logged on to workstation \"DESKTOP-01\"."
Use um grupo com nome genérico para começar à medida que cria a expressão
Quando criar uma declaração de extração, comece com uma expressão que capte mais do que pretende. Em seguida, expanda a expressão um campo de cada vez.
O exemplo seguinte começa por usar um grupo com nome (_catchall
) que corresponde à mensagem completa. Em seguida, cria a expressão passo a passo fazendo corresponder
porções adicionais do texto. A cada passo, o grupo nomeado _catchall
contém menos texto original. Continue e repita um passo de cada vez para fazer corresponder a mensagem até já não precisar do grupo denominado _catchall
.
Passo | Expressão regular no código do analisador | Saída do grupo de captura denominado _catchall |
---|---|---|
1 | "^(?P<_catchall>.*$)" |
User \"BOB\" logged on to workstation \"DESKTOP-01\". |
2 | ^User\s\\\"(?P<_catchall>.*$) |
BOB\" logged on to workstation \"DESKTOP-01\". |
3 | ^User\s\\\"(?P<_user>.*?)\\\"\s(?P<_catchall>.*$) |
logged on to workstation \"DESKTOP-01\". |
Continue até que a expressão corresponda a toda a string de texto. |
Escape carateres abreviados na expressão regular
Lembre-se de usar carateres abreviados de expressões regulares com carateres de escape quando usar a expressão no código do analisador. Segue-se um exemplo de uma string de texto e da expressão regular padrão que extrai a primeira palavra, This
.
This is a sample log.
A seguinte expressão regular padrão extrai a primeira palavra, This
.
No entanto, quando executa esta expressão no código do analisador, o resultado não tem a letra
s
.
Expressão regular padrão | Saída do grupo de captura denominado _firstWord |
---|---|
"^(?P<_firstWord>[^\s]+)\s.*$" |
"_firstWord": "Thi", |
Isto deve-se ao facto de as expressões regulares no código do analisador exigirem um caráter de escape adicional adicionado aos carateres abreviados. No exemplo anterior, \s
tem de ser alterado para \\s
.
Expressão regular revista para o código do analisador | Saída do grupo de captura denominado _firstWord |
---|---|
"^(?P<_firstWord>[^\\s]+)\\s.*$" |
"_firstWord": "This", |
Isto aplica-se apenas aos carateres abreviados, como \s
, \r
e \t
.
Não é necessário aplicar mais escapes a outros carateres, como ``,.
Um exemplo completo
Esta secção descreve as regras anteriores como um exemplo integral. Segue-se uma string de texto não estruturada e a expressão regular padrão escrita para analisar a string. Por último, inclui a expressão regular modificada que funciona no código do analisador.
Segue-se a string de texto original.
User "BOB" logged on to workstation "DESKTOP-01".
Segue-se uma expressão regular RE2 padrão que analisa a string de texto.
^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$
Esta expressão extrai os seguintes campos.
Grupo de correspondências | Posição do caráter | String de texto |
---|---|---|
Correspondência total | 0-53 | User \"BOB\" logged on to workstation \"DESKTOP-01\". |
Grupo `_user` | 7-10 | BOB |
Grupo 2. | 13-22 | logged on |
Grupo `_device` | 40-50 | DESKTOP-01 |
Esta é a expressão modificada. A expressão regular RE2 padrão foi modificada para funcionar no código do analisador.
^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.