로그 파싱 개요
이 문서에서는 Google Security Operations에서 원시 로그를 통합 데이터 모델(UDM) 형식으로 파싱하는 방법을 간략하게 설명합니다.
Google Security Operations는 다음 수집 소스에서 발생한 로그 데이터를 수신할 수 있습니다.
- Google Security Operations 전달자
- Google Security Operations API 피드
- Google Security Operations 수집 API
- 서드 파티 기술 파트너
일반적으로 고객은 데이터를 원본 원시 로그로 전송합니다. Google Security Operations는 LogType을 사용하여 로그를 생성한 기기를 고유하게 식별합니다. LogType은 다음 두 가지 모두 식별합니다.
- 로그를 생성한 공급업체 및 기기(예: Cisco 방화벽, Linux DHCP 서버, Bro DNS)
- 원시 로그를 구조화된 통합 데이터 모델 (UDM)로 변환하는 파서 파서와 LogType 간에는 일대일 관계가 있습니다. 각 파서는 단일 LogType으로 수신된 데이터를 변환합니다.
Google Security Operations는 원본 원시 로그를 읽고 원본 원시 로그의 데이터를 사용하여 구조화된 UDM 레코드를 생성하는 기본 파서 집합을 제공합니다. Google Security Operations에서 이러한 파서를 유지보수합니다. 고객은 고객별 파서를 만들어 커스텀 데이터 매핑 명령어를 정의할 수도 있습니다. 고객별 파서를 만드는 방법은 Google Security Operations 담당자에게 문의하세요.
파서에는 데이터 매핑 명령어가 포함되어 있습니다. 이 명령어는 원본 원시 로그에서 UDM 데이터 구조의 하나 이상의 필드에 데이터가 매핑되는 방식을 정의합니다.
파싱 오류가 없으면 Google Security Operations에서 원시 로그의 데이터를 사용하여 UDM 구조화된 레코드를 만듭니다. 원시 로그를 UDM 레코드로 변환하는 프로세스를 정규화라고 합니다.
기본 파서에서 원시 로그의 핵심 값 하위 집합을 매핑할 수 있습니다. 일반적으로 이러한 핵심 필드는 Google Security Operations에서 보안 통계를 제공하는 데 가장 중요한 역할을 합니다. 매핑되지 않은 값은 원시 로그에 유지되지만 UDM 레코드에는 저장되지 않습니다.
고객은 Ingestion API를 사용하여 구조화된 통합 데이터 모델(UDM) 형식으로 데이터를 전송할 수도 있습니다.
수집된 데이터 파싱 방법 맞춤설정
Google Security Operations는 고객이 수신되는 원본 로그 데이터에서 데이터 파싱을 맞춤설정할 수 있는 기능을 제공합니다.
- 고객별 파서: 고객이 특정 요구사항을 충족하는 특정 로그 유형에 대한 커스텀 파서 구성을 만듭니다. 고객별 파서는 특정 LogType의 기본 파서를 대체합니다. 고객별 파서를 만드는 방법은 Google Security Operations 담당자에게 문의하세요.
- 파서 확장 프로그램: 고객은 기본 파서 구성 외에 커스텀 매핑 명령어를 추가할 수 있습니다. 고객마다 고유한 커스텀 매핑 명령어 집합을 만들 수 있습니다. 이 매핑 명령어는 원본 원시 로그에서 UDM 필드로 추가 필드를 추출하고 변환하는 방법을 정의합니다. 파서 확장 프로그램은 기본 또는 고객별 파서를 대체하지 않습니다.
Squid 웹 프록시 로그 사용 예시
이 섹션에서는 Squid 웹 프록시 로그 예시를 제공하고 값이 UDM 레코드에 매핑되는 방식을 설명합니다. UDM 스키마의 모든 필드에 대한 설명은 통합 데이터 모델 필드 목록을 참조하세요.
Squid 웹 프록시 로그 예시에는 공백으로 구분된 값이 포함되어 있습니다. 각 레코드는 하나의 이벤트를 나타내며 타임스탬프, 기간, 클라이언트, 결과 코드/결과 상태, 전송된 바이트, 요청 메서드, URL, 사용자, 계층 구조 코드, 콘텐츠 유형과 같은 데이터를 저장합니다. 이 예시에서는 시간, 클라이언트, 결과 상태, 바이트, 요청 방법, URL 등의 필드를 추출하여 UDM 레코드에 매핑합니다.
1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg
구조를 비교할 때 원본 로그 데이터의 하위 집합만 UDM 레코드에 포함된다는 점에 유의하세요. 특정 필드는 필수이고 다른 필드는 선택사항입니다. 또한 UDM 레코드의 섹션 하위 집합에만 데이터가 포함됩니다. 파서에서 원본 로그의 데이터를 UDM 레코드로 매핑하지 않으면 Google Security Operations에 UDM 레코드 섹션이 표시되지 않습니다.
metadata
섹션에는 이벤트 타임스탬프가 저장됩니다. 값이 EPOCH에서 RFC 3339 형식으로 변환되었습니다. 이 변환은 선택사항입니다. 타임스탬프는 EPOCH 형식으로 저장할 수 있으며 이때 초 및 밀리초 부분을 별도의 필드로 분리하도록 사전 처리됩니다.
metadata.event_type
필드는 이벤트 유형을 식별하는 열거형 값인 NETWORK_HTTP
값을 저장합니다. metadata.event_type
값에 따라 필수 및 선택적 추가 UDM 필드가 결정됩니다. product_name
및 vendor_name
값에는 원본 로그를 기록한 기기의 사용자 친화적인 설명이 포함됩니다.
UDM 이벤트 레코드의 metadata.event_type
은 Ingestion API를 사용하여 데이터를 수집할 때 정의된 log_type과 다릅니다. 이 두 속성은 서로 다른 정보를 저장합니다.
network
섹션에는 원본 로그 이벤트의 값이 포함됩니다. 이 예시에서는 UDM 레코드에 기록하기 전에 원본 로그의 상태 값이 '결과 코드/상태' 필드에서 파싱되었습니다. result_code만 UDM 레코드에 포함되었습니다.
principal
섹션은 원본 로그의 클라이언트 정보를 저장합니다. target
섹션에는 정규화된 URL과 IP 주소가 모두 저장됩니다.
security_result
섹션은 원본 로그에 기록된 작업을 나타내기 위해 enum 값 중 하나를 저장합니다.
JSON 형식의 UDM 레코드입니다. 데이터를 포함하는 섹션만 포함됩니다. src
, observer
, intermediary
, about
, extensions
섹션은 포함되지 않습니다.
{
"metadata": {
"event_timestamp": "2020-04-28T07:40:48.129Z",
"event_type": "NETWORK_HTTP",
"product_name": "Squid Proxy",
"vendor_name": "Squid"
},
"principal": {
"ip": "192.168.23.4"
},
"target": {
"url": "www.google.com/images/sunlogo.png",
"ip": "203.0.113.52"
},
"network": {
"http": {
"method": "GET",
"response_code": 200,
"received_bytes": 904
}
},
"security_result": {
"action": "UNKNOWN_ACTION"
}
}
파서 명령어 내의 단계
파서 내의 데이터 매핑 명령어는 다음과 같은 공통적인 패턴을 따릅니다.
- 원본 로그에서 데이터를 파싱하고 추출합니다.
- 추출된 데이터를 조작합니다. 여기에는 선택적으로 값을 파싱하고, 데이터 유형을 변환하고, 값의 하위 문자열을 바꾸고, 대문자 또는 소문자로 변환하는 등의 조건부 로직이 포함됩니다.
- UDM 필드에 값을 할당합니다.
- 매핑된 UDM 레코드를 @output 키로 출력합니다.
원본 로그에서 데이터 파싱 및 추출
필터 문 설정
filter
문은 파싱 명령어 집합의 첫 번째 문입니다.
모든 추가 파싱 명령어는 filter
문 내에 포함되어 있습니다.
filter {
}
추출된 값을 저장할 변수 초기화
filter
문 내에서 파서가 로그에서 추출한 값을 저장하는 데 사용할 중간 변수를 초기화합니다.
이러한 변수는 개별 로그가 파싱될 때마다 사용됩니다. 각 중간 변수의 값은 나중에 파싱 명령어에서 하나 이상의 UDM 필드로 설정됩니다.
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
로그에서 개별 값 추출
Google Security Operations는 Logstash를 기반으로 원본 로그 파일에서 필드를 추출하는 필터 집합을 제공합니다. 로그 형식에 따라 추출 필터 하나 이상을 사용하여 로그에서 모든 데이터를 추출합니다. 문자열이 다음과 같은 경우:
- 네이티브 JSON, 파서 문법은 JSON 형식의 로그를 지원하는 JSON 필터와 비슷합니다. 중첩된 JSON은 지원되지 않습니다.
- XML 형식, 파서 문법은 XML 형식 로그를 지원하는 XML 필터와 비슷합니다.
- 키-값 쌍, 파서 문법은 키-값 형식의 메시지를 지원하는 Kv 필터와 비슷합니다.
- CSV 형식, 파서 문법은 csv 형식의 메시지를 지원하는 Csv 필터와 비슷합니다.
- 다른 모든 형식, 파서 문법은 GROK 기본 제공 패턴을 사용하는 GROK 필터와 비슷합니다 . 여기에는 정규식 스타일 추출 명령어가 사용됩니다.
Google Security Operations는 각 필터에서 사용할 수 있는 기능의 하위 집합을 제공합니다. 또한 Google Security Operations는 필터에서 사용할 수 없는 커스텀 데이터 매핑 구문을 제공합니다. 지원되는 기능 및 커스텀 함수에 대한 설명은 파서 문법 참조를 확인하세요.
계속해서 Squid 웹 프록시 로그 예시를 살펴보면 다음 데이터 추출 명령어에 Logstash Grok 문법과 정규 표현식의 조합이 포함되어 있습니다.
다음 추출 문은 다음 중간 변수에 값을 저장합니다.
when
srcip
action
returnCode
size
method
username
url
tgtip
이 예시 문은 overwrite
키워드를 사용하여 추출된 값을 각 변수에 저장합니다. 추출 프로세스가 오류를 반환하면 on_error
문은 not_valid_log
를 True
로 설정합니다.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
추출된 값 조작 및 변환
Google Security Operations는 Logstash 변형 필터 플러그인 기능을 사용하여 원본 로그에서 추출한 값을 조작할 수 있습니다. Google Security Operations는 플러그인을 사용할 수 있는 기능의 하위 집합을 제공합니다. 다음과 같은 지원되는 기능과 커스텀 함수에 대한 설명은 파서 문법을 참조하세요.
- 값을 다른 데이터 유형으로 변환
- 문자열의 값 바꾸기
- 두 배열을 병합하거나 배열에 문자열 추가. 문자열 값이 병합하기 전에 배열로 변환됩니다.
- 소문자 또는 대문자로 변환
이 섹션에서는 앞에 나온 Squid 웹 프록시 로그를 기반으로 빌드된 데이터 변환 예시를 제공합니다.
이벤트 타임스탬프 변환
UDM 레코드로 저장된 모든 이벤트에는 이벤트 타임스탬프가 있어야 합니다. 이 예시에서는 데이터 값이 로그에서 추출되었는지 확인합니다. 그런 다음 Grok 날짜 함수를 사용하여 값을 UNIX
시간 형식과 일치시킵니다.
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
username
값 변환
다음 예시 문은 username
변수의 값을 소문자로 변환합니다.
mutate {
lowercase => [ "username"]
}
action
값 변환
다음 예시에서는 action
중간 변수의 값을 평가하고 값을 security_result.action
UDM 필드의 유효한 값인 ALLOW, BLOCK 또는 UNKNOWN_ACTION으로 변경합니다. security_result.action
UDM 필드는 특정 값만 저장하는 열거형 유형입니다.
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
대상 IP 주소 변환
다음 예시에서는 tgtip
중간 변수의 값을 확인합니다.
발견되는 경우 사전 정의된 Grok 패턴을 사용하여 이 값을 IP 주소 패턴과 일치시킵니다. 값을 IP 주소 패턴과 일치시키는 데 오류가 발생하면 on_error
함수는 not_valid_tgtip
속성을 True
로 설정합니다. 일치가 성공하면 not_valid_tgtip
속성이 설정되지 않습니다.
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
returnCode 및 size의 데이터 유형 변경
다음 예시는 size
변수의 값을 uinteger
로 변환하고 returnCode
변수의 값을 integer
로 변환합니다. size
변수가 int64
데이터 유형을 저장하는 network.received_bytes
UDM 필드에 저장되기 때문에 필요한 절차입니다. returnCode
변수는 int32
데이터 유형을 저장하는 network.http.response_code
UDM 필드에 저장됩니다.
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
이벤트의 UDM 필드에 값 할당
값을 추출하여 사전 처리한 후에 UDM 이벤트 레코드의 필드에 값을 할당합니다. 추출된 값과 정적 값을 모두 UDM 필드에 할당할 수 있습니다.
event.disambiguation_key
를 입력할 경우 이 필드가 지정된 로그에 대해 생성된 각 이벤트에 고유한지 확인합니다. 서로 다른 두 이벤트의 disambiguation_key
가 동일한 경우 시스템에서 예기치 않은 동작이 발생합니다.
이 섹션의 파서 예시는 위의 Squid 웹 프록시 로그 예시를 기반으로 합니다.
이벤트 타임스탬프 저장
모든 UDM 이벤트 레코드에는 metadata.event_timestamp
UDM 필드에 설정된 값이 있어야 합니다. 다음 예시에서는 로그에서 추출된 이벤트 타임스탬프를 @timestamp
기본 제공 변수에 저장합니다. Google Security Operations는 기본적으로 metadata.event_timestamp
UDM 필드에 저장합니다.
mutate {
rename => {
"when" => "timestamp"
}
}
이벤트 유형 설정
모든 UDM 이벤트 레코드에는 metadata.event_type
UDM 필드에 설정된 값이 있어야 합니다. 이 필드는 열거형입니다. 이 필드의 값에 따라 UDM 레코드를 저장하기 위해 채워야 할 추가 UDM 필드가 결정됩니다.
필수 필드에 유효한 데이터가 포함되어 있지 않으면 파싱 및 정규화 프로세스가 실패합니다.
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
replace
문을 사용하여 username
및 method
값을 저장합니다.
username
및 method
중간 필드의 값은 문자열입니다. 다음 예시에서는 유효한 값이 있는지 확인하고 유효한 값이 존재하는 경우 username
값을 principal.user.userid
UDM 필드에, method
값을 network.http.method
UDM 필드에 저장합니다.
if [username] not in [ "-" ,"" ] {
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
}
if [method] != "" {
mutate {
replace => {
"event.idm.read_only_udm.network.http.method" => "%{method}"
}
}
}
security_result.action
UDM 필드에 action
저장
이전 섹션에서는 action
중간 변수의 값을 평가하고 security_result.action
UDM 필드의 표준 값 중 하나로 변환했습니다.
security_result
및 action
UDM 필드는 모두 항목의 배열을 저장하므로 이 값을 저장할 때는 약간 다른 접근 방식을 따라야 합니다.
먼저 변환된 값을 중간 security_result.action
필드에 저장합니다. security_result
필드는 action
필드의 상위 요소입니다.
mutate {
merge => {
"security_result.action" => "action"
}
}
그런 다음 중간 security_result.action
중개 필드를 security_result
UDM 필드에 저장합니다. security_result
UDM 필드는 항목의 배열을 저장하므로 이 필드에 값이 추가됩니다.
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
merge
문을 사용하여 대상 IP 주소와 소스 IP 주소를 저장합니다.
UDM 이벤트 레코드에 다음 값을 저장합니다.
principal.ip
UDM 필드에srcip
중간 변수의 값target.ip
UDM 필드에tgtip
중간 변수의 값
principal.ip
및 target.ip
UDM 필드 모두 항목 배열을 저장하므로 각 필드에 값이 추가됩니다.
아래 예시에서는 이러한 값을 저장하는 다양한 접근 방식을 보여줍니다.
변환 단계 중에 tgtip
중간 변수가 사전 정의된 Grok 패턴을 사용하여 IP 주소와 일치되었습니다. 다음 예시 문에서는 not_valid_tgtip
속성이 true인지, 즉 tgtip
값이 IP 주소 패턴과 일치할 수 없음을 나타내는지 확인합니다. false이면 tgtip
값을 target.ip
UDM 필드에 저장합니다.
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
srcip
중간 변수가 변환되지 않았습니다. 다음 문은 원본 로그에서 값이 추출되었는지 확인하고 값이 추출되었으면 principal.ip
UDM 필드에 값을 저장합니다.
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
rename
문을 사용하여 url
, returnCode
, size
를 저장합니다.
아래 예시 문은 rename
문을 사용하여 다음 값을 저장합니다.
target.url
UDM 필드에url
변수 저장network.http.response_code
UDM 필드에returnCode
중간 변수 저장network.received_bytes
UDM 필드에size
중간 변수 저장
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
}
출력에 UDM 레코드 결합
데이터 매핑 명령어의 마지막 문은 처리된 데이터를 UDM 이벤트 레코드로 출력합니다.
mutate {
merge => {
"@output" => "event"
}
}
전체 파서 코드
다음은 전체 파서 코드 예시입니다. 명령어 순서는 이 문서의 이전 섹션과 같은 순서를 따르지 않지만 동일한 출력을 반환합니다.
filter {
# initialize variables
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
# Extract fields from the raw log.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
# Parse event timestamp
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
# Save the value in "when" to the event timestamp
mutate {
rename => {
"when" => "timestamp"
}
}
# Transform and save username
if [username] not in [ "-" ,"" ] {
mutate {
lowercase => [ "username"]
}
}
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
# save transformed value to an intermediary field
mutate {
merge => {
"security_result.action" => "action"
}
}
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
# check for presence of target ip. Extract and store target IP address.
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
# store target IP address
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
}
# convert the returnCode and size to integer data type
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
# save url, returnCode, and size
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
# set the event type to NETWORK_HTTP
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
# validate and set source IP address
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
# save event to @output
mutate {
merge => {
"@output" => "event"
}
}
} #end of filter