收集 Akamai Cloud Monitor 日志

支持的语言:

本文档介绍了如何使用 AWS S3 将 Akamai Cloud Monitor(负载平衡器、流量整形器、ADC)日志注入到 Google Security Operations。Akamai 将 JSON 事件推送到您的 HTTPS 端点;API Gateway + Lambda 接收器将事件写入 S3 (JSONL, gz)。解析器会将 JSON 日志转换为 UDM。它从 JSON 载荷中提取字段,执行数据类型转换,重命名字段以匹配 UDM 架构,并处理自定义字段和网址构建的特定逻辑。它还包含基于字段存在情况的错误处理和条件逻辑。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例
  • 对 Akamai Control Center 和 Property Manager 的特权访问权限
  • 对 AWS(S3、IAM、Lambda、API Gateway)的特权访问权限

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 akamai-cloud-monitor)。
  3. 按照以下用户指南创建用户:创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击 Download CSV file(下载 CSV 文件),保存访问密钥不公开的访问密钥以供日后使用。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索并选择 AmazonS3FullAccess 政策。
  18. 点击下一步
  19. 点击添加权限

为 S3 上传(Lambda)配置 IAM 政策和角色

  1. AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON,然后粘贴以下政策。
  2. JSON 政策(将 akamai-cloud-monitor 替换为您的 S3 存储桶名称):

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Sid": "AllowPutAkamaiObjects",
        "Effect": "Allow",
        "Action": ["s3:PutObject"],
        "Resource": "arn:aws:s3:::akamai-cloud-monitor/*"
        }
    ]
    }
    
  3. 依次点击下一步 > 创建政策

  4. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  5. 附加 JSON 政策。

  6. 将角色命名为 WriteAkamaiCMToS3Role,然后点击创建角色

创建 Lambda 函数

设置
名称 akamai_cloud_monitor_to_s3
运行时 Python 3.13
架构 x86_64
执行角色 WriteAkamaiCMToS3Role
  1. 创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (akamai_cloud_monitor_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Receive Akamai Cloud Monitor POST, write JSONL (gz) to S3
    
    import os, json, gzip, io, uuid, base64, datetime as dt
    import boto3
    
    S3_BUCKET  = os.environ["S3_BUCKET_NAME"]
    S3_PREFIX  = os.environ.get("S3_PREFIX", "akamai/cloud-monitor/json/").strip("/") + "/"
    INGEST_TOKEN = os.environ.get("INGEST_TOKEN")  # optional shared secret in URL query (?token=...)
    
    s3 = boto3.client("s3")
    
    def _write_jsonl_gz(objs: list) -> str:
        key = f"{dt.datetime.utcnow():%Y/%m/%d}/akamai-cloud-monitor-{uuid.uuid4()}.json.gz"
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="w") as gz:
            for o in objs:
                gz.write((json.dumps(o, separators=(",", ":")) + "n").encode())
        buf.seek(0)
        s3.upload_fileobj(
            buf,
            S3_BUCKET,
            f"{S3_PREFIX}{key}",
            ExtraArgs={
                "ContentType": "application/json",
                "ContentEncoding": "gzip",
            },
        )
        return f"s3://{S3_BUCKET}/{S3_PREFIX}{key}"
    
    def _parse_records_from_event(event) -> list:
        # HTTP API (Lambda proxy) event: body is a JSON string
        body = event.get("body") or ""
        if event.get("isBase64Encoded"):
            body = base64.b64decode(body).decode("utf-8", "replace")
        try:
            data = json.loads(body)
        except Exception:
            # accept line-delimited JSON as pass-through
            try:
                return [json.loads(line) for line in body.splitlines() if line.strip()]
            except Exception:
                return []
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            return [data]
        return []
    
    def lambda_handler(event, context=None):
        # Optional shared-secret verification via query parameter (?token=...)
        if INGEST_TOKEN:
            qs = event.get("queryStringParameters") or {}
            token = qs.get("token")
            if token != INGEST_TOKEN:
                return {"statusCode": 403, "body": "forbidden"}
    
        records = _parse_records_from_event(event)
        if not records:
            return {"statusCode": 204, "body": "no content"}
    
        key = _write_jsonl_gz(records)
        return {
            "statusCode": 200,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"ok": True, "s3_key": key, "count": len(records)}),
        }
    
  2. 依次前往配置 > 环境变量 > 修改

  3. 点击添加新的环境变量,然后设置以下值:

    环境变量

    示例
    S3_BUCKET_NAME akamai-cloud-monitor
    S3_PREFIX akamai/cloud-monitor/json/
    INGEST_TOKEN random-shared-secret
  4. 依次前往配置 > 常规配置

  5. 点击修改,并将超时设置为 5 分钟(300 秒)

  6. 点击保存

创建 Amazon API 网关(适用于 Akamai 的 HTTPS 端点)

  1. AWS 控制台中,依次前往 API Gateway > 创建 API
  2. 依次选择 HTTP API > 构建
  3. 提供以下配置详细信息:
    • 集成:选择 Lambda,然后选择 akamai_cloud_monitor_to_s3
    • 路由:添加 ANY /{proxy+} 或创建特定路由(例如 POST /akamai/cloud-monitor)。
    • 阶段:创建或使用 $default
  4. 部署 API 并复制调用网址(例如 https://abc123.execute-api.<region>.amazonaws.com)。

配置 Akamai Cloud Monitor 以推送日志

  1. Akamai Control Center 中,在 Property Manager 中打开您的媒体资源
  2. 点击添加规则 > 选择“云管理”
  3. 添加 Cloud Monitor Instrumentation,然后选择所需的数据集
  4. 添加 Cloud Monitor 数据传送
    • 交付主机名:输入您的 API 网关调用网址(例如 abc123.execute-api.<region>.amazonaws.com)。
    • 投放网址路径:您的路线加上可选的查询令牌,例如:/akamai/cloud-monitor?token=<INGEST_TOKEN>
  5. 保存启用相应媒体资源版本。

在 Google SecOps 中配置 Feed 以注入 Akamai Cloud Monitor (S3 JSON)

  1. 依次前往 SIEM 设置> Feed
  2. 点击添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 Akamai Cloud Monitor — S3)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Akamai Cloud Monitor 作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://akamai-cloud-monitor/akamai/cloud-monitor/json/
    • 源删除选项:是否在转移后删除文件和/或目录。
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
    • 访问密钥 ID:一个包含 20 个字符的字母数字账号访问密钥(例如,AKIAIOSFODNN7EXAMPLE)。
    • Secret 访问密钥:一个包含 40 个字符的字母数字账号 Secret 访问密钥(例如,wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY)。
    • 资源命名空间akamai.cloud_monitor
    • 注入标签:标签会添加到相应 Feed 中的所有事件(例如 source=akamai_cloud_monitorformat=json)。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
accLang network.http.user_agent 如果不是“-”或空字符串,则直接映射。
city principal.location.city 如果不是“-”或空字符串,则直接映射。
cliIP principal.ip 如果不是空字符串,则直接映射。
country principal.location.country_or_region 如果不是“-”或空字符串,则直接映射。
cp additional.fields 映射为键值对,键为“cp”。
customField about.ipabout.labelssrc.ip 解析为键值对。针对“eIp”和“pIp”的特殊处理,分别映射到 src.ipabout.ip。其他键则映射为 about 中的标签。
errorCode security_result.summarysecurity_result.severity 如果存在,则将 security_result.severity 设置为“ERROR”,并将该值映射到 security_result.summary
geo.city principal.location.city 如果 city 为“-”或空字符串,则直接映射。
geo.country principal.location.country_or_region 如果 country 为“-”或空字符串,则直接映射。
geo.lat principal.location.region_latitude 直接映射,转换为浮点数。
geo.long principal.location.region_longitude 直接映射,转换为浮点数。
geo.region principal.location.state 直接映射。
id metadata.product_log_id 如果不是空字符串,则直接映射。
message.cliIP principal.ip 如果 cliIP 为空字符串,则直接映射。
message.fwdHost principal.hostname 直接映射。
message.reqHost target.hostnametarget.url 用于构建 target.url 和提取 target.hostname
message.reqLen network.sent_bytes 直接映射,如果 totalBytes 为空或“-”,则转换为无符号整数。
message.reqMethod network.http.method 如果 reqMethod 为空字符串,则直接映射。
message.reqPath target.url 附加到 target.url
message.reqPort target.port 直接映射,如果 reqPort 为空字符串,则转换为整数。
message.respLen network.received_bytes 直接映射,转换为无符号整数。
message.sslVer network.tls.version 直接映射。
message.status network.http.response_code 直接映射,如果 statusCode 为空或“-”,则转换为整数。
message.UA network.http.user_agent 如果 UA 为“-”或空字符串,则直接映射。
network.asnum additional.fields 映射为键值对,键为“asnum”。
network.edgeIP intermediary.ip 直接映射。
network.network additional.fields 映射为键值对,键为“network”。
network.networkType additional.fields 映射为键值对,键为“networkType”。
proto network.application_protocol 用于确定 network.application_protocol
queryStr target.url 如果不是“-”或空字符串,则附加到 target.url
referer network.http.referral_urlabout.hostname 如果不是“-”,则直接映射。提取的主机名会映射到 about.hostname
reqHost target.hostnametarget.url 用于构建 target.url 和提取 target.hostname
reqId metadata.product_log_idnetwork.session_id 如果 id 为空字符串,则直接映射。还映射到 network.session_id
reqMethod network.http.method 如果不是空字符串,则直接映射。
reqPath target.url 如果不是“-”,则附加到 target.url
reqPort target.port 直接映射,转换为整数。
reqTimeSec metadata.event_timestamptimestamp 用于设置事件时间戳。
start metadata.event_timestamptimestamp 如果 reqTimeSec 为空字符串,则用于设置事件时间戳。
statusCode network.http.response_code 直接映射,如果不是“-”或空字符串,则转换为整数。
tlsVersion network.tls.version 直接映射。
totalBytes network.sent_bytes 直接映射,如果非空或“-”,则转换为无符号整数。
type metadata.product_event_type 直接映射。
UA network.http.user_agent 如果不是“-”或空字符串,则直接映射。
version metadata.product_version 直接映射。
xForwardedFor principal.ip 如果不是“-”或空字符串,则直接映射。
(解析器逻辑) metadata.vendor_name 设置为“Akamai”。
(解析器逻辑) metadata.product_name 设置为“DataStream”。
(解析器逻辑) metadata.event_type 设置为“NETWORK_HTTP”。
(解析器逻辑) metadata.product_version 如果 version 为空字符串,则设置为“2”。
(解析器逻辑) metadata.log_type 设置为“AKAMAI_CLOUD_MONITOR”。
(解析器逻辑) network.application_protocol 根据 protomessage.proto 确定。如果任一字符串包含“HTTPS”(不区分大小写),则设置为“HTTPS”,否则设置为“HTTP”。
(解析器逻辑) security_result.severity 如果 errorCode 为“-”或空字符串,则设置为“INFORMATIONAL”。
(解析器逻辑) target.url protocolreqHost(或 message.reqHost)、reqPath(或 message.reqPath)和 queryStr 构成。

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。