ログ解析の概要
このドキュメントでは、Google Security Operations が未加工ログを解析して統合データモデル(UDM)形式にする方法の概要を説明します。
Google Security Operations は、次の取り込みソースから送信されたログデータを受信できます。
- Google Security Operations フォワーダー
- Google Security Operations API フィード
- Google Security Operations Ingestion API
- サードパーティの技術パートナー
通常、お客様は元の未加工ログとしてデータを送信します。Google Security Operations は、LogType を使用してログを生成したデバイスを一意に識別します。LogType は次の両方を識別します。
- ログを生成したベンダーとデバイス(Cisco Firewall、Linux DHCP Server、Bro DNS など)。
- 未加工ログが構造化統合データモデル(UDM)に変換するパーサー。パーサーと LogType には 1 対 1 の関係があります。各パーサーは単一の LogType で受信したデータを変換します。
Google Security Operations には、元の未加工ログを読み取り、元の未加工ログのデータを使用して構造化 UDM レコードを生成する一連のデフォルト パーサーが用意されています。Google Security Operations はこれらのパーサーを維持します。お客様固有のパーサーを作成して、カスタム データ マッピング手順を定義することもできます。お客様固有のパーサーの作成については、Google Security Operations の担当者にお問い合わせください。
パーサーにはデータ マッピング指示が含まれています。これにより、元の未加工ログから UDM データ構造内の 1 つ以上のフィールドへのデータのマッピング方法が定義されます。
解析エラーがない場合、Google Security Operations は未加工ログのデータを使用して UDM 構造化レコードを作成します。未加工ログを UDM レコードに変換するプロセスを「正規化」といいます。
デフォルトのパーサーが、未加工ログから主要な値のサブセットをマッピングする場合があります。通常、これらのコア フィールドは、Google Security Operations でセキュリティ分析情報を提供するうえで最も重要です。マッピングされていない値は未加工のログに残りますが、UDM レコードには保存されません。
また、Ingestion API を使用して、構造化統合データモデル(UDM)形式でデータを送信することもできます。
取り込まれたデータの解析方法をカスタマイズする
Google Security Operations には、お客様が受信する元のログデータのデータ解析をカスタマイズできる次の機能が用意されています。
- お客様固有のパーサー: 特定の要件を満たす特定のログタイプに対するカスタム パーサー構成を作成します。お客様固有のパーサーは、特定の LogType のデフォルト パーサーに代わるものです。お客様固有のパーサーの作成については、Google Security Operations の担当者にお問い合わせください。
- パーサー拡張機能: ユーザーは、デフォルトのパーサー構成に加えて、カスタム マッピング手順を追加できます。各ユーザーは、独自のカスタム マッピング手順のセットを作成できます。これらのマッピング手順では、元の未加工ログから UDM フィールドに追加のフィールドを抽出して変換する方法を定義します。パーサー拡張機能は、デフォルト パーサーまたはユーザー固有のパーサーを置き換えるものではありません。
Squid ウェブプロキシ ログの使用例
このセクションでは、Squid ウェブプロキシのログの例と、値を UDM レコードにマッピングする方法について説明します。UDM スキーマのすべてのフィールドに関する説明については、統合データモデルのフィールド リストをご覧ください。
サンプルの Squid ウェブプロキシのログには、スペース区切りの値が含まれています。各レコードは 1 つのイベントを表し、タイムスタンプ、期間、クライアント、結果コード / 結果のステータス、送信されたバイト数、リクエスト メソッド、URL、ユーザー、階層コード、コンテンツ タイプのデータを保存します。この例では、時間、クライアント、結果のステータス、バイト、リクエスト メソッド、URLのフィールドが抽出され、UDM レコードにマッピングされています。
1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg
これらの構造を比較すると、元のログデータのサブセットのみが UDM レコードに含まれることが分かります。特定のフィールドは必須ですが、他のフィールドは省略可能です。また、UDM レコード内のセクションのサブセットにのみデータが含まれます。パーサーが元のログから UDM レコードにデータをマッピングしていない場合、UDM レコードのこのセクションは Google Security Operations に表示されません。
metadata
セクションには、イベント タイムスタンプが保存されます。値が EPOCH から RFC 3339 形式に変換されていることに注意してください。この変換は任意です。タイムスタンプは、秒部分とミリ秒部分を別個のフィールドに分離する前処理を経て、EPOCH 形式で保存できます。
metadata.event_type
フィールドには、イベントのタイプを識別する列挙値である値 NETWORK_HTTP
が保存されます。metadata.event_type
の値によって、必須の UDM フィールドと省略可能な追加フィールドが決まります。product_name
と vendor_name
の値には、元のログを記録したデバイスに関するわかりやすい説明が含まれています。
UDM イベントレコードの metadata.event_type
は、Ingestion AP を使用してデータを取り込むときに定義される log_type と同じではありません。この 2 つの属性には、異なる情報が保存されます。
network
セクションには、元のログイベントの値が含まれています。この例では、元のログのステータス値が UDM レコードに書き込まれる前に、「結果コード / ステータス」フィールドから解析されたことに注意してください。UDM レコードには結果コードのみが含まれていました。
principal
セクションには、元のログのクライアント情報が保存されます。target
セクションには、完全修飾 URL と IP アドレスの両方が保存されます。
security_result
セクションには、元のログに記録されたアクションを表す列挙値の 1 つが保存されます。
これは、JSON 形式の UDM レコードです。データが含まれているセクションのみが含まれることに注意してください。src
、observer
、intermediary
、about
、extensions
セクションは含まれていません。
{
"metadata": {
"event_timestamp": "2020-04-28T07:40:48.129Z",
"event_type": "NETWORK_HTTP",
"product_name": "Squid Proxy",
"vendor_name": "Squid"
},
"principal": {
"ip": "192.168.23.4"
},
"target": {
"url": "www.google.com/images/sunlogo.png",
"ip": "203.0.113.52"
},
"network": {
"http": {
"method": "GET",
"response_code": 200,
"received_bytes": 904
}
},
"security_result": {
"action": "UNKNOWN_ACTION"
}
}
パーサー指示内の手順
パーサー内のデータ マッピング命令には、次のような共通のパターンがあります。
- 元のログからデータを解析して抽出します。
- 抽出したデータを操作します。これには、値の選択的な解析、データ型の変換、値内の部分文字列の置換、大文字または小文字への変換などのための条件付きロジックの使用が含まれます。
- UDM フィールドに値を割り当てます。
- マッピングされた UDM レコードを @output キーに出力します。
元のログからデータを解析して抽出する
フィルタ ステートメントを設定する
filter
ステートメントは、一連の解析指示の最初のステートメントです。追加の解析指示はすべて filter
ステートメントに含まれます。
filter {
}
抽出された値を保存する変数を初期化する
filter
ステートメント内で、パーサーがログから抽出した値を保存するために使用する中間変数を初期化します。
これらの変数は、個々のログが解析されるたびに使用されます。各中間変数の値は、後の解析手順において 1 つ以上の UDM フィールドに設定されます。
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
ログから個別の値を抽出する
Google Security Operations には、Logstash に基づく一連のフィルタが用意されており、元のログファイルからフィールドを抽出します。ログの形式に応じて、1 つまたは複数の抽出フィルタを使用して、ログからすべてのデータを抽出します。文字列が次の場合:
- パーサー構文であるネイティブ JSON は、JSON 形式のログをサポートする JSON フィルタに似ています。ネストされた JSON はサポートされていません。
- パーサー構文である XML 形式は、XML 形式のログをサポートする XML フィルタに似ています。
- パーサー構文である Key-Value ペアは、Key-Value 形式のメッセージをサポートする Kv フィルタに似ています。
- パーサー構文である CSV 形式は、CSV 形式のメッセージをサポートする Csv フィルタに似ています。
- 他のすべての形式のパーサー構文は、GROK 組み込みパターンを使用した GROK フィルタに似ています。これは正規表現形式の抽出指示を使用します。
Google Security Operations は、各フィルタで使用できる機能のサブセットを提供します。Google Security Operations は、フィルタでは使用できないカスタム データ マッピング構文も提供しています。サポートされている機能とカスタム関数の説明については、パーサー構文リファレンスをご覧ください。
Squid ウェブプロキシのログの例では、次のデータ抽出指示に Logstash Grok 構文と正規表現の組み合わせが含まれています。
次の抽出ステートメントでは、次の中間変数に値が保存されます。
when
srcip
action
returnCode
size
method
username
url
tgtip
このステートメント例では、抽出された値を各変数に格納するために overwrite
キーワードも使用されます。抽出プロセスがエラーを返した場合、on_error
ステートメントは not_valid_log
を True
に設定します。
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
抽出された値を操作および変換する
Google Security Operations は Logstash の 変更フィルタ プラグイン機能を使用して、元のログから抽出された値を操作できます。Google Security Operations は、プラグインで使用できる機能のサブセットを提供します。サポートされている機能とカスタム関数の説明については、パーサー構文をご覧ください。例:
- 値を別のデータ型にキャストする
- 文字列の値を置き換える
- 2 つの配列を結合するか、配列に文字列を追加する。文字列値は、マージする前に配列に変換されます。
- 小文字または大文字に変換する
このセクションでは、前述した Squid ウェブプロキシのログをベースにしたデータ変換の例を示します。
イベントのタイムスタンプを変換する
UDM レコードとして保存されるすべてのイベントには、イベント タイムスタンプが必要です。この例は、データの値がログから抽出されたかどうかを確認します。次に、Grok date 関数を使用して、値を UNIX
時間形式と照合します。
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
username
値を変換する
次のステートメント例では、username
変数の値を小文字に変換します。
mutate {
lowercase => [ "username"]
}
action
値を変換する
次の例では、action
中間変数の値を評価し、値を security_result.action
UDM フィールドの有効な値である ALLOW、BLOCK、UNKNOWN_Action に変更します。security_result.action
UDM フィールドは、特定の値のみを格納する列挙型です。
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
宛先 IP アドレスを変換する
次の例では、tgtip
中間変数の値を確認します。検出された場合、値は事前定義された Grok パターンを使用して IP アドレス パターンと照合されます。値を IP アドレス パターンと照合する際にエラーが発生した場合、on_error
関数は not_valid_tgtip
プロパティを True
に設定します。一致が成功した場合、not_valid_tgtip
プロパティは設定されません。
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
returnCode のデータ型とサイズを変更する
次の例では、size
変数の値を uinteger
に、returnCode
変数の値を integer
にキャストします。これは、size
変数が int64
データ型を格納する network.received_bytes
UDM フィールドに保存されるためです。returnCode
変数は、int32
データ型を格納する network.http.response_code
UDM フィールドに保存されます。
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
イベント内の UDM フィールドに値を割り当てる
値が抽出され、前処理されたら、それらの値を UDM イベントレコードのフィールドに割り当てます。UDM フィールドには、抽出した値と静的な値の両方を割り当てることができます。
event.disambiguation_key
に値を入力する場合、このフィールドは特定のログに対して生成される各イベントに固有のものであることを確認してください。2 つの異なるイベントに同じ disambiguation_key
がある場合、システムで予期しない動作が発生します。
このセクションのパーサーの例は、上記の Squid ウェブプロキシ ログの例に基づいています。
イベントのタイムスタンプを保存する
すべての UDM イベント レコードには、metadata.event_timestamp
UDM フィールドの値を設定する必要があります。次の例では、ログから抽出したイベント タイムスタンプを @timestamp
組み込み変数に保存します。Google Security Operations は、デフォルトでこれを metadata.event_timestamp
UDM フィールドに保存します。
mutate {
rename => {
"when" => "timestamp"
}
}
イベントタイプを設定する
すべての UDM イベント レコードには、metadata.event_type
UDM フィールドの値を設定する必要があります。このフィールドは列挙型です。このフィールドの値によって、UDM レコードを保存するために必要な追加の UDM フィールドが決まります。いずれかの必須フィールドに有効なデータが含まれていない場合、解析と正規化のプロセスは失敗します。
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
replace
ステートメントを使用して username
と method
の値を保存します。
中間フィールドの値が username
と method
の場合は、文字列です。次の例は、有効な値が存在するかどうかを確認し、存在する場合は username
値を principal.user.userid
UDM フィールドに保存し、method
値を network.http.method
UDM フィールドに保存します。
if [username] not in [ "-" ,"" ] {
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
}
if [method] != "" {
mutate {
replace => {
"event.idm.read_only_udm.network.http.method" => "%{method}"
}
}
}
action
を security_result.action
UDM フィールドに保存する
前のセクションでは、action
中間変数の値が評価され、security_result.action
UDM フィールドの標準値のいずれかに変換されました。
security_result
と action
の両方の UDM フィールドにアイテムの配列が保存されます。つまり、この値を保存する際は、若干異なるアプローチを採用する必要があります。
まず、変換済みの値を中間の security_result.action
フィールドに保存します。security_result
フィールドは、action
フィールドの親です。
mutate {
merge => {
"security_result.action" => "action"
}
}
次に、中間の security_result.action
中間フィールドを security_result
UDM フィールドに保存します。security_result
UDM フィールドにはアイテムの配列が保存され、このフィールドに値が付加されます。
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
merge
ステートメントを使用して、ターゲット IP アドレスとソース IP アドレスを保存する
UDM イベント レコードに次の値を保存します。
srcip
中間変数のprincipal.ip
UDM フィールドの値。tgtip
中間変数のtarget.ip
UDM フィールドの値。
principal.ip
と target.ip
の両方の UDM フィールドにアイテムの配列が保存されるため、各フィールドに値が追加されます。
以下の例は、これらの値を保存するさまざまな方法を示しています。変換ステップで、tgtip
中間変数が、事前定義された Grok パターンを使用して IP アドレスと照合されました。次のステートメントの例では、tgtip
値が IP アドレス パターンと一致しないことを示す not_valid_tgtip
プロパティが true であるかどうかを確認します。false の場合、tgtip
の値が target.ip
UDM フィールドに保存されます。
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
srcip
中間変数は変換されていません。次のステートメントは、元のログから値が抽出されたかどうかをチェックし、抽出されている場合は principal.ip
UDM フィールドに値を保存します。
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
rename
ステートメントを使用して、url
、returnCode
、size
を保存します。
以下のステートメントの例では、rename
ステートメントを使用して次の値を保存します。
target.url
UDM フィールドに保存されたurl
変数。network.http.response_code
UDM フィールドに保存されたreturnCode
中間変数。network.received_bytes
UDM フィールドに保存されたsize
中間変数。
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
}
UDM レコードを出力にバインドする
データ マッピング指示の最後のステートメントでは、処理されたデータが UDM イベント レコードに出力されます。
mutate {
merge => {
"@output" => "event"
}
}
完全なパーサーコード
完全なパーサーコードの例を次に示します。命令の順序は、このドキュメントの前のセクションと同じ順序ではありませんが、同じ出力になります。
filter {
# initialize variables
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
# Extract fields from the raw log.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
# Parse event timestamp
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
# Save the value in "when" to the event timestamp
mutate {
rename => {
"when" => "timestamp"
}
}
# Transform and save username
if [username] not in [ "-" ,"" ] {
mutate {
lowercase => [ "username"]
}
}
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
# save transformed value to an intermediary field
mutate {
merge => {
"security_result.action" => "action"
}
}
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
# check for presence of target ip. Extract and store target IP address.
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
# store target IP address
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
}
# convert the returnCode and size to integer data type
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
# save url, returnCode, and size
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
# set the event type to NETWORK_HTTP
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
# validate and set source IP address
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
# save event to @output
mutate {
merge => {
"@output" => "event"
}
}
} #end of filter