Visão geral da análise de registros
Este documento apresenta uma visão geral de como o Google Security Operations analisa registros brutos no formato de modelo de dados unificado (UDM, na sigla em inglês).
O Google Security Operations pode receber dados de registro originados das seguintes fontes de ingestão:
- Encaminhador de Operações de Segurança do Google
- Feed da API Google Security Operations
- API Google Security Operations Ingestion
- Parceiro de tecnologia terceirizado
Em geral, os clientes enviam dados como registros brutos originais. As Operações de segurança do Google identifica o dispositivo que gerou os registros usando o LogType. O LogType identifica ambos:
- o fornecedor e o dispositivo que geraram o registro, como o Cisco Firewall, o servidor DHCP do Linux ou o DNS do Bro.
- que converte o registro bruto em um modelo de dados unificado (UDM, na sigla em inglês) estruturado. Há uma relação de um para um entre um analisador e um LogType. Cada analisador converte os dados recebidos por um único LogType.
As Operações de segurança do Google oferecem um conjunto de analisadores padrão que leem registros brutos originais e gerar registros UDM estruturados usando dados no registro bruto original. O Google Security Operations mantém esses analisadores. Os clientes também podem definir instruções personalizadas de mapeamento de dados com a criação de um analisador específico para o cliente. Entre em contato com suas Operações de segurança do Google para obter informações sobre como criar um analisador específico para o cliente.
O analisador contém instruções de mapeamento de dados. Ele define como os dados são mapeados do registro bruto original para um ou mais campos na estrutura de dados do UDM.
Se não houver erros de análise, as Operações de segurança do Google vão criar um registro estruturado UDM usando os dados do registro bruto. O processo de conversão de um registro bruto em um registro UDM é chamada normalização.
Um analisador padrão pode mapear um subconjunto de valores principais do registro bruto. Normalmente, esses campos principais são os mais importantes para fornecer insights de segurança no Google Security Operations. Os valores não mapeados permanecem no registro bruto, mas não são no registro UDM.
Os clientes também podem usar a API Ingestion, para enviar dados no formato do Modelo de dados unificado (UDM).
Personalizar como os dados ingeridos são analisados
As Operações de segurança do Google oferecem os seguintes recursos que permitem aos clientes personalizar a análise de dados de registro originais recebidos.
- Analisadores específicos do cliente: os clientes criam uma configuração de analisador personalizado para um tipo de registro específico que atenda aos requisitos específicos. Um analisador específico do cliente substitui o analisador padrão do LogType específico. Entre em contato com suas Operações de segurança do Google para obter informações sobre como criar um analisador específico para o cliente.
- Extensões de analisador: os clientes podem adicionar instruções de mapeamento personalizadas em além da configuração padrão do analisador. Cada cliente pode criar um conjunto exclusivo de instruções de mapeamento personalizado. Esses mapeamentos definem como extrair e transformar campos adicionais de registros brutos originais para campos de UDM. Uma extensão de analisador não substitui o analisador padrão ou específico do cliente.
Exemplo de uso de um registro de proxy da Web Squid
Esta seção fornece um exemplo de registro de proxy da web do Squid e descreve como os são mapeados para um registro UDM. Para uma descrição de todos os campos no esquema do UDM, consulte a Lista de campos do modelo de dados unificado.
O registro de proxy da Web do Squid contém valores separados por espaços. Cada registro representa um evento e armazena os seguintes dados: carimbo de data/hora, duração, cliente, código/status do resultado, bytes transmitidos, método de solicitação, URL, usuário, código da hierarquia e tipo de conteúdo. Neste exemplo, os seguintes campos são extraídos e mapeados em um registro do UDM: hora, cliente, status do resultado, bytes, método de solicitação e URL.
1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg
Ao comparar essas estruturas, observe que apenas um subconjunto dos dados de registro originais são incluídos no registro do UDM. Alguns campos são obrigatórios e outros são opcionais. Além disso, apenas um subconjunto das seções no registro do UDM contém dados. Se o analisador não mapear os dados do registro original para o registro do UDM, essa seção do registro do UDM não será mostrada nas Operações de segurança do Google.
A seção metadata
armazena o carimbo de data/hora do evento. O valor foi convertido
do formato EPOCH para RFC 3339. Essa conversão é opcional. O carimbo de data/hora pode ser
armazenado no formato EPOCH, com pré-processamento para separar os segundos e
milésimos de segundo em campos separados.
O campo metadata.event_type
armazena o valor NETWORK_HTTP
, que é um valor enumerado
que identifica o tipo de evento. O valor de metadata.event_type
determina quais campos adicionais do UDM são obrigatórios ou opcionais. Os valores product_name
e
vendor_name
contêm descrições simples do dispositivo que
registrou o registro original.
O metadata.event_type
em um registro de evento do UDM não é o mesmo que o log_type
definido ao ingerir dados usando a API Ingestion. Esses dois atributos armazenam
informações diferentes.
A seção network
contém valores do evento de registro original. Observe neste
exemplo que o valor de status do registro original foi analisado do campo "result
code/status" antes de ser gravado no registro do UDM. Apenas o result_code
foi incluído no registro UDM.
A seção principal
armazena as informações do cliente do registro original. O
A seção target
armazena o URL totalmente qualificado e o endereço IP.
A seção security_result
armazena um dos valores de tipo enumerado para representar a ação gravada no registro original.
Este é o registro do UDM formatado como JSON. Observe que apenas as seções que
que contêm dados são incluídos. As solicitações src
, observer
, intermediary
, about
,
e extensions
não foram incluídas.
{
"metadata": {
"event_timestamp": "2020-04-28T07:40:48.129Z",
"event_type": "NETWORK_HTTP",
"product_name": "Squid Proxy",
"vendor_name": "Squid"
},
"principal": {
"ip": "192.168.23.4"
},
"target": {
"url": "www.google.com/images/sunlogo.png",
"ip": "203.0.113.52"
},
"network": {
"http": {
"method": "GET",
"response_code": 200,
"received_bytes": 904
}
},
"security_result": {
"action": "UNKNOWN_ACTION"
}
}
Etapas nas instruções do analisador
As instruções de mapeamento de dados em um analisador seguem um padrão comum, conforme segue:
- Analisar e extrair dados do registro original.
- Manipule os dados extraídos. Isso inclui o uso de lógica condicional para analisar seletivamente valores, converter tipos de dados, substituir substrings em um valor, converter para maiúsculas ou minúsculas etc.
- Atribua valores aos campos de UDM.
- Envia o registro do UDM mapeado para a chave @output.
Analisar e extrair dados do registro original
Definir a instrução de filtro
A instrução filter
é a primeira instrução no conjunto de instruções de análise.
Todas as instruções de análise adicionais estão contidas na instrução filter
.
filter {
}
Inicializar variáveis que vão armazenar valores extraídos
Na instrução filter
, inicialize variáveis intermediárias que o
o analisador vai usar para armazenar os valores extraídos do registro.
Essas variáveis são usadas sempre que um registro individual é analisado. O valor em cada variável intermediária será definido como um ou mais campos do UDM mais adiante nas instruções de análise.
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
Extrair valores individuais do registro
O Google Security Operations fornece um conjunto de filtros, com base no Logstash, para extrair campos dos arquivos de registro originais. Dependendo do formato do registro, use um ou vários filtros de extração para extrair todos os dados do registro. Se a string for:
- JSON nativo, a sintaxe do analisador é semelhante à Filtro JSON compatível com registros formatados em JSON. JSON aninhado não é compatível.
- O formato XML e a sintaxe do analisador são semelhantes ao filtro XML, que oferece suporte a registros formatados em XML.
- pares de chave-valor, a sintaxe do analisador é semelhante ao filtro Kv, que aceita mensagens formatadas com chave-valor.
- O formato CSV e a sintaxe do analisador são semelhantes ao filtro CSV, que aceita mensagens formatadas em CSV.
- todos os outros formatos, a sintaxe do analisador é semelhante ao filtro GROK com padrões GROK integrados. Isso usa instruções de extração no estilo Regex.
As Operações de segurança do Google fornecem um subconjunto dos recursos disponíveis em cada filtro. O Google Security Operations também oferece uma sintaxe de mapeamento de dados personalizada que não está disponível nos filtros. Consulte a referência de sintaxe do Analisador. para conferir uma descrição dos recursos com suporte e das funções personalizadas.
Continuando com o exemplo de registro do proxy da Web do Squid, a extração de dados a seguir inclui uma combinação de sintaxe do Logstash Grok e e expressões.
A instrução de extração a seguir armazena valores no seguinte intermediário variáveis:
when
srcip
action
returnCode
size
method
username
url
tgtip
Esta instrução de exemplo também usa a palavra-chave overwrite
para armazenar os valores extraídos
em cada variável. Se o processo de extração retornar um erro, a instrução on_error
vai definir o not_valid_log
como True
.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
Manipular e transformar os valores extraídos
As Operações de segurança do Google aproveitam os recursos do plug-in de filtro mutate do Logstash para permitir a manipulação dos valores extraídos do registro original. As Operações de segurança do Google fornecem um subconjunto dos recursos disponíveis no plug-in. Consulte a Sintaxe do analisador para ver uma descrição dos recursos com suporte e funções personalizadas, como:
- converter valores para um tipo de dados diferente
- substituir valores na string
- mesclar duas matrizes ou anexar uma string a uma matriz. Os valores de strings são convertida em uma matriz antes da mesclagem.
- converter para minúsculas ou maiúsculas
Esta seção fornece exemplos de transformação de dados baseada no registro do proxy da Web do Squid que você já apresentou.
Transformar o carimbo de data/hora do evento
Todos os eventos armazenados como um registro do UDM precisam ter um carimbo de data/hora. Neste exemplo,
verifica se um valor para os dados foi extraído do registro. Em seguida, ele usa o
Função de data do Grok
para corresponder o valor ao formato de hora UNIX
.
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
Transforme o valor de username
O exemplo de instrução abaixo converte o valor na variável username
em letras minúsculas.
mutate {
lowercase => [ "username"]
}
Transformar o valor action
O exemplo a seguir avalia o valor na variável intermediária action
e muda o valor para ALLOW, BLOCK ou UNKNOWN_ACTION, que são
valores válidos para o campo UDM security_result.action
. O UDM security_result.action
é um tipo enumerado que armazena apenas valores específicos.
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
Transformar o endereço IP de destino
O exemplo a seguir verifica se há um valor na variável intermediária tgtip
.
Se encontrado, o valor é associado a um padrão de endereço IP usando um padrão
predefinido do Grok. Se houver um erro que corresponda ao valor de um padrão de endereço IP, a
função on_error
vai definir a propriedade not_valid_tgtip
como True
. Se a correspondência
for bem-sucedida, a propriedade not_valid_tgtip
não será definida.
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
Mudar o tipo de dados de returnCode e o tamanho
O exemplo a seguir transmite o valor na variável size
para
uinteger
e o valor na variável returnCode
para integer
. Isso é
necessário porque a variável size
será salva no
campo UDM network.received_bytes
, que armazena um tipo de dados int64
. O
returnCode
variável será salva no campo de UDM network.http.response_code
que armazena um tipo de dados int32
.
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
Atribuir valores aos campos de UDM em um evento
Depois que os valores forem extraídos e pré-processados, atribua-os a campos em um registro de evento da UDM. É possível atribuir valores extraídos e estáticos a um campo do UDM.
Se você preencher event.disambiguation_key
, verifique se esse campo é exclusivo para
cada evento gerado para um determinado registro. Se dois eventos diferentes tiverem o mesmo disambiguation_key
, isso resultará em um comportamento inesperado no sistema.
Os exemplos de analisadores nesta seção se baseiam no exemplo de registro de proxy da Web do Squid acima.
Salvar o carimbo de data/hora do evento
Cada registro de evento do UDM precisa ter um valor definido para o campo metadata.event_timestamp
do UDM. O exemplo a seguir salva o carimbo de data/hora do evento extraído do registro na
@timestamp
. As Operações de segurança do Google salvam isso na
metadata.event_timestamp
por padrão.
mutate {
rename => {
"when" => "timestamp"
}
}
Definir o tipo de evento
Cada registro de evento de UDM precisa ter um valor definido para o UDM metadata.event_type
. Esse campo é um tipo enumerado. O valor desse campo determina
quais campos adicionais do UDM precisam ser preenchidos para que o registro seja salvo.
O processo de análise e normalização vai falhar se algum dos campos obrigatórios não
contiver dados válidos.
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
Salve os valores username
e method
usando a instrução replace
.
Os valores nos campos intermediários username
e method
são strings. O exemplo a seguir verifica se um valor válido existe e, se sim, armazena o valor username
no campo UDM principal.user.userid
e o valor method
no campo UDM network.http.method
.
if [username] not in [ "-" ,"" ] {
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
}
if [method] != "" {
mutate {
replace => {
"event.idm.read_only_udm.network.http.method" => "%{method}"
}
}
}
Salve o action
no campo de UDM security_result.action
.
Na seção anterior, o valor na variável intermediária action
era
avaliado e transformado em um dos valores padrão para o campo de UDM security_result.action
.
Os campos de UDM security_result
e action
armazenam uma matriz de itens,
o que significa que você precisa seguir uma abordagem um pouco diferente ao salvar esse
valor.
Primeiro, salve o valor transformado em um security_result.action
intermediário
. O campo security_result
é um pai do campo action
.
mutate {
merge => {
"security_result.action" => "action"
}
}
Em seguida, salve o campo intermediário security_result.action
no
Campo de UDM security_result
. O campo UDM security_result
armazena uma matriz de
itens. Portanto, o valor é anexado a esse campo.
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
Armazene o endereço IP de destino e o endereço IP de origem usando a instrução merge
Armazene os seguintes valores no registro de eventos de UDM:
- Valor na variável intermediária
srcip
para o campo UDMprincipal.ip
. - Valor na variável intermediária
tgtip
no campo de UDMtarget.ip
.
Os campos do UDM principal.ip
e target.ip
armazenam uma matriz de itens. Portanto,
são anexados a cada campo.
Os exemplos abaixo demonstram abordagens diferentes para salvar esses valores.
Durante a etapa de transformação, a variável intermediária tgtip
foi correspondida a uma
usando um padrão Grok predefinido. A instrução de exemplo a seguir
verifica se a propriedade not_valid_tgtip
é verdadeira, indicando que tgtip
não corresponde a um padrão de endereço IP. Se for falso, ele salva
Valor tgtip
ao campo de UDM target.ip
.
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
A variável intermediária srcip
não foi transformada. A instrução a seguir
verifica se um valor foi extraído do registro original e, se sim, salva
o valor no campo UDM principal.ip
.
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
Salve url
, returnCode
e size
usando a instrução rename
.
A instrução de exemplo abaixo armazena os valores a seguir usando a instrução rename
.
- A variável
url
salva no campo de UDMtarget.url
. - A variável intermediária
returnCode
salva no campo UDMnetwork.http.response_code
. - A variável intermediária
size
salva no campo de UDMnetwork.received_bytes
.
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
}
Vincular o registro do UDM à saída
A instrução final na instrução de mapeamento de dados gera os dados processados a um registro de evento de UDM.
mutate {
merge => {
"@output" => "event"
}
}
O código completo do analisador
Este é o exemplo completo do código do analisador. A ordem das instruções não segue mesma ordem das seções anteriores deste documento, mas resulta na mesma saída.
filter {
# initialize variables
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
# Extract fields from the raw log.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
# Parse event timestamp
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
# Save the value in "when" to the event timestamp
mutate {
rename => {
"when" => "timestamp"
}
}
# Transform and save username
if [username] not in [ "-" ,"" ] {
mutate {
lowercase => [ "username"]
}
}
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
# save transformed value to an intermediary field
mutate {
merge => {
"security_result.action" => "action"
}
}
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
# check for presence of target ip. Extract and store target IP address.
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
# store target IP address
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
}
# convert the returnCode and size to integer data type
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
# save url, returnCode, and size
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
# set the event type to NETWORK_HTTP
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
# validate and set source IP address
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
# save event to @output
mutate {
merge => {
"@output" => "event"
}
}
} #end of filter