파서 확장 프로그램 예

다음에서 지원:

이 문서에서는 다양한 시나리오에서 파서 확장 프로그램을 만드는 예를 제공합니다. 파서 확장 프로그램에 대한 자세한 내용은 파서 확장 프로그램 만들기를 참고하세요.

파서 확장 프로그램 예

다음 속성 표를 사용하여 필요한 샘플 코드를 빠르게 찾습니다.

노 코드 예시

로그 소스 형식 제목 예시 설명 이 예시의 파서 개념
JSON
(로그 유형: GCP_IDS)
필드 추출 JSON 형식의 로그에서 필드를 추출합니다. 노코드
JSON
(로그 유형: WORKSPACE_ALERTS)
전제조건 값이 있는 필드 추출 JSON 형식의 로그에서 필드를 추출하고 사전 조건을 사용하여 반복 UDM 필드로 정규화합니다.

코드 스니펫 예시

로그 소스 형식 제목 예시 설명 이 예시의 파서 개념
JSON
(로그 유형: GCP_IDS)
HTTP 사용자 에이전트 추가
  • 네트워크 HTTP 파서 사용자 에이전트를 추출하고 requestUrl에서 target hostname를 만듭니다.
  • 네임스페이스를 할당하여 애셋 기반 별칭 지정 및 보강이 실행되도록 합니다.
CSV
(로그 유형: MISP_IOC)
additional UDM 객체에 임의 필드 추출 필드를 UDM > 항목 > additional UDM 객체 > 키-값 쌍으로 추출합니다. additional UDM 객체
Syslog
(로그 유형: POWERSHELL)
Syslog에서 우선순위 및 심각도 추출 Syslog Facility 및 Severity 값을 UDM Security Result Priority 및 Severity 필드로 추출합니다. Grok 기반
Syslog 헤더가 있는 JSON
(로그 유형: WINDOWS_SYSMON)
조건문을 기반으로 한 장식
  • 조건문을 기반으로 metadata.description 필드에 장식 (문맥 정보)을 추가하고 코드 스니펫 내의 데이터 유형을 이해합니다.
  • 추출 필터를 사용하면 원래 데이터 유형이 보존될 수 있습니다.
  • Grok 조건문은 원래 데이터 유형을 사용하여 필드를 평가해야 합니다.
  • Grok 기반
  • Grok 조건문
  • 추출된 필드의 원래 데이터 유형은 보존될 수 있습니다.
  • Grok 조건문은 원래 데이터 유형을 사용하여 필드를 평가해야 합니다.
Syslog 헤더가 있는 JSON
(로그 유형: WINDOWS_SYSMON)
데이터 유형 변환
  • convert 함수를 사용하여 파서 확장 프로그램 내에서 데이터 유형을 변환합니다.
  • on_error 문을 사용하여 적절한 오류 처리를 보장하고 오류로 인한 파서 확장 프로그램 실패를 방지합니다.
  • Grok 기반
  • 데이터 유형 전환
  • on_error 문을 사용하여 오류 처리를 제공합니다.
Syslog 헤더가 있는 JSON
(로그 유형: WINDOWS_SYSMON)
가독성을 위한 임시 변수 이름 코드 스니펫에서 임시 변수 이름을 사용하고 나중에 최종 출력 UDM 이벤트 객체 이름과 일치하도록 이름을 바꿀 수 있습니다. 이렇게 하면 전반적인 가독성이 향상될 수 있습니다.
  • Grok 기반
  • 임시 변수 이름을 사용하고 나중에 최종 출력 UDM 이름으로 이름을 바꿉니다.
Syslog 헤더가 있는 JSON
(로그 유형: WINDOWS_SYSMON)
반복 필드 코드 스니펫에서 반복 필드(예: security_result 필드)를 사용할 때는 주의해야 합니다.
XML
(로그 유형: WINDOWS_DEFENDER_AV)
additional 객체에 임의 필드 추출
  • 예를 들어 오래된 플랫폼 버전을 보고하고 검색할 수 있도록 플랫폼 버전 값을 추출하고 저장합니다.
  • 이 예시에는 적절한 표준 UDM 필드가 없으므로 additional 객체가 정보를 맞춤 키-값 쌍으로 저장하는 데 사용됩니다.
additional 객체는 정보를 맞춤 키-값 쌍으로 저장하는 데 사용됩니다.
XML
(로그 유형: WINDOWS_DEFENDER_AV)
주요 호스트 이름에 임의 필드 추출
  • FQDN에서 호스트 이름을 추출합니다.
  • 조건부 처리principal.hostname 필드를 덮어써야 하는지 결정하는 데 사용됩니다.
  • Grok 문이 정규 표현식 (정규식)을 사용하여 hostname 필드를 추출합니다. 정규식 자체는 이름이 지정된 캡처 그룹을 사용합니다. 즉, 괄호 안에서 일치하는 것은 hostname라는 필드에 저장되며, 점(.)을 만날 때까지 하나 이상의 문자를 일치시킵니다. 이렇게 하면 FQDN 내의 hostname만 캡처됩니다.
Grok overwrite
  • 그러나 PREVIEW UDM OUTPUT을 실행하면 'LOG_PARSING_CBN_ERROR: 필드 hostname가 이미 데이터에 존재하며 덮어쓸 수 없음' 오류가 반환됩니다.
  • Grok 문 내에서 이름이 지정된 캡처 그룹은 overwrite 문을 사용하여 명시적으로 지정되지 않는 한 기존 변수를 덮어쓸 수 없습니다. 이 시나리오에서는 Grok 문에 이름이 지정된 캡처 그룹에 다른 변수 이름을 사용하거나 이 예와 같이 덮어쓰기 문을 사용하여 기존 호스트 이름 변수를 명시적으로 덮어쓸 수 있습니다.
  • Grok 기반
  • 조건부 처리는 필드를 덮어써야 하는지 결정하는 데 사용됩니다.
  • 정규 표현식 (정규식)을 사용하는 Grok 문
  • Grok overwrite

JSON 예시

다음 예는 로그 소스가 JSON 형식인 파서 확장 프로그램을 만드는 방법을 보여줍니다.

코드 없이 - 필드 추출

속성 예시:

  • 로그 소스 형식: JSON
  • 데이터 매핑 접근 방식: 노 코드
  • 로그 유형: GCP_IDS
  • 파서 확장 프로그램의 목적: 필드 추출
  • 설명:

    여러 네트워크 관련 필드가 추출되지 않습니다. 이 로그 샘플은 JSON 형식의 구조화된 로그이므로 코드 없는 (데이터 필드 매핑) 접근 방식을 사용하여 파서 확장 프로그램을 만들 수 있습니다.

    추출하려는 원본 필드는 다음과 같습니다.

    • total_packets(문자열)
    • elapsed_time(문자열)
    • total_bytes(문자열)

    다음은 샘플 원시 로그 항목입니다.

    {
    "insertId": "625a41542d64c124e7db097ae0906ccb-1@a3",
    "jsonPayload": {
      "destination_port": "80",
      "application": "incomplete",
      "ip_protocol": "tcp",
      "network": "projects/prj-p-shared-base/global/networks/shared-vpc-production",
      "start_time": "2024-10-29T21:14:59Z",
      "source_port": "41936",
      "source_ip_address": "35.191.200.157",
      "total_packets": "6",
      "elapsed_time": "0",
      "destination_ip_address": "192.168.0.11",
      "total_bytes": "412",
      "repeat_count": "1",
      "session_id": "1289742"
    },
    "resource": {
      "type": "ids.googleapis.com/Endpoint",
      "labels": {
        "resource_container": "projects/12345678910",
        "location": "europe-west4-a",
        "id": "p-europe-west4"
      }
    },
    "timestamp": "2024-10-29T21:15:21Z",
    "logName": "projects/prj-p-shared-base/logs/ids.googleapis.com%2Ftraffic",
    "receiveTimestamp": "2024-10-29T21:15:24.051990717Z"
    }
    

    이 예에서는 코드 없는 접근 방식을 사용하여 다음 데이터 필드 매핑을 사용하여 파서 확장 프로그램을 만듭니다.

    조건문 경로 조건문 연산자 조건문 값 원시 데이터 경로 대상 필드*
    jsonPayload.total_bytes NOT_EQUALS "" jsonPayload.total_bytes udm.principal.network.received_bytes
    jsonPayload.elapsed_time NOT_EQUALS "" jsonPayload.elapsed_time udm.principal.network.session_duration.seconds
    jsonPayload.total_packets NOT_EQUALS "" jsonPayload.total_packets udm.principal.network.received_packets

    파서 확장 프로그램을 실행하면 추출된 세 필드가 principal.network 객체에 추가됩니다.

    metadata.product_log_id = "625a41542d64c124e7db097ae0906ccb-1@a3"
    metadata.event_timestamp = "2024-10-29T21:14:59Z"
    metadata.event_type = "NETWORK_CONNECTION"
    metadata.vendor_name = "Google Cloud"
    metadata.product_name = "IDS"
    metadata.ingestion_labels[0].key = "label"
    metadata.ingestion_labels[0].value = "GCP_IDS"
    metadata.log_type = "GCP_IDS"
    principal.ip[0] = "35.191.200.157"
    principal.port = 41936
    principal.network.received_bytes = 412
    principal.network.session_duration.seconds = "0s"
    principal.network.received_packets = 6
    target.ip[0] = "192.168.0.11"
    target.port = 80
    target.application = "incomplete"
    observer.location.country_or_region = "EUROPE"
    observer.location.name = "europe-west4-a"
    observer.resource.name = "projects/12345678910"
    observer.resource.resource_type = "CLOUD_PROJECT"
    observer.resource.attribute.cloud.environment = "GOOGLE_CLOUD_PLATFORM"
    observer.resource.product_object_id = "p-europe-west4"
    network.ip_protocol = "TCP"
    network.session_id = "1289742"
    

No-code - 전제 조건 값이 있는 필드 추출

속성 예시:

  • 로그 소스 형식: JSON
  • 데이터 매핑 접근 방식: 노 코드
  • 로그 유형: WORKSPACE_ALERTS
  • 파서 확장 프로그램의 목적: 전제 조건 값이 있는 필드 추출
  • 설명:

    원래 파서는 DLP (데이터 손실 방지) 알림의 영향을 받는 기본 사용자의 email address를 추출하지 않습니다.

    이 예에서는 코드 없는 파서 확장 프로그램을 사용하여 email address를 추출하고 조건문을 사용하여 반복되는 UDM 필드로 정규화합니다.

    코드 없는 파서 확장 프로그램에서 반복 필드를 사용할 때 다음 작업을 실행할지 여부를 표시해야 합니다.

    • replace (기존 UDM 객체의 반복되는 필드의 모든 값 재정의)
    • append (추출된 값을 반복되는 필드에 추가).

    자세한 내용은 반복되는 입력란 섹션을 참고하세요.

    이 예에서는 정규화된 principal.user.email_address 필드의 기존 이메일 주소를 대체합니다.

    사전 조건을 사용하면 추출 작업을 실행하기 전에 조건부 검사를 실행할 수 있습니다. 대부분의 경우 전제조건 필드는 추출하려는 원시 데이터 필드와 동일한 필드이며 전제조건 연산자not Null(예: foo != "")입니다.

    하지만 위의 예와 같이 추출하려는 원시 데이터 필드 값이 일부 로그 항목에 포함되지 않은 경우도 있습니다. 이 경우 다른 사전 조건 필드를 사용하여 추출 작업을 필터링할 수 있습니다. 이 예시에서 추출하려는 원시 triggeringUserEmail 필드는 type = Data Loss Prevention인 로그에만 있습니다.

    다음은 노코드 파서 확장 프로그램 필드에 입력할 값의 예입니다.

    조건문 경로 조건문 연산자 조건문 값 원시 데이터 경로 대상 필드*
    type 같음 데이터 손실 방지 data.ruleViolationInfo.triggeringUserEmail udm.principal.user.email_addresses

    다음 예는 예시 값으로 채워진 코드 없는 파서 확장 프로그램 필드를 보여줍니다.

    image2

    파서 확장 프로그램을 실행하면 email_addressprincipal.user 객체에 추가됩니다.

    metadata.product_log_id = "Ug71LGqBr6Q="
    metadata.event_timestamp = "2022-12-18T12:17:35.154368Z"
    metadata.event_type = "USER_UNCATEGORIZED"
    metadata.vendor_name = "Google Workspace"
    metadata.product_name = "Google Workspace Alerts"
    metadata.product_event_type = "DlpRuleViolation"
    metadata.log_type = "WORKSPACE_ALERTS"
    additional.fields["resource_title"] = "bq-results-20221215-112933-1671103787123.csv"
    principal.user.email_addresses[0] = "foo.bar@altostrat.com"
    target.resource.name = "DRIVE"
    target.resource.resource_type = "STORAGE_OBJECT"
    target.resource.product_object_id = "1wLteoF3VHljS_8_ABCD_VVbhFTfcTQplJ5k1k7cL4r8"
    target.labels[0].key = "resource_title"
    target.labels[0].value = "bq-results-20221321-112933-1671103787697.csv"
    about[0].resource.resource_type = "CLOUD_ORGANIZATION"
    about[0].resource.product_object_id = "C01abcde2"
    security_result[0].about.object_reference.id = "ODU2NjEwZTItMWE2YS0xMjM0LWJjYzAtZTJlMWU2YWQzNzE3"
    security_result[0].category_details[0] = "Data Loss Prevention"
    security_result[0].rule_name = "Sensitive Projects Match"
    security_result[0].summary = "Data Loss Prevention"
    security_result[0].action[0] = "ALLOW"
    security_result[0].severity = "MEDIUM"
    security_result[0].rule_id = "rules/00abcdxs183abcd"
    security_result[0].action_details = "ALERT, DRIVE_WARN_ON_EXTERNAL_SHARING"
    security_result[0].alert_state = "ALERTING"
    security_result[0].detection_fields[0].key = "start_time"
    security_result[0].detection_fields[0].value = "2022-12-18T12:17:35.154368Z"
    security_result[0].detection_fields[1].key = "status"
    security_result[0].detection_fields[1].value = "NOT_STARTED"
    security_result[0].detection_fields[2].key = "trigger"
    security_result[0].detection_fields[2].value = "DRIVE_SHARE"
    security_result[0].rule_labels[0].key = "detector_name"
    security_result[0].rule_labels[0].value = "EMAIL_ADDRESS"
    network.email.to[0] = "foo.bar@altostrat.com"
    

코드 스니펫 - HTTP 사용자 에이전트 추가

속성 예시:

  • 로그 소스 형식: JSON
  • 데이터 매핑 접근 방식: 코드 스니펫
  • 로그 유형: GCP_IDS
  • 파서 확장 프로그램의 목적: HTTP 사용자 에이전트 추가
  • 설명:

    노코드 접근 방식에서 지원되지 않으므로 코드 스니펫을 사용해야 하는 비표준 UDM 객체 유형의 예입니다. 기본 파서는 Network HTTP Parser User Agent 분석을 추출하지 않습니다. 또한 일관성을 위해 다음 사항도 적용됩니다.

    1. Target HostnamerequestUrl에서 생성됩니다.
    2. 애셋 기반 별칭 지정 및 보강이 실행되도록 Namespace가 할당됩니다.
    # GCP_LOADBALANCING
    # owner: @owner
    # updated: 2022-12-23
    # Custom parser extension that:
    # 1) adds consistent Namespace 
    # 2) adds Parsed User Agent Object 
    filter {
        # Initialize placeholder
        mutate {
            replace => {
                "httpRequest.userAgent" => ""
                "httpRequest.requestUrl" => ""
            }
        }
        json {
            on_error => "not_json"
            source => "message"
            array_function => "split_columns"
        }
        if ![not_json] {
          #1 - Override Namespaces
            mutate {
                replace => {
                    "event1.idm.read_only_udm.principal.namespace" => "TMO"
                }
            }
            mutate {
                replace => {
                    "event1.idm.read_only_udm.target.namespace" => "TMO"
                }
            }
            mutate {
                replace => {
                    "event1.idm.read_only_udm.src.namespace" => "TMO"
                }
            }
            #2 - Parsed User Agent
            if [httpRequest][requestUrl]!= "" {
                grok {
                    match => {
                        "httpRequest.requestUrl" => ["\/\/(?P<_hostname>.*?)\/"]
                    }
                    on_error => "_grok_hostname_failed"
                }
                if ![_grok_hostname_failed] {
                    mutate {
                        replace => {
                            "event1.idm.read_only_udm.target.hostname" => "%{_hostname}"
                        }
                    }
                }
            }
            if [httpRequest][userAgent] != "" {
                mutate {
                    convert => {
                        "httpRequest.userAgent" => "parseduseragent"
                    }
                }
                #Map the converted "user_agent" to the new UDM field "http.parsed_user_agent".
                mutate {
                    rename => {
                        "httpRequest.userAgent" => "event1.idm.read_only_udm.network.http.parsed_user_agent"
                    }
                }
            }
            mutate {
                merge => {
                    "@output" => "event1"
                }
            }
        }
    }
    

CSV 예

다음 예는 로그 소스가 CSV 형식인 파서 확장 프로그램을 만드는 방법을 보여줍니다.

코드 스니펫 - additional 객체에 임의 필드 추출

속성 예시:

  • 로그 소스 형식: CSV
  • 데이터 매핑 접근 방식: 코드 스니펫
  • 로그 유형: MISP_IOC
  • 파서 확장 프로그램의 목적: additional 객체에 임의 필드 추출
  • 설명:

    이 예에서는 MISP_IOC UDM 엔터티 컨텍스트 통합이 사용됩니다. additional 키-값 쌍 UDM 객체는 기본 파서에서 추출하지 않은 문맥 정보를 캡처하고 조직별 필드를 추가하는 데 사용됩니다. 예를 들어 특정 MISP 인스턴스로 돌아가는 URL입니다.

    다음은 이 예시의 CSV 기반 로그 소스입니다.

    1 9d66d38a-14e1-407f-a4d1-90b82aa1d59f
    2 3908
    3 Network activity
    4 ip-dst
    5 117.253.154.123
    6
    7
    8 1687894564
    9
    10
    11
    12
    13
    14 DigitalSide Malware report\: MD5\: 59ce0baba11893f90527fc951ac69912
    15 ORGNAME
    16 DIGITALSIDE.IT
    17 0
    18 Medium
    19 0
    20 2023-06-23
    21 tlp:white,type:OSINT,source:DigitalSide.IT,source:urlhaus.abuse.ch
    22 1698036218

    이미지

    # MISP_IOC
    # owner: @owner
    # updated: 2024-06-21
    # Custom parser extension that:
    # 1) adds a link back to internal MISP tenant 
    # 2) extracts missing fields into UDM > Entity > Additional fields
    filter {
        # Set the base URL for MISP. Remember to replace this placeholder!
        mutate {
            replace => {
                "misp_base_url" => "https://<YOUR_MISP_URL>"
            }
        }
        # Parse the CSV data from the 'message' field. Uses a comma as the separator.
        # The 'on_error' option handles lines that are not properly formatted CSV.
        csv {
            source => "message"
            separator => ","
            on_error => "broken_csv"
        }
        # If the CSV parsing was successful...
        if ![broken_csv] {
            # Rename the CSV columns to more descriptive names.
            mutate {
                rename => {
                    "column2" => "event_id"
                    "column8" => "object_timestamp"
                    "column16" => "event_source_org"
                    "column17" => "event_distribution"
                    "column19" => "event_analysis"
                    "column22" => "attribute_timestamp"
                }
            }
        }
        # Add a link to view the event in MISP, if an event ID is available.
        # "column2" => "event_id"
        if [event_id] != "" {
            mutate {
                replace => {
                    "additional_url.key" => "view_in_misp"
                    "additional_url.value.string_value" => "%{misp_base_url}/events/view/%{event_id}"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_url"
                }
            }
        }
        # Add the object timestamp as an additional field, if available.
        # "column8" => "object_timestamp"
        if [object_timestamp] != "" {
            mutate {
                replace => {
                    "additional_object_timestamp.key" => "object_timestamp"
                    "additional_object_timestamp.value.string_value" => "%{object_timestamp}"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_object_timestamp"
                }
            }
        }
        # Add the event source organization as an additional field, if available.
        # "column16" => "event_source_org"
        if [event_source_org] != "" {
            mutate {
                replace => {
                    "additional_event_source_org.key" => "event_source_org"
                    "additional_event_source_org.value.string_value" => "%{event_source_org}"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_event_source_org"
                }
            }
        }
        # Add the event distribution level as an additional field, if available.
        # Maps numerical values to descriptive strings.
        # "column17" => "event_distribution"
        if [event_distribution] != "" {
            if [event_distribution] == "0" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "YOUR_ORGANIZATION_ONLY"
                    }
                }
            } else if [event_distribution] == "1" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "THIS_COMMUNITY_ONLY"
                    }
                }
            } else if [event_distribution] == "2" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "CONNECTED_COMMUNITIES"
                    }
                }
            } else if [event_distribution] == "3" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "ALL_COMMUNITIES"
                    }
                }
            } else if [event_distribution] == "4" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "SHARING_GROUP"
                    }
                }
            } else if [event_distribution] == "5" {
                mutate {
                    replace => {
                        "additional_event_distribution.value.string_value" => "INHERIT_EVENT"
                    }
                }
            }
            mutate {
                replace => {
                    "additional_event_distribution.key" => "event_distribution"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_event_distribution"
                }
            }
        }
        # Add the event analysis level as an additional field, if available.
        # Maps numerical values to descriptive strings.
        # "column19" => "event_analysis"
        if [event_analysis] != "" {
            if [event_analysis] == "0" {
                mutate {
                    replace => {
                        "additional_event_analysis.value.string_value" => "INITIAL"
                    }
                }
            } else if [event_analysis] == "1" {
                mutate {
                    replace => {
                        "additional_event_analysis.value.string_value" => "ONGOING"
                    }
                }
            } else if [event_analysis] == "2" {
                mutate {
                    replace => {
                        "additional_event_analysis.value.string_value" => "COMPLETE"
                    }
                }
            }
            mutate {
                replace => {
                    "additional_event_analysis.key" => "event_analysis"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_event_analysis"
                }
            }
        }
        # Add the attribute timestamp as an additional field, if available.
        # "column22" => "attribute_timestamp" 
        if [attribute_timestamp] != "" {
            mutate {
                replace => {
                    "additional_attribute_timestamp.key" => "attribute_timestamp"
                    "additional_attribute_timestamp.value.string_value" => "%{attribute_timestamp}"
                }
            }
            mutate {
                merge => {
                    "event.idm.entity.additional.fields" => "additional_attribute_timestamp"
                }
            }
        }
        # Finally, merge the 'event' data into the '@output' field.
        mutate {
            merge => {
                "@output" => "event"
            }
        }
    }
    

    파서 확장 프로그램을 실행하면 CSV의 맞춤 필드가 additional 객체에 추가됩니다.

    metadata.product_entity_id = "9d66d38a-14e1-407f-a4d1-90b82aa1d59f"
    metadata.collected_timestamp = "2024-10-31T15:16:08Z"
    metadata.vendor_name = "MISP"
    metadata.product_name = "MISP"
    metadata.entity_type = "IP_ADDRESS"
    metadata.description = "ip-dst"
    metadata.interval.start_time = "2023-06-27T19:36:04Z"
    metadata.interval.end_time = "9999-12-31T23:59:59Z"
    metadata.threat[0].category_details[0] = "Network activity"
    metadata.threat[0].description = "tlp:white,type:OSINT,source:DigitalSide.IT,source:urlhaus.abuse.ch - additional info: DigitalSide Malware report: MD5: 59ce0baba11893f90527fc951ac69912"
    metadata.threat[0].severity_details = "Medium"
    metadata.threat[0].threat_feed_name = "DIGITALSIDE.IT"
    entity.ip[0] = "117.253.154.123"
    additional.fields["view_in_misp"] = "https:///events/view/3908"
    additional.fields["object_timestamp"] = "1687894564"
    additional.fields["event_source_org"] = "DIGITALSIDE.IT"
    additional.fields["event_distribution"] = "YOUR_ORGANIZATION_ONLY"
    additional.fields["event_analysis"] = "INITIAL"
    additional.fields["attribute_timestamp"] = "1698036218"
    

Grok 예시

다음 예는 Grok 기반 파서 확장 프로그램을 만드는 방법을 보여줍니다.

코드 스니펫 (및 Grok) - 우선순위 및 심각도 추출

속성 예시:

  • 로그 소스 형식: Syslog
  • 데이터 매핑 접근 방식: Grok를 사용하는 코드 스니펫
  • 로그 유형: POWERSHELL
  • 파서 확장 프로그램의 목적: 우선순위 및 심각도 추출
  • 설명:

    이 예에서는 Grok 기반 파서 확장 프로그램을 만들어 Syslog Facility 및 심각도 값을 UDM 보안 결과 PrioritySeverity 필드로 추출합니다.

    filter {
        # Use grok to parse syslog messages. The on_error clause handles messages that don't match the pattern.
        grok {
            match => {
                "message" => [
                    # Extract message with syslog headers.
                    "(<%{POSINT:_syslog_priority}>)%{SYSLOGTIMESTAMP:datetime} %{DATA:logginghost}: %{GREEDYDATA:log_data}"
                ]
            }
            on_error => "not_supported_format"
        }
        # If the grok parsing failed, tag the event as unsupported and drop it.
        if ![not_supported_format] {
            if [_syslog_priority] != "" {
                if [_syslog_priority] =~ /0|8|16|24|32|40|48|56|64|72|80|88|96|104|112|120|128|136|144|152|160|168|176|184/ {
                    mutate { replace => { "_security_result.severity_details" => "EMERGENCY" } } 
                }
                if [_syslog_priority] =~ /1|9|17|25|33|41|49|57|65|73|81|89|97|105|113|121|129|137|145|153|161|169|177|185/ {
                    mutate { replace => { "_security_result.severity_details" => "ALERT" } } 
                }
                if [_syslog_priority] =~ /2|10|18|26|34|42|50|58|66|74|82|90|98|106|114|122|130|138|146|154|162|170|178|186/ {
                    mutate { replace => { "_security_result.severity_details" => "CRITICAL" } }
                }
                if [_syslog_priority] =~ /3|11|19|27|35|43|51|59|67|75|83|91|99|107|115|123|131|139|147|155|163|171|179|187/ {
                    mutate { replace => { "_security_result.severity_details" => "ERROR" } }
                }
                if [_syslog_priority] =~ /4|12|20|28|36|44|52|60|68|76|84|92|100|108|116|124|132|140|148|156|164|172|180|188/ {
                    mutate { replace => { "_security_result.severity_details" => "WARNING" } }
                }
                if [_syslog_priority] =~ /5|13|21|29|37|45|53|61|69|77|85|93|101|109|117|125|133|141|149|157|165|173|181|189/ {
                    mutate { replace => { "_security_result.severity_details" => "NOTICE" } }
                }
                if [_syslog_priority] =~ /6|14|22|30|38|46|54|62|70|78|86|94|102|110|118|126|134|142|150|158|166|174|182|190/ {
                    mutate { replace => { "_security_result.severity_details" => "INFORMATIONAL" } }
                }
                if [_syslog_priority] =~ /7|15|23|31|39|47|55|63|71|79|87|95|103|111|119|127|135|143|151|159|167|175|183|191/ {
                    mutate { replace => { "_security_result.severity_details" => "DEBUG" } }
                }
                # Facilities (mapped to priority)
                if [_syslog_priority] =~ /0|1|2|3|4|5|6|7/ { 
                    mutate { replace => { "_security_result.priority_details" => "KERNEL" } } 
                }
                if [_syslog_priority] =~ /8|9|10|11|12|13|14|15/ { 
                    mutate { replace => { "_security_result.priority_details" => "USER" } } 
                }
                if [_syslog_priority] =~ /16|17|18|19|20|21|22|23/ { 
                    mutate { replace => { "_security_result.priority_details" => "MAIL" } } 
                }
                if [_syslog_priority] =~ /24|25|26|27|28|29|30|31/ { 
                    mutate { replace => { "_security_result.priority_details" => "SYSTEM" } } 
                }
                if [_syslog_priority] =~ /32|33|34|35|36|37|38|39/ { 
                    mutate { replace => { "_security_result.priority_details" => "SECURITY" } } 
                }
                if [_syslog_priority] =~ /40|41|42|43|44|45|46|47/ { 
                    mutate { replace => { "_security_result.priority_details" => "SYSLOG" } } 
                }
                if [_syslog_priority] =~ /48|49|50|51|52|53|54|55/ { 
                    mutate { replace => { "_security_result.priority_details" => "LPD" } } 
                }
                if [_syslog_priority] =~ /56|57|58|59|60|61|62|63/ { 
                    mutate { replace => { "_security_result.priority_details" => "NNTP" } } 
                }
                if [_syslog_priority] =~ /64|65|66|67|68|69|70|71/ { 
                    mutate { replace => { "_security_result.priority_details" => "UUCP" } } 
                }
                if [_syslog_priority] =~ /72|73|74|75|76|77|78|79/ { 
                    mutate { replace => { "_security_result.priority_details" => "TIME" } } 
                }
                if [_syslog_priority] =~ /80|81|82|83|84|85|86|87/ { 
                    mutate { replace => { "_security_result.priority_details" => "SECURITY" } } 
                }
                if [_syslog_priority] =~ /88|89|90|91|92|93|94|95/ { 
                    mutate { replace => { "_security_result.priority_details" => "FTPD" } } 
                }
                if [_syslog_priority] =~ /96|97|98|99|100|101|102|103/ { 
                    mutate { replace => { "_security_result.priority_details" => "NTPD" } } 
                }
                if [_syslog_priority] =~ /104|105|106|107|108|109|110|111/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOGAUDIT" } } 
                }
                if [_syslog_priority] =~ /112|113|114|115|116|117|118|119/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOGALERT" } } 
                }
                if [_syslog_priority] =~ /120|121|122|123|124|125|126|127/ { 
                    mutate { replace => { "_security_result.priority_details" => "CLOCK" } } 
                }
                if [_syslog_priority] =~ /128|129|130|131|132|133|134|135/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL0" } } 
                }
                if [_syslog_priority] =~ /136|137|138|139|140|141|142|143/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL1" } } 
                }
                if [_syslog_priority] =~ /144|145|146|147|148|149|150|151/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL2" } } 
                }
                if [_syslog_priority] =~ /152|153|154|155|156|157|158|159/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL3" } } 
                }
                if [_syslog_priority] =~ /160|161|162|163|164|165|166|167/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL4" } } 
                }
                if [_syslog_priority] =~ /168|169|170|171|172|173|174|175/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL5" } } 
                }
                if [_syslog_priority] =~ /176|177|178|179|180|181|182|183/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL6" } } 
                }
                if [_syslog_priority] =~ /184|185|186|187|188|189|190|191/ { 
                    mutate { replace => { "_security_result.priority_details" => "LOCAL7" } } 
                }
                mutate {
                    merge => {
                        "event.idm.read_only_udm.security_result" => "_security_result"
                    }
                }
            }
            mutate {
                merge => {
                    "@output" => "event"
                }
            }
        }
    }
    

    파서 확장 프로그램의 결과를 보면 사람이 읽을 수 있는 형식이 표시됩니다.

    metadata.product_log_id = "6161053"
    metadata.event_timestamp = "2024-10-31T15:10:10Z"
    metadata.event_type = "PROCESS_LAUNCH"
    metadata.vendor_name = "Microsoft"
    metadata.product_name = "PowerShell"
    metadata.product_event_type = "600"
    metadata.description = "Info"
    metadata.log_type = "POWERSHELL"
    principal.hostname = "win-adfs.lunarstiiiness.com"
    principal.resource.name = "in_powershell"
    principal.resource.resource_subtype = "im_msvistalog"
    principal.asset.hostname = "win-adfs.lunarstiiiness.com"
    target.hostname = "Default Host"
    target.process.command_line = "C:\Program Files\Microsoft Azure AD Sync\Bin\miiserver.exe"
    target.asset.hostname = "Default Host"
    target.asset.asset_id = "Host ID:bf203e94-72cf-4649-84a5-fc02baedb75f"
    security_result[0].severity_details = "INFORMATIONAL"
    security_result[0].priority_details = "USER"
    

코드 스니펫 (및 Grok) - 이벤트 장식, 임시 변수 이름, 데이터 유형 변환

속성 예시:

  • 로그 소스 형식: Syslog 헤더가 있는 JSON
  • 데이터 매핑 접근 방식: Grok를 사용하는 코드 스니펫
  • 로그 유형: WINDOWS_SYSMON
  • 파서 확장 프로그램의 목적: 이벤트, 임시 변수 이름, 데이터 유형 장식
  • 설명:

    이 예에서는 파서 확장 프로그램을 만들 때 다음 작업을 실행하는 방법을 보여줍니다.

    조건문을 기반으로 한 장식

    이 예에서는 WINDOWS_SYSMON에서 각 이벤트 유형의 의미에 관한 설명 (상황 정보)을 추가합니다. 조건문을 사용하여 EventID를 확인한 다음 Description를 추가합니다. 예를 들어 EventID 1은 Process Creation 이벤트입니다.

    JSON과 같은 추출 필터를 사용하면 원래 데이터 유형이 보존될 수 있습니다.

    다음 예에서는 EventID 값이 기본적으로 정수로 추출됩니다. 조건문EventID 값을 문자열이 아닌 정수로 평가합니다.

    if [EventID] == 1 {
      mutate {
        replace => {
          "_description" => "[1] Process creation"
        }
      }
    }
    

    데이터 유형 변환

    convert 함수를 사용하여 파서 확장 프로그램 내에서 데이터 유형을 변환할 수 있습니다.

    mutate {
      convert => {
        "EventID" => "string"
      }
      on_error => "_convert_EventID_already_string"
    }
    

    가독성을 위한 임시 변수 이름

    코드 스니펫에서 임시 변수 이름을 사용하고 나중에 최종 출력 UDM 이벤트 객체 이름과 일치하도록 이름을 바꿀 수 있습니다. 이렇게 하면 전반적인 가독성이 향상될 수 있습니다.

    다음 예에서 description 변수의 이름이 event.idm.read_only_udm.metadata.description로 바뀝습니다.

    mutate {
      rename => {
        "_description" => "event.idm.read_only_udm.metadata.description"
      }
    }
    

    반복 필드

    전체 파서 확장 프로그램은 다음과 같습니다.

    filter {
    # initialize variable
    mutate {
      replace => {
        "EventID" => ""
      }
    }
    # Use grok to parse syslog messages.
    # The on_error clause handles messages that don't match the pattern.
    grok {
      match => {
        "message" => [
          "(<%{POSINT:_syslog_priority}>)%{SYSLOGTIMESTAMP:datetime} %{DATA:logginghost}: %{GREEDYDATA:log_data}"
        ]
      }
      on_error => "not_supported_format"
    }
    if ![not_supported_format] {
      json {
        source => "log_data"
        on_error => "not_json"
      }
      if ![not_json] {
        if [EventID] == 1 {
          mutate {
            replace => {
              "_description" => "[1] Process creation"
            }
          }
        }
        if [EventID] == 2 {
          mutate {
            replace => {
              "_description" => "[2] A process changed a file creation time"
            }
          }
        }
        if [EventID] == 3 {
          mutate {
            replace => {
              "_description" => "[3] Network connection"
            }
          }
        }
        if [EventID] == 4 {
          mutate {
            replace => {
              "_description" => "[4] Sysmon service state changed"
            }
          }
        }
        if [EventID] == 5 {
          mutate {
            replace => {
              "_description" => "[5] Process terminated"
            }
          }
        }
        if [EventID] == 6 {
          mutate {
            replace => {
              "_description" => "[6] Driver loaded"
            }
          }
        }
        if [EventID] == 7 {
          mutate {
            replace => {
              "_description" => "[7] Image loaded"
            }
          }
        }
        if [EventID] == 8 {
          mutate {
            replace => {
              "_description" => "[8] CreateRemoteThread"
            }
          }
        }
        if [EventID] == 9 {
          mutate {
            replace => {
              "_description" => "[9] RawAccessRead"
            }
          }
        }
        if [EventID] == 10 {
          mutate {
            replace => {
              "_description" => "[10] ProcessAccess"
            }
          }
        }
        if [EventID] == 11 {
          mutate {
            replace => {
              "_description" => "[11] FileCreate"
            }
          }
        }
        if [EventID] == 12 {
          mutate {
            replace => {
              "_description" => "[12] RegistryEvent (Object create and delete)"
            }
          }
        }
        if [EventID] == 13 {
          mutate {
            replace => {
              "_description" => "[13] RegistryEvent (Value Set)"
            }
          }
        }
        if [EventID] == 14 {
          mutate {
            replace => {
              "_description" => "[14] RegistryEvent (Key and Value Rename)"
            }
          }
        }
        if [EventID] == 15 {
          mutate {
            replace => {
              "_description" => "[15] FileCreateStreamHash"
            }
          }
        }
        if [EventID] == 16 {
          mutate {
            replace => {
              "_description" => "[16] ServiceConfigurationChange"
            }
          }
        }
        if [EventID] == 17 {
          mutate {
            replace => {
              "_description" => "[17] PipeEvent (Pipe Created)"
            }
          }
        }
        if [EventID] == 18 {
          mutate {
            replace => {
              "_description" => "[18] PipeEvent (Pipe Connected)"
            }
          }
        }
        if [EventID] == 19 {
          mutate {
            replace => {
              "_description" => "[19] WmiEvent (WmiEventFilter activity detected)"
            }
          }
        }
        if [EventID] == 20 {
          mutate {
            replace => {
              "_description" => "[20] WmiEvent (WmiEventConsumer activity detected)"
            }
          }
        }
        if [EventID] == 21 {
          mutate {
            replace => {
              "_description" => "[21] WmiEvent (WmiEventConsumerToFilter activity detected)"
            }
          }
        }
        if [EventID] == 22 {
          mutate {
            replace => {
              "_description" => "[22] DNSEvent (DNS query)"
            }
          }
        }
        if [EventID] == 23 {
          mutate {
            replace => {
              "_description" => "[23] FileDelete (File Delete archived)"
            }
          }
        }
        if [EventID] == 24 {
          mutate {
            replace => {
              "_description" => "[24] ClipboardChange (New content in the clipboard)"
            }
          }
        }
        if [EventID] == 25 {
          mutate {
            replace => {
              "_description" => "[25] ProcessTampering (Process image change)"
            }
          }
        }
        if [EventID] == 26 {
          mutate {
            replace => {
              "_description" => "[26] FileDeleteDetected (File Delete logged)"
            }
          }
        }
        if [EventID] == 255 {
          mutate {
            replace => {
              "_description" => "[255] Error"
            }
          }
        }
        mutate {
          rename => {
            "_description" => "event.idm.read_only_udm.metadata.description"
          }
        }
        statedump{}
        mutate {
          merge => {
            "@output" => "event"
          }
        }
      }
    }
    }
    

    파서 확장 프로그램을 실행하면 metadata.description 필드에 장식이 추가됩니다.

    metadata.product_log_id = "6008459"
    metadata.event_timestamp = "2024-10-31T14:41:53.442Z"
    metadata.event_type = "REGISTRY_CREATION"
    metadata.vendor_name = "Microsoft"
    metadata.product_name = "Microsoft-Windows-Sysmon"
    metadata.product_event_type = "12"
    metadata.description = "[12] RegistryEvent (Object create and delete)"
    metadata.log_type = "WINDOWS_SYSMON"
    additional.fields["thread_id"] = "3972"
    additional.fields["channel"] = "Microsoft-Windows-Sysmon/Operational"
    additional.fields["Keywords"] = "-9223372036854776000"
    additional.fields["Opcode"] = "Info"
    additional.fields["ThreadID"] = "3972"
    principal.hostname = "win-adfs.lunarstiiiness.com"
    principal.user.userid = "tim.smith_admin"
    principal.user.windows_sid = "S-1-5-18"
    principal.process.pid = "6856"
    principal.process.file.full_path = "C:\Windows\system32\wsmprovhost.exe"
    principal.process.product_specific_process_id = "SYSMON:{927d35bf-a374-6495-f348-000000002900}"
    principal.administrative_domain = "LUNARSTIIINESS"
    principal.asset.hostname = "win-adfs.lunarstiiiness.com"
    target.registry.registry_key = "HKU\S-1-5-21-3263964631-4121654051-1417071188-1116\Software\Policies\Microsoft\SystemCertificates\CA\Certificates"
    observer.asset_id = "5770385F:C22A:43E0:BF4C:06F5698FFBD9"
    observer.process.pid = "2556"
    about[0].labels[0].key = "Category ID"
    about[0].labels[0].value = "RegistryEvent"
    security_result[0].rule_name = "technique_id=T1553.004,technique_name=Install Root Certificate"
    security_result[0].summary = "Registry object added or deleted"
    security_result[0].severity = "INFORMATIONAL"
    security_result[1].rule_name = "EventID: 12"
    security_result[2].summary = "12"
    

XML 예

다음 예에서는 로그 소스가 XML 형식인 파서 확장 프로그램을 만드는 방법을 보여줍니다.

코드 스니펫 - additional 객체에 임의 필드 추출

속성 예시:

  • 로그 소스 형식: XML
  • 데이터 매핑 접근 방식: 코드 스니펫
  • 로그 유형: WINDOWS_DEFENDER_AV
  • 파서 확장 프로그램의 목적: additional 객체에 임의 필드 추출
  • 설명:

    이 예시의 목표는 Platform Version 값을 추출하고 저장하는 것입니다(예: outdated platform versions에 대해 보고하고 검색할 수 있도록).

    중요한 UDM 필드 문서를 검토한 결과 적절한 표준 UDM 필드가 확인되지 않았습니다. 따라서 이 예에서는 additional 객체를 사용하여 이 정보를 맞춤 키-값 쌍으로 저장합니다.

    # Parser Extension for WINDOWS_DEFENDER_AV
    # 2024-10-29: cmmartin: Extracting 'Platform Version' into Additional
    filter {
        # Uses XPath to target the specific element(s)
        xml {
            source => "message"
                xpath => {
                    "/Event/EventData/Data[@Name='Platform version']" => "platform_version"
            }
            on_error => "_xml_error"
        }
        # Conditional processing: Only proceed if XML parsing was successful
        if ![_xml_error] {
            # Prepare the additional field structure using a temporary variable
            mutate{
                replace => {
                    "additional_platform_version.key" => "Platform Version"
                    "additional_platform_version.value.string_value" => "%{platform_version}"
                }
                on_error => "no_platform_version"
            }
            # Merge the additional field into the event1 structure.
            if ![no_platform_version] {
                mutate {
                    merge => {
                        "event1.idm.read_only_udm.additional.fields" => "additional_platform_version"
                    }
                }
            }
            mutate {
                merge => {
                    "@output" => "event1"
                }
            }
        }
    }
    

    PREVIEW UDM OUTPUT을 실행하면 새 필드가 추가되었음을 알 수 있습니다.

    metadata.event_timestamp = "2024-10-29T14:08:52Z"
    metadata.event_type = "STATUS_HEARTBEAT"
    metadata.vendor_name = "Microsoft"
    metadata.product_name = "Windows Defender AV"
    metadata.product_event_type = "MALWAREPROTECTION_SERVICE_HEALTH_REPORT"
    metadata.description = "Endpoint Protection client health report (time in UTC)."
    metadata.log_type = "WINDOWS_DEFENDER_AV"
    additional.fields["Platform Version"] = "4.18.24080.9"
    principal.hostname = "win-dc-01.ad.1823127835827.altostrat.com"
    security_result[0].description = "EventID: 1151"
    security_result[0].action[0] = "ALLOW"
    security_result[0].severity = "LOW"
    

코드 스니펫 (및 Grok) - 기본 호스트 이름으로 임의 필드 추출

속성 예시:

  • 로그 소스 형식: XML
  • 데이터 매핑 접근 방식: Grok를 사용하는 코드 스니펫
  • 로그 유형: WINDOWS_DEFENDER_AV
  • 파서 확장 프로그램 목적: 기본 호스트 이름으로 임의 필드 추출
  • 설명:

    이 예의 목표는 FQDN에서 Hostname를 추출하고 principal.hostname 필드를 덮어쓰는 것입니다.

    이 예에서는 원시 로그 Computer name 필드에 FQDN가 포함되어 있는지 확인합니다. 이 경우 Hostname 부분만 추출하고 UDM Principal Hostname 필드를 덮어씁니다.

    파서와 중요한 UDM 필드 문서를 검토한 결과 principal.hostname 필드를 사용해야 합니다.

    # Parser Extension for WINDOWS_DEFENDER_AV
    # 2024-10-29: Extract Hostname from FQDN and overwrite principal.hostname
    filter {
        # Uses XPath to target the specific element(s)
        xml {
            source => "message"
                xpath => {
                    "/Event/System/Computer" => "hostname"
            }
            on_error => "_xml_error"
        }
        # Conditional processing: Only proceed if XML parsing was successful
        if ![_xml_error] {
      # Extract all characters before the first dot in the hostname variable
            grok {
                match => { "hostname" => "(?<hostname>[^.]+)" }
            }
            mutate {
                replace => {
                    "event1.idm.read_only_udm.principal.hostname" => "%{hostname}"
                }
            }
            mutate {
                merge => {
                    "@output" => "event1"
                }
            }
        }
    }
    

    이 파서 확장 프로그램은 Grok 문을 사용하여 정규 표현식(정규식)을 실행하여 hostname 필드를 추출합니다. 정규식 자체는 이름이 지정된 캡처 그룹을 사용합니다. 즉, 괄호 안에서 일치하는 것은 hostname라는 필드에 저장되며, 점(.)을 만날 때까지 하나 이상의 문자를 일치시킵니다. 이렇게 하면 FQDN 내의 hostname만 캡처됩니다.

    하지만 PREVIEW UDM OUTPUT을 실행하면 오류가 반환됩니다. 그 이유는 무엇인가요?

    generic::unknown: pipeline.ParseLogEntry failed:
     LOG_PARSING_CBN_ERROR: "generic::internal: pipeline failed: filter grok (2) failed: 
    field\ "hostname\" already exists in data and is not overwritable"
    

    Grok overwrite

    Grok 문 내에서 이름이 지정된 캡처 그룹은 overwrite 문을 사용하여 명시적으로 지정되지 않는 한 기존 변수를 덮어쓸 수 없습니다. 이 시나리오에서는 Grok 문에 이름이 지정된 캡처 그룹에 다른 변수 이름을 사용하거나 다음 코드 스니펫 예와 같이 overwrite 문을 사용하여 기존 hostname 변수를 명시적으로 덮어쓸 수 있습니다.

    # Parser Extension for WINDOWS_DEFENDER_AV
    # 2024-10-29: cmmartin: Overwriting principal Hostname
    filter {
      xml {
        source => "message"
          xpath => {
            "/Event/System/Computer" => "hostname"
        }
        on_error => "_xml_error"
      }
      if ![_xml_error] {
        grok {
          match => { "hostname" => "(?<hostname>[^.]+)" }
          overwrite => ["hostname"]
          on_error => "_grok_hostname_error"
        }
        mutate {
          replace => {
            "event1.idm.read_only_udm.principal.hostname" => "%{hostname}"
          }
        }
        mutate {
          merge => {
            "@output" => "event1"
          }
        }
      }
    }
    

    PREVIEW UDM OUTPUT을 다시 실행하면 FQDN에서 hostname를 추출한 후 새 필드가 추가된 것을 확인할 수 있습니다.

    metadata.event_timestamp"2024-10-29T14:08:52Z"
    metadata.event_type"STATUS_HEARTBEAT"
    metadata.vendor_name"Microsoft"
    metadata.product_name"Windows Defender AV"
    metadata.product_event_type"MALWAREPROTECTION_SERVICE_HEALTH_REPORT"
    metadata.description"Endpoint Protection client health report (time in UTC)."
    metadata.log_type"WINDOWS_DEFENDER_AV"
    principal.hostname"win-dc-01"
    security_result[0].description"EventID: 1151"
    security_result[0].action[0]"ALLOW"
    security_result[0].severity"LOW"