收集 Elasticsearch 記錄
支援的國家/地區:
Google SecOps
SIEM
本文說明如何使用 Amazon S3,將 Elasticsearch 記錄擷取至 Google Security Operations。剖析器會將原始 JSON 格式的記錄轉換為統一資料模型 (UDM)。這項功能會從巢狀 JSON 結構中擷取欄位,將這些欄位對應至 UDM 欄位,並使用嚴重性等級和使用者角色等安全性相關內容,擴充資料。
事前準備
- Google SecOps 執行個體
- Elasticsearch 叢集管理員的特殊存取權
- AWS (S3、IAM、EC2) 的特殊存取權
- 執行 Logstash 的 EC2 執行個體或永久主機
取得 Elasticsearch 必要條件
- 以管理員身分登入 Elasticsearch 叢集。
- 確認 Elasticsearch 訂閱方案包含安全性功能 (稽核記錄功能需要這項功能)。
- 記下 Elasticsearch 叢集名稱和版本,以供參考。
- 找出稽核記錄的寫入路徑 (預設為
$ES_HOME/logs/<clustername>_audit.json
)。
啟用 Elasticsearch 稽核記錄
- 在每個 Elasticsearch 節點上,編輯 elasticsearch.yml 設定檔。
新增下列設定:
xpack.security.audit.enabled: true
對叢集執行輪動式重新啟動,以套用變更:
- 停用分片分配:
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
- 逐一停止並重新啟動每個節點。
- 重新啟用分片分配:
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
- 停用分片分配:
確認稽核記錄是否在記錄目錄的
<clustername>_audit.json
中產生。
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南建立 Amazon S3 bucket:建立 bucket
- 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如
elastic-search-logs
)。 - 請按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
- 按一下 [完成]。
- 選取「權限」分頁標籤。
- 在「權限政策」部分中,按一下「新增權限」。
- 選取「新增權限」。
- 選取「直接附加政策」。
- 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
- 選取政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 Logstash,將稽核記錄傳送至 S3
- 在可存取 Elasticsearch 稽核記錄檔的 EC2 執行個體或持續性主機上,安裝 Logstash。
如果沒有 S3 輸出外掛程式,請安裝:
bin/logstash-plugin install logstash-output-s3
建立 Logstash 設定檔 (
elastic-to-s3.conf
):input { file { path => "/path/to/elasticsearch/logs/*_audit.json" start_position => "beginning" codec => "json" # audit file: 1 JSON object per line sincedb_path => "/var/lib/logstash/sincedb_elastic_search" exclude => ["*.gz"] } } filter { # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects. # If you must add metadata for ops, put it under [@metadata] so it won't be written. # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" } } output { s3 { access_key_id => "YOUR_AWS_ACCESS_KEY" secret_access_key => "YOUR_AWS_SECRET_KEY" region => "us-east-1" bucket => "elastic-search-logs" prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/" codec => "json_lines" # NDJSON output (1 JSON object per line) encoding => "gzip" # compress objects server_side_encryption => true # Optionally for KMS: # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID" size_file => 104857600 # 100MB rotation time_file => 300 # 5 min rotation } }
使用設定檔啟動 Logstash:
bin/logstash -f elastic-to-s3.conf
選用:為 Google SecOps 建立唯讀 IAM 使用者
- 依序前往 AWS 管理中心 > IAM >「Users」(使用者) >「Add users」(新增使用者)。
- 點選 [Add users] (新增使用者)。
- 提供下列設定詳細資料:
- 使用者:輸入
secops-reader
。 - 存取類型:選取「存取金鑰 - 程式輔助存取」。
- 使用者:輸入
- 按一下「建立使用者」。
- 附加最低讀取權限政策 (自訂):依序選取「Users」(使用者) >「secops-reader」>「Permissions」(權限) >「Add permissions」(新增權限) >「Attach policies directly」(直接附加政策) >「Create policy」(建立政策)。
在 JSON 編輯器中輸入下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::elastic-search-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::elastic-search-logs" } ] }
將名稱設為
secops-reader-policy
。依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」。
依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」。
下載 CSV (這些值會輸入至動態饋給)。
在 Google SecOps 中設定資訊提供,擷取 Elasticsearch 記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Elasticsearch Logs
)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「Elastic Search」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:
s3://elastic-search-logs/logs/
- 來源刪除選項:根據偏好設定選取刪除選項。
- 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:資產命名空間。
- 擷取標籤:套用至這個動態饋給事件的標籤。
- S3 URI:
- 點選「下一步」。
- 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」。
UDM 對應表
記錄欄位 | UDM 對應 | 邏輯 |
---|---|---|
等級 | security_result.severity | 這項邏輯會檢查「層級」欄位的值,並對應至相應的 UDM 嚴重程度層級: -「INFO」、「ALL」、「OFF」、「TRACE」、「DEBUG」會對應至「INFORMATIONAL」。 -「WARN」已對應至「LOW」。 -「ERROR」對應至「ERROR」。 -「FATAL」會對應至「CRITICAL」。 |
message.@timestamp | 時間戳記 | 系統會使用「yyyy-MM-ddTHH:mm:ss.SSS」格式,從原始記錄的「message」欄位中剖析「@timestamp」欄位。 |
message.action | security_result.action_details | 值取自原始記錄「message」欄位中的「action」欄位。 |
message.event.action | security_result.summary | 值取自原始記錄的「message」欄位中的「event.action」欄位。 |
message.event.type | metadata.product_event_type | 值取自原始記錄的「message」欄位中的「event.type」欄位。 |
message.host.ip | target.ip | 值取自原始記錄「message」欄位中的「host.ip」欄位。 |
message.host.name | target.hostname | 值取自原始記錄「message」欄位中的「host.name」欄位。 |
message.indices | target.labels.value | 值取自原始記錄「message」欄位中的「indices」欄位。 |
message.mrId | target.hostname | 值取自原始記錄「message」欄位中的「mrId」欄位。 |
message.node.id | principal.asset.product_object_id | 值取自原始記錄「message」欄位中的「node.id」欄位。 |
message.node.name | target.asset.hostname | 值取自原始記錄的「message」欄位中的「node.name」欄位。 |
message.origin.address | principal.ip | 系統會從原始記錄的「message」欄位中,移除通訊埠編號,並從「origin.address」欄位擷取 IP 位址。 |
message.origin.type | principal.resource.resource_subtype | 值取自原始記錄「message」欄位中的「origin.type」欄位。 |
message.properties.host_group | principal.hostname | 值取自原始記錄「message」欄位中的「properties.host_group」欄位。 |
message.properties.host_group | target.group.group_display_name | 值取自原始記錄「message」欄位中的「properties.host_group」欄位。 |
message.request.id | target.resource.product_object_id | 值取自原始記錄「message」欄位中的「request.id」欄位。 |
message.request.name | target.resource.name | 值取自原始記錄「message」欄位中的「request.name」欄位。 |
message.user.name | principal.user.userid | 值取自原始記錄「message」欄位中的「user.name」欄位。 |
message.user.realm | principal.user.attribute.permissions.name | 值取自原始記錄「message」欄位中的「user.realm」欄位。 |
message.user.roles | about.user.attribute.roles.name | 值取自原始記錄「message」欄位中的「user.roles」欄位。 |
metadata.event_type | 硬式編碼值:「USER_RESOURCE_ACCESS」 | |
metadata.log_type | 硬式編碼值:「ELASTIC_SEARCH」 | |
metadata.product_name | 硬式編碼值:「ELASTICSEARCH」 | |
metadata.vendor_name | 硬式編碼值:「ELASTIC」 | |
principal.port | 通訊埠號碼是從原始記錄的「message」欄位中「origin.address」欄位擷取。 | |
target.labels.key | 硬式編碼值:「Indice」 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。