收集 Elasticsearch 日志
支持的平台:
Google SecOps
SIEM
本文档介绍了如何使用 Amazon S3 将 Elasticsearch 日志注入到 Google Security Operations。解析器会将原始 JSON 格式的日志转换为统一数据模型 (UDM)。它从嵌套的 JSON 结构中提取字段,将其映射到 UDM 字段,并使用与安全性相关的上下文(例如严重程度和用户角色)来丰富数据。
准备工作
- Google SecOps 实例
- 对 Elasticsearch 集群管理的特权访问权限
- 对 AWS(S3、IAM、EC2)的特权访问权限
- 用于运行 Logstash 的 EC2 实例或持久性主机
获取 Elasticsearch 前提条件
- 以管理员身份登录 Elasticsearch 集群。
- 验证您的 Elasticsearch 订阅是否包含安全功能(审核日志记录需要此功能)。
- 记下您的 Elasticsearch 集群名称和版本以供参考。
- 确定审核日志的写入路径(默认:
$ES_HOME/logs/<clustername>_audit.json
)。
启用 Elasticsearch 审核日志记录
- 在每个 Elasticsearch 节点上,修改 elasticsearch.yml 配置文件。
添加以下设置:
xpack.security.audit.enabled: true
对集群执行滚动重启以应用更改:
- 停用分片分配:
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
- 逐一停止并重启每个节点。
- 重新启用分片分配:
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
- 停用分片分配:
验证是否在日志目录中的
<clustername>_audit.json
生成审核日志。
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶
- 保存存储桶名称和区域以供日后参考(例如
elastic-search-logs
)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击下载 CSV 文件,保存访问密钥和秘密访问密钥,以供日后参考。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策。
- 搜索 AmazonS3FullAccess 政策。
- 选择相应政策。
- 点击下一步。
- 点击添加权限。
配置 Logstash 以将审核日志发送到 S3
- 在可访问 Elasticsearch 审核日志文件的 EC2 实例或持久主机上安装 Logstash。
安装 S3 输出插件(如果尚未安装):
bin/logstash-plugin install logstash-output-s3
创建 Logstash 配置文件 (
elastic-to-s3.conf
):input { file { path => "/path/to/elasticsearch/logs/*_audit.json" start_position => "beginning" codec => "json" # audit file: 1 JSON object per line sincedb_path => "/var/lib/logstash/sincedb_elastic_search" exclude => ["*.gz"] } } filter { # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects. # If you must add metadata for ops, put it under [@metadata] so it won't be written. # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" } } output { s3 { access_key_id => "YOUR_AWS_ACCESS_KEY" secret_access_key => "YOUR_AWS_SECRET_KEY" region => "us-east-1" bucket => "elastic-search-logs" prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/" codec => "json_lines" # NDJSON output (1 JSON object per line) encoding => "gzip" # compress objects server_side_encryption => true # Optionally for KMS: # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID" size_file => 104857600 # 100MB rotation time_file => 300 # 5 min rotation } }
使用配置启动 Logstash:
bin/logstash -f elastic-to-s3.conf
可选:为 Google SecOps 创建只读 IAM 用户
- 依次前往 AWS 控制台 > IAM > 用户 > 添加用户。
- 点击 Add users(添加用户)。
- 提供以下配置详细信息:
- 用户:输入
secops-reader
。 - 访问类型:选择访问密钥 - 以程序化方式访问。
- 用户:输入
- 点击创建用户。
- 附加最低限度的读取政策(自定义):依次选择用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策。
在 JSON 编辑器中,输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::elastic-search-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::elastic-search-logs" } ] }
将名称设置为
secops-reader-policy
。依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限。
依次前往安全凭据 > 访问密钥 > 创建访问密钥。
下载 CSV(这些值会输入到 Feed 中)。
在 Google SecOps 中配置 Feed 以注入 Elasticsearch 日志
- 依次前往 SIEM 设置> Feed。
- 点击 + 添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Elasticsearch Logs
)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Elastic Search 作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://elastic-search-logs/logs/
- 来源删除选项:根据您的偏好选择删除选项。
- 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
- 资产命名空间:资产命名空间。
- 注入标签:应用于此 Feed 中事件的标签。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
日志字段 | UDM 映射 | 逻辑 |
---|---|---|
级别 | security_result.severity | 该逻辑会检查“级别”字段的值,并将其映射到相应的 UDM 严重程度级别: - “INFO”“ALL”“OFF”“TRACE”“DEBUG”会映射到“INFORMATIONAL”。 -“WARN”映射到“LOW”。 -“ERROR”映射到“ERROR”。 -“FATAL”映射到“CRITICAL”。 |
message.@timestamp | 时间戳 | 时间戳是从原始日志的“message”字段中的“@timestamp”字段解析出来的,采用“yyyy-MM-ddTHH:mm:ss.SSS”格式。 |
message.action | security_result.action_details | 值取自原始日志的“message”字段中的“action”字段。 |
message.event.action | security_result.summary | 值取自原始日志的“message”字段中的“event.action”字段。 |
message.event.type | metadata.product_event_type | 值取自原始日志的“message”字段中的“event.type”字段。 |
message.host.ip | target.ip | 值取自原始日志的“message”字段中的“host.ip”字段。 |
message.host.name | target.hostname | 值取自原始日志的“message”字段中的“host.name”字段。 |
message.indices | target.labels.value | 值取自原始日志的“message”字段中的“indices”字段。 |
message.mrId | target.hostname | 值取自原始日志的“message”字段中的“mrId”字段。 |
message.node.id | principal.asset.product_object_id | 值取自原始日志的“message”字段中的“node.id”字段。 |
message.node.name | target.asset.hostname | 值取自原始日志的“message”字段中的“node.name”字段。 |
message.origin.address | principal.ip | 系统会从原始日志的“message”字段中的“origin.address”字段提取 IP 地址,方法是移除端口号。 |
message.origin.type | principal.resource.resource_subtype | 值取自原始日志的“message”字段中的“origin.type”字段。 |
message.properties.host_group | principal.hostname | 值取自原始日志的“message”字段中的“properties.host_group”字段。 |
message.properties.host_group | target.group.group_display_name | 值取自原始日志的“message”字段中的“properties.host_group”字段。 |
message.request.id | target.resource.product_object_id | 值取自原始日志的“message”字段中的“request.id”字段。 |
message.request.name | target.resource.name | 值取自原始日志的“message”字段中的“request.name”字段。 |
message.user.name | principal.user.userid | 值取自原始日志的“message”字段中的“user.name”字段。 |
message.user.realm | principal.user.attribute.permissions.name | 值取自原始日志的“message”字段中的“user.realm”字段。 |
message.user.roles | about.user.attribute.roles.name | 值取自原始日志的“message”字段中的“user.roles”字段。 |
metadata.event_type | 硬编码值:“USER_RESOURCE_ACCESS” | |
metadata.log_type | 硬编码值:“ELASTIC_SEARCH” | |
metadata.product_name | 硬编码值:“ELASTICSEARCH” | |
metadata.vendor_name | 硬编码值:“ELASTIC” | |
principal.port | 端口号是从原始日志的“message”字段中的“origin.address”字段提取的。 | |
target.labels.key | 硬编码值:“Indice” |
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。