收集 AWS Elastic MapReduce 日志
支持的平台:
Google SecOps
SIEM
本文档介绍了如何将 AWS Elastic MapReduce (EMR) 日志提取到 Google Security Operations。AWS EMR 是一个云原生大数据平台,可快速处理大量数据。将 EMR 日志集成到 Google SecOps 后,您可以分析集群活动并检测潜在的安全威胁。
准备工作
- 确保您拥有 Google SecOps 实例。
- 确保您拥有对 AWS 的特权访问权限。
配置 Amazon S3 存储分区
- 按照以下用户指南创建 Amazon S3 存储分区:创建存储分区
- 保存存储分区的名称和区域,以备日后使用。
- 按照此用户指南中的说明创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 点击访问密钥部分中的创建访问密钥。
- 选择第三方服务作为用例。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击下载 CSV 文件,保存访问密钥和密钥以供日后使用。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策。
- 搜索并选择 AmazonS3FullAccess 和 CloudWatchLogsFullAccess 政策。
- 点击下一步。
- 点击添加权限。
配置 AWS EMR 以转发日志
- 登录 AWS Management Console。
- 在搜索栏中输入 EMR,然后从服务列表中选择 Amazon EMR。
- 点击集群。
- 找到并选择要为其启用日志记录的 EMR 集群。
- 点击集群详情页面上的修改。
- 在修改集群界面中,前往日志记录部分。
- 选择启用日志记录。
- 指定将存储日志的 S3 存储分区。
- 使用
s3://your-bucket-name/
格式指定 S3 URI(这将在存储分区的根目录中存储所有 EMR 日志)。 - 选择以下日志类型:
Step logs
Application logs
YARN logs
System logs
HDFS Logs
(如果您使用的是 Hadoop)
- 点击保存。
在 Google SecOps 中配置 Feed 以提取 AWS EMR 日志
- 依次前往 SIEM 设置 > Feed。
- 点击新增。
- 在Feed 名称字段中,输入 Feed 的名称(例如 AWS EMR 日志)。
- 选择 Amazon S3 作为来源类型。
- 选择 AWS EMR 作为日志类型。
- 点击下一步。
为以下输入参数指定值:
- 区域:Amazon S3 存储分区所在的区域。
- S3 URI:存储分区 URI。
s3://your-log-bucket-name/
- 将
your-log-bucket-name
替换为存储分区的实际名称。
- 将
- URI 是:选择目录或包含子目录的目录。
来源删除选项:根据您的偏好选择删除选项。
访问密钥 ID:有权访问 S3 存储分区的用户访问密钥。
私有访问密钥:有权访问 S3 存储分区的用户私有密钥。
资源命名空间:资源命名空间。
提取标签:要应用于此 Feed 中的事件的标签。
点击下一步。
在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
日志字段 | UDM 映射 | 逻辑 |
---|---|---|
app_id |
additional.fields[].key |
通过解析器分配值“APP” |
app_id |
additional.fields[].value.string_value |
直接从原始日志中的 APP 字段映射。 |
app_name |
additional.fields[].key |
通过解析器分配值“APPNAME” |
app_name |
additional.fields[].value.string_value |
直接从原始日志中的 APPNAME 字段映射。 |
blockid |
additional.fields[].key |
值“blockid”是通过解析器分配的 |
blockid |
additional.fields[].value.string_value |
直接从原始日志中的 blockid 字段映射。 |
bytes |
network.received_bytes |
直接从原始日志中的 bytes 字段映射,并转换为无符号整数。 |
cliID |
additional.fields[].key |
通过解析器分配值“cliID” |
cliID |
additional.fields[].value.string_value |
直接从原始日志中的 cliID 字段映射。 |
cmd |
target.process.command_line |
直接从原始日志中的 cmd 字段映射。 |
comp_name |
additional.fields[].key |
通过解析器分配值“COMP” |
comp_name |
additional.fields[].value.string_value |
直接从原始日志中的 COMP 字段映射。 |
configuration_version |
additional.fields[].key |
值“configuration_version”是通过解析器分配的 |
configuration_version |
additional.fields[].value.string_value |
直接从原始日志中的 configuration_version 字段映射,并转换为字符串。 |
containerID |
additional.fields[].key |
值“containerID”是通过解析器分配的 |
containerID |
additional.fields[].value.string_value |
直接从原始日志中的 CONTAINERID 字段映射。 |
description |
security_result.description |
直接从原始日志中的 description 字段映射。 |
dfs.FSNamesystem.* |
additional.fields[].key |
键是通过将“dfs.FSNamesystem.”与 JSON 数据中的键串联而生成的。 |
dfs.FSNamesystem.* |
additional.fields[].value.string_value |
值直接从 dfs.FSNamesystem JSON 对象中的相应值映射,并转换为字符串。 |
duration |
additional.fields[].key |
值“duration”是通过解析器分配的 |
duration |
additional.fields[].value.string_value |
直接从原始日志中的 duration 字段映射。 |
duration |
network.session_duration.seconds |
直接从原始日志中的 duration 字段映射,并转换为整数。 |
environment |
additional.fields[].key |
通过解析器分配值“environment” |
environment |
additional.fields[].value.string_value |
直接从原始日志中的 environment 字段映射而来。使用 grok 和字符串操作从 ip_port 字段中提取。使用 grok 和字符串操作从 ip_port 字段中提取,并转换为整数。 |
event_type |
metadata.event_type |
由解析器逻辑根据 principal 和 target 信息的存在情况确定。可以是 NETWORK_CONNECTION 、USER_RESOURCE_ACCESS 、STATUS_UPDATE 或 GENERIC_EVENT 。 |
file_path |
target.file.full_path |
直接从原始日志中的 file_path 字段映射。 |
host |
principal.hostname |
直接从原始日志中的 host 字段映射。 |
host |
target.hostname |
直接从原始日志中的 host 字段映射。 |
host_ip |
principal.ip |
直接从原始日志中的 host_ip 字段映射。 |
host_port |
principal.port |
直接从原始日志中的 host_port 字段映射,并转换为整数。 |
http_url |
target.url |
直接从原始日志中的 http_url 字段映射。 |
index |
additional.fields[].key |
值“index”是通过解析器分配的 |
index |
additional.fields[].value.string_value |
直接从原始日志中的 index 字段映射。 |
kind |
metadata.product_event_type |
直接从原始日志中的 kind 字段映射而来。值“AWS_EMR”通过解析器分配 值“AWS EMR”通过解析器分配 值“AMAZON”通过解析器分配 |
offset |
additional.fields[].key |
值“offset”是通过解析器分配的 |
offset |
additional.fields[].value.string_value |
直接从原始日志中的 offset 字段映射。 |
op |
metadata.product_event_type |
直接从原始日志中的 op 或 OPERATION 字段映射而来。 |
proto |
network.application_protocol |
使用 grok 从 http_url 字段中提取,并转换为大写。 |
puppet_version |
additional.fields[].key |
值“puppet_version”是通过解析器分配的 |
puppet_version |
additional.fields[].value.string_value |
直接从原始日志中的 puppet_version 字段映射。 |
queue_name |
additional.fields[].key |
值“queue_name”是通过解析器分配的 |
queue_name |
additional.fields[].value.string_value |
直接从原始日志中的 queue_name 字段映射。 |
report_format |
additional.fields[].key |
值“report_format”是通过解析器分配的 |
report_format |
additional.fields[].value.string_value |
直接从原始日志中的 report_format 字段映射,并转换为字符串。 |
resource |
additional.fields[].key |
通过解析器分配值“resource” |
resource |
additional.fields[].value.string_value |
直接从原始日志中的 resource 字段映射。 |
result |
security_result.action_details |
直接从原始日志中的 RESULT 字段映射。 |
security_id |
additional.fields[].key |
值“security_id”是通过解析器分配的 |
security_id |
additional.fields[].value.string_value |
直接从原始日志中的 security_id 字段映射。 |
severity |
security_result.severity |
从原始日志中的 severity 字段映射而来。INFO 映射到 INFORMATIONAL ,WARN 映射到 MEDIUM 。 |
srvID |
additional.fields[].key |
值“srvID”是通过解析器分配的 |
srvID |
additional.fields[].value.string_value |
直接从原始日志中的 srvID 字段映射。 |
status |
additional.fields[].key |
值“status”是通过解析器分配的 |
status |
additional.fields[].value.string_value |
直接从原始日志中的 status 字段映射。 |
summary |
security_result.summary |
直接从原始日志中的 summary 字段映射。 |
target_app |
target.application |
直接从原始日志中的 TARGET 字段映射。 |
target_ip |
target.ip |
直接从原始日志中的 target_ip 或 IP 字段映射而来。 |
target_port |
target.port |
直接从原始日志中的 target_port 字段映射,并转换为整数。 |
timestamp |
metadata.event_timestamp |
直接从原始日志中的 timestamp 字段映射而来,会解析为 ISO8601 时间戳。 |
timestamp |
event.timestamp |
直接从原始日志中的 timestamp 字段映射而来,会解析为 ISO8601 时间戳。 |
trade_date |
additional.fields[].key |
通过解析器分配值“trade_date” |
trade_date |
additional.fields[].value.string_value |
直接从原始日志中的 trade_date 字段映射。 |
transaction_uuid |
additional.fields[].key |
值“transaction_uuid”是通过解析器分配的 |
transaction_uuid |
additional.fields[].value.string_value |
直接从原始日志中的 transaction_uuid 字段映射。 |
type |
additional.fields[].key |
值“type”是通过解析器分配的 |
type |
additional.fields[].value.string_value |
直接从原始日志中的 type 字段映射。 |
user |
target.user.userid |
直接从原始日志中的 USER 或 ugi 字段映射而来。 |
变化
2023-12-19
- bug 修复:修复了 Grok 模式的不稳定结果。
2023-10-30
- 新创建的解析器。
需要更多帮助?向社区成员和 Google SecOps 专业人士寻求解答。