パーサーの作成に関するヒントとトラブルシューティング

以下でサポートされています。

このドキュメントでは、パーサーコードの作成時に発生する可能性のある問題について説明します。

パーサーコードを作成するときに、解析手順が想定どおりに機能しない場合にエラーが発生します。エラーの発生原因となる可能性がある状況には次のものがあります。

  • Grok パターンが失敗する
  • rename オペレーションまたは replace オペレーションが失敗する
  • パーサーコードの構文エラー

パーサーコードでの一般的な手法

以降のセクションでは、問題のトラブルシューティングに役立つベスト プラクティス、ヒント、ソリューションについて説明します。

変数名にドットやハイフンを使用しないでください

変数名にハイフンやドットを使用すると、多くの場合、UDM フィールドに値を格納するために merge オペレーションを実行した際に、予期しない動作が発生する可能性があります。解析に関する問題が断続的に発生することもあります。

たとえば、次の変数名は使用しないでください。

  • my.variable.result
  • my-variable-result

代わりに、my_variable_result という変数名を使用してください。

変数名として特別な意味を持つ用語を使用しない

eventtimestamp などの特定の単語は、パーサーコードで特別な意味を持つ場合があります。

文字列 event は、単一の UDM レコードを表すためによく使用され、@output ステートメントで使用されます。ログ メッセージに event というフィールドが含まれている場合、または event という中間変数を定義し、パーサーコードで @output ステートメントで event という単語を使用している場合、名前の競合に関するエラー メッセージが表示されます。

中間変数の名前を変更するか、UDM フィールド名と @output ステートメントの接頭辞として event1 という用語を使用します。

timestamp という単語は、元の未加工ログの作成されたタイムスタンプを表します。この中間変数に設定された値は、metadata.event_timestamp UDM フィールドに保存されます。@timestamp という用語は、未加工ログが解析されて UDM レコードが作成された日時を表します。

次の例では、metadata.event_timestamp UDM フィールドに、未加工ログが解析された日時を設定します。

 # Save the log parse date and time to the timestamp variable
  mutate {
     rename => {
       "@timestamp" => "timestamp"
     }
   }

次の例では、metadata.event_timestamp UDM フィールドを、元の未加工ログから抽出され、when 中間変数に保存された日時に設定します。

   # Save the event timestamp to timestamp variable
   mutate {
     rename => {
       "when" => "timestamp"
     }
   }

次の用語は変数として使用しないでください。

  • collectiontimestamp
  • createtimestamp
  • 方法です。
  • filename
  • message
  • 名前空間
  • 出力
  • onerrorcount
  • timestamp
  • タイムゾーン

各データ値を個別の UDM フィールドに保存する

複数のフィールドを区切り文字で連結して 1 つの UDM フィールドに格納しないでください。次に例を示します。

"principal.user.first_name" => "first:%{first_name},last:%{last_name}"

代わりに、各値を個別の UDM フィールドに保存します。

"principal.user.first_name" => "%{first_name}"
"principal.user.last_name" => "%{last_name}"

コードでタブではなくスペースを使用する

パーサー コードではタブを使用しないでください。スペースのみを使用し、一度に 2 つのスペースでインデントします。

1 つのオペレーションで複数の統合アクションを実行しない

1 つのオペレーションで複数のフィールドを結合すると、結果が不整合になる可能性があります。代わりに、merge ステートメントを個別のオペレーションに配置します。

たとえば、次の例を置き換えます。

mutate {
  merge => {
      "security_result.category_details" => "category_details"
      "security_result.category_details" => "super_category_details"
  }
}

上記のコードブロックを次のコードブロックに置き換えます。

mutate {
  merge => {
    "security_result.category_details" => "category_details"
  }
}

mutate {
  merge => {
    "security_result.category_details" => "super_category_details"
  }
}

if 条件式と if else 条件式の選択

テスト対象の条件値に 1 つの一致のみが設定可能である場合は、if else 条件文を使用します。このアプローチは、やや効率的です。ただし、テスト対象の値が複数回一致する可能性があるシナリオでは、複数の異なる if ステートメントを使用し、最も一般的なケースから最も具体的なケースに順序付けます。

パーサーの変更をテストする代表的なログファイルのセットを指定する

さまざまな形式の未加工ログサンプルを使用してパーサーコードをテストすることをおすすめします。これにより、パーサーが処理する必要がある一意のログやエッジケースを見つけることができます。

パーサーコードに説明的なコメントを追加する

ステートメントの機能ではなく、ステートメントが重要な理由を説明するコメントをパーサーコードに追加します。このコメントは、パーサーをメンテナンスするすべてのユーザーがフローを追跡するのに役立ちます。次に例を示します。

# only assign a Namespace if the source address is RFC 1918 or Loopback IP address
if [jsonPayload][id][orig_h] =~ /^(127(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(10(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(192\.168(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)|(172\.(?:1[6-9]|2\d|3[0-1])(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)/ {
  mutate {
    replace => {
      "event1.idm.read_only_udm.principal.namespace" => "%{resource.labels.project_id}"
    }
  }
}

中間変数を早期に初期化する

元の未加工ログから値を抽出する前に、テスト値の保存に使用される中間変数を初期化します。

これにより、中間変数が存在しないことを示すエラーが返されなくなります。

次のステートメントは、product 変数の値を metadata.product_name UDM フィールドに割り当てます。

mutate{
  replace => {
    "event1.idm.read_only_udm.metadata.product_name" => "%{product}"
  }
}

product 変数が存在しない場合、次のエラーが発生します。

"generic::invalid_argument: pipeline failed: filter mutate (4) failed: replace failure: field \"event1.idm.read_only_udm.metadata.product_name\": source field \"product\": field not set"

on_error ステートメントを追加してエラーをキャッチできます。次に例を示します。

mutate{
  replace => {
    "event1.idm.read_only_udm.metadata.product_name" => "%{product}"
    }
  on_error => "_error_does_not_exist"
  }

上記の例のステートメントは、解析エラーを _error_does_not_exist というブール値の中間変数に正常にキャッチします。このことによっても、product 変数を if などの条件文で使用できるようにはなりません。次に例を示します。

if [product] != "" {
  mutate{
    replace => {
      "event1.idm.read_only_udm.metadata.product_name" => "%{product}"
    }
  }
  on_error => "_error_does_not_exist"
}

上記の例では、if 条件句が on_error ステートメントをサポートしていないため、次のエラーが返されます。

"generic::invalid_argument: pipeline failed: filter conditional (4) failed: failed to evaluate expression: generic::invalid_argument: "product" not found in state data"

この問題を解決するには、抽出フィルタ(jsoncsvxmlkvgrok)を実行する前に中間変数を初期化する別のステートメント ブロックを追加します。次に例を示します。

filter {
  # Initialize intermediate variables for any field you will use for a conditional check
  mutate {
    replace => {
      "timestamp" => ""
      "does_not_exist" => ""
    }
  }

  # load the logs fields from the message field
  json {
    source         => "message"
    array_function => "split_columns"
    on_error       => "_not_json"
  }
}

更新されたパーサーコードのスニペットは、条件ステートメントを使用してフィールドが存在するかどうかを確認し、複数のシナリオを処理します。また、on_error ステートメントは、発生する可能性のあるエラーを処理します。

SHA-256 を base64 に変換する

次の例では、SHA-256 値を抽出し、base64 でエンコードし、エンコードされたデータを 16 進数文字列に変換してから、抽出および処理された値に特定のフィールドを置き換えます。

if [Sha256] != "" 
{
  base64
  {
  encoding => "RawStandard"
  source => "Sha256"
  target => "base64_sha256"
  on_error => "base64_message_error"
  }
  mutate
  {
    convert =>
    {
      "base64_sha256" => "bytestohex"
    }
    on_error => "already_a_string"
  }
  mutate
  {
    replace => 
  {
     "event.idm.read_only_udm.network.tls.client.certificate.sha256" => "%{base64_sha256}"
     "event.idm.read_only_udm.target.resource.name" => "%{Sha256}"
  }
  }
}

パーサー ステートメントのエラーを処理する

受信ログの形式が予期しないログ形式であるか、データの形式が正しくない場合があります。

これらのエラーを処理するようにパーサーを構築できます。ベスト プラクティスとして、抽出フィルタに on_error ハンドラを追加し、中間変数をテストしてから、パーサー ロジックの次のセグメントに進みます。

次の例では、on_error ステートメントで json 抽出フィルタを使用して、_not_json ブール値変数を設定します。_not_jsontrue に設定されている場合、受信したログエントリが有効な JSON 形式ではなく、ログエントリが正常に解析されなかったことを意味します。_not_json 変数が false の場合、受信したログエントリは有効な JSON 形式です。

 # load the incoming log from the default message field
  json {
    source         => "message"
    array_function => "split_columns"
    on_error       => "_not_json"
  }

フィールドの形式が正しいかどうかをテストすることもできます。次の例では、_not_jsontrue に設定されているかどうかを確認します。これは、ログが想定される形式ではなかったことを示します。

 # Test that the received log matches the expected format
  if [_not_json] {
    drop { tag => "TAG_MALFORMED_MESSAGE" }
  } else {
    # timestamp is always expected
    if [timestamp] != "" {

      # ...additional parser logic goes here …

    } else {

      # if the timestamp field does not exist, it's not a log source
      drop { tag => "TAG_UNSUPPORTED" }
    }
  }

これにより、指定されたログタイプに対してログが正しくない形式で取り込まれても、解析が失敗することはありません。

tag 変数で drop フィルタを使用して、BigQuery の取り込み指標テーブルに条件がキャプチャされるようにします。

  • TAG_UNSUPPORTED
  • TAG_MALFORMED_ENCODING
  • TAG_MALFORMED_MESSAGE
  • TAG_NO_SECURITY_VALUE

drop フィルタを使用すると、パーサーが未加工ログの処理、フィールドの正規化、UDM レコードの作成を停止します。元の元ログは Google Security Operations に取り込まれ、Google Security Operations の元ログ検索を使用して検索できます。

tag 変数に渡された値は、取り込み指標テーブルの drop_reason_code フィールドに保存されます。次のようなアドホック クエリをテーブルに対して実行できます。

SELECT
  log_type,
  drop_reason_code,
  COUNT(drop_reason_code) AS count
FROM `datalake.ingestion_metrics`
GROUP BY 1,2
ORDER BY 1 ASC

検証エラーのトラブルシューティング

パーサーをビルドするときに、UDM レコードに必須フィールドが設定されていないなど、検証に関連するエラーが発生することがあります。エラーは次のような形式になります。

Error: generic::unknown: invalid event 0: LOG_PARSING_GENERATED_INVALID_EVENT: "generic::invalid_argument: udm validation failed: target field is not set"

パーサーコードは正常に実行されますが、生成された UDM レコードには、metadata.event_type に設定された値で定義されている必要な UDM フィールドがすべて含まれていません。このエラーの原因となるその他の例を次に示します。

  • metadata.event_typeUSER_LOGIN で、target.user value UDM フィールドが設定されていない場合。
  • metadata.event_typeNETWORK_CONNECTION で、target.hostname UDM フィールドが設定されていない場合。

metadata.event_type UDM フィールドと必須フィールドの詳細については、UDM 使用ガイドをご覧ください。

このタイプのエラーのトラブルシューティングを行う方法の一つは、最初に UDM フィールドに静的な値を設定することです。 必要な UDM フィールドをすべて定義したら、元の未加工ログを調べて、解析して UDM レコードに保存する値を確認します。元の未加工ログに特定のフィールドが含まれていない場合は、デフォルト値の設定が必要な場合があります。

次のテンプレートは、このアプローチを示す USER_LOGIN イベントタイプに固有のものです。

次の点に注意してください。

  • テンプレートは中間変数を初期化し、それぞれを静的文字列に設定します。
  • [フィールドの割り当て] セクションのコードは、中間変数の値を UDM フィールドに設定します。

このコードを拡張するには、追加の中間変数と UDM フィールドを追加します。入力する必要があるすべての UDM フィールドを特定したら、次の操作を行います。

  • [入力構成] セクションで、元の未加工ログからフィールドを抽出し、値を中間変数に設定するコードを追加します。

  • [日付の抽出] セクションで、元の未加工ログからイベント タイムスタンプを抽出し、変換して中間変数に設定するコードを追加します。

  • 必要に応じて、各中間変数に設定された初期化値を空の文字列に置き換えます。

filter {
 mutate {
   replace => {
     # UDM > Metadata
     "metadata_event_timestamp"    => ""
     "metadata_vendor_name"        => "Example"
     "metadata_product_name"       => "Example SSO"
     "metadata_product_version"    => "1.0"
     "metadata_product_event_type" => "login"
     "metadata_product_log_id"     => "12345678"
     "metadata_description"        => "A user logged in."
     "metadata_event_type"         => "USER_LOGIN"

     # UDM > Principal
     "principal_ip"       => "192.168.2.10"

     # UDM > Target
     "target_application"            => "Example Connect"
     "target_user_user_display_name" => "Mary Smith"
     "target_user_userid"            => "mary@example.com"

     # UDM > Extensions
     "auth_type"          => "SSO"
     "auth_mechanism"     => "USERNAME_PASSWORD"

     # UDM > Security Results
     "securityResult_action"         => "ALLOW"
     "security_result.severity"       => "LOW"

   }
 }

 # ------------ Input Configuration  --------------
  # Extract values from the message using one of the extraction filters: json, kv, grok

 # ------------ Date Extract  --------------
 # If the  date {} function is not used, the default is the normalization process time

  # ------------ Field Assignment  --------------
  # UDM Metadata
  mutate {
    replace => {
      "event1.idm.read_only_udm.metadata.vendor_name"        =>  "%{metadata_vendor_name}"
      "event1.idm.read_only_udm.metadata.product_name"       =>  "%{metadata_product_name}"
      "event1.idm.read_only_udm.metadata.product_version"    =>  "%{metadata_product_version}"
      "event1.idm.read_only_udm.metadata.product_event_type" =>  "%{metadata_product_event_type}"
      "event1.idm.read_only_udm.metadata.product_log_id"     =>  "%{metadata_product_log_id}"
      "event1.idm.read_only_udm.metadata.description"        =>  "%{metadata_description}"
      "event1.idm.read_only_udm.metadata.event_type"         =>  "%{metadata_event_type}"
    }
  }

  # Set the UDM > auth fields
  mutate {
    replace => {
      "event1.idm.read_only_udm.extensions.auth.type"        => "%{auth_type}"
    }
    merge => {
      "event1.idm.read_only_udm.extensions.auth.mechanism"   => "auth_mechanism"
    }
  }

  # Set the UDM > principal fields
  mutate {
    merge => {
      "event1.idm.read_only_udm.principal.ip"                => "principal_ip"
    }
  }

  # Set the UDM > target fields
  mutate {
    replace => {
      "event1.idm.read_only_udm.target.user.userid"             =>  "%{target_user_userid}"
      "event1.idm.read_only_udm.target.user.user_display_name"  =>  "%{target_user_user_display_name}"
      "event1.idm.read_only_udm.target.application"             =>  "%{target_application}"
    }
  }

  # Set the UDM > security_results fields
  mutate {
    merge => {
      "security_result.action" => "securityResult_action"
    }
  }

  # Set the security result
  mutate {
    merge => {
      "event1.idm.read_only_udm.security_result" => "security_result"
    }
  }

 # ------------ Output the event  --------------
  mutate {
    merge => {
      "@output" => "event1"
    }
  }

}

Grok 関数を使用して非構造化テキストを解析する

Grok 関数を使用して非構造化テキストから値を抽出する場合は、事前定義された Grok パターンと正規表現ステートメントを使用できます。Grok パターンを使用すると、コードの読みやすさが向上します。正規表現に省略文字(\w\s など)が含まれていない場合は、ステートメントをコピーしてパーサーコードに直接貼り付けることができます。

Grok パターンはステートメントの追加の抽象化レイヤであるため、エラーが発生した場合のトラブルシューティングが複雑になる可能性があります。次の例は、事前定義された Grok パターンと正規表現の両方を含む Grok 関数です。

grok {
  match => {
    "message" => [
      "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
    ]
  }
}

Grok パターンを使用しない抽出ステートメントの方がパフォーマンスが高い場合があります。たとえば、次の例では、一致する処理ステップの半分以下で済みます。ログソースの量が多い場合は、この点が重要な考慮事項となります。

RE2 と PCRE の正規表現の違いを理解する

Google Security Operations パーサーは、正規表現エンジンとして RE2 を使用します。PCRE 構文に精通している場合は、違いに気付くことがあります。次に例を示します。

PCRE ステートメントは次のとおりです。(?<_custom_field>\w+)\s

次のコードは、パーサー コードの RE2 ステートメントです。(?P<_custom_field>\\w+)\\s

エスケープ文字を必ずエスケープしてください

Google Security Operations は、受信した元のログデータを JSON エンコード形式で保存します。これは、正規表現の省略形のように見える文字列がリテラル文字列として解釈されるようにするためです。たとえば、\t はタブ文字ではなく、リテラル文字列として解釈されます。

次の例は、元の未加工ログと JSON エンコードされた形式のログを示しています。entry という用語を囲む各バックスラッシュ文字の前にエスケープ文字が追加されていることに注目してください。

元の未加工ログは次のとおりです。

field=\entry\

以下に示すようにログが JSON エンコード形式に変換されます。

field=\\entry\\

パーサーコードで正規表現を使用する場合は、値のみを抽出する場合は、エスケープ文字を追加する必要があります。元の未加工ログのバックスラッシュと一致するには、抽出ステートメントで 4 つのバックスラッシュを使用します。

以下は、パーサー コードの正規表現です。

^field=\\\\(?P<_value>.*)\\\\$

生成された結果は次のとおりです。_value という名前のグループには、entry という用語が保存されます。

"_value": "entry"

標準の正規表現ステートメントをパーサーコードに移動する場合は、抽出ステートメントで正規表現の省略形文字をエスケープします。たとえば、\s\\s に変更します。

抽出ステートメントで二重エスケープする場合は、正規表現の特殊文字を変更しないままにします。たとえば、\\\\ として変更されない状態で保持されます。

次の正規表現は標準の正規表現です。

^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$

次の正規表現は、パーサー コード内で機能するように変更されています。

^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$

次の表に、標準の正規表現をパーサー コードに含める前に、追加のエスケープ文字を含める必要がある場合を示します。

正規表現 パーサーコードの正規表現を変更 変更の説明
\s
\\s
省略記号はエスケープする必要があります。
\.
\\.
予約文字はエスケープする必要があります。
\\"
\\\"
予約文字はエスケープする必要があります。
\]
\\]
予約文字はエスケープする必要があります。
\|
\\|
予約文字はエスケープする必要があります。
[^\\]+
[^\\\\]+
文字クラス グループ内の特殊文字はエスケープする必要があります。
\\\\
\\\\
文字クラス グループ外の特殊文字や省略形文字は、追加のエスケープ処理は必要ありません。

正規表現に名前付きキャプチャ グループを含める必要があります

"^.*$" などの正規表現は、有効な RE2 構文です。ただし、パーサー コードでは、次のエラーで失敗します。

"ParseLogEntry failed: pipeline failed: filter grok (0) failed: failed to parse data with all match
patterns"

有効なキャプチャ グループを式に追加する必要があります。 Grok パターンを使用する場合、デフォルトでは名前付きキャプチャ グループが含まれます。 正規表現のオーバーライドを使用する場合は、必ず名前付きグループを含めてください。

パーサーコードの正規表現の例を次に示します。

"^(?P<_catchall>.*$)"

結果は次のようになります。_catchall という名前のグループに割り当てられたテキストが表示されます。

"_catchall": "User \"BOB\" logged on to workstation \"DESKTOP-01\"."

キャッチオールの名前付きグループを使用して、式の作成を開始する

抽出ステートメントを作成する場合は、必要なものよりも多くのものをキャッチする式から始めます。次に、式を 1 つのフィールドずつ展開します。

次の例では、メッセージ全体に一致する名前付きグループ(_catchall)を使用することから開始しています。 次に、テキストの追加部分を照合して、式を段階的に構築します。各ステップで、_catchall という名前の名前付きグループは含まれている元のテキストが少ない状態です。 _catchall という名前付きグループが不要になるまで、メッセージを照合するステップを 1 つずつ繰り返します。

ステップ パーサーコードの正規表現 _catchall という名前付きキャプチャ グループの出力
1
"^(?P<_catchall>.*$)"
User \"BOB\" logged on to workstation \"DESKTOP-01\".
2
^User\s\\\"(?P<_catchall>.*$)
BOB\" logged on to workstation \"DESKTOP-01\".
3
^User\s\\\"(?P<_user>.*?)\\\"\s(?P<_catchall>.*$)
logged on to workstation \"DESKTOP-01\".
正規表現がテキスト文字列全体と一致するまで続けます。

正規表現の省略形文字をエスケープする

パーサーコードで式を使用する場合は、正規表現の簡略文字をエスケープしてください。 以下に、テキスト文字列の例と、最初の単語 This を抽出する標準の正規表現を示します。

  This is a sample log.

次の標準正規表現は、最初の単語 This を抽出します。ただし、この式をパーサーコードで実行すると、結果に文字 s がありません。

標準正規表現 _firstWord という名前付きキャプチャ グループの出力
"^(?P<_firstWord>[^\s]+)\s.*$" "_firstWord": "Thi",

これは、パーサーコードの正規表現では、省略形の文字に追加のエスケープ文字が必要になるためです。上の例では、\s\\s に変更する必要があります。

パーサーコードの正規表現を改訂 _firstWord という名前付きキャプチャ グループの出力
"^(?P<_firstWord>[^\\s]+)\\s.*$" "_firstWord": "This",

これは、\s\r\t などの省略文字にのみ適用されます。「``」などの他の文字については、それ以上エスケープする必要はありません。

完全なコード例

このセクションでは、前述のルールをエンドツーエンドの例として説明します。以下に、構造化されていないテキスト文字列と、文字列を解析するために記述された標準の正規表現を示します。最後に、パーサーコードで機能する変更された正規表現が含まれています。

元のテキスト文字列は次のとおりです。

User "BOB" logged on to workstation "DESKTOP-01".

以下に示すのは、テキスト文字列を解析する標準の RE2 正規表現です。

^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$

この式は次のフィールドを抽出します。

マッチグループ 文字の位置 テキスト文字列
完全一致 0-53
User \"BOB\" logged on to workstation \"DESKTOP-01\".
グループ「_user」 7-10
BOB
グループ 2 13-22
logged on
グループ「_device」 40-50
DESKTOP-01

これが変更後の式です。標準の RE2 正規表現を、パーサーコードで機能するように変更しました。

^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$