收集 Elasticsearch 日志

支持的平台:

本文档介绍了如何使用 Amazon S3 将 Elasticsearch 日志注入到 Google Security Operations。解析器会将原始 JSON 格式的日志转换为统一数据模型 (UDM)。它从嵌套的 JSON 结构中提取字段,将其映射到 UDM 字段,并使用与安全性相关的上下文(例如严重程度和用户角色)来丰富数据。

准备工作

  • Google SecOps 实例
  • Elasticsearch 集群管理的特权访问权限
  • AWS(S3、IAM、EC2)的特权访问权限
  • 用于运行 Logstash 的 EC2 实例或持久性主机

获取 Elasticsearch 前提条件

  1. 以管理员身份登录 Elasticsearch 集群
  2. 验证您的 Elasticsearch 订阅是否包含安全功能(审核日志记录需要此功能)。
  3. 记下您的 Elasticsearch 集群名称和版本以供参考。
  4. 确定审核日志的写入路径(默认:$ES_HOME/logs/<clustername>_audit.json)。

启用 Elasticsearch 审核日志记录

  1. 在每个 Elasticsearch 节点上,修改 elasticsearch.yml 配置文件。
  2. 添加以下设置:

    xpack.security.audit.enabled: true
    
  3. 对集群执行滚动重启以应用更改:

    • 停用分片分配:PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
    • 逐一停止并重启每个节点。
    • 重新启用分片分配:PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
  4. 验证是否在日志目录中的 <clustername>_audit.json 生成审核日志。

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 elastic-search-logs)。
  3. 按照以下用户指南创建用户创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击下载 CSV 文件,保存访问密钥秘密访问密钥,以供日后参考。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索 AmazonS3FullAccess 政策。
  18. 选择相应政策。
  19. 点击下一步
  20. 点击添加权限

配置 Logstash 以将审核日志发送到 S3

  1. 在可访问 Elasticsearch 审核日志文件的 EC2 实例或持久主机上安装 Logstash
  2. 安装 S3 输出插件(如果尚未安装):

    bin/logstash-plugin install logstash-output-s3
    
  3. 创建 Logstash 配置文件 (elastic-to-s3.conf):

    input {
      file {
        path => "/path/to/elasticsearch/logs/*_audit.json"
        start_position => "beginning"
        codec => "json"              # audit file: 1 JSON object per line
        sincedb_path => "/var/lib/logstash/sincedb_elastic_search"
        exclude => ["*.gz"]
      }
    }
    
    filter {
      # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects.
      # If you must add metadata for ops, put it under [@metadata] so it won't be written.
      # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" }
    }
    
    output {
      s3 {
        access_key_id => "YOUR_AWS_ACCESS_KEY"
        secret_access_key => "YOUR_AWS_SECRET_KEY"
        region => "us-east-1"
        bucket => "elastic-search-logs"
        prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/"
    
        codec => "json_lines"        # NDJSON output (1 JSON object per line)
        encoding => "gzip"           # compress objects
    
        server_side_encryption => true
        # Optionally for KMS:
        # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID"
    
        size_file => 104857600       # 100MB rotation
        time_file => 300             # 5 min rotation
      }
    }
    
  4. 使用配置启动 Logstash:

    bin/logstash -f elastic-to-s3.conf
    

可选:为 Google SecOps 创建只读 IAM 用户

  1. 依次前往 AWS 控制台 > IAM > 用户 > 添加用户
  2. 点击 Add users(添加用户)。
  3. 提供以下配置详细信息:
    • 用户:输入 secops-reader
    • 访问类型:选择访问密钥 - 以程序化方式访问
  4. 点击创建用户
  5. 附加最低限度的读取政策(自定义):依次选择用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策
  6. 在 JSON 编辑器中,输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::elastic-search-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::elastic-search-logs"
        }
      ]
    }
    
  7. 将名称设置为 secops-reader-policy

  8. 依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限

  9. 依次前往安全凭据 > 访问密钥 > 创建访问密钥

  10. 下载 CSV(这些值会输入到 Feed 中)。

在 Google SecOps 中配置 Feed 以注入 Elasticsearch 日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击 + 添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 Elasticsearch Logs)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Elastic Search 作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://elastic-search-logs/logs/
    • 来源删除选项:根据您的偏好选择删除选项。
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
    • 资产命名空间资产命名空间
    • 注入标签:应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
级别 security_result.severity 该逻辑会检查“级别”字段的值,并将其映射到相应的 UDM 严重程度级别:
- “INFO”“ALL”“OFF”“TRACE”“DEBUG”会映射到“INFORMATIONAL”。
-“WARN”映射到“LOW”。
-“ERROR”映射到“ERROR”。
-“FATAL”映射到“CRITICAL”。
message.@timestamp 时间戳 时间戳是从原始日志的“message”字段中的“@timestamp”字段解析出来的,采用“yyyy-MM-ddTHH:mm:ss.SSS”格式。
message.action security_result.action_details 值取自原始日志的“message”字段中的“action”字段。
message.event.action security_result.summary 值取自原始日志的“message”字段中的“event.action”字段。
message.event.type metadata.product_event_type 值取自原始日志的“message”字段中的“event.type”字段。
message.host.ip target.ip 值取自原始日志的“message”字段中的“host.ip”字段。
message.host.name target.hostname 值取自原始日志的“message”字段中的“host.name”字段。
message.indices target.labels.value 值取自原始日志的“message”字段中的“indices”字段。
message.mrId target.hostname 值取自原始日志的“message”字段中的“mrId”字段。
message.node.id principal.asset.product_object_id 值取自原始日志的“message”字段中的“node.id”字段。
message.node.name target.asset.hostname 值取自原始日志的“message”字段中的“node.name”字段。
message.origin.address principal.ip 系统会从原始日志的“message”字段中的“origin.address”字段提取 IP 地址,方法是移除端口号。
message.origin.type principal.resource.resource_subtype 值取自原始日志的“message”字段中的“origin.type”字段。
message.properties.host_group principal.hostname 值取自原始日志的“message”字段中的“properties.host_group”字段。
message.properties.host_group target.group.group_display_name 值取自原始日志的“message”字段中的“properties.host_group”字段。
message.request.id target.resource.product_object_id 值取自原始日志的“message”字段中的“request.id”字段。
message.request.name target.resource.name 值取自原始日志的“message”字段中的“request.name”字段。
message.user.name principal.user.userid 值取自原始日志的“message”字段中的“user.name”字段。
message.user.realm principal.user.attribute.permissions.name 值取自原始日志的“message”字段中的“user.realm”字段。
message.user.roles about.user.attribute.roles.name 值取自原始日志的“message”字段中的“user.roles”字段。
metadata.event_type 硬编码值:“USER_RESOURCE_ACCESS”
metadata.log_type 硬编码值:“ELASTIC_SEARCH”
metadata.product_name 硬编码值:“ELASTICSEARCH”
metadata.vendor_name 硬编码值:“ELASTIC”
principal.port 端口号是从原始日志的“message”字段中的“origin.address”字段提取的。
target.labels.key 硬编码值:“Indice”

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。