ログ解析の概要

このドキュメントでは、Google Security Operations が未加工ログを解析して統合データモデル(UDM)形式にする方法の概要を説明します。

Google Security Operations は、次の取り込みソースから送信されたログデータを受信できます。

  • Google Security Operations フォワーダー
  • Google Security Operations API フィード
  • Google Security Operations Ingestion API
  • サードパーティの技術パートナー

通常、お客様は元の未加工ログとしてデータを送信します。Google Security Operations は、LogType を使用してログを生成したデバイスを一意に識別します。LogType は次の両方を識別します。

  • ログを生成したベンダーとデバイス(Cisco Firewall、Linux DHCP Server、Bro DNS など)。
  • 未加工ログが構造化統合データモデル(UDM)に変換するパーサー。パーサーと LogType には 1 対 1 の関係があります。各パーサーは単一の LogType で受信したデータを変換します。

Google Security Operations には、元の未加工ログを読み取り、元の未加工ログのデータを使用して構造化 UDM レコードを生成する一連のデフォルト パーサーが用意されています。Google Security Operations はこれらのパーサーを維持します。お客様固有のパーサーを作成して、カスタム データ マッピング手順を定義することもできます。お客様固有のパーサーの作成については、Google Security Operations の担当者にお問い合わせください。

取り込みと正規化のワークフロー

パーサーにはデータ マッピング指示が含まれています。これにより、元の未加工ログから UDM データ構造内の 1 つ以上のフィールドへのデータのマッピング方法が定義されます。

解析エラーがない場合、Google Security Operations は未加工ログのデータを使用して UDM 構造化レコードを作成します。未加工ログを UDM レコードに変換するプロセスを「正規化」といいます。

デフォルトのパーサーが、未加工ログから主要な値のサブセットをマッピングする場合があります。通常、これらのコア フィールドは、Google Security Operations でセキュリティ分析情報を提供するうえで最も重要です。マッピングされていない値は未加工のログに残りますが、UDM レコードには保存されません。

また、Ingestion API を使用して、構造化統合データモデル(UDM)形式でデータを送信することもできます。

取り込まれたデータの解析方法をカスタマイズする

Google Security Operations には、お客様が受信する元のログデータのデータ解析をカスタマイズできる次の機能が用意されています。

  • お客様固有のパーサー: 特定の要件を満たす特定のログタイプに対するカスタム パーサー構成を作成します。お客様固有のパーサーは、特定の LogType のデフォルト パーサーに代わるものです。お客様固有のパーサーの作成については、Google Security Operations の担当者にお問い合わせください。
  • パーサー拡張機能: ユーザーは、デフォルトのパーサー構成に加えて、カスタム マッピング手順を追加できます。各ユーザーは、独自のカスタム マッピング手順のセットを作成できます。これらのマッピング手順では、元の未加工ログから UDM フィールドに追加のフィールドを抽出して変換する方法を定義します。パーサー拡張機能は、デフォルト パーサーまたはユーザー固有のパーサーを置き換えるものではありません。

Squid ウェブプロキシ ログの使用例

このセクションでは、Squid ウェブプロキシのログの例と、値を UDM レコードにマッピングする方法について説明します。UDM スキーマのすべてのフィールドに関する説明については、統合データモデルのフィールド リストをご覧ください。

サンプルの Squid ウェブプロキシのログには、スペース区切りの値が含まれています。各レコードは 1 つのイベントを表し、タイムスタンプ、期間、クライアント、結果コード / 結果のステータス、送信されたバイト数、リクエスト メソッド、URL、ユーザー、階層コード、コンテンツ タイプのデータを保存します。この例では、時間、クライアント、結果のステータス、バイト、リクエスト メソッド、URLのフィールドが抽出され、UDM レコードにマッピングされています。

1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg

Squid ウェブプロキシの例

これらの構造を比較すると、元のログデータのサブセットのみが UDM レコードに含まれることが分かります。特定のフィールドは必須ですが、他のフィールドは省略可能です。また、UDM レコード内のセクションのサブセットにのみデータが含まれます。パーサーが元のログから UDM レコードにデータをマッピングしていない場合、UDM レコードのこのセクションは Google Security Operations に表示されません。

UDM にマッピングされるログ値

metadata セクションには、イベント タイムスタンプが保存されます。値が EPOCH から RFC 3339 形式に変換されていることに注意してください。この変換は任意です。タイムスタンプは、秒部分とミリ秒部分を別個のフィールドに分離する前処理を経て、EPOCH 形式で保存できます。

metadata.event_type フィールドには、イベントのタイプを識別する列挙値である値 NETWORK_HTTP が保存されます。metadata.event_type の値によって、必須の UDM フィールドと省略可能な追加フィールドが決まります。product_namevendor_name の値には、元のログを記録したデバイスに関するわかりやすい説明が含まれています。

UDM イベントレコードの metadata.event_type は、Ingestion AP を使用してデータを取り込むときに定義される log_type と同じではありません。この 2 つの属性には、異なる情報が保存されます。

network セクションには、元のログイベントの値が含まれています。この例では、元のログのステータス値が UDM レコードに書き込まれる前に、「結果コード / ステータス」フィールドから解析されたことに注意してください。UDM レコードには結果コードのみが含まれていました。

UDM にマッピングされるログ値

principal セクションには、元のログのクライアント情報が保存されます。target セクションには、完全修飾 URL と IP アドレスの両方が保存されます。

security_result セクションには、元のログに記録されたアクションを表す列挙値の 1 つが保存されます。

これは、JSON 形式の UDM レコードです。データが含まれているセクションのみが含まれることに注意してください。srcobserverintermediaryaboutextensions セクションは含まれていません。

{
        "metadata": {
            "event_timestamp": "2020-04-28T07:40:48.129Z",
            "event_type": "NETWORK_HTTP",
            "product_name": "Squid Proxy",
            "vendor_name": "Squid"
        },
        "principal": {
            "ip": "192.168.23.4"
        },
        "target": {
            "url": "www.google.com/images/sunlogo.png",
            "ip": "203.0.113.52"
        },
        "network": {
            "http": {
                "method": "GET",
                "response_code": 200,
                "received_bytes": 904
            }
        },
        "security_result": {
            "action": "UNKNOWN_ACTION"
        }
}

パーサー指示内の手順

パーサー内のデータ マッピング命令には、次のような共通のパターンがあります。

  1. 元のログからデータを解析して抽出します。
  2. 抽出したデータを操作します。これには、値の選択的な解析、データ型の変換、値内の部分文字列の置換、大文字または小文字への変換などのための条件付きロジックの使用が含まれます。
  3. UDM フィールドに値を割り当てます。
  4. マッピングされた UDM レコードを @output キーに出力します。

元のログからデータを解析して抽出する

フィルタ ステートメントを設定する

filter ステートメントは、一連の解析指示の最初のステートメントです。追加の解析指示はすべて filter ステートメントに含まれます。

filter {

}

抽出された値を保存する変数を初期化する

filter ステートメント内で、パーサーがログから抽出した値を保存するために使用する中間変数を初期化します。

これらの変数は、個々のログが解析されるたびに使用されます。各中間変数の値は、後の解析手順において 1 つ以上の UDM フィールドに設定されます。

  mutate {
    replace => {
      "event.idm.read_only_udm.metadata.product_name" => "Webproxy"
      "event.idm.read_only_udm.metadata.vendor_name" => "Squid"
      "not_valid_log" => "false"
      "when" => ""
      "srcip" => ""
      "action" => ""
      "username" => ""
      "url" => ""
      "tgtip" => ""
      "method" => ""
    }
  }

ログから個別の値を抽出する

Google Security Operations には、Logstash に基づく一連のフィルタが用意されており、元のログファイルからフィールドを抽出します。ログの形式に応じて、1 つまたは複数の抽出フィルタを使用して、ログからすべてのデータを抽出します。文字列が次の場合:

  • パーサー構文であるネイティブ JSON は、JSON 形式のログをサポートする JSON フィルタに似ています。ネストされた JSON はサポートされていません。
  • パーサー構文である XML 形式は、XML 形式のログをサポートする XML フィルタに似ています。
  • パーサー構文である Key-Value ペアは、Key-Value 形式のメッセージをサポートする Kv フィルタに似ています。
  • パーサー構文である CSV 形式は、CSV 形式のメッセージをサポートする Csv フィルタに似ています。
  • 他のすべての形式のパーサー構文は、GROK 組み込みパターンを使用した GROK フィルタに似ています。これは正規表現形式の抽出指示を使用します。

Google Security Operations は、各フィルタで使用できる機能のサブセットを提供します。Google Security Operations は、フィルタでは使用できないカスタム データ マッピング構文も提供しています。サポートされている機能とカスタム関数の説明については、パーサー構文リファレンスをご覧ください。

Squid ウェブプロキシのログの例では、次のデータ抽出指示に Logstash Grok 構文と正規表現の組み合わせが含まれています。

次の抽出ステートメントでは、次の中間変数に値が保存されます。

  • when
  • srcip
  • action
  • returnCode
  • size
  • method
  • username
  • url
  • tgtip

このステートメント例では、抽出された値を各変数に格納するために overwrite キーワードも使用されます。抽出プロセスがエラーを返した場合、on_error ステートメントは not_valid_logTrue に設定します。

grok {
   match => {
     "message" => [
       "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
     ]
   }
   overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
   on_error => "not_valid_log"
}

抽出された値を操作および変換する

Google Security Operations は Logstash の 変更フィルタ プラグイン機能を使用して、元のログから抽出された値を操作できます。Google Security Operations は、プラグインで使用できる機能のサブセットを提供します。サポートされている機能とカスタム関数の説明については、パーサー構文をご覧ください。例:

  • 値を別のデータ型にキャストする
  • 文字列の値を置き換える
  • 2 つの配列を結合するか、配列に文字列を追加する。文字列値は、マージする前に配列に変換されます。
  • 小文字または大文字に変換する

このセクションでは、前述した Squid ウェブプロキシのログをベースにしたデータ変換の例を示します。

イベントのタイムスタンプを変換する

UDM レコードとして保存されるすべてのイベントには、イベント タイムスタンプが必要です。この例は、データの値がログから抽出されたかどうかを確認します。次に、Grok date 関数を使用して、値を UNIX 時間形式と照合します。

if [when] != "" {
  date {
    match => [
      "when", "UNIX"
    ]
   }
 }

username 値を変換する

次のステートメント例では、username 変数の値を小文字に変換します。

mutate {
   lowercase => [ "username"]
   }

action 値を変換する

次の例では、action 中間変数の値を評価し、値を security_result.action UDM フィールドの有効な値である ALLOW、BLOCK、UNKNOWN_Action に変更します。security_result.action UDM フィールドは、特定の値のみを格納する列挙型です。

if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
      mutate {
        replace => {
          "action" => "BLOCK"
        }
      }
   } else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
     mutate {
        replace => {
          "action" => "ALLOW"
        }
     }
   } else {
      mutate {
        replace => {
          "action" => "UNKNOWN_ACTION" }
      }
   }

宛先 IP アドレスを変換する

次の例では、tgtip 中間変数の値を確認します。検出された場合、値は事前定義された Grok パターンを使用して IP アドレス パターンと照合されます。値を IP アドレス パターンと照合する際にエラーが発生した場合、on_error 関数は not_valid_tgtip プロパティを True に設定します。一致が成功した場合、not_valid_tgtip プロパティは設定されません。

if [tgtip] not in [ "","-" ] {
   grok {
     match => {
       "tgtip" => [ "%{IP:tgtip}" ]
     }
     overwrite => ["tgtip"]
     on_error => "not_valid_tgtip"
   }

returnCode のデータ型とサイズを変更する

次の例では、size 変数の値を uinteger に、returnCode 変数の値を integer にキャストします。これは、size 変数が int64 データ型を格納する network.received_bytes UDM フィールドに保存されるためです。returnCode 変数は、int32 データ型を格納する network.http.response_code UDM フィールドに保存されます。

mutate {
  convert => {
    "returnCode" => "integer"
    "size" => "uinteger"
  }
}

イベント内の UDM フィールドに値を割り当てる

値が抽出され、前処理されたら、それらの値を UDM イベントレコードのフィールドに割り当てます。UDM フィールドには、抽出した値と静的な値の両方を割り当てることができます。

event.disambiguation_key に値を入力する場合、このフィールドは特定のログに対して生成される各イベントに固有のものであることを確認してください。2 つの異なるイベントに同じ disambiguation_key がある場合、システムで予期しない動作が発生します。

このセクションのパーサーの例は、上記の Squid ウェブプロキシ ログの例に基づいています。

イベントのタイムスタンプを保存する

すべての UDM イベント レコードには、metadata.event_timestamp UDM フィールドの値を設定する必要があります。次の例では、ログから抽出したイベント タイムスタンプを @timestamp 組み込み変数に保存します。Google Security Operations は、デフォルトでこれを metadata.event_timestamp UDM フィールドに保存します。

mutate {
  rename => {
    "when" => "timestamp"
  }
}

イベントタイプを設定する

すべての UDM イベント レコードには、metadata.event_type UDM フィールドの値を設定する必要があります。このフィールドは列挙型です。このフィールドの値によって、UDM レコードを保存するために必要な追加の UDM フィールドが決まります。いずれかの必須フィールドに有効なデータが含まれていない場合、解析と正規化のプロセスは失敗します。

replace => {
    "event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
   }
}

replace ステートメントを使用して usernamemethod の値を保存します。

中間フィールドの値が usernamemethod の場合は、文字列です。次の例は、有効な値が存在するかどうかを確認し、存在する場合は username 値を principal.user.userid UDM フィールドに保存し、method 値を network.http.method UDM フィールドに保存します。

if [username] not in [ "-" ,"" ] {
  mutate {
    replace => {
      "event.idm.read_only_udm.principal.user.userid" => "%{username}"
    }
  }
}

if [method] != "" {
  mutate {
    replace => {
      "event.idm.read_only_udm.network.http.method" => "%{method}"
    }
  }
}

actionsecurity_result.action UDM フィールドに保存する

前のセクションでは、action 中間変数の値が評価され、security_result.action UDM フィールドの標準値のいずれかに変換されました。

security_resultaction の両方の UDM フィールドにアイテムの配列が保存されます。つまり、この値を保存する際は、若干異なるアプローチを採用する必要があります。

まず、変換済みの値を中間の security_result.action フィールドに保存します。security_result フィールドは、action フィールドの親です。

mutate {
   merge => {
     "security_result.action" => "action"
   }
}

次に、中間の security_result.action 中間フィールドを security_result UDM フィールドに保存します。security_result UDM フィールドにはアイテムの配列が保存され、このフィールドに値が付加されます。

# save the security_result field
mutate {
  merge => {
    "event.idm.read_only_udm.security_result" => "security_result"
  }
}

merge ステートメントを使用して、ターゲット IP アドレスとソース IP アドレスを保存する

UDM イベント レコードに次の値を保存します。

  • srcip 中間変数の principal.ip UDM フィールドの値。
  • tgtip 中間変数の target.ip UDM フィールドの値。

principal.iptarget.ip の両方の UDM フィールドにアイテムの配列が保存されるため、各フィールドに値が追加されます。

以下の例は、これらの値を保存するさまざまな方法を示しています。変換ステップで、tgtip 中間変数が、事前定義された Grok パターンを使用して IP アドレスと照合されました。次のステートメントの例では、tgtip 値が IP アドレス パターンと一致しないことを示す not_valid_tgtip プロパティが true であるかどうかを確認します。false の場合、tgtip の値が target.ip UDM フィールドに保存されます。

if ![not_valid_tgtip] {
  mutate {
    merge => {
      "event.idm.read_only_udm.target.ip" => "tgtip"
    }
  }
 }

srcip 中間変数は変換されていません。次のステートメントは、元のログから値が抽出されたかどうかをチェックし、抽出されている場合は principal.ip UDM フィールドに値を保存します。

if [srcip] != "" {
  mutate {
    merge => {
      "event.idm.read_only_udm.principal.ip" => "srcip"
    }
  }
}

rename ステートメントを使用して、urlreturnCodesize を保存します。

以下のステートメントの例では、rename ステートメントを使用して次の値を保存します。

  • target.url UDM フィールドに保存された url 変数。
  • network.http.response_code UDM フィールドに保存された returnCode 中間変数。
  • network.received_bytes UDM フィールドに保存された size 中間変数。
mutate {
  rename => {
     "url" => "event.idm.read_only_udm.target.url"
     "returnCode" => "event.idm.read_only_udm.network.http.response_code"
     "size" => "event.idm.read_only_udm.network.received_bytes"
  }
}

UDM レコードを出力にバインドする

データ マッピング指示の最後のステートメントでは、処理されたデータが UDM イベント レコードに出力されます。

mutate {
    merge => {
      "@output" => "event"
    }
  }

完全なパーサーコード

完全なパーサーコードの例を次に示します。命令の順序は、このドキュメントの前のセクションと同じ順序ではありませんが、同じ出力になります。

filter {

# initialize variables
  mutate {
    replace => {
      "event.idm.read_only_udm.metadata.product_name" => "Webproxy"
      "event.idm.read_only_udm.metadata.vendor_name" => "Squid"
      "not_valid_log" => "false"
      "when" => ""
      "srcip" => ""
      "action" => ""
      "username" => ""
      "url" => ""
      "tgtip" => ""
      "method" => ""
    }
  }

  # Extract fields from the raw log.
    grok {
      match => {
        "message" => [
          "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
        ]
      }
      overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
      on_error => "not_valid_log"
    }

  # Parse event timestamp
  if [when] != "" {
    date {
      match => [
        "when", "UNIX"
      ]
     }
   }

   # Save the value in "when" to the event timestamp
   mutate {
     rename => {
       "when" => "timestamp"
     }
   }

   # Transform and save username
   if [username] not in [ "-" ,"" ] {
     mutate {
       lowercase => [ "username"]
        }
      }
     mutate {
       replace => {
         "event.idm.read_only_udm.principal.user.userid" => "%{username}"
       }
     }

if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
      mutate {
        replace => {
          "action" => "BLOCK"
        }
      }
   } else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
     mutate {
        replace => {
          "action" => "ALLOW"
        }
     }
   } else {
      mutate {
        replace => {
          "action" => "UNKNOWN_ACTION" }
      }
   }

  # save transformed value to an intermediary field
   mutate {
      merge => {
        "security_result.action" => "action"
      }
   }

    # save the security_result field
    mutate {
      merge => {
        "event.idm.read_only_udm.security_result" => "security_result"
      }
    }

   # check for presence of target ip. Extract and store target IP address.
   if [tgtip] not in [ "","-" ] {
     grok {
       match => {
         "tgtip" => [ "%{IP:tgtip}" ]
       }
       overwrite => ["tgtip"]
       on_error => "not_valid_tgtip"
     }

     # store  target IP address
     if ![not_valid_tgtip] {
       mutate {
         merge => {
           "event.idm.read_only_udm.target.ip" => "tgtip"
         }
       }
     }
   }

   # convert  the returnCode and size  to integer data type
   mutate {
     convert => {
       "returnCode" => "integer"
       "size" => "uinteger"
     }
   }

   # save  url, returnCode, and size
   mutate {
     rename => {
        "url" => "event.idm.read_only_udm.target.url"
        "returnCode" => "event.idm.read_only_udm.network.http.response_code"
        "size" => "event.idm.read_only_udm.network.received_bytes"
     }

     # set the event type to NETWORK_HTTP
     replace => {
        "event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
     }
   }

   # validate and set source IP address
   if [srcip] != "" {
     mutate {
       merge => {
         "event.idm.read_only_udm.principal.ip" => "srcip"
       }
     }
   }

  # save  event to @output
   mutate {
     merge => {
       "@output" => "event"
     }
   }

} #end of filter