Exemplos de extensão de analisador

Compatível com:

Este documento fornece exemplos de criação de extensões de analisador em diferentes cenários. Para saber mais sobre extensões de analisador, consulte Criar extensões de analisador.

Exemplos de extensão de analisador

Use as tabelas de atributos a seguir para encontrar rapidamente o código de exemplo necessário.

Exemplos sem código

Formato da origem do registro Exemplo de título Descrição Conceitos do analisador neste exemplo
JSON
(tipo de registro: GCP_IDS)
Extrair campos Extrai campos de um registro no formato JSON. Sem código
JSON
(tipo de registro: WORKSPACE_ALERTS)
Extrair campos com o valor da pré-condição Extraia campos de um registro no formato JSON e normalize-os em um campo UDM repetido com uma pré-condição.

Exemplos de snippet de código

Formato da origem do registro Exemplo de título Descrição Conceitos do analisador neste exemplo
JSON
(tipo de registro: GCP_IDS)
Como adicionar um user agent HTTP
  • Extraia o user agent do analisador HTTP de rede e crie um target hostname a partir do requestUrl.
  • Atribua um namespace para garantir que o mapeamento e o enriquecimento baseados em recursos sejam realizados.
CSV
(tipo de registro: MISP_IOC)
Extração de campos arbitrários no objeto UDM additional Extrai campos para UDM > Entidade > Objeto UDM additional > par de chave-valor Objeto additional da UDM
Syslog
(tipo de registro: POWERSHELL)
Extrair prioridade e gravidade do Syslog Extraia os valores de Facility e Severity do Syslog nos campos de prioridade e gravidade do resultado de segurança do UDM. Com base no Grok
JSON com um cabeçalho Syslog
(tipo de registro: WINDOWS_SYSMON)
Decoração com base em uma instrução condicional
  • Adiciona decoração (informações contextuais) ao campo metadata.description com base em uma expressão condicional e na compreensão dos tipos de dados nos snippets de código.
  • Ao usar um filtro de extração, o tipo de dados original pode ser preservado.
  • Uma expressão condicional do Grok precisa usar o tipo de dados original para avaliar o campo.
  • Com base no Grok
  • Instrução condicional do Grok
  • O tipo de dados original de um campo extraído pode ser preservado.
  • Uma expressão condicional do Grok precisa usar o tipo de dados original para avaliar o campo.
JSON com um cabeçalho Syslog
(tipo de registro: WINDOWS_SYSMON)
Converter tipos de dados
  • Converta tipos de dados em uma extensão de analisador usando a função convert.
  • Use instruções on_error para garantir o tratamento adequado de erros e evitar falhas de extensão do analisador causadas por erros.
  • Com base no Grok
  • Converter tipos de dados
  • Use instruções on_error para fornecer tratamento de erros.
JSON com um cabeçalho Syslog
(tipo de registro: WINDOWS_SYSMON)
Nomes de variáveis temporários para facilitar a leitura É possível usar nomes de variáveis temporárias em snippets de código e renomeá-los posteriormente para corresponder ao nome do objeto de evento da UDM da saída final. Isso pode ajudar na legibilidade geral.
  • Com base no Grok
  • Use nomes de variáveis temporárias e renomeie-as para os nomes de saída finais do UDM.
JSON com um cabeçalho Syslog
(tipo de registro: WINDOWS_SYSMON)
Campos repetidos Tenha cuidado ao trabalhar com campos repetidos em snippets de código, por exemplo, o campo security_result.
XML
(tipo de registro: WINDOWS_DEFENDER_AV)
Extração de campo arbitrário no objeto additional
  • Extraia e armazene o valor da versão da plataforma, por exemplo, para poder informar e pesquisar versões desatualizadas da plataforma.
  • Neste exemplo, não há um campo UDM padrão adequado. Portanto, o objeto additional é usado para armazenar as informações como um par de chave-valor personalizado.
O objeto additional é usado para armazenar as informações como um par de chave-valor personalizado.
XML
(tipo de registro: WINDOWS_DEFENDER_AV)
Extração de campo arbitrário para o nome de host principal
  • Extrair o nome do host de um FQDN.
  • O processamento condicional é usado para determinar se o campo principal.hostname precisa ser substituído.
  • A instrução Grok usa uma expressão regular (regex) para extrair o campo hostname. A regex usa um grupo de captura nomeado, o que significa que o que for correspondido dentro dos parênteses será armazenado no campo hostname, correspondendo a um ou mais caracteres até encontrar um ponto. Isso vai capturar apenas o hostname em um FQDN.
Instrução overwrite do Grok
  • No entanto, ao executar a PREVIEW UDM OUTPUT, um erro é retornado: "LOG_PARSING_CBN_ERROR: O campo hostname já existe nos dados e não pode ser substituído".
  • Em uma instrução Grok, um grupo de captura nomeado não pode substituir uma variável existente, a menos que seja especificado explicitamente usando a instrução overwrite. Nesse cenário, podemos usar um nome de variável diferente para o grupo de captura nomeado na instrução Grok ou, conforme mostrado neste exemplo, usar a instrução de substituição para substituir explicitamente a variável de nome de host existente.
  • Com base no Grok
  • O processamento condicional é usado para determinar se um campo precisa ser substituído.
  • Use a instrução Grok com expressões regulares (regex).
  • Instrução overwrite do Grok

Exemplos de JSON

Os exemplos a seguir mostram como criar uma extensão de analisador em que a origem do registro está no formato JSON.

Sem código: extrair campos

Exemplos de atributos:

  • Formato da origem do registro: JSON
  • Abordagem de mapeamento de dados: sem código
  • Tipo de registro: GCP_IDS
  • Objetivo da extensão do analisador: extrair campos.
  • Descrição:

    Vários campos relacionados à rede não estão sendo extraídos. Como esse exemplo de registro é um registro estruturado no formato JSON, podemos usar a abordagem sem código (Mapear campos de dados) para criar a extensão do analisador.

    Os campos originais que queremos extrair são:

    • total_packets (string)
    • elapsed_time (string)
    • total_bytes (string)

    Esta é a entrada de registro bruto de exemplo:

    {
    "insertId": "625a41542d64c124e7db097ae0906ccb-1@a3",
    "jsonPayload": {
      "destination_port": "80",
      "application": "incomplete",
      "ip_protocol": "tcp",
      "network": "projects/prj-p-shared-base/global/networks/shared-vpc-production",
      "start_time": "2024-10-29T21:14:59Z",
      "source_port": "41936",
      "source_ip_address": "35.191.200.157",
      "total_packets": "6",
      "elapsed_time": "0",
      "destination_ip_address": "192.168.0.11",
      "total_bytes": "412",
      "repeat_count": "1",
      "session_id": "1289742"
    },
    "resource": {
      "type": "ids.googleapis.com/Endpoint",
      "labels": {
        "resource_container": "projects/12345678910",
        "location": "europe-west4-a",
        "id": "p-europe-west4"
      }
    },
    "timestamp": "2024-10-29T21:15:21Z",
    "logName": "projects/prj-p-shared-base/logs/ids.googleapis.com%2Ftraffic",
    "receiveTimestamp": "2024-10-29T21:15:24.051990717Z"
    }
    

    O exemplo usa a abordagem sem código para criar uma extensão de analisador usando o seguinte mapeamento de campo de dados:

    Caminho de pré-condição Operador de pré-condição Valor da condição prevista Caminho de dados brutos Campo de destino*
    jsonPayload.total_bytes NOT_EQUALS "" jsonPayload.total_bytes udm.principal.network.received_bytes
    jsonPayload.elapsed_time NOT_EQUALS "" jsonPayload.elapsed_time udm.principal.network.session_duration.seconds
    jsonPayload.total_packets NOT_EQUALS "" jsonPayload.total_packets udm.principal.network.received_packets

    A execução da extensão do analisador adiciona os três campos extraídos ao objeto principal.network.

    metadata.product_log_id = "625a41542d64c124e7db097ae0906ccb-1@a3"
    metadata.event_timestamp = "2024-10-29T21:14:59Z"
    metadata.event_type = "NETWORK_CONNECTION"
    metadata.vendor_name = "Google Cloud"
    metadata.product_name = "IDS"
    metadata.ingestion_labels[0].key = "label"
    metadata.ingestion_labels[0].value = "GCP_IDS"
    metadata.log_type = "GCP_IDS"
    principal.ip[0] = "35.191.200.157"
    principal.port = 41936
    principal.network.received_bytes = 412
    principal.network.session_duration.seconds = "0s"
    principal.network.received_packets = 6
    target.ip[0] = "192.168.0.11"
    target.port = 80
    target.application = "incomplete"
    observer.location.country_or_region = "EUROPE"
    observer.location.name = "europe-west4-a"
    observer.resource.name = "projects/12345678910"
    observer.resource.resource_type = "CLOUD_PROJECT"
    observer.resource.attribute.cloud.environment = "GOOGLE_CLOUD_PLATFORM"
    observer.resource.product_object_id = "p-europe-west4"
    network.ip_protocol = "TCP"
    network.session_id = "1289742"
    

Sem código: extrair campos com o valor da pré-condição

Exemplos de atributos:

  • Formato da origem do registro: JSON
  • Abordagem de mapeamento de dados: sem código
  • Tipo de registro: WORKSPACE_ALERTS
  • Finalidade da extensão do analisador: extrair campos com o valor da pré-condição.
  • Descrição:

    O analisador original não extrai o email address do usuário principal afetado por um alerta de prevenção contra perda de dados (DLP).

    Este exemplo usa uma extensão de parser sem código para extrair o email address e normalizá-lo em um campo UDM repetido, com uma pré-condição.

    Ao trabalhar com campos repetidos em uma extensão de analisador sem código, é necessário indicar se você quer:

    • replace (modifique todos os valores dos campos repetidos no objeto UDM atual) ou
    • append (anexar valores extraídos a campos repetidos).

    Para mais detalhes, consulte a seção Campos repetidos.

    Este exemplo substitui todos os endereços de e-mail existentes no campo principal.user.email_address normalizado.

    As pré-condições permitem que você realize verificações condicionais antes de realizar uma operação de extração. Na maioria dos casos, o campo de pré-condição é o mesmo que o campo de dados brutos que você quer extrair, com um operador de pré-condição de not Null, por exemplo, foo != "".

    No entanto, às vezes, como no nosso exemplo, o valor do campo de dados brutos que você quer extrair não está presente em todas as entradas de registro. Nesse caso, use outro campo de pré-condição para filtrar a operação de extração. No nosso exemplo, o campo triggeringUserEmail bruto que você quer extrair está presente apenas nos registros em que o type = Data Loss Prevention está.

    Estes são os valores de exemplo a serem inseridos nos campos de extensão do analisador sem código:

    Caminho de pré-condição Operador de pré-condição Valor da condição prevista Caminho de dados brutos Campo de destino*
    type IGUAL A Prevenção contra perda de dados data.ruleViolationInfo.triggeringUserEmail udm.principal.user.email_addresses

    O exemplo a seguir mostra os campos da extensão do analisador sem código preenchidos com os valores de exemplo:

    image2

    A execução da extensão do analisador adiciona o email_address ao objeto principal.user.

    metadata.product_log_id = "Ug71LGqBr6Q="
    metadata.event_timestamp = "2022-12-18T12:17:35.154368Z"
    metadata.event_type = "USER_UNCATEGORIZED"
    metadata.vendor_name = "Google Workspace"
    metadata.product_name = "Google Workspace Alerts"
    metadata.product_event_type = "DlpRuleViolation"
    metadata.log_type = "WORKSPACE_ALERTS"
    additional.fields["resource_title"] = "bq-results-20221215-112933-1671103787123.csv"
    principal.user.email_addresses[0] = "foo.bar@altostrat.com"
    target.resource.name = "DRIVE"
    target.resource.resource_type = "STORAGE_OBJECT"
    target.resource.product_object_id = "1wLteoF3VHljS_8_ABCD_VVbhFTfcTQplJ5k1k7cL4r8"
    target.labels[0].key = "resource_title"
    target.labels[0].value = "bq-results-20221321-112933-1671103787697.csv"
    about[0].resource.resource_type = "CLOUD_ORGANIZATION"
    about[0].resource.product_object_id = "C01abcde2"
    security_result[0].about.object_reference.id = "ODU2NjEwZTItMWE2YS0xMjM0LWJjYzAtZTJlMWU2YWQzNzE3"
    security_result[0].category_details[0] = "Data Loss Prevention"
    security_result[0].rule_name = "Sensitive Projects Match"
    security_result[0].summary = "Data Loss Prevention"
    security_result[0].action[0] = "ALLOW"
    security_result[0].severity = "MEDIUM"
    security_result[0].rule_id = "rules/00abcdxs183abcd"
    security_result[0].action_details = "ALERT, DRIVE_WARN_ON_EXTERNAL_SHARING"
    security_result[0].alert_state = "ALERTING"
    security_result[0].detection_fields[0].key = "start_time"
    security_result[0].detection_fields[0].value = "2022-12-18T12:17:35.154368Z"
    security_result[0].detection_fields[1].key = "status"
    security_result[0].detection_fields[1].value = "NOT_STARTED"
    security_result[0].detection_fields[2].key = "trigger"
    security_result[0].detection_fields[2].value = "DRIVE_SHARE"
    security_result[0].rule_labels[0].key = "detector_name"
    security_result[0].rule_labels[0].value = "EMAIL_ADDRESS"
    network.email.to[0] = "foo.bar@altostrat.com"
    

Snippet de código: como adicionar um user agent HTTP

Exemplos de atributos:

  • Formato da origem do registro: JSON
  • Abordagem de mapeamento de dados: snippet de código
  • Tipo de registro: GCP_IDS
  • Finalidade da extensão do analisador: Adicionar o user agent HTTP.
  • Descrição:

    Este é um exemplo de um tipo de objeto UDM não padrão que não é compatível com a abordagem sem código e, portanto, exige o uso de um fragmento de código. O analisador padrão não extrai a análise Network HTTP Parser User Agent. Além disso, para manter a consistência:

    1. Uma Target Hostname será criada com base no requestUrl.
    2. Um Namespace será atribuído para garantir que o aliasing e o enriquecimento baseados em recursos sejam realizados.
    # GCP_LOADBALANCING
    # owner: @owner
    # updated: 2022-12-23
    # Custom parser extension that:
    # 1) adds consistent Namespace 
    # 2) adds Parsed User Agent Object 
    filter {
        # Initialize placeholder
        mutate {
            replace => {
                "httpRequest.userAgent" => ""
                "httpRequest.requestUrl" => ""
            }
        }
        json {
            on_error => "not_json"
            source => "message"
            array_function => "split_columns"
        }
        if ![not_json] {
          #1 - Override Namespaces
            mutate {
                replace => {
                    "event1.idm.read_only_udm.principal.namespace" => "TMO"
                }
            }
            mutate {
                replace => {
                    "event1.idm.read_only_udm.target.namespace" => "TMO"
                }
            }
            mutate {
                replace => {
                    "event1.idm.read_only_udm.src.namespace" => "TMO"
                }
            }
            #2 - Parsed User Agent
            if [httpRequest][requestUrl]!= "" {
                grok {
                    match => {
                        "httpRequest.requestUrl" => ["\/\/(?P<_hostname>.*?)\/"]
                    }
                    on_error => "_grok_hostname_failed"
                }
                if ![_grok_hostname_failed] {
                    mutate {
                        replace => {
                            "event1.idm.read_only_udm.target.hostname" => "%{_hostname}"
                        }
                    }
                }
            }
            if [httpRequest][userAgent] != "" {
                mutate {
                    convert => {
                        "httpRequest.userAgent" => "parseduseragent"
                    }
                }
                #Map the converted "user_agent" to the new UDM field "http.parsed_user_agent".
                mutate {
                    rename => {
                        "httpRequest.userAgent" => "event1.idm.read_only_udm.network.http.parsed_user_agent"
                    }
                }
            }
            mutate {
                merge => {
                    "@output" => "event1"
                }
            }
        }
    }
    

Exemplo de CSV

O exemplo a seguir mostra como criar uma extensão de analisador em que a origem do registro está no formato CSV.

Snippet de código: extração de campos arbitrários no objeto additional

Exemplos de atributos:

  • Formato da origem de registro: CSV
  • Abordagem de mapeamento de dados: snippet de código
  • Tipo de registro: MISP_IOC
  • Finalidade da extensão do analisador: extração de campos arbitrários no objeto additional.
  • Descrição:

    Neste exemplo, a integração de contexto de entidade UDM MISP_IOC é usada. O objeto UDM de par de chave-valor additional será usado para capturar informações contextuais que não foram extraídas pelo analisador padrão e para adicionar campos específicos da organização. Por exemplo, um URL para a instância específica do MISP.

    Esta é a origem de registro baseada em CSV para este exemplo:

    1 9d66d38a-14e1-407f-a4d1-90b82aa1d59f
    2 3908
    3 Network activity
    4 ip-dst
    5 117.253.154.123
    6
    7
    8 1687894564
    9
    10
    11
    12
    13
    14 DigitalSide Malware report\: MD5\: 59ce0baba11893f90527fc951ac69912
    15 ORGNAME
    16 DIGITALSIDE.IT
    17 0
    18 Medium
    19 0
    20 2023-06-23
    21 tlp:white,type:OSINT,source:DigitalSide.IT,source:urlhaus.abuse.ch
    22 1698036218

    imagem

    # MISP_IOC
    # owner: @owner
    # updated: 2024-06-21
    # Custom parser extension that:
    # 1) adds a link back to internal MISP tenant 
    # 2) extracts missing fields into UDM > Entity > Additional fields
    filter {
        # Set the base URL for MISP. Remember to replace this placeholder!
        mutate {
            replace => {
                "misp_base_url" => "https://<YOUR_MISP_URL>"
            }
        }
        # Parse the CSV data from the 'message' field. Uses a comma as the separator.
        # The 'on_error' option handles lines that are not properly formatted CSV.
        csv {
            source => "message"
            separator => ","
            on_error => "broken_csv"
        }
        # If the CSV parsing was successful...
        if ![broken_csv] {
            # Rename the CSV columns to more descriptive names.
            mutate {
                rename => {
                    "column2" => "event_id"
                    "column8" => "object_timestamp"
                    "column16" => "event_source_org"
                    "column17" => "event_distribution"
                    "column19" => "event_analysis"
                    "column22" => "attribute_timestamp"
                }
            }
        }
        # Add a link to view the event in MISP, if an event ID is available.
        # "column2" => "event_id"
        if [event_id] != "" {
            mutate {
                replace => {
                    "additional_url.key" => "view_in_misp"
                    "additional_url.value.string_value" => "%{misp_base_url}/events/view/%{event_id}"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_url"
                }
            }
        }
        # Add the object timestamp as an additional field, if available.
        # "column8" => "object_timestamp"
        if [object_timestamp] != "" {
            mutate {
                replace => {
                    "additional_object_timestamp.key" => "object_timestamp"
                    "additional_object_timestamp.value.string_value" => "%{object_timestamp}"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_object_timestamp"
                }
            }
        }
        # Add the event source organization as an additional field, if available.
        # "column16" => "event_source_org"
        if [event_source_org] != "" {
            mutate {
                replace => {
                    "additional_event_source_org.key" => "event_source_org"
                    "additional_event_source_org.value.string_value" => "%{event_source_org}"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_event_source_org"
                }
            }
        }
        # Add the event distribution level as an additional field, if available.
        # Maps numerical values to descriptive strings.
        # "column17" => "event_distribution"
        if [event_distribution] != "" {
            if [event_distribution] == "0" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "YOUR_ORGANIZATION_ONLY"
                    }
                }
            } else if [event_distribution] == "1" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "THIS_COMMUNITY_ONLY"
                    }
                }
            } else if [event_distribution] == "2" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "CONNECTED_COMMUNITIES"
                    }
                }
            } else if [event_distribution] == "3" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "ALL_COMMUNITIES"
                    }
                }
            } else if [event_distribution] == "4" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "SHARING_GROUP"
                    }
                }
            } else if [event_distribution] == "5" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "INHERIT_EVENT"
                    }
                }
            }
            mutate {
                replace => {
                    "additional_event_distribution.key" => "event_distribution"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_event_distribution"
                }
            }
        }
        # Add the event analysis level as an additional field, if available.
        # Maps numerical values to descriptive strings.
        # "column19" => "event_analysis"
        if [event_analysis] != "" {
            if [event_analysis] == "0" {
                mutate {
                    replace => {
                        "additional_event_analysis.value.string_value" => "INITIAL"
                    }
                }
            } else if [event_analysis] == "1" {
                mutate {
                    replace => {
                        "additional_event_analysis.value.string_value" => "ONGOING"
                    }
                }
            } else if [event_analysis] == "2" {
                mutate {
                    replace => {
                        "additional_event_analysis.value.string_value" => "COMPLETE"
                    }
                }
            }
            mutate {
                replace => {
                    "additional_event_analysis.key" => "event_analysis"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_event_analysis"
                }
            }
        }
        # Add the attribute timestamp as an additional field, if available.
        # "column22" => "attribute_timestamp" 
        if [attribute_timestamp] != "" {
            mutate {
                replace => {
                    "additional_attribute_timestamp.key" => "attribute_timestamp"
                    "additional_attribute_timestamp.value.string_value" => "%{attribute_timestamp}"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_attribute_timestamp"
                }
            }
        }
        # Finally, merge the 'event' data into the '@output' field.
        mutate {
            merge => {
                "@output" => "event"
            }
        }
    }
    

    A execução da extensão do analisador adiciona os campos personalizados do CSV ao objeto additional.

    metadata.product_entity_id = "9d66d38a-14e1-407f-a4d1-90b82aa1d59f"
    metadata.collected_timestamp = "2024-10-31T15:16:08Z"
    metadata.vendor_name = "MISP"
    metadata.product_name = "MISP"
    metadata.entity_type = "IP_ADDRESS"
    metadata.description = "ip-dst"
    metadata.interval.start_time = "2023-06-27T19:36:04Z"
    metadata.interval.end_time = "9999-12-31T23:59:59Z"
    metadata.threat[0].category_details[0] = "Network activity"
    metadata.threat[0].description = "tlp:white,type:OSINT,source:DigitalSide.IT,source:urlhaus.abuse.ch - additional info: DigitalSide Malware report: MD5: 59ce0baba11893f90527fc951ac69912"
    metadata.threat[0].severity_details = "Medium"
    metadata.threat[0].threat_feed_name = "DIGITALSIDE.IT"
    entity.ip[0] = "117.253.154.123"
    additional.fields["view_in_misp"] = "https:///events/view/3908"
    additional.fields["object_timestamp"] = "1687894564"
    additional.fields["event_source_org"] = "DIGITALSIDE.IT"
    additional.fields["event_distribution"] = "YOUR_ORGANIZATION_ONLY"
    additional.fields["event_analysis"] = "INITIAL"
    additional.fields["attribute_timestamp"] = "1698036218"
    

Exemplos do Grok

Os exemplos a seguir mostram como criar extensões de parser baseadas em Grok.

Snippet de código (e Grok): extração de prioridade e gravidade

Exemplos de atributos:

  • Formato da origem do registro: Syslog
  • Abordagem de mapeamento de dados: snippet de código usando o Grok
  • Tipo de registro: POWERSHELL
  • Finalidade da extensão do analisador: extrair prioridade e gravidade.
  • Descrição:

    Neste exemplo, uma extensão de parser baseada em Grok é criada para extrair os valores de Syslog Facility e Severity para os campos Priority e Severity do UDM Security Result.

    filter {
        # Use grok to parse syslog messages. The on_error clause handles messages that don't match the pattern.
        grok {
            match => {
                "message" => [
                    # Extract message with syslog headers.
                    "(<%{POSINT:_syslog_priority}>)%{SYSLOGTIMESTAMP:datetime} %{DATA:logginghost}: %{GREEDYDATA:log_data}"
                ]
            }
            on_error => "not_supported_format"
        }
        # If the grok parsing failed, tag the event as unsupported and drop it.
        if ![not_supported_format] {
            if [_syslog_priority] != "" {
                if [_syslog_priority] =~ /0|8|16|24|32|40|48|56|64|72|80|88|96|104|112|120|128|136|144|152|160|168|176|184/ {
                    mutate { replace => { "_security_result.severity_details" => "EMERGENCY" } } 
                }
                if [_syslog_priority] =~ /1|9|17|25|33|41|49|57|65|73|81|89|97|105|113|121|129|137|145|153|161|169|177|185/ {
                    mutate { replace => { "_security_result.severity_details" => "ALERT" } } 
                }
                if [_syslog_priority] =~ /2|10|18|26|34|42|50|58|66|74|82|90|98|106|114|122|130|138|146|154|162|170|178|186/ {
                    mutate { replace => { "_security_result.severity_details" => "CRITICAL" } }
                }
                if [_syslog_priority] =~ /3|11|19|27|35|43|51|59|67|75|83|91|99|107|115|123|131|139|147|155|163|171|179|187/ {
                    mutate { replace => { "_security_result.severity_details" => "ERROR" } }
                }
                if [_syslog_priority] =~ /4|12|20|28|36|44|52|60|68|76|84|92|100|108|116|124|132|140|148|156|164|172|180|188/ {
                    mutate { replace => { "_security_result.severity_details" => "WARNING" } }
                }
                if [_syslog_priority] =~ /5|13|21|29|37|45|53|61|69|77|85|93|101|109|117|125|133|141|149|157|165|173|181|189/ {
                    mutate { replace => { "_security_result.severity_details" => "NOTICE" } }
                }
                if [_syslog_priority] =~ /6|14|22|30|38|46|54|62|70|78|86|94|102|110|118|126|134|142|150|158|166|174|182|190/ {
                    mutate { replace => { "_security_result.severity_details" => "INFORMATIONAL" } }
                }
                if [_syslog_priority] =~ /7|15|23|31|39|47|55|63|71|79|87|95|103|111|119|127|135|143|151|159|167|175|183|191/ {
                    mutate { replace => { "_security_result.severity_details" => "DEBUG" } }
                }
                # Facilities (mapped to priority)
                if [_syslog_priority] =~ /0|1|2|3|4|5|6|7/ { 
                    mutate { replace => { "_security_result.priority_details" => "KERNEL" } } 
                }
                if [_syslog_priority] =~ /8|9|10|11|12|13|14|15/ { 
                    mutate { replace => { "_security_result.priority_details" => "USER" } } 
                }
                if [_syslog_priority] =~ /16|17|18|19|20|21|22|23/ { 
                    mutate { replace => { "_security_result.priority_details" => "MAIL" } } 
                }
                if [_syslog_priority] =~ /24|25|26|27|28|29|30|31/ { 
                    mutate { replace => { "_security_result.priority_details" => "SYSTEM" } } 
                }
                if [_syslog_priority] =~ /32|33|34|35|36|37|38|39/ { 
                    mutate { replace => { "_security_result.priority_details" => "SECURITY" } } 
                }
                if [_syslog_priority] =~ /40|41|42|43|44|45|46|47/ { 
                    mutate { replace => { "_security_result.priority_details" => "SYSLOG" } } 
                }
                if [_syslog_priority] =~ /48|49|50|51|52|53|54|55/ { 
                    mutate { replace => { "_security_result.priority_details" => "LPD" } } 
                }
                if [_syslog_priority] =~ /56|57|58|59|60|61|62|63/ { 
                    mutate { replace => { "_security_result.priority_details" => "NNTP" } } 
                }
                if [_syslog_priority] =~ /64|65|66|67|68|69|70|71/ { 
                    mutate { replace => { "_security_result.priority_details" => "UUCP" } } 
                }
                if [_syslog_priority] =~ /72|73|74|75|76|77|78|79/ { 
                    mutate { replace => { "_security_result.priority_details" => "TIME" } } 
                }
                if [_syslog_priority] =~ /80|81|82|83|84|85|86|87/ { 
                    mutate { replace => { "_security_result.priority_details" => "SECURITY" } } 
                }
                if [_syslog_priority] =~ /88|89|90|91|92|93|94|95/ { 
                    mutate { replace => { "_security_result.priority_details" => "FTPD" } } 
                }
                if [_syslog_priority] =~ /96|97|98|99|100|101|102|103/ { 
                    mutate { replace => { "_security_result.priority_details" => "NTPD" } } 
                }
                if [_syslog_priority] =~ /104|105|106|107|108|109|110|111/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOGAUDIT" } } 
                }
                if [_syslog_priority] =~ /112|113|114|115|116|117|118|119/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOGALERT" } } 
                }
                if [_syslog_priority] =~ /120|121|122|123|124|125|126|127/ { 
                    mutate { replace => { "_security_result.priority_details" => "CLOCK" } } 
                }
                if [_syslog_priority] =~ /128|129|130|131|132|133|134|135/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL0" } } 
                }
                if [_syslog_priority] =~ /136|137|138|139|140|141|142|143/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL1" } } 
                }
                if [_syslog_priority] =~ /144|145|146|147|148|149|150|151/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL2" } } 
                }
                if [_syslog_priority] =~ /152|153|154|155|156|157|158|159/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL3" } } 
                }
                if [_syslog_priority] =~ /160|161|162|163|164|165|166|167/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL4" } } 
                }
                if [_syslog_priority] =~ /168|169|170|171|172|173|174|175/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL5" } } 
                }
                if [_syslog_priority] =~ /176|177|178|179|180|181|182|183/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL6" } } 
                }
                if [_syslog_priority] =~ /184|185|186|187|188|189|190|191/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL7" } } 
                }
                mutate {
                    merge => {
                        "event.idm.read_only_udm.security_result" => "_security_result"
                    }
                }
            }
            mutate {
                merge => {
                    "@output" => "event"
                }
            }
        }
    }
    

    A visualização dos resultados da extensão do analisador mostra o formato legível por humanos.

    metadata.product_log_id = "6161053"
    metadata.event_timestamp = "2024-10-31T15:10:10Z"
    metadata.event_type = "PROCESS_LAUNCH"
    metadata.vendor_name = "Microsoft"
    metadata.product_name = "PowerShell"
    metadata.product_event_type = "600"
    metadata.description = "Info"
    metadata.log_type = "POWERSHELL"
    principal.hostname = "win-adfs.lunarstiiiness.com"
    principal.resource.name = "in_powershell"
    principal.resource.resource_subtype = "im_msvistalog"
    principal.asset.hostname = "win-adfs.lunarstiiiness.com"
    target.hostname = "Default Host"
    target.process.command_line = "C:\Program Files\Microsoft Azure AD Sync\Bin\miiserver.exe"
    target.asset.hostname = "Default Host"
    target.asset.asset_id = "Host ID:bf203e94-72cf-4649-84a5-fc02baedb75f"
    security_result[0].severity_details = "INFORMATIONAL"
    security_result[0].priority_details = "USER"
    

Exemplo de código (e Grok): decoração de eventos, nomes de variáveis temporárias e conversão de tipos de dados

Exemplos de atributos:

  • Formato da origem do registro: JSON com um cabeçalho Syslog
  • Abordagem de mapeamento de dados: snippet de código usando o Grok
  • Tipo de registro: WINDOWS_SYSMON
  • Finalidade da extensão do analisador: decorar eventos, nomes de variáveis temporárias e tipos de dados.
  • Descrição:

    Este exemplo mostra como realizar as seguintes ações ao criar uma extensão de analisador:

    Decoração com base em uma instrução condicional

    Este exemplo adiciona explicações (informações contextuais) sobre o significado de cada tipo de evento em WINDOWS_SYSMON. Ele usa uma instrução condicional para verificar o EventID e, em seguida, adiciona um Description. Por exemplo, EventID 1 é um evento Process Creation.

    Ao usar um filtro de extração, por exemplo, JSON, o tipo de dados original pode ser preservado.

    No exemplo a seguir, o valor EventID é extraído como um número inteiro por padrão. A expressão condicional avalia o valor EventID como um número inteiro, não uma string.

    if [EventID] == 1 {
      mutate {
        replace => {
          "_description" => "[1] Process creation"
        }
      }
    }
    

    Conversão do tipo de dados

    É possível converter tipos de dados em uma extensão de analisador usando a função de conversão.

    mutate {
      convert => {
        "EventID" => "string"
      }
      on_error => "_convert_EventID_already_string"
    }
    

    Nomes de variáveis temporários para facilitar a leitura

    É possível usar nomes de variáveis temporárias em snippets de código e renomeá-los posteriormente para corresponder ao nome do objeto de evento da UDM da saída final. Isso pode ajudar na legibilidade geral.

    No exemplo abaixo, a variável description é renomeada como event.idm.read_only_udm.metadata.description:

    mutate {
      rename => {
        "_description" => "event.idm.read_only_udm.metadata.description"
      }
    }
    

    Campos repetidos

    A extensão completa do analisador é a seguinte:

    filter {
    # initialize variable
    mutate {
      replace => {
        "EventID" => ""
      }
    }
    # Use grok to parse syslog messages.
    # The on_error clause handles messages that don't match the pattern.
    grok {
      match => {
        "message" => [
          "(<%{POSINT:_syslog_priority}>)%{SYSLOGTIMESTAMP:datetime} %{DATA:logginghost}: %{GREEDYDATA:log_data}"
        ]
      }
      on_error => "not_supported_format"
    }
    if ![not_supported_format] {
      json {
        source => "log_data"
        on_error => "not_json"
      }
      if ![not_json] {
        if [EventID] == 1 {
          mutate {
            replace => {
              "_description" => "[1] Process creation"
            }
          }
        }
        if [EventID] == 2 {
          mutate {
            replace => {
              "_description" => "[2] A process changed a file creation time"
            }
          }
        }
        if [EventID] == 3 {
          mutate {
            replace => {
              "_description" => "[3] Network connection"
            }
          }
        }
        if [EventID] == 4 {
          mutate {
            replace => {
              "_description" => "[4] Sysmon service state changed"
            }
          }
        }
        if [EventID] == 5 {
          mutate {
            replace => {
              "_description" => "[5] Process terminated"
            }
          }
        }
        if [EventID] == 6 {
          mutate {
            replace => {
              "_description" => "[6] Driver loaded"
            }
          }
        }
        if [EventID] == 7 {
          mutate {
            replace => {
              "_description" => "[7] Image loaded"
            }
          }
        }
        if [EventID] == 8 {
          mutate {
            replace => {
              "_description" => "[8] CreateRemoteThread"
            }
          }
        }
        if [EventID] == 9 {
          mutate {
            replace => {
              "_description" => "[9] RawAccessRead"
            }
          }
        }
        if [EventID] == 10 {
          mutate {
            replace => {
              "_description" => "[10] ProcessAccess"
            }
          }
        }
        if [EventID] == 11 {
          mutate {
            replace => {
              "_description" => "[11] FileCreate"
            }
          }
        }
        if [EventID] == 12 {
          mutate {
            replace => {
              "_description" => "[12] RegistryEvent (Object create and delete)"
            }
          }
        }
        if [EventID] == 13 {
          mutate {
            replace => {
              "_description" => "[13] RegistryEvent (Value Set)"
            }
          }
        }
        if [EventID] == 14 {
          mutate {
            replace => {
              "_description" => "[14] RegistryEvent (Key and Value Rename)"
            }
          }
        }
        if [EventID] == 15 {
          mutate {
            replace => {
              "_description" => "[15] FileCreateStreamHash"
            }
          }
        }
        if [EventID] == 16 {
          mutate {
            replace => {
              "_description" => "[16] ServiceConfigurationChange"
            }
          }
        }
        if [EventID] == 17 {
          mutate {
            replace => {
              "_description" => "[17] PipeEvent (Pipe Created)"
            }
          }
        }
        if [EventID] == 18 {
          mutate {
            replace => {
              "_description" => "[18] PipeEvent (Pipe Connected)"
            }
          }
        }
        if [EventID] == 19 {
          mutate {
            replace => {
              "_description" => "[19] WmiEvent (WmiEventFilter activity detected)"
            }
          }
        }
        if [EventID] == 20 {
          mutate {
            replace => {
              "_description" => "[20] WmiEvent (WmiEventConsumer activity detected)"
            }
          }
        }
        if [EventID] == 21 {
          mutate {
            replace => {
              "_description" => "[21] WmiEvent (WmiEventConsumerToFilter activity detected)"
            }
          }
        }
        if [EventID] == 22 {
          mutate {
            replace => {
              "_description" => "[22] DNSEvent (DNS query)"
            }
          }
        }
        if [EventID] == 23 {
          mutate {
            replace => {
              "_description" => "[23] FileDelete (File Delete archived)"
            }
          }
        }
        if [EventID] == 24 {
          mutate {
            replace => {
              "_description" => "[24] ClipboardChange (New content in the clipboard)"
            }
          }
        }
        if [EventID] == 25 {
          mutate {
            replace => {
              "_description" => "[25] ProcessTampering (Process image change)"
            }
          }
        }
        if [EventID] == 26 {
          mutate {
            replace => {
              "_description" => "[26] FileDeleteDetected (File Delete logged)"
            }
          }
        }
        if [EventID] == 255 {
          mutate {
            replace => {
              "_description" => "[255] Error"
            }
          }
        }
        mutate {
          rename => {
            "_description" => "event.idm.read_only_udm.metadata.description"
          }
        }
        statedump{}
        mutate {
          merge => {
            "@output" => "event"
          }
        }
      }
    }
    }
    

    A execução da extensão do analisador adiciona a decoração ao campo metadata.description.

    metadata.product_log_id = "6008459"
    metadata.event_timestamp = "2024-10-31T14:41:53.442Z"
    metadata.event_type = "REGISTRY_CREATION"
    metadata.vendor_name = "Microsoft"
    metadata.product_name = "Microsoft-Windows-Sysmon"
    metadata.product_event_type = "12"
    metadata.description = "[12] RegistryEvent (Object create and delete)"
    metadata.log_type = "WINDOWS_SYSMON"
    additional.fields["thread_id"] = "3972"
    additional.fields["channel"] = "Microsoft-Windows-Sysmon/Operational"
    additional.fields["Keywords"] = "-9223372036854776000"
    additional.fields["Opcode"] = "Info"
    additional.fields["ThreadID"] = "3972"
    principal.hostname = "win-adfs.lunarstiiiness.com"
    principal.user.userid = "tim.smith_admin"
    principal.user.windows_sid = "S-1-5-18"
    principal.process.pid = "6856"
    principal.process.file.full_path = "C:\Windows\system32\wsmprovhost.exe"
    principal.process.product_specific_process_id = "SYSMON:{927d35bf-a374-6495-f348-000000002900}"
    principal.administrative_domain = "LUNARSTIIINESS"
    principal.asset.hostname = "win-adfs.lunarstiiiness.com"
    target.registry.registry_key = "HKU\S-1-5-21-3263964631-4121654051-1417071188-1116\Software\Policies\Microsoft\SystemCertificates\CA\Certificates"
    observer.asset_id = "5770385F:C22A:43E0:BF4C:06F5698FFBD9"
    observer.process.pid = "2556"
    about[0].labels[0].key = "Category ID"
    about[0].labels[0].value = "RegistryEvent"
    security_result[0].rule_name = "technique_id=T1553.004,technique_name=Install Root Certificate"
    security_result[0].summary = "Registry object added or deleted"
    security_result[0].severity = "INFORMATIONAL"
    security_result[1].rule_name = "EventID: 12"
    security_result[2].summary = "12"
    

Exemplos de XML

Os exemplos a seguir mostram como criar uma extensão de analisador em que a origem do registro está no formato XML.

Snippet de código: extração de campo arbitrário no objeto additional

Exemplos de atributos:

  • Formato da origem do registro: XML
  • Abordagem de mapeamento de dados: snippet de código
  • Tipo de registro: WINDOWS_DEFENDER_AV
  • Finalidade da extensão do analisador: extração de campo arbitrário no objeto additional
  • Descrição:

    O objetivo deste exemplo é extrair e armazenar o valor Platform Version, por exemplo, para poder informar e pesquisar outdated platform versions.

    Depois de analisar o documento Campos importantes do UDM, nenhum campo padrão do UDM adequado foi identificado. Portanto, este exemplo vai usar o objeto additional para armazenar essas informações como um par de chave-valor personalizado.

    # Parser Extension for WINDOWS_DEFENDER_AV
    # 2024-10-29: cmmartin: Extracting 'Platform Version' into Additional
    filter {
        # Uses XPath to target the specific element(s)
        xml {
            source => "message"
                xpath => {
                    "/Event/EventData/Data[@Name='Platform version']" => "platform_version"
            }
            on_error => "_xml_error"
        }
        # Conditional processing: Only proceed if XML parsing was successful
        if ![_xml_error] {
            # Prepare the additional field structure using a temporary variable
            mutate{
                replace => {
                    "additional_platform_version.key" => "Platform Version"
                    "additional_platform_version.value.string_value" => "%{platform_version}"
                }
                on_error => "no_platform_version"
            }
            # Merge the additional field into the event1 structure.
            if ![no_platform_version] {
                mutate {
                    merge => {
                        "event1.idm.read_only_udm.additional.fields" => "additional_platform_version"
                    }
                }
            }
            mutate {
                merge => {
                    "@output" => "event1"
                }
            }
        }
    }
    

    A execução da PREVIEW UDM OUTPUT mostra que o novo campo foi adicionado.

    metadata.event_timestamp = "2024-10-29T14:08:52Z"
    metadata.event_type = "STATUS_HEARTBEAT"
    metadata.vendor_name = "Microsoft"
    metadata.product_name = "Windows Defender AV"
    metadata.product_event_type = "MALWAREPROTECTION_SERVICE_HEALTH_REPORT"
    metadata.description = "Endpoint Protection client health report (time in UTC)."
    metadata.log_type = "WINDOWS_DEFENDER_AV"
    additional.fields["Platform Version"] = "4.18.24080.9"
    principal.hostname = "win-dc-01.ad.1823127835827.altostrat.com"
    security_result[0].description = "EventID: 1151"
    security_result[0].action[0] = "ALLOW"
    security_result[0].severity = "LOW"
    

Snippet de código (e Grok) - Extração de campo arbitrário no nome de host principal

Exemplos de atributos:

  • Formato da origem do registro: XML
  • Abordagem de mapeamento de dados: snippet de código usando o Grok
  • Tipo de registro: WINDOWS_DEFENDER_AV
  • Finalidade da extensão do analisador: extração de campo arbitrário no nome de host principal
  • Descrição:

    O objetivo deste exemplo é extrair o Hostname de um FQDN e substituir o campo principal.hostname.

    Este exemplo verifica se o campo de registro bruto Computer name inclui um FQDN. Nesse caso, ele extrai apenas a parte Hostname e substitui o campo Principal Hostname do UDM.

    Depois de analisar o analisador e o documento Campos importantes do UDM, fica claro que o campo principal.hostname precisa ser usado.

    # Parser Extension for WINDOWS_DEFENDER_AV
    # 2024-10-29: Extract Hostname from FQDN and overwrite principal.hostname
    filter {
        # Uses XPath to target the specific element(s)
        xml {
            source => "message"
                xpath => {
                    "/Event/System/Computer" => "hostname"
            }
            on_error => "_xml_error"
        }
        # Conditional processing: Only proceed if XML parsing was successful
        if ![_xml_error] {
      # Extract all characters before the first dot in the hostname variable
            grok {
                match => { "hostname" => "(?<hostname>[^.]+)" }
            }
            mutate {
                replace => {
                    "event1.idm.read_only_udm.principal.hostname" => "%{hostname}"
                }
            }
            mutate {
                merge => {
                    "@output" => "event1"
                }
            }
        }
    }
    

    Essa extensão de analisador usa uma instrução Grok para executar uma expressão regular (regex) e extrair o campo hostname. A regex em si usa um grupo de captura nomeado, o que significa que o que for correspondido dentro dos parênteses será armazenado no campo chamado hostname, correspondendo a um ou mais caracteres até encontrar um ponto. Isso vai capturar apenas o hostname em um FQDN.

    No entanto, ao executar a PREVIEW UDM OUTPUT, um erro é retornado. Por que isso acontece?

    generic::unknown: pipeline.ParseLogEntry failed:
     LOG_PARSING_CBN_ERROR: "generic::internal: pipeline failed: filter grok (2) failed: 
    field\ "hostname\" already exists in data and is not overwritable"
    

    Instrução overwrite do Grok

    Em uma instrução Grok, um grupo de captura nomeado não pode substituir uma variável existente, a menos que seja especificado explicitamente usando a instrução overwrite. Nesse cenário, poderíamos usar um nome de variável diferente para o grupo de captura nomeado na instrução Grok ou, conforme mostrado no exemplo de snippet de código a seguir, usar a instrução overwrite para substituir explicitamente a variável hostname existente.

    # Parser Extension for WINDOWS_DEFENDER_AV
    # 2024-10-29: cmmartin: Overwriting principal Hostname
    filter {
      xml {
        source => "message"
          xpath => {
            "/Event/System/Computer" => "hostname"
        }
        on_error => "_xml_error"
      }
      if ![_xml_error] {
        grok {
          match => { "hostname" => "(?<hostname>[^.]+)" }
          overwrite => ["hostname"]
          on_error => "_grok_hostname_error"
        }
        mutate {
          replace => {
            "event1.idm.read_only_udm.principal.hostname" => "%{hostname}"
          }
        }
        mutate {
          merge => {
            "@output" => "event1"
          }
        }
      }
    }
    

    A execução da PREVIEW UDM OUTPUT novamente mostra que o novo campo foi adicionado depois de extrair o hostname do FQDN.

    metadata.event_timestamp"2024-10-29T14:08:52Z"
    metadata.event_type"STATUS_HEARTBEAT"
    metadata.vendor_name"Microsoft"
    metadata.product_name"Windows Defender AV"
    metadata.product_event_type"MALWAREPROTECTION_SERVICE_HEALTH_REPORT"
    metadata.description"Endpoint Protection client health report (time in UTC)."
    metadata.log_type"WINDOWS_DEFENDER_AV"
    principal.hostname"win-dc-01"
    security_result[0].description"EventID: 1151"
    security_result[0].action[0]"ALLOW"
    security_result[0].severity"LOW"