Exemples d'extensions d'analyseur

Compatible avec:

Ce document fournit des exemples de création d'extensions d'analyseur dans différents scénarios. Pour en savoir plus sur les extensions d'analyseur, consultez Créer des extensions d'analyseur.

Exemples d'extensions d'analyseur

Utilisez les tableaux d'attributs suivants pour trouver rapidement l'exemple de code dont vous avez besoin.

Exemples sans code

Format de la source de journaux Exemple de titre Description Concepts d'analyseur dans cet exemple
JSON
(type de journal: GCP_IDS)
Champs d'extraction Extrayez des champs d'un journal au format JSON. Pas de code
JSON
(type de journal: WORKSPACE_ALERTS)
Extraire des champs avec la valeur de précondition Extrayez des champs d'un journal au format JSON et normalisez-les dans un champ UDM répété, avec une précondition.

Exemples d'extraits de code

Format de la source de journaux Exemple de titre Description Concepts d'analyseur dans cet exemple
JSON
(type de journal: GCP_IDS)
Ajouter un user-agent HTTP
  • Extrayez l'user-agent du parseur HTTP réseau et créez un target hostname à partir du requestUrl.
  • Attribuez un espace de noms pour vous assurer que l'aliasing et l'enrichissement basés sur les composants sont effectués.
CSV
(type de journal: MISP_IOC)
Extraction de champs arbitraires dans l'objet UDM additional Extrait des champs dans UDM > Entité > Objet UDM additional > paire clé-valeur Objet UDM additional
Syslog
(type de journal: POWERSHELL)
Extraire la priorité et la gravité de Syslog Extrayez les valeurs de l'établissement Syslog et de la gravité dans les champs "Priorité du résultat de sécurité UDM" et "Sévérité". Basé sur Grok
JSON avec un en-tête Syslog
(type de journal: WINDOWS_SYSMON)
Décoration basée sur une instruction conditionnelle
  • Ajoute une décoration (informations contextuelles) dans le champ metadata.description en fonction d'une instruction conditionnelle et de la compréhension des types de données dans les extraits de code.
  • Lorsque vous utilisez un filtre d'extraction, le type de données d'origine peut être conservé.
  • Une instruction conditionnelle Grok doit utiliser le type de données d'origine pour évaluer le champ.
  • Basé sur Grok
  • Instruction conditionnelle Grok
  • Le type de données d'origine d'un champ extrait peut être conservé.
  • Une instruction conditionnelle Grok doit utiliser le type de données d'origine pour évaluer le champ.
JSON avec un en-tête Syslog
(type de journal: WINDOWS_SYSMON)
Convertir les types de données
  • Convertissez les types de données dans une extension d'analyseur à l'aide de la fonction convert.
  • Utilisez des instructions on_error pour assurer une gestion appropriée des erreurs et éviter les échecs de l'extension de l'analyseur causés par des erreurs.
  • Basé sur Grok
  • Convertir les types de données
  • Utilisez des instructions on_error pour gérer les erreurs.
JSON avec un en-tête Syslog
(type de journal: WINDOWS_SYSMON)
Noms de variables temporaires pour une meilleure lisibilité Vous pouvez utiliser des noms de variables temporaires dans des extraits de code, puis les renommer pour qu'ils correspondent au nom de l'objet d'événement UDM de sortie finale. Cela peut améliorer la lisibilité globale.
  • Basé sur Grok
  • Utilisez des noms de variables temporaires, puis remplacez-les par les noms de l'UDM de sortie finale.
JSON avec un en-tête Syslog
(type de journal: WINDOWS_SYSMON)
Champs répétés Utilisez les champs répétés dans les extraits de code avec précaution, par exemple le champ security_result.
XML
(type de journal: WINDOWS_DEFENDER_AV)
Extraction de champ arbitraire dans l'objet additional
  • Extrayez et stockez la valeur de la version de la plate-forme, par exemple, pour pouvoir créer des rapports et rechercher des versions obsolètes de la plate-forme.
  • Dans cet exemple, il n'existe pas de champ UDM standard approprié. L'objet additional est donc utilisé pour stocker les informations sous la forme d'une paire clé-valeur personnalisée.
L'objet additional permet de stocker les informations sous forme de paire clé-valeur personnalisée.
XML
(type de journal: WINDOWS_DEFENDER_AV)
Extraction de champ arbitraire dans le nom d'hôte principal
  • Extrayez le nom d'hôte d'un nom de domaine complet.
  • Le traitement conditionnel permet de déterminer si le champ principal.hostname doit être écrasé.
  • L'instruction Grok utilise une expression régulière pour extraire le champ hostname. L'expression régulière elle-même utilise un groupe de capture nommé, ce qui signifie que tout ce qui correspond entre les parenthèses sera stocké dans le champ nommé hostname, correspondant à un ou plusieurs caractères jusqu'à ce qu'il rencontre un point. Cette opération ne capture que le hostname dans un nom de domaine complet.
Instruction overwrite Grok
  • Toutefois, lorsque vous exécutez la PRÉVUE DE LA SORTIE UDM, une erreur est renvoyée: "LOG_PARSING_CBN_ERROR: Le champ hostname existe déjà dans les données et ne peut pas être écrasé".
  • Dans une instruction Grok, un groupe de capture nommé ne peut pas écraser une variable existante, sauf si vous le spécifiez explicitement à l'aide de l'instruction overwrite. Dans ce scénario, vous pouvez utiliser un nom de variable différent pour le groupe de capture nommé dans l'instruction Grok ou (comme indiqué dans cet exemple) utiliser l'instruction overwrite pour remplacer explicitement la variable d'hôte existante.
  • Basé sur Grok
  • Le traitement conditionnel permet de déterminer si un champ doit être écrasé.
  • Instruction Grok à l'aide d'expressions régulières.
  • Instruction overwrite Grok

Exemples de fichiers JSON

Les exemples suivants montrent comment créer une extension d'analyseur lorsque la source de journal est au format JSON.

Sans code : extraire des champs

Exemples d'attributs:

  • Format de la source de journal: JSON
  • Approche de mappage des données: sans code
  • Type de journal: GCP_IDS
  • Objet de l'extension d'analyseur: Extraire des champs.
  • Description :

    Plusieurs champs liés au réseau ne sont pas extraits. Comme cet exemple de journal est un journal structuré au format JSON, nous pouvons utiliser l'approche sans code (Map data fields) pour créer l'extension de l'analyseur.

    Les champs d'origine que nous souhaitons extraire sont les suivants:

    • total_packets (chaîne)
    • elapsed_time (chaîne)
    • total_bytes (chaîne)

    Voici un exemple d'entrée de journal brute:

    {
    "insertId": "625a41542d64c124e7db097ae0906ccb-1@a3",
    "jsonPayload": {
      "destination_port": "80",
      "application": "incomplete",
      "ip_protocol": "tcp",
      "network": "projects/prj-p-shared-base/global/networks/shared-vpc-production",
      "start_time": "2024-10-29T21:14:59Z",
      "source_port": "41936",
      "source_ip_address": "35.191.200.157",
      "total_packets": "6",
      "elapsed_time": "0",
      "destination_ip_address": "192.168.0.11",
      "total_bytes": "412",
      "repeat_count": "1",
      "session_id": "1289742"
    },
    "resource": {
      "type": "ids.googleapis.com/Endpoint",
      "labels": {
        "resource_container": "projects/12345678910",
        "location": "europe-west4-a",
        "id": "p-europe-west4"
      }
    },
    "timestamp": "2024-10-29T21:15:21Z",
    "logName": "projects/prj-p-shared-base/logs/ids.googleapis.com%2Ftraffic",
    "receiveTimestamp": "2024-10-29T21:15:24.051990717Z"
    }
    

    L'exemple utilise l'approche sans code pour créer une extension d'analyseur à l'aide du mappage de champs de données suivant:

    Chemin de précondition Opérateur de précondition Valeur de précondition Chemin d'accès aux données brutes Champ de destination*
    jsonPayload.total_bytes NOT_EQUALS "" jsonPayload.total_bytes udm.principal.network.received_bytes
    jsonPayload.elapsed_time NOT_EQUALS "" jsonPayload.elapsed_time udm.principal.network.session_duration.seconds
    jsonPayload.total_packets NOT_EQUALS "" jsonPayload.total_packets udm.principal.network.received_packets

    L'exécution de l'extension d'analyseur ajoute les trois champs extraits à l'objet principal.network.

    metadata.product_log_id = "625a41542d64c124e7db097ae0906ccb-1@a3"
    metadata.event_timestamp = "2024-10-29T21:14:59Z"
    metadata.event_type = "NETWORK_CONNECTION"
    metadata.vendor_name = "Google Cloud"
    metadata.product_name = "IDS"
    metadata.ingestion_labels[0].key = "label"
    metadata.ingestion_labels[0].value = "GCP_IDS"
    metadata.log_type = "GCP_IDS"
    principal.ip[0] = "35.191.200.157"
    principal.port = 41936
    principal.network.received_bytes = 412
    principal.network.session_duration.seconds = "0s"
    principal.network.received_packets = 6
    target.ip[0] = "192.168.0.11"
    target.port = 80
    target.application = "incomplete"
    observer.location.country_or_region = "EUROPE"
    observer.location.name = "europe-west4-a"
    observer.resource.name = "projects/12345678910"
    observer.resource.resource_type = "CLOUD_PROJECT"
    observer.resource.attribute.cloud.environment = "GOOGLE_CLOUD_PLATFORM"
    observer.resource.product_object_id = "p-europe-west4"
    network.ip_protocol = "TCP"
    network.session_id = "1289742"
    

Sans code : extraire des champs avec la valeur de précondition

Exemples d'attributs:

  • Format de la source de journal: JSON
  • Approche de mappage des données: sans code
  • Type de journal: WORKSPACE_ALERTS
  • Objet de l'extension de l'analyseur: Extraire les champs avec la valeur de précondition.
  • Description :

    L'analyseur d'origine n'extrait pas le email address de l'utilisateur principal concerné par une alerte de protection contre la perte de données (DLP).

    Cet exemple utilise une extension d'analyseur sans code pour extraire le email address et le normaliser dans un champ UDM répété, avec une condition préalable.

    Lorsque vous travaillez avec des champs répétés dans une extension d'analyseur sans code, vous devez indiquer si vous souhaitez:

    • replace (remplace toutes les valeurs des champs répétés dans l'objet UDM existant) ou
    • append (ajoute les valeurs extraites aux champs répétés).

    Pour en savoir plus, consultez la section Champs répétés.

    Cet exemple remplace toutes les adresses e-mail existantes dans le champ principal.user.email_address normalisé.

    Les conditions préalables vous permettent d'effectuer des vérifications conditionnelles avant d'effectuer une opération d'extraction. Dans la plupart des cas, le champ de précondition est le même que le champ de données brutes que vous souhaitez extraire, avec un opérateur de précondition de not Null, par exemple, foo != "".

    Toutefois, comme dans notre exemple, la valeur du champ de données brutes que vous souhaitez extraire n'est pas toujours présente dans toutes les entrées de journal. Dans ce cas, vous pouvez utiliser un autre champ de précondition pour filtrer l'opération d'extraction. Dans notre exemple, le champ triggeringUserEmail brut que vous souhaitez extraire n'est présent que dans les journaux pour lesquels type = Data Loss Prevention.

    Voici des exemples de valeurs à saisir dans les champs d'extension de l'analyseur sans code:

    Chemin de précondition Opérateur de précondition Valeur de précondition Chemin d'accès aux données brutes Champ de destination*
    type ÉGAL À Protection contre la perte de données data.ruleViolationInfo.triggeringUserEmail udm.principal.user.email_addresses

    L'exemple suivant montre les champs d'extension de l'analyseur sans code renseignés avec des exemples de valeurs:

    image2

    L'exécution de l'extension d'analyseur ajoute correctement email_address à l'objet principal.user.

    metadata.product_log_id = "Ug71LGqBr6Q="
    metadata.event_timestamp = "2022-12-18T12:17:35.154368Z"
    metadata.event_type = "USER_UNCATEGORIZED"
    metadata.vendor_name = "Google Workspace"
    metadata.product_name = "Google Workspace Alerts"
    metadata.product_event_type = "DlpRuleViolation"
    metadata.log_type = "WORKSPACE_ALERTS"
    additional.fields["resource_title"] = "bq-results-20221215-112933-1671103787123.csv"
    principal.user.email_addresses[0] = "foo.bar@altostrat.com"
    target.resource.name = "DRIVE"
    target.resource.resource_type = "STORAGE_OBJECT"
    target.resource.product_object_id = "1wLteoF3VHljS_8_ABCD_VVbhFTfcTQplJ5k1k7cL4r8"
    target.labels[0].key = "resource_title"
    target.labels[0].value = "bq-results-20221321-112933-1671103787697.csv"
    about[0].resource.resource_type = "CLOUD_ORGANIZATION"
    about[0].resource.product_object_id = "C01abcde2"
    security_result[0].about.object_reference.id = "ODU2NjEwZTItMWE2YS0xMjM0LWJjYzAtZTJlMWU2YWQzNzE3"
    security_result[0].category_details[0] = "Data Loss Prevention"
    security_result[0].rule_name = "Sensitive Projects Match"
    security_result[0].summary = "Data Loss Prevention"
    security_result[0].action[0] = "ALLOW"
    security_result[0].severity = "MEDIUM"
    security_result[0].rule_id = "rules/00abcdxs183abcd"
    security_result[0].action_details = "ALERT, DRIVE_WARN_ON_EXTERNAL_SHARING"
    security_result[0].alert_state = "ALERTING"
    security_result[0].detection_fields[0].key = "start_time"
    security_result[0].detection_fields[0].value = "2022-12-18T12:17:35.154368Z"
    security_result[0].detection_fields[1].key = "status"
    security_result[0].detection_fields[1].value = "NOT_STARTED"
    security_result[0].detection_fields[2].key = "trigger"
    security_result[0].detection_fields[2].value = "DRIVE_SHARE"
    security_result[0].rule_labels[0].key = "detector_name"
    security_result[0].rule_labels[0].value = "EMAIL_ADDRESS"
    network.email.to[0] = "foo.bar@altostrat.com"
    

Extrait de code : ajouter un user-agent HTTP

Exemples d'attributs:

  • Format de la source de journal: JSON
  • Approche de mappage des données: extrait de code
  • Type de journal: GCP_IDS
  • Objet de l'extension d'analyseur: Ajout d'un user-agent HTTP.
  • Description :

    Il s'agit d'un exemple de type d'objet UDM non standard qui n'est pas compatible avec l'approche sans code et qui nécessite donc d'utiliser un extrait de code. L'analyseur par défaut n'extrait pas l'analyse Network HTTP Parser User Agent. De plus, pour plus de cohérence:

    1. Un élément Target Hostname sera créé à partir de requestUrl.
    2. Un Namespace est attribué pour s'assurer que l'aliasing et l'enrichissement basés sur les composants sont effectués.
    # GCP_LOADBALANCING
    # owner: @owner
    # updated: 2022-12-23
    # Custom parser extension that:
    # 1) adds consistent Namespace 
    # 2) adds Parsed User Agent Object 
    filter {
        # Initialize placeholder
        mutate {
            replace => {
                "httpRequest.userAgent" => ""
                "httpRequest.requestUrl" => ""
            }
        }
        json {
            on_error => "not_json"
            source => "message"
            array_function => "split_columns"
        }
        if ![not_json] {
          #1 - Override Namespaces
            mutate {
                replace => {
                    "event1.idm.read_only_udm.principal.namespace" => "TMO"
                }
            }
            mutate {
                replace => {
                    "event1.idm.read_only_udm.target.namespace" => "TMO"
                }
            }
            mutate {
                replace => {
                    "event1.idm.read_only_udm.src.namespace" => "TMO"
                }
            }
            #2 - Parsed User Agent
            if [httpRequest][requestUrl]!= "" {
                grok {
                    match => {
                        "httpRequest.requestUrl" => ["\/\/(?P<_hostname>.*?)\/"]
                    }
                    on_error => "_grok_hostname_failed"
                }
                if ![_grok_hostname_failed] {
                    mutate {
                        replace => {
                            "event1.idm.read_only_udm.target.hostname" => "%{_hostname}"
                        }
                    }
                }
            }
            if [httpRequest][userAgent] != "" {
                mutate {
                    convert => {
                        "httpRequest.userAgent" => "parseduseragent"
                    }
                }
                #Map the converted "user_agent" to the new UDM field "http.parsed_user_agent".
                mutate {
                    rename => {
                        "httpRequest.userAgent" => "event1.idm.read_only_udm.network.http.parsed_user_agent"
                    }
                }
            }
            mutate {
                merge => {
                    "@output" => "event1"
                }
            }
        }
    }
    

Exemple de fichier CSV

L'exemple suivant montre comment créer une extension d'analyseur lorsque la source de journaux est au format CSV.

Extrait de code : extraction de champs arbitraires dans l'objet additional

Exemples d'attributs:

  • Format de la source de journaux: CSV
  • Approche de mappage des données: extrait de code
  • Type de journal: MISP_IOC
  • Objet de l'extension d'analyseur: extraction de champs arbitraires dans l'objet additional.
  • Description :

    Dans cet exemple, l'intégration du contexte d'entité UDM MISP_IOC est utilisée. L'objet UDM de paire clé-valeur additional permet de capturer les informations contextuelles non extraites par l'analyseur par défaut et d'ajouter des champs spécifiques à chaque organisation. Par exemple, une URL redirigeant vers l'instance MISP spécifique.

    Voici la source de journaux au format CSV pour cet exemple:

    1 9d66d38a-14e1-407f-a4d1-90b82aa1d59f
    2 3908
    3 Network activity
    4 ip-dst
    5 117.253.154.123
    6
    7
    8 1687894564
    9
    10
    11
    12
    13
    14 DigitalSide Malware report\: MD5\: 59ce0baba11893f90527fc951ac69912
    15 ORGNAME
    16 DIGITALSIDE.IT
    17 0
    18 Medium
    19 0
    20 2023-06-23
    21 tlp:white,type:OSINT,source:DigitalSide.IT,source:urlhaus.abuse.ch
    22 1698036218

    image

    # MISP_IOC
    # owner: @owner
    # updated: 2024-06-21
    # Custom parser extension that:
    # 1) adds a link back to internal MISP tenant 
    # 2) extracts missing fields into UDM > Entity > Additional fields
    filter {
        # Set the base URL for MISP. Remember to replace this placeholder!
        mutate {
            replace => {
                "misp_base_url" => "https://<YOUR_MISP_URL>"
            }
        }
        # Parse the CSV data from the 'message' field. Uses a comma as the separator.
        # The 'on_error' option handles lines that are not properly formatted CSV.
        csv {
            source => "message"
            separator => ","
            on_error => "broken_csv"
        }
        # If the CSV parsing was successful...
        if ![broken_csv] {
            # Rename the CSV columns to more descriptive names.
            mutate {
                rename => {
                    "column2" => "event_id"
                    "column8" => "object_timestamp"
                    "column16" => "event_source_org"
                    "column17" => "event_distribution"
                    "column19" => "event_analysis"
                    "column22" => "attribute_timestamp"
                }
            }
        }
        # Add a link to view the event in MISP, if an event ID is available.
        # "column2" => "event_id"
        if [event_id] != "" {
            mutate {
                replace => {
                    "additional_url.key" => "view_in_misp"
                    "additional_url.value.string_value" => "%{misp_base_url}/events/view/%{event_id}"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_url"
                }
            }
        }
        # Add the object timestamp as an additional field, if available.
        # "column8" => "object_timestamp"
        if [object_timestamp] != "" {
            mutate {
                replace => {
                    "additional_object_timestamp.key" => "object_timestamp"
                    "additional_object_timestamp.value.string_value" => "%{object_timestamp}"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_object_timestamp"
                }
            }
        }
        # Add the event source organization as an additional field, if available.
        # "column16" => "event_source_org"
        if [event_source_org] != "" {
            mutate {
                replace => {
                    "additional_event_source_org.key" => "event_source_org"
                    "additional_event_source_org.value.string_value" => "%{event_source_org}"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_event_source_org"
                }
            }
        }
        # Add the event distribution level as an additional field, if available.
        # Maps numerical values to descriptive strings.
        # "column17" => "event_distribution"
        if [event_distribution] != "" {
            if [event_distribution] == "0" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "YOUR_ORGANIZATION_ONLY"
                    }
                }
            } else if [event_distribution] == "1" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "THIS_COMMUNITY_ONLY"
                    }
                }
            } else if [event_distribution] == "2" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "CONNECTED_COMMUNITIES"
                    }
                }
            } else if [event_distribution] == "3" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "ALL_COMMUNITIES"
                    }
                }
            } else if [event_distribution] == "4" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "SHARING_GROUP"
                    }
                }
            } else if [event_distribution] == "5" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "INHERIT_EVENT"
                    }
                }
            }
            mutate {
                replace => {
                    "additional_event_distribution.key" => "event_distribution"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_event_distribution"
                }
            }
        }
        # Add the event analysis level as an additional field, if available.
        # Maps numerical values to descriptive strings.
        # "column19" => "event_analysis"
        if [event_analysis] != "" {
            if [event_analysis] == "0" {
                mutate {
                    replace => {
                        "additional_event_analysis.value.string_value" => "INITIAL"
                    }
                }
            } else if [event_analysis] == "1" {
                mutate {
                    replace => {
                        "additional_event_analysis.value.string_value" => "ONGOING"
                    }
                }
            } else if [event_analysis] == "2" {
                mutate {
                    replace => {
                        "additional_event_analysis.value.string_value" => "COMPLETE"
                    }
                }
            }
            mutate {
                replace => {
                    "additional_event_analysis.key" => "event_analysis"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_event_analysis"
                }
            }
        }
        # Add the attribute timestamp as an additional field, if available.
        # "column22" => "attribute_timestamp" 
        if [attribute_timestamp] != "" {
            mutate {
                replace => {
                    "additional_attribute_timestamp.key" => "attribute_timestamp"
                    "additional_attribute_timestamp.value.string_value" => "%{attribute_timestamp}"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_attribute_timestamp"
                }
            }
        }
        # Finally, merge the 'event' data into the '@output' field.
        mutate {
            merge => {
                "@output" => "event"
            }
        }
    }
    

    L'exécution de l'extension d'analyseur ajoute les champs personnalisés du fichier CSV à l'objet additional.

    metadata.product_entity_id = "9d66d38a-14e1-407f-a4d1-90b82aa1d59f"
    metadata.collected_timestamp = "2024-10-31T15:16:08Z"
    metadata.vendor_name = "MISP"
    metadata.product_name = "MISP"
    metadata.entity_type = "IP_ADDRESS"
    metadata.description = "ip-dst"
    metadata.interval.start_time = "2023-06-27T19:36:04Z"
    metadata.interval.end_time = "9999-12-31T23:59:59Z"
    metadata.threat[0].category_details[0] = "Network activity"
    metadata.threat[0].description = "tlp:white,type:OSINT,source:DigitalSide.IT,source:urlhaus.abuse.ch - additional info: DigitalSide Malware report: MD5: 59ce0baba11893f90527fc951ac69912"
    metadata.threat[0].severity_details = "Medium"
    metadata.threat[0].threat_feed_name = "DIGITALSIDE.IT"
    entity.ip[0] = "117.253.154.123"
    additional.fields["view_in_misp"] = "https:///events/view/3908"
    additional.fields["object_timestamp"] = "1687894564"
    additional.fields["event_source_org"] = "DIGITALSIDE.IT"
    additional.fields["event_distribution"] = "YOUR_ORGANIZATION_ONLY"
    additional.fields["event_analysis"] = "INITIAL"
    additional.fields["attribute_timestamp"] = "1698036218"
    

Exemples de Grok

Les exemples suivants montrent comment créer des extensions d'analyseur basées sur Grok.

Extrait de code (et Grok) : extraction de la priorité et de la gravité

Exemples d'attributs:

  • Format de la source de journaux: Syslog
  • Approche de mappage des données: extrait de code à l'aide de Grok
  • Type de journal: POWERSHELL
  • Objet de l'extension d'analyseur: Extraction de la priorité et de la gravité.
  • Description :

    Dans cet exemple, une extension d'analyseur basée sur Grok est créée pour extraire les valeurs Syslog Facility et Severity dans les champs Priority et Severity du résultat de sécurité UDM.

    filter {
        # Use grok to parse syslog messages. The on_error clause handles messages that don't match the pattern.
        grok {
            match => {
                "message" => [
                    # Extract message with syslog headers.
                    "(<%{POSINT:_syslog_priority}>)%{SYSLOGTIMESTAMP:datetime} %{DATA:logginghost}: %{GREEDYDATA:log_data}"
                ]
            }
            on_error => "not_supported_format"
        }
        # If the grok parsing failed, tag the event as unsupported and drop it.
        if ![not_supported_format] {
            if [_syslog_priority] != "" {
                if [_syslog_priority] =~ /0|8|16|24|32|40|48|56|64|72|80|88|96|104|112|120|128|136|144|152|160|168|176|184/ {
                    mutate { replace => { "_security_result.severity_details" => "EMERGENCY" } } 
                }
                if [_syslog_priority] =~ /1|9|17|25|33|41|49|57|65|73|81|89|97|105|113|121|129|137|145|153|161|169|177|185/ {
                    mutate { replace => { "_security_result.severity_details" => "ALERT" } } 
                }
                if [_syslog_priority] =~ /2|10|18|26|34|42|50|58|66|74|82|90|98|106|114|122|130|138|146|154|162|170|178|186/ {
                    mutate { replace => { "_security_result.severity_details" => "CRITICAL" } }
                }
                if [_syslog_priority] =~ /3|11|19|27|35|43|51|59|67|75|83|91|99|107|115|123|131|139|147|155|163|171|179|187/ {
                    mutate { replace => { "_security_result.severity_details" => "ERROR" } }
                }
                if [_syslog_priority] =~ /4|12|20|28|36|44|52|60|68|76|84|92|100|108|116|124|132|140|148|156|164|172|180|188/ {
                    mutate { replace => { "_security_result.severity_details" => "WARNING" } }
                }
                if [_syslog_priority] =~ /5|13|21|29|37|45|53|61|69|77|85|93|101|109|117|125|133|141|149|157|165|173|181|189/ {
                    mutate { replace => { "_security_result.severity_details" => "NOTICE" } }
                }
                if [_syslog_priority] =~ /6|14|22|30|38|46|54|62|70|78|86|94|102|110|118|126|134|142|150|158|166|174|182|190/ {
                    mutate { replace => { "_security_result.severity_details" => "INFORMATIONAL" } }
                }
                if [_syslog_priority] =~ /7|15|23|31|39|47|55|63|71|79|87|95|103|111|119|127|135|143|151|159|167|175|183|191/ {
                    mutate { replace => { "_security_result.severity_details" => "DEBUG" } }
                }
                # Facilities (mapped to priority)
                if [_syslog_priority] =~ /0|1|2|3|4|5|6|7/ { 
                    mutate { replace => { "_security_result.priority_details" => "KERNEL" } } 
                }
                if [_syslog_priority] =~ /8|9|10|11|12|13|14|15/ { 
                    mutate { replace => { "_security_result.priority_details" => "USER" } } 
                }
                if [_syslog_priority] =~ /16|17|18|19|20|21|22|23/ { 
                    mutate { replace => { "_security_result.priority_details" => "MAIL" } } 
                }
                if [_syslog_priority] =~ /24|25|26|27|28|29|30|31/ { 
                    mutate { replace => { "_security_result.priority_details" => "SYSTEM" } } 
                }
                if [_syslog_priority] =~ /32|33|34|35|36|37|38|39/ { 
                    mutate { replace => { "_security_result.priority_details" => "SECURITY" } } 
                }
                if [_syslog_priority] =~ /40|41|42|43|44|45|46|47/ { 
                    mutate { replace => { "_security_result.priority_details" => "SYSLOG" } } 
                }
                if [_syslog_priority] =~ /48|49|50|51|52|53|54|55/ { 
                    mutate { replace => { "_security_result.priority_details" => "LPD" } } 
                }
                if [_syslog_priority] =~ /56|57|58|59|60|61|62|63/ { 
                    mutate { replace => { "_security_result.priority_details" => "NNTP" } } 
                }
                if [_syslog_priority] =~ /64|65|66|67|68|69|70|71/ { 
                    mutate { replace => { "_security_result.priority_details" => "UUCP" } } 
                }
                if [_syslog_priority] =~ /72|73|74|75|76|77|78|79/ { 
                    mutate { replace => { "_security_result.priority_details" => "TIME" } } 
                }
                if [_syslog_priority] =~ /80|81|82|83|84|85|86|87/ { 
                    mutate { replace => { "_security_result.priority_details" => "SECURITY" } } 
                }
                if [_syslog_priority] =~ /88|89|90|91|92|93|94|95/ { 
                    mutate { replace => { "_security_result.priority_details" => "FTPD" } } 
                }
                if [_syslog_priority] =~ /96|97|98|99|100|101|102|103/ { 
                    mutate { replace => { "_security_result.priority_details" => "NTPD" } } 
                }
                if [_syslog_priority] =~ /104|105|106|107|108|109|110|111/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOGAUDIT" } } 
                }
                if [_syslog_priority] =~ /112|113|114|115|116|117|118|119/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOGALERT" } } 
                }
                if [_syslog_priority] =~ /120|121|122|123|124|125|126|127/ { 
                    mutate { replace => { "_security_result.priority_details" => "CLOCK" } } 
                }
                if [_syslog_priority] =~ /128|129|130|131|132|133|134|135/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL0" } } 
                }
                if [_syslog_priority] =~ /136|137|138|139|140|141|142|143/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL1" } } 
                }
                if [_syslog_priority] =~ /144|145|146|147|148|149|150|151/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL2" } } 
                }
                if [_syslog_priority] =~ /152|153|154|155|156|157|158|159/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL3" } } 
                }
                if [_syslog_priority] =~ /160|161|162|163|164|165|166|167/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL4" } } 
                }
                if [_syslog_priority] =~ /168|169|170|171|172|173|174|175/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL5" } } 
                }
                if [_syslog_priority] =~ /176|177|178|179|180|181|182|183/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL6" } } 
                }
                if [_syslog_priority] =~ /184|185|186|187|188|189|190|191/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL7" } } 
                }
                mutate {
                    merge => {
                        "event.idm.read_only_udm.security_result" => "_security_result"
                    }
                }
            }
            mutate {
                merge => {
                    "@output" => "event"
                }
            }
        }
    }
    

    L'affichage des résultats de l'extension d'analyseur affiche le format lisible.

    metadata.product_log_id = "6161053"
    metadata.event_timestamp = "2024-10-31T15:10:10Z"
    metadata.event_type = "PROCESS_LAUNCH"
    metadata.vendor_name = "Microsoft"
    metadata.product_name = "PowerShell"
    metadata.product_event_type = "600"
    metadata.description = "Info"
    metadata.log_type = "POWERSHELL"
    principal.hostname = "win-adfs.lunarstiiiness.com"
    principal.resource.name = "in_powershell"
    principal.resource.resource_subtype = "im_msvistalog"
    principal.asset.hostname = "win-adfs.lunarstiiiness.com"
    target.hostname = "Default Host"
    target.process.command_line = "C:\Program Files\Microsoft Azure AD Sync\Bin\miiserver.exe"
    target.asset.hostname = "Default Host"
    target.asset.asset_id = "Host ID:bf203e94-72cf-4649-84a5-fc02baedb75f"
    security_result[0].severity_details = "INFORMATIONAL"
    security_result[0].priority_details = "USER"
    

Extrait de code (et Grok) : décoration d'événements, noms de variables temporaires et conversion de types de données

Exemples d'attributs:

  • Format de la source de journal: JSON avec un en-tête Syslog
  • Approche de mappage des données: extrait de code à l'aide de Grok
  • Type de journal: WINDOWS_SYSMON
  • Objet de l'extension d'analyseur: décoration des événements, des noms de variables temporaires et des types de données.
  • Description :

    Cet exemple montre comment effectuer les actions suivantes lors de la création d'une extension d'analyseur:

    Décoration basée sur une instruction conditionnelle

    Cet exemple ajoute (informations contextuelles) des explications sur la signification de chaque type d'événement dans WINDOWS_SYSMON. Il utilise une instruction conditionnelle pour vérifier l'ID d'événement, puis ajoute un Description. Par exemple, EventID 1 est un événement Process Creation.

    Lorsque vous utilisez un filtre d'extraction, par exemple JSON, le type de données d'origine peut être conservé.

    Dans l'exemple suivant, la valeur EventID est extraite en tant qu'entier par défaut. L'instruction conditionnelle évalue la valeur EventID en tant qu'entier, et non en tant que chaîne.

    if [EventID] == 1 {
      mutate {
        replace => {
          "_description" => "[1] Process creation"
        }
      }
    }
    

    Conversion des types de données

    Vous pouvez convertir des types de données dans une extension d'analyseur à l'aide de la fonction convert.

    mutate {
      convert => {
        "EventID" => "string"
      }
      on_error => "_convert_EventID_already_string"
    }
    

    Noms de variables temporaires pour une meilleure lisibilité

    Vous pouvez utiliser des noms de variables temporaires dans des extraits de code, puis les renommer pour qu'ils correspondent au nom de l'objet d'événement UDM de sortie final. Cela peut améliorer la lisibilité globale.

    Dans l'exemple suivant, la variable description est renommée event.idm.read_only_udm.metadata.description:

    mutate {
      rename => {
        "_description" => "event.idm.read_only_udm.metadata.description"
      }
    }
    

    Champs répétés

    L'extension d'analyseur complète est la suivante:

    filter {
    # initialize variable
    mutate {
      replace => {
        "EventID" => ""
      }
    }
    # Use grok to parse syslog messages.
    # The on_error clause handles messages that don't match the pattern.
    grok {
      match => {
        "message" => [
          "(<%{POSINT:_syslog_priority}>)%{SYSLOGTIMESTAMP:datetime} %{DATA:logginghost}: %{GREEDYDATA:log_data}"
        ]
      }
      on_error => "not_supported_format"
    }
    if ![not_supported_format] {
      json {
        source => "log_data"
        on_error => "not_json"
      }
      if ![not_json] {
        if [EventID] == 1 {
          mutate {
            replace => {
              "_description" => "[1] Process creation"
            }
          }
        }
        if [EventID] == 2 {
          mutate {
            replace => {
              "_description" => "[2] A process changed a file creation time"
            }
          }
        }
        if [EventID] == 3 {
          mutate {
            replace => {
              "_description" => "[3] Network connection"
            }
          }
        }
        if [EventID] == 4 {
          mutate {
            replace => {
              "_description" => "[4] Sysmon service state changed"
            }
          }
        }
        if [EventID] == 5 {
          mutate {
            replace => {
              "_description" => "[5] Process terminated"
            }
          }
        }
        if [EventID] == 6 {
          mutate {
            replace => {
              "_description" => "[6] Driver loaded"
            }
          }
        }
        if [EventID] == 7 {
          mutate {
            replace => {
              "_description" => "[7] Image loaded"
            }
          }
        }
        if [EventID] == 8 {
          mutate {
            replace => {
              "_description" => "[8] CreateRemoteThread"
            }
          }
        }
        if [EventID] == 9 {
          mutate {
            replace => {
              "_description" => "[9] RawAccessRead"
            }
          }
        }
        if [EventID] == 10 {
          mutate {
            replace => {
              "_description" => "[10] ProcessAccess"
            }
          }
        }
        if [EventID] == 11 {
          mutate {
            replace => {
              "_description" => "[11] FileCreate"
            }
          }
        }
        if [EventID] == 12 {
          mutate {
            replace => {
              "_description" => "[12] RegistryEvent (Object create and delete)"
            }
          }
        }
        if [EventID] == 13 {
          mutate {
            replace => {
              "_description" => "[13] RegistryEvent (Value Set)"
            }
          }
        }
        if [EventID] == 14 {
          mutate {
            replace => {
              "_description" => "[14] RegistryEvent (Key and Value Rename)"
            }
          }
        }
        if [EventID] == 15 {
          mutate {
            replace => {
              "_description" => "[15] FileCreateStreamHash"
            }
          }
        }
        if [EventID] == 16 {
          mutate {
            replace => {
              "_description" => "[16] ServiceConfigurationChange"
            }
          }
        }
        if [EventID] == 17 {
          mutate {
            replace => {
              "_description" => "[17] PipeEvent (Pipe Created)"
            }
          }
        }
        if [EventID] == 18 {
          mutate {
            replace => {
              "_description" => "[18] PipeEvent (Pipe Connected)"
            }
          }
        }
        if [EventID] == 19 {
          mutate {
            replace => {
              "_description" => "[19] WmiEvent (WmiEventFilter activity detected)"
            }
          }
        }
        if [EventID] == 20 {
          mutate {
            replace => {
              "_description" => "[20] WmiEvent (WmiEventConsumer activity detected)"
            }
          }
        }
        if [EventID] == 21 {
          mutate {
            replace => {
              "_description" => "[21] WmiEvent (WmiEventConsumerToFilter activity detected)"
            }
          }
        }
        if [EventID] == 22 {
          mutate {
            replace => {
              "_description" => "[22] DNSEvent (DNS query)"
            }
          }
        }
        if [EventID] == 23 {
          mutate {
            replace => {
              "_description" => "[23] FileDelete (File Delete archived)"
            }
          }
        }
        if [EventID] == 24 {
          mutate {
            replace => {
              "_description" => "[24] ClipboardChange (New content in the clipboard)"
            }
          }
        }
        if [EventID] == 25 {
          mutate {
            replace => {
              "_description" => "[25] ProcessTampering (Process image change)"
            }
          }
        }
        if [EventID] == 26 {
          mutate {
            replace => {
              "_description" => "[26] FileDeleteDetected (File Delete logged)"
            }
          }
        }
        if [EventID] == 255 {
          mutate {
            replace => {
              "_description" => "[255] Error"
            }
          }
        }
        mutate {
          rename => {
            "_description" => "event.idm.read_only_udm.metadata.description"
          }
        }
        statedump{}
        mutate {
          merge => {
            "@output" => "event"
          }
        }
      }
    }
    }
    

    L'exécution de l'extension d'analyseur ajoute la décoration dans le champ metadata.description.

    metadata.product_log_id = "6008459"
    metadata.event_timestamp = "2024-10-31T14:41:53.442Z"
    metadata.event_type = "REGISTRY_CREATION"
    metadata.vendor_name = "Microsoft"
    metadata.product_name = "Microsoft-Windows-Sysmon"
    metadata.product_event_type = "12"
    metadata.description = "[12] RegistryEvent (Object create and delete)"
    metadata.log_type = "WINDOWS_SYSMON"
    additional.fields["thread_id"] = "3972"
    additional.fields["channel"] = "Microsoft-Windows-Sysmon/Operational"
    additional.fields["Keywords"] = "-9223372036854776000"
    additional.fields["Opcode"] = "Info"
    additional.fields["ThreadID"] = "3972"
    principal.hostname = "win-adfs.lunarstiiiness.com"
    principal.user.userid = "tim.smith_admin"
    principal.user.windows_sid = "S-1-5-18"
    principal.process.pid = "6856"
    principal.process.file.full_path = "C:\Windows\system32\wsmprovhost.exe"
    principal.process.product_specific_process_id = "SYSMON:{927d35bf-a374-6495-f348-000000002900}"
    principal.administrative_domain = "LUNARSTIIINESS"
    principal.asset.hostname = "win-adfs.lunarstiiiness.com"
    target.registry.registry_key = "HKU\S-1-5-21-3263964631-4121654051-1417071188-1116\Software\Policies\Microsoft\SystemCertificates\CA\Certificates"
    observer.asset_id = "5770385F:C22A:43E0:BF4C:06F5698FFBD9"
    observer.process.pid = "2556"
    about[0].labels[0].key = "Category ID"
    about[0].labels[0].value = "RegistryEvent"
    security_result[0].rule_name = "technique_id=T1553.004,technique_name=Install Root Certificate"
    security_result[0].summary = "Registry object added or deleted"
    security_result[0].severity = "INFORMATIONAL"
    security_result[1].rule_name = "EventID: 12"
    security_result[2].summary = "12"
    

Exemples de code XML

Les exemples suivants montrent comment créer une extension d'analyseur lorsque la source de journal est au format XML.

Extrait de code : extraction de champ arbitraire dans l'objet additional

Exemples d'attributs:

  • Format de la source de journaux: XML
  • Approche de mappage des données: extrait de code
  • Type de journal: WINDOWS_DEFENDER_AV
  • Objet de l'extension d'analyseur: Extraction de champs arbitraires dans l'objet additional
  • Description :

    L'objectif de cet exemple est d'extraire et de stocker la valeur Platform Version, par exemple pour pouvoir créer des rapports et rechercher outdated platform versions.

    Après avoir examiné le document Champs UDM importants, aucun champ UDM standard approprié n'a été identifié. Par conséquent, cet exemple utilisera l'objet additional pour stocker ces informations en tant que paire clé-valeur personnalisée.

    # Parser Extension for WINDOWS_DEFENDER_AV
    # 2024-10-29: cmmartin: Extracting 'Platform Version' into Additional
    filter {
        # Uses XPath to target the specific element(s)
        xml {
            source => "message"
                xpath => {
                    "/Event/EventData/Data[@Name='Platform version']" => "platform_version"
            }
            on_error => "_xml_error"
        }
        # Conditional processing: Only proceed if XML parsing was successful
        if ![_xml_error] {
            # Prepare the additional field structure using a temporary variable
            mutate{
                replace => {
                    "additional_platform_version.key" => "Platform Version"
                    "additional_platform_version.value.string_value" => "%{platform_version}"
                }
                on_error => "no_platform_version"
            }
            # Merge the additional field into the event1 structure.
            if ![no_platform_version] {
                mutate {
                    merge => {
                        "event1.idm.read_only_udm.additional.fields" => "additional_platform_version"
                    }
                }
            }
            mutate {
                merge => {
                    "@output" => "event1"
                }
            }
        }
    }
    

    L'exécution de PREVIEW UDM OUTPUT indique que le nouveau champ a bien été ajouté.

    metadata.event_timestamp = "2024-10-29T14:08:52Z"
    metadata.event_type = "STATUS_HEARTBEAT"
    metadata.vendor_name = "Microsoft"
    metadata.product_name = "Windows Defender AV"
    metadata.product_event_type = "MALWAREPROTECTION_SERVICE_HEALTH_REPORT"
    metadata.description = "Endpoint Protection client health report (time in UTC)."
    metadata.log_type = "WINDOWS_DEFENDER_AV"
    additional.fields["Platform Version"] = "4.18.24080.9"
    principal.hostname = "win-dc-01.ad.1823127835827.altostrat.com"
    security_result[0].description = "EventID: 1151"
    security_result[0].action[0] = "ALLOW"
    security_result[0].severity = "LOW"
    

Extrait de code (et Grok) : extraction de champ arbitraire dans le nom d'hôte principal

Exemples d'attributs:

  • Format de la source de journaux: XML
  • Approche de mappage des données: extrait de code à l'aide de Grok
  • Type de journal: WINDOWS_DEFENDER_AV
  • Objet de l'extension d'analyseur: Extraction de champ arbitraire dans le nom d'hôte principal
  • Description :

    L'objectif de cet exemple est d'extraire le Hostname d'un FQDN et d'écraser le champ principal.hostname.

    Cet exemple vérifie si le champ Computer name du journal brut inclut un FQDN. Dans ce cas, il n'extrait que la partie Hostname et écrase le champ Principal Hostname de l'UDM.

    Après avoir examiné l'analyseur et le document sur les champs UDM importants, il est clair que le champ principal.hostname doit être utilisé.

    # Parser Extension for WINDOWS_DEFENDER_AV
    # 2024-10-29: Extract Hostname from FQDN and overwrite principal.hostname
    filter {
        # Uses XPath to target the specific element(s)
        xml {
            source => "message"
                xpath => {
                    "/Event/System/Computer" => "hostname"
            }
            on_error => "_xml_error"
        }
        # Conditional processing: Only proceed if XML parsing was successful
        if ![_xml_error] {
      # Extract all characters before the first dot in the hostname variable
            grok {
                match => { "hostname" => "(?<hostname>[^.]+)" }
            }
            mutate {
                replace => {
                    "event1.idm.read_only_udm.principal.hostname" => "%{hostname}"
                }
            }
            mutate {
                merge => {
                    "@output" => "event1"
                }
            }
        }
    }
    

    Cette extension d'analyseur utilise une instruction Grok pour exécuter une expression régulière afin d'extraire le champ hostname. L'expression régulière elle-même utilise un groupe de capture nommé, ce qui signifie que tout ce qui correspond entre les parenthèses sera stocké dans le champ nommé hostname, correspondant à un ou plusieurs caractères jusqu'à ce qu'il rencontre un point. Cette opération ne capture que le hostname dans un FQDN.

    Toutefois, lorsque vous exécutez la commande PREVIEW UDM OUTPUT, une erreur est renvoyée. Pourquoi ?

    generic::unknown: pipeline.ParseLogEntry failed:
     LOG_PARSING_CBN_ERROR: "generic::internal: pipeline failed: filter grok (2) failed: 
    field\ "hostname\" already exists in data and is not overwritable"
    

    Instruction overwrite Grok

    Dans une instruction Grok, un groupe de capture nommé ne peut pas écraser une variable existante, sauf si elle est explicitement spécifiée à l'aide de l'instruction overwrite. Dans ce scénario, vous pouvez utiliser un nom de variable différent pour le groupe de capture nommé dans l'instruction Grok ou, comme indiqué dans l'exemple d'extrait de code suivant, utiliser l'instruction overwrite pour écraser explicitement la variable hostname existante.

    # Parser Extension for WINDOWS_DEFENDER_AV
    # 2024-10-29: cmmartin: Overwriting principal Hostname
    filter {
      xml {
        source => "message"
          xpath => {
            "/Event/System/Computer" => "hostname"
        }
        on_error => "_xml_error"
      }
      if ![_xml_error] {
        grok {
          match => { "hostname" => "(?<hostname>[^.]+)" }
          overwrite => ["hostname"]
          on_error => "_grok_hostname_error"
        }
        mutate {
          replace => {
            "event1.idm.read_only_udm.principal.hostname" => "%{hostname}"
          }
        }
        mutate {
          merge => {
            "@output" => "event1"
          }
        }
      }
    }
    

    L'exécution de PREVIEW UDM OUTPUT à nouveau montre que le nouveau champ a été ajouté après l'extraction de hostname à partir de FQDN.

    metadata.event_timestamp"2024-10-29T14:08:52Z"
    metadata.event_type"STATUS_HEARTBEAT"
    metadata.vendor_name"Microsoft"
    metadata.product_name"Windows Defender AV"
    metadata.product_event_type"MALWAREPROTECTION_SERVICE_HEALTH_REPORT"
    metadata.description"Endpoint Protection client health report (time in UTC)."
    metadata.log_type"WINDOWS_DEFENDER_AV"
    principal.hostname"win-dc-01"
    security_result[0].description"EventID: 1151"
    security_result[0].action[0]"ALLOW"
    security_result[0].severity"LOW"