Descripción general del análisis de registros

En este documento, se proporciona una descripción general de cómo Chronicle analiza los registros sin procesar en el formato del Modelo de datos unificado (UDM).

Chronicle puede recibir datos de registro que se originan en las siguientes fuentes de transferencia:

  • Servidor de reenvío de Chronicle
  • Feed de la API de Chronicle
  • API de transferencia de Chronicle
  • Socio de tecnología externo

En general, los clientes envían datos como registros originales sin procesar. Chronicle identifica de forma única el dispositivo que generó los registros con el LogType. El LogType identifica lo siguiente:

  • el proveedor y el dispositivo que generaron el registro, como Cisco Firewall, Linux DHCP Server o Bro DNS.
  • qué analizador convierte el registro sin procesar en un modelo de datos unificado (UDM) estructurado. Existe una relación de uno a uno entre un analizador y un LogType. Cada analizador convierte los datos que recibe un solo LogType.

Chronicle proporciona un conjunto de analizadores predeterminados que leen registros originales sin procesar y generan registros UDM estructurados con datos del registro original sin procesar. Chronicle mantiene estos analizadores. Los clientes también pueden definir instrucciones de asignación de datos personalizadas mediante la creación de un analizador específico del cliente. Comunícate con tu representante de Chronicle para obtener información sobre cómo crear un analizador específico del cliente.

Flujo de trabajo de transferencia y normalización

El analizador contiene instrucciones de asignación de datos. Define cómo se asignan los datos del registro original sin procesar a uno o más campos en la estructura de datos de UDM.

Si no hay errores de análisis, Chronicle crea un registro estructurado en UDM con datos del registro sin procesar. El proceso de convertir un registro sin procesar en un registro UDM se denomina normalización.

Un analizador predeterminado puede asignar un subconjunto de valores principales del registro sin procesar. Por lo general, estos campos principales son los más importantes para proporcionar estadísticas de seguridad en Chronicle. Los valores sin asignar permanecen en el registro sin procesar, pero no se almacenan en el registro UDM.

Un cliente también puede usar la API de Transferencia para enviar datos en formato estructurado de Modelo de datos unificado (UDM).

Personaliza la forma en que se analizan los datos transferidos

Chronicle ofrece las siguientes funciones que permiten a los clientes personalizar el análisis de datos en los datos de registro originales entrantes.

  • Analizadores específicos del cliente: Los clientes crean una configuración de analizador personalizada para un tipo de registro específico que cumple con sus requisitos específicos. Un analizador específico del cliente reemplaza el analizador predeterminado para el LogType específico. Comunícate con tu representante de Chronicle para obtener información sobre cómo crear un analizador específico del cliente.
  • Extensiones del analizador: Los clientes pueden agregar instrucciones de asignación personalizadas además de la configuración predeterminada del analizador. Cada cliente puede crear su propio conjunto único de instrucciones de asignación personalizadas. Mediante estas instrucciones de asignación, se define cómo extraer y transformar campos adicionales de registros sin procesar originales a campos de UDM. La extensión del analizador no reemplaza el analizador predeterminado o específico del cliente.

Ejemplo con un registro del proxy web de Squid

En esta sección, se proporciona un ejemplo de registro del proxy web de Squid y se describe cómo se asignan los valores a un registro UDM. Para obtener una descripción de todos los campos en el esquema de UDM, consulta la lista de campos del Modelo de datos unificado.

El registro del proxy web de Squid de ejemplo contiene valores separados por espacios. Cada registro representa un evento y almacena los siguientes datos: marca de tiempo, duración, cliente, código de resultado/estado del resultado, bytes transmitidos, método de solicitud, URL, usuario, código de jerarquía y tipo de contenido. En este ejemplo, los siguientes campos se extraen y se asignan en un registro UDM: hora, cliente, estado del resultado, bytes, método de solicitud y URL.

1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg

Ejemplo de proxy web de Squid

A medida que compares estas estructuras, observa que solo se incluye un subconjunto de los datos de registro originales en el registro UDM. Algunos campos son obligatorios y otros son opcionales. Además, solo un subconjunto de las secciones en el registro UDM contiene datos. Si el analizador no asigna datos del registro original al registro UDM, no verás esa sección del registro de UDM en Chronicle.

Valores de registro asignados a UDM

La sección metadata almacena la marca de tiempo del evento. Ten en cuenta que el valor se convirtió de EPOCH al formato RFC 3339. Esta conversión es opcional. La marca de tiempo se puede almacenar como formato EPOCH, con procesamiento previo para separar las partes de segundos y milisegundos en campos separados.

El campo metadata.event_type almacena el valor NETWORK_HTTP, que es un valor enumerado que identifica el tipo de evento. El valor de metadata.event_type determina qué campos adicionales de UDM son obligatorios y cuáles opcionales. Los valores product_name y vendor_name contienen descripciones fáciles de usar del dispositivo que registró el registro original.

El metadata.event_type de un registro de evento de UDM no es el mismo que el log_type definido cuando se transfieren datos con la API de transferencia. Estos dos atributos almacenan información diferente.

La sección network contiene valores del evento de registro original. Observa en este ejemplo que el valor de estado del registro original se analizó a partir del campo “código/estado del resultado” antes de escribirse en el registro de UDM. Solo se incluyó el result_code en el registro UDM.

Valores de registro asignados a UDM

En la sección principal, se almacena la información del cliente del registro original. En la sección target, se almacena la URL completamente calificada y la dirección IP.

La sección security_result almacena uno de los valores enum para representar la acción que se registró en el registro original.

Este es el registro UDM con formato JSON. Ten en cuenta que solo se incluyen las secciones que contienen datos. No se incluyen las secciones src, observer, intermediary, about y extensions.

{
        "metadata": {
            "event_timestamp": "2020-04-28T07:40:48.129Z",
            "event_type": "NETWORK_HTTP",
            "product_name": "Squid Proxy",
            "vendor_name": "Squid"
        },
        "principal": {
            "ip": "192.168.23.4"
        },
        "target": {
            "url": "www.google.com/images/sunlogo.png",
            "ip": "203.0.113.52"
        },
        "network": {
            "http": {
                "method": "GET",
                "response_code": 200,
                "received_bytes": 904
            }
        },
        "security_result": {
            "action": "UNKNOWN_ACTION"
        }
}

Pasos en las instrucciones del analizador

Las instrucciones de asignación de datos dentro de un analizador siguen un patrón común, como el siguiente:

  1. Analiza y extrae datos del registro original.
  2. Manipular los datos extraídos. Esto incluye el uso de lógica condicional para analizar valores de forma selectiva, convertir tipos de datos, reemplazar subcadenas en un valor, convertir a mayúsculas o minúsculas, etcétera.
  3. Asigna valores a los campos de UDM.
  4. Envía el registro UDM asignado a la clave @output.

Analiza y extrae datos del registro original

Configura la sentencia filter

La sentencia filter es la primera sentencia en el conjunto de instrucciones de análisis. La sentencia filter contiene todas las instrucciones de análisis adicionales.

filter {

}

Inicializa variables que almacenarán valores extraídos

Dentro de la declaración filter, inicializa las variables intermedias que el analizador usará para almacenar valores extraídos del registro.

Estas variables se usan cada vez que se analiza un registro individual. El valor de cada variable intermedia se establecerá en uno o más campos de UDM más adelante en las instrucciones de análisis.

  mutate {
    replace => {
      "event.idm.read_only_udm.metadata.product_name" => "Webproxy"
      "event.idm.read_only_udm.metadata.vendor_name" => "Squid"
      "not_valid_log" => "false"
      "when" => ""
      "srcip" => ""
      "action" => ""
      "username" => ""
      "url" => ""
      "tgtip" => ""
      "method" => ""
    }
  }

Extrae valores individuales del registro

Chronicle proporciona un conjunto de filtros, basados en Logstash, para extraer campos de los archivos de registro originales. Según el formato del registro, utilizas uno o varios filtros de extracción para extraer todos los datos del registro. Si la cadena es:

  • JSON nativo, la sintaxis del analizador es similar al filtro JSON, que admite registros con formato JSON. No se admite el formato JSON anidado.
  • XML, la sintaxis del analizador es similar al filtro XML, que admite registros con formato XML.
  • pares clave-valor, la sintaxis del analizador es similar al filtro Kv que admite mensajes con formato de clave-valor.
  • CSV, la sintaxis del analizador es similar al filtro CSV, que admite mensajes en formato CSV.
  • todos los demás formatos, la sintaxis del analizador es similar al filtro GROK con patrones integrados GROK . Se usan instrucciones de extracción de estilo Regex.

Chronicle proporciona un subconjunto de las funciones disponibles en cada filtro. Chronicle también proporciona una sintaxis de asignación de datos personalizada que no está disponible en los filtros. Consulta la referencia de la sintaxis del analizador para obtener una descripción de las características compatibles y las funciones personalizadas.

Continuando con el ejemplo de registro del proxy web de Squid, en la siguiente instrucción de extracción de datos se incluye una combinación de la sintaxis de Logstash Grok y las expresiones regulares.

La siguiente declaración de extracción almacena valores en las siguientes variables intermedias:

  • when
  • srcip
  • action
  • returnCode
  • size
  • method
  • username
  • url
  • tgtip

En esta declaración de ejemplo, también se usa la palabra clave overwrite para almacenar los valores extraídos en cada variable. Si el proceso de extracción muestra un error, la declaración on_error establece not_valid_log en True.

grok {
   match => {
     "message" => [
       "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
     ]
   }
   overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
   on_error => "not_valid_log"
}

Manipular y transformar los valores extraídos

Chronicle aprovecha las capacidades del complemento de filtro mutate de Logstash para permitir la manipulación de los valores extraídos del registro original. Chronicle proporciona un subconjunto de las capacidades disponibles del complemento. Consulta la sintaxis del analizador para obtener una descripción de las características admitidas y las funciones personalizadas, como las siguientes:

  • convertir valores a un tipo de datos diferente
  • reemplazar valores en la cadena
  • combina dos arreglos o agrega una string a uno. Los valores de strings se convierten en un arreglo antes de combinarse.
  • convertir a minúscula o mayúscula

En esta sección, se proporcionan ejemplos de transformación de datos que se basan en el registro del proxy web de Squid que se presentó antes.

Transforma la marca de tiempo del evento

Todos los eventos almacenados como un registro UDM deben tener una marca de tiempo del evento. En este ejemplo, se verifica si se extrajo un valor para los datos del registro. Luego, usa la función de fecha de Grak para hacer coincidir el valor con el formato de hora UNIX.

if [when] != "" {
  date {
    match => [
      "when", "UNIX"
    ]
   }
 }

Transforma el valor username

En la siguiente declaración de ejemplo, se convierte el valor de la variable username en minúsculas.

mutate {
   lowercase => [ "username"]
   }

Transforma el valor action

En el siguiente ejemplo, se evalúa el valor de la variable intermedia action y se lo cambia a ALLOW, BLOCK o UNKNOWN_ACTION, que son valores válidos para el campo security_result.action de UDM. El campo security_result.action del UDM es un tipo enumerado que almacena solo valores específicos.

if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
      mutate {
        replace => {
          "action" => "BLOCK"
        }
      }
   } else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
     mutate {
        replace => {
          "action" => "ALLOW"
        }
     }
   } else {
      mutate {
        replace => {
          "action" => "UNKNOWN_ACTION" }
      }
   }

Transforma la dirección IP de destino

En el siguiente ejemplo, se comprueba si hay un valor en la variable intermedia tgtip. Si se encuentra, el valor coincide con un patrón de dirección IP mediante un patrón de Grok predefinido. Si hay un error que hace coincidir el valor con un patrón de dirección IP, la función on_error establece la propiedad not_valid_tgtip en True. Si la coincidencia es exitosa, no se establece la propiedad not_valid_tgtip.

if [tgtip] not in [ "","-" ] {
   grok {
     match => {
       "tgtip" => [ "%{IP:tgtip}" ]
     }
     overwrite => ["tgtip"]
     on_error => "not_valid_tgtip"
   }

Cómo cambiar el tipo de datos de returnCode y tamaño

En el siguiente ejemplo, se convierte el valor de la variable size en uinteger y el valor de la variable returnCode en integer. Este es obligatorio porque la variable size se guardará en el campo network.received_bytes de UDM que almacena un tipo de datos int64. La variable returnCode se guardará en el campo network.http.response_code de UDM que almacena un tipo de datos int32.

mutate {
  convert => {
    "returnCode" => "integer"
    "size" => "uinteger"
  }
}

Asigna valores a los campos de UDM en un evento

Una vez que se extraen los valores y se procesan previamente, asígnalos a campos en un registro de eventos de UDM. Puedes asignar valores extraídos y valores estáticos a un campo de UDM.

Si propagas event.disambiguation_key, asegúrate de que este campo sea único para cada evento que se genera para el registro determinado. Si dos eventos diferentes tienen el mismo disambiguation_key, esto generará un comportamiento inesperado en el sistema.

Los ejemplos del analizador en esta sección se basan en el ejemplo de registro del proxy web de Squid anterior.

Guardar la marca de tiempo del evento

Cada registro de evento de UDM debe tener establecido un valor para el campo UDM metadata.event_timestamp. En el siguiente ejemplo, se guarda la marca de tiempo del evento extraída del registro en la variable integrada @timestamp. Chronicle lo guarda en el campo de UDM metadata.event_timestamp de forma predeterminada.

mutate {
  rename => {
    "when" => "timestamp"
  }
}

Configura el tipo de evento

Cada registro de evento de UDM debe tener establecido un valor para el campo metadata.event_type de UDM. Este campo es de tipo enumerado. El valor de este campo determina qué campos de UDM adicionales se deben propagar para que se guarde el registro de UDM. El proceso de análisis y normalización fallará si alguno de los campos obligatorios no contiene datos válidos.

replace => {
    "event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
   }
}

Guarda los valores username y method con la sentencia replace.

Los valores de los campos intermedios username y method son strings. En el siguiente ejemplo, se verifica si existe un valor válido y, de ser así, se almacena el valor username en el campo principal.user.userid de UDM y el valor method en el campo network.http.method de UDM.

if [username] not in [ "-" ,"" ] {
  mutate {
    replace => {
      "event.idm.read_only_udm.principal.user.userid" => "%{username}"
    }
  }
}

if [method] != "" {
  mutate {
    replace => {
      "event.idm.read_only_udm.network.http.method" => "%{method}"
    }
  }
}

Guarda action en el campo security_result.action de UDM

En la sección anterior, se evaluó el valor de la variable intermedia action y se transformó en uno de los valores estándar para el campo security_result.action de UDM.

Los campos de UDM security_result y action almacenan un array de elementos, lo que significa que debes seguir un enfoque un poco diferente cuando guardas este valor.

Primero, guarda el valor transformado en un campo security_result.action intermedio. El campo security_result es un elemento superior del campo action.

mutate {
   merge => {
     "security_result.action" => "action"
   }
}

Luego, guarda el campo intermedio security_result.action intermedio en el campo security_result de UDM. El campo security_result del UDM almacena un array de elementos, por lo que el valor se agrega a este campo.

# save the security_result field
mutate {
  merge => {
    "event.idm.read_only_udm.security_result" => "security_result"
  }
}

Almacena la dirección IP de destino y la dirección IP de origen con la declaración merge

Almacena los siguientes valores en el registro de eventos de la UDM:

  • Valor en la variable intermedia srcip para el campo principal.ip de UDM.
  • Valor en la variable intermedia tgtip para el campo target.ip de UDM.

Los campos de UDM principal.ip y target.ip almacenan un array de elementos, por lo que se agregan valores a cada campo.

En los siguientes ejemplos, se muestran diferentes enfoques para guardar estos valores. Durante el paso de transformación, la variable intermedia tgtip se hizo coincidir con una dirección IP mediante un patrón de Grok predefinido. La siguiente declaración de ejemplo verifica si la propiedad not_valid_tgtip es verdadera, lo que indica que el valor tgtip no pudo coincidir con un patrón de dirección IP. Si es falso, guarda el valor tgtip en el campo target.ip del UDM.

if ![not_valid_tgtip] {
  mutate {
    merge => {
      "event.idm.read_only_udm.target.ip" => "tgtip"
    }
  }
 }

No se transformó la variable intermedia srcip. Con la siguiente instrucción, se verifica si se extrajo un valor del registro original y, de ser así, se guarda en el campo principal.ip del UDM.

if [srcip] != "" {
  mutate {
    merge => {
      "event.idm.read_only_udm.principal.ip" => "srcip"
    }
  }
}

Guarda url, returnCode y size con la sentencia rename

La siguiente instrucción de ejemplo almacena los siguientes valores con la instrucción rename.

  • La variable url se guardó en el campo target.url de UDM.
  • La variable intermedia returnCode se guardó en el campo network.http.response_code de UDM.
  • La variable intermedia size se guardó en el campo network.received_bytes de UDM.
mutate {
  rename => {
     "url" => "event.idm.read_only_udm.target.url"
     "returnCode" => "event.idm.read_only_udm.network.http.response_code"
     "size" => "event.idm.read_only_udm.network.received_bytes"
  }
}

Vincula el registro UDM con el resultado

La declaración final en la instrucción de asignación de datos envía los datos procesados a un registro de eventos de UDM.

mutate {
    merge => {
      "@output" => "event"
    }
  }

El código del analizador completo

Este es el ejemplo completo de código del analizador. El orden de las instrucciones no sigue el mismo orden que las secciones anteriores de este documento, pero da como resultado el mismo resultado.

filter {

# initialize variables
  mutate {
    replace => {
      "event.idm.read_only_udm.metadata.product_name" => "Webproxy"
      "event.idm.read_only_udm.metadata.vendor_name" => "Squid"
      "not_valid_log" => "false"
      "when" => ""
      "srcip" => ""
      "action" => ""
      "username" => ""
      "url" => ""
      "tgtip" => ""
      "method" => ""
    }
  }

  # Extract fields from the raw log.
    grok {
      match => {
        "message" => [
          "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
        ]
      }
      overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
      on_error => "not_valid_log"
    }

  # Parse event timestamp
  if [when] != "" {
    date {
      match => [
        "when", "UNIX"
      ]
     }
   }

   # Save the value in "when" to the event timestamp
   mutate {
     rename => {
       "when" => "timestamp"
     }
   }

   # Transform and save username
   if [username] not in [ "-" ,"" ] {
     mutate {
       lowercase => [ "username"]
        }
      }
     mutate {
       replace => {
         "event.idm.read_only_udm.principal.user.userid" => "%{username}"
       }
     }

if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
      mutate {
        replace => {
          "action" => "BLOCK"
        }
      }
   } else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
     mutate {
        replace => {
          "action" => "ALLOW"
        }
     }
   } else {
      mutate {
        replace => {
          "action" => "UNKNOWN_ACTION" }
      }
   }

  # save transformed value to an intermediary field
   mutate {
      merge => {
        "security_result.action" => "action"
      }
   }

    # save the security_result field
    mutate {
      merge => {
        "event.idm.read_only_udm.security_result" => "security_result"
      }
    }

   # check for presence of target ip. Extract and store target IP address.
   if [tgtip] not in [ "","-" ] {
     grok {
       match => {
         "tgtip" => [ "%{IP:tgtip}" ]
       }
       overwrite => ["tgtip"]
       on_error => "not_valid_tgtip"
     }

     # store  target IP address
     if ![not_valid_tgtip] {
       mutate {
         merge => {
           "event.idm.read_only_udm.target.ip" => "tgtip"
         }
       }
     }
   }

   # convert  the returnCode and size  to integer data type
   mutate {
     convert => {
       "returnCode" => "integer"
       "size" => "uinteger"
     }
   }

   # save  url, returnCode, and size
   mutate {
     rename => {
        "url" => "event.idm.read_only_udm.target.url"
        "returnCode" => "event.idm.read_only_udm.network.http.response_code"
        "size" => "event.idm.read_only_udm.network.received_bytes"
     }

     # set the event type to NETWORK_HTTP
     replace => {
        "event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
     }
   }

   # validate and set source IP address
   if [srcip] != "" {
     mutate {
       merge => {
         "event.idm.read_only_udm.principal.ip" => "srcip"
       }
     }
   }

  # save  event to @output
   mutate {
     merge => {
       "@output" => "event"
     }
   }

} #end of filter