Dicas e solução de problemas ao escrever analisadores

Este documento descreve problemas que você pode encontrar ao escrever o código analisador.

Ao escrever o código analisador, talvez você encontre erros se as instruções de análise não funcionarem como esperado. Estas são algumas situações que podem gerar erros:

  • Um padrão Grok falha
  • Uma operação rename ou replace falha
  • Erros de sintaxe no código do analisador

Práticas comuns no código do analisador

As seções a seguir descrevem as práticas recomendadas, dicas e soluções para resolver problemas.

Evite usar pontos ou hifens nos nomes de variáveis

O uso de hifens e pontos em nomes de variáveis pode causar um comportamento inesperado, muitas vezes ao executar operações merge para armazenar valores em campos de UDM. Você também pode encontrar problemas de análise intermitente.

Por exemplo, não use os seguintes nomes de variáveis:

  • my.variable.result
  • my-variable-result

Em vez disso, use o seguinte nome de variável: my_variable_result.

Não use termos com significados especiais como um nome de variável

Certas palavras, como event e timestamp, podem ter um significado especial no código do analisador.

A string event costuma ser usada para representar um único registro UDM e é usada na instrução @output. Se uma mensagem de registro incluir um campo chamado event ou se você definir uma variável intermediária chamada event e o código do analisador usar a palavra event na instrução @output, você receberá uma mensagem de erro sobre um conflito de nome.

Renomeie a variável intermediária para outra coisa ou use o termo event1 como prefixo nos nomes de campos do UDM e na instrução @output.

A palavra timestamp representa o carimbo de data/hora criado do registro bruto original. Um valor definido nessa variável intermediária é salvo no campo de UDM metadata.event_timestamp. O termo @timestamp representa a data e a hora em que o registro bruto foi analisado para criar um registro UDM.

O exemplo a seguir define o campo UDM metadata.event_timestamp como a data e a hora em que o registro bruto foi analisado.

 # Save the log parse date and time to the timestamp variable
  mutate {
     rename => {
       "@timestamp" => "timestamp"
     }
   }

O exemplo a seguir define o campo de UDM metadata.event_timestamp como a data e a hora extraídas do registro bruto original e armazenados na variável intermediária when.

   # Save the event timestamp to timestamp variable
   mutate {
     rename => {
       "when" => "timestamp"
     }
   }

Não use os seguintes termos como variáveis:

  • data/hora da coleção
  • criar carimbo de data/hora
  • event
  • filename
  • mensagem
  • namespace
  • saída
  • Contagem onerror
  • carimbo de data/hora
  • timezone

Armazenar cada valor de dados em um campo de UDM separado

Não armazene vários campos em um único campo de UDM concatenando-os com um delimitador. Veja um exemplo abaixo.

"principal.user.first_name" => "first:%{first_name},last:%{last_name}"

Em vez disso, armazene cada valor em um campo de UDM separado.

"principal.user.first_name" => "%{first_name}"
"principal.user.last_name" => "%{last_name}"

Usar espaços em vez de tabulações no código

Não use tabulações no código do analisador. Use apenas espaços e recue dois espaços por vez.

Não realize várias ações de mesclagem em uma única operação

Mesclar vários campos em uma única operação pode gerar resultados inconsistentes. Em vez disso, coloque instruções merge em operações separadas.

Por exemplo, substitua o seguinte exemplo:

mutate {
  merge => {
      "security_result.category_details" => "category_details"
      "security_result.category_details" => "super_category_details"
  }
}

Por este código:

mutate {
  merge => {
    "security_result.category_details" => "category_details"
  }
}

mutate {
  merge => {
    "security_result.category_details" => "super_category_details"
  }
}

Como escolher expressões condicionais if versus if else

Se o valor condicional que você está testando puder ter somente uma correspondência, use a instrução condicional if else. Essa abordagem é um pouco mais eficiente. No entanto, em um cenário em que o valor testado pode corresponder mais de uma vez, use várias instruções if distintas e ordene as instruções do caso mais genérico para o mais específico.

Escolher um conjunto representativo de arquivos de registros para testar as alterações do analisador

Uma prática recomendada é testar o código do analisador usando amostras de registros brutos com uma ampla variedade de formatos. Isso permite que você encontre registros exclusivos ou casos extremos que o analisador possa precisar processar.

Adicionar comentários descritivos ao código do analisador

Adicionar comentários ao código do analisador que explicam por que a instrução é importante, em vez do que ela faz. O comentário ajuda os funcionários que cuidam do analisador a seguir o fluxo. Veja um exemplo abaixo.

# only assign a Namespace if the source address is RFC 1918 or Loopback IP address
if [jsonPayload][id][orig_h] =~ /^(127(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(10(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(192\.168(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)|(172\.(?:1[6-9]|2\d|3[0-1])(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)/ {
  mutate {
    replace => {
      "event1.idm.read_only_udm.principal.namespace" => "%{resource.labels.project_id}"
    }
  }
}

Inicializar variáveis intermediárias antecipadamente

Antes de extrair valores do registro bruto original, inicialize variáveis intermediárias que serão usadas para armazenar valores de teste.

Isso evita que um erro seja retornado, indicando que a variável intermediária não existe.

A instrução a seguir atribui o valor na variável product ao campo de UDM metadata.product_name.

mutate{
  replace => {
    "event1.idm.read_only_udm.metadata.product_name" => "%{product}"
  }
}

Se a variável product não existir, este erro será exibido:

"generic::invalid_argument: pipeline failed: filter mutate (4) failed: replace failure: field \"event1.idm.read_only_udm.metadata.product_name\": source field \"product\": field not set"

É possível adicionar uma instrução on_error para capturar o erro. Veja um exemplo abaixo.

mutate{
  replace => {
    "event1.idm.read_only_udm.metadata.product_name" => "%{product}"
    }
  on_error => "_error_does_not_exist"
  }

A instrução de exemplo anterior captura o erro de análise em uma variável intermediária booleana chamada _error_does_not_exist. Ele não permite usar a variável product em uma instrução condicional, como if. Veja um exemplo abaixo.

if [product] != "" {
  mutate{
    replace => {
      "event1.idm.read_only_udm.metadata.product_name" => "%{product}"
    }
  }
  on_error => "_error_does_not_exist"
}

O exemplo anterior retorna o seguinte erro porque a cláusula condicional if não é compatível com instruções on_error:

"generic::invalid_argument: pipeline failed: filter conditional (4) failed: failed to evaluate expression: generic::invalid_argument: "product" not found in state data"

Para resolver isso, adicione um bloco de instruções separado que inicialize as variáveis intermediárias antes de executar as instruções de filtro de extração (json, csv, xml, kv ou grok). Veja um exemplo.

filter {
  # Initialize intermediate variables for any field you will use for a conditional check
  mutate {
    replace => {
      "timestamp" => ""
      "does_not_exist" => ""
    }
  }

  # load the logs fields from the message field
  json {
    source         => "message"
    array_function => "split_columns"
    on_error       => "_not_json"
  }
}

O snippet atualizado do código do analisador lida com os vários cenários usando uma instrução condicional para verificar se o campo existe. Além disso, a instrução on_error gerencia erros que podem ser encontrados.

Converter SHA-256 em base64

O exemplo a seguir extrai o valor SHA-256, o codifica em base64, converte os dados codificados em uma string hexadecimal e substitui campos específicos pelos valores extraídos e processados.

if [Sha256] != "" 
{
  base64
  {
  encoding => "RawStandard"
  source => "Sha256"
  target => "base64_sha256"
  on_error => "base64_message_error"
  }
  mutate
  {
    convert =>
    {
      "base64_sha256" => "bytestohex"
    }
    on_error => "already_a_string"
  }
  mutate
  {
    replace => 
  {
     "event.idm.read_only_udm.network.tls.client.certificate.sha256" => "%{base64_sha256}"
     "event.idm.read_only_udm.target.resource.name" => "%{Sha256}"
  }
  }
}

Tratar erros em instruções do analisador

Não é incomum que os registros de entrada estejam em um formato de registro inesperado ou tenham dados formatados incorretamente.

É possível criar o analisador para lidar com esses erros. Uma prática recomendada é adicionar gerenciadores on_error ao filtro de extração e, em seguida, testar a variável intermediária antes de prosseguir para o próximo segmento da lógica do analisador.

O exemplo a seguir usa o filtro de extração json com uma instrução on_error para definir a variável booleana _not_json. Se _not_json estiver definido como true, isso significa que a entrada de registro de entrada não estava no formato JSON válido e não foi analisada com êxito. Se a variável _not_json for false, a entrada de registro de entrada estava em formato JSON válido.

 # load the incoming log from the default message field
  json {
    source         => "message"
    array_function => "split_columns"
    on_error       => "_not_json"
  }

Você também pode testar se um campo está no formato correto. O exemplo a seguir verifica se _not_json está definido como true, indicando que o registro não estava no formato esperado.

 # Test that the received log matches the expected format
  if [_not_json] {
    drop { tag => "TAG_MALFORMED_MESSAGE" }
  } else {
    # timestamp is always expected
    if [timestamp] != "" {

      # ...additional parser logic goes here …

    } else {

      # if the timestamp field does not exist, it's not a log source
      drop { tag => "TAG_UNSUPPORTED" }
    }
  }

Isso garante que a análise não falhe se os registros forem ingeridos com um formato incorreto para o tipo de registro especificado.

Use o filtro drop com a variável tag para que a condição seja capturada na tabela de métricas de ingestão no BigQuery.

  • TAG_UNSUPPORTED
  • TAG_MALFORMED_ENCODING
  • TAG_MALFORMED_MESSAGE
  • TAG_NO_SECURITY_VALUE

O filtro drop impede o analisador de processar o registro bruto, normalizando os campos e criando um registro UDM. O registro bruto original ainda é processado no Google Security Operations e pode ser pesquisado usando a pesquisa de registro bruto no Google Security Operations.

O valor transmitido à variável tag é armazenado no campo drop_reason_code da tabela de métricas de ingestão. É possível executar uma consulta ad hoc na tabela semelhante a esta:

SELECT
  log_type,
  drop_reason_code,
  COUNT(drop_reason_code) AS count
FROM `datalake.ingestion_metrics`
GROUP BY 1,2
ORDER BY 1 ASC

Resolver erros de validação

Ao criar um analisador, você pode encontrar erros relacionados à validação, por exemplo, um campo obrigatório não está definido no registro UDM. O erro pode ser semelhante a este:

Error: generic::unknown: invalid event 0: LOG_PARSING_GENERATED_INVALID_EVENT: "generic::invalid_argument: udm validation failed: target field is not set"

O código do analisador é executado com sucesso, mas o registro UDM gerado não inclui todos os campos de UDM obrigatórios, conforme definido pelo valor definido como metadata.event_type. Confira a seguir outros exemplos que podem causar esse erro:

  • Se o metadata.event_type for USER_LOGIN e o campo de UDM target.user value não estiver definido.
  • Se metadata.event_type for NETWORK_CONNECTION e o campo target.hostnameUDM não estiver definido.

Para mais informações sobre o campo UDM metadata.event_type e os campos obrigatórios, consulte o Guia de uso do UDM.

Uma opção para solucionar esse tipo de erro é começar definindo valores estáticos para campos de UDM. Depois de definir todos os campos do UDM necessários, examine o registro bruto original para ver quais valores analisar e salvar no registro UDM. Se o registro bruto original não contiver determinados campos, talvez seja necessário definir valores padrão.

Confira a seguir um modelo de exemplo específico para um tipo de evento USER_LOGIN que ilustra essa abordagem.

Observe o seguinte:

  • O modelo inicializa variáveis intermediárias e as define como uma string estática.
  • O código na seção Atribuição de campo define os valores em variáveis intermediárias para campos do UDM.

É possível expandir esse código adicionando outras variáveis intermediárias e campos UDM. Depois de identificar todos os campos do UDM que precisam ser preenchidos, faça o seguinte:

  • Na seção Configuração de entrada, adicione o código que extraia campos do registro bruto original e define os valores para as variáveis intermediárias.

  • Na seção Extração de data, adicione o código que extraia o carimbo de data/hora do evento do registro bruto original, o transforma e o define como a variável intermediária.

  • Conforme necessário, substitua o valor inicializado definido em cada variável intermediária por uma string vazia.

filter {
 mutate {
   replace => {
     # UDM > Metadata
     "metadata_event_timestamp"    => ""
     "metadata_vendor_name"        => "Example"
     "metadata_product_name"       => "Example SSO"
     "metadata_product_version"    => "1.0"
     "metadata_product_event_type" => "login"
     "metadata_product_log_id"     => "12345678"
     "metadata_description"        => "A user logged in."
     "metadata_event_type"         => "USER_LOGIN"

     # UDM > Principal
     "principal_ip"       => "192.168.2.10"

     # UDM > Target
     "target_application"            => "Example Connect"
     "target_user_user_display_name" => "Mary Smith"
     "target_user_userid"            => "mary@example.com"

     # UDM > Extensions
     "auth_type"          => "SSO"
     "auth_mechanism"     => "USERNAME_PASSWORD"

     # UDM > Security Results
     "securityResult_action"         => "ALLOW"
     "security_result.severity"       => "LOW"

   }
 }

 # ------------ Input Configuration  --------------
  # Extract values from the message using one of the extraction filters: json, kv, grok

 # ------------ Date Extract  --------------
 # If the  date {} function is not used, the default is the normalization process time

  # ------------ Field Assignment  --------------
  # UDM Metadata
  mutate {
    replace => {
      "event1.idm.read_only_udm.metadata.vendor_name"        =>  "%{metadata_vendor_name}"
      "event1.idm.read_only_udm.metadata.product_name"       =>  "%{metadata_product_name}"
      "event1.idm.read_only_udm.metadata.product_version"    =>  "%{metadata_product_version}"
      "event1.idm.read_only_udm.metadata.product_event_type" =>  "%{metadata_product_event_type}"
      "event1.idm.read_only_udm.metadata.product_log_id"     =>  "%{metadata_product_log_id}"
      "event1.idm.read_only_udm.metadata.description"        =>  "%{metadata_description}"
      "event1.idm.read_only_udm.metadata.event_type"         =>  "%{metadata_event_type}"
    }
  }

  # Set the UDM > auth fields
  mutate {
    replace => {
      "event1.idm.read_only_udm.extensions.auth.type"        => "%{auth_type}"
    }
    merge => {
      "event1.idm.read_only_udm.extensions.auth.mechanism"   => "auth_mechanism"
    }
  }

  # Set the UDM > principal fields
  mutate {
    merge => {
      "event1.idm.read_only_udm.principal.ip"                => "principal_ip"
    }
  }

  # Set the UDM > target fields
  mutate {
    replace => {
      "event1.idm.read_only_udm.target.user.userid"             =>  "%{target_user_userid}"
      "event1.idm.read_only_udm.target.user.user_display_name"  =>  "%{target_user_user_display_name}"
      "event1.idm.read_only_udm.target.application"             =>  "%{target_application}"
    }
  }

  # Set the UDM > security_results fields
  mutate {
    merge => {
      "security_result.action" => "securityResult_action"
    }
  }

  # Set the security result
  mutate {
    merge => {
      "event1.idm.read_only_udm.security_result" => "security_result"
    }
  }

 # ------------ Output the event  --------------
  mutate {
    merge => {
      "@output" => "event1"
    }
  }

}

Analisar texto não estruturado usando uma função Grok

Ao usar uma função Grok para extrair valores de texto não estruturado, você pode usar padrões do Grok predefinidos e instruções de expressão regular. Os padrões Grok tornam o código mais fácil de ler. Se a expressão regular não incluir caracteres abreviados (como \w, \s), copie e cole a instrução diretamente no código do analisador.

Como os padrões Grok são uma camada de abstração adicional na instrução, eles podem tornar a solução de problemas mais complexa quando você encontra um erro. Veja a seguir um exemplo de função do Grok que contém padrões Grok predefinidos e expressões regulares.

grok {
  match => {
    "message" => [
      "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
    ]
  }
}

Uma instrução de extração sem padrões Grok pode ter um desempenho melhor. Por exemplo, o exemplo a seguir requer menos da metade das etapas de processamento para corresponder. Para uma origem de registros de alto volume, essa é uma consideração importante.

Entender as diferenças entre as expressões regulares RE2 e PCRE

Os analisadores das Operações de segurança do Google usam RE2 como mecanismo de expressão regular. Se você tiver familiaridade com a sintaxe do PCRE, vai notar diferenças. Confira um exemplo:

Confira a seguir uma instrução PCRE: (?<_custom_field>\w+)\s

Confira a seguir uma instrução RE2 para o código do analisador: (?P<_custom_field>\\w+)\\s

Não se esqueça dos caracteres de escape.

As Operações de segurança do Google armazenam dados de registro brutos de entrada no formato codificado JSON. Isso serve para garantir que as strings de caracteres que parecem ser a abreviação de uma expressão regular sejam interpretadas como a string literal. Por exemplo, \t é interpretado como uma string literal, e não um caractere de tabulação.

O exemplo a seguir é um registro bruto original e o registro de formato codificado JSON. Observe o caractere de escape adicionado à frente de cada caractere de barra invertida ao redor do termo entry.

Veja a seguir o registro bruto original:

field=\entry\

Veja a seguir o registro convertido para o formato codificado JSON:

field=\\entry\\

Ao usar uma expressão regular no código do analisador, adicione mais caracteres de escape se quiser extrair apenas o valor. Para corresponder a uma barra invertida no registro bruto original, use quatro barras invertidas na instrução de extração.

Veja a seguir uma expressão regular para o código do analisador:

^field=\\\\(?P<_value>.*)\\\\$

Veja a seguir o resultado gerado. O grupo nomeado _value armazena o termo entry:

"_value": "entry"

Ao mover uma instrução de expressão regular padrão para o código do analisador, faça o escape dos caracteres abreviados da expressão regular na instrução de extração. Por exemplo, mude \s para \\s.

Deixe os caracteres especiais da expressão regular inalterados quando houver escape duplo na instrução de extração. Por exemplo, \\ permanece inalterado como \\.

Veja a seguir uma expressão regular padrão:

^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$

A expressão regular a seguir é modificada para funcionar no código do analisador.

^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$

A tabela a seguir resume quando uma expressão regular padrão precisa incluir outros caracteres de escape antes de incluí-la no código do analisador.

Expressão regular Expressão regular modificada para código do analisador Descrição da mudança
\s
\\s
Caracteres abreviados devem ser evitados.
\.
\\.
Os caracteres reservados devem ter escape.
\\"
\\\"
Os caracteres reservados devem ter escape.
\]
\\]
Os caracteres reservados devem ter escape.
\|
\\|
Os caracteres reservados devem ter escape.
[^\\]+
[^\\\\]+
Os caracteres especiais em um grupo de classe de caracteres precisam ter escape.
\\\\
\\\\
Caracteres especiais fora de um grupo de classe de caracteres ou caracteres abreviados não precisam de um escape extra.

As expressões regulares precisam incluir um grupo de captura nomeado

Uma expressão regular, como "^.*$", é uma sintaxe RE2 válida. No entanto, no código do analisador, ela falha com o seguinte erro:

"ParseLogEntry failed: pipeline failed: filter grok (0) failed: failed to parse data with all match
patterns"

É necessário adicionar um grupo de captura válido à expressão. Se você usar os padrões do Grok, eles incluirão um grupo de captura nomeado por padrão. Ao usar substituições de expressões regulares, inclua um grupo nomeado.

Veja a seguir um exemplo de expressão regular no código do analisador:

"^(?P<_catchall>.*$)"

Este é o resultado, mostrando o texto atribuído ao grupo nomeado _catchall.

"_catchall": "User \"BOB\" logged on to workstation \"DESKTOP-01\"."

Usar um grupo nomeado abrangente para começar a criar a expressão

Ao criar uma instrução de extração, comece com uma expressão que capture mais do que você quer. Em seguida, expanda a expressão um campo por vez.

O exemplo a seguir começa usando um grupo nomeado (_catchall) que corresponde à mensagem inteira. Em seguida, ele cria a expressão em etapas combinando outras partes do texto. A cada etapa, o grupo nomeado _catchall contém menos partes do texto original. Continue e itere uma etapa de cada vez para corresponder à mensagem até que não precise mais do grupo nomeado _catchall.

Etapa Expressão regular no código do analisador Saída do grupo de captura nomeado _catchall
1
"^(?P<_catchall>.*$)"
User \"BOB\" logged on to workstation \"DESKTOP-01\".
2
^User\s\\\"(?P<_catchall>.*$)
BOB\" logged on to workstation \"DESKTOP-01\".
3
^User\s\\\"(?P<_user>.*?)\\\"\s(?P<_catchall>.*$)
logged on to workstation \"DESKTOP-01\".
Continue até que a expressão corresponda a toda a string de texto.

Evitar caracteres abreviados na expressão regular

Lembre-se de fazer o escape de caracteres abreviados da expressão regular ao usar a expressão no código do analisador. Veja a seguir um exemplo de string de texto e a expressão regular padrão que extrai a primeira palavra, This.

  This is a sample log.

A expressão regular padrão a seguir extrai a primeira palavra, This. No entanto, quando você executa essa expressão no código do analisador, o resultado não inclui a letra s.

Expressão regular padrão Saída do grupo de captura nomeado _firstWord
"^(?P<_firstWord>[^\s]+)\s.*$" "_firstWord": "Thi",

Isso ocorre porque as expressões regulares no código do analisador exigem um caractere de escape extra adicionado aos caracteres abreviados. No exemplo anterior, \s precisa ser alterado para \\s.

Expressão regular revisada para código do analisador Saída do grupo de captura nomeado _firstWord
"^(?P<_firstWord>[^\\s]+)\\s.*$" "_firstWord": "This",

Isso se aplica apenas a caracteres abreviados, como \s, \r e \t. Outros caracteres, como ``, não precisam de mais escape.

Um exemplo completo

Esta seção descreve as regras anteriores como um exemplo completo. Veja aqui uma string de texto não estruturada e a expressão regular padrão escrita para analisar a string. Por fim, inclui a expressão regular modificada que funciona no código do analisador.

Veja a seguir a string de texto original.

User "BOB" logged on to workstation "DESKTOP-01".

Veja a seguir uma expressão regular RE2 padrão que analisa a string de texto.

^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$

Essa expressão extrai os seguintes campos.

Grupo de correspondência Posição do caractere String de texto
Correspondência total 0-53
User \"BOB\" logged on to workstation \"DESKTOP-01\".
Grupo "_user" 7-10
BOB
Grupo 2. 13-22
logged on
Grupo "_device" 40-50
DESKTOP-01

Essa é a expressão modificada. A expressão regular RE2 padrão foi modificada para funcionar no código do analisador.

^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$