パーサーの作成に関するヒントとトラブルシューティング
このドキュメントでは、パーサーコードの作成時に発生する可能性のある問題について説明します。
パーサーコードを作成するときに、解析手順が想定どおりに機能しない場合にエラーが発生します。エラーの発生原因となる可能性がある状況には次のものがあります。
Grok
パターンが失敗するrename
オペレーションまたはreplace
オペレーションが失敗する- パーサーコードの構文エラー
パーサーコードでの一般的な手法
以降のセクションでは、問題のトラブルシューティングに役立つベスト プラクティス、ヒント、ソリューションについて説明します。
変数名にドットやハイフンを使用しないでください
変数名にハイフンやドットを使用すると、多くの場合、UDM フィールドに値を格納するために merge
オペレーションを実行した際に、予期しない動作が発生する可能性があります。解析に関する問題が断続的に発生することもあります。
たとえば、次の変数名は使用しないでください。
my.variable.result
my-variable-result
代わりに、my_variable_result
という変数名を使用してください。
変数名として特別な意味を持つ用語を使用しない
event
や timestamp
などの特定の単語は、パーサーコードで特別な意味を持つ場合があります。
文字列 event
は、単一の UDM レコードを表すためによく使用され、@output
ステートメントで使用されます。ログ メッセージに event
というフィールドが含まれている場合、または event
という中間変数を定義し、パーサーコードで @output
ステートメントで event
という単語を使用している場合、名前の競合に関するエラー メッセージが表示されます。
中間変数の名前を変更するか、UDM フィールド名と @output
ステートメントの接頭辞として event1
という用語を使用します。
timestamp
という単語は、元の未加工ログの作成されたタイムスタンプを表します。この中間変数に設定された値は、metadata.event_timestamp
UDM フィールドに保存されます。@timestamp
という用語は、未加工ログが解析されて UDM レコードが作成された日時を表します。
次の例では、metadata.event_timestamp
UDM フィールドに、未加工ログが解析された日時を設定します。
# Save the log parse date and time to the timestamp variable
mutate {
rename => {
"@timestamp" => "timestamp"
}
}
次の例では、metadata.event_timestamp
UDM フィールドを、元の未加工ログから抽出され、when
中間変数に保存された日時に設定します。
# Save the event timestamp to timestamp variable
mutate {
rename => {
"when" => "timestamp"
}
}
次の用語は変数として使用しないでください。
- collectiontimestamp
- createtimestamp
- 方法です。
- filename
- message
- 名前空間
- 出力
- onerrorcount
- timestamp
- タイムゾーン
各データ値を個別の UDM フィールドに保存する
複数のフィールドを区切り文字で連結して 1 つの UDM フィールドに格納しないでください。次に例を示します。
"principal.user.first_name" => "first:%{first_name},last:%{last_name}"
代わりに、各値を個別の UDM フィールドに保存します。
"principal.user.first_name" => "%{first_name}"
"principal.user.last_name" => "%{last_name}"
コードでタブではなくスペースを使用する
パーサー コードではタブを使用しないでください。スペースのみを使用し、一度に 2 つのスペースでインデントします。
1 つのオペレーションで複数の統合アクションを実行しない
1 つのオペレーションで複数のフィールドを結合すると、結果が不整合になる可能性があります。代わりに、merge
ステートメントを個別のオペレーションに配置します。
たとえば、次の例を置き換えます。
mutate {
merge => {
"security_result.category_details" => "category_details"
"security_result.category_details" => "super_category_details"
}
}
上記のコードブロックを次のコードブロックに置き換えます。
mutate {
merge => {
"security_result.category_details" => "category_details"
}
}
mutate {
merge => {
"security_result.category_details" => "super_category_details"
}
}
if
条件式と if else
条件式の選択
テスト対象の条件値に 1 つの一致のみが設定可能である場合は、if else
条件文を使用します。このアプローチは、やや効率的です。ただし、テスト対象の値が複数回一致する可能性があるシナリオでは、複数の異なる if
ステートメントを使用し、最も一般的なケースから最も具体的なケースに順序付けます。
パーサーの変更をテストする代表的なログファイルのセットを指定する
さまざまな形式の未加工ログサンプルを使用してパーサーコードをテストすることをおすすめします。これにより、パーサーが処理する必要がある一意のログやエッジケースを見つけることができます。
パーサーコードに説明的なコメントを追加する
ステートメントの機能ではなく、ステートメントが重要な理由を説明するコメントをパーサーコードに追加します。このコメントは、パーサーをメンテナンスするすべてのユーザーがフローを追跡するのに役立ちます。次に例を示します。
# only assign a Namespace if the source address is RFC 1918 or Loopback IP address
if [jsonPayload][id][orig_h] =~ /^(127(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(10(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(192\.168(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)|(172\.(?:1[6-9]|2\d|3[0-1])(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)/ {
mutate {
replace => {
"event1.idm.read_only_udm.principal.namespace" => "%{resource.labels.project_id}"
}
}
}
中間変数を早期に初期化する
元の未加工ログから値を抽出する前に、テスト値の保存に使用される中間変数を初期化します。
これにより、中間変数が存在しないことを示すエラーが返されなくなります。
次のステートメントは、product
変数の値を metadata.product_name
UDM フィールドに割り当てます。
mutate{
replace => {
"event1.idm.read_only_udm.metadata.product_name" => "%{product}"
}
}
product
変数が存在しない場合、次のエラーが発生します。
"generic::invalid_argument: pipeline failed: filter mutate (4) failed: replace failure: field \"event1.idm.read_only_udm.metadata.product_name\": source field \"product\": field not set"
on_error
ステートメントを追加してエラーをキャッチできます。次に例を示します。
mutate{
replace => {
"event1.idm.read_only_udm.metadata.product_name" => "%{product}"
}
on_error => "_error_does_not_exist"
}
上記の例のステートメントは、解析エラーを _error_does_not_exist
というブール値の中間変数に正常にキャッチします。このことによっても、product
変数を if
などの条件文で使用できるようにはなりません。次に例を示します。
if [product] != "" {
mutate{
replace => {
"event1.idm.read_only_udm.metadata.product_name" => "%{product}"
}
}
on_error => "_error_does_not_exist"
}
上記の例では、if
条件句が on_error
ステートメントをサポートしていないため、次のエラーが返されます。
"generic::invalid_argument: pipeline failed: filter conditional (4) failed: failed to evaluate expression: generic::invalid_argument: "product" not found in state data"
この問題を解決するには、抽出フィルタ(json
、csv
、xml
、kv
、grok
)を実行する前に中間変数を初期化する別のステートメント ブロックを追加します。次に例を示します。
filter {
# Initialize intermediate variables for any field you will use for a conditional check
mutate {
replace => {
"timestamp" => ""
"does_not_exist" => ""
}
}
# load the logs fields from the message field
json {
source => "message"
array_function => "split_columns"
on_error => "_not_json"
}
}
更新されたパーサーコードのスニペットは、条件ステートメントを使用してフィールドが存在するかどうかを確認し、複数のシナリオを処理します。また、on_error
ステートメントは、発生する可能性のあるエラーを処理します。
SHA-256 を base64 に変換する
次の例では、SHA-256 値を抽出し、base64 でエンコードし、エンコードされたデータを 16 進数文字列に変換してから、抽出および処理された値に特定のフィールドを置き換えます。
if [Sha256] != ""
{
base64
{
encoding => "RawStandard"
source => "Sha256"
target => "base64_sha256"
on_error => "base64_message_error"
}
mutate
{
convert =>
{
"base64_sha256" => "bytestohex"
}
on_error => "already_a_string"
}
mutate
{
replace =>
{
"event.idm.read_only_udm.network.tls.client.certificate.sha256" => "%{base64_sha256}"
"event.idm.read_only_udm.target.resource.name" => "%{Sha256}"
}
}
}
パーサー ステートメントのエラーを処理する
受信ログの形式が予期しないログ形式であるか、データの形式が正しくない場合があります。
これらのエラーを処理するようにパーサーを構築できます。ベスト プラクティスとして、抽出フィルタに on_error
ハンドラを追加し、中間変数をテストしてから、パーサー ロジックの次のセグメントに進みます。
次の例では、on_error
ステートメントで json
抽出フィルタを使用して、_not_json
ブール値変数を設定します。_not_json
が true
に設定されている場合、受信したログエントリが有効な JSON 形式ではなく、ログエントリが正常に解析されなかったことを意味します。_not_json
変数が false
の場合、受信したログエントリは有効な JSON 形式です。
# load the incoming log from the default message field
json {
source => "message"
array_function => "split_columns"
on_error => "_not_json"
}
フィールドの形式が正しいかどうかをテストすることもできます。次の例では、_not_json
が true
に設定されているかどうかを確認します。これは、ログが想定される形式ではなかったことを示します。
# Test that the received log matches the expected format
if [_not_json] {
drop { tag => "TAG_MALFORMED_MESSAGE" }
} else {
# timestamp is always expected
if [timestamp] != "" {
# ...additional parser logic goes here …
} else {
# if the timestamp field does not exist, it's not a log source
drop { tag => "TAG_UNSUPPORTED" }
}
}
これにより、指定されたログタイプに対してログが正しくない形式で取り込まれても、解析が失敗することはありません。
tag
変数で drop
フィルタを使用して、BigQuery の取り込み指標テーブルに条件がキャプチャされるようにします。
TAG_UNSUPPORTED
TAG_MALFORMED_ENCODING
TAG_MALFORMED_MESSAGE
TAG_NO_SECURITY_VALUE
drop
フィルタを使用すると、パーサーが未加工ログの処理、フィールドの正規化、UDM レコードの作成を停止します。元の元ログは Google Security Operations に取り込まれ、Google Security Operations の元ログ検索を使用して検索できます。
tag
変数に渡された値は、取り込み指標テーブルの drop_reason_code
フィールドに保存されます。次のようなアドホック クエリをテーブルに対して実行できます。
SELECT
log_type,
drop_reason_code,
COUNT(drop_reason_code) AS count
FROM `datalake.ingestion_metrics`
GROUP BY 1,2
ORDER BY 1 ASC
検証エラーのトラブルシューティング
パーサーをビルドするときに、UDM レコードに必須フィールドが設定されていないなど、検証に関連するエラーが発生することがあります。エラーは次のような形式になります。
Error: generic::unknown: invalid event 0: LOG_PARSING_GENERATED_INVALID_EVENT: "generic::invalid_argument: udm validation failed: target field is not set"
パーサーコードは正常に実行されますが、生成された UDM レコードには、metadata.event_type
に設定された値で定義されている必要な UDM フィールドがすべて含まれていません。このエラーの原因となるその他の例を次に示します。
metadata.event_type
がUSER_LOGIN
で、target.user value
UDM フィールドが設定されていない場合。metadata.event_type
がNETWORK_CONNECTION
で、target.hostname
UDM フィールドが設定されていない場合。
metadata.event_type
UDM フィールドと必須フィールドの詳細については、UDM 使用ガイドをご覧ください。
このタイプのエラーのトラブルシューティングを行う方法の一つは、最初に UDM フィールドに静的な値を設定することです。 必要な UDM フィールドをすべて定義したら、元の未加工ログを調べて、解析して UDM レコードに保存する値を確認します。元の未加工ログに特定のフィールドが含まれていない場合は、デフォルト値の設定が必要な場合があります。
次のテンプレートは、このアプローチを示す USER_LOGIN
イベントタイプに固有のものです。
次の点に注意してください。
- テンプレートは中間変数を初期化し、それぞれを静的文字列に設定します。
- [フィールドの割り当て] セクションのコードは、中間変数の値を UDM フィールドに設定します。
このコードを拡張するには、追加の中間変数と UDM フィールドを追加します。入力する必要があるすべての UDM フィールドを特定したら、次の操作を行います。
[入力構成] セクションで、元の未加工ログからフィールドを抽出し、値を中間変数に設定するコードを追加します。
[日付の抽出] セクションで、元の未加工ログからイベント タイムスタンプを抽出し、変換して中間変数に設定するコードを追加します。
必要に応じて、各中間変数に設定された初期化値を空の文字列に置き換えます。
filter {
mutate {
replace => {
# UDM > Metadata
"metadata_event_timestamp" => ""
"metadata_vendor_name" => "Example"
"metadata_product_name" => "Example SSO"
"metadata_product_version" => "1.0"
"metadata_product_event_type" => "login"
"metadata_product_log_id" => "12345678"
"metadata_description" => "A user logged in."
"metadata_event_type" => "USER_LOGIN"
# UDM > Principal
"principal_ip" => "192.168.2.10"
# UDM > Target
"target_application" => "Example Connect"
"target_user_user_display_name" => "Mary Smith"
"target_user_userid" => "mary@example.com"
# UDM > Extensions
"auth_type" => "SSO"
"auth_mechanism" => "USERNAME_PASSWORD"
# UDM > Security Results
"securityResult_action" => "ALLOW"
"security_result.severity" => "LOW"
}
}
# ------------ Input Configuration --------------
# Extract values from the message using one of the extraction filters: json, kv, grok
# ------------ Date Extract --------------
# If the date {} function is not used, the default is the normalization process time
# ------------ Field Assignment --------------
# UDM Metadata
mutate {
replace => {
"event1.idm.read_only_udm.metadata.vendor_name" => "%{metadata_vendor_name}"
"event1.idm.read_only_udm.metadata.product_name" => "%{metadata_product_name}"
"event1.idm.read_only_udm.metadata.product_version" => "%{metadata_product_version}"
"event1.idm.read_only_udm.metadata.product_event_type" => "%{metadata_product_event_type}"
"event1.idm.read_only_udm.metadata.product_log_id" => "%{metadata_product_log_id}"
"event1.idm.read_only_udm.metadata.description" => "%{metadata_description}"
"event1.idm.read_only_udm.metadata.event_type" => "%{metadata_event_type}"
}
}
# Set the UDM > auth fields
mutate {
replace => {
"event1.idm.read_only_udm.extensions.auth.type" => "%{auth_type}"
}
merge => {
"event1.idm.read_only_udm.extensions.auth.mechanism" => "auth_mechanism"
}
}
# Set the UDM > principal fields
mutate {
merge => {
"event1.idm.read_only_udm.principal.ip" => "principal_ip"
}
}
# Set the UDM > target fields
mutate {
replace => {
"event1.idm.read_only_udm.target.user.userid" => "%{target_user_userid}"
"event1.idm.read_only_udm.target.user.user_display_name" => "%{target_user_user_display_name}"
"event1.idm.read_only_udm.target.application" => "%{target_application}"
}
}
# Set the UDM > security_results fields
mutate {
merge => {
"security_result.action" => "securityResult_action"
}
}
# Set the security result
mutate {
merge => {
"event1.idm.read_only_udm.security_result" => "security_result"
}
}
# ------------ Output the event --------------
mutate {
merge => {
"@output" => "event1"
}
}
}
Grok 関数を使用して非構造化テキストを解析する
Grok 関数を使用して非構造化テキストから値を抽出する場合は、事前定義された Grok パターンと正規表現ステートメントを使用できます。Grok パターンを使用すると、コードの読みやすさが向上します。正規表現に省略文字(\w
、\s
など)が含まれていない場合は、ステートメントをコピーしてパーサーコードに直接貼り付けることができます。
Grok パターンはステートメントの追加の抽象化レイヤであるため、エラーが発生した場合のトラブルシューティングが複雑になる可能性があります。次の例は、事前定義された Grok パターンと正規表現の両方を含む Grok 関数です。
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
}
Grok パターンを使用しない抽出ステートメントの方がパフォーマンスが高い場合があります。たとえば、次の例では、一致する処理ステップの半分以下で済みます。ログソースの量が多い場合は、この点が重要な考慮事項となります。
RE2 と PCRE の正規表現の違いを理解する
Google Security Operations パーサーは、正規表現エンジンとして RE2 を使用します。PCRE 構文に精通している場合は、違いに気付くことがあります。次に例を示します。
PCRE ステートメントは次のとおりです。(?<_custom_field>\w+)\s
次のコードは、パーサー コードの RE2 ステートメントです。(?P<_custom_field>\\w+)\\s
エスケープ文字を必ずエスケープしてください
Google Security Operations は、受信した元のログデータを JSON エンコード形式で保存します。これは、正規表現の省略形のように見える文字列がリテラル文字列として解釈されるようにするためです。たとえば、\t
はタブ文字ではなく、リテラル文字列として解釈されます。
次の例は、元の未加工ログと JSON エンコードされた形式のログを示しています。entry
という用語を囲む各バックスラッシュ文字の前にエスケープ文字が追加されていることに注目してください。
元の未加工ログは次のとおりです。
field=\entry\
以下に示すようにログが JSON エンコード形式に変換されます。
field=\\entry\\
パーサーコードで正規表現を使用する場合は、値のみを抽出する場合は、エスケープ文字を追加する必要があります。元の未加工ログのバックスラッシュと一致するには、抽出ステートメントで 4 つのバックスラッシュを使用します。
以下は、パーサー コードの正規表現です。
^field=\\\\(?P<_value>.*)\\\\$
生成された結果は次のとおりです。_value
という名前のグループには、entry
という用語が保存されます。
"_value": "entry"
標準の正規表現ステートメントをパーサーコードに移動する場合は、抽出ステートメントで正規表現の省略形文字をエスケープします。たとえば、\s
を \\s
に変更します。
抽出ステートメントで二重エスケープする場合は、正規表現の特殊文字を変更しないままにします。たとえば、\\
は \\
として変更されない状態で保持されます。
次の正規表現は標準の正規表現です。
^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$
次の正規表現は、パーサー コード内で機能するように変更されています。
^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$
次の表に、標準の正規表現をパーサー コードに含める前に、追加のエスケープ文字を含める必要がある場合を示します。
正規表現 | パーサーコードの正規表現を変更 | 変更の説明 |
---|---|---|
\s |
\\s |
省略記号はエスケープする必要があります。 |
\. |
\\. |
予約文字はエスケープする必要があります。 |
\\" |
\\\" |
予約文字はエスケープする必要があります。 |
\] |
\\] |
予約文字はエスケープする必要があります。 |
\| |
\\| |
予約文字はエスケープする必要があります。 |
[^\\]+ |
[^\\\\]+ |
文字クラス グループ内の特殊文字はエスケープする必要があります。 |
\\\\ |
\\\\ |
文字クラス グループ外の特殊文字や省略形文字は、追加のエスケープ処理は必要ありません。 |
正規表現に名前付きキャプチャ グループを含める必要があります
"^.*$"
などの正規表現は、有効な RE2 構文です。ただし、パーサー コードでは、次のエラーで失敗します。
"ParseLogEntry failed: pipeline failed: filter grok (0) failed: failed to parse data with all match
patterns"
有効なキャプチャ グループを式に追加する必要があります。 Grok パターンを使用する場合、デフォルトでは名前付きキャプチャ グループが含まれます。 正規表現のオーバーライドを使用する場合は、必ず名前付きグループを含めてください。
パーサーコードの正規表現の例を次に示します。
"^(?P<_catchall>.*$)"
結果は次のようになります。_catchall
という名前のグループに割り当てられたテキストが表示されます。
"_catchall": "User \"BOB\" logged on to workstation \"DESKTOP-01\"."
キャッチオールの名前付きグループを使用して、式の作成を開始する
抽出ステートメントを作成する場合は、必要なものよりも多くのものをキャッチする式から始めます。次に、式を 1 つのフィールドずつ展開します。
次の例では、メッセージ全体に一致する名前付きグループ(_catchall
)を使用することから開始しています。 次に、テキストの追加部分を照合して、式を段階的に構築します。各ステップで、_catchall
という名前の名前付きグループは含まれている元のテキストが少ない状態です。 _catchall
という名前付きグループが不要になるまで、メッセージを照合するステップを 1 つずつ繰り返します。
ステップ | パーサーコードの正規表現 | _catchall という名前付きキャプチャ グループの出力 |
---|---|---|
1 | "^(?P<_catchall>.*$)" |
User \"BOB\" logged on to workstation \"DESKTOP-01\". |
2 | ^User\s\\\"(?P<_catchall>.*$) |
BOB\" logged on to workstation \"DESKTOP-01\". |
3 | ^User\s\\\"(?P<_user>.*?)\\\"\s(?P<_catchall>.*$) |
logged on to workstation \"DESKTOP-01\". |
正規表現がテキスト文字列全体と一致するまで続けます。 |
正規表現の省略形文字をエスケープする
パーサーコードで式を使用する場合は、正規表現の簡略文字をエスケープしてください。 以下に、テキスト文字列の例と、最初の単語 This
を抽出する標準の正規表現を示します。
This is a sample log.
次の標準正規表現は、最初の単語 This
を抽出します。ただし、この式をパーサーコードで実行すると、結果に文字 s
がありません。
標準正規表現 | _firstWord という名前付きキャプチャ グループの出力 |
---|---|
"^(?P<_firstWord>[^\s]+)\s.*$" |
"_firstWord": "Thi", |
これは、パーサーコードの正規表現では、省略形の文字に追加のエスケープ文字が必要になるためです。上の例では、\s
を \\s
に変更する必要があります。
パーサーコードの正規表現を改訂 | _firstWord という名前付きキャプチャ グループの出力 |
---|---|
"^(?P<_firstWord>[^\\s]+)\\s.*$" |
"_firstWord": "This", |
これは、\s
、\r
、\t
などの省略文字にのみ適用されます。「``」などの他の文字については、それ以上エスケープする必要はありません。
完全なコード例
このセクションでは、前述のルールをエンドツーエンドの例として説明します。以下に、構造化されていないテキスト文字列と、文字列を解析するために記述された標準の正規表現を示します。最後に、パーサーコードで機能する変更された正規表現が含まれています。
元のテキスト文字列は次のとおりです。
User "BOB" logged on to workstation "DESKTOP-01".
以下に示すのは、テキスト文字列を解析する標準の RE2 正規表現です。
^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$
この式は次のフィールドを抽出します。
マッチグループ | 文字の位置 | テキスト文字列 |
---|---|---|
完全一致 | 0-53 | User \"BOB\" logged on to workstation \"DESKTOP-01\". |
グループ「_user」 | 7-10 | BOB |
グループ 2 | 13-22 | logged on |
グループ「_device」 | 40-50 | DESKTOP-01 |
これが変更後の式です。標準の RE2 正規表現を、パーサーコードで機能するように変更しました。
^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$