收集 Akamai Cloud Monitor 記錄
本文說明如何使用 AWS S3,將 Akamai Cloud Monitor (Load Balancer、Traffic Shaper、ADC) 記錄檔擷取至 Google Security Operations。Akamai 會將 JSON 事件推送至 HTTPS 端點;API Gateway + Lambda 接收器會將事件寫入 S3 (JSONL、gz)。剖析器會將 JSON 記錄轉換為 UDM。這項功能會從 JSON 酬載中擷取欄位、執行資料類型轉換、重新命名欄位以符合 UDM 結構定義,並處理自訂欄位和網址建構的特定邏輯。此外,這項功能還會根據欄位是否存在,加入錯誤處理機制和條件式邏輯。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體
- Akamai Control Center 和 Property Manager 的特殊存取權
- AWS 的特殊權限存取權 (S3、IAM、Lambda、API Gateway)
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南建立 Amazon S3 值區:建立值區
- 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如
akamai-cloud-monitor
)。 - 按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後使用。
- 按一下 [完成]。
- 選取 [權限] 分頁標籤。
- 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)。
- 選取「新增權限」。
- 選取「直接附加政策」
- 搜尋並選取 AmazonS3FullAccess 政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 S3 上傳的 IAM 政策和角色 (Lambda)
- 在 AWS Console 中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」,然後貼上下方政策。
JSON 政策 (請將
akamai-cloud-monitor
替換為 S3 bucket 名稱):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutAkamaiObjects", "Effect": "Allow", "Action": ["s3:PutObject"], "Resource": "arn:aws:s3:::akamai-cloud-monitor/*" } ] }
依序點選「Next」>「Create policy」。
依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。
附加 JSON 政策。
為角色命名
WriteAkamaiCMToS3Role
,然後按一下「建立角色」。
建立 Lambda 函式
設定 | 值 |
---|---|
名稱 | akamai_cloud_monitor_to_s3 |
執行階段 | Python 3.13 |
架構 | x86_64 |
執行角色 | WriteAkamaiCMToS3Role |
建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (
akamai_cloud_monitor_to_s3.py
):#!/usr/bin/env python3 # Lambda: Receive Akamai Cloud Monitor POST, write JSONL (gz) to S3 import os, json, gzip, io, uuid, base64, datetime as dt import boto3 S3_BUCKET = os.environ["S3_BUCKET_NAME"] S3_PREFIX = os.environ.get("S3_PREFIX", "akamai/cloud-monitor/json/").strip("/") + "/" INGEST_TOKEN = os.environ.get("INGEST_TOKEN") # optional shared secret in URL query (?token=...) s3 = boto3.client("s3") def _write_jsonl_gz(objs: list) -> str: key = f"{dt.datetime.utcnow():%Y/%m/%d}/akamai-cloud-monitor-{uuid.uuid4()}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="w") as gz: for o in objs: gz.write((json.dumps(o, separators=(",", ":")) + "n").encode()) buf.seek(0) s3.upload_fileobj( buf, S3_BUCKET, f"{S3_PREFIX}{key}", ExtraArgs={ "ContentType": "application/json", "ContentEncoding": "gzip", }, ) return f"s3://{S3_BUCKET}/{S3_PREFIX}{key}" def _parse_records_from_event(event) -> list: # HTTP API (Lambda proxy) event: body is a JSON string body = event.get("body") or "" if event.get("isBase64Encoded"): body = base64.b64decode(body).decode("utf-8", "replace") try: data = json.loads(body) except Exception: # accept line-delimited JSON as pass-through try: return [json.loads(line) for line in body.splitlines() if line.strip()] except Exception: return [] if isinstance(data, list): return data if isinstance(data, dict): return [data] return [] def lambda_handler(event, context=None): # Optional shared-secret verification via query parameter (?token=...) if INGEST_TOKEN: qs = event.get("queryStringParameters") or {} token = qs.get("token") if token != INGEST_TOKEN: return {"statusCode": 403, "body": "forbidden"} records = _parse_records_from_event(event) if not records: return {"statusCode": 204, "body": "no content"} key = _write_jsonl_gz(records) return { "statusCode": 200, "headers": {"Content-Type": "application/json"}, "body": json.dumps({"ok": True, "s3_key": key, "count": len(records)}), }
依序前往「Configuration」>「Environment variables」>「Edit」。
按一下「新增環境變數」,然後設定下列值:
環境變數
鍵 範例 S3_BUCKET_NAME
akamai-cloud-monitor
S3_PREFIX
akamai/cloud-monitor/json/
INGEST_TOKEN
random-shared-secret
依序前往「設定」>「一般設定」。
按一下「編輯」,然後將「逾時」設為「5 分鐘 (300 秒)」。
按一下 [儲存]。
建立 Amazon API Gateway (Akamai 的 HTTPS 端點)
- 在 AWS 控制台中,依序前往「API Gateway」>「Create API」。
- 依序選取「HTTP API」>「Build」。
- 提供下列設定詳細資料:
- 「Integrations」(整合):選擇「Lambda」,然後選取
akamai_cloud_monitor_to_s3
。 - 路徑:新增 ANY
/{proxy+}
或建立特定路徑 (例如 POST/akamai/cloud-monitor
)。 - 階段:建立或使用 $default。
- 「Integrations」(整合):選擇「Lambda」,然後選取
- 部署 API 並複製「叫用網址」 (例如
https://abc123.execute-api.<region>.amazonaws.com
)。
設定 Akamai Cloud Monitor,推送記錄檔
- 在 Akamai Control Center 中,於 Property Manager 開啟「Property」。
- 依序按一下「新增規則」> 選擇「雲端管理」。
- 新增 Cloud Monitor Instrumentation,然後選取所需的資料集。
- 新增 Cloud Monitor Data Delivery。
- 傳送主機名稱:輸入 API Gateway 叫用網址 (例如
abc123.execute-api.<region>.amazonaws.com
)。 - 放送網址路徑:您的路徑加上選用的查詢權杖,例如:
/akamai/cloud-monitor?token=<INGEST_TOKEN>
。
- 傳送主機名稱:輸入 API Gateway 叫用網址 (例如
- 儲存並啟用屬性版本。
在 Google SecOps 中設定資訊提供,以便擷取 Akamai Cloud Monitor (S3 JSON)
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Akamai Cloud Monitor — S3
)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「Akamai Cloud Monitor」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:
s3://akamai-cloud-monitor/akamai/cloud-monitor/json/
- 來源刪除選項:是否要在轉移後刪除檔案和/或目錄。
- 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
- 存取金鑰 ID:20 個半形英數字元的帳戶存取金鑰 (例如 AKIAIOSFODNN7EXAMPLE)。
- 存取密鑰:40 個字元的英數字元帳戶存取密鑰 (例如 wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY)。
- 資產命名空間:
akamai.cloud_monitor
- 擷取標籤:系統會為這個動態饋給中的所有事件新增標籤 (例如
source=akamai_cloud_monitor
、format=json
)。
- S3 URI:
- 點選「下一步」。
- 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」。
UDM 對應表
記錄欄位 | UDM 對應 | 邏輯 |
---|---|---|
accLang |
network.http.user_agent |
如果不是「-」或空字串,則直接對應。 |
city |
principal.location.city |
如果不是「-」或空字串,則直接對應。 |
cliIP |
principal.ip |
如果不是空字串,則直接對應。 |
country |
principal.location.country_or_region |
如果不是「-」或空字串,則直接對應。 |
cp |
additional.fields |
以鍵/值組合形式對應,鍵為「cp」。 |
customField |
about.ip 、about.labels 、src.ip |
剖析為鍵/值組合。特殊處理「eIp」和「pIp」,分別對應至 src.ip 和 about.ip 。其他鍵會對應為 about 中的標籤。 |
errorCode |
security_result.summary 、security_result.severity |
如有,則將 security_result.severity 設為「ERROR」,並將值對應至 security_result.summary 。 |
geo.city |
principal.location.city |
如果 city 是「-」或空字串,則直接對應。 |
geo.country |
principal.location.country_or_region |
如果 country 是「-」或空字串,則直接對應。 |
geo.lat |
principal.location.region_latitude |
直接對應,並轉換為浮點數。 |
geo.long |
principal.location.region_longitude |
直接對應,並轉換為浮點數。 |
geo.region |
principal.location.state |
直接對應。 |
id |
metadata.product_log_id |
如果不是空字串,則直接對應。 |
message.cliIP |
principal.ip |
如果 cliIP 為空字串,則直接對應。 |
message.fwdHost |
principal.hostname |
直接對應。 |
message.reqHost |
target.hostname 、target.url |
用於建構 target.url 和擷取 target.hostname 。 |
message.reqLen |
network.sent_bytes |
直接對應,如果 totalBytes 為空或「-」,則轉換為不帶正負號的整數。 |
message.reqMethod |
network.http.method |
如果 reqMethod 為空字串,則直接對應。 |
message.reqPath |
target.url |
已附加至「target.url 」。 |
message.reqPort |
target.port |
直接對應,如果 reqPort 是空字串,則會轉換為整數。 |
message.respLen |
network.received_bytes |
直接對應,轉換為不帶正負號的整數。 |
message.sslVer |
network.tls.version |
直接對應。 |
message.status |
network.http.response_code |
直接對應,如果 statusCode 為空或「-」,則轉換為整數。 |
message.UA |
network.http.user_agent |
如果 UA 是「-」或空字串,則直接對應。 |
network.asnum |
additional.fields |
以鍵/值組合形式對應,索引鍵為「asnum」。 |
network.edgeIP |
intermediary.ip |
直接對應。 |
network.network |
additional.fields |
以鍵/值組合形式對應,索引鍵為「network」。 |
network.networkType |
additional.fields |
以鍵/值組合形式對應,鍵為「networkType」。 |
proto |
network.application_protocol |
用於判斷 network.application_protocol 。 |
queryStr |
target.url |
如果不是「-」或空字串,則會附加至 target.url 。 |
referer |
network.http.referral_url 、about.hostname |
如果不是「-」,則直接對應。擷取的主機名稱會對應至 about.hostname 。 |
reqHost |
target.hostname 、target.url |
用於建構 target.url 和擷取 target.hostname 。 |
reqId |
metadata.product_log_id 、network.session_id |
如果 id 為空字串,則直接對應。也對應到「network.session_id 」。 |
reqMethod |
network.http.method |
如果不是空字串,則直接對應。 |
reqPath |
target.url |
如果不是「-」,則會附加至 target.url 。 |
reqPort |
target.port |
直接對應,並轉換為整數。 |
reqTimeSec |
metadata.event_timestamp 、timestamp |
用於設定事件時間戳記。 |
start |
metadata.event_timestamp 、timestamp |
如果 reqTimeSec 為空字串,則用於設定事件時間戳記。 |
statusCode |
network.http.response_code |
直接對應,如果不是「-」或空字串,則轉換為整數。 |
tlsVersion |
network.tls.version |
直接對應。 |
totalBytes |
network.sent_bytes |
直接對應,如果不是空白或「-」,則轉換為不帶正負號的整數。 |
type |
metadata.product_event_type |
直接對應。 |
UA |
network.http.user_agent |
如果不是「-」或空字串,則直接對應。 |
version |
metadata.product_version |
直接對應。 |
xForwardedFor |
principal.ip |
如果不是「-」或空字串,則直接對應。 |
(剖析器邏輯) | metadata.vendor_name |
設為「Akamai」。 |
(剖析器邏輯) | metadata.product_name |
設為「DataStream」。 |
(剖析器邏輯) | metadata.event_type |
設為「NETWORK_HTTP」。 |
(剖析器邏輯) | metadata.product_version |
如果 version 是空白字串,請設為「2」。 |
(剖析器邏輯) | metadata.log_type |
設為「AKAMAI_CLOUD_MONITOR」。 |
(剖析器邏輯) | network.application_protocol |
取決於 proto 或 message.proto 。如果兩者都包含「HTTPS」(不區分大小寫),則設為「HTTPS」,否則設為「HTTP」。 |
(剖析器邏輯) | security_result.severity |
如果 errorCode 為「-」或空字串,請設為「INFORMATIONAL」。 |
(剖析器邏輯) | target.url |
由 protocol 、reqHost (或 message.reqHost )、reqPath (或 message.reqPath ) 和 queryStr 建構而成。 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。