Información general sobre el análisis de registros

Disponible en:

En este documento se ofrece una descripción general de cómo analiza Google Security Operations los registros sin procesar para convertirlos al formato del modelo de datos unificado (UDM).

Google SecOps puede recibir datos de registro procedentes de las siguientes fuentes de ingesta:

  • Reenvío de Google SecOps
  • Feed de la API Chronicle
  • API Chronicle Ingestion
  • Partner tecnológico externo

Por lo general, los clientes envían los datos como registros sin procesar originales. Google SecOps identifica de forma única el dispositivo que ha generado los registros mediante LogType. LogType identifica lo siguiente:

  • el proveedor y el dispositivo que generaron el registro, como Cisco Firewall, el servidor DHCP de Linux o Bro DNS.
  • qué analizador convierte el registro sin procesar en UDM estructurado. Hay una relación de uno a uno entre un analizador y un LogType. Cada analizador convierte los datos recibidos por un solo LogType.

Google SecOps proporciona un conjunto de analizadores predeterminados que leen los registros sin procesar originales y generan registros UDM estructurados a partir de los datos de los registros sin procesar originales. Google SecOps mantiene estos analizadores. Los clientes también pueden definir instrucciones de asignación de datos personalizadas creando un analizador específico para cada cliente.

Flujo de trabajo de ingestión y normalización

El analizador contiene instrucciones de asignación de datos. Define cómo se asignan los datos del registro sin procesar original a uno o varios campos de la estructura de datos del UDM.

Si no hay errores de análisis, Google SecOps crea un registro estructurado de UDM con los datos del registro sin procesar. El proceso de convertir un registro sin procesar en un registro de UDM se denomina normalización.

Un analizador predeterminado puede asignar un subconjunto de valores principales del registro sin procesar. Por lo general, estos campos principales son los más importantes para proporcionar estadísticas de seguridad en Google SecOps. Los valores sin asignar permanecen en el registro sin procesar, pero no se almacenan en el registro de UDM.

Los clientes también pueden usar la API Ingestion para enviar datos en formato UDM estructurado.

Personalizar cómo se analizan los datos ingeridos

Google SecOps ofrece las siguientes funciones que permiten a los clientes personalizar el análisis de datos de registro originales entrantes.

  • Analizadores específicos de clientes: los clientes crean una configuración de analizador personalizado para un tipo de registro específico que se ajuste a sus requisitos. Un analizador específico de un cliente sustituye al analizador predeterminado de un LogType concreto. Para obtener más información, consulta Gestionar analizadores precompilados y personalizados.
  • Extensiones de analizador: los clientes pueden añadir instrucciones de asignación personalizadas además de la configuración del analizador predeterminado. Cada cliente puede crear su propio conjunto único de instrucciones de asignación personalizadas. Estas instrucciones de asignación definen cómo extraer y transformar campos adicionales de los registros sin procesar originales en campos de UDM. Una extensión de analizador no sustituye al analizador predeterminado ni al específico del cliente.

Ejemplo con un registro de proxy web de Squid

En esta sección se proporciona un ejemplo de registro de proxy web de Squid y se describe cómo se asignan los valores a un registro de UDM. Para ver una descripción de todos los campos del esquema del UDM, consulta la lista de campos del modelo de datos unificado.

El registro de proxy web de Squid de ejemplo contiene valores separados por espacios. Cada registro representa un evento y almacena los siguientes datos: marca de tiempo, duración, cliente, código o estado del resultado, bytes transmitidos, método de solicitud, URL, usuario, código de jerarquía y tipo de contenido. En este ejemplo, se extraen los siguientes campos y se asignan a un registro de UDM: hora, cliente, estado del resultado, bytes, método de solicitud y URL.

1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg

Ejemplo de proxy web Squid

Al comparar estas estructuras, observará que solo se incluye un subconjunto de los datos de registro originales en el registro de UDM. Algunos campos son obligatorios y otros opcionales. Además, solo un subconjunto de las secciones del registro de UDM contiene datos. Si el analizador no asigna datos del registro original al registro UDM, no verás esa sección del registro UDM en Google SecOps.

Valores de registro asignados a UDM

La sección metadata almacena la marca de tiempo del evento. Ten en cuenta que el valor se ha convertido del formato EPOCH al formato RFC 3339. Esta conversión es opcional. La marca de tiempo se puede almacenar en formato EPOCH, con un preprocesamiento para separar las partes de segundos y milisegundos en campos independientes.

El campo metadata.event_type almacena el valor NETWORK_HTTP, que es un valor enumerado que identifica el tipo de evento. El valor de metadata.event_type determina qué campos UDM adicionales son obligatorios y cuáles opcionales. Los valores product_name y vendor_name contienen descripciones sencillas del dispositivo que registró el registro original.

El metadata.event_type de un registro de eventos de UDM no es el mismo que el log_type definido al ingerir datos mediante la API Ingestion. Estos dos atributos almacenan información diferente.

La sección network contiene valores del evento de registro original. En este ejemplo, el valor de estado del registro original se ha analizado a partir del campo "result code/status" antes de escribirse en el registro de UDM. Solo se incluyó el result_code en el registro UDM.

Valores de registro asignados a UDM

La sección principal almacena la información del cliente del registro original. La sección target almacena tanto la URL completa como la dirección IP.

La sección security_result almacena uno de los valores de enumeración para representar la acción que se registró en el registro original.

Este es el registro de UDM con formato JSON. Ten en cuenta que solo se incluyen las secciones que contienen datos. No se incluyen las secciones src, observer, intermediary, about y extensions.

{
        "metadata": {
            "event_timestamp": "2020-04-28T07:40:48.129Z",
            "event_type": "NETWORK_HTTP",
            "product_name": "Squid Proxy",
            "vendor_name": "Squid"
        },
        "principal": {
            "ip": "192.168.23.4"
        },
        "target": {
            "url": "www.google.com/images/sunlogo.png",
            "ip": "203.0.113.52"
        },
        "network": {
            "http": {
                "method": "GET",
                "response_code": 200,
                "received_bytes": 904
            }
        },
        "security_result": {
            "action": "UNKNOWN_ACTION"
        }
}

Pasos de las instrucciones del analizador

Las instrucciones de asignación de datos de un analizador siguen un patrón común, como se indica a continuación:

  1. Analiza y extrae datos del registro original.
  2. Manipula los datos extraídos. Esto incluye el uso de lógica condicional para analizar valores de forma selectiva, convertir tipos de datos, sustituir subcadenas en un valor, convertir a mayúsculas o minúsculas, etc.
  3. Asigna valores a los campos de UDM.
  4. Genera el registro de UDM asignado en la clave @output.

Analizar y extraer datos del registro original

Definir la instrucción de filtro

La instrucción filter es la primera instrucción del conjunto de instrucciones de análisis. Todas las instrucciones de análisis adicionales se incluyen en la instrucción filter.

filter {

}

Inicializar variables que almacenarán los valores extraídos

En la instrucción filter, inicializa las variables intermedias que el analizador usará para almacenar los valores extraídos del registro.

Estas variables se usan cada vez que se analiza un registro individual. El valor de cada variable intermedia se asignará a uno o varios campos de UDM más adelante en las instrucciones de análisis.

  mutate {
    replace => {
      "event.idm.read_only_udm.metadata.product_name" => "Webproxy"
      "event.idm.read_only_udm.metadata.vendor_name" => "Squid"
      "not_valid_log" => "false"
      "when" => ""
      "srcip" => ""
      "action" => ""
      "username" => ""
      "url" => ""
      "tgtip" => ""
      "method" => ""
    }
  }

Extraer valores individuales del registro

Google SecOps proporciona un conjunto de filtros basados en Logstash para extraer campos de archivos de registro originales. En función del formato del registro, puede usar uno o varios filtros de extracción para extraer todos los datos del registro. Si la cadena es:

  • JSON nativo. La sintaxis del analizador es similar a la del filtro JSON, que admite registros con formato JSON. No se admite JSON anidado.
  • El formato XML y la sintaxis del analizador son similares al filtro XML, que admite registros con formato XML.
  • pares clave-valor. La sintaxis del analizador es similar a la del filtro Kv, que admite mensajes con formato clave-valor.
  • El formato CSV y la sintaxis del analizador son similares al filtro CSV, que admite mensajes con formato CSV.
  • En el resto de los formatos, la sintaxis del analizador es similar al filtro GROK con patrones integrados de GROK . Se usan instrucciones de extracción de estilo Regex.

Google SecOps ofrece un subconjunto de las funciones disponibles en cada filtro. Google SecOps también proporciona una sintaxis de asignación de datos personalizada que no está disponible en los filtros. Consulte la referencia de sintaxis del analizador para ver una descripción de las funciones admitidas y las funciones personalizadas.

Siguiendo con el ejemplo del registro del proxy web Squid, la siguiente instrucción de extracción de datos incluye una combinación de sintaxis Grok de Logstash y expresiones regulares.

La siguiente instrucción de extracción almacena valores en las siguientes variables intermedias:

  • when
  • srcip
  • action
  • returnCode
  • size
  • method
  • username
  • url
  • tgtip

Esta instrucción de ejemplo también usa la palabra clave overwrite para almacenar los valores extraídos en cada variable. Si el proceso de extracción devuelve un error, la instrucción on_error asigna el valor True a not_valid_log.

grok {
   match => {
     "message" => [
       "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
     ]
   }
   overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
   on_error => "not_valid_log"
}

Manipular y transformar los valores extraídos

Google SecOps aprovecha las funciones del complemento de filtro mutate de Logstash para permitir la manipulación de los valores extraídos del registro original. Google SecOps ofrece un subconjunto de las funciones disponibles en el complemento. Consulta la sintaxis del analizador para ver una descripción de las funciones admitidas y las funciones personalizadas, como las siguientes:

  • Convertir valores a otro tipo de datos
  • reemplazar valores en la cadena
  • Combinar dos arrays o añadir una cadena a un array. Los valores de cadena se convierten en un array antes de combinarse.
  • Convertir a minúsculas o mayúsculas

En esta sección se proporcionan ejemplos de transformación de datos basados en el registro del proxy web Squid que se ha presentado anteriormente.

Transformar la marca de tiempo del evento

Todos los eventos almacenados como registro de UDM deben tener una marca de tiempo de evento. En este ejemplo se comprueba si se ha extraído un valor de los datos del registro. Después, usa la función de fecha de Grok para asociar el valor al formato de hora UNIX.

if [when] != "" {
  date {
    match => [
      "when", "UNIX"
    ]
   }
 }

Transformar el valor de username

La siguiente instrucción de ejemplo convierte el valor de la variable username a minúsculas.

mutate {
   lowercase => [ "username"]
   }

Transformar el valor de action

En el siguiente ejemplo se evalúa el valor de la variable intermedia action y se cambia el valor a ALLOW, BLOCK o UNKNOWN_ACTION, que son valores válidos para el campo security_result.action de UDM. El campo security_result.action UDM es un tipo enumerado que solo almacena valores específicos.

if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
      mutate {
        replace => {
          "action" => "BLOCK"
        }
      }
   } else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
     mutate {
        replace => {
          "action" => "ALLOW"
        }
     }
   } else {
      mutate {
        replace => {
          "action" => "UNKNOWN_ACTION" }
      }
   }

Transformar la dirección IP de destino

En el siguiente ejemplo se comprueba si hay un valor en la variable intermedia tgtip. Si se encuentra, el valor se compara con un patrón de dirección IP mediante un patrón Grok predefinido. Si se produce un error al asociar el valor con un patrón de dirección IP, la función on_error asigna el valor True a la propiedad not_valid_tgtip. Si la coincidencia se realiza correctamente, no se define la propiedad not_valid_tgtip.

if [tgtip] not in [ "","-" ] {
   grok {
     match => {
       "tgtip" => [ "%{IP:tgtip}" ]
     }
     overwrite => ["tgtip"]
     on_error => "not_valid_tgtip"
   }

Cambiar el tipo de datos y el tamaño de returnCode

En el siguiente ejemplo, se convierte el valor de la variable size a uinteger y el valor de la variable returnCode a integer. Esto es obligatorio porque la variable size se guardará en el campo network.received_bytes de UDM, que almacena un tipo de datos int64. La variable returnCode se guardará en el campo network.http.response_code de UDM, que almacena un tipo de datos int32.

mutate {
  convert => {
    "returnCode" => "integer"
    "size" => "uinteger"
  }
}

Asignar valores a campos de UDM en un evento

Una vez que se han extraído y preprocesado los valores, asígnalos a los campos de un registro de evento de UDM. Puede asignar tanto valores extraídos como valores estáticos a un campo de UDM.

Si rellena event.disambiguation_key, asegúrese de que este campo sea único para cada evento que se genere en el registro correspondiente. Si dos eventos diferentes tienen el mismo disambiguation_key, se producirá un comportamiento inesperado en el sistema.

Los ejemplos de analizadores de esta sección se basan en el ejemplo de registro de proxy web de Squid anterior.

Guardar la marca de tiempo del evento

Todos los registros de eventos de UDM deben tener un valor asignado al campo metadata.event_timestamp de UDM. En el siguiente ejemplo se guarda la marca de tiempo del evento extraída del registro en la variable integrada @timestamp. Google Security Operations guarda esta información en el campo metadata.event_timestampUDM de forma predeterminada.

mutate {
  rename => {
    "when" => "timestamp"
  }
}

Definir el tipo de evento

Todos los registros de eventos de UDM deben tener un valor asignado al campo metadata.event_type de UDM. Este campo es un tipo enumerado. El valor de este campo determina qué campos de UDM adicionales se deben rellenar para que se guarde el registro de UDM. El proceso de análisis y normalización fallará si alguno de los campos obligatorios no contiene datos válidos.

replace => {
    "event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
   }
}

Guarda los valores username y method con la instrucción replace.

Los valores de los campos intermedios username y method son cadenas. En el siguiente ejemplo se comprueba si existe un valor válido y, si es así, se almacena el valor de username en el campo de UDM principal.user.userid y el valor de method en el campo de UDM network.http.method.

if [username] not in [ "-" ,"" ] {
  mutate {
    replace => {
      "event.idm.read_only_udm.principal.user.userid" => "%{username}"
    }
  }
}

if [method] != "" {
  mutate {
    replace => {
      "event.idm.read_only_udm.network.http.method" => "%{method}"
    }
  }
}

Guardar el action en el campo security_result.action de UDM

En la sección anterior, se ha evaluado el valor de la variable intermedia action y se ha transformado en uno de los valores estándar del campo security_result.action de UDM.

Tanto el campo security_result como el action de UDM almacenan un array de elementos, por lo que debes seguir un enfoque ligeramente diferente al guardar este valor.

Primero, guarda el valor transformado en un campo security_result.action intermediario. El campo security_result es el elemento superior del campo action.

mutate {
   merge => {
     "security_result.action" => "action"
   }
}

A continuación, guarda el campo security_result.actionintermediarysecurity_result en el campo UDM. El campo de UDM security_result almacena un array de elementos, por lo que el valor se añade a este campo.

 # save the security_result field
mutate {
  merge => {
    "event.idm.read_only_udm.security_result" => "security_result"
  }
}

Almacena la dirección IP de destino y la dirección IP de origen mediante la instrucción merge.

Almacena los siguientes valores en el registro de eventos de UDM:

  • Valor de la variable intermedia srcip al campo de UDM principal.ip.
  • Valor de la variable intermedia tgtip al campo de UDM target.ip.

Los campos principal.ip y target.ip de UDM almacenan un array de elementos, por lo que los valores se añaden a cada campo.

En los siguientes ejemplos se muestran diferentes enfoques para guardar estos valores. Durante el paso de transformación, la tgtipvariable intermedia se ha asociado a una dirección IP mediante un patrón Grok predefinido. En la siguiente instrucción de ejemplo se comprueba si la propiedad not_valid_tgtip es true, lo que indica que el valor tgtip no se ha podido asociar a un patrón de dirección IP. Si es "false", guarda el valor de tgtip en el campo de UDM target.ip.

if ![not_valid_tgtip] {
  mutate {
    merge => {
      "event.idm.read_only_udm.target.ip" => "tgtip"
    }
  }
 }

La variable intermedia srcip no se ha transformado. La siguiente instrucción comprueba si se ha extraído un valor del registro original y, si es así, guarda el valor en el campo principal.ip de UDM.

if [srcip] != "" {
  mutate {
    merge => {
      "event.idm.read_only_udm.principal.ip" => "srcip"
    }
  }
}

Guardar url, returnCode y size con la instrucción rename

En la siguiente instrucción de ejemplo se almacenan los valores mediante la instrucción rename:

  • La variable url se ha guardado en el campo target.url de la gestión de datos de usuario.
  • La variable intermedia returnCode se ha guardado en el campo network.http.response_code de la gestión de datos de marketing.
  • La variable intermedia size se ha guardado en el campo network.received_bytes de la gestión de datos de marketing.
mutate {
  rename => {
     "url" => "event.idm.read_only_udm.target.url"
     "returnCode" => "event.idm.read_only_udm.network.http.response_code"
     "size" => "event.idm.read_only_udm.network.received_bytes"
  }
}

Vincular el registro UDM a la salida

La instrucción final de la asignación de datos genera los datos procesados en un registro de evento de UDM.

mutate {
    merge => {
      "@output" => "event"
    }
  }

El código completo del analizador

Este es el ejemplo de código completo del analizador. El orden de las instrucciones no es el mismo que en las secciones anteriores de este documento, pero el resultado es el mismo.

filter {

# initialize variables
  mutate {
    replace => {
      "event.idm.read_only_udm.metadata.product_name" => "Webproxy"
      "event.idm.read_only_udm.metadata.vendor_name" => "Squid"
      "not_valid_log" => "false"
      "when" => ""
      "srcip" => ""
      "action" => ""
      "username" => ""
      "url" => ""
      "tgtip" => ""
      "method" => ""
    }
  }

  # Extract fields from the raw log.
    grok {
      match => {
        "message" => [
          "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
        ]
      }
      overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
      on_error => "not_valid_log"
    }

  # Parse event timestamp
  if [when] != "" {
    date {
      match => [
        "when", "UNIX"
      ]
     }
   }

   # Save the value in "when" to the event timestamp
   mutate {
     rename => {
       "when" => "timestamp"
     }
   }

   # Transform and save username
   if [username] not in [ "-" ,"" ] {
     mutate {
       lowercase => [ "username"]
        }
      }
     mutate {
       replace => {
         "event.idm.read_only_udm.principal.user.userid" => "%{username}"
       }
     }


if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
      mutate {
        replace => {
          "action" => "BLOCK"
        }
      }
   } else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
     mutate {
        replace => {
          "action" => "ALLOW"
        }
     }
   } else {
      mutate {
        replace => {
          "action" => "UNKNOWN_ACTION" }
      }
   }

  # save transformed value to an intermediary field
   mutate {
      merge => {
        "security_result.action" => "action"
      }
   }

    # save the security_result field
    mutate {
      merge => {
        "event.idm.read_only_udm.security_result" => "security_result"
      }
    }

   # check for presence of target ip. Extract and store target IP address.
   if [tgtip] not in [ "","-" ] {
     grok {
       match => {
         "tgtip" => [ "%{IP:tgtip}" ]
       }
       overwrite => ["tgtip"]
       on_error => "not_valid_tgtip"
     }

     # store  target IP address
     if ![not_valid_tgtip] {
       mutate {
         merge => {
           "event.idm.read_only_udm.target.ip" => "tgtip"
         }
       }
     }
   }

   # convert  the returnCode and size  to integer data type
   mutate {
     convert => {
       "returnCode" => "integer"
       "size" => "uinteger"
     }
   }

   # save  url, returnCode, and size
   mutate {
     rename => {
        "url" => "event.idm.read_only_udm.target.url"
        "returnCode" => "event.idm.read_only_udm.network.http.response_code"
        "size" => "event.idm.read_only_udm.network.received_bytes"
     }

     # set the event type to NETWORK_HTTP
     replace => {
        "event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
     }
   }

   # validate and set source IP address
   if [srcip] != "" {
     mutate {
       merge => {
         "event.idm.read_only_udm.principal.ip" => "srcip"
       }
     }
   }

  # save  event to @output
   mutate {
     merge => {
       "@output" => "event"
     }
   }

} #end of filter

¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.