Esempi di estensioni del parser

Supportato in:

Questo documento fornisce esempi di creazione di estensioni del parser in diversi scenari. Per scoprire di più sulle estensioni del parser, consulta Creare estensioni del parser.

Esempi di estensioni del parser

Utilizza le seguenti tabelle di attributi per trovare rapidamente il codice di esempio di cui hai bisogno.

Esempi di no-code

Formato della sorgente log Titolo di esempio Descrizione Concetti del parser in questo esempio
JSON
(tipo di log: GCP_IDS)
Campi di estrazione Estrai i campi da un log in formato JSON. Senza codice
JSON
(tipo di log: WORKSPACE_ALERTS)
Estrarre i campi con il valore Precondizionale Estrai i campi da un log in formato JSON e normalizzali in un campo UDM ripetuto, con un precondizionatore.

Esempi di snippet di codice

Formato della sorgente log Titolo di esempio Descrizione Concetti del parser in questo esempio
JSON
(tipo di log: GCP_IDS)
Aggiunta dello user agent HTTP
  • Estrai lo user agent dell'analizzatore HTTP di rete e crea un target hostname da requestUrl.
  • Assegna uno spazio dei nomi per assicurarti che vengano eseguiti l'aliasing e l'arricchimento basati sugli asset.
CSV
(tipo di log: MISP_IOC)
Estrazione di campi arbitrari nell'oggetto UDM additional Estrae i campi in UDM > Entità > additional Oggetto UDM > coppia chiave-valore additional Oggetto UDM
Syslog
(tipo di log: POWERSHELL)
Estrazione di priorità e gravità da Syslog Estrai i valori di Facility e Severity di Syslog nei campi Priorità e Gravità del risultato di sicurezza di UDM. In base a Grok
JSON con un'intestazione Syslog
(tipo di log: WINDOWS_SYSMON)
Decorazione basata su un'istruzione condizionale
  • Aggiunge una decorazione (informazioni contestuali) al campo metadata.description in base a un'istruzione condizionale e alla comprensione dei tipi di dati all'interno degli snippet di codice.
  • Quando utilizzi un filtro di estrazione, il tipo di dati originale potrebbe essere mantenuto.
  • Un'istruzione condizionale Grok deve utilizzare il tipo di dati originale per valutare il campo.
  • In base a Grok
  • Istruzione condizionale Grok
  • Il tipo di dati originale di un campo estratto potrebbe essere mantenuto.
  • Un'istruzione condizionale Grok deve utilizzare il tipo di dati originale per valutare il campo.
JSON con un'intestazione Syslog
(tipo di log: WINDOWS_SYSMON)
Convertire i tipi di dati
  • Converti i tipi di dati all'interno di un'estensione dell'analizzatore utilizzando la funzione convert.
  • Utilizza le istruzioni on_error per garantire una corretta gestione degli errori ed evitare errori dell'estensione del parser causati da errori.
  • In base a Grok
  • Converti i tipi di dati
  • Utilizza le istruzioni on_error per gestire gli errori.
JSON con un'intestazione Syslog
(tipo di log: WINDOWS_SYSMON)
Nomi di variabili temporanee per la leggibilità Puoi utilizzare nomi di variabili temporanee negli snippet di codice e rinominarli in un secondo momento in modo che corrispondano al nome dell'oggetto evento UDM dell'output finale. In questo modo, puoi migliorare la leggibilità complessiva.
  • In base a Grok
  • Utilizza nomi di variabili temporanee e rinominale in seguito con i nomi UDM dell'output finale.
JSON con un'intestazione Syslog
(tipo di log: WINDOWS_SYSMON)
Campi ripetuti Fai attenzione quando utilizzi campi ripetuti negli snippet di codice, ad esempio il campo security_result.
XML
(tipo di log: WINDOWS_DEFENDER_AV)
Estrazione di campi arbitrari nell'oggetto additional
  • Estrai e memorizza il valore della versione della piattaforma, ad esempio per poter generare report e cercare versioni della piattaforma obsolete.
  • In questo esempio non è presente un campo UDM standard adatto, pertanto l'oggetto additional viene utilizzato per memorizzare le informazioni come coppia chiave-valore personalizzata.
L'oggetto additional viene utilizzato per memorizzare le informazioni come coppia chiave-valore personalizzata.
XML
(tipo di log: WINDOWS_DEFENDER_AV)
Estrazione di campi arbitrari nel nome host principale
  • Estrai il nome host da un FQDN.
  • L'elaborazione condizionale viene utilizzata per determinare se il campo principal.hostname deve essere sovrascritto.
  • L'istruzione Grok utilizza un'espressione regolare (regex) per estrarre il campo hostname. La regex stessa utilizza un gruppo di cattura denominato, il che significa che qualsiasi corrispondenza all'interno delle parentesi verrà memorizzata nel campo denominato hostname, trovando una corrispondenza con uno o più caratteri finché non viene rilevato un punto. Verrà acquisito solo il hostname all'interno di un FQDN.
Inserzione di Grok overwrite
  • Tuttavia, quando esegui l'OUTPUT PREVIEW UDM, viene restituito un errore: "LOG_PARSING_CBN_ERROR: il campo hostname esiste già nei dati e non è sovrascrivibile".
  • All'interno di un'istruzione Grok, un gruppo di cattura denominato non può sovrascrivere una variabile esistente, a meno che non sia specificato esplicitamente utilizzando l'istruzione overwrite. In questo caso, possiamo utilizzare un nome di variabile diverso per il gruppo di cattura denominato nell'istruzione Grok oppure (come mostrato in questo esempio) utilizzare l'istruzione di sovrascrittura per sovrascrivere esplicitamente la variabile hostname esistente.
  • In base a Grok
  • L'elaborazione condizionale viene utilizzata per determinare se un campo deve essere sovrascritto.
  • Dichiarazione Grok che utilizza espressioni regolari (regex).
  • Statement overwrite Grok

Esempi di JSON

Gli esempi riportati di seguito mostrano come creare un'estensione del parser in cui la fonte di log è in formato JSON.

Senza codice: estrai campi

Esempi di attributi:

  • Formato dell'origine log: JSON
  • Approccio alla mappatura dei dati: senza codice
  • Tipo di log: GCP_IDS
  • Scopo dell'estensione dell'analizzatore sintattico: estrazione dei campi.
  • Descrizione:

    Diversi campi relativi alla rete non vengono estratti. Poiché questo esempio di log è un log strutturato in formato JSON, possiamo utilizzare l'approccio senza codice (Map data fields) per creare l'estensione del parser.

    I campi originali che vogliamo estrarre sono:

    • total_packets (stringa)
    • elapsed_time (stringa)
    • total_bytes (stringa)

    Questa è la voce del log non elaborato di esempio:

    {
    "insertId": "625a41542d64c124e7db097ae0906ccb-1@a3",
    "jsonPayload": {
      "destination_port": "80",
      "application": "incomplete",
      "ip_protocol": "tcp",
      "network": "projects/prj-p-shared-base/global/networks/shared-vpc-production",
      "start_time": "2024-10-29T21:14:59Z",
      "source_port": "41936",
      "source_ip_address": "35.191.200.157",
      "total_packets": "6",
      "elapsed_time": "0",
      "destination_ip_address": "192.168.0.11",
      "total_bytes": "412",
      "repeat_count": "1",
      "session_id": "1289742"
    },
    "resource": {
      "type": "ids.googleapis.com/Endpoint",
      "labels": {
        "resource_container": "projects/12345678910",
        "location": "europe-west4-a",
        "id": "p-europe-west4"
      }
    },
    "timestamp": "2024-10-29T21:15:21Z",
    "logName": "projects/prj-p-shared-base/logs/ids.googleapis.com%2Ftraffic",
    "receiveTimestamp": "2024-10-29T21:15:24.051990717Z"
    }
    

    L'esempio utilizza l'approccio senza codice per creare un'estensione del parser utilizzando la seguente mappatura dei campi di dati:

    Percorso precondizionale Operatore precondizione Precondition Value Percorso dati non elaborati Campo Destinazione*
    jsonPayload.total_bytes NON_È_UGUALE "" jsonPayload.total_bytes udm.principal.network.received_bytes
    jsonPayload.elapsed_time NON_È_UGUALE "" jsonPayload.elapsed_time udm.principal.network.session_duration.seconds
    jsonPayload.total_packets NON_È_UGUALE "" jsonPayload.total_packets udm.principal.network.received_packets

    L'esecuzione dell'estensione del parser aggiunge correttamente i tre campi estratti all'oggetto principal.network.

    metadata.product_log_id = "625a41542d64c124e7db097ae0906ccb-1@a3"
    metadata.event_timestamp = "2024-10-29T21:14:59Z"
    metadata.event_type = "NETWORK_CONNECTION"
    metadata.vendor_name = "Google Cloud"
    metadata.product_name = "IDS"
    metadata.ingestion_labels[0].key = "label"
    metadata.ingestion_labels[0].value = "GCP_IDS"
    metadata.log_type = "GCP_IDS"
    principal.ip[0] = "35.191.200.157"
    principal.port = 41936
    principal.network.received_bytes = 412
    principal.network.session_duration.seconds = "0s"
    principal.network.received_packets = 6
    target.ip[0] = "192.168.0.11"
    target.port = 80
    target.application = "incomplete"
    observer.location.country_or_region = "EUROPE"
    observer.location.name = "europe-west4-a"
    observer.resource.name = "projects/12345678910"
    observer.resource.resource_type = "CLOUD_PROJECT"
    observer.resource.attribute.cloud.environment = "GOOGLE_CLOUD_PLATFORM"
    observer.resource.product_object_id = "p-europe-west4"
    network.ip_protocol = "TCP"
    network.session_id = "1289742"
    

Senza codice: estrarre i campi con il valore della precondizione

Esempi di attributi:

  • Formato dell'origine log: JSON
  • Approccio alla mappatura dei dati: senza codice
  • Tipo di log: WORKSPACE_ALERTS
  • Scopo dell'estensione dell'analizzatore sintattico: estrarre i campi con il valore della condizione precedente.
  • Descrizione:

    L'interprete originale non estrae il email address dell'utente principale interessato da un avviso DLP (Data Loss Prevention).

    Questo esempio utilizza un'estensione del parser senza codice per estrarre il email address e normalizzarlo in un campo UDM ripetuto, con una precondizione.

    Quando utilizzi campi ripetuti in un'estensione del parser senza codice, devi indicare se vuoi:

    • replace (sostituisce tutti i valori dei campi ripetuti nell' oggetto UDM esistente) oppure
    • append (appende i valori estratti ai campi ripetuti).

    Per ulteriori dettagli, vedi la sezione Campi ripetuti.

    Questo esempio sostituisce gli eventuali indirizzi email esistenti nel campo principal.user.email_address normalizzato.

    I precondizionali ti consentono di eseguire controlli condizionali prima di eseguire un'operazione di estrazione. Nella maggior parte dei casi, il campo Precondizione sarà lo stesso del campo Dati non elaborati da estrarre, con un Operatore precondizione pari a not Null, ad esempio foo != "".

    Tuttavia, a volte, come nel nostro esempio, il valore del campo di dati non elaborati che vuoi estrarre non è presente in tutte le voci di log. In questo caso, puoi utilizzare un altro campo Precondizione per filtrare l'operazione di estrazione. Nel nostro esempio, il campo triggeringUserEmail non elaborato che vuoi estrarre è presente solo nei log in cui type = Data Loss Prevention.

    Di seguito sono riportati i valori di esempio da inserire nei campi dell'estensione del parser senza codice:

    Percorso precondizionale Operatore precondizione Precondition Value Percorso dati non elaborati Campo Destinazione*
    type UGUALE Prevenzione della perdita di dati data.ruleViolationInfo.triggeringUserEmail udm.principal.user.email_addresses

    L'esempio seguente mostra i campi dell'estensione del parser senza codice compilati con i valori di esempio:

    immagine2

    L'esecuzione dell'estensione del parser aggiunge email_address all'oggetto principal.user.

    metadata.product_log_id = "Ug71LGqBr6Q="
    metadata.event_timestamp = "2022-12-18T12:17:35.154368Z"
    metadata.event_type = "USER_UNCATEGORIZED"
    metadata.vendor_name = "Google Workspace"
    metadata.product_name = "Google Workspace Alerts"
    metadata.product_event_type = "DlpRuleViolation"
    metadata.log_type = "WORKSPACE_ALERTS"
    additional.fields["resource_title"] = "bq-results-20221215-112933-1671103787123.csv"
    principal.user.email_addresses[0] = "foo.bar@altostrat.com"
    target.resource.name = "DRIVE"
    target.resource.resource_type = "STORAGE_OBJECT"
    target.resource.product_object_id = "1wLteoF3VHljS_8_ABCD_VVbhFTfcTQplJ5k1k7cL4r8"
    target.labels[0].key = "resource_title"
    target.labels[0].value = "bq-results-20221321-112933-1671103787697.csv"
    about[0].resource.resource_type = "CLOUD_ORGANIZATION"
    about[0].resource.product_object_id = "C01abcde2"
    security_result[0].about.object_reference.id = "ODU2NjEwZTItMWE2YS0xMjM0LWJjYzAtZTJlMWU2YWQzNzE3"
    security_result[0].category_details[0] = "Data Loss Prevention"
    security_result[0].rule_name = "Sensitive Projects Match"
    security_result[0].summary = "Data Loss Prevention"
    security_result[0].action[0] = "ALLOW"
    security_result[0].severity = "MEDIUM"
    security_result[0].rule_id = "rules/00abcdxs183abcd"
    security_result[0].action_details = "ALERT, DRIVE_WARN_ON_EXTERNAL_SHARING"
    security_result[0].alert_state = "ALERTING"
    security_result[0].detection_fields[0].key = "start_time"
    security_result[0].detection_fields[0].value = "2022-12-18T12:17:35.154368Z"
    security_result[0].detection_fields[1].key = "status"
    security_result[0].detection_fields[1].value = "NOT_STARTED"
    security_result[0].detection_fields[2].key = "trigger"
    security_result[0].detection_fields[2].value = "DRIVE_SHARE"
    security_result[0].rule_labels[0].key = "detector_name"
    security_result[0].rule_labels[0].value = "EMAIL_ADDRESS"
    network.email.to[0] = "foo.bar@altostrat.com"
    

Snippet di codice: aggiunta dello user agent HTTP

Esempi di attributi:

  • Formato dell'origine log: JSON
  • Approccio alla mappatura dei dati: snippet di codice
  • Tipo di log: GCP_IDS
  • Scopo dell'estensione dell'analizzatore: aggiunta dello user agent HTTP.
  • Descrizione:

    Questo è un esempio di tipo di oggetto UDM non standard che non è supportato dall'approccio no-code e pertanto richiede l'utilizzo di uno snippet di codice. L'analizzatore sintattico predefinito non estrae l'analisi Network HTTP Parser User Agent. Inoltre, per coerenza:

    1. Verrà creato un Target Hostname dal requestUrl.
    2. Verrà assegnato un Namespace per garantire l'esecuzione di aliasing e arricchimento basati sugli asset.
    # GCP_LOADBALANCING
    # owner: @owner
    # updated: 2022-12-23
    # Custom parser extension that:
    # 1) adds consistent Namespace 
    # 2) adds Parsed User Agent Object 
    filter {
        # Initialize placeholder
        mutate {
            replace => {
                "httpRequest.userAgent" => ""
                "httpRequest.requestUrl" => ""
            }
        }
        json {
            on_error => "not_json"
            source => "message"
            array_function => "split_columns"
        }
        if ![not_json] {
          #1 - Override Namespaces
            mutate {
                replace => {
                    "event1.idm.read_only_udm.principal.namespace" => "TMO"
                }
            }
            mutate {
                replace => {
                    "event1.idm.read_only_udm.target.namespace" => "TMO"
                }
            }
            mutate {
                replace => {
                    "event1.idm.read_only_udm.src.namespace" => "TMO"
                }
            }
            #2 - Parsed User Agent
            if [httpRequest][requestUrl]!= "" {
                grok {
                    match => {
                        "httpRequest.requestUrl" => ["\/\/(?P<_hostname>.*?)\/"]
                    }
                    on_error => "_grok_hostname_failed"
                }
                if ![_grok_hostname_failed] {
                    mutate {
                        replace => {
                            "event1.idm.read_only_udm.target.hostname" => "%{_hostname}"
                        }
                    }
                }
            }
            if [httpRequest][userAgent] != "" {
                mutate {
                    convert => {
                        "httpRequest.userAgent" => "parseduseragent"
                    }
                }
                #Map the converted "user_agent" to the new UDM field "http.parsed_user_agent".
                mutate {
                    rename => {
                        "httpRequest.userAgent" => "event1.idm.read_only_udm.network.http.parsed_user_agent"
                    }
                }
            }
            mutate {
                merge => {
                    "@output" => "event1"
                }
            }
        }
    }
    

Esempio di file CSV

L'esempio seguente mostra come creare un'estensione del parser in cui la fonte del log è in formato CSV.

Snippet di codice: estrazione di campi arbitrari nell'oggetto additional

Esempi di attributi:

  • Formato dell'origine log: CSV
  • Approccio alla mappatura dei dati: snippet di codice
  • Tipo di log: MISP_IOC
  • Scopo dell'estensione dell'analizzatore: estrazione di campi arbitrari nell'oggetto additional.
  • Descrizione:

    In questo esempio viene utilizzata l'integrazione del contesto dell'entità UDM MISP_IOC. L'additional oggetto UDM con coppia chiave-valore verrà utilizzato per acquisire informazioni contestuali non estratte dal parser predefinito e per aggiungere campi specifici per ogni organizzazione. Ad esempio, un URL che rimandi alla sua istanza specifica del sistema MISP.

    Questa è l'origine log basata su CSV per questo esempio:

    1 9d66d38a-14e1-407f-a4d1-90b82aa1d59f
    2 3908
    3 Network activity
    4 ip-dst
    5 117.253.154.123
    6
    7
    8 1687894564
    9
    10
    11
    12
    13
    14 DigitalSide Malware report\: MD5\: 59ce0baba11893f90527fc951ac69912
    15 ORGNAME
    16 DIGITALSIDE.IT
    17 0
    18 Medium
    19 0
    20 2023-06-23
    21 tlp:white,type:OSINT,source:DigitalSide.IT,source:urlhaus.abuse.ch
    22 1698036218

    immagine

    # MISP_IOC
    # owner: @owner
    # updated: 2024-06-21
    # Custom parser extension that:
    # 1) adds a link back to internal MISP tenant 
    # 2) extracts missing fields into UDM > Entity > Additional fields
    filter {
        # Set the base URL for MISP. Remember to replace this placeholder!
        mutate {
            replace => {
                "misp_base_url" => "https://<YOUR_MISP_URL>"
            }
        }
        # Parse the CSV data from the 'message' field. Uses a comma as the separator.
        # The 'on_error' option handles lines that are not properly formatted CSV.
        csv {
            source => "message"
            separator => ","
            on_error => "broken_csv"
        }
        # If the CSV parsing was successful...
        if ![broken_csv] {
            # Rename the CSV columns to more descriptive names.
            mutate {
                rename => {
                    "column2" => "event_id"
                    "column8" => "object_timestamp"
                    "column16" => "event_source_org"
                    "column17" => "event_distribution"
                    "column19" => "event_analysis"
                    "column22" => "attribute_timestamp"
                }
            }
        }
        # Add a link to view the event in MISP, if an event ID is available.
        # "column2" => "event_id"
        if [event_id] != "" {
            mutate {
                replace => {
                    "additional_url.key" => "view_in_misp"
                    "additional_url.value.string_value" => "%{misp_base_url}/events/view/%{event_id}"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_url"
                }
            }
        }
        # Add the object timestamp as an additional field, if available.
        # "column8" => "object_timestamp"
        if [object_timestamp] != "" {
            mutate {
                replace => {
                    "additional_object_timestamp.key" => "object_timestamp"
                    "additional_object_timestamp.value.string_value" => "%{object_timestamp}"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_object_timestamp"
                }
            }
        }
        # Add the event source organization as an additional field, if available.
        # "column16" => "event_source_org"
        if [event_source_org] != "" {
            mutate {
                replace => {
                    "additional_event_source_org.key" => "event_source_org"
                    "additional_event_source_org.value.string_value" => "%{event_source_org}"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_event_source_org"
                }
            }
        }
        # Add the event distribution level as an additional field, if available.
        # Maps numerical values to descriptive strings.
        # "column17" => "event_distribution"
        if [event_distribution] != "" {
            if [event_distribution] == "0" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "YOUR_ORGANIZATION_ONLY"
                    }
                }
            } else if [event_distribution] == "1" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "THIS_COMMUNITY_ONLY"
                    }
                }
            } else if [event_distribution] == "2" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "CONNECTED_COMMUNITIES"
                    }
                }
            } else if [event_distribution] == "3" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "ALL_COMMUNITIES"
                    }
                }
            } else if [event_distribution] == "4" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "SHARING_GROUP"
                    }
                }
            } else if [event_distribution] == "5" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "INHERIT_EVENT"
                    }
                }
            }
            mutate {
                replace => {
                    "additional_event_distribution.key" => "event_distribution"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_event_distribution"
                }
            }
        }
        # Add the event analysis level as an additional field, if available.
        # Maps numerical values to descriptive strings.
        # "column19" => "event_analysis"
        if [event_analysis] != "" {
            if [event_analysis] == "0" {
                mutate {
                    replace => {
                        "additional_event_analysis.value.string_value" => "INITIAL"
                    }
                }
            } else if [event_analysis] == "1" {
                mutate {
                    replace => {
                        "additional_event_analysis.value.string_value" => "ONGOING"
                    }
                }
            } else if [event_analysis] == "2" {
                mutate {
                    replace => {
                        "additional_event_analysis.value.string_value" => "COMPLETE"
                    }
                }
            }
            mutate {
                replace => {
                    "additional_event_analysis.key" => "event_analysis"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_event_analysis"
                }
            }
        }
        # Add the attribute timestamp as an additional field, if available.
        # "column22" => "attribute_timestamp" 
        if [attribute_timestamp] != "" {
            mutate {
                replace => {
                    "additional_attribute_timestamp.key" => "attribute_timestamp"
                    "additional_attribute_timestamp.value.string_value" => "%{attribute_timestamp}"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_attribute_timestamp"
                }
            }
        }
        # Finally, merge the 'event' data into the '@output' field.
        mutate {
            merge => {
                "@output" => "event"
            }
        }
    }
    

    L'esecuzione dell'estensione del parser aggiunge correttamente i campi personalizzati dal file CSV all'oggetto additional.

    metadata.product_entity_id = "9d66d38a-14e1-407f-a4d1-90b82aa1d59f"
    metadata.collected_timestamp = "2024-10-31T15:16:08Z"
    metadata.vendor_name = "MISP"
    metadata.product_name = "MISP"
    metadata.entity_type = "IP_ADDRESS"
    metadata.description = "ip-dst"
    metadata.interval.start_time = "2023-06-27T19:36:04Z"
    metadata.interval.end_time = "9999-12-31T23:59:59Z"
    metadata.threat[0].category_details[0] = "Network activity"
    metadata.threat[0].description = "tlp:white,type:OSINT,source:DigitalSide.IT,source:urlhaus.abuse.ch - additional info: DigitalSide Malware report: MD5: 59ce0baba11893f90527fc951ac69912"
    metadata.threat[0].severity_details = "Medium"
    metadata.threat[0].threat_feed_name = "DIGITALSIDE.IT"
    entity.ip[0] = "117.253.154.123"
    additional.fields["view_in_misp"] = "https:///events/view/3908"
    additional.fields["object_timestamp"] = "1687894564"
    additional.fields["event_source_org"] = "DIGITALSIDE.IT"
    additional.fields["event_distribution"] = "YOUR_ORGANIZATION_ONLY"
    additional.fields["event_analysis"] = "INITIAL"
    additional.fields["attribute_timestamp"] = "1698036218"
    

Esempi di Grok

Gli esempi riportati di seguito mostrano come creare estensioni di parser basate su Grok.

Snippet di codice (e Grok) - Estrazione di priorità e gravità

Esempi di attributi:

  • Formato della sorgente log: Syslog
  • Approccio alla mappatura dei dati: snippet di codice che utilizza Grok
  • Tipo di log: POWERSHELL
  • Scopo dell'estensione dell'analizzatore: estrazione di priorità e gravità.
  • Descrizione:

    In questo esempio viene creata un'estensione di analisi basata su Grok per estrarre i valori di Facility e Severity di Syslog nei campi Risultato di sicurezza UDM Priority e Severity.

    filter {
        # Use grok to parse syslog messages. The on_error clause handles messages that don't match the pattern.
        grok {
            match => {
                "message" => [
                    # Extract message with syslog headers.
                    "(<%{POSINT:_syslog_priority}>)%{SYSLOGTIMESTAMP:datetime} %{DATA:logginghost}: %{GREEDYDATA:log_data}"
                ]
            }
            on_error => "not_supported_format"
        }
        # If the grok parsing failed, tag the event as unsupported and drop it.
        if ![not_supported_format] {
            if [_syslog_priority] != "" {
                if [_syslog_priority] =~ /0|8|16|24|32|40|48|56|64|72|80|88|96|104|112|120|128|136|144|152|160|168|176|184/ {
                    mutate { replace => { "_security_result.severity_details" => "EMERGENCY" } } 
                }
                if [_syslog_priority] =~ /1|9|17|25|33|41|49|57|65|73|81|89|97|105|113|121|129|137|145|153|161|169|177|185/ {
                    mutate { replace => { "_security_result.severity_details" => "ALERT" } } 
                }
                if [_syslog_priority] =~ /2|10|18|26|34|42|50|58|66|74|82|90|98|106|114|122|130|138|146|154|162|170|178|186/ {
                    mutate { replace => { "_security_result.severity_details" => "CRITICAL" } }
                }
                if [_syslog_priority] =~ /3|11|19|27|35|43|51|59|67|75|83|91|99|107|115|123|131|139|147|155|163|171|179|187/ {
                    mutate { replace => { "_security_result.severity_details" => "ERROR" } }
                }
                if [_syslog_priority] =~ /4|12|20|28|36|44|52|60|68|76|84|92|100|108|116|124|132|140|148|156|164|172|180|188/ {
                    mutate { replace => { "_security_result.severity_details" => "WARNING" } }
                }
                if [_syslog_priority] =~ /5|13|21|29|37|45|53|61|69|77|85|93|101|109|117|125|133|141|149|157|165|173|181|189/ {
                    mutate { replace => { "_security_result.severity_details" => "NOTICE" } }
                }
                if [_syslog_priority] =~ /6|14|22|30|38|46|54|62|70|78|86|94|102|110|118|126|134|142|150|158|166|174|182|190/ {
                    mutate { replace => { "_security_result.severity_details" => "INFORMATIONAL" } }
                }
                if [_syslog_priority] =~ /7|15|23|31|39|47|55|63|71|79|87|95|103|111|119|127|135|143|151|159|167|175|183|191/ {
                    mutate { replace => { "_security_result.severity_details" => "DEBUG" } }
                }
                # Facilities (mapped to priority)
                if [_syslog_priority] =~ /0|1|2|3|4|5|6|7/ { 
                    mutate { replace => { "_security_result.priority_details" => "KERNEL" } } 
                }
                if [_syslog_priority] =~ /8|9|10|11|12|13|14|15/ { 
                    mutate { replace => { "_security_result.priority_details" => "USER" } } 
                }
                if [_syslog_priority] =~ /16|17|18|19|20|21|22|23/ { 
                    mutate { replace => { "_security_result.priority_details" => "MAIL" } } 
                }
                if [_syslog_priority] =~ /24|25|26|27|28|29|30|31/ { 
                    mutate { replace => { "_security_result.priority_details" => "SYSTEM" } } 
                }
                if [_syslog_priority] =~ /32|33|34|35|36|37|38|39/ { 
                    mutate { replace => { "_security_result.priority_details" => "SECURITY" } } 
                }
                if [_syslog_priority] =~ /40|41|42|43|44|45|46|47/ { 
                    mutate { replace => { "_security_result.priority_details" => "SYSLOG" } } 
                }
                if [_syslog_priority] =~ /48|49|50|51|52|53|54|55/ { 
                    mutate { replace => { "_security_result.priority_details" => "LPD" } } 
                }
                if [_syslog_priority] =~ /56|57|58|59|60|61|62|63/ { 
                    mutate { replace => { "_security_result.priority_details" => "NNTP" } } 
                }
                if [_syslog_priority] =~ /64|65|66|67|68|69|70|71/ { 
                    mutate { replace => { "_security_result.priority_details" => "UUCP" } } 
                }
                if [_syslog_priority] =~ /72|73|74|75|76|77|78|79/ { 
                    mutate { replace => { "_security_result.priority_details" => "TIME" } } 
                }
                if [_syslog_priority] =~ /80|81|82|83|84|85|86|87/ { 
                    mutate { replace => { "_security_result.priority_details" => "SECURITY" } } 
                }
                if [_syslog_priority] =~ /88|89|90|91|92|93|94|95/ { 
                    mutate { replace => { "_security_result.priority_details" => "FTPD" } } 
                }
                if [_syslog_priority] =~ /96|97|98|99|100|101|102|103/ { 
                    mutate { replace => { "_security_result.priority_details" => "NTPD" } } 
                }
                if [_syslog_priority] =~ /104|105|106|107|108|109|110|111/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOGAUDIT" } } 
                }
                if [_syslog_priority] =~ /112|113|114|115|116|117|118|119/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOGALERT" } } 
                }
                if [_syslog_priority] =~ /120|121|122|123|124|125|126|127/ { 
                    mutate { replace => { "_security_result.priority_details" => "CLOCK" } } 
                }
                if [_syslog_priority] =~ /128|129|130|131|132|133|134|135/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL0" } } 
                }
                if [_syslog_priority] =~ /136|137|138|139|140|141|142|143/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL1" } } 
                }
                if [_syslog_priority] =~ /144|145|146|147|148|149|150|151/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL2" } } 
                }
                if [_syslog_priority] =~ /152|153|154|155|156|157|158|159/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL3" } } 
                }
                if [_syslog_priority] =~ /160|161|162|163|164|165|166|167/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL4" } } 
                }
                if [_syslog_priority] =~ /168|169|170|171|172|173|174|175/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL5" } } 
                }
                if [_syslog_priority] =~ /176|177|178|179|180|181|182|183/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL6" } } 
                }
                if [_syslog_priority] =~ /184|185|186|187|188|189|190|191/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL7" } } 
                }
                mutate {
                    merge => {
                        "event.idm.read_only_udm.security_result" => "_security_result"
                    }
                }
            }
            mutate {
                merge => {
                    "@output" => "event"
                }
            }
        }
    }
    

    La visualizzazione dei risultati dell'estensione dell'analizzatore sintattico mostra il formato leggibile.

    metadata.product_log_id = "6161053"
    metadata.event_timestamp = "2024-10-31T15:10:10Z"
    metadata.event_type = "PROCESS_LAUNCH"
    metadata.vendor_name = "Microsoft"
    metadata.product_name = "PowerShell"
    metadata.product_event_type = "600"
    metadata.description = "Info"
    metadata.log_type = "POWERSHELL"
    principal.hostname = "win-adfs.lunarstiiiness.com"
    principal.resource.name = "in_powershell"
    principal.resource.resource_subtype = "im_msvistalog"
    principal.asset.hostname = "win-adfs.lunarstiiiness.com"
    target.hostname = "Default Host"
    target.process.command_line = "C:\Program Files\Microsoft Azure AD Sync\Bin\miiserver.exe"
    target.asset.hostname = "Default Host"
    target.asset.asset_id = "Host ID:bf203e94-72cf-4649-84a5-fc02baedb75f"
    security_result[0].severity_details = "INFORMATIONAL"
    security_result[0].priority_details = "USER"
    

Snippet di codice (e Grok) - Decorazione di eventi, nomi di variabili temporanee e conversione del tipo di dati

Esempi di attributi:

  • Formato dell'origine log: JSON con un'intestazione Syslog
  • Approccio alla mappatura dei dati: snippet di codice che utilizza Grok
  • Tipo di log: WINDOWS_SYSMON
  • Scopo dell'estensione dell'analizzatore: decorazione di eventi, nomi di variabili temporanee e tipi di dati.
  • Descrizione:

    Questo esempio mostra come eseguire le seguenti azioni durante la creazione di un'estensione del parser:

    Decorazione basata su un'istruzione condizionale

    Questo esempio aggiunge (informazioni contestuali) spiegazioni del significato di ciascun tipo di evento in WINDOWS_SYSMON. Utilizza un'istruzione condizionale per controllare EventID, quindi aggiunge un Description, ad esempio EventID 1 è un evento Process Creation.

    Quando utilizzi un filtro di estrazione, ad esempio JSON, il tipo di dati originale potrebbe essere mantenuto.

    Nell'esempio seguente, il valore EventID viene estratto come numero intero per impostazione predefinita. L'istruzione condizionale valuta il valore EventID come un intero e non come una stringa.

    if [EventID] == 1 {
      mutate {
        replace => {
          "_description" => "[1] Process creation"
        }
      }
    }
    

    Conversione del tipo di dati

    Puoi convertire i tipi di dati all'interno di un'estensione del parser utilizzando la funzione convert.

    mutate {
      convert => {
        "EventID" => "string"
      }
      on_error => "_convert_EventID_already_string"
    }
    

    Nomi variabili temporanei per la leggibilità

    Puoi utilizzare nomi di variabili temporanee negli snippet di codice e rinominarli in un secondo momento in modo che corrispondano al nome dell'oggetto evento UDM dell'output finale. Ciò può contribuire a migliorare la leggibilità complessiva.

    Nell'esempio seguente, la variabile description viene rinominata event.idm.read_only_udm.metadata.description:

    mutate {
      rename => {
        "_description" => "event.idm.read_only_udm.metadata.description"
      }
    }
    

    Campi ripetuti

    L'estensione del parser completa è la seguente:

    filter {
    # initialize variable
    mutate {
      replace => {
        "EventID" => ""
      }
    }
    # Use grok to parse syslog messages.
    # The on_error clause handles messages that don't match the pattern.
    grok {
      match => {
        "message" => [
          "(<%{POSINT:_syslog_priority}>)%{SYSLOGTIMESTAMP:datetime} %{DATA:logginghost}: %{GREEDYDATA:log_data}"
        ]
      }
      on_error => "not_supported_format"
    }
    if ![not_supported_format] {
      json {
        source => "log_data"
        on_error => "not_json"
      }
      if ![not_json] {
        if [EventID] == 1 {
          mutate {
            replace => {
              "_description" => "[1] Process creation"
            }
          }
        }
        if [EventID] == 2 {
          mutate {
            replace => {
              "_description" => "[2] A process changed a file creation time"
            }
          }
        }
        if [EventID] == 3 {
          mutate {
            replace => {
              "_description" => "[3] Network connection"
            }
          }
        }
        if [EventID] == 4 {
          mutate {
            replace => {
              "_description" => "[4] Sysmon service state changed"
            }
          }
        }
        if [EventID] == 5 {
          mutate {
            replace => {
              "_description" => "[5] Process terminated"
            }
          }
        }
        if [EventID] == 6 {
          mutate {
            replace => {
              "_description" => "[6] Driver loaded"
            }
          }
        }
        if [EventID] == 7 {
          mutate {
            replace => {
              "_description" => "[7] Image loaded"
            }
          }
        }
        if [EventID] == 8 {
          mutate {
            replace => {
              "_description" => "[8] CreateRemoteThread"
            }
          }
        }
        if [EventID] == 9 {
          mutate {
            replace => {
              "_description" => "[9] RawAccessRead"
            }
          }
        }
        if [EventID] == 10 {
          mutate {
            replace => {
              "_description" => "[10] ProcessAccess"
            }
          }
        }
        if [EventID] == 11 {
          mutate {
            replace => {
              "_description" => "[11] FileCreate"
            }
          }
        }
        if [EventID] == 12 {
          mutate {
            replace => {
              "_description" => "[12] RegistryEvent (Object create and delete)"
            }
          }
        }
        if [EventID] == 13 {
          mutate {
            replace => {
              "_description" => "[13] RegistryEvent (Value Set)"
            }
          }
        }
        if [EventID] == 14 {
          mutate {
            replace => {
              "_description" => "[14] RegistryEvent (Key and Value Rename)"
            }
          }
        }
        if [EventID] == 15 {
          mutate {
            replace => {
              "_description" => "[15] FileCreateStreamHash"
            }
          }
        }
        if [EventID] == 16 {
          mutate {
            replace => {
              "_description" => "[16] ServiceConfigurationChange"
            }
          }
        }
        if [EventID] == 17 {
          mutate {
            replace => {
              "_description" => "[17] PipeEvent (Pipe Created)"
            }
          }
        }
        if [EventID] == 18 {
          mutate {
            replace => {
              "_description" => "[18] PipeEvent (Pipe Connected)"
            }
          }
        }
        if [EventID] == 19 {
          mutate {
            replace => {
              "_description" => "[19] WmiEvent (WmiEventFilter activity detected)"
            }
          }
        }
        if [EventID] == 20 {
          mutate {
            replace => {
              "_description" => "[20] WmiEvent (WmiEventConsumer activity detected)"
            }
          }
        }
        if [EventID] == 21 {
          mutate {
            replace => {
              "_description" => "[21] WmiEvent (WmiEventConsumerToFilter activity detected)"
            }
          }
        }
        if [EventID] == 22 {
          mutate {
            replace => {
              "_description" => "[22] DNSEvent (DNS query)"
            }
          }
        }
        if [EventID] == 23 {
          mutate {
            replace => {
              "_description" => "[23] FileDelete (File Delete archived)"
            }
          }
        }
        if [EventID] == 24 {
          mutate {
            replace => {
              "_description" => "[24] ClipboardChange (New content in the clipboard)"
            }
          }
        }
        if [EventID] == 25 {
          mutate {
            replace => {
              "_description" => "[25] ProcessTampering (Process image change)"
            }
          }
        }
        if [EventID] == 26 {
          mutate {
            replace => {
              "_description" => "[26] FileDeleteDetected (File Delete logged)"
            }
          }
        }
        if [EventID] == 255 {
          mutate {
            replace => {
              "_description" => "[255] Error"
            }
          }
        }
        mutate {
          rename => {
            "_description" => "event.idm.read_only_udm.metadata.description"
          }
        }
        statedump{}
        mutate {
          merge => {
            "@output" => "event"
          }
        }
      }
    }
    }
    

    L'esecuzione dell'estensione del parser aggiunge correttamente la decorazione al metadata.description campo.

    metadata.product_log_id = "6008459"
    metadata.event_timestamp = "2024-10-31T14:41:53.442Z"
    metadata.event_type = "REGISTRY_CREATION"
    metadata.vendor_name = "Microsoft"
    metadata.product_name = "Microsoft-Windows-Sysmon"
    metadata.product_event_type = "12"
    metadata.description = "[12] RegistryEvent (Object create and delete)"
    metadata.log_type = "WINDOWS_SYSMON"
    additional.fields["thread_id"] = "3972"
    additional.fields["channel"] = "Microsoft-Windows-Sysmon/Operational"
    additional.fields["Keywords"] = "-9223372036854776000"
    additional.fields["Opcode"] = "Info"
    additional.fields["ThreadID"] = "3972"
    principal.hostname = "win-adfs.lunarstiiiness.com"
    principal.user.userid = "tim.smith_admin"
    principal.user.windows_sid = "S-1-5-18"
    principal.process.pid = "6856"
    principal.process.file.full_path = "C:\Windows\system32\wsmprovhost.exe"
    principal.process.product_specific_process_id = "SYSMON:{927d35bf-a374-6495-f348-000000002900}"
    principal.administrative_domain = "LUNARSTIIINESS"
    principal.asset.hostname = "win-adfs.lunarstiiiness.com"
    target.registry.registry_key = "HKU\S-1-5-21-3263964631-4121654051-1417071188-1116\Software\Policies\Microsoft\SystemCertificates\CA\Certificates"
    observer.asset_id = "5770385F:C22A:43E0:BF4C:06F5698FFBD9"
    observer.process.pid = "2556"
    about[0].labels[0].key = "Category ID"
    about[0].labels[0].value = "RegistryEvent"
    security_result[0].rule_name = "technique_id=T1553.004,technique_name=Install Root Certificate"
    security_result[0].summary = "Registry object added or deleted"
    security_result[0].severity = "INFORMATIONAL"
    security_result[1].rule_name = "EventID: 12"
    security_result[2].summary = "12"
    

Esempi di XML

Gli esempi riportati di seguito mostrano come creare un'estensione del parser in cui la fonte del log è in formato XML.

Snippet di codice: estrazione di un campo arbitrario nell'oggetto additional

Esempi di attributi:

  • Formato dell'origine log: XML
  • Approccio alla mappatura dei dati: snippet di codice
  • Tipo di log: WINDOWS_DEFENDER_AV
  • Scopo dell'estensione del parser: Estrazione di campi arbitrari nell'oggetto additional
  • Descrizione:

    Lo scopo di questo esempio è estrarre e memorizzare il valore Platform Version, ad esempio per poter generare report e cercare outdated platform versions.

    Dopo aver esaminato il documento Campi UDM importanti, non è stato identificato alcun campo UDM standard adatto. Pertanto, in questo esempio verrà utilizzato l'oggetto additional per memorizzare queste informazioni come coppia chiave-valore personalizzata.

    # Parser Extension for WINDOWS_DEFENDER_AV
    # 2024-10-29: cmmartin: Extracting 'Platform Version' into Additional
    filter {
        # Uses XPath to target the specific element(s)
        xml {
            source => "message"
                xpath => {
                    "/Event/EventData/Data[@Name='Platform version']" => "platform_version"
            }
            on_error => "_xml_error"
        }
        # Conditional processing: Only proceed if XML parsing was successful
        if ![_xml_error] {
            # Prepare the additional field structure using a temporary variable
            mutate{
                replace => {
                    "additional_platform_version.key" => "Platform Version"
                    "additional_platform_version.value.string_value" => "%{platform_version}"
                }
                on_error => "no_platform_version"
            }
            # Merge the additional field into the event1 structure.
            if ![no_platform_version] {
                mutate {
                    merge => {
                        "event1.idm.read_only_udm.additional.fields" => "additional_platform_version"
                    }
                }
            }
            mutate {
                merge => {
                    "@output" => "event1"
                }
            }
        }
    }
    

    L'esecuzione di PREVIEW UDM OUTPUT mostra che il nuovo campo è stato aggiunto correttamente.

    metadata.event_timestamp = "2024-10-29T14:08:52Z"
    metadata.event_type = "STATUS_HEARTBEAT"
    metadata.vendor_name = "Microsoft"
    metadata.product_name = "Windows Defender AV"
    metadata.product_event_type = "MALWAREPROTECTION_SERVICE_HEALTH_REPORT"
    metadata.description = "Endpoint Protection client health report (time in UTC)."
    metadata.log_type = "WINDOWS_DEFENDER_AV"
    additional.fields["Platform Version"] = "4.18.24080.9"
    principal.hostname = "win-dc-01.ad.1823127835827.altostrat.com"
    security_result[0].description = "EventID: 1151"
    security_result[0].action[0] = "ALLOW"
    security_result[0].severity = "LOW"
    

Snippet di codice (e Grok) - Estrazione di un campo arbitrario nel nome host principale

Esempi di attributi:

  • Formato dell'origine log: XML
  • Approccio alla mappatura dei dati: snippet di codice che utilizza Grok
  • Tipo di log: WINDOWS_DEFENDER_AV
  • Scopo dell'estensione del parser: Estrazione di campi arbitrari nel nome host principale
  • Descrizione:

    Lo scopo di questo esempio è estrarre Hostname da un FQDN e sovrascrivere il campo principal.hostname.

    Questo esempio verifica se il campo del log non elaborato Computer name include un FQDN. In questo caso, viene estratta solo la parte Hostname e viene sovrascritto il campo Principal Hostname UDM.

    Dopo aver esaminato il parser e il documento Importanti campi UDM, è chiaro che deve essere utilizzato il campo principal.hostname.

    # Parser Extension for WINDOWS_DEFENDER_AV
    # 2024-10-29: Extract Hostname from FQDN and overwrite principal.hostname
    filter {
        # Uses XPath to target the specific element(s)
        xml {
            source => "message"
                xpath => {
                    "/Event/System/Computer" => "hostname"
            }
            on_error => "_xml_error"
        }
        # Conditional processing: Only proceed if XML parsing was successful
        if ![_xml_error] {
      # Extract all characters before the first dot in the hostname variable
            grok {
                match => { "hostname" => "(?<hostname>[^.]+)" }
            }
            mutate {
                replace => {
                    "event1.idm.read_only_udm.principal.hostname" => "%{hostname}"
                }
            }
            mutate {
                merge => {
                    "@output" => "event1"
                }
            }
        }
    }
    

    Questa estensione del parser utilizza un'istruzione Grok per eseguire un'espressione regolare (regex) per estrarre il campo hostname. La regex stessa utilizza un gruppo di cattura denominato, il che significa che qualsiasi corrispondenza all'interno delle parentesi verrà memorizzata nel campo denominato hostname, trovando una corrispondenza con uno o più caratteri fino a quando non incontra un punto. Verrà acquisito solo il hostname all'interno di un FQDN.

    Tuttavia, quando viene eseguito l'OUTPUT PREVIEW UDM viene restituito un errore. Perché accade questo?

    generic::unknown: pipeline.ParseLogEntry failed:
     LOG_PARSING_CBN_ERROR: "generic::internal: pipeline failed: filter grok (2) failed: 
    field\ "hostname\" already exists in data and is not overwritable"
    

    Statement overwrite Grok

    All'interno di un'istruzione Grok, un gruppo di cattura denominato non può sovrascrivere una variabile esistente, a meno che non sia specificato esplicitamente utilizzando l'istruzione overwrite. In questo scenario potremmo utilizzare un nome di variabile diverso per il gruppo di cattura denominato nell'istruzione Grok oppure, come mostrato nell'esempio di snippet di codice seguente, utilizzare l'istruzione overwrite per sovrascrivere esplicitamente la variabile hostname esistente.

    # Parser Extension for WINDOWS_DEFENDER_AV
    # 2024-10-29: cmmartin: Overwriting principal Hostname
    filter {
      xml {
        source => "message"
          xpath => {
            "/Event/System/Computer" => "hostname"
        }
        on_error => "_xml_error"
      }
      if ![_xml_error] {
        grok {
          match => { "hostname" => "(?<hostname>[^.]+)" }
          overwrite => ["hostname"]
          on_error => "_grok_hostname_error"
        }
        mutate {
          replace => {
            "event1.idm.read_only_udm.principal.hostname" => "%{hostname}"
          }
        }
        mutate {
          merge => {
            "@output" => "event1"
          }
        }
      }
    }
    

    L'esecuzione di nuovo dell'OUTPUT PREVIEW UDM mostra che il nuovo campo è stato aggiunto dopo aver estratto hostname da FQDN.

    metadata.event_timestamp"2024-10-29T14:08:52Z"
    metadata.event_type"STATUS_HEARTBEAT"
    metadata.vendor_name"Microsoft"
    metadata.product_name"Windows Defender AV"
    metadata.product_event_type"MALWAREPROTECTION_SERVICE_HEALTH_REPORT"
    metadata.description"Endpoint Protection client health report (time in UTC)."
    metadata.log_type"WINDOWS_DEFENDER_AV"
    principal.hostname"win-dc-01"
    security_result[0].description"EventID: 1151"
    security_result[0].action[0]"ALLOW"
    security_result[0].severity"LOW"