Présentation de l'analyse des journaux
Ce document explique comment Google Security Operations analyse les journaux bruts au format UDM (Unified Data Model).
Google Security Operations peut recevoir des données de journaux provenant des sources d'ingestion suivantes:
- redirecteur Google Security Operations
- Flux de l'API Google Security Operations
- API Google Security Operations Ingestion
- Partenaire technologique tiers
En général, les clients envoient leurs données sous forme de journaux bruts d'origine. Google Security Operations identifie de manière unique l'appareil qui a généré les journaux à l'aide du LogType. Le LogType identifie à la fois:
- le fournisseur et l'appareil qui ont généré le journal, par exemple Cisco Firewall, Linux DHCP Server ou Bro DNS.
- qui convertit le journal brut en un modèle de données unifié (UDM) structuré. Il existe une relation de type un à un entre un analyseur et un LogType. Chaque analyseur convertit les données reçues par un seul LogType.
Google Security Operations fournit un ensemble d'analyseurs par défaut qui lisent les journaux bruts d'origine et génèrent des enregistrements UDM structurés à l'aide des données du journal brut d'origine. Google Security Operations gère ces analyseurs. Les clients peuvent également définir des instructions personnalisées de mappage des données en créant un analyseur spécifique. Contactez votre représentant Google Security Operations pour en savoir plus sur la création d'un analyseur spécifique au client.
L'analyseur contient des instructions sur le mappage des données. Il définit la manière dont les données du journal brut d'origine sont mappées avec un ou plusieurs champs de la structure de données UDM.
En l'absence d'erreurs d'analyse, Google Security Operations crée un enregistrement structuré en UDM à l'aide des données du journal brut. Le processus de conversion d'un journal brut en enregistrement UDM est appelé normalisation.
Un analyseur par défaut peut mapper un sous-ensemble de valeurs principales du journal brut. En règle générale, ces champs principaux sont les plus importants pour fournir des insights sur la sécurité dans Google Security Operations. Les valeurs non mappées restent dans le journal brut, mais ne sont pas stockées dans l'enregistrement UDM.
Un client peut également utiliser l'API d'ingestion pour envoyer des données au format de modèle de données unifié (UDM, Unified Data Model) structuré.
Personnaliser la façon dont les données ingérées sont analysées
Google Security Operations offre les fonctionnalités suivantes pour permettre aux clients de personnaliser l'analyse des données de journaux d'origine entrantes.
- Analyseurs spécifiques au client: les clients créent une configuration d'analyseur personnalisée pour un type de journal spécifique qui répond à leurs exigences spécifiques. Un analyseur spécifique au client remplace l'analyseur par défaut pour le LogType spécifique. Contactez votre représentant Google Security Operations pour en savoir plus sur la création d'un analyseur spécifique au client.
- Extensions d'analyseur: les clients peuvent ajouter des instructions de mappage personnalisées en plus de la configuration d'analyseur par défaut. Chaque client peut créer son propre ensemble unique d'instructions de mappage personnalisées. Ces instructions de mappage définissent comment extraire et transformer des champs supplémentaires des journaux bruts d'origine en champs UDM. Une extension d'analyseur ne remplace pas l'analyseur par défaut ou spécifique au client.
Exemple d'utilisation d'un journal de proxy Web Squid
Cette section fournit un exemple de journal de proxy Web Squid et décrit comment les valeurs sont mappées à un enregistrement UDM. Pour obtenir une description de tous les champs du schéma UDM, consultez la page Liste des champs du modèle de données unifié.
L'exemple de journal de proxy Web Squid contient des valeurs séparées par des espaces. Chaque enregistrement représente un événement et stocke les données suivantes: code temporel, durée, client, code du résultat/état du résultat, octets transmis, méthode de requête, URL, utilisateur, code hiérarchique et type de contenu. Dans cet exemple, les champs suivants sont extraits et mappés dans un enregistrement UDM: heure, client, état du résultat, octets, méthode de requête et URL.
1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg
En comparant ces structures, vous remarquerez que seul un sous-ensemble des données de journal d'origine est inclus dans l'enregistrement UDM. Certains champs sont obligatoires et d'autres facultatifs. De plus, seul un sous-ensemble des sections de l'enregistrement UDM contient des données. Si l'analyseur ne mappe pas les données du journal d'origine avec l'enregistrement UDM, cette section de l'enregistrement UDM ne s'affiche pas dans Google Security Operations.
La section metadata
stocke l'horodatage de l'événement. Notez que la valeur a été convertie
du format EPOCH au format RFC 3339. Cette conversion est facultative. L'horodatage peut être stocké au format EPOCH, avec un prétraitement pour séparer les secondes et les millisecondes dans des champs distincts.
Le champ metadata.event_type
stocke la valeur NETWORK_HTTP
, qui est une valeur énumérée identifiant le type d'événement. La valeur de metadata.event_type
détermine les champs UDM supplémentaires qui sont obligatoires ou facultatifs. Les valeurs product_name
et vendor_name
contiennent des descriptions conviviales de l'appareil qui a enregistré le journal d'origine.
Le metadata.event_type
d'un enregistrement d'événement UDM est différent du log_type défini lors de l'ingestion de données à l'aide de l'API d'ingestion. Ces deux attributs stockent
des informations différentes.
La section network
contient les valeurs de l'événement de journal d'origine. Notez dans cet exemple que la valeur d'état du journal d'origine a été analysée à partir du champ "code/état du résultat" avant d'être écrite dans l'enregistrement UDM. Seul le paramètre "result_code" a été inclus dans l'enregistrement UDM.
La section principal
stocke les informations sur le client du journal d'origine. La section target
stocke à la fois l'URL complète et l'adresse IP.
La section security_result
stocke l'une des valeurs d'énumération pour représenter l'action enregistrée dans le journal d'origine.
Il s'agit de l'enregistrement UDM au format JSON. Notez que seules les sections contenant des données sont incluses. Les sections src
, observer
, intermediary
, about
et extensions
ne sont pas incluses.
{
"metadata": {
"event_timestamp": "2020-04-28T07:40:48.129Z",
"event_type": "NETWORK_HTTP",
"product_name": "Squid Proxy",
"vendor_name": "Squid"
},
"principal": {
"ip": "192.168.23.4"
},
"target": {
"url": "www.google.com/images/sunlogo.png",
"ip": "203.0.113.52"
},
"network": {
"http": {
"method": "GET",
"response_code": 200,
"received_bytes": 904
}
},
"security_result": {
"action": "UNKNOWN_ACTION"
}
}
Étapes dans les instructions de l'analyseur
Les instructions de mappage des données dans un analyseur suivent un modèle commun, comme suit:
- Analyser et extraire les données du journal d'origine
- Manipuler les données extraites Cela inclut l'utilisation d'une logique conditionnelle pour analyser les valeurs, convertir des types de données, remplacer les sous-chaînes d'une valeur, les convertir en majuscules ou en minuscules, etc.
- Attribuez des valeurs aux champs UDM.
- Générez l'enregistrement UDM mappé sur la clé de sortie @output.
Analyser et extraire les données du journal d'origine
Définir l'instruction de filtre
L'instruction filter
est la première de l'ensemble d'instructions d'analyse.
Toutes les instructions d'analyse supplémentaires sont contenues dans l'instruction filter
.
filter {
}
Initialiser les variables qui stockeront les valeurs extraites
Dans l'instruction filter
, initialisez les variables intermédiaires que l'analyseur utilisera pour stocker les valeurs extraites du journal.
Ces variables sont utilisées chaque fois qu'un journal individuel est analysé. La valeur de chaque variable intermédiaire sera définie sur un ou plusieurs champs UDM plus tard dans les instructions d'analyse.
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
Extraire des valeurs individuelles du journal
Google Security Operations fournit un ensemble de filtres basés sur Logstash, qui permettent d'extraire les champs des fichiers journaux d'origine. Selon le format du journal, vous utilisez un ou plusieurs filtres d'extraction pour extraire toutes les données du journal. Si la chaîne est:
- JSON natif, la syntaxe de l'analyseur est semblable au filtre JSON qui accepte les journaux au format JSON. Les données JSON imbriquées ne sont pas compatibles.
- XML, la syntaxe de l'analyseur est semblable à celle du filtre XML, qui accepte les journaux au format XML.
- paires clé/valeur, la syntaxe de l'analyseur est semblable au filtre Kv qui accepte les messages au format clé-valeur.
- CSV, la syntaxe de l'analyseur est semblable au filtre CSV, qui accepte les messages au format CSV.
- Pour tous les autres formats, la syntaxe de l'analyseur est semblable à celle du filtre GROK, avec des motifs intégrés GROK . Cette méthode utilise des instructions d'extraction de type Regex.
Google Security Operations fournit un sous-ensemble des fonctionnalités disponibles pour chaque filtre. Google Security Operations fournit également une syntaxe de mappage de données personnalisée qui n'est pas disponible dans les filtres. Consultez la documentation de référence sur la syntaxe de l'analyseur pour obtenir une description des fonctionnalités compatibles et des fonctions personnalisées.
En reprenant l'exemple de journal du proxy Web Squid, les instructions d'extraction de données suivantes combinent la syntaxe Logstash Grok et des expressions régulières.
L'instruction d'extraction suivante stocke les valeurs dans les variables intermédiaires suivantes:
when
srcip
action
returnCode
size
method
username
url
tgtip
Cet exemple d'instruction utilise également le mot clé overwrite
pour stocker les valeurs extraites dans chaque variable. Si le processus d'extraction renvoie une erreur, l'instruction on_error
définit not_valid_log
sur True
.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
Manipuler et transformer les valeurs extraites
Google Security Operations exploite les fonctionnalités du plug-in de filtre mutate de Logstash pour permettre la manipulation des valeurs extraites du journal d'origine. Google Security Operations fournit un sous-ensemble des fonctionnalités disponibles dans le plug-in. Consultez la page Syntaxe de l'analyseur pour obtenir une description des fonctionnalités compatibles et des fonctions personnalisées, telles que:
- convertir des valeurs en un autre type de données
- remplacer les valeurs de la chaîne
- fusionner deux tableaux ou ajouter une chaîne à un tableau. Les valeurs de chaîne sont converties en tableau avant la fusion.
- convertir en minuscules ou en majuscules
Cette section fournit des exemples de transformation de données qui s'appuient sur le journal du proxy Web Squid présenté précédemment.
Transformer l'horodatage de l'événement
Tous les événements stockés en tant qu'enregistrement UDM doivent comporter un code temporel d'événement. Cet exemple vérifie si une valeur correspondant aux données a été extraite du journal. Il utilise ensuite la fonction de date Grok pour faire correspondre la valeur au format d'heure UNIX
.
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
Transformer la valeur username
L'exemple d'instruction suivant convertit la valeur de la variable username
en minuscules.
mutate {
lowercase => [ "username"]
}
Transformer la valeur action
L'exemple suivant évalue la valeur de la variable intermédiaire action
et la remplace par ALLOW, BLOCK ou UNKNOWN_ACTION, qui sont des valeurs valides pour le champ UDM security_result.action
. Le champ UDM security_result.action
est un type énuméré qui ne stocke que des valeurs spécifiques.
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
Transformer l'adresse IP cible
L'exemple suivant recherche une valeur dans la variable intermédiaire tgtip
.
Si elle est trouvée, la valeur est mise en correspondance avec un modèle d'adresse IP utilisant un modèle Grok prédéfini. Si une erreur se produit lors de la mise en correspondance de la valeur avec un modèle d'adresse IP, la fonction on_error
définit la propriété not_valid_tgtip
sur True
. Si la correspondance est établie, la propriété not_valid_tgtip
n'est pas définie.
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
Modifier le type de données de returnCode et de taille
L'exemple suivant convertit la valeur de la variable size
en uinteger
et la valeur de la variable returnCode
en integer
. Cela est nécessaire, car la variable size
sera enregistrée dans le champ UDM network.received_bytes
qui stocke un type de données int64
. La variable returnCode
sera enregistrée dans le champ UDM network.http.response_code
qui stocke un type de données int32
.
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
Attribuer des valeurs aux champs UDM dans un événement
Une fois les valeurs extraites et prétraitées, attribuez-les aux champs d'un enregistrement d'événement UDM. Vous pouvez attribuer à la fois des valeurs extraites et des valeurs statiques à un champ UDM.
Si vous renseignez event.disambiguation_key
, assurez-vous que ce champ est propre à chaque événement généré pour le journal donné. Si deux événements différents ont le même disambiguation_key
, cela entraîne un comportement inattendu dans le système.
Les exemples d'analyseur de cette section s'appuient sur l'exemple de journal proxy Web Squid ci-dessus.
Enregistrer l'horodatage de l'événement
Chaque enregistrement d'événement UDM doit comporter une valeur pour le champ UDM metadata.event_timestamp
. L'exemple suivant enregistre l'horodatage de l'événement extrait du journal dans la variable de compilation @timestamp
. Par défaut, Google Security Operations l'enregistre dans le champ UDM metadata.event_timestamp
.
mutate {
rename => {
"when" => "timestamp"
}
}
Définir le type d'événement
Chaque enregistrement d'événement UDM doit avoir une valeur définie pour le champ UDM metadata.event_type
. Ce champ est un type énuméré. La valeur de ce champ détermine les champs UDM supplémentaires à renseigner pour que l'enregistrement UDM soit enregistré.
Le processus d'analyse et de normalisation échouera si l'un des champs obligatoires ne contient pas de données valides.
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
Enregistrez les valeurs username
et method
à l'aide de l'instruction replace
.
Les valeurs des champs intermédiaires username
et method
sont des chaînes. L'exemple suivant vérifie si une valeur valide existe et, le cas échéant, stocke la valeur username
dans le champ UDM principal.user.userid
et la valeur method
dans le champ UDM network.http.method
.
if [username] not in [ "-" ,"" ] {
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
}
if [method] != "" {
mutate {
replace => {
"event.idm.read_only_udm.network.http.method" => "%{method}"
}
}
}
Enregistrez le action
dans le champ UDM security_result.action
.
Dans la section précédente, la valeur de la variable intermédiaire action
a été évaluée et transformée en l'une des valeurs standards du champ UDM security_result.action
.
Les champs UDM security_result
et action
stockent tous deux un tableau d'éléments, ce qui signifie que vous devez suivre une approche légèrement différente lors de l'enregistrement de cette valeur.
Enregistrez d'abord la valeur transformée dans un champ security_result.action
intermédiaire. Le champ security_result
est un parent du champ action
.
mutate {
merge => {
"security_result.action" => "action"
}
}
Enregistrez ensuite le champ intermédiaire security_result.action
intermédiaire dans le champ UDM security_result
. Le champ UDM security_result
stocke un tableau d'éléments. La valeur est donc ajoutée à ce champ.
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
Stockez les adresses IP cibles et sources à l'aide de l'instruction merge
.
Stockez les valeurs suivantes dans l'enregistrement d'événement UDM:
- Valeur de la variable intermédiaire
srcip
dans le champ UDMprincipal.ip
. - Valeur de la variable intermédiaire
tgtip
dans le champ UDMtarget.ip
.
Les champs UDM principal.ip
et target.ip
stockent tous deux un tableau d'éléments. Des valeurs sont donc ajoutées à chaque champ.
Les exemples ci-dessous illustrent différentes approches pour enregistrer ces valeurs.
Au cours de l'étape de transformation, la variable intermédiaire tgtip
a été mise en correspondance avec une adresse IP à l'aide d'un modèle Grok prédéfini. L'exemple d'instruction suivant vérifie si la propriété not_valid_tgtip
est vraie, ce qui indique que la valeur tgtip
n'a pas pu être mise en correspondance avec un modèle d'adresse IP. Si la valeur est "false", la valeur tgtip
est enregistrée dans le champ UDM target.ip
.
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
La variable intermédiaire srcip
n'a pas été transformée. L'instruction suivante vérifie si une valeur a été extraite du journal d'origine et, le cas échéant, enregistre la valeur dans le champ UDM principal.ip
.
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
Enregistrez url
, returnCode
et size
à l'aide de l'instruction rename
.
L'exemple d'instruction ci-dessous stocke les valeurs suivantes à l'aide de l'instruction rename
.
- La variable
url
a été enregistrée dans le champ UDMtarget.url
. - La variable intermédiaire
returnCode
enregistrée dans le champ UDMnetwork.http.response_code
. - La variable intermédiaire
size
enregistrée dans le champ UDMnetwork.received_bytes
.
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
}
Lier l'enregistrement UDM à la sortie
L'instruction finale de l'instruction de mappage de données génère les données traitées dans un enregistrement d'événement UDM.
mutate {
merge => {
"@output" => "event"
}
}
Le code complet de l'analyseur
Voici l'exemple de code d'analyseur complet. L'ordre des instructions ne suit pas le même ordre que les sections précédentes de ce document, mais produit le même résultat.
filter {
# initialize variables
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
# Extract fields from the raw log.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
# Parse event timestamp
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
# Save the value in "when" to the event timestamp
mutate {
rename => {
"when" => "timestamp"
}
}
# Transform and save username
if [username] not in [ "-" ,"" ] {
mutate {
lowercase => [ "username"]
}
}
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
# save transformed value to an intermediary field
mutate {
merge => {
"security_result.action" => "action"
}
}
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
# check for presence of target ip. Extract and store target IP address.
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
# store target IP address
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
}
# convert the returnCode and size to integer data type
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
# save url, returnCode, and size
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
# set the event type to NETWORK_HTTP
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
# validate and set source IP address
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
# save event to @output
mutate {
merge => {
"@output" => "event"
}
}
} #end of filter