Visão geral da análise de registros

Compatível com:

Este documento apresenta uma visão geral de como o Google Security Operations analisa registros brutos no formato de modelo de dados unificado (UDM, na sigla em inglês).

O Google Security Operations pode receber dados de registro das seguintes origens de ingestão:

  • Encaminhador do Google Security Operations
  • Feed da API Google Security Operations
  • API Google Security Operations Ingestion
  • Parceiro de tecnologia terceirizado

Em geral, os clientes enviam dados como registros brutos originais. O Google Security Operations identifica de forma exclusiva o dispositivo que gerou os registros usando o LogType. O LogType identifica ambos:

  • o fornecedor e o dispositivo que geraram o registro, como o Cisco Firewall, o servidor DHCP do Linux ou o DNS do Bro.
  • que converte o registro bruto em um modelo de dados unificado (UDM, na sigla em inglês) estruturado. Há uma relação de um para um entre um analisador e um LogType. Cada analisador converte os dados recebidos por um único LogType.

O Google Security Operations fornece um conjunto de analisadores padrão que leem registros brutos originais e geram registros estruturados do UDM usando dados no registro bruto original. O Google Security Operations mantém esses analisadores. Os clientes também podem definir instruções personalizadas de mapeamento de dados criando um analisador específico do cliente. Entre em contato com seu representante de operações de segurança do Google para saber como criar um analisador específico do cliente.

Fluxo de trabalho de ingestão e normalização

O analisador contém instruções de mapeamento de dados. Ele define como os dados são mapeados do registro bruto original para um ou mais campos na estrutura de dados do UDM.

Se não houver erros de análise, as Operações de segurança do Google vão criar um registro estruturado pela UDM usando dados do registro bruto. O processo de conversão de um registro bruto em um registro do UDM é chamado de normalização.

Um analisador padrão pode mapear um subconjunto de valores principais do registro bruto. Normalmente, esses campos principais são os mais importantes para fornecer insights de segurança no Google Security Operations. Os valores não mapeados permanecem no registro bruto, mas não são armazenados no registro do UDM.

O cliente também pode usar a API Ingestion para enviar dados em formato estruturado de modelo de dados unificado (UDM, na sigla em inglês).

Personalizar como os dados ingeridos são analisados

O Google Security Operations oferece os recursos abaixo, que permitem aos clientes personalizar a análise de dados nos dados de registro originais recebidos.

  • Analisadores específicos do cliente: os clientes criam uma configuração de analisador personalizado para um tipo de registro específico que atenda aos requisitos específicos. Um analisador específico do cliente substitui o analisador padrão para o LogType específico. Entre em contato com seu representante de operações de segurança do Google para saber como criar um analisador específico do cliente.
  • Extensões do analisador: os clientes podem adicionar instruções de mapeamento personalizadas, além da configuração padrão do analisador. Cada cliente pode criar um conjunto exclusivo de instruções de mapeamento personalizado. Essas instruções de mapeamento definem como extrair e transformar campos adicionais dos registros brutos originais em campos do UDM. Uma extensão de analisador não substitui o analisador padrão ou específico do cliente.

Exemplo de uso de um registro de proxy da Web Squid

Esta seção fornece um exemplo de registro de proxy da Web do Squid e descreve como os valores são mapeados para um registro do UDM. Para uma descrição de todos os campos no esquema do UDM, consulte a Lista de campos do modelo de dados unificado.

O exemplo de registro do proxy da Web Squid contém valores separados por espaços. Cada registro representa um evento e armazena os seguintes dados: carimbo de data/hora, duração, cliente, código/status do resultado, bytes transmitidos, método de solicitação, URL, usuário, código de hierarquia e tipo de conteúdo. Neste exemplo, os seguintes campos são extraídos e mapeados em um registro do UDM: hora, cliente, status do resultado, bytes, método de solicitação e URL.

1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg

Exemplo de proxy da Web Squid

Ao comparar essas estruturas, observe que apenas um subconjunto dos dados de registro originais são incluídos no registro do UDM. Alguns campos são obrigatórios e outros são opcionais. Além disso, apenas um subconjunto das seções no registro do UDM contém dados. Se o analisador não mapear os dados do registro original para o registro do UDM, essa seção do registro do UDM não será mostrada nas Operações de segurança do Google.

Valores de registro mapeados para o UDM

A seção metadata armazena o carimbo de data/hora do evento. O valor foi convertido do formato EPOCH para RFC 3339. Essa conversão é opcional. O carimbo de data/hora pode ser armazenado no formato EPOCH, com pré-processamento para separar as partes de segundos e milissegundos em campos separados.

O campo metadata.event_type armazena o valor NETWORK_HTTP, que é um valor enumerado que identifica o tipo de evento. O valor de metadata.event_type determina quais campos adicionais do UDM são obrigatórios ou opcionais. Os valores product_name e vendor_name contêm descrições fáceis de usar do dispositivo que registrou o registro original.

O metadata.event_type em um registro de evento do UDM não é o mesmo que o log_type definido ao ingerir dados usando a API Ingestion. Esses dois atributos armazenam informações diferentes.

A seção network contém valores do evento de registro original. Observe neste exemplo que o valor de status do registro original foi analisado do campo "result code/status" antes de ser gravado no registro do UDM. Apenas o result_code foi incluído no registro do UDM.

Valores de registro mapeados para o UDM

A seção principal armazena as informações do cliente do registro original. A seção target armazena o URL totalmente qualificado e o endereço IP.

A seção security_result armazena um dos valores de tipo enumerado para representar a ação gravada no registro original.

Este é o registro do UDM formatado como JSON. Observe que apenas as seções que contêm dados são incluídas. As seções src, observer, intermediary, about e extensions não estão incluídas.

{
        "metadata": {
            "event_timestamp": "2020-04-28T07:40:48.129Z",
            "event_type": "NETWORK_HTTP",
            "product_name": "Squid Proxy",
            "vendor_name": "Squid"
        },
        "principal": {
            "ip": "192.168.23.4"
        },
        "target": {
            "url": "www.google.com/images/sunlogo.png",
            "ip": "203.0.113.52"
        },
        "network": {
            "http": {
                "method": "GET",
                "response_code": 200,
                "received_bytes": 904
            }
        },
        "security_result": {
            "action": "UNKNOWN_ACTION"
        }
}

Etapas nas instruções do analisador

As instruções de mapeamento de dados em um analisador seguem um padrão comum, como este:

  1. Analisar e extrair dados do registro original.
  2. Manipule os dados extraídos. Isso inclui o uso de lógica condicional para analisar seletivamente valores, converter tipos de dados, substituir substrings em um valor, converter para maiúsculas ou minúsculas etc.
  3. Atribuir valores aos campos do UDM.
  4. Envia o registro do UDM mapeado para a chave @output.

Analisar e extrair dados do registro original

Definir a instrução de filtro

A instrução filter é a primeira instrução no conjunto de instruções de análise. Todas as outras instruções de análise estão contidas na instrução filter.

filter {

}

Inicializar variáveis que vão armazenar valores extraídos

Na instrução filter, inicialize variáveis intermediárias que o parser vai usar para armazenar valores extraídos do registro.

Essas variáveis são usadas sempre que um registro individual é analisado. O valor em cada variável intermediária será definido como um ou mais campos do UDM mais adiante nas instruções de análise.

  mutate {
    replace => {
      "event.idm.read_only_udm.metadata.product_name" => "Webproxy"
      "event.idm.read_only_udm.metadata.vendor_name" => "Squid"
      "not_valid_log" => "false"
      "when" => ""
      "srcip" => ""
      "action" => ""
      "username" => ""
      "url" => ""
      "tgtip" => ""
      "method" => ""
    }
  }

Extrair valores individuais do registro

O Google Security Operations fornece um conjunto de filtros, com base no Logstash, para extrair campos dos arquivos de registro originais. Dependendo do formato do registro, use um ou vários filtros de extração para extrair todos os dados dele. Se a string for:

  • JSON nativo, a sintaxe do analisador é semelhante ao filtro JSON, que oferece suporte a registros formatados em JSON. JSONs aninhados não são aceitos.
  • O formato XML e a sintaxe do analisador são semelhantes ao filtro XML, que oferece suporte a registros formatados em XML.
  • pares de chave-valor, a sintaxe do analisador é semelhante ao filtro Kv, que aceita mensagens formatadas com chave-valor.
  • O formato CSV e a sintaxe do analisador são semelhantes ao filtro CSV, que aceita mensagens formatadas em CSV.
  • todos os outros formatos, a sintaxe do analisador é semelhante ao filtro GROK com padrões GROK integrados . Ele usa instruções de extração no estilo Regex.

O Google Security Operations oferece um subconjunto dos recursos disponíveis em cada filtro. O Google Security Operations também oferece uma sintaxe de mapeamento de dados personalizada que não está disponível nos filtros. Consulte a referência de sintaxe do analisador para uma descrição dos recursos com suporte e das funções personalizadas.

Continuando com o exemplo de registro do proxy da Web Squid, a instrução de extração de dados a seguir inclui uma combinação de sintaxe Logstash Grok e expressões regulares.

A instrução de extração a seguir armazena valores nas seguintes variáveis intermediárias:

  • when
  • srcip
  • action
  • returnCode
  • size
  • method
  • username
  • url
  • tgtip

Essa instrução de exemplo também usa a palavra-chave overwrite para armazenar os valores extraídos em cada variável. Se o processo de extração retornar um erro, a instrução on_error vai definir o not_valid_log como True.

grok {
   match => {
     "message" => [
       "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
     ]
   }
   overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
   on_error => "not_valid_log"
}

Manipular e transformar os valores extraídos

O Google Security Operations aproveita os recursos do plug-in de filtro de mutação do Logstash para permitir a manipulação de valores extraídos do registro original. O Google Security Operations oferece um subconjunto dos recursos disponíveis no plug-in. Consulte a sintaxe do analisador para uma descrição dos recursos com suporte e das funções personalizadas, como:

  • converter valores em um tipo de dados diferente
  • substituir valores na string
  • mescle duas matrizes ou anexe uma string a uma matriz. Os valores de strings são convertidos em uma matriz antes da mesclagem.
  • converter em letras minúsculas ou maiúsculas

Esta seção apresenta exemplos de transformação de dados que se baseiam no registro do proxy da Web Squid apresentado anteriormente.

Transformar o carimbo de data/hora do evento

Todos os eventos armazenados como um registro do UDM precisam ter um carimbo de data/hora. Este exemplo verifica se um valor para os dados foi extraído do registro. Em seguida, ele usa a função de data do Grok para corresponder o valor ao formato de hora UNIX.

if [when] != "" {
  date {
    match => [
      "when", "UNIX"
    ]
   }
 }

Transforme o valor de username

O exemplo de instrução abaixo converte o valor na variável username em letras minúsculas.

mutate {
   lowercase => [ "username"]
   }

Transforme o valor de action

O exemplo a seguir avalia o valor na variável intermediária action e muda o valor para ALLOW, BLOCK ou UNKNOWN_ACTION, que são valores válidos para o campo UDM security_result.action. O campo UDM security_result.action é um tipo enumerado que armazena apenas valores específicos.

if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
      mutate {
        replace => {
          "action" => "BLOCK"
        }
      }
   } else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
     mutate {
        replace => {
          "action" => "ALLOW"
        }
     }
   } else {
      mutate {
        replace => {
          "action" => "UNKNOWN_ACTION" }
      }
   }

Transformar o endereço IP de destino

O exemplo a seguir verifica um valor na variável intermediária tgtip. Se encontrado, o valor é associado a um padrão de endereço IP usando um padrão predefinido do Grok. Se houver um erro que corresponde ao valor de um padrão de endereço IP, a função on_error vai definir a propriedade not_valid_tgtip como True. Se a correspondência for bem-sucedida, a propriedade not_valid_tgtip não será definida.

if [tgtip] not in [ "","-" ] {
   grok {
     match => {
       "tgtip" => [ "%{IP:tgtip}" ]
     }
     overwrite => ["tgtip"]
     on_error => "not_valid_tgtip"
   }

Mudar o tipo de dados de returnCode e size

O exemplo a seguir converte o valor na variável size em uinteger e o valor na variável returnCode em integer. Isso é necessário porque a variável size será salva no campo UDM network.received_bytes, que armazena um tipo de dados int64. A variável returnCode será salva no campo UDM network.http.response_code, que armazena um tipo de dados int32.

mutate {
  convert => {
    "returnCode" => "integer"
    "size" => "uinteger"
  }
}

Atribuir valores a campos do UDM em um evento

Depois que os valores forem extraídos e pré-processados, atribua-os a campos em um registro de evento da UDM. É possível atribuir valores extraídos e estáticos a um campo do UDM.

Se você preencher event.disambiguation_key, verifique se esse campo é exclusivo para cada evento gerado para o registro especificado. Se dois eventos diferentes tiverem o mesmo disambiguation_key, isso resultará em um comportamento inesperado no sistema.

Os exemplos de analisador nesta seção são baseados no exemplo de registro de proxy da Web Squid acima.

Salvar o carimbo de data/hora do evento

Cada registro de evento da UDM precisa ter um valor definido para o campo metadata.event_timestamp da UDM. O exemplo a seguir salva o carimbo de data/hora do evento extraído do registro na variável integrada @timestamp. O Google Security Operations salva isso no campo UDM metadata.event_timestamp por padrão.

mutate {
  rename => {
    "when" => "timestamp"
  }
}

Definir o tipo de evento

Cada registro de evento da UDM precisa ter um valor definido para o campo metadata.event_type da UDM. Esse campo é um tipo enumerado. O valor desse campo determina quais outros campos do UDM precisam ser preenchidos para que o registro do UDM seja salvo. O processo de análise e normalização vai falhar se algum dos campos obrigatórios não contiver dados válidos.

replace => {
    "event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
   }
}

Salve os valores username e method usando a instrução replace

Os valores nos campos intermediários username e method são strings. O exemplo a seguir verifica se um valor válido existe e, se sim, armazena o valor username no campo UDM principal.user.userid e o valor method no campo UDM network.http.method.

if [username] not in [ "-" ,"" ] {
  mutate {
    replace => {
      "event.idm.read_only_udm.principal.user.userid" => "%{username}"
    }
  }
}

if [method] != "" {
  mutate {
    replace => {
      "event.idm.read_only_udm.network.http.method" => "%{method}"
    }
  }
}

Salve o action no campo UDM security_result.action

Na seção anterior, o valor na variável intermediária action foi avaliado e transformado em um dos valores padrão do campo UDM security_result.action.

Os campos de UDM security_result e action armazenam uma matriz de itens, o que significa que você precisa seguir uma abordagem um pouco diferente ao salvar esse valor.

Primeiro, salve o valor transformado em um campo security_result.action intermediário. O campo security_result é pai do campo action.

mutate {
   merge => {
     "security_result.action" => "action"
   }
}

Em seguida, salve o campo intermediário security_result.action no campo UDM security_result. O campo UDM security_result armazena uma matriz de itens. Portanto, o valor é anexado a esse campo.

# save the security_result field
mutate {
  merge => {
    "event.idm.read_only_udm.security_result" => "security_result"
  }
}

Armazene o endereço IP de destino e o endereço IP de origem usando a instrução merge

Armazene os seguintes valores no registro de evento do UDM:

  • Valor na variável intermediária srcip para o campo UDM principal.ip.
  • Valor na variável intermediária tgtip para o campo UDM target.ip.

Os campos UDM principal.ip e target.ip armazenam uma matriz de itens. Portanto, os valores são anexados a cada campo.

Os exemplos abaixo demonstram abordagens diferentes para salvar esses valores. Durante a etapa de transformação, a variável intermediária tgtip foi associada a um endereço IP usando um padrão Grok predefinido. A instrução de exemplo a seguir verifica se a propriedade not_valid_tgtip é verdadeira, indicando que o valor tgtip não pode ser associado a um padrão de endereço IP. Se for falso, o valor tgtip será salvo no campo UDM target.ip.

if ![not_valid_tgtip] {
  mutate {
    merge => {
      "event.idm.read_only_udm.target.ip" => "tgtip"
    }
  }
 }

A variável intermediária srcip não foi transformada. A instrução a seguir verifica se um valor foi extraído do registro original e, se sim, salva o valor no campo UDM principal.ip.

if [srcip] != "" {
  mutate {
    merge => {
      "event.idm.read_only_udm.principal.ip" => "srcip"
    }
  }
}

Salvar url, returnCode e size usando a instrução rename

O exemplo abaixo armazena os seguintes valores usando a instrução rename.

  • A variável url foi salva no campo UDM target.url.
  • A variável intermediária returnCode salva no campo UDM network.http.response_code.
  • A variável intermediária size salva no campo UDM network.received_bytes.
mutate {
  rename => {
     "url" => "event.idm.read_only_udm.target.url"
     "returnCode" => "event.idm.read_only_udm.network.http.response_code"
     "size" => "event.idm.read_only_udm.network.received_bytes"
  }
}

Vincular o registro do UDM à saída

A declaração final na instrução de mapeamento de dados gera os dados processados para um registro de evento da UDM.

mutate {
    merge => {
      "@output" => "event"
    }
  }

O código completo do analisador

Este é o exemplo de código completo do analisador. A ordem das instruções não segue a mesma ordem das seções anteriores deste documento, mas resulta na mesma saída.

filter {

# initialize variables
  mutate {
    replace => {
      "event.idm.read_only_udm.metadata.product_name" => "Webproxy"
      "event.idm.read_only_udm.metadata.vendor_name" => "Squid"
      "not_valid_log" => "false"
      "when" => ""
      "srcip" => ""
      "action" => ""
      "username" => ""
      "url" => ""
      "tgtip" => ""
      "method" => ""
    }
  }

  # Extract fields from the raw log.
    grok {
      match => {
        "message" => [
          "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
        ]
      }
      overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
      on_error => "not_valid_log"
    }

  # Parse event timestamp
  if [when] != "" {
    date {
      match => [
        "when", "UNIX"
      ]
     }
   }

   # Save the value in "when" to the event timestamp
   mutate {
     rename => {
       "when" => "timestamp"
     }
   }

   # Transform and save username
   if [username] not in [ "-" ,"" ] {
     mutate {
       lowercase => [ "username"]
        }
      }
     mutate {
       replace => {
         "event.idm.read_only_udm.principal.user.userid" => "%{username}"
       }
     }


if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
      mutate {
        replace => {
          "action" => "BLOCK"
        }
      }
   } else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
     mutate {
        replace => {
          "action" => "ALLOW"
        }
     }
   } else {
      mutate {
        replace => {
          "action" => "UNKNOWN_ACTION" }
      }
   }

  # save transformed value to an intermediary field
   mutate {
      merge => {
        "security_result.action" => "action"
      }
   }

    # save the security_result field
    mutate {
      merge => {
        "event.idm.read_only_udm.security_result" => "security_result"
      }
    }

   # check for presence of target ip. Extract and store target IP address.
   if [tgtip] not in [ "","-" ] {
     grok {
       match => {
         "tgtip" => [ "%{IP:tgtip}" ]
       }
       overwrite => ["tgtip"]
       on_error => "not_valid_tgtip"
     }

     # store  target IP address
     if ![not_valid_tgtip] {
       mutate {
         merge => {
           "event.idm.read_only_udm.target.ip" => "tgtip"
         }
       }
     }
   }

   # convert  the returnCode and size  to integer data type
   mutate {
     convert => {
       "returnCode" => "integer"
       "size" => "uinteger"
     }
   }

   # save  url, returnCode, and size
   mutate {
     rename => {
        "url" => "event.idm.read_only_udm.target.url"
        "returnCode" => "event.idm.read_only_udm.network.http.response_code"
        "size" => "event.idm.read_only_udm.network.received_bytes"
     }

     # set the event type to NETWORK_HTTP
     replace => {
        "event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
     }
   }

   # validate and set source IP address
   if [srcip] != "" {
     mutate {
       merge => {
         "event.idm.read_only_udm.principal.ip" => "srcip"
       }
     }
   }

  # save  event to @output
   mutate {
     merge => {
       "@output" => "event"
     }
   }

} #end of filter