파서 확장 프로그램 예
이 문서에서는 다양한 시나리오에서 파서 확장 프로그램을 만드는 예를 제공합니다. 파서 확장 프로그램에 대해 자세히 알아보려면 파서 확장 프로그램 만들기를 참고하세요.
파서 확장 프로그램 예
다음 속성 표를 사용하여 필요한 샘플 코드를 빠르게 찾으세요.
노코드 예
로그 소스 형식 | 제목 예시 | 설명 | 이 예의 파서 개념 |
---|---|---|---|
JSON (로그 유형: GCP_IDS ) |
필드 추출 | JSON 형식으로 로그에서 필드를 추출합니다. | 노코드 |
JSON (로그 유형: WORKSPACE_ALERTS ) |
전제조건 값으로 필드 추출 | JSON 형식의 로그에서 필드를 추출하고 사전 조건과 함께 반복 UDM 필드로 정규화합니다. |
|
코드 스니펫 예시
로그 소스 형식 | 제목 예시 | 설명 | 이 예의 파서 개념 |
---|---|---|---|
JSON (로그 유형: `GCP_IDS`) |
HTTP 사용자 에이전트 추가 |
|
|
CSV (로그 유형: MISP_IOC) |
additional UDM 객체로 임의 필드 추출 |
필드를 UDM > 항목 > additional UDM 객체 > 키-값 쌍으로 추출합니다. |
additional UDM 객체 |
Syslog (로그 유형: POWERSHELL) |
Syslog에서 우선순위 및 심각도 추출 | Syslog Facility 및 Severity 값을 UDM Security Result Priority 및 Severity 필드에 추출합니다. | Grok 기반 |
Syslog 헤더가 있는 JSON (로그 유형: WINDOWS_SYSMON) |
조건문을 기반으로 한 장식 |
|
|
Syslog 헤더가 있는 JSON (로그 유형: WINDOWS_SYSMON) |
데이터 유형 변환 |
|
|
Syslog 헤더가 있는 JSON (로그 유형: WINDOWS_SYSMON) |
가독성을 위한 임시 변수 이름 | 코드 스니펫에서 임시 변수 이름을 사용한 후 최종 출력 UDM 이벤트 객체 이름과 일치하도록 이름을 바꿀 수 있습니다. 이렇게 하면 전반적인 가독성을 높일 수 있습니다. |
|
Syslog 헤더가 있는 JSON (로그 유형: WINDOWS_SYSMON) |
반복 필드 | 코드 스니펫에서 반복 필드(예: security_result 필드)를 사용할 때는 주의해야 합니다. |
|
XML (로그 유형: WINDOWS_DEFENDER_AV) |
additional 객체로 임의 필드 추출 |
|
additional 객체는 정보를 맞춤 키-값 쌍으로 저장하는 데 사용됩니다. |
XML (로그 유형: WINDOWS_DEFENDER_AV) |
임의 필드를 주 호스트 이름으로 추출 |
overwrite statement
|
|
JSON, CSV, XML, Syslog, KV | 기존 매핑 삭제하기 | UDM 필드의 값을 삭제하여 기존 매핑을 삭제합니다. |
JSON 예
다음 예에서는 로그 소스가 JSON 형식인 파서 확장 프로그램을 만드는 방법을 보여줍니다.
노코드 - 필드 추출
예시 속성:
- 로그 소스 형식: JSON
- 데이터 매핑 접근 방식: 노 코드
- 로그 유형: GCP_IDS
- 파서 확장 프로그램 목적: 필드 추출
설명:
네트워크 관련 필드가 추출되지 않습니다. 이 로그 샘플은 JSON 형식의 구조화된 로그이므로 코드 없음 (데이터 필드 매핑) 접근 방식을 사용하여 파서 확장 프로그램을 만들 수 있습니다.
추출하려는 원래 필드는 다음과 같습니다.
total_packets
(문자열)elapsed_time
(문자열)total_bytes
(문자열)
다음은 샘플 원시 로그 항목입니다.
{ "insertId": "625a41542d64c124e7db097ae0906ccb-1@a3", "jsonPayload": { "destination_port": "80", "application": "incomplete", "ip_protocol": "tcp", "network": "projects/prj-p-shared-base/global/networks/shared-vpc-production", "start_time": "2024-10-29T21:14:59Z", "source_port": "41936", "source_ip_address": "35.191.200.157", "total_packets": "6", "elapsed_time": "0", "destination_ip_address": "192.168.0.11", "total_bytes": "412", "repeat_count": "1", "session_id": "1289742" }, "resource": { "type": "ids.googleapis.com/Endpoint", "labels": { "resource_container": "projects/12345678910", "location": "europe-west4-a", "id": "p-europe-west4" } }, "timestamp": "2024-10-29T21:15:21Z", "logName": "projects/prj-p-shared-base/logs/ids.googleapis.com%2Ftraffic", "receiveTimestamp": "2024-10-29T21:15:24.051990717Z" }
이 예에서는 코드 없음 접근 방식을 사용하여 다음 데이터 필드 매핑을 사용하는 파서 확장 프로그램을 만듭니다.
Precondition Path 사전 조건 연산자 Precondition Value 원시 데이터 경로 대상 필드* jsonPayload.total_bytes
NOT_EQUALS "" jsonPayload.total_bytes
udm.principal.network.received_bytes
jsonPayload.elapsed_time
NOT_EQUALS "" jsonPayload.elapsed_time
udm.principal.network.session_duration.seconds
jsonPayload.total_packets
NOT_EQUALS "" jsonPayload.total_packets
udm.principal.network.received_packets
파서 확장 프로그램을 실행하면 추출된 세 필드가
principal.network
객체에 추가됩니다.metadata.product_log_id = "625a41542d64c124e7db097ae0906ccb-1@a3" metadata.event_timestamp = "2024-10-29T21:14:59Z" metadata.event_type = "NETWORK_CONNECTION" metadata.vendor_name = "Google Cloud" metadata.product_name = "IDS" metadata.ingestion_labels[0].key = "label" metadata.ingestion_labels[0].value = "GCP_IDS" metadata.log_type = "GCP_IDS" principal.ip[0] = "35.191.200.157" principal.port = 41936 principal.network.received_bytes = 412 principal.network.session_duration.seconds = "0s" principal.network.received_packets = 6 target.ip[0] = "192.168.0.11" target.port = 80 target.application = "incomplete" observer.location.country_or_region = "EUROPE" observer.location.name = "europe-west4-a" observer.resource.name = "projects/12345678910" observer.resource.resource_type = "CLOUD_PROJECT" observer.resource.attribute.cloud.environment = "GOOGLE_CLOUD_PLATFORM" observer.resource.product_object_id = "p-europe-west4" network.ip_protocol = "TCP" network.session_id = "1289742"
코드 없음 - 전제 조건 값으로 필드 추출
예시 속성:
- 로그 소스 형식: JSON
- 데이터 매핑 접근 방식: 노 코드
- 로그 유형: WORKSPACE_ALERTS
- 파서 확장 프로그램 목적: 전제조건 값으로 필드 추출
설명:
원래 파서는 DLP (데이터 손실 방지) 알림의 영향을 받는 기본 사용자의
email address
를 추출하지 않습니다.이 예에서는 노코드 파서 확장 프로그램을 사용하여
email address
를 추출하고 사전 조건을 사용하여 반복 UDM 필드로 정규화합니다.노코드 파서 확장 프로그램에서 반복 필드를 사용할 때는 다음 중 원하는 작업을 표시해야 합니다.
- replace (기존 UDM 객체에서 반복 필드의 모든 값을 재정의)
- append (추출된 값을 반복 필드에 추가)
자세한 내용은 반복되는 입력란 섹션을 참고하세요.
이 예에서는 정규화된
principal.user.email_address
필드의 기존 이메일 주소를 바꿉니다.사전 조건을 사용하면 추출 작업을 수행하기 전에 조건부 검사를 실행할 수 있습니다. 대부분의 경우 전제조건 필드는 추출하려는 원시 데이터 필드와 동일한 필드이며 전제조건 연산자는
not Null
(예:foo != ""
)입니다.하지만 예와 같이 추출하려는 원시 데이터 필드 값이 일부 로그 항목에 없을 수도 있습니다. 이 경우 다른 사전 조건 필드를 사용하여 추출 작업을 필터링할 수 있습니다. 이 예에서 추출하려는 원시
triggeringUserEmail
필드는type = Data Loss Prevention
인 로그에만 있습니다.코드 없음 파서 확장 프로그램 필드에 입력할 값의 예는 다음과 같습니다.
Precondition Path 사전 조건 연산자 Precondition Value 원시 데이터 경로 대상 필드* type
같음 데이터 손실 방지 data.ruleViolationInfo.triggeringUserEmail
udm.principal.user.email_addresses
다음 예시에서는 예시 값으로 채워진 코드 없음 파서 확장 프로그램 필드를 보여줍니다.
파서 확장 프로그램을 실행하면
email_address
이principal.user
객체에 추가됩니다.metadata.product_log_id = "Ug71LGqBr6Q=" metadata.event_timestamp = "2022-12-18T12:17:35.154368Z" metadata.event_type = "USER_UNCATEGORIZED" metadata.vendor_name = "Google Workspace" metadata.product_name = "Google Workspace Alerts" metadata.product_event_type = "DlpRuleViolation" metadata.log_type = "WORKSPACE_ALERTS" additional.fields["resource_title"] = "bq-results-20221215-112933-1671103787123.csv" principal.user.email_addresses[0] = "foo.bar@altostrat.com" target.resource.name = "DRIVE" target.resource.resource_type = "STORAGE_OBJECT" target.resource.product_object_id = "1wLteoF3VHljS_8_ABCD_VVbhFTfcTQplJ5k1k7cL4r8" target.labels[0].key = "resource_title" target.labels[0].value = "bq-results-20221321-112933-1671103787697.csv" about[0].resource.resource_type = "CLOUD_ORGANIZATION" about[0].resource.product_object_id = "C01abcde2" security_result[0].about.object_reference.id = "ODU2NjEwZTItMWE2YS0xMjM0LWJjYzAtZTJlMWU2YWQzNzE3" security_result[0].category_details[0] = "Data Loss Prevention" security_result[0].rule_name = "Sensitive Projects Match" security_result[0].summary = "Data Loss Prevention" security_result[0].action[0] = "ALLOW" security_result[0].severity = "MEDIUM" security_result[0].rule_id = "rules/00abcdxs183abcd" security_result[0].action_details = "ALERT, DRIVE_WARN_ON_EXTERNAL_SHARING" security_result[0].alert_state = "ALERTING" security_result[0].detection_fields[0].key = "start_time" security_result[0].detection_fields[0].value = "2022-12-18T12:17:35.154368Z" security_result[0].detection_fields[1].key = "status" security_result[0].detection_fields[1].value = "NOT_STARTED" security_result[0].detection_fields[2].key = "trigger" security_result[0].detection_fields[2].value = "DRIVE_SHARE" security_result[0].rule_labels[0].key = "detector_name" security_result[0].rule_labels[0].value = "EMAIL_ADDRESS" network.email.to[0] = "foo.bar@altostrat.com"
코드 스니펫 - HTTP 사용자 에이전트 추가
예시 속성:
- 로그 소스 형식: JSON
- 데이터 매핑 접근 방식: 코드 스니펫
- 로그 유형: GCP_IDS
- 파서 확장 프로그램 목적: HTTP 사용자 에이전트 추가
설명:
이는 노 코드 접근 방식에서 지원하지 않는 비표준 UDM 객체 유형의 예이며 따라서 코드 스니펫을 사용해야 합니다. 기본 파서는
Network HTTP Parser User Agent
분석을 추출하지 않습니다. 또한 일관성을 위해 다음을 충족해야 합니다.requestUrl
에서Target Hostname
가 생성됩니다.- 애셋 기반 별칭 지정 및 보강이 실행되도록
Namespace
가 할당됩니다.
# GCP_LOADBALANCING # owner: @owner # updated: 2022-12-23 # Custom parser extension that: # 1) adds consistent Namespace # 2) adds Parsed User Agent Object filter { # Initialize placeholder mutate { replace => { "httpRequest.userAgent" => "" "httpRequest.requestUrl" => "" } } json { on_error => "not_json" source => "message" array_function => "split_columns" } if ![not_json] { #1 - Override Namespaces mutate { replace => { "event1.idm.read_only_udm.principal.namespace" => "TMO" } } mutate { replace => { "event1.idm.read_only_udm.target.namespace" => "TMO" } } mutate { replace => { "event1.idm.read_only_udm.src.namespace" => "TMO" } } #2 - Parsed User Agent if [httpRequest][requestUrl]!= "" { grok { match => { "httpRequest.requestUrl" => ["\/\/(?P<_hostname>.*?)\/"] } on_error => "_grok_hostname_failed" } if ![_grok_hostname_failed] { mutate { replace => { "event1.idm.read_only_udm.target.hostname" => "%{_hostname}" } } } } if [httpRequest][userAgent] != "" { mutate { convert => { "httpRequest.userAgent" => "parseduseragent" } } #Map the converted "user_agent" to the new UDM field "http.parsed_user_agent". mutate { rename => { "httpRequest.userAgent" => "event1.idm.read_only_udm.network.http.parsed_user_agent" } } } mutate { merge => { "@output" => "event1" } } } }
CSV 예시
다음 예에서는 로그 소스가 CSV 형식인 파서 확장 프로그램을 만드는 방법을 보여줍니다.
코드 스니펫 - 임의 필드를 additional
객체로 추출
예시 속성:
- 로그 소스 형식: CSV
- 데이터 매핑 접근 방식: 코드 스니펫
- 로그 유형: MISP_IOC
- 파서 확장 프로그램의 목적:
additional
객체로의 임의 필드 추출 설명:
이 예에서는 MISP_IOC UDM 엔티티 컨텍스트 통합이 사용됩니다.
additional
키-값 쌍 UDM 객체는 기본 파서에서 추출되지 않은 컨텍스트 정보를 캡처하고 조직별 필드를 추가하는 데 사용됩니다. 예를 들어 특정 MISP 인스턴스로 돌아가는 URL입니다.이 예의 CSV 기반 로그 소스는 다음과 같습니다.
1
9d66d38a-14e1-407f-a4d1-90b82aa1d59f
2
3908
3
Network activity
4
ip-dst
5
117.253.154.123
6
7
8
1687894564
9
10
11
12
13
14
DigitalSide Malware report\: MD5\: 59ce0baba11893f90527fc951ac69912
15
ORGNAME
16
DIGITALSIDE.IT
17
0
18
Medium
19
0
20
2023-06-23
21
tlp:white,type:OSINT,source:DigitalSide.IT,source:urlhaus.abuse.ch
22
1698036218
# MISP_IOC # owner: @owner # updated: 2024-06-21 # Custom parser extension that: # 1) adds a link back to internal MISP tenant # 2) extracts missing fields into UDM > Entity > Additional fields filter { # Set the base URL for MISP. Remember to replace this placeholder! mutate { replace => { "misp_base_url" => "https://<YOUR_MISP_URL>" } } # Parse the CSV data from the 'message' field. Uses a comma as the separator. # The 'on_error' option handles lines that are not properly formatted CSV. csv { source => "message" separator => "," on_error => "broken_csv" } # If the CSV parsing was successful... if ![broken_csv] { # Rename the CSV columns to more descriptive names. mutate { rename => { "column2" => "event_id" "column8" => "object_timestamp" "column16" => "event_source_org" "column17" => "event_distribution" "column19" => "event_analysis" "column22" => "attribute_timestamp" } } } # Add a link to view the event in MISP, if an event ID is available. # "column2" => "event_id" if [event_id] != "" { mutate { replace => { "additional_url.key" => "view_in_misp" "additional_url.value.string_value" => "%{misp_base_url}/events/view/%{event_id}" } } mutate { merge => { "event.idm.entity.additional.fields" => "additional_url" } } } # Add the object timestamp as an additional field, if available. # "column8" => "object_timestamp" if [object_timestamp] != "" { mutate { replace => { "additional_object_timestamp.key" => "object_timestamp" "additional_object_timestamp.value.string_value" => "%{object_timestamp}" } } mutate { merge => { "event.idm.entity.additional.fields" => "additional_object_timestamp" } } } # Add the event source organization as an additional field, if available. # "column16" => "event_source_org" if [event_source_org] != "" { mutate { replace => { "additional_event_source_org.key" => "event_source_org" "additional_event_source_org.value.string_value" => "%{event_source_org}" } } mutate { merge => { "event.idm.entity.additional.fields" => "additional_event_source_org" } } } # Add the event distribution level as an additional field, if available. # Maps numerical values to descriptive strings. # "column17" => "event_distribution" if [event_distribution] != "" { if [event_distribution] == "0" { mutate { replace => { "additional_event_distribution.value.string_value" => "YOUR_ORGANIZATION_ONLY" } } } else if [event_distribution] == "1" { mutate { replace => { "additional_event_distribution.value.string_value" => "THIS_COMMUNITY_ONLY" } } } else if [event_distribution] == "2" { mutate { replace => { "additional_event_distribution.value.string_value" => "CONNECTED_COMMUNITIES" } } } else if [event_distribution] == "3" { mutate { replace => { "additional_event_distribution.value.string_value" => "ALL_COMMUNITIES" } } } else if [event_distribution] == "4" { mutate { replace => { "additional_event_distribution.value.string_value" => "SHARING_GROUP" } } } else if [event_distribution] == "5" { mutate { replace => { "additional_event_distribution.value.string_value" => "INHERIT_EVENT" } } } mutate { replace => { "additional_event_distribution.key" => "event_distribution" } } mutate { merge => { "event.idm.entity.additional.fields" => "additional_event_distribution" } } } # Add the event analysis level as an additional field, if available. # Maps numerical values to descriptive strings. # "column19" => "event_analysis" if [event_analysis] != "" { if [event_analysis] == "0" { mutate { replace => { "additional_event_analysis.value.string_value" => "INITIAL" } } } else if [event_analysis] == "1" { mutate { replace => { "additional_event_analysis.value.string_value" => "ONGOING" } } } else if [event_analysis] == "2" { mutate { replace => { "additional_event_analysis.value.string_value" => "COMPLETE" } } } mutate { replace => { "additional_event_analysis.key" => "event_analysis" } } mutate { merge => { "event.idm.entity.additional.fields" => "additional_event_analysis" } } } # Add the attribute timestamp as an additional field, if available. # "column22" => "attribute_timestamp" if [attribute_timestamp] != "" { mutate { replace => { "additional_attribute_timestamp.key" => "attribute_timestamp" "additional_attribute_timestamp.value.string_value" => "%{attribute_timestamp}" } } mutate { merge => { "event.idm.entity.additional.fields" => "additional_attribute_timestamp" } } } # Finally, merge the 'event' data into the '@output' field. mutate { merge => { "@output" => "event" } } }
파서 확장 프로그램을 실행하면 CSV의 맞춤 필드가
additional
객체에 추가됩니다.metadata.product_entity_id = "9d66d38a-14e1-407f-a4d1-90b82aa1d59f" metadata.collected_timestamp = "2024-10-31T15:16:08Z" metadata.vendor_name = "MISP" metadata.product_name = "MISP" metadata.entity_type = "IP_ADDRESS" metadata.description = "ip-dst" metadata.interval.start_time = "2023-06-27T19:36:04Z" metadata.interval.end_time = "9999-12-31T23:59:59Z" metadata.threat[0].category_details[0] = "Network activity" metadata.threat[0].description = "tlp:white,type:OSINT,source:DigitalSide.IT,source:urlhaus.abuse.ch - additional info: DigitalSide Malware report: MD5: 59ce0baba11893f90527fc951ac69912" metadata.threat[0].severity_details = "Medium" metadata.threat[0].threat_feed_name = "DIGITALSIDE.IT" entity.ip[0] = "117.253.154.123" additional.fields["view_in_misp"] = "https://
/events/view/3908" additional.fields["object_timestamp"] = "1687894564" additional.fields["event_source_org"] = "DIGITALSIDE.IT" additional.fields["event_distribution"] = "YOUR_ORGANIZATION_ONLY" additional.fields["event_analysis"] = "INITIAL" additional.fields["attribute_timestamp"] = "1698036218"
Grok 예
다음 예에서는 Grok 기반 파서 확장 프로그램을 만드는 방법을 보여줍니다.
코드 스니펫 (및 Grok) - 우선순위 및 심각도 추출
예시 속성:
- 로그 소스 형식: Syslog
- 데이터 매핑 접근 방식: Grok을 사용하는 코드 스니펫
- 로그 유형: POWERSHELL
- 파서 확장 프로그램 목적: 우선순위 및 심각도 추출
설명:
이 예에서는 Grok 기반 파서 확장 프로그램을 만들어 Syslog Facility 및 Severity 값을 UDM 보안 결과
Priority
및Severity
필드에 추출합니다.filter { # Use grok to parse syslog messages. The on_error clause handles messages that don't match the pattern. grok { match => { "message" => [ # Extract message with syslog headers. "(<%{POSINT:_syslog_priority}>)%{SYSLOGTIMESTAMP:datetime} %{DATA:logginghost}: %{GREEDYDATA:log_data}" ] } on_error => "not_supported_format" } # If the grok parsing failed, tag the event as unsupported and drop it. if ![not_supported_format] { if [_syslog_priority] != "" { if [_syslog_priority] =~ /0|8|16|24|32|40|48|56|64|72|80|88|96|104|112|120|128|136|144|152|160|168|176|184/ { mutate { replace => { "_security_result.severity_details" => "EMERGENCY" } } } if [_syslog_priority] =~ /1|9|17|25|33|41|49|57|65|73|81|89|97|105|113|121|129|137|145|153|161|169|177|185/ { mutate { replace => { "_security_result.severity_details" => "ALERT" } } } if [_syslog_priority] =~ /2|10|18|26|34|42|50|58|66|74|82|90|98|106|114|122|130|138|146|154|162|170|178|186/ { mutate { replace => { "_security_result.severity_details" => "CRITICAL" } } } if [_syslog_priority] =~ /3|11|19|27|35|43|51|59|67|75|83|91|99|107|115|123|131|139|147|155|163|171|179|187/ { mutate { replace => { "_security_result.severity_details" => "ERROR" } } } if [_syslog_priority] =~ /4|12|20|28|36|44|52|60|68|76|84|92|100|108|116|124|132|140|148|156|164|172|180|188/ { mutate { replace => { "_security_result.severity_details" => "WARNING" } } } if [_syslog_priority] =~ /5|13|21|29|37|45|53|61|69|77|85|93|101|109|117|125|133|141|149|157|165|173|181|189/ { mutate { replace => { "_security_result.severity_details" => "NOTICE" } } } if [_syslog_priority] =~ /6|14|22|30|38|46|54|62|70|78|86|94|102|110|118|126|134|142|150|158|166|174|182|190/ { mutate { replace => { "_security_result.severity_details" => "INFORMATIONAL" } } } if [_syslog_priority] =~ /7|15|23|31|39|47|55|63|71|79|87|95|103|111|119|127|135|143|151|159|167|175|183|191/ { mutate { replace => { "_security_result.severity_details" => "DEBUG" } } } # Facilities (mapped to priority) if [_syslog_priority] =~ /0|1|2|3|4|5|6|7/ { mutate { replace => { "_security_result.priority_details" => "KERNEL" } } } if [_syslog_priority] =~ /8|9|10|11|12|13|14|15/ { mutate { replace => { "_security_result.priority_details" => "USER" } } } if [_syslog_priority] =~ /16|17|18|19|20|21|22|23/ { mutate { replace => { "_security_result.priority_details" => "MAIL" } } } if [_syslog_priority] =~ /24|25|26|27|28|29|30|31/ { mutate { replace => { "_security_result.priority_details" => "SYSTEM" } } } if [_syslog_priority] =~ /32|33|34|35|36|37|38|39/ { mutate { replace => { "_security_result.priority_details" => "SECURITY" } } } if [_syslog_priority] =~ /40|41|42|43|44|45|46|47/ { mutate { replace => { "_security_result.priority_details" => "SYSLOG" } } } if [_syslog_priority] =~ /48|49|50|51|52|53|54|55/ { mutate { replace => { "_security_result.priority_details" => "LPD" } } } if [_syslog_priority] =~ /56|57|58|59|60|61|62|63/ { mutate { replace => { "_security_result.priority_details" => "NNTP" } } } if [_syslog_priority] =~ /64|65|66|67|68|69|70|71/ { mutate { replace => { "_security_result.priority_details" => "UUCP" } } } if [_syslog_priority] =~ /72|73|74|75|76|77|78|79/ { mutate { replace => { "_security_result.priority_details" => "TIME" } } } if [_syslog_priority] =~ /80|81|82|83|84|85|86|87/ { mutate { replace => { "_security_result.priority_details" => "SECURITY" } } } if [_syslog_priority] =~ /88|89|90|91|92|93|94|95/ { mutate { replace => { "_security_result.priority_details" => "FTPD" } } } if [_syslog_priority] =~ /96|97|98|99|100|101|102|103/ { mutate { replace => { "_security_result.priority_details" => "NTPD" } } } if [_syslog_priority] =~ /104|105|106|107|108|109|110|111/ { mutate { replace => { "_security_result.priority_details" => "LOGAUDIT" } } } if [_syslog_priority] =~ /112|113|114|115|116|117|118|119/ { mutate { replace => { "_security_result.priority_details" => "LOGALERT" } } } if [_syslog_priority] =~ /120|121|122|123|124|125|126|127/ { mutate { replace => { "_security_result.priority_details" => "CLOCK" } } } if [_syslog_priority] =~ /128|129|130|131|132|133|134|135/ { mutate { replace => { "_security_result.priority_details" => "LOCAL0" } } } if [_syslog_priority] =~ /136|137|138|139|140|141|142|143/ { mutate { replace => { "_security_result.priority_details" => "LOCAL1" } } } if [_syslog_priority] =~ /144|145|146|147|148|149|150|151/ { mutate { replace => { "_security_result.priority_details" => "LOCAL2" } } } if [_syslog_priority] =~ /152|153|154|155|156|157|158|159/ { mutate { replace => { "_security_result.priority_details" => "LOCAL3" } } } if [_syslog_priority] =~ /160|161|162|163|164|165|166|167/ { mutate { replace => { "_security_result.priority_details" => "LOCAL4" } } } if [_syslog_priority] =~ /168|169|170|171|172|173|174|175/ { mutate { replace => { "_security_result.priority_details" => "LOCAL5" } } } if [_syslog_priority] =~ /176|177|178|179|180|181|182|183/ { mutate { replace => { "_security_result.priority_details" => "LOCAL6" } } } if [_syslog_priority] =~ /184|185|186|187|188|189|190|191/ { mutate { replace => { "_security_result.priority_details" => "LOCAL7" } } } mutate { merge => { "event.idm.read_only_udm.security_result" => "_security_result" } } } mutate { merge => { "@output" => "event" } } } }
파서 확장 프로그램의 결과를 보면 사람이 읽을 수 있는 형식이 표시됩니다.
metadata.product_log_id = "6161053" metadata.event_timestamp = "2024-10-31T15:10:10Z" metadata.event_type = "PROCESS_LAUNCH" metadata.vendor_name = "Microsoft" metadata.product_name = "PowerShell" metadata.product_event_type = "600" metadata.description = "Info" metadata.log_type = "POWERSHELL" principal.hostname = "win-adfs.lunarstiiiness.com" principal.resource.name = "in_powershell" principal.resource.resource_subtype = "im_msvistalog" principal.asset.hostname = "win-adfs.lunarstiiiness.com" target.hostname = "Default Host" target.process.command_line = "C:\Program Files\Microsoft Azure AD Sync\Bin\miiserver.exe" target.asset.hostname = "Default Host" target.asset.asset_id = "Host ID:bf203e94-72cf-4649-84a5-fc02baedb75f" security_result[0].severity_details = "INFORMATIONAL" security_result[0].priority_details = "USER"
코드 스니펫 (및 Grok) - 이벤트 데코레이션, 임시 변수 이름, 데이터 유형 변환
예시 속성:
- 로그 소스 형식: Syslog 헤더가 있는 JSON
- 데이터 매핑 접근 방식: Grok을 사용하는 코드 스니펫
- 로그 유형: WINDOWS_SYSMON
- 파서 확장 프로그램 목적: 이벤트 데코레이션, 임시 변수 이름, 데이터 유형
설명:
이 예에서는 파서 확장 프로그램을 만들 때 다음 작업을 수행하는 방법을 보여줍니다.
- 조건문에 기반한 데코레이션 및 코드 스니펫 내 데이터 유형 이해
- 데이터 유형 변환
- 가독성을 위한 임시 변수 이름
- 반복 필드
조건문에 기반한 장식
이 예에서는 WINDOWS_SYSMON에서 각 이벤트 유형이 무엇을 의미하는지 설명하는 (컨텍스트 정보)를 추가합니다. 조건문을 사용하여 EventID를 확인한 다음
Description
를 추가합니다. 예를 들어EventID
1은Process Creation
이벤트입니다.추출 필터(예: JSON)를 사용하는 경우 원래 데이터 유형이 유지될 수 있습니다.
다음 예시에서는
EventID
값이 기본적으로 정수로 추출됩니다. 조건문은EventID
값을 문자열이 아닌 정수로 평가합니다.if [EventID] == 1 { mutate { replace => { "_description" => "[1] Process creation" } } }
데이터 유형 변환
변환 함수를 사용하여 파서 확장 프로그램 내에서 데이터 유형을 변환할 수 있습니다.
mutate { convert => { "EventID" => "string" } on_error => "_convert_EventID_already_string" }
가독성을 위한 임시 변수 이름
코드 스니펫에서 임시 변수 이름을 사용한 후 최종 출력 UDM 이벤트 객체 이름과 일치하도록 이름을 바꿀 수 있습니다. 이렇게 하면 전반적인 가독성이 향상됩니다.
다음 예시에서는
description
변수의 이름이event.idm.read_only_udm.metadata.description
로 변경됩니다.mutate { rename => { "_description" => "event.idm.read_only_udm.metadata.description" } }
반복 필드
완전한 파서 확장 프로그램은 다음과 같습니다.
filter { # initialize variable mutate { replace => { "EventID" => "" } } # Use grok to parse syslog messages. # The on_error clause handles messages that don't match the pattern. grok { match => { "message" => [ "(<%{POSINT:_syslog_priority}>)%{SYSLOGTIMESTAMP:datetime} %{DATA:logginghost}: %{GREEDYDATA:log_data}" ] } on_error => "not_supported_format" } if ![not_supported_format] { json { source => "log_data" on_error => "not_json" } if ![not_json] { if [EventID] == 1 { mutate { replace => { "_description" => "[1] Process creation" } } } if [EventID] == 2 { mutate { replace => { "_description" => "[2] A process changed a file creation time" } } } if [EventID] == 3 { mutate { replace => { "_description" => "[3] Network connection" } } } if [EventID] == 4 { mutate { replace => { "_description" => "[4] Sysmon service state changed" } } } if [EventID] == 5 { mutate { replace => { "_description" => "[5] Process terminated" } } } if [EventID] == 6 { mutate { replace => { "_description" => "[6] Driver loaded" } } } if [EventID] == 7 { mutate { replace => { "_description" => "[7] Image loaded" } } } if [EventID] == 8 { mutate { replace => { "_description" => "[8] CreateRemoteThread" } } } if [EventID] == 9 { mutate { replace => { "_description" => "[9] RawAccessRead" } } } if [EventID] == 10 { mutate { replace => { "_description" => "[10] ProcessAccess" } } } if [EventID] == 11 { mutate { replace => { "_description" => "[11] FileCreate" } } } if [EventID] == 12 { mutate { replace => { "_description" => "[12] RegistryEvent (Object create and delete)" } } } if [EventID] == 13 { mutate { replace => { "_description" => "[13] RegistryEvent (Value Set)" } } } if [EventID] == 14 { mutate { replace => { "_description" => "[14] RegistryEvent (Key and Value Rename)" } } } if [EventID] == 15 { mutate { replace => { "_description" => "[15] FileCreateStreamHash" } } } if [EventID] == 16 { mutate { replace => { "_description" => "[16] ServiceConfigurationChange" } } } if [EventID] == 17 { mutate { replace => { "_description" => "[17] PipeEvent (Pipe Created)" } } } if [EventID] == 18 { mutate { replace => { "_description" => "[18] PipeEvent (Pipe Connected)" } } } if [EventID] == 19 { mutate { replace => { "_description" => "[19] WmiEvent (WmiEventFilter activity detected)" } } } if [EventID] == 20 { mutate { replace => { "_description" => "[20] WmiEvent (WmiEventConsumer activity detected)" } } } if [EventID] == 21 { mutate { replace => { "_description" => "[21] WmiEvent (WmiEventConsumerToFilter activity detected)" } } } if [EventID] == 22 { mutate { replace => { "_description" => "[22] DNSEvent (DNS query)" } } } if [EventID] == 23 { mutate { replace => { "_description" => "[23] FileDelete (File Delete archived)" } } } if [EventID] == 24 { mutate { replace => { "_description" => "[24] ClipboardChange (New content in the clipboard)" } } } if [EventID] == 25 { mutate { replace => { "_description" => "[25] ProcessTampering (Process image change)" } } } if [EventID] == 26 { mutate { replace => { "_description" => "[26] FileDeleteDetected (File Delete logged)" } } } if [EventID] == 255 { mutate { replace => { "_description" => "[255] Error" } } } mutate { rename => { "_description" => "event.idm.read_only_udm.metadata.description" } } statedump{} mutate { merge => { "@output" => "event" } } } } }
파서 확장 프로그램을 실행하면 장식이
metadata.description
필드에 추가됩니다.metadata.product_log_id = "6008459" metadata.event_timestamp = "2024-10-31T14:41:53.442Z" metadata.event_type = "REGISTRY_CREATION" metadata.vendor_name = "Microsoft" metadata.product_name = "Microsoft-Windows-Sysmon" metadata.product_event_type = "12" metadata.description = "[12] RegistryEvent (Object create and delete)" metadata.log_type = "WINDOWS_SYSMON" additional.fields["thread_id"] = "3972" additional.fields["channel"] = "Microsoft-Windows-Sysmon/Operational" additional.fields["Keywords"] = "-9223372036854776000" additional.fields["Opcode"] = "Info" additional.fields["ThreadID"] = "3972" principal.hostname = "win-adfs.lunarstiiiness.com" principal.user.userid = "tim.smith_admin" principal.user.windows_sid = "S-1-5-18" principal.process.pid = "6856" principal.process.file.full_path = "C:\Windows\system32\wsmprovhost.exe" principal.process.product_specific_process_id = "SYSMON:{927d35bf-a374-6495-f348-000000002900}" principal.administrative_domain = "LUNARSTIIINESS" principal.asset.hostname = "win-adfs.lunarstiiiness.com" target.registry.registry_key = "HKU\S-1-5-21-3263964631-4121654051-1417071188-1116\Software\Policies\Microsoft\SystemCertificates\CA\Certificates" observer.asset_id = "5770385F:C22A:43E0:BF4C:06F5698FFBD9" observer.process.pid = "2556" about[0].labels[0].key = "Category ID" about[0].labels[0].value = "RegistryEvent" security_result[0].rule_name = "technique_id=T1553.004,technique_name=Install Root Certificate" security_result[0].summary = "Registry object added or deleted" security_result[0].severity = "INFORMATIONAL" security_result[1].rule_name = "EventID: 12" security_result[2].summary = "12"
XML 예
다음 예에서는 로그 소스가 XML 형식인 파서 확장 프로그램을 만드는 방법을 보여줍니다.
코드 스니펫 - additional
객체로 임의 필드 추출
예시 속성:
- 로그 소스 형식: XML
- 데이터 매핑 접근 방식: 코드 스니펫
- 로그 유형: WINDOWS_DEFENDER_AV
- 파서 확장 프로그램 용도:
additional
객체로의 임의 필드 추출 설명:
이 예의 목표는
Platform Version
값을 추출하고 저장하는 것입니다. 예를 들어outdated platform versions
에 대해 보고하고 검색할 수 있습니다.중요한 UDM 필드 문서를 검토한 결과 적합한 표준 UDM 필드가 확인되지 않았습니다. 따라서 이 예에서는
additional
객체를 사용하여 이 정보를 맞춤 키-값 쌍으로 저장합니다.# Parser Extension for WINDOWS_DEFENDER_AV # 2024-10-29: Extracting 'Platform Version' into Additional filter { # Uses XPath to target the specific element(s) xml { source => "message" xpath => { "/Event/EventData/Data[@Name='Platform version']" => "platform_version" } on_error => "_xml_error" } # Conditional processing: Only proceed if XML parsing was successful if ![_xml_error] { # Prepare the additional field structure using a temporary variable mutate{ replace => { "additional_platform_version.key" => "Platform Version" "additional_platform_version.value.string_value" => "%{platform_version}" } on_error => "no_platform_version" } # Merge the additional field into the event1 structure. if ![no_platform_version] { mutate { merge => { "event1.idm.read_only_udm.additional.fields" => "additional_platform_version" } } } mutate { merge => { "@output" => "event1" } } } }
'PREVIEW UDM OUTPUT'을 실행하면 새 필드가 성공적으로 추가되었음을 확인할 수 있습니다.
metadata.event_timestamp = "2024-10-29T14:08:52Z" metadata.event_type = "STATUS_HEARTBEAT" metadata.vendor_name = "Microsoft" metadata.product_name = "Windows Defender AV" metadata.product_event_type = "MALWAREPROTECTION_SERVICE_HEALTH_REPORT" metadata.description = "Endpoint Protection client health report (time in UTC)." metadata.log_type = "WINDOWS_DEFENDER_AV" additional.fields["Platform Version"] = "4.18.24080.9" principal.hostname = "win-dc-01.ad.1823127835827.altostrat.com" security_result[0].description = "EventID: 1151" security_result[0].action[0] = "ALLOW" security_result[0].severity = "LOW"
코드 스니펫 (및 Grok) - 임의 필드를 주 호스트 이름으로 추출
예시 속성:
- 로그 소스 형식: XML
- 데이터 매핑 접근 방식: Grok을 사용하는 코드 스니펫
- 로그 유형: WINDOWS_DEFENDER_AV
- 파서 확장 프로그램 목적: 주 호스트 이름으로 임의 필드 추출
설명:
이 예의 목표는
FQDN
에서Hostname
를 추출하고principal.hostname
필드를 덮어쓰는 것입니다.이 예시에서는 원시 로그
Computer name
필드에FQDN
이 포함되어 있는지 확인합니다. 그렇다면Hostname
부분만 추출하고 UDMPrincipal Hostname
필드를 덮어씁니다.파서와 중요한 UDM 필드 문서를 검토한 결과
principal.hostname
필드를 사용해야 하는 것으로 확인되었습니다.# Parser Extension for WINDOWS_DEFENDER_AV # 2024-10-29: Extract Hostname from FQDN and overwrite principal.hostname filter { # Uses XPath to target the specific element(s) xml { source => "message" xpath => { "/Event/System/Computer" => "hostname" } on_error => "_xml_error" } # Conditional processing: Only proceed if XML parsing was successful if ![_xml_error] { # Extract all characters before the first dot in the hostname variable grok { match => { "hostname" => "(?<hostname>[^.]+)" } } mutate { replace => { "event1.idm.read_only_udm.principal.hostname" => "%{hostname}" } } mutate { merge => { "@output" => "event1" } } } }
이 파서 확장 프로그램은 Grok 문을 사용하여 정규 표현식(regex)을 실행하여
hostname
필드를 추출합니다. 정규식 자체는 명명된 캡처 그룹을 사용합니다. 즉, 괄호 안에 일치하는 항목은 점을 만날 때까지 하나 이상의 문자와 일치하는hostname
라는 필드에 저장됩니다. 이렇게 하면FQDN
내의hostname
만 캡처됩니다.하지만 PREVIEW UDM OUTPUT을 실행하면 오류가 반환됩니다. 이유가 무엇인가요?
generic::unknown: pipeline.ParseLogEntry failed: LOG_PARSING_CBN_ERROR: "generic::internal: pipeline failed: filter grok (2) failed: field\ "hostname\" already exists in data and is not overwritable"
Grok
overwrite
문Grok 문 내에서 이름이 지정된 캡처 그룹은
overwrite
문을 사용하여 명시적으로 지정하지 않는 한 기존 변수를 덮어쓸 수 없습니다. 이 시나리오에서는 Grok 문에서 명명된 캡처 그룹에 다른 변수 이름을 사용하거나 다음 코드 스니펫 예와 같이overwrite
문을 사용하여 기존hostname
변수를 명시적으로 덮어쓸 수 있습니다.# Parser Extension for WINDOWS_DEFENDER_AV # 2024-10-29: Overwriting principal Hostname filter { xml { source => "message" xpath => { "/Event/System/Computer" => "hostname" } on_error => "_xml_error" } if ![_xml_error] { grok { match => { "hostname" => "(?<hostname>[^.]+)" } overwrite => ["hostname"] on_error => "_grok_hostname_error" } mutate { replace => { "event1.idm.read_only_udm.principal.hostname" => "%{hostname}" } } mutate { merge => { "@output" => "event1" } } } }
PREVIEW UDM OUTPUT을 다시 실행하면
FQDN
에서hostname
를 추출한 후 새 필드가 추가된 것을 확인할 수 있습니다.metadata.event_timestamp"2024-10-29T14:08:52Z" metadata.event_type"STATUS_HEARTBEAT" metadata.vendor_name"Microsoft" metadata.product_name"Windows Defender AV" metadata.product_event_type"MALWAREPROTECTION_SERVICE_HEALTH_REPORT" metadata.description"Endpoint Protection client health report (time in UTC)." metadata.log_type"WINDOWS_DEFENDER_AV" principal.hostname"win-dc-01" security_result[0].description"EventID: 1151" security_result[0].action[0]"ALLOW" security_result[0].severity"LOW"
JSON, CSV, XML, Syslog, KV 예시
다음 예에서는 로그 소스가 JSON, CSV, XML, Syslog 또는 KV 형식인 파서 확장 프로그램을 만드는 방법을 보여줍니다.
코드 스니펫 - 기존 매핑 삭제
예시 속성:
- 로그 소스 형식: JSON, CSV, Syslog, XML, KV
- 데이터 매핑 접근 방식: 코드 스니펫
- 파서 확장 프로그램 목적: UDM 필드의 값 삭제
설명:
이러한 예의 목표는 UDM 필드의 값을 삭제하여 기존 매핑을 삭제하는 것입니다.
다음 예시에서는
string
필드의 값을 삭제합니다.filter { mutate{ replace => { "event.idm.read_only_udm.metadata.vendor_name" => "" } } mutate { merge => { "@output" => "event" } } }
다음 예시에서는
integer
필드의 값을 삭제합니다.filter { mutate { replace => { "principal_port" => "0" } } mutate { convert => { "principal_port" => "integer" } } mutate { rename => { "principal_port" => "event.idm.read_only_udm.principal.port" } } mutate { merge => { "@output" => "event" } } }
다음 예시에서는
float
필드의 값을 삭제합니다.filter { mutate { replace => { "security_result_object.risk_score" => "0.0" } convert => { "security_result_object.risk_score" => "float" } on_error => "default_risk_score_conversion_failed" } mutate { merge => { "event.idm.read_only_udm.security_result" => "security_result_object" } on_error => "security_result_merge_failed" } mutate { merge => { "@output" => "event" } } }
다음 예시에서는
boolean
필드의 값을 삭제합니다.filter { mutate{ replace => { "tls_established" => "false" } } mutate { convert => { "tls_established" => "boolean" } } mutate { rename => { "tls_established" => "event.idm.read_only_udm.network.tls.established" } } mutate { merge => { "@output" => "event" } } }
다음 예시에서는
extension
필드의 값을 삭제합니다.filter { mutate { replace => { "event.idm.read_only_udm.extensions.auth.auth_details" => "" } on_error => "logon_type_not_set" } mutate { merge => { "@output" => "event" } } }
도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가로부터 답변을 받으세요.