Vista geral da análise sintática de registos
Este documento oferece uma vista geral de como o Google Security Operations analisa os registos não processados para o formato do modelo de dados unificado (UDM).
O Google SecOps pode receber dados de registo provenientes das seguintes origens de carregamento:
- Encaminhador do Google SecOps
- Feed da API Chronicle
- Chronicle Ingestion API
- Parceiro tecnológico externo
Em geral, os clientes enviam dados como registos não processados originais. O Google SecOps identifica exclusivamente o dispositivo que gerou os registos através do LogType. O LogType identifica o seguinte:
- O fornecedor e o dispositivo que geraram o registo, como a firewall Cisco, o servidor DHCP Linux ou o DNS Bro.
- que analisador converte o registo não processado em UDM estruturado. Existe uma relação individual entre um analisador e um LogType. Cada analisador converte os dados recebidos por um único LogType.
O Google SecOps fornece um conjunto de analisadores predefinidos que leem registos não processados originais e geram registos UDM estruturados através de dados no registo não processado original. A Google SecOps mantém estes analisadores. Os clientes também podem definir instruções de mapeamento de dados personalizadas criando um analisador específico do cliente.
O analisador contém instruções de mapeamento de dados. Define como os dados são mapeados do registo não processado original para um ou mais campos na estrutura de dados do UDM.
Se não existirem erros de análise, o Google SecOps cria um registo estruturado de UDM com dados do registo não processado. O processo de conversão de um registo não processado num registo UDM chama-se normalização.
Um analisador predefinido pode mapear um subconjunto de valores principais do registo não processado. Normalmente, estes campos principais são os mais importantes para fornecer estatísticas de segurança no Google SecOps. Os valores não mapeados permanecem no registo não processado, mas não são armazenados no registo da UDM.
Um cliente também pode usar a API Ingestion para enviar dados no formato UDM estruturado.
Personalize a forma como os dados carregados são analisados
O Google SecOps oferece as seguintes capacidades que permitem aos clientes personalizar a análise de dados nos dados de registo originais recebidos.
- Analisadores específicos do cliente: os clientes criam uma configuração de analisador personalizado para um tipo de registo específico que cumpre os respetivos requisitos específicos. Um analisador específico do cliente substitui o analisador predefinido para o LogType específico. Para mais detalhes, consulte o artigo Faça a gestão de analisadores predefinidos e personalizados.
- Extensões do analisador: os clientes podem adicionar instruções de mapeamento personalizadas, além da configuração do analisador predefinida. Cada cliente pode criar o seu próprio conjunto único de instruções de mapeamento personalizado. Estas instruções de mapeamento definem como extrair e transformar campos adicionais dos registos não processados originais em campos da UDM. Uma extensão de analisador não substitui o analisador predefinido nem o analisador específico do cliente.
Um exemplo que usa um registo de proxy Web Squid
Esta secção fornece um exemplo de registo do proxy Web Squid e descreve como os valores são mapeados para um registo UDM. Para ver uma descrição de todos os campos no esquema do UDM, consulte a lista de campos do modelo de dados unificado.
O exemplo de registo do proxy Web Squid contém valores separados por espaços. Cada registo representa um evento e armazena os seguintes dados: data/hora, duração, cliente, código/estado do resultado, bytes transmitidos, método de pedido, URL, utilizador, código de hierarquia e tipo de conteúdo. Neste exemplo, os seguintes campos são extraídos e mapeados num registo UDM: hora, cliente, estado do resultado, bytes, método de pedido e URL.
1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg
À medida que compara estas estruturas, repare que apenas um subconjunto dos dados de registo originais está incluído no registo do UDM. Determinados campos são obrigatórios e outros são opcionais. Além disso, apenas um subconjunto das secções no registo UDM contém dados. Se o analisador não mapear os dados do registo original para o registo UDM, não vê essa secção do registo UDM no Google SecOps.
A secção metadata
armazena a data/hora do evento. Repare que o valor foi convertido do formato EPOCH para o formato RFC 3339. Esta conversão é opcional. A data/hora pode ser armazenada no formato EPOCH, com pré-processamento para separar as partes dos segundos e dos milissegundos em campos separados.
O campo metadata.event_type
armazena o valor NETWORK_HTTP
, que é um valor enumerado
que identifica o tipo de evento. O valor de metadata.event_type
determina que campos UDM adicionais são obrigatórios ou opcionais. Os valores product_name
e vendor_name
contêm descrições intuitivas do dispositivo que registou o registo original.
O metadata.event_type
num registo de eventos da UDM não é igual ao log_type definido quando são carregados dados através da API de carregamento. Estes dois atributos armazenam informações diferentes.
A secção network
contém valores do evento de registo original. Repare neste exemplo que o valor do estado do registo original foi analisado a partir do campo "result
code/status" antes de ser escrito no registo UDM. Apenas o result_code foi incluído no registo UDM.
A secção principal
armazena as informações do cliente do registo original. A secção target
armazena o URL totalmente qualificado e o endereço IP.
A secção security_result
armazena um dos valores de enumeração para representar a ação que foi registada no registo original.
Este é o registo UDM formatado como JSON. Tenha em atenção que apenas são incluídas as secções que
contêm dados. As secções src
, observer
, intermediary
, about
e extensions
não estão incluídas.
{
"metadata": {
"event_timestamp": "2020-04-28T07:40:48.129Z",
"event_type": "NETWORK_HTTP",
"product_name": "Squid Proxy",
"vendor_name": "Squid"
},
"principal": {
"ip": "192.168.23.4"
},
"target": {
"url": "www.google.com/images/sunlogo.png",
"ip": "203.0.113.52"
},
"network": {
"http": {
"method": "GET",
"response_code": 200,
"received_bytes": 904
}
},
"security_result": {
"action": "UNKNOWN_ACTION"
}
}
Passos nas instruções do analisador
As instruções de mapeamento de dados num analisador seguem um padrão comum, da seguinte forma:
- Analise e extraia dados do registo original.
- Manipular os dados extraídos. Isto inclui a utilização da lógica condicional para analisar seletivamente valores, converter tipos de dados, substituir substrings num valor, converter em maiúsculas ou minúsculas, etc.
- Atribua valores a campos da UDM.
- Produza o registo UDM mapeado para a chave @output.
Analise e extraia dados do registo original
Defina a declaração de filtro
A declaração filter
é a primeira declaração no conjunto de instruções de análise.
Todas as instruções de análise adicionais estão contidas na declaração filter
.
filter {
}
Inicialize as variáveis que vão armazenar os valores extraídos
Na declaração filter
, inicialize as variáveis intermédias que o analisador vai usar para armazenar os valores extraídos do registo.
Estas variáveis são usadas sempre que um registo individual é analisado. O valor em cada variável intermédia vai ser definido para um ou mais campos de UDM mais tarde nas instruções de análise.
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
Extraia valores individuais do registo
O Google SecOps fornece um conjunto de filtros, com base no Logstash, para extrair campos de ficheiros de registo originais. Consoante o formato do registo, usa um ou vários filtros de extração para extrair todos os dados do registo. Se a string for:
- JSON nativo, a sintaxe do analisador é semelhante ao filtro JSON que suporta registos formatados em JSON. O JSON aninhado não é suportado.
- O formato XML e a sintaxe do analisador são semelhantes ao filtro XML, que suporta registos formatados em XML.
- pares de chave-valor, a sintaxe do analisador é semelhante ao filtro Kv que suporta mensagens formatadas com chave-valor.
- No formato CSV, a sintaxe do analisador é semelhante ao filtro CSV, que suporta mensagens formatadas em CSV.
- Para todos os outros formatos, a sintaxe do analisador é semelhante ao filtro GROK com padrões incorporados do GROK . Isto usa instruções de extração no estilo Regex.
O Google SecOps oferece um subconjunto das capacidades disponíveis em cada filtro. O Google SecOps também fornece uma sintaxe de mapeamento de dados personalizada não disponível nos filtros. Consulte a referência da sintaxe do analisador para ver uma descrição das funcionalidades suportadas e das funções personalizadas.
Continuando com o exemplo de registo do proxy Web Squid, a seguinte instrução de extração de dados inclui uma combinação da sintaxe Grok do Logstash e expressões regulares.
A seguinte declaração de extração armazena valores nas seguintes variáveis intermédias:
when
srcip
action
returnCode
size
method
username
url
tgtip
Esta declaração de exemplo também usa a palavra-chave overwrite
para armazenar os valores extraídos em cada variável. Se o processo de extração devolver um erro, a instrução on_error
define not_valid_log
como True
.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
Manipular e transformar os valores extraídos
O Google SecOps tira partido das capacidades do plug-in de filtro de mutação do Logstash para permitir a manipulação de valores extraídos do registo original. O Google SecOps oferece um subconjunto das capacidades disponíveis no plug-in. Consulte a sintaxe do analisador para ver uma descrição das funcionalidades suportadas e das funções personalizadas, como:
- converter valores para um tipo de dados diferente
- substituir valores na string
- Unir duas matrizes ou acrescentar uma string a uma matriz. Os valores de strings são convertidos num array antes da união.
- Converter em minúsculas ou maiúsculas
Esta secção apresenta exemplos de transformação de dados baseados no registo do proxy Web Squid apresentado anteriormente.
Transforme a data/hora do evento
Todos os eventos armazenados como um registo da UDM têm de ter uma data/hora do evento. Este exemplo verifica se foi extraído um valor dos dados do registo. Em seguida, usa a função de data Grok para fazer corresponder o valor ao formato de hora UNIX
.
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
Transforme o valor de username
A declaração de exemplo seguinte converte o valor na variável username
em letras minúsculas.
mutate {
lowercase => [ "username"]
}
Transforme o valor de action
O exemplo seguinte avalia o valor na variável intermédia action
e altera o valor para ALLOW, BLOCK ou UNKNOWN_ACTION, que são valores válidos para o campo security_result.action
UDM. O campo security_result.action
UDM
é um tipo enumerado que armazena apenas valores específicos.
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
Transforme o endereço IP de destino
O exemplo seguinte verifica um valor na tgtip
variável intermédia.
Se for encontrado, o valor é correspondido a um padrão de endereço IP através de um padrão Grok predefinido. Se existir um erro na correspondência do valor com um padrão de endereço IP, a função on_error
define a propriedade not_valid_tgtip
como True
. Se a correspondência for bem-sucedida, a propriedade not_valid_tgtip
não é definida.
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
Altere o tipo de dados de returnCode e o tamanho
O exemplo seguinte converte o valor na variável size
em uinteger
e o valor na variável returnCode
em integer
. Isto é obrigatório porque a variável size
vai ser guardada no campo network.received_bytes
UDM, que armazena um tipo de dados int64
. A variável returnCode
vai ser guardada no campo network.http.response_code
UDM, que armazena um tipo de dados int32
.
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
Atribua valores a campos de UDM num evento
Depois de os valores serem extraídos e pré-processados, atribua-os a campos num registo de eventos da UDM. Pode atribuir valores extraídos e valores estáticos a um campo de UDM.
Se preencher event.disambiguation_key
, certifique-se de que este campo é exclusivo para
cada evento gerado para o registo especificado. Se dois eventos diferentes tiverem o mesmo disambiguation_key
, isto resulta num comportamento inesperado no sistema.
Os exemplos de análise sintática nesta secção baseiam-se no exemplo de registo do proxy Web Squid anterior.
Guarde a data/hora do evento
Cada registo de evento da UDM tem de ter um valor definido para o campo da UDM.metadata.event_timestamp
O exemplo seguinte guarda a data/hora do evento extraída do registo na variável integrada @timestamp
. O Google Security Operations guarda esta informação no campo metadata.event_timestamp
do UDM por predefinição.
mutate {
rename => {
"when" => "timestamp"
}
}
Defina o tipo de evento
Cada registo de evento da UDM tem de ter um valor definido para o campo da UDM metadata.event_type
. Este campo é um tipo enumerado. O valor deste campo determina que campos UDM adicionais têm de ser preenchidos para que o registo UDM seja guardado.
O processo de análise e normalização falha se algum dos campos obrigatórios não contiver dados válidos.
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
Guarde os valores username
e method
com a declaração replace
Os valores nos campos intermédios username
e method
são strings. O exemplo seguinte verifica se existe um valor válido e, se existir, armazena o valor username
no campo principal.user.userid
UDM e o valor method
no campo network.http.method
UDM.
if [username] not in [ "-" ,"" ] {
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
}
if [method] != "" {
mutate {
replace => {
"event.idm.read_only_udm.network.http.method" => "%{method}"
}
}
}
Guarde o action
no campo UDM security_result.action
Na secção anterior, o valor na variável intermédia action
foi avaliado e transformado num dos valores padrão para o campo security_result.action
UDM.
Os campos security_result
e action
UDM armazenam uma matriz de itens, o que significa que tem de seguir uma abordagem ligeiramente diferente quando guarda este valor.
Primeiro, guarde o valor transformado num campo security_result.action
intermediário. O campo security_result
é um principal do campo action
.
mutate {
merge => {
"security_result.action" => "action"
}
}
Em seguida, guarde o campo intermediário security_result.action
no campo security_result
do UDM. O campo security_result
UDM armazena uma matriz de itens, pelo que o valor é anexado a este campo.
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
Armazenar o endereço IP de destino e o endereço IP de origem através da declaração merge
Armazene os seguintes valores no registo de eventos da UDM:
- Valor na variável intermédia
srcip
para o campoprincipal.ip
UDM. - Valor na variável intermédia
tgtip
para o campotarget.ip
UDM.
Os campos da UDM principal.ip
e target.ip
armazenam uma matriz de itens, pelo que os valores são anexados a cada campo.
Os exemplos seguintes demonstram diferentes abordagens para guardar estes valores.
Durante o passo de transformação, a variável intermédia tgtip
foi associada a um endereço IP através de um padrão Grok predefinido. A declaração de exemplo seguinte verifica se a propriedade not_valid_tgtip
é verdadeira, o que indica que não foi possível fazer a correspondência do valor tgtip
com um padrão de endereço IP. Se for false, guarda o valor tgtip
no campo UDM target.ip
.
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
A variável intermédia srcip
não foi transformada. A declaração seguinte verifica se foi extraído um valor do registo original e, se tiver sido, guarda o valor no campo principal.ip
UDM.
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
Guarde url
, returnCode
e size
através da declaração rename
A declaração de exemplo seguinte armazena os valores através da declaração rename
:
- A variável
url
foi guardada no campotarget.url
da UDM. - A variável intermédia
returnCode
foi guardada no camponetwork.http.response_code
UDM. - A variável intermédia
size
foi guardada no camponetwork.received_bytes
UDM.
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
}
Associe o registo UDM à saída
A declaração final na instrução de mapeamento de dados produz os dados processados num registo de eventos da UDM.
mutate {
merge => {
"@output" => "event"
}
}
O código do analisador completo
Este é o exemplo de código do analisador completo. A ordem das instruções não segue a mesma ordem que as secções anteriores deste documento, mas resulta no mesmo resultado.
filter {
# initialize variables
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
# Extract fields from the raw log.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
# Parse event timestamp
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
# Save the value in "when" to the event timestamp
mutate {
rename => {
"when" => "timestamp"
}
}
# Transform and save username
if [username] not in [ "-" ,"" ] {
mutate {
lowercase => [ "username"]
}
}
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
# save transformed value to an intermediary field
mutate {
merge => {
"security_result.action" => "action"
}
}
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
# check for presence of target ip. Extract and store target IP address.
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
# store target IP address
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
}
# convert the returnCode and size to integer data type
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
# save url, returnCode, and size
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
# set the event type to NETWORK_HTTP
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
# validate and set source IP address
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
# save event to @output
mutate {
merge => {
"@output" => "event"
}
}
} #end of filter
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.