Elasticsearch のログを収集する
このドキュメントでは、Amazon S3 を使用して Elasticsearch ログを Google Security Operations に取り込む方法について説明します。パーサーは、未加工の JSON 形式のログを統合データモデル(UDM)に変換します。ネストされた JSON 構造からフィールドを抽出し、UDM フィールドにマッピングして、重大度レベルやユーザーロールなどのセキュリティ関連のコンテキストでデータを拡充します。
始める前に
- Google SecOps インスタンス
- Elasticsearch クラスタ管理への特権アクセス
- AWS(S3、IAM、EC2)への特権アクセス
- Logstash を実行する EC2 インスタンスまたは永続ホスト
Elasticsearch の前提条件を取得する
- 管理者として Elasticsearch クラスタにログインします。
- Elasticsearch サブスクリプションにセキュリティ機能(監査ロギングに必要)が含まれていることを確認します。
- 参照用に Elasticsearch クラスタの名前とバージョンをメモします。
- 監査ログが書き込まれるパスを特定します(デフォルト:
$ES_HOME/logs/<clustername>_audit.json
)。
Elasticsearch 監査ロギングを有効にする
- 各 Elasticsearch ノードで、elasticsearch.yml 構成ファイルを編集します。
次の設定を追加します。
xpack.security.audit.enabled: true
クラスタのローリング再起動を実行して、変更を適用します。
- シャード割り当てを無効にする:
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
- 各ノードを 1 つずつ停止して再起動します。
- シャード割り当てを再度有効にする:
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
- シャード割り当てを無効にする:
監査ログがログ ディレクトリの
<clustername>_audit.json
に生成されていることを確認します。
Google SecOps 用に AWS S3 バケットと IAM を構成する
- バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
- 後で参照できるように、バケットの名前とリージョンを保存します(例:
elastic-search-logs
)。 - IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成したユーザーを選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] として [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- 省略可: 説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [CSV ファイルをダウンロード] をクリックし、[アクセスキー] と [シークレット アクセスキー] を保存して、今後の参照に備えます。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションの [権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索します。
- ポリシーを選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
監査ログを S3 に送信するように Logstash を構成する
- Elasticsearch 監査ログファイルにアクセスできる EC2 インスタンスまたは永続ホストに Logstash をインストールします。
S3 出力プラグインがまだ存在しない場合は、インストールします。
bin/logstash-plugin install logstash-output-s3
Logstash 構成ファイル(
elastic-to-s3.conf
)を作成します。input { file { path => "/path/to/elasticsearch/logs/*_audit.json" start_position => "beginning" codec => "json" # audit file: 1 JSON object per line sincedb_path => "/var/lib/logstash/sincedb_elastic_search" exclude => ["*.gz"] } } filter { # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects. # If you must add metadata for ops, put it under [@metadata] so it won't be written. # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" } } output { s3 { access_key_id => "YOUR_AWS_ACCESS_KEY" secret_access_key => "YOUR_AWS_SECRET_KEY" region => "us-east-1" bucket => "elastic-search-logs" prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/" codec => "json_lines" # NDJSON output (1 JSON object per line) encoding => "gzip" # compress objects server_side_encryption => true # Optionally for KMS: # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID" size_file => 104857600 # 100MB rotation time_file => 300 # 5 min rotation } }
構成を使用して Logstash を起動します。
bin/logstash -f elastic-to-s3.conf
省略可: Google SecOps 用の読み取り専用 IAM ユーザーを作成する
- AWS コンソール > IAM > ユーザー > ユーザーを追加 に移動します。
- [ユーザーを追加] をクリックします。
- 次の構成の詳細を入力します。
- ユーザー: 「
secops-reader
」と入力します。 - アクセスの種類: [アクセスキー - プログラムによるアクセス] を選択します。
- ユーザー: 「
- [ユーザーを作成] をクリックします。
- 最小限の読み取りポリシー(カスタム)を関連付ける: [ユーザー] > [secops-reader] > [権限] > [権限を追加] > [ポリシーを直接関連付ける] > [ポリシーを作成]。
JSON エディタで、次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::elastic-search-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::elastic-search-logs" } ] }
名前を
secops-reader-policy
に設定します。[ポリシーの作成> 検索/選択> 次へ> 権限を追加] に移動します。
[セキュリティ認証情報] > [アクセスキー] > [アクセスキーを作成] に移動します。
CSV をダウンロードします(これらの値はフィードに入力されます)。
Elasticsearch のログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [+ 新しいフィードを追加] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Elasticsearch Logs
)。 - [ソースタイプ] として [Amazon S3 V2] を選択します。
- [ログタイプ] として [Elastic Search] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- S3 URI:
s3://elastic-search-logs/logs/
- Source deletion options: 必要に応じて削除オプションを選択します。
- ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
- アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
- シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
- アセットの名前空間: アセットの名前空間。
- Ingestion labels: このフィードのイベントに適用されるラベル。
- S3 URI:
- [次へ] をクリックします。
- [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
UDM マッピング テーブル
ログフィールド | UDM マッピング | ロジック |
---|---|---|
レベル | security_result.severity | このロジックは「Level」フィールドの値を確認し、対応する UDM の重大度レベルにマッピングします。 - 「INFO」、「ALL」、「OFF」、「TRACE」、「DEBUG」は「INFORMATIONAL」にマッピングされます。 - 「WARN」は「LOW」にマッピングされます。 - 「ERROR」は「ERROR」にマッピングされます。 - 「FATAL」は「CRITICAL」にマッピングされます。 |
message.@timestamp | timestamp | タイムスタンプは、未加工ログの「message」フィールド内の「@timestamp」フィールドから、「yyyy-MM-ddTHH:mm:ss.SSS」形式で解析されます。 |
message.action | security_result.action_details | 値は、未加工ログの「message」フィールド内の「action」フィールドから取得されます。 |
message.event.action | security_result.summary | 値は、未加工ログの「message」フィールド内の「event.action」フィールドから取得されます。 |
message.event.type | metadata.product_event_type | 値は、未加工ログの「message」フィールド内の「event.type」フィールドから取得されます。 |
message.host.ip | target.ip | 値は、未加工ログの「message」フィールド内の「host.ip」フィールドから取得されます。 |
message.host.name | target.hostname | 値は、未加工ログの「message」フィールド内の「host.name」フィールドから取得されます。 |
message.indices | target.labels.value | 値は、未加工ログの「message」フィールド内の「indices」フィールドから取得されます。 |
message.mrId | target.hostname | 値は、未加工ログの「message」フィールド内の「mrId」フィールドから取得されます。 |
message.node.id | principal.asset.product_object_id | 値は、未加工ログの「message」フィールド内の「node.id」フィールドから取得されます。 |
message.node.name | target.asset.hostname | 値は、未加工ログの「message」フィールド内の「node.name」フィールドから取得されます。 |
message.origin.address | principal.ip | IP アドレスは、未加工ログの「message」フィールド内の「origin.address」フィールドから、ポート番号を削除して抽出されます。 |
message.origin.type | principal.resource.resource_subtype | 値は、未加工ログの「message」フィールド内の「origin.type」フィールドから取得されます。 |
message.properties.host_group | principal.hostname | 値は、未加工ログの「message」フィールド内の「properties.host_group」フィールドから取得されます。 |
message.properties.host_group | target.group.group_display_name | 値は、未加工ログの「message」フィールド内の「properties.host_group」フィールドから取得されます。 |
message.request.id | target.resource.product_object_id | 値は、未加工ログの「message」フィールド内の「request.id」フィールドから取得されます。 |
message.request.name | target.resource.name | 値は、未加工ログの「message」フィールド内の「request.name」フィールドから取得されます。 |
message.user.name | principal.user.userid | 値は、未加工ログの「message」フィールド内の「user.name」フィールドから取得されます。 |
message.user.realm | principal.user.attribute.permissions.name | 値は、未加工ログの「message」フィールド内の「user.realm」フィールドから取得されます。 |
message.user.roles | about.user.attribute.roles.name | 値は、未加工ログの「message」フィールド内の「user.roles」フィールドから取得されます。 |
metadata.event_type | ハードコードされた値: 「USER_RESOURCE_ACCESS」 | |
metadata.log_type | ハードコードされた値: 「ELASTIC_SEARCH」 | |
metadata.product_name | ハードコードされた値: 「ELASTICSEARCH」 | |
metadata.vendor_name | ハードコードされた値: 「ELASTIC」 | |
principal.port | ポート番号は、未加工ログの「message」フィールド内の「origin.address」フィールドから抽出されます。 | |
target.labels.key | ハードコードされた値: 「Indice」 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。