Exemples d'extensions d'analyseur
Ce document fournit des exemples de création d'extensions d'analyseur dans différents scénarios. Pour en savoir plus sur les extensions d'analyseur, consultez Créer des extensions d'analyseur.
Exemples d'extensions d'analyseur
Utilisez les tableaux d'attributs suivants pour trouver rapidement l'exemple de code dont vous avez besoin.
Exemples sans code
Format de la source de journaux | Exemple de titre | Description | Concepts d'analyseur dans cet exemple |
---|---|---|---|
JSON (type de journal: GCP_IDS) |
Champs d'extraction | Extrayez des champs d'un journal au format JSON. | Pas de code |
JSON (type de journal: WORKSPACE_ALERTS) |
Extraire des champs avec la valeur de précondition | Extrayez des champs d'un journal au format JSON et normalisez-les dans un champ UDM répété, avec une précondition. |
|
Exemples d'extraits de code
Format de la source de journaux | Exemple de titre | Description | Concepts d'analyseur dans cet exemple |
---|---|---|---|
JSON (type de journal: GCP_IDS) |
Ajouter un user-agent HTTP |
|
|
CSV (type de journal: MISP_IOC) |
Extraction de champs arbitraires dans l'objet UDM additional |
Extrait des champs dans UDM > Entité > Objet UDM additional > paire clé-valeur |
Objet UDM additional |
Syslog (type de journal: POWERSHELL) |
Extraire la priorité et la gravité de Syslog | Extrayez les valeurs de l'établissement Syslog et de la gravité dans les champs "Priorité du résultat de sécurité UDM" et "Sévérité". | Basé sur Grok |
JSON avec un en-tête Syslog (type de journal: WINDOWS_SYSMON) |
Décoration basée sur une instruction conditionnelle |
|
|
JSON avec un en-tête Syslog (type de journal: WINDOWS_SYSMON) |
Convertir les types de données |
|
|
JSON avec un en-tête Syslog (type de journal: WINDOWS_SYSMON) |
Noms de variables temporaires pour une meilleure lisibilité | Vous pouvez utiliser des noms de variables temporaires dans des extraits de code, puis les renommer pour qu'ils correspondent au nom de l'objet d'événement UDM de sortie finale. Cela peut améliorer la lisibilité globale. |
|
JSON avec un en-tête Syslog (type de journal: WINDOWS_SYSMON) |
Champs répétés | Utilisez les champs répétés dans les extraits de code avec précaution, par exemple le champ security_result. |
|
XML (type de journal: WINDOWS_DEFENDER_AV) |
Extraction de champ arbitraire dans l'objet additional |
|
L'objet additional permet de stocker les informations sous forme de paire clé-valeur personnalisée. |
XML (type de journal: WINDOWS_DEFENDER_AV) |
Extraction de champ arbitraire dans le nom d'hôte principal |
overwrite Grok
|
|
Exemples de fichiers JSON
Les exemples suivants montrent comment créer une extension d'analyseur lorsque la source de journal est au format JSON.
Sans code : extraire des champs
Exemples d'attributs:
- Format de la source de journal: JSON
- Approche de mappage des données: sans code
- Type de journal: GCP_IDS
- Objet de l'extension d'analyseur: Extraire des champs.
Description :
Plusieurs champs liés au réseau ne sont pas extraits. Comme cet exemple de journal est un journal structuré au format JSON, nous pouvons utiliser l'approche sans code (Map data fields) pour créer l'extension de l'analyseur.
Les champs d'origine que nous souhaitons extraire sont les suivants:
total_packets
(chaîne)elapsed_time
(chaîne)total_bytes
(chaîne)
Voici un exemple d'entrée de journal brute:
{ "insertId": "625a41542d64c124e7db097ae0906ccb-1@a3", "jsonPayload": { "destination_port": "80", "application": "incomplete", "ip_protocol": "tcp", "network": "projects/prj-p-shared-base/global/networks/shared-vpc-production", "start_time": "2024-10-29T21:14:59Z", "source_port": "41936", "source_ip_address": "35.191.200.157", "total_packets": "6", "elapsed_time": "0", "destination_ip_address": "192.168.0.11", "total_bytes": "412", "repeat_count": "1", "session_id": "1289742" }, "resource": { "type": "ids.googleapis.com/Endpoint", "labels": { "resource_container": "projects/12345678910", "location": "europe-west4-a", "id": "p-europe-west4" } }, "timestamp": "2024-10-29T21:15:21Z", "logName": "projects/prj-p-shared-base/logs/ids.googleapis.com%2Ftraffic", "receiveTimestamp": "2024-10-29T21:15:24.051990717Z" }
L'exemple utilise l'approche sans code pour créer une extension d'analyseur à l'aide du mappage de champs de données suivant:
Chemin de précondition Opérateur de précondition Valeur de précondition Chemin d'accès aux données brutes Champ de destination* jsonPayload.total_bytes
NOT_EQUALS "" jsonPayload.total_bytes
udm.principal.network.received_bytes
jsonPayload.elapsed_time
NOT_EQUALS "" jsonPayload.elapsed_time
udm.principal.network.session_duration.seconds
jsonPayload.total_packets
NOT_EQUALS "" jsonPayload.total_packets
udm.principal.network.received_packets
L'exécution de l'extension d'analyseur ajoute les trois champs extraits à l'objet
principal.network
.metadata.product_log_id = "625a41542d64c124e7db097ae0906ccb-1@a3" metadata.event_timestamp = "2024-10-29T21:14:59Z" metadata.event_type = "NETWORK_CONNECTION" metadata.vendor_name = "Google Cloud" metadata.product_name = "IDS" metadata.ingestion_labels[0].key = "label" metadata.ingestion_labels[0].value = "GCP_IDS" metadata.log_type = "GCP_IDS" principal.ip[0] = "35.191.200.157" principal.port = 41936 principal.network.received_bytes = 412 principal.network.session_duration.seconds = "0s" principal.network.received_packets = 6 target.ip[0] = "192.168.0.11" target.port = 80 target.application = "incomplete" observer.location.country_or_region = "EUROPE" observer.location.name = "europe-west4-a" observer.resource.name = "projects/12345678910" observer.resource.resource_type = "CLOUD_PROJECT" observer.resource.attribute.cloud.environment = "GOOGLE_CLOUD_PLATFORM" observer.resource.product_object_id = "p-europe-west4" network.ip_protocol = "TCP" network.session_id = "1289742"
Sans code : extraire des champs avec la valeur de précondition
Exemples d'attributs:
- Format de la source de journal: JSON
- Approche de mappage des données: sans code
- Type de journal: WORKSPACE_ALERTS
- Objet de l'extension de l'analyseur: Extraire les champs avec la valeur de précondition.
Description :
L'analyseur d'origine n'extrait pas le
email address
de l'utilisateur principal concerné par une alerte de protection contre la perte de données (DLP).Cet exemple utilise une extension d'analyseur sans code pour extraire le
email address
et le normaliser dans un champ UDM répété, avec une condition préalable.Lorsque vous travaillez avec des champs répétés dans une extension d'analyseur sans code, vous devez indiquer si vous souhaitez:
- replace (remplace toutes les valeurs des champs répétés dans l'objet UDM existant) ou
- append (ajoute les valeurs extraites aux champs répétés).
Pour en savoir plus, consultez la section Champs répétés.
Cet exemple remplace toutes les adresses e-mail existantes dans le champ
principal.user.email_address
normalisé.Les conditions préalables vous permettent d'effectuer des vérifications conditionnelles avant d'effectuer une opération d'extraction. Dans la plupart des cas, le champ de précondition est le même que le champ de données brutes que vous souhaitez extraire, avec un opérateur de précondition de
not Null
, par exemple,foo != ""
.Toutefois, comme dans notre exemple, la valeur du champ de données brutes que vous souhaitez extraire n'est pas toujours présente dans toutes les entrées de journal. Dans ce cas, vous pouvez utiliser un autre champ de précondition pour filtrer l'opération d'extraction. Dans notre exemple, le champ
triggeringUserEmail
brut que vous souhaitez extraire n'est présent que dans les journaux pour lesquelstype = Data Loss Prevention
.Voici des exemples de valeurs à saisir dans les champs d'extension de l'analyseur sans code:
Chemin de précondition Opérateur de précondition Valeur de précondition Chemin d'accès aux données brutes Champ de destination* type
ÉGAL À Protection contre la perte de données data.ruleViolationInfo.triggeringUserEmail
udm.principal.user.email_addresses
L'exemple suivant montre les champs d'extension de l'analyseur sans code renseignés avec des exemples de valeurs:
L'exécution de l'extension d'analyseur ajoute correctement
email_address
à l'objetprincipal.user
.metadata.product_log_id = "Ug71LGqBr6Q=" metadata.event_timestamp = "2022-12-18T12:17:35.154368Z" metadata.event_type = "USER_UNCATEGORIZED" metadata.vendor_name = "Google Workspace" metadata.product_name = "Google Workspace Alerts" metadata.product_event_type = "DlpRuleViolation" metadata.log_type = "WORKSPACE_ALERTS" additional.fields["resource_title"] = "bq-results-20221215-112933-1671103787123.csv" principal.user.email_addresses[0] = "foo.bar@altostrat.com" target.resource.name = "DRIVE" target.resource.resource_type = "STORAGE_OBJECT" target.resource.product_object_id = "1wLteoF3VHljS_8_ABCD_VVbhFTfcTQplJ5k1k7cL4r8" target.labels[0].key = "resource_title" target.labels[0].value = "bq-results-20221321-112933-1671103787697.csv" about[0].resource.resource_type = "CLOUD_ORGANIZATION" about[0].resource.product_object_id = "C01abcde2" security_result[0].about.object_reference.id = "ODU2NjEwZTItMWE2YS0xMjM0LWJjYzAtZTJlMWU2YWQzNzE3" security_result[0].category_details[0] = "Data Loss Prevention" security_result[0].rule_name = "Sensitive Projects Match" security_result[0].summary = "Data Loss Prevention" security_result[0].action[0] = "ALLOW" security_result[0].severity = "MEDIUM" security_result[0].rule_id = "rules/00abcdxs183abcd" security_result[0].action_details = "ALERT, DRIVE_WARN_ON_EXTERNAL_SHARING" security_result[0].alert_state = "ALERTING" security_result[0].detection_fields[0].key = "start_time" security_result[0].detection_fields[0].value = "2022-12-18T12:17:35.154368Z" security_result[0].detection_fields[1].key = "status" security_result[0].detection_fields[1].value = "NOT_STARTED" security_result[0].detection_fields[2].key = "trigger" security_result[0].detection_fields[2].value = "DRIVE_SHARE" security_result[0].rule_labels[0].key = "detector_name" security_result[0].rule_labels[0].value = "EMAIL_ADDRESS" network.email.to[0] = "foo.bar@altostrat.com"
Extrait de code : ajouter un user-agent HTTP
Exemples d'attributs:
- Format de la source de journal: JSON
- Approche de mappage des données: extrait de code
- Type de journal: GCP_IDS
- Objet de l'extension d'analyseur: Ajout d'un user-agent HTTP.
Description :
Il s'agit d'un exemple de type d'objet UDM non standard qui n'est pas compatible avec l'approche sans code et qui nécessite donc d'utiliser un extrait de code. L'analyseur par défaut n'extrait pas l'analyse
Network HTTP Parser User Agent
. De plus, pour plus de cohérence:- Un élément
Target Hostname
sera créé à partir derequestUrl
. - Un
Namespace
est attribué pour s'assurer que l'aliasing et l'enrichissement basés sur les composants sont effectués.
# GCP_LOADBALANCING # owner: @owner # updated: 2022-12-23 # Custom parser extension that: # 1) adds consistent Namespace # 2) adds Parsed User Agent Object filter { # Initialize placeholder mutate { replace => { "httpRequest.userAgent" => "" "httpRequest.requestUrl" => "" } } json { on_error => "not_json" source => "message" array_function => "split_columns" } if ![not_json] { #1 - Override Namespaces mutate { replace => { "event1.idm.read_only_udm.principal.namespace" => "TMO" } } mutate { replace => { "event1.idm.read_only_udm.target.namespace" => "TMO" } } mutate { replace => { "event1.idm.read_only_udm.src.namespace" => "TMO" } } #2 - Parsed User Agent if [httpRequest][requestUrl]!= "" { grok { match => { "httpRequest.requestUrl" => ["\/\/(?P<_hostname>.*?)\/"] } on_error => "_grok_hostname_failed" } if ![_grok_hostname_failed] { mutate { replace => { "event1.idm.read_only_udm.target.hostname" => "%{_hostname}" } } } } if [httpRequest][userAgent] != "" { mutate { convert => { "httpRequest.userAgent" => "parseduseragent" } } #Map the converted "user_agent" to the new UDM field "http.parsed_user_agent". mutate { rename => { "httpRequest.userAgent" => "event1.idm.read_only_udm.network.http.parsed_user_agent" } } } mutate { merge => { "@output" => "event1" } } } }
- Un élément
Exemple de fichier CSV
L'exemple suivant montre comment créer une extension d'analyseur lorsque la source de journaux est au format CSV.
Extrait de code : extraction de champs arbitraires dans l'objet additional
Exemples d'attributs:
- Format de la source de journaux: CSV
- Approche de mappage des données: extrait de code
- Type de journal: MISP_IOC
- Objet de l'extension d'analyseur: extraction de champs arbitraires dans l'objet
additional
. Description :
Dans cet exemple, l'intégration du contexte d'entité UDM MISP_IOC est utilisée. L'objet UDM de paire clé-valeur
additional
permet de capturer les informations contextuelles non extraites par l'analyseur par défaut et d'ajouter des champs spécifiques à chaque organisation. Par exemple, une URL redirigeant vers l'instance MISP spécifique.Voici la source de journaux au format CSV pour cet exemple:
1
9d66d38a-14e1-407f-a4d1-90b82aa1d59f
2
3908
3
Network activity
4
ip-dst
5
117.253.154.123
6
7
8
1687894564
9
10
11
12
13
14
DigitalSide Malware report\: MD5\: 59ce0baba11893f90527fc951ac69912
15
ORGNAME
16
DIGITALSIDE.IT
17
0
18
Medium
19
0
20
2023-06-23
21
tlp:white,type:OSINT,source:DigitalSide.IT,source:urlhaus.abuse.ch
22
1698036218
# MISP_IOC # owner: @owner # updated: 2024-06-21 # Custom parser extension that: # 1) adds a link back to internal MISP tenant # 2) extracts missing fields into UDM > Entity > Additional fields filter { # Set the base URL for MISP. Remember to replace this placeholder! mutate { replace => { "misp_base_url" => "https://<YOUR_MISP_URL>" } } # Parse the CSV data from the 'message' field. Uses a comma as the separator. # The 'on_error' option handles lines that are not properly formatted CSV. csv { source => "message" separator => "," on_error => "broken_csv" } # If the CSV parsing was successful... if ![broken_csv] { # Rename the CSV columns to more descriptive names. mutate { rename => { "column2" => "event_id" "column8" => "object_timestamp" "column16" => "event_source_org" "column17" => "event_distribution" "column19" => "event_analysis" "column22" => "attribute_timestamp" } } } # Add a link to view the event in MISP, if an event ID is available. # "column2" => "event_id" if [event_id] != "" { mutate { replace => { "additional_url.key" => "view_in_misp" "additional_url.value.string_value" => "%{misp_base_url}/events/view/%{event_id}" } } mutate { merge => { "event.idm.entity.additional.fields" => "additional_url" } } } # Add the object timestamp as an additional field, if available. # "column8" => "object_timestamp" if [object_timestamp] != "" { mutate { replace => { "additional_object_timestamp.key" => "object_timestamp" "additional_object_timestamp.value.string_value" => "%{object_timestamp}" } } mutate { merge => { "event.idm.entity.additional.fields" => "additional_object_timestamp" } } } # Add the event source organization as an additional field, if available. # "column16" => "event_source_org" if [event_source_org] != "" { mutate { replace => { "additional_event_source_org.key" => "event_source_org" "additional_event_source_org.value.string_value" => "%{event_source_org}" } } mutate { merge => { "event.idm.entity.additional.fields" => "additional_event_source_org" } } } # Add the event distribution level as an additional field, if available. # Maps numerical values to descriptive strings. # "column17" => "event_distribution" if [event_distribution] != "" { if [event_distribution] == "0" { mutate { replace => { "additional_event_distribution.value.string_value" => "YOUR_ORGANIZATION_ONLY" } } } else if [event_distribution] == "1" { mutate { replace => { "additional_event_distribution.value.string_value" => "THIS_COMMUNITY_ONLY" } } } else if [event_distribution] == "2" { mutate { replace => { "additional_event_distribution.value.string_value" => "CONNECTED_COMMUNITIES" } } } else if [event_distribution] == "3" { mutate { replace => { "additional_event_distribution.value.string_value" => "ALL_COMMUNITIES" } } } else if [event_distribution] == "4" { mutate { replace => { "additional_event_distribution.value.string_value" => "SHARING_GROUP" } } } else if [event_distribution] == "5" { mutate { replace => { "additional_event_distribution.value.string_value" => "INHERIT_EVENT" } } } mutate { replace => { "additional_event_distribution.key" => "event_distribution" } } mutate { merge => { "event.idm.entity.additional.fields" => "additional_event_distribution" } } } # Add the event analysis level as an additional field, if available. # Maps numerical values to descriptive strings. # "column19" => "event_analysis" if [event_analysis] != "" { if [event_analysis] == "0" { mutate { replace => { "additional_event_analysis.value.string_value" => "INITIAL" } } } else if [event_analysis] == "1" { mutate { replace => { "additional_event_analysis.value.string_value" => "ONGOING" } } } else if [event_analysis] == "2" { mutate { replace => { "additional_event_analysis.value.string_value" => "COMPLETE" } } } mutate { replace => { "additional_event_analysis.key" => "event_analysis" } } mutate { merge => { "event.idm.entity.additional.fields" => "additional_event_analysis" } } } # Add the attribute timestamp as an additional field, if available. # "column22" => "attribute_timestamp" if [attribute_timestamp] != "" { mutate { replace => { "additional_attribute_timestamp.key" => "attribute_timestamp" "additional_attribute_timestamp.value.string_value" => "%{attribute_timestamp}" } } mutate { merge => { "event.idm.entity.additional.fields" => "additional_attribute_timestamp" } } } # Finally, merge the 'event' data into the '@output' field. mutate { merge => { "@output" => "event" } } }
L'exécution de l'extension d'analyseur ajoute les champs personnalisés du fichier CSV à l'objet
additional
.metadata.product_entity_id = "9d66d38a-14e1-407f-a4d1-90b82aa1d59f" metadata.collected_timestamp = "2024-10-31T15:16:08Z" metadata.vendor_name = "MISP" metadata.product_name = "MISP" metadata.entity_type = "IP_ADDRESS" metadata.description = "ip-dst" metadata.interval.start_time = "2023-06-27T19:36:04Z" metadata.interval.end_time = "9999-12-31T23:59:59Z" metadata.threat[0].category_details[0] = "Network activity" metadata.threat[0].description = "tlp:white,type:OSINT,source:DigitalSide.IT,source:urlhaus.abuse.ch - additional info: DigitalSide Malware report: MD5: 59ce0baba11893f90527fc951ac69912" metadata.threat[0].severity_details = "Medium" metadata.threat[0].threat_feed_name = "DIGITALSIDE.IT" entity.ip[0] = "117.253.154.123" additional.fields["view_in_misp"] = "https://
/events/view/3908" additional.fields["object_timestamp"] = "1687894564" additional.fields["event_source_org"] = "DIGITALSIDE.IT" additional.fields["event_distribution"] = "YOUR_ORGANIZATION_ONLY" additional.fields["event_analysis"] = "INITIAL" additional.fields["attribute_timestamp"] = "1698036218"
Exemples de Grok
Les exemples suivants montrent comment créer des extensions d'analyseur basées sur Grok.
Extrait de code (et Grok) : extraction de la priorité et de la gravité
Exemples d'attributs:
- Format de la source de journaux: Syslog
- Approche de mappage des données: extrait de code à l'aide de Grok
- Type de journal: POWERSHELL
- Objet de l'extension d'analyseur: Extraction de la priorité et de la gravité.
Description :
Dans cet exemple, une extension d'analyseur basée sur Grok est créée pour extraire les valeurs Syslog Facility et Severity dans les champs
Priority
etSeverity
du résultat de sécurité UDM.filter { # Use grok to parse syslog messages. The on_error clause handles messages that don't match the pattern. grok { match => { "message" => [ # Extract message with syslog headers. "(<%{POSINT:_syslog_priority}>)%{SYSLOGTIMESTAMP:datetime} %{DATA:logginghost}: %{GREEDYDATA:log_data}" ] } on_error => "not_supported_format" } # If the grok parsing failed, tag the event as unsupported and drop it. if ![not_supported_format] { if [_syslog_priority] != "" { if [_syslog_priority] =~ /0|8|16|24|32|40|48|56|64|72|80|88|96|104|112|120|128|136|144|152|160|168|176|184/ { mutate { replace => { "_security_result.severity_details" => "EMERGENCY" } } } if [_syslog_priority] =~ /1|9|17|25|33|41|49|57|65|73|81|89|97|105|113|121|129|137|145|153|161|169|177|185/ { mutate { replace => { "_security_result.severity_details" => "ALERT" } } } if [_syslog_priority] =~ /2|10|18|26|34|42|50|58|66|74|82|90|98|106|114|122|130|138|146|154|162|170|178|186/ { mutate { replace => { "_security_result.severity_details" => "CRITICAL" } } } if [_syslog_priority] =~ /3|11|19|27|35|43|51|59|67|75|83|91|99|107|115|123|131|139|147|155|163|171|179|187/ { mutate { replace => { "_security_result.severity_details" => "ERROR" } } } if [_syslog_priority] =~ /4|12|20|28|36|44|52|60|68|76|84|92|100|108|116|124|132|140|148|156|164|172|180|188/ { mutate { replace => { "_security_result.severity_details" => "WARNING" } } } if [_syslog_priority] =~ /5|13|21|29|37|45|53|61|69|77|85|93|101|109|117|125|133|141|149|157|165|173|181|189/ { mutate { replace => { "_security_result.severity_details" => "NOTICE" } } } if [_syslog_priority] =~ /6|14|22|30|38|46|54|62|70|78|86|94|102|110|118|126|134|142|150|158|166|174|182|190/ { mutate { replace => { "_security_result.severity_details" => "INFORMATIONAL" } } } if [_syslog_priority] =~ /7|15|23|31|39|47|55|63|71|79|87|95|103|111|119|127|135|143|151|159|167|175|183|191/ { mutate { replace => { "_security_result.severity_details" => "DEBUG" } } } # Facilities (mapped to priority) if [_syslog_priority] =~ /0|1|2|3|4|5|6|7/ { mutate { replace => { "_security_result.priority_details" => "KERNEL" } } } if [_syslog_priority] =~ /8|9|10|11|12|13|14|15/ { mutate { replace => { "_security_result.priority_details" => "USER" } } } if [_syslog_priority] =~ /16|17|18|19|20|21|22|23/ { mutate { replace => { "_security_result.priority_details" => "MAIL" } } } if [_syslog_priority] =~ /24|25|26|27|28|29|30|31/ { mutate { replace => { "_security_result.priority_details" => "SYSTEM" } } } if [_syslog_priority] =~ /32|33|34|35|36|37|38|39/ { mutate { replace => { "_security_result.priority_details" => "SECURITY" } } } if [_syslog_priority] =~ /40|41|42|43|44|45|46|47/ { mutate { replace => { "_security_result.priority_details" => "SYSLOG" } } } if [_syslog_priority] =~ /48|49|50|51|52|53|54|55/ { mutate { replace => { "_security_result.priority_details" => "LPD" } } } if [_syslog_priority] =~ /56|57|58|59|60|61|62|63/ { mutate { replace => { "_security_result.priority_details" => "NNTP" } } } if [_syslog_priority] =~ /64|65|66|67|68|69|70|71/ { mutate { replace => { "_security_result.priority_details" => "UUCP" } } } if [_syslog_priority] =~ /72|73|74|75|76|77|78|79/ { mutate { replace => { "_security_result.priority_details" => "TIME" } } } if [_syslog_priority] =~ /80|81|82|83|84|85|86|87/ { mutate { replace => { "_security_result.priority_details" => "SECURITY" } } } if [_syslog_priority] =~ /88|89|90|91|92|93|94|95/ { mutate { replace => { "_security_result.priority_details" => "FTPD" } } } if [_syslog_priority] =~ /96|97|98|99|100|101|102|103/ { mutate { replace => { "_security_result.priority_details" => "NTPD" } } } if [_syslog_priority] =~ /104|105|106|107|108|109|110|111/ { mutate { replace => { "_security_result.priority_details" => "LOGAUDIT" } } } if [_syslog_priority] =~ /112|113|114|115|116|117|118|119/ { mutate { replace => { "_security_result.priority_details" => "LOGALERT" } } } if [_syslog_priority] =~ /120|121|122|123|124|125|126|127/ { mutate { replace => { "_security_result.priority_details" => "CLOCK" } } } if [_syslog_priority] =~ /128|129|130|131|132|133|134|135/ { mutate { replace => { "_security_result.priority_details" => "LOCAL0" } } } if [_syslog_priority] =~ /136|137|138|139|140|141|142|143/ { mutate { replace => { "_security_result.priority_details" => "LOCAL1" } } } if [_syslog_priority] =~ /144|145|146|147|148|149|150|151/ { mutate { replace => { "_security_result.priority_details" => "LOCAL2" } } } if [_syslog_priority] =~ /152|153|154|155|156|157|158|159/ { mutate { replace => { "_security_result.priority_details" => "LOCAL3" } } } if [_syslog_priority] =~ /160|161|162|163|164|165|166|167/ { mutate { replace => { "_security_result.priority_details" => "LOCAL4" } } } if [_syslog_priority] =~ /168|169|170|171|172|173|174|175/ { mutate { replace => { "_security_result.priority_details" => "LOCAL5" } } } if [_syslog_priority] =~ /176|177|178|179|180|181|182|183/ { mutate { replace => { "_security_result.priority_details" => "LOCAL6" } } } if [_syslog_priority] =~ /184|185|186|187|188|189|190|191/ { mutate { replace => { "_security_result.priority_details" => "LOCAL7" } } } mutate { merge => { "event.idm.read_only_udm.security_result" => "_security_result" } } } mutate { merge => { "@output" => "event" } } } }
L'affichage des résultats de l'extension d'analyseur affiche le format lisible.
metadata.product_log_id = "6161053" metadata.event_timestamp = "2024-10-31T15:10:10Z" metadata.event_type = "PROCESS_LAUNCH" metadata.vendor_name = "Microsoft" metadata.product_name = "PowerShell" metadata.product_event_type = "600" metadata.description = "Info" metadata.log_type = "POWERSHELL" principal.hostname = "win-adfs.lunarstiiiness.com" principal.resource.name = "in_powershell" principal.resource.resource_subtype = "im_msvistalog" principal.asset.hostname = "win-adfs.lunarstiiiness.com" target.hostname = "Default Host" target.process.command_line = "C:\Program Files\Microsoft Azure AD Sync\Bin\miiserver.exe" target.asset.hostname = "Default Host" target.asset.asset_id = "Host ID:bf203e94-72cf-4649-84a5-fc02baedb75f" security_result[0].severity_details = "INFORMATIONAL" security_result[0].priority_details = "USER"
Extrait de code (et Grok) : décoration d'événements, noms de variables temporaires et conversion de types de données
Exemples d'attributs:
- Format de la source de journal: JSON avec un en-tête Syslog
- Approche de mappage des données: extrait de code à l'aide de Grok
- Type de journal: WINDOWS_SYSMON
- Objet de l'extension d'analyseur: décoration des événements, des noms de variables temporaires et des types de données.
Description :
Cet exemple montre comment effectuer les actions suivantes lors de la création d'une extension d'analyseur:
- Décoration basée sur une instruction conditionnelle et compréhension des types de données dans un extrait de code.
- Convertir des types de données
- Noms de variables temporaires pour une meilleure lisibilité
- Champs répétés
Décoration basée sur une instruction conditionnelle
Cet exemple ajoute (informations contextuelles) des explications sur la signification de chaque type d'événement dans WINDOWS_SYSMON. Il utilise une instruction conditionnelle pour vérifier l'ID d'événement, puis ajoute un
Description
. Par exemple,EventID
1 est un événementProcess Creation
.Lorsque vous utilisez un filtre d'extraction, par exemple JSON, le type de données d'origine peut être conservé.
Dans l'exemple suivant, la valeur
EventID
est extraite en tant qu'entier par défaut. L'instruction conditionnelle évalue la valeurEventID
en tant qu'entier, et non en tant que chaîne.if [EventID] == 1 { mutate { replace => { "_description" => "[1] Process creation" } } }
Conversion des types de données
Vous pouvez convertir des types de données dans une extension d'analyseur à l'aide de la fonction convert.
mutate { convert => { "EventID" => "string" } on_error => "_convert_EventID_already_string" }
Noms de variables temporaires pour une meilleure lisibilité
Vous pouvez utiliser des noms de variables temporaires dans des extraits de code, puis les renommer pour qu'ils correspondent au nom de l'objet d'événement UDM de sortie final. Cela peut améliorer la lisibilité globale.
Dans l'exemple suivant, la variable
description
est renomméeevent.idm.read_only_udm.metadata.description
:mutate { rename => { "_description" => "event.idm.read_only_udm.metadata.description" } }
Champs répétés
L'extension d'analyseur complète est la suivante:
filter { # initialize variable mutate { replace => { "EventID" => "" } } # Use grok to parse syslog messages. # The on_error clause handles messages that don't match the pattern. grok { match => { "message" => [ "(<%{POSINT:_syslog_priority}>)%{SYSLOGTIMESTAMP:datetime} %{DATA:logginghost}: %{GREEDYDATA:log_data}" ] } on_error => "not_supported_format" } if ![not_supported_format] { json { source => "log_data" on_error => "not_json" } if ![not_json] { if [EventID] == 1 { mutate { replace => { "_description" => "[1] Process creation" } } } if [EventID] == 2 { mutate { replace => { "_description" => "[2] A process changed a file creation time" } } } if [EventID] == 3 { mutate { replace => { "_description" => "[3] Network connection" } } } if [EventID] == 4 { mutate { replace => { "_description" => "[4] Sysmon service state changed" } } } if [EventID] == 5 { mutate { replace => { "_description" => "[5] Process terminated" } } } if [EventID] == 6 { mutate { replace => { "_description" => "[6] Driver loaded" } } } if [EventID] == 7 { mutate { replace => { "_description" => "[7] Image loaded" } } } if [EventID] == 8 { mutate { replace => { "_description" => "[8] CreateRemoteThread" } } } if [EventID] == 9 { mutate { replace => { "_description" => "[9] RawAccessRead" } } } if [EventID] == 10 { mutate { replace => { "_description" => "[10] ProcessAccess" } } } if [EventID] == 11 { mutate { replace => { "_description" => "[11] FileCreate" } } } if [EventID] == 12 { mutate { replace => { "_description" => "[12] RegistryEvent (Object create and delete)" } } } if [EventID] == 13 { mutate { replace => { "_description" => "[13] RegistryEvent (Value Set)" } } } if [EventID] == 14 { mutate { replace => { "_description" => "[14] RegistryEvent (Key and Value Rename)" } } } if [EventID] == 15 { mutate { replace => { "_description" => "[15] FileCreateStreamHash" } } } if [EventID] == 16 { mutate { replace => { "_description" => "[16] ServiceConfigurationChange" } } } if [EventID] == 17 { mutate { replace => { "_description" => "[17] PipeEvent (Pipe Created)" } } } if [EventID] == 18 { mutate { replace => { "_description" => "[18] PipeEvent (Pipe Connected)" } } } if [EventID] == 19 { mutate { replace => { "_description" => "[19] WmiEvent (WmiEventFilter activity detected)" } } } if [EventID] == 20 { mutate { replace => { "_description" => "[20] WmiEvent (WmiEventConsumer activity detected)" } } } if [EventID] == 21 { mutate { replace => { "_description" => "[21] WmiEvent (WmiEventConsumerToFilter activity detected)" } } } if [EventID] == 22 { mutate { replace => { "_description" => "[22] DNSEvent (DNS query)" } } } if [EventID] == 23 { mutate { replace => { "_description" => "[23] FileDelete (File Delete archived)" } } } if [EventID] == 24 { mutate { replace => { "_description" => "[24] ClipboardChange (New content in the clipboard)" } } } if [EventID] == 25 { mutate { replace => { "_description" => "[25] ProcessTampering (Process image change)" } } } if [EventID] == 26 { mutate { replace => { "_description" => "[26] FileDeleteDetected (File Delete logged)" } } } if [EventID] == 255 { mutate { replace => { "_description" => "[255] Error" } } } mutate { rename => { "_description" => "event.idm.read_only_udm.metadata.description" } } statedump{} mutate { merge => { "@output" => "event" } } } } }
L'exécution de l'extension d'analyseur ajoute la décoration dans le champ
metadata.description
.metadata.product_log_id = "6008459" metadata.event_timestamp = "2024-10-31T14:41:53.442Z" metadata.event_type = "REGISTRY_CREATION" metadata.vendor_name = "Microsoft" metadata.product_name = "Microsoft-Windows-Sysmon" metadata.product_event_type = "12" metadata.description = "[12] RegistryEvent (Object create and delete)" metadata.log_type = "WINDOWS_SYSMON" additional.fields["thread_id"] = "3972" additional.fields["channel"] = "Microsoft-Windows-Sysmon/Operational" additional.fields["Keywords"] = "-9223372036854776000" additional.fields["Opcode"] = "Info" additional.fields["ThreadID"] = "3972" principal.hostname = "win-adfs.lunarstiiiness.com" principal.user.userid = "tim.smith_admin" principal.user.windows_sid = "S-1-5-18" principal.process.pid = "6856" principal.process.file.full_path = "C:\Windows\system32\wsmprovhost.exe" principal.process.product_specific_process_id = "SYSMON:{927d35bf-a374-6495-f348-000000002900}" principal.administrative_domain = "LUNARSTIIINESS" principal.asset.hostname = "win-adfs.lunarstiiiness.com" target.registry.registry_key = "HKU\S-1-5-21-3263964631-4121654051-1417071188-1116\Software\Policies\Microsoft\SystemCertificates\CA\Certificates" observer.asset_id = "5770385F:C22A:43E0:BF4C:06F5698FFBD9" observer.process.pid = "2556" about[0].labels[0].key = "Category ID" about[0].labels[0].value = "RegistryEvent" security_result[0].rule_name = "technique_id=T1553.004,technique_name=Install Root Certificate" security_result[0].summary = "Registry object added or deleted" security_result[0].severity = "INFORMATIONAL" security_result[1].rule_name = "EventID: 12" security_result[2].summary = "12"
Exemples de code XML
Les exemples suivants montrent comment créer une extension d'analyseur lorsque la source de journal est au format XML.
Extrait de code : extraction de champ arbitraire dans l'objet additional
Exemples d'attributs:
- Format de la source de journaux: XML
- Approche de mappage des données: extrait de code
- Type de journal: WINDOWS_DEFENDER_AV
- Objet de l'extension d'analyseur: Extraction de champs arbitraires dans l'objet
additional
Description :
L'objectif de cet exemple est d'extraire et de stocker la valeur
Platform Version
, par exemple pour pouvoir créer des rapports et rechercheroutdated platform versions
.Après avoir examiné le document Champs UDM importants, aucun champ UDM standard approprié n'a été identifié. Par conséquent, cet exemple utilisera l'objet
additional
pour stocker ces informations en tant que paire clé-valeur personnalisée.# Parser Extension for WINDOWS_DEFENDER_AV # 2024-10-29: cmmartin: Extracting 'Platform Version' into Additional filter { # Uses XPath to target the specific element(s) xml { source => "message" xpath => { "/Event/EventData/Data[@Name='Platform version']" => "platform_version" } on_error => "_xml_error" } # Conditional processing: Only proceed if XML parsing was successful if ![_xml_error] { # Prepare the additional field structure using a temporary variable mutate{ replace => { "additional_platform_version.key" => "Platform Version" "additional_platform_version.value.string_value" => "%{platform_version}" } on_error => "no_platform_version" } # Merge the additional field into the event1 structure. if ![no_platform_version] { mutate { merge => { "event1.idm.read_only_udm.additional.fields" => "additional_platform_version" } } } mutate { merge => { "@output" => "event1" } } } }
L'exécution de PREVIEW UDM OUTPUT indique que le nouveau champ a bien été ajouté.
metadata.event_timestamp = "2024-10-29T14:08:52Z" metadata.event_type = "STATUS_HEARTBEAT" metadata.vendor_name = "Microsoft" metadata.product_name = "Windows Defender AV" metadata.product_event_type = "MALWAREPROTECTION_SERVICE_HEALTH_REPORT" metadata.description = "Endpoint Protection client health report (time in UTC)." metadata.log_type = "WINDOWS_DEFENDER_AV" additional.fields["Platform Version"] = "4.18.24080.9" principal.hostname = "win-dc-01.ad.1823127835827.altostrat.com" security_result[0].description = "EventID: 1151" security_result[0].action[0] = "ALLOW" security_result[0].severity = "LOW"
Extrait de code (et Grok) : extraction de champ arbitraire dans le nom d'hôte principal
Exemples d'attributs:
- Format de la source de journaux: XML
- Approche de mappage des données: extrait de code à l'aide de Grok
- Type de journal: WINDOWS_DEFENDER_AV
- Objet de l'extension d'analyseur: Extraction de champ arbitraire dans le nom d'hôte principal
Description :
L'objectif de cet exemple est d'extraire le
Hostname
d'unFQDN
et d'écraser le champprincipal.hostname
.Cet exemple vérifie si le champ
Computer name
du journal brut inclut unFQDN
. Dans ce cas, il n'extrait que la partieHostname
et écrase le champPrincipal Hostname
de l'UDM.Après avoir examiné l'analyseur et le document sur les champs UDM importants, il est clair que le champ
principal.hostname
doit être utilisé.# Parser Extension for WINDOWS_DEFENDER_AV # 2024-10-29: Extract Hostname from FQDN and overwrite principal.hostname filter { # Uses XPath to target the specific element(s) xml { source => "message" xpath => { "/Event/System/Computer" => "hostname" } on_error => "_xml_error" } # Conditional processing: Only proceed if XML parsing was successful if ![_xml_error] { # Extract all characters before the first dot in the hostname variable grok { match => { "hostname" => "(?<hostname>[^.]+)" } } mutate { replace => { "event1.idm.read_only_udm.principal.hostname" => "%{hostname}" } } mutate { merge => { "@output" => "event1" } } } }
Cette extension d'analyseur utilise une instruction Grok pour exécuter une expression régulière afin d'extraire le champ
hostname
. L'expression régulière elle-même utilise un groupe de capture nommé, ce qui signifie que tout ce qui correspond entre les parenthèses sera stocké dans le champ nomméhostname
, correspondant à un ou plusieurs caractères jusqu'à ce qu'il rencontre un point. Cette opération ne capture que lehostname
dans unFQDN
.Toutefois, lorsque vous exécutez la commande PREVIEW UDM OUTPUT, une erreur est renvoyée. Pourquoi ?
generic::unknown: pipeline.ParseLogEntry failed: LOG_PARSING_CBN_ERROR: "generic::internal: pipeline failed: filter grok (2) failed: field\ "hostname\" already exists in data and is not overwritable"
Instruction
overwrite
GrokDans une instruction Grok, un groupe de capture nommé ne peut pas écraser une variable existante, sauf si elle est explicitement spécifiée à l'aide de l'instruction
overwrite
. Dans ce scénario, vous pouvez utiliser un nom de variable différent pour le groupe de capture nommé dans l'instruction Grok ou, comme indiqué dans l'exemple d'extrait de code suivant, utiliser l'instructionoverwrite
pour écraser explicitement la variablehostname
existante.# Parser Extension for WINDOWS_DEFENDER_AV # 2024-10-29: cmmartin: Overwriting principal Hostname filter { xml { source => "message" xpath => { "/Event/System/Computer" => "hostname" } on_error => "_xml_error" } if ![_xml_error] { grok { match => { "hostname" => "(?<hostname>[^.]+)" } overwrite => ["hostname"] on_error => "_grok_hostname_error" } mutate { replace => { "event1.idm.read_only_udm.principal.hostname" => "%{hostname}" } } mutate { merge => { "@output" => "event1" } } } }
L'exécution de PREVIEW UDM OUTPUT à nouveau montre que le nouveau champ a été ajouté après l'extraction de
hostname
à partir deFQDN
.metadata.event_timestamp"2024-10-29T14:08:52Z" metadata.event_type"STATUS_HEARTBEAT" metadata.vendor_name"Microsoft" metadata.product_name"Windows Defender AV" metadata.product_event_type"MALWAREPROTECTION_SERVICE_HEALTH_REPORT" metadata.description"Endpoint Protection client health report (time in UTC)." metadata.log_type"WINDOWS_DEFENDER_AV" principal.hostname"win-dc-01" security_result[0].description"EventID: 1151" security_result[0].action[0]"ALLOW" security_result[0].severity"LOW"