Sugestões e resolução de problemas ao escrever analisadores

Compatível com:

Este documento descreve os problemas que pode encontrar quando escreve código de análise sintática.

Ao escrever código de análise, pode encontrar erros quando as instruções de análise não funcionam como esperado. As situações que podem gerar erros incluem o seguinte:

  • Um padrão Grok falha
  • Uma operação rename ou replace falha
  • Erros de sintaxe no código do analisador

Práticas comuns no código do analisador

As secções seguintes descrevem as práticas recomendadas, as sugestões e as soluções para ajudar a resolver problemas.

Evite usar pontos ou travessões nos nomes das variáveis

A utilização de hífenes e pontos nos nomes das variáveis pode causar um comportamento inesperado, geralmente quando realiza operações merge para armazenar valores em campos de UDM. Também pode encontrar problemas de análise intermitentes.

Por exemplo, não use os seguintes nomes de variáveis:

  • my.variable.result
  • my-variable-result

Em alternativa, use o seguinte nome de variável: my_variable_result.

Não use termos com significado especial como nome de variável

Determinadas palavras, como event e timestamp, podem ter um significado especial no código do analisador.

A string event é frequentemente usada para representar um único registo de UDM e é usada na declaração @output. Se uma mensagem de registo incluir um campo denominado event ou se definir uma variável intermédia denominada event e o código do analisador usar a palavra event na declaração @output, recebe uma mensagem de erro sobre um conflito de nomes.

Mude o nome da variável intermédia para outra coisa ou use o termo event1 como prefixo nos nomes dos campos da UDM e na declaração @output.

A palavra timestamp representa a data/hora de criação do registo não processado original. Um valor definido nesta variável intermédia é guardado no campo metadata.event_timestamp UDM. O termo @timestamp representa a data e a hora em que o registo não processado foi analisado para criar um registo de UDM.

O exemplo seguinte define o campo metadata.event_timestampUDM para a data e a hora em que o registo não processado foi analisado.

 # Save the log parse date and time to the timestamp variable
  mutate {
     rename => {
       "@timestamp" => "timestamp"
     }
   }

O exemplo seguinte define o campo metadata.event_timestamp UDM para a data e a hora extraídas do registo não processado original e armazenadas na variável when intermédia.

   # Save the event timestamp to timestamp variable
   mutate {
     rename => {
       "when" => "timestamp"
     }
   }

Não use os seguintes termos como variáveis:

  • collectiontimestamp
  • createtimestamp
  • evento
  • nome de ficheiro
  • mensagem
  • espaço de nome
  • saída
  • onerrorcount
  • timestamp
  • fuso horário

Armazene cada valor de dados num campo da UDM separado

Não armazene vários campos num único campo de UDM concatenando-os com um delimitador. Segue-se um exemplo:

"principal.user.first_name" => "first:%{first_name},last:%{last_name}"

Em alternativa, armazene cada valor num campo da UDM separado.

"principal.user.first_name" => "%{first_name}"
"principal.user.last_name" => "%{last_name}"

Use espaços em vez de tabulações no código

Não use separadores no código do analisador. Use apenas espaços e avance 2 espaços de cada vez.

Não execute várias ações de união numa única operação

Se unir vários campos numa única operação, isto pode gerar resultados inconsistentes. Em alternativa, coloque as declarações merge em operações separadas.

Por exemplo, substitua o seguinte exemplo:

mutate {
  merge => {
      "security_result.category_details" => "category_details"
      "security_result.category_details" => "super_category_details"
  }
}

Com esta opção:

mutate {
  merge => {
    "security_result.category_details" => "category_details"
  }
}

mutate {
  merge => {
    "security_result.category_details" => "super_category_details"
  }
}

Escolher expressões condicionais if em vez de if else

Se o valor condicional que está a testar só puder ter uma correspondência, use a declaração condicional if else. Esta abordagem é ligeiramente mais eficiente. No entanto, se tiver um cenário em que o valor testado possa corresponder mais do que uma vez, use várias declarações if distintas e ordene as declarações do caso mais genérico para o caso mais específico.

Escolha um conjunto representativo de ficheiros de registo para testar as alterações do analisador

Uma prática recomendada é testar o código do analisador com exemplos de registos não processados com uma grande variedade de formatos. Isto permite-lhe encontrar registos únicos ou casos extremos que o analisador pode ter de processar.

Adicione comentários descritivos ao código do analisador

Adicione comentários ao código do analisador que expliquem por que motivo a declaração é importante, em vez de o que a declaração faz. O comentário ajuda qualquer pessoa que faça a manutenção do analisador a seguir o fluxo. Segue-se um exemplo:

# only assign a Namespace if the source address is RFC 1918 or Loopback IP address
if [jsonPayload][id][orig_h] =~ /^(127(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(10(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(192\.168(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)|(172\.(?:1[6-9]|2\d|3[0-1])(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)/ {
  mutate {
    replace => {
      "event1.idm.read_only_udm.principal.namespace" => "%{resource.labels.project_id}"
    }
  }
}

Inicialize as variáveis intermédias antecipadamente

Antes de extrair valores do registo não processado original, inicialize as variáveis intermédias que vão ser usadas para armazenar valores de teste.

Isto evita que seja devolvido um erro a indicar que a variável intermédia não existe.

A seguinte declaração atribui o valor na variável product ao campo da UDM metadata.product_name.

mutate{
  replace => {
    "event1.idm.read_only_udm.metadata.product_name" => "%{product}"
  }
}

Se a variável product não existir, recebe o seguinte erro:

"generic::invalid_argument: pipeline failed: filter mutate (4) failed: replace failure: field \"event1.idm.read_only_udm.metadata.product_name\": source field \"product\": field not set"

Pode adicionar uma declaração on_error para detetar o erro. Segue-se um exemplo:

mutate{
  replace => {
    "event1.idm.read_only_udm.metadata.product_name" => "%{product}"
    }
  on_error => "_error_does_not_exist"
  }

A declaração de exemplo anterior deteta com êxito o erro de análise numa variável intermédia booleana, denominada _error_does_not_exist. Não lhe permite usar a variável product numa declaração condicional, por exemplo, if. Segue-se um exemplo:

if [product] != "" {
  mutate{
    replace => {
      "event1.idm.read_only_udm.metadata.product_name" => "%{product}"
    }
  }
  on_error => "_error_does_not_exist"
}

O exemplo anterior devolve o seguinte erro porque a cláusula condicional if não suporta declarações on_error:

"generic::invalid_argument: pipeline failed: filter conditional (4) failed: failed to evaluate expression: generic::invalid_argument: "product" not found in state data"

Para resolver este problema, adicione um bloco de declarações separado que inicialize as variáveis intermédias antes de executar as declarações do filtro de extração (json, csv, xml, kv ou grok). Segue-se um exemplo.

filter {
  # Initialize intermediate variables for any field you will use for a conditional check
  mutate {
    replace => {
      "timestamp" => ""
      "does_not_exist" => ""
    }
  }

  # load the logs fields from the message field
  json {
    source         => "message"
    array_function => "split_columns"
    on_error       => "_not_json"
  }
}

O fragmento atualizado do código do analisador processa os vários cenários através de uma declaração condicional para verificar se o campo existe. Além disso, a declaração on_error processa os erros que podem ser encontrados.

Converta SHA-256 em base64

O exemplo seguinte extrai o valor SHA-256, codifica-o em base64, converte os dados codificados numa string hexadecimal e, em seguida, substitui campos específicos pelos valores extraídos e processados.

if [Sha256] != "" 
{
  base64
  {
  encoding => "RawStandard"
  source => "Sha256"
  target => "base64_sha256"
  on_error => "base64_message_error"
  }
  mutate
  {
    convert =>
    {
      "base64_sha256" => "bytestohex"
    }
    on_error => "already_a_string"
  }
  mutate
  {
    replace => 
  {
     "event.idm.read_only_udm.network.tls.client.certificate.sha256" => "%{base64_sha256}"
     "event.idm.read_only_udm.target.resource.name" => "%{Sha256}"
  }
  }
}

Trate erros em declarações do analisador

Não é invulgar que os registos recebidos estejam num formato de registo inesperado ou tenham dados com formatação incorreta.

Pode criar o analisador para processar estes erros. Uma prática recomendada é adicionar controladores on_error ao filtro de extração e, em seguida, testar a variável intermédia antes de avançar para o segmento seguinte da lógica do analisador.

O exemplo seguinte usa o filtro de extração json com uma declaração on_error para definir a variável booleana _not_json. Se _not_json estiver definido como true, significa que a entrada de registo recebida não estava num formato JSON válido e que a entrada de registo não foi analisada com êxito. Se a variável _not_json for false, a entrada de registo recebida estava num formato JSON válido.

 # load the incoming log from the default message field
  json {
    source         => "message"
    array_function => "split_columns"
    on_error       => "_not_json"
  }

Também pode testar se um campo está no formato correto. O exemplo seguinte verifica se _not_json está definido como true, o que indica que o registo não estava no formato esperado.

 # Test that the received log matches the expected format
  if [_not_json] {
    drop { tag => "TAG_MALFORMED_MESSAGE" }
  } else {
    # timestamp is always expected
    if [timestamp] != "" {

      # ...additional parser logic goes here …

    } else {

      # if the timestamp field does not exist, it's not a log source
      drop { tag => "TAG_UNSUPPORTED" }
    }
  }

Isto garante que a análise não falha se os registos forem carregados com um formato incorreto para o tipo de registo especificado.

Use o filtro drop com a variável tag para que a condição seja capturada na tabela de métricas de carregamento no BigQuery.

  • TAG_UNSUPPORTED
  • TAG_MALFORMED_ENCODING
  • TAG_MALFORMED_MESSAGE
  • TAG_NO_SECURITY_VALUE

O filtro drop impede que o analisador processe o registo não processado, normalize os campos e crie um registo de UDM. O registo não processado original continua a ser carregado para o Google Security Operations e pode ser pesquisado através da pesquisa de registos não processados no Google SecOps.

O valor transmitido à variável tag é armazenado no campo drop_reason_code' na tabela de métricas de carregamento. Pode executar uma consulta ad hoc na tabela semelhante à seguinte:

SELECT
  log_type,
  drop_reason_code,
  COUNT(drop_reason_code) AS count
FROM `datalake.ingestion_metrics`
GROUP BY 1,2
ORDER BY 1 ASC

Resolva problemas de erros de validação

Ao criar um analisador, pode encontrar erros relacionados com a validação, por exemplo, um campo obrigatório não está definido no registo de UDM. O erro pode ter um aspeto semelhante ao seguinte:

Error: generic::unknown: invalid event 0: LOG_PARSING_GENERATED_INVALID_EVENT: "generic::invalid_argument: udm validation failed: target field is not set"

O código do analisador é executado com êxito, mas o registo de UDM gerado não inclui todos os campos de UDM necessários, conforme definido pelo conjunto de valores para o metadata.event_type. Seguem-se exemplos adicionais que podem causar este erro:

  • Se o metadata.event_type for USER_LOGIN e o campo UDM target.user value não estiver definido.
  • Se o metadata.event_type for NETWORK_CONNECTION e o campo target.hostnameUDM não estiver definido.

Para mais informações sobre o campo metadata.event_typeUDM e os campos obrigatórios, consulte o guia de utilização do UDM.

Uma opção para resolver este tipo de erro é começar por definir valores estáticos para os campos da UDM. Depois de definir todos os campos da UDM necessários, examine o registo não processado original para ver que valores analisar e guardar no registo da UDM. Se o registo não processado original não contiver determinados campos, pode ter de definir valores predefinidos.

Segue-se um modelo de exemplo, específico de um tipo de evento USER_LOGIN, que ilustra esta abordagem.

Tenha em atenção o seguinte:

  • O modelo inicializa as variáveis intermédias e define cada uma como uma string estática.
  • O código na secção Atribuição de campos define os valores nas variáveis intermédias para campos de UDM.

Pode expandir este código adicionando variáveis intermédias e campos da UDM adicionais. Depois de identificar todos os campos de UDM que têm de ser preenchidos, faça o seguinte:

  • Na secção Configuração de entrada, adicione código que extrai campos do registo não processado original e define os valores para as variáveis intermédias.

  • Na secção Extração de data, adicione código que extrai a data/hora do evento do registo não processado original, transforma-a e define-a para a variável intermédia.

  • Conforme necessário, substitua o valor inicializado definido em cada variável intermédia por uma string vazia.

filter {
 mutate {
   replace => {
     # UDM > Metadata
     "metadata_event_timestamp"    => ""
     "metadata_vendor_name"        => "Example"
     "metadata_product_name"       => "Example SSO"
     "metadata_product_version"    => "1.0"
     "metadata_product_event_type" => "login"
     "metadata_product_log_id"     => "12345678"
     "metadata_description"        => "A user logged in."
     "metadata_event_type"         => "USER_LOGIN"

     # UDM > Principal
     "principal_ip"       => "192.168.2.10"

     # UDM > Target
     "target_application"            => "Example Connect"
     "target_user_user_display_name" => "Mary Smith"
     "target_user_userid"            => "mary@example.com"

     # UDM > Extensions
     "auth_type"          => "SSO"
     "auth_mechanism"     => "USERNAME_PASSWORD"

     # UDM > Security Results
     "securityResult_action"         => "ALLOW"
     "security_result.severity"       => "LOW"

   }
 }

 # ------------ Input Configuration  --------------
  # Extract values from the message using one of the extraction filters: json, kv, grok

 # ------------ Date Extract  --------------
 # If the  date {} function is not used, the default is the normalization process time

  # ------------ Field Assignment  --------------
  # UDM Metadata
  mutate {
    replace => {
      "event1.idm.read_only_udm.metadata.vendor_name"        =>  "%{metadata_vendor_name}"
      "event1.idm.read_only_udm.metadata.product_name"       =>  "%{metadata_product_name}"
      "event1.idm.read_only_udm.metadata.product_version"    =>  "%{metadata_product_version}"
      "event1.idm.read_only_udm.metadata.product_event_type" =>  "%{metadata_product_event_type}"
      "event1.idm.read_only_udm.metadata.product_log_id"     =>  "%{metadata_product_log_id}"
      "event1.idm.read_only_udm.metadata.description"        =>  "%{metadata_description}"
      "event1.idm.read_only_udm.metadata.event_type"         =>  "%{metadata_event_type}"
    }
  }

  # Set the UDM > auth fields
  mutate {
    replace => {
      "event1.idm.read_only_udm.extensions.auth.type"        => "%{auth_type}"
    }
    merge => {
      "event1.idm.read_only_udm.extensions.auth.mechanism"   => "auth_mechanism"
    }
  }

  # Set the UDM > principal fields
  mutate {
    merge => {
      "event1.idm.read_only_udm.principal.ip"                => "principal_ip"
    }
  }

  # Set the UDM > target fields
  mutate {
    replace => {
      "event1.idm.read_only_udm.target.user.userid"             =>  "%{target_user_userid}"
      "event1.idm.read_only_udm.target.user.user_display_name"  =>  "%{target_user_user_display_name}"
      "event1.idm.read_only_udm.target.application"             =>  "%{target_application}"
    }
  }

  # Set the UDM > security_results fields
  mutate {
    merge => {
      "security_result.action" => "securityResult_action"
    }
  }

  # Set the security result
  mutate {
    merge => {
      "event1.idm.read_only_udm.security_result" => "security_result"
    }
  }

 # ------------ Output the event  --------------
  mutate {
    merge => {
      "@output" => "event1"
    }
  }

}

Analise texto não estruturado com uma função Grok

Quando usa uma função Grok para extrair valores de texto não estruturado, pode usar padrões Grok predefinidos e declarações de expressões regulares. Compreender padrões facilita a leitura do código. Se a expressão regular não incluir carateres abreviados (como \w, \s), pode copiar e colar a declaração diretamente no código do analisador.

Uma vez que os padrões Grok são uma camada de abstração adicional na declaração, podem tornar a resolução de problemas mais complexa quando encontra um erro. Segue-se um exemplo de uma função Grok que contém padrões Grok predefinidos e expressões regulares.

grok {
  match => {
    "message" => [
      "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
    ]
  }
}

Uma declaração de extração sem padrões Grok pode ter um melhor desempenho. Por exemplo, o exemplo seguinte requer menos de metade dos passos de tratamento para estabelecer correspondência. Para uma origem de registo com um volume potencialmente elevado, esta é uma consideração importante.

Compreenda as diferenças entre as expressões regulares RE2 e PCRE

Os analisadores do Google SecOps usam o RE2 como o motor de expressões regulares. Se tiver familiaridade com a sintaxe PCRE, pode notar diferenças. Segue-se um exemplo:

Segue-se uma declaração PCRE: (?<_custom_field>\w+)\s

Segue-se uma declaração RE2 para o código do analisador: (?P<_custom_field>\\w+)\\s

Certifique-se de que escapa aos carateres de escape

O Google SecOps armazena os dados de registo não processados recebidos no formato codificado JSON. Isto destina-se a garantir que as strings de carateres que parecem ser abreviaturas de expressões regulares são interpretadas como a string literal. Por exemplo, \t é interpretado como a string literal, em vez de um caráter de tabulação.

O exemplo seguinte mostra um registo não processado original e o registo de formato codificado JSON. Repare no caráter de escape adicionado antes de cada caráter de barra invertida que envolve o termo entry.

Segue-se o registo não processado original:

field=\entry\

Segue-se o registo convertido para o formato codificado JSON:

field=\\entry\\

Quando usa uma expressão regular no código do analisador, tem de adicionar carateres de escape adicionais se quiser extrair apenas o valor. Para fazer corresponder uma barra invertida no registo não processado original, use quatro barras invertidas na declaração de extração.

Segue-se uma expressão regular para o código do analisador:

^field=\\\\(?P<_value>.*)\\\\$

Segue-se o resultado gerado. O grupo denominado _value armazena o termo entry:

"_value": "entry"

Quando move uma declaração de expressão regular padrão para o código do analisador, escape os carateres abreviados de expressão regular na declaração de extração. Por exemplo, altere \s para \\s.

Deixe os carateres especiais de expressão regular inalterados quando tiverem escape duplo na instrução de extração. Por exemplo, \\ permanece inalterado como \\.

Segue-se uma expressão regular padrão:

^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$

A seguinte expressão regular é modificada para funcionar no código do analisador.

^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$

A tabela seguinte resume quando uma expressão regular padrão tem de incluir carateres de escape adicionais antes de a incluir no código do analisador.

Expressão regular Expressão regular modificada para o código do analisador Descrição da alteração
\s
\\s
Os carateres abreviados têm de ter um escape.
\.
\\.
Os carateres reservados têm de ter escape.
\\"
\\\"
Os carateres reservados têm de ter escape.
\]
\\]
Os carateres reservados têm de ter escape.
\|
\\|
Os carateres reservados têm de ter escape.
[^\\]+
[^\\\\]+
Os carateres especiais num grupo de classes de carateres têm de ser interpretados de forma literal.
\\\\
\\\\
Os carateres especiais fora de um grupo de classe de carateres ou os carateres abreviados não requerem uma escape adicional.

As expressões regulares têm de incluir um grupo de captura com nome

Uma expressão regular, como "^.*$", é uma sintaxe RE2 válida. No entanto, no código do analisador, ocorre a seguinte falha:

"ParseLogEntry failed: pipeline failed: filter grok (0) failed: failed to parse data with all match
patterns"

Tem de adicionar um grupo de captura válido à expressão. Se usar padrões Grok, estes incluem um grupo de captura com nome por predefinição. Quando usar substituições de expressões regulares, certifique-se de que inclui um grupo com nome.

Segue-se um exemplo de uma expressão regular no código do analisador:

"^(?P<_catchall>.*$)"

Segue-se o resultado, que mostra o texto atribuído ao grupo denominado _catchall.

"_catchall": "User \"BOB\" logged on to workstation \"DESKTOP-01\"."

Use um grupo com nome genérico para começar à medida que cria a expressão

Quando criar uma declaração de extração, comece com uma expressão que capte mais do que pretende. Em seguida, expanda a expressão um campo de cada vez.

O exemplo seguinte começa por usar um grupo com nome (_catchall) que corresponde à mensagem completa. Em seguida, cria a expressão passo a passo fazendo corresponder porções adicionais do texto. A cada passo, o grupo nomeado _catchall contém menos texto original. Continue e repita um passo de cada vez para fazer corresponder a mensagem até já não precisar do grupo denominado _catchall.

Passo Expressão regular no código do analisador Saída do grupo de captura denominado _catchall
1
"^(?P<_catchall>.*$)"
User \"BOB\" logged on to workstation \"DESKTOP-01\".
2
^User\s\\\"(?P<_catchall>.*$)
BOB\" logged on to workstation \"DESKTOP-01\".
3
^User\s\\\"(?P<_user>.*?)\\\"\s(?P<_catchall>.*$)
logged on to workstation \"DESKTOP-01\".
Continue até que a expressão corresponda a toda a string de texto.

Escape carateres abreviados na expressão regular

Lembre-se de usar carateres abreviados de expressões regulares com carateres de escape quando usar a expressão no código do analisador. Segue-se um exemplo de uma string de texto e da expressão regular padrão que extrai a primeira palavra, This.

  This is a sample log.

A seguinte expressão regular padrão extrai a primeira palavra, This. No entanto, quando executa esta expressão no código do analisador, o resultado não tem a letra s.

Expressão regular padrão Saída do grupo de captura denominado _firstWord
"^(?P<_firstWord>[^\s]+)\s.*$" "_firstWord": "Thi",

Isto deve-se ao facto de as expressões regulares no código do analisador exigirem um caráter de escape adicional adicionado aos carateres abreviados. No exemplo anterior, \s tem de ser alterado para \\s.

Expressão regular revista para o código do analisador Saída do grupo de captura denominado _firstWord
"^(?P<_firstWord>[^\\s]+)\\s.*$" "_firstWord": "This",

Isto aplica-se apenas aos carateres abreviados, como \s, \r e \t. Não é necessário aplicar mais escapes a outros carateres, como ``,.

Um exemplo completo

Esta secção descreve as regras anteriores como um exemplo integral. Segue-se uma string de texto não estruturada e a expressão regular padrão escrita para analisar a string. Por último, inclui a expressão regular modificada que funciona no código do analisador.

Segue-se a string de texto original.

User "BOB" logged on to workstation "DESKTOP-01".

Segue-se uma expressão regular RE2 padrão que analisa a string de texto.

^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$

Esta expressão extrai os seguintes campos.

Grupo de correspondências Posição do caráter String de texto
Correspondência total 0-53
User \"BOB\" logged on to workstation \"DESKTOP-01\".
Grupo `_user` 7-10
BOB
Grupo 2. 13-22
logged on
Grupo `_device` 40-50
DESKTOP-01

Esta é a expressão modificada. A expressão regular RE2 padrão foi modificada para funcionar no código do analisador.

^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.