收集 Akamai Cloud Monitor 記錄

支援的國家/地區:

本文說明如何使用 AWS S3,將 Akamai Cloud Monitor (Load Balancer、Traffic Shaper、ADC) 記錄檔擷取至 Google Security Operations。Akamai 會將 JSON 事件推送至 HTTPS 端點;API Gateway + Lambda 接收器會將事件寫入 S3 (JSONL、gz)。剖析器會將 JSON 記錄轉換為 UDM。這項功能會從 JSON 酬載中擷取欄位、執行資料類型轉換、重新命名欄位以符合 UDM 結構定義,並處理自訂欄位和網址建構的特定邏輯。此外,這項功能還會根據欄位是否存在,加入錯誤處理機制和條件式邏輯。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體
  • Akamai Control Center 和 Property Manager 的特殊存取權
  • AWS 的特殊權限存取權 (S3、IAM、Lambda、API Gateway)

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照這份使用者指南建立 Amazon S3 值區建立值區
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 akamai-cloud-monitor)。
  3. 按照這份使用者指南建立使用者:建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後使用。
  12. 按一下 [完成]
  13. 選取 [權限] 分頁標籤。
  14. 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋並選取 AmazonS3FullAccess 政策。
  18. 點選「下一步」
  19. 按一下「Add permissions」。

設定 S3 上傳的 IAM 政策和角色 (Lambda)

  1. AWS Console 中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」,然後貼上下方政策。
  2. JSON 政策 (請將 akamai-cloud-monitor 替換為 S3 bucket 名稱):

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Sid": "AllowPutAkamaiObjects",
        "Effect": "Allow",
        "Action": ["s3:PutObject"],
        "Resource": "arn:aws:s3:::akamai-cloud-monitor/*"
        }
    ]
    }
    
  3. 依序點選「Next」>「Create policy」

  4. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。

  5. 附加 JSON 政策。

  6. 為角色命名 WriteAkamaiCMToS3Role,然後按一下「建立角色」

建立 Lambda 函式

設定
名稱 akamai_cloud_monitor_to_s3
執行階段 Python 3.13
架構 x86_64
執行角色 WriteAkamaiCMToS3Role
  1. 建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (akamai_cloud_monitor_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Receive Akamai Cloud Monitor POST, write JSONL (gz) to S3
    
    import os, json, gzip, io, uuid, base64, datetime as dt
    import boto3
    
    S3_BUCKET  = os.environ["S3_BUCKET_NAME"]
    S3_PREFIX  = os.environ.get("S3_PREFIX", "akamai/cloud-monitor/json/").strip("/") + "/"
    INGEST_TOKEN = os.environ.get("INGEST_TOKEN")  # optional shared secret in URL query (?token=...)
    
    s3 = boto3.client("s3")
    
    def _write_jsonl_gz(objs: list) -> str:
        key = f"{dt.datetime.utcnow():%Y/%m/%d}/akamai-cloud-monitor-{uuid.uuid4()}.json.gz"
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="w") as gz:
            for o in objs:
                gz.write((json.dumps(o, separators=(",", ":")) + "n").encode())
        buf.seek(0)
        s3.upload_fileobj(
            buf,
            S3_BUCKET,
            f"{S3_PREFIX}{key}",
            ExtraArgs={
                "ContentType": "application/json",
                "ContentEncoding": "gzip",
            },
        )
        return f"s3://{S3_BUCKET}/{S3_PREFIX}{key}"
    
    def _parse_records_from_event(event) -> list:
        # HTTP API (Lambda proxy) event: body is a JSON string
        body = event.get("body") or ""
        if event.get("isBase64Encoded"):
            body = base64.b64decode(body).decode("utf-8", "replace")
        try:
            data = json.loads(body)
        except Exception:
            # accept line-delimited JSON as pass-through
            try:
                return [json.loads(line) for line in body.splitlines() if line.strip()]
            except Exception:
                return []
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            return [data]
        return []
    
    def lambda_handler(event, context=None):
        # Optional shared-secret verification via query parameter (?token=...)
        if INGEST_TOKEN:
            qs = event.get("queryStringParameters") or {}
            token = qs.get("token")
            if token != INGEST_TOKEN:
                return {"statusCode": 403, "body": "forbidden"}
    
        records = _parse_records_from_event(event)
        if not records:
            return {"statusCode": 204, "body": "no content"}
    
        key = _write_jsonl_gz(records)
        return {
            "statusCode": 200,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"ok": True, "s3_key": key, "count": len(records)}),
        }
    
  2. 依序前往「Configuration」>「Environment variables」>「Edit」

  3. 按一下「新增環境變數」,然後設定下列值:

    環境變數

    範例
    S3_BUCKET_NAME akamai-cloud-monitor
    S3_PREFIX akamai/cloud-monitor/json/
    INGEST_TOKEN random-shared-secret
  4. 依序前往「設定」>「一般設定」

  5. 按一下「編輯」,然後將「逾時」設為「5 分鐘 (300 秒)」

  6. 按一下 [儲存]

建立 Amazon API Gateway (Akamai 的 HTTPS 端點)

  1. AWS 控制台中,依序前往「API Gateway」>「Create API」
  2. 依序選取「HTTP API」>「Build」
  3. 提供下列設定詳細資料:
    • 「Integrations」(整合):選擇「Lambda」,然後選取 akamai_cloud_monitor_to_s3
    • 路徑:新增 ANY /{proxy+} 或建立特定路徑 (例如 POST /akamai/cloud-monitor)。
    • 階段:建立或使用 $default
  4. 部署 API 並複製「叫用網址」 (例如 https://abc123.execute-api.<region>.amazonaws.com)。

設定 Akamai Cloud Monitor,推送記錄檔

  1. Akamai Control Center 中,於 Property Manager 開啟「Property」
  2. 依序按一下「新增規則」> 選擇「雲端管理」
  3. 新增 Cloud Monitor Instrumentation,然後選取所需的資料集
  4. 新增 Cloud Monitor Data Delivery
    • 傳送主機名稱:輸入 API Gateway 叫用網址 (例如 abc123.execute-api.<region>.amazonaws.com)。
    • 放送網址路徑:您的路徑加上選用的查詢權杖,例如:/akamai/cloud-monitor?token=<INGEST_TOKEN>
  5. 儲存啟用屬性版本。

在 Google SecOps 中設定資訊提供,以便擷取 Akamai Cloud Monitor (S3 JSON)

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Akamai Cloud Monitor — S3)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「Akamai Cloud Monitor」做為「記錄類型」
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URIs3://akamai-cloud-monitor/akamai/cloud-monitor/json/
    • 來源刪除選項:是否要在轉移後刪除檔案和/或目錄。
    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
    • 存取金鑰 ID:20 個半形英數字元的帳戶存取金鑰 (例如 AKIAIOSFODNN7EXAMPLE)。
    • 存取密鑰:40 個字元的英數字元帳戶存取密鑰 (例如 wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY)。
    • 資產命名空間akamai.cloud_monitor
    • 擷取標籤:系統會為這個動態饋給中的所有事件新增標籤 (例如 source=akamai_cloud_monitorformat=json)。
  8. 點選「下一步」
  9. 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」

UDM 對應表

記錄欄位 UDM 對應 邏輯
accLang network.http.user_agent 如果不是「-」或空字串,則直接對應。
city principal.location.city 如果不是「-」或空字串,則直接對應。
cliIP principal.ip 如果不是空字串,則直接對應。
country principal.location.country_or_region 如果不是「-」或空字串,則直接對應。
cp additional.fields 以鍵/值組合形式對應,鍵為「cp」。
customField about.ipabout.labelssrc.ip 剖析為鍵/值組合。特殊處理「eIp」和「pIp」,分別對應至 src.ipabout.ip。其他鍵會對應為 about 中的標籤。
errorCode security_result.summarysecurity_result.severity 如有,則將 security_result.severity 設為「ERROR」,並將值對應至 security_result.summary
geo.city principal.location.city 如果 city 是「-」或空字串,則直接對應。
geo.country principal.location.country_or_region 如果 country 是「-」或空字串,則直接對應。
geo.lat principal.location.region_latitude 直接對應,並轉換為浮點數。
geo.long principal.location.region_longitude 直接對應,並轉換為浮點數。
geo.region principal.location.state 直接對應。
id metadata.product_log_id 如果不是空字串,則直接對應。
message.cliIP principal.ip 如果 cliIP 為空字串,則直接對應。
message.fwdHost principal.hostname 直接對應。
message.reqHost target.hostnametarget.url 用於建構 target.url 和擷取 target.hostname
message.reqLen network.sent_bytes 直接對應,如果 totalBytes 為空或「-」,則轉換為不帶正負號的整數。
message.reqMethod network.http.method 如果 reqMethod 為空字串,則直接對應。
message.reqPath target.url 已附加至「target.url」。
message.reqPort target.port 直接對應,如果 reqPort 是空字串,則會轉換為整數。
message.respLen network.received_bytes 直接對應,轉換為不帶正負號的整數。
message.sslVer network.tls.version 直接對應。
message.status network.http.response_code 直接對應,如果 statusCode 為空或「-」,則轉換為整數。
message.UA network.http.user_agent 如果 UA 是「-」或空字串,則直接對應。
network.asnum additional.fields 以鍵/值組合形式對應,索引鍵為「asnum」。
network.edgeIP intermediary.ip 直接對應。
network.network additional.fields 以鍵/值組合形式對應,索引鍵為「network」。
network.networkType additional.fields 以鍵/值組合形式對應,鍵為「networkType」。
proto network.application_protocol 用於判斷 network.application_protocol
queryStr target.url 如果不是「-」或空字串,則會附加至 target.url
referer network.http.referral_urlabout.hostname 如果不是「-」,則直接對應。擷取的主機名稱會對應至 about.hostname
reqHost target.hostnametarget.url 用於建構 target.url 和擷取 target.hostname
reqId metadata.product_log_idnetwork.session_id 如果 id 為空字串,則直接對應。也對應到「network.session_id」。
reqMethod network.http.method 如果不是空字串,則直接對應。
reqPath target.url 如果不是「-」,則會附加至 target.url
reqPort target.port 直接對應,並轉換為整數。
reqTimeSec metadata.event_timestamptimestamp 用於設定事件時間戳記。
start metadata.event_timestamptimestamp 如果 reqTimeSec 為空字串,則用於設定事件時間戳記。
statusCode network.http.response_code 直接對應,如果不是「-」或空字串,則轉換為整數。
tlsVersion network.tls.version 直接對應。
totalBytes network.sent_bytes 直接對應,如果不是空白或「-」,則轉換為不帶正負號的整數。
type metadata.product_event_type 直接對應。
UA network.http.user_agent 如果不是「-」或空字串,則直接對應。
version metadata.product_version 直接對應。
xForwardedFor principal.ip 如果不是「-」或空字串,則直接對應。
(剖析器邏輯) metadata.vendor_name 設為「Akamai」。
(剖析器邏輯) metadata.product_name 設為「DataStream」。
(剖析器邏輯) metadata.event_type 設為「NETWORK_HTTP」。
(剖析器邏輯) metadata.product_version 如果 version 是空白字串,請設為「2」。
(剖析器邏輯) metadata.log_type 設為「AKAMAI_CLOUD_MONITOR」。
(剖析器邏輯) network.application_protocol 取決於 protomessage.proto。如果兩者都包含「HTTPS」(不區分大小寫),則設為「HTTPS」,否則設為「HTTP」。
(剖析器邏輯) security_result.severity 如果 errorCode 為「-」或空字串,請設為「INFORMATIONAL」。
(剖析器邏輯) target.url protocolreqHost (或 message.reqHost)、reqPath (或 message.reqPath) 和 queryStr 建構而成。

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。