收集 AWS Elastic MapReduce 日志

支持的平台:

本文档介绍了如何将 AWS Elastic MapReduce (EMR) 日志提取到 Google Security Operations。AWS EMR 是一个云原生大数据平台,可快速处理大量数据。将 EMR 日志集成到 Google SecOps 后,您可以分析集群活动并检测潜在的安全威胁。

准备工作

  • 确保您拥有 Google SecOps 实例。
  • 确保您拥有对 AWS 的特权访问权限。

配置 Amazon S3 存储分区

  1. 按照以下用户指南创建 Amazon S3 存储分区创建存储分区
  2. 保存存储分区的名称区域,以备日后使用。
  3. 按照此用户指南中的说明创建用户:创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 点击访问密钥部分中的创建访问密钥
  7. 选择第三方服务作为用例
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击下载 CSV 文件,保存访问密钥密钥以供日后使用。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索并选择 AmazonS3FullAccessCloudWatchLogsFullAccess 政策。
  18. 点击下一步
  19. 点击添加权限

配置 AWS EMR 以转发日志

  1. 登录 AWS Management Console
  2. 在搜索栏中输入 EMR,然后从服务列表中选择 Amazon EMR
  3. 点击集群
  4. 找到并选择要为其启用日志记录的 EMR 集群
  5. 点击集群详情页面上的修改
  6. 修改集群界面中,前往日志记录部分。
  7. 选择启用日志记录。
  8. 指定将存储日志的 S3 存储分区
  9. 使用 s3://your-bucket-name/ 格式指定 S3 URI(这将在存储分区的根目录中存储所有 EMR 日志)。
  10. 选择以下日志类型:
    • Step logs
    • Application logs
    • YARN logs
    • System logs
    • HDFS Logs(如果您使用的是 Hadoop)
  11. 点击保存

在 Google SecOps 中配置 Feed 以提取 AWS EMR 日志

  1. 依次前往 SIEM 设置 > Feed
  2. 点击新增
  3. Feed 名称字段中,输入 Feed 的名称(例如 AWS EMR 日志)。
  4. 选择 Amazon S3 作为来源类型
  5. 选择 AWS EMR 作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:

    • 区域:Amazon S3 存储分区所在的区域。
    • S3 URI:存储分区 URI。
      • s3://your-log-bucket-name/
        • your-log-bucket-name 替换为存储分区的实际名称。
    • URI 是:选择目录包含子目录的目录
    • 来源删除选项:根据您的偏好选择删除选项。

    • 访问密钥 ID:有权访问 S3 存储分区的用户访问密钥。

    • 私有访问密钥:有权访问 S3 存储分区的用户私有密钥。

    • 资源命名空间资源命名空间

    • 提取标签:要应用于此 Feed 中的事件的标签。

  8. 点击下一步

  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
app_id additional.fields[].key 通过解析器分配值“APP”
app_id additional.fields[].value.string_value 直接从原始日志中的 APP 字段映射。
app_name additional.fields[].key 通过解析器分配值“APPNAME”
app_name additional.fields[].value.string_value 直接从原始日志中的 APPNAME 字段映射。
blockid additional.fields[].key 值“blockid”是通过解析器分配的
blockid additional.fields[].value.string_value 直接从原始日志中的 blockid 字段映射。
bytes network.received_bytes 直接从原始日志中的 bytes 字段映射,并转换为无符号整数。
cliID additional.fields[].key 通过解析器分配值“cliID”
cliID additional.fields[].value.string_value 直接从原始日志中的 cliID 字段映射。
cmd target.process.command_line 直接从原始日志中的 cmd 字段映射。
comp_name additional.fields[].key 通过解析器分配值“COMP”
comp_name additional.fields[].value.string_value 直接从原始日志中的 COMP 字段映射。
configuration_version additional.fields[].key 值“configuration_version”是通过解析器分配的
configuration_version additional.fields[].value.string_value 直接从原始日志中的 configuration_version 字段映射,并转换为字符串。
containerID additional.fields[].key 值“containerID”是通过解析器分配的
containerID additional.fields[].value.string_value 直接从原始日志中的 CONTAINERID 字段映射。
description security_result.description 直接从原始日志中的 description 字段映射。
dfs.FSNamesystem.* additional.fields[].key 键是通过将“dfs.FSNamesystem.”与 JSON 数据中的键串联而生成的。
dfs.FSNamesystem.* additional.fields[].value.string_value 值直接从 dfs.FSNamesystem JSON 对象中的相应值映射,并转换为字符串。
duration additional.fields[].key 值“duration”是通过解析器分配的
duration additional.fields[].value.string_value 直接从原始日志中的 duration 字段映射。
duration network.session_duration.seconds 直接从原始日志中的 duration 字段映射,并转换为整数。
environment additional.fields[].key 通过解析器分配值“environment”
environment additional.fields[].value.string_value 直接从原始日志中的 environment 字段映射而来。使用 grok 和字符串操作从 ip_port 字段中提取。使用 grok 和字符串操作从 ip_port 字段中提取,并转换为整数。
event_type metadata.event_type 由解析器逻辑根据 principaltarget 信息的存在情况确定。可以是 NETWORK_CONNECTIONUSER_RESOURCE_ACCESSSTATUS_UPDATEGENERIC_EVENT
file_path target.file.full_path 直接从原始日志中的 file_path 字段映射。
host principal.hostname 直接从原始日志中的 host 字段映射。
host target.hostname 直接从原始日志中的 host 字段映射。
host_ip principal.ip 直接从原始日志中的 host_ip 字段映射。
host_port principal.port 直接从原始日志中的 host_port 字段映射,并转换为整数。
http_url target.url 直接从原始日志中的 http_url 字段映射。
index additional.fields[].key 值“index”是通过解析器分配的
index additional.fields[].value.string_value 直接从原始日志中的 index 字段映射。
kind metadata.product_event_type 直接从原始日志中的 kind 字段映射而来。值“AWS_EMR”通过解析器分配 值“AWS EMR”通过解析器分配 值“AMAZON”通过解析器分配
offset additional.fields[].key 值“offset”是通过解析器分配的
offset additional.fields[].value.string_value 直接从原始日志中的 offset 字段映射。
op metadata.product_event_type 直接从原始日志中的 opOPERATION 字段映射而来。
proto network.application_protocol 使用 grok 从 http_url 字段中提取,并转换为大写。
puppet_version additional.fields[].key 值“puppet_version”是通过解析器分配的
puppet_version additional.fields[].value.string_value 直接从原始日志中的 puppet_version 字段映射。
queue_name additional.fields[].key 值“queue_name”是通过解析器分配的
queue_name additional.fields[].value.string_value 直接从原始日志中的 queue_name 字段映射。
report_format additional.fields[].key 值“report_format”是通过解析器分配的
report_format additional.fields[].value.string_value 直接从原始日志中的 report_format 字段映射,并转换为字符串。
resource additional.fields[].key 通过解析器分配值“resource”
resource additional.fields[].value.string_value 直接从原始日志中的 resource 字段映射。
result security_result.action_details 直接从原始日志中的 RESULT 字段映射。
security_id additional.fields[].key 值“security_id”是通过解析器分配的
security_id additional.fields[].value.string_value 直接从原始日志中的 security_id 字段映射。
severity security_result.severity 从原始日志中的 severity 字段映射而来。INFO 映射到 INFORMATIONALWARN 映射到 MEDIUM
srvID additional.fields[].key 值“srvID”是通过解析器分配的
srvID additional.fields[].value.string_value 直接从原始日志中的 srvID 字段映射。
status additional.fields[].key 值“status”是通过解析器分配的
status additional.fields[].value.string_value 直接从原始日志中的 status 字段映射。
summary security_result.summary 直接从原始日志中的 summary 字段映射。
target_app target.application 直接从原始日志中的 TARGET 字段映射。
target_ip target.ip 直接从原始日志中的 target_ipIP 字段映射而来。
target_port target.port 直接从原始日志中的 target_port 字段映射,并转换为整数。
timestamp metadata.event_timestamp 直接从原始日志中的 timestamp 字段映射而来,会解析为 ISO8601 时间戳。
timestamp event.timestamp 直接从原始日志中的 timestamp 字段映射而来,会解析为 ISO8601 时间戳。
trade_date additional.fields[].key 通过解析器分配值“trade_date”
trade_date additional.fields[].value.string_value 直接从原始日志中的 trade_date 字段映射。
transaction_uuid additional.fields[].key 值“transaction_uuid”是通过解析器分配的
transaction_uuid additional.fields[].value.string_value 直接从原始日志中的 transaction_uuid 字段映射。
type additional.fields[].key 值“type”是通过解析器分配的
type additional.fields[].value.string_value 直接从原始日志中的 type 字段映射。
user target.user.userid 直接从原始日志中的 USERugi 字段映射而来。

变化

2023-12-19

  • bug 修复:修复了 Grok 模式的不稳定结果。

2023-10-30

  • 新创建的解析器。

需要更多帮助?向社区成员和 Google SecOps 专业人士寻求解答。