收集 Akamai Cloud Monitor 日志
本文档介绍了如何使用 AWS S3 将 Akamai Cloud Monitor(负载平衡器、流量整形器、ADC)日志注入到 Google Security Operations。Akamai 将 JSON 事件推送到您的 HTTPS 端点;API Gateway + Lambda 接收器将事件写入 S3 (JSONL, gz)。解析器会将 JSON 日志转换为 UDM。它从 JSON 载荷中提取字段,执行数据类型转换,重命名字段以匹配 UDM 架构,并处理自定义字段和网址构建的特定逻辑。它还包含基于字段存在情况的错误处理和条件逻辑。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例
- 对 Akamai Control Center 和 Property Manager 的特权访问权限
- 对 AWS(S3、IAM、Lambda、API Gateway)的特权访问权限
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶
- 保存存储桶名称和区域以供日后参考(例如
akamai-cloud-monitor
)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击 Download CSV file(下载 CSV 文件),保存访问密钥和不公开的访问密钥以供日后使用。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策
- 搜索并选择 AmazonS3FullAccess 政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传(Lambda)配置 IAM 政策和角色
- 在 AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON,然后粘贴以下政策。
JSON 政策(将
akamai-cloud-monitor
替换为您的 S3 存储桶名称):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutAkamaiObjects", "Effect": "Allow", "Action": ["s3:PutObject"], "Resource": "arn:aws:s3:::akamai-cloud-monitor/*" } ] }
依次点击下一步 > 创建政策。
依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda。
附加 JSON 政策。
将角色命名为
WriteAkamaiCMToS3Role
,然后点击创建角色。
创建 Lambda 函数
设置 | 值 |
---|---|
名称 | akamai_cloud_monitor_to_s3 |
运行时 | Python 3.13 |
架构 | x86_64 |
执行角色 | WriteAkamaiCMToS3Role |
创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (
akamai_cloud_monitor_to_s3.py
):#!/usr/bin/env python3 # Lambda: Receive Akamai Cloud Monitor POST, write JSONL (gz) to S3 import os, json, gzip, io, uuid, base64, datetime as dt import boto3 S3_BUCKET = os.environ["S3_BUCKET_NAME"] S3_PREFIX = os.environ.get("S3_PREFIX", "akamai/cloud-monitor/json/").strip("/") + "/" INGEST_TOKEN = os.environ.get("INGEST_TOKEN") # optional shared secret in URL query (?token=...) s3 = boto3.client("s3") def _write_jsonl_gz(objs: list) -> str: key = f"{dt.datetime.utcnow():%Y/%m/%d}/akamai-cloud-monitor-{uuid.uuid4()}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="w") as gz: for o in objs: gz.write((json.dumps(o, separators=(",", ":")) + "n").encode()) buf.seek(0) s3.upload_fileobj( buf, S3_BUCKET, f"{S3_PREFIX}{key}", ExtraArgs={ "ContentType": "application/json", "ContentEncoding": "gzip", }, ) return f"s3://{S3_BUCKET}/{S3_PREFIX}{key}" def _parse_records_from_event(event) -> list: # HTTP API (Lambda proxy) event: body is a JSON string body = event.get("body") or "" if event.get("isBase64Encoded"): body = base64.b64decode(body).decode("utf-8", "replace") try: data = json.loads(body) except Exception: # accept line-delimited JSON as pass-through try: return [json.loads(line) for line in body.splitlines() if line.strip()] except Exception: return [] if isinstance(data, list): return data if isinstance(data, dict): return [data] return [] def lambda_handler(event, context=None): # Optional shared-secret verification via query parameter (?token=...) if INGEST_TOKEN: qs = event.get("queryStringParameters") or {} token = qs.get("token") if token != INGEST_TOKEN: return {"statusCode": 403, "body": "forbidden"} records = _parse_records_from_event(event) if not records: return {"statusCode": 204, "body": "no content"} key = _write_jsonl_gz(records) return { "statusCode": 200, "headers": {"Content-Type": "application/json"}, "body": json.dumps({"ok": True, "s3_key": key, "count": len(records)}), }
依次前往配置 > 环境变量 > 修改。
点击添加新的环境变量,然后设置以下值:
环境变量
键 示例 S3_BUCKET_NAME
akamai-cloud-monitor
S3_PREFIX
akamai/cloud-monitor/json/
INGEST_TOKEN
random-shared-secret
依次前往配置 > 常规配置。
点击修改,并将超时设置为 5 分钟(300 秒)。
点击保存。
创建 Amazon API 网关(适用于 Akamai 的 HTTPS 端点)
- 在 AWS 控制台中,依次前往 API Gateway > 创建 API。
- 依次选择 HTTP API > 构建。
- 提供以下配置详细信息:
- 集成:选择 Lambda,然后选择
akamai_cloud_monitor_to_s3
。 - 路由:添加 ANY
/{proxy+}
或创建特定路由(例如 POST/akamai/cloud-monitor
)。 - 阶段:创建或使用 $default。
- 集成:选择 Lambda,然后选择
- 部署 API 并复制调用网址(例如
https://abc123.execute-api.<region>.amazonaws.com
)。
配置 Akamai Cloud Monitor 以推送日志
- 在 Akamai Control Center 中,在 Property Manager 中打开您的媒体资源。
- 点击添加规则 > 选择“云管理”。
- 添加 Cloud Monitor Instrumentation,然后选择所需的数据集。
- 添加 Cloud Monitor 数据传送。
- 交付主机名:输入您的 API 网关调用网址(例如
abc123.execute-api.<region>.amazonaws.com
)。 - 投放网址路径:您的路线加上可选的查询令牌,例如:
/akamai/cloud-monitor?token=<INGEST_TOKEN>
。
- 交付主机名:输入您的 API 网关调用网址(例如
- 保存并启用相应媒体资源版本。
在 Google SecOps 中配置 Feed 以注入 Akamai Cloud Monitor (S3 JSON)
- 依次前往 SIEM 设置> Feed。
- 点击添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Akamai Cloud Monitor — S3
)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Akamai Cloud Monitor 作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://akamai-cloud-monitor/akamai/cloud-monitor/json/
- 源删除选项:是否在转移后删除文件和/或目录。
- 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
- 访问密钥 ID:一个包含 20 个字符的字母数字账号访问密钥(例如,AKIAIOSFODNN7EXAMPLE)。
- Secret 访问密钥:一个包含 40 个字符的字母数字账号 Secret 访问密钥(例如,wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY)。
- 资源命名空间:
akamai.cloud_monitor
- 注入标签:标签会添加到相应 Feed 中的所有事件(例如
source=akamai_cloud_monitor
、format=json
)。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
日志字段 | UDM 映射 | 逻辑 |
---|---|---|
accLang |
network.http.user_agent |
如果不是“-”或空字符串,则直接映射。 |
city |
principal.location.city |
如果不是“-”或空字符串,则直接映射。 |
cliIP |
principal.ip |
如果不是空字符串,则直接映射。 |
country |
principal.location.country_or_region |
如果不是“-”或空字符串,则直接映射。 |
cp |
additional.fields |
映射为键值对,键为“cp”。 |
customField |
about.ip 、about.labels 、src.ip |
解析为键值对。针对“eIp”和“pIp”的特殊处理,分别映射到 src.ip 和 about.ip 。其他键则映射为 about 中的标签。 |
errorCode |
security_result.summary ,security_result.severity |
如果存在,则将 security_result.severity 设置为“ERROR”,并将该值映射到 security_result.summary 。 |
geo.city |
principal.location.city |
如果 city 为“-”或空字符串,则直接映射。 |
geo.country |
principal.location.country_or_region |
如果 country 为“-”或空字符串,则直接映射。 |
geo.lat |
principal.location.region_latitude |
直接映射,转换为浮点数。 |
geo.long |
principal.location.region_longitude |
直接映射,转换为浮点数。 |
geo.region |
principal.location.state |
直接映射。 |
id |
metadata.product_log_id |
如果不是空字符串,则直接映射。 |
message.cliIP |
principal.ip |
如果 cliIP 为空字符串,则直接映射。 |
message.fwdHost |
principal.hostname |
直接映射。 |
message.reqHost |
target.hostname ,target.url |
用于构建 target.url 和提取 target.hostname 。 |
message.reqLen |
network.sent_bytes |
直接映射,如果 totalBytes 为空或“-”,则转换为无符号整数。 |
message.reqMethod |
network.http.method |
如果 reqMethod 为空字符串,则直接映射。 |
message.reqPath |
target.url |
附加到 target.url 。 |
message.reqPort |
target.port |
直接映射,如果 reqPort 为空字符串,则转换为整数。 |
message.respLen |
network.received_bytes |
直接映射,转换为无符号整数。 |
message.sslVer |
network.tls.version |
直接映射。 |
message.status |
network.http.response_code |
直接映射,如果 statusCode 为空或“-”,则转换为整数。 |
message.UA |
network.http.user_agent |
如果 UA 为“-”或空字符串,则直接映射。 |
network.asnum |
additional.fields |
映射为键值对,键为“asnum”。 |
network.edgeIP |
intermediary.ip |
直接映射。 |
network.network |
additional.fields |
映射为键值对,键为“network”。 |
network.networkType |
additional.fields |
映射为键值对,键为“networkType”。 |
proto |
network.application_protocol |
用于确定 network.application_protocol 。 |
queryStr |
target.url |
如果不是“-”或空字符串,则附加到 target.url 。 |
referer |
network.http.referral_url ,about.hostname |
如果不是“-”,则直接映射。提取的主机名会映射到 about.hostname 。 |
reqHost |
target.hostname ,target.url |
用于构建 target.url 和提取 target.hostname 。 |
reqId |
metadata.product_log_id ,network.session_id |
如果 id 为空字符串,则直接映射。还映射到 network.session_id 。 |
reqMethod |
network.http.method |
如果不是空字符串,则直接映射。 |
reqPath |
target.url |
如果不是“-”,则附加到 target.url 。 |
reqPort |
target.port |
直接映射,转换为整数。 |
reqTimeSec |
metadata.event_timestamp ,timestamp |
用于设置事件时间戳。 |
start |
metadata.event_timestamp ,timestamp |
如果 reqTimeSec 为空字符串,则用于设置事件时间戳。 |
statusCode |
network.http.response_code |
直接映射,如果不是“-”或空字符串,则转换为整数。 |
tlsVersion |
network.tls.version |
直接映射。 |
totalBytes |
network.sent_bytes |
直接映射,如果非空或“-”,则转换为无符号整数。 |
type |
metadata.product_event_type |
直接映射。 |
UA |
network.http.user_agent |
如果不是“-”或空字符串,则直接映射。 |
version |
metadata.product_version |
直接映射。 |
xForwardedFor |
principal.ip |
如果不是“-”或空字符串,则直接映射。 |
(解析器逻辑) | metadata.vendor_name |
设置为“Akamai”。 |
(解析器逻辑) | metadata.product_name |
设置为“DataStream”。 |
(解析器逻辑) | metadata.event_type |
设置为“NETWORK_HTTP”。 |
(解析器逻辑) | metadata.product_version |
如果 version 为空字符串,则设置为“2”。 |
(解析器逻辑) | metadata.log_type |
设置为“AKAMAI_CLOUD_MONITOR”。 |
(解析器逻辑) | network.application_protocol |
根据 proto 或 message.proto 确定。如果任一字符串包含“HTTPS”(不区分大小写),则设置为“HTTPS”,否则设置为“HTTP”。 |
(解析器逻辑) | security_result.severity |
如果 errorCode 为“-”或空字符串,则设置为“INFORMATIONAL”。 |
(解析器逻辑) | target.url |
由 protocol 、reqHost (或 message.reqHost )、reqPath (或 message.reqPath )和 queryStr 构成。 |
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。