收集 Symantec WSS 記錄
本文說明如何使用 Amazon S3,將 Symantec Web Security Service (WSS) 記錄擷取至 Google Security Operations。剖析器會先嘗試將記錄訊息剖析為 JSON。如果失敗,系統會使用一系列越來越具體的 Grok 模式,從原始文字中擷取欄位,最終將擷取的資料對應至整合式資料模型 (UDM)。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體。
- Symantec Web Security Service 的特殊權限。
- AWS 的特殊權限 (S3、Identity and Access Management (IAM)、Lambda、EventBridge)。
收集 Symantec WSS 必要條件 (ID、API 金鑰、機構 ID、權杖)
- 以管理員身分登入 Symantec Web Security Service 入口網站。
- 依序前往「帳戶」>「API 憑證」。
- 按一下「新增」。
- 提供下列設定詳細資料:
- API 名稱:輸入描述性名稱 (例如
Google SecOps Integration
)。 - 說明:輸入 API 憑證的說明。
- API 名稱:輸入描述性名稱 (例如
- 按一下「儲存」,並安全地複製產生的 API 憑證。
- 記錄 WSS 入口網站網址和同步 API 端點。
- 複製下列詳細資料並存放在安全位置:
- WSS_API_USERNAME。
- WSS_API_PASSWORD。
- WSS_SYNC_URL。
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南建立 Amazon S3 bucket:建立 bucket
- 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如
symantec-wss-logs
)。 - 請按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
- 按一下 [完成]。
- 選取「權限」分頁標籤。
- 在「權限政策」部分中,按一下「新增權限」。
- 選取「新增權限」。
- 選取「直接附加政策」。
- 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
- 選取政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 S3 上傳的身分與存取權管理政策和角色
- 在 AWS 控制台中,依序前往「IAM」>「Policies」(政策)。
- 按一下「建立政策」>「JSON」分頁。
- 複製並貼上下列政策。
政策 JSON (如果您輸入的 bucket 名稱不同,請替換
symantec-wss-logs
):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::symantec-wss-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::symantec-wss-logs/symantec/wss/state.json" } ] }
依序點選「下一步」>「建立政策」。
依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。
附加新建立的政策。
為角色命名
SymantecWssToS3Role
,然後按一下「建立角色」。
建立 Lambda 函式
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」。
- 按一下「從頭開始撰寫」。
請提供下列設定詳細資料:
設定 值 名稱 symantec_wss_to_s3
執行階段 Python 3.13 架構 x86_64 執行角色 SymantecWssToS3Role
建立函式後,開啟「程式碼」分頁,刪除存根並貼上以下程式碼 (
symantec_wss_to_s3.py
)。#!/usr/bin/env python3 # Lambda: Pull Symantec WSS logs and store raw payloads to S3 # - Time window via millisecond timestamps for WSS Sync API. # - Preserves vendor-native format (CSV/JSON/ZIP). # - Retries with exponential backoff; unique S3 keys to avoid overwrites. import os, json, time, uuid from urllib.request import Request, urlopen from urllib.error import URLError, HTTPError import boto3 S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "symantec/wss/") STATE_KEY = os.environ.get("STATE_KEY", "symantec/wss/state.json") WINDOW_SEC = int(os.environ.get("WINDOW_SECONDS", "3600")) # default 1h HTTP_TIMEOUT= int(os.environ.get("HTTP_TIMEOUT", "60")) WSS_SYNC_URL = os.environ.get("WSS_SYNC_URL", "https://portal.threatpulse.com/reportpod/logs/sync") API_USERNAME = os.environ["WSS_API_USERNAME"] API_PASSWORD = os.environ["WSS_API_PASSWORD"] TOKEN_PARAM = os.environ.get("WSS_TOKEN_PARAM", "none") MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3")) USER_AGENT = os.environ.get("USER_AGENT", "symantec-wss-to-s3/1.0") s3 = boto3.client("s3") def _load_state(): try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()) except Exception: return {} def _save_state(st): s3.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(st, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) def _ms_timestamp(ts: float) -> int: """Convert Unix timestamp to milliseconds for WSS API""" return int(ts * 1000) def _fetch_wss_logs(start_ms: int, end_ms: int) -> tuple[bytes, str, str]: # WSS Sync API parameters params = f"startDate={start_ms}&endDate={end_ms}&token={TOKEN_PARAM}" url = f"{WSS_SYNC_URL}?{params}" attempt = 0 while True: req = Request(url, method="GET") req.add_header("User-Agent", USER_AGENT) req.add_header("X-APIUsername", API_USERNAME) req.add_header("X-APIPassword", API_PASSWORD) try: with urlopen(req, timeout=HTTP_TIMEOUT) as r: blob = r.read() content_type = r.headers.get("Content-Type", "application/octet-stream") content_encoding = r.headers.get("Content-Encoding", "") return blob, content_type, content_encoding except (HTTPError, URLError) as e: attempt += 1 print(f"HTTP error on attempt {attempt}: {e}") if attempt > MAX_RETRIES: raise # exponential backoff with jitter time.sleep(min(60, 2 ** attempt) + (time.time() % 1)) def _determine_extension(content_type: str, content_encoding: str) -> str: """Determine file extension based on content type and encoding""" if "zip" in content_type.lower(): return ".zip" if "gzip" in content_type.lower() or content_encoding.lower() == "gzip": return ".gz" if "json" in content_type.lower(): return ".json" if "csv" in content_type.lower(): return ".csv" return ".bin" def _put_wss_data(blob: bytes, content_type: str, content_encoding: str, from_ts: float, to_ts: float) -> str: # Create unique S3 key for WSS data ts_path = time.strftime("%Y/%m/%d", time.gmtime(to_ts)) uniq = f"{int(time.time()*1e6)}_{uuid.uuid4().hex[:8]}" ext = _determine_extension(content_type, content_encoding) key = f"{S3_PREFIX}{ts_path}/symantec_wss_{int(from_ts)}_{int(to_ts)}_{uniq}{ext}" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=blob, ContentType=content_type, Metadata={ 'source': 'symantec-wss', 'from_timestamp': str(int(from_ts)), 'to_timestamp': str(int(to_ts)), 'content_encoding': content_encoding } ) return key def lambda_handler(event=None, context=None): st = _load_state() now = time.time() from_ts = float(st.get("last_to_ts") or (now - WINDOW_SEC)) to_ts = now # Convert to milliseconds for WSS API start_ms = _ms_timestamp(from_ts) end_ms = _ms_timestamp(to_ts) print(f"Fetching Symantec WSS logs from {start_ms} to {end_ms}") blob, content_type, content_encoding = _fetch_wss_logs(start_ms, end_ms) print(f"Retrieved {len(blob)} bytes with content-type: {content_type}") if content_encoding: print(f"Content encoding: {content_encoding}") key = _put_wss_data(blob, content_type, content_encoding, from_ts, to_ts) st["last_to_ts"] = to_ts st["last_successful_run"] = now _save_state(st) return { "statusCode": 200, "body": { "success": True, "s3_key": key, "content_type": content_type, "content_encoding": content_encoding, "from_timestamp": from_ts, "to_timestamp": to_ts, "bytes_retrieved": len(blob) } } if __name__ == "__main__": print(lambda_handler())
依序前往「設定」>「環境變數」。
依序點選「編輯」> 新增環境變數。
輸入下表提供的環境變數,並將範例值換成您的值。
環境變數
鍵 範例值 S3_BUCKET
symantec-wss-logs
S3_PREFIX
symantec/wss/
STATE_KEY
symantec/wss/state.json
WINDOW_SECONDS
3600
HTTP_TIMEOUT
60
MAX_RETRIES
3
USER_AGENT
symantec-wss-to-s3/1.0
WSS_SYNC_URL
https://portal.threatpulse.com/reportpod/logs/sync
WSS_API_USERNAME
your-api-username
(步驟 2)WSS_API_PASSWORD
your-api-password
(步驟 2)WSS_TOKEN_PARAM
none
建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。
選取「設定」分頁標籤。
在「一般設定」面板中,按一下「編輯」。
將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)。
建立 EventBridge 排程
- 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」。
- 提供下列設定詳細資料:
- 週期性時間表:費率 (
1 hour
)。 - 目標:您的 Lambda 函式
symantec_wss_to_s3
。 - 名稱:
symantec-wss-1h
。
- 週期性時間表:費率 (
- 按一下「建立時間表」。
(選用) 為 Google SecOps 建立唯讀 IAM 使用者和金鑰
- 在 AWS 控制台中,依序前往「IAM」>「Users」。
- 點選 [Add users] (新增使用者)。
- 提供下列設定詳細資料:
- 使用者:輸入
secops-reader
。 - 存取類型:選取「存取金鑰 - 程式輔助存取」。
- 使用者:輸入
- 按一下「建立使用者」。
- 附加最低讀取權限政策 (自訂):依序選取「Users」(使用者) >「secops-reader」>「Permissions」(權限) >「Add permissions」(新增權限) >「Attach policies directly」(直接附加政策) >「Create policy」(建立政策)。
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::symantec-wss-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::symantec-wss-logs" } ] }
Name =
secops-reader-policy
。依序點選「建立政策」> 搜尋/選取 >「下一步」>「新增權限」。
為
secops-reader
建立存取金鑰:依序點選「安全憑證」>「存取金鑰」。按一下「建立存取金鑰」。
下載 CSV 檔案。(您會將這些值貼到動態饋給中)。
在 Google SecOps 中設定動態饋給,擷取 Symantec WSS 記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Symantec WSS logs
)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「Symantec WSS」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:
s3://symantec-wss-logs/symantec/wss/
- 來源刪除選項:根據偏好設定選取刪除選項。
- 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:資產命名空間。
- 擷取標籤:套用至這個動態饋給事件的標籤。
- S3 URI:
- 點選「下一步」。
- 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」。
UDM 對應表
記錄欄位 | UDM 對應 | 邏輯 |
---|---|---|
category_id | read_only_udm.metadata.product_event_type | 如果 category_id 為 1,則 read_only_udm.metadata.product_event_type 會設為 Security 。如果 category_id 為 5,則 read_only_udm.metadata.product_event_type 會設為 Policy |
collector_device_ip | read_only_udm.principal.ip、read_only_udm.principal.asset.ip | collector_device_ip 欄位的值 |
connection.bytes_download | read_only_udm.network.received_bytes | connection.bytes_download 欄位的值已轉換為整數 |
connection.bytes_upload | read_only_udm.network.sent_bytes | connection.bytes_upload 欄位的值已轉換為整數 |
connection.dst_ip | read_only_udm.target.ip | connection.dst_ip 欄位的值 |
connection.dst_location.country | read_only_udm.target.location.country_or_region | connection.dst_location.country 欄位的值 |
connection.dst_name | read_only_udm.target.hostname | connection.dst_name 欄位的值 |
connection.dst_port | read_only_udm.target.port | connection.dst_port 欄位的值已轉換為整數 |
connection.http_status | read_only_udm.network.http.response_code | connection.http_status 欄位的值已轉換為整數 |
connection.http_user_agent | read_only_udm.network.http.user_agent | connection.http_user_agent 欄位的值 |
connection.src_ip | read_only_udm.principal.ip、read_only_udm.src.ip | connection.src_ip 欄位的值。如果 src_ip 或 collector_device_ip 不為空白,則會對應至 read_only_udm.src.ip |
connection.tls.version | read_only_udm.network.tls.version_protocol | connection.tls.version 欄位的值 |
connection.url.host | read_only_udm.target.hostname | connection.url.host 欄位的值 |
connection.url.method | read_only_udm.network.http.method | connection.url.method 欄位的值 |
connection.url.path | read_only_udm.target.url | connection.url.path 欄位的值 |
connection.url.text | read_only_udm.target.url | connection.url.text 欄位的值 |
cs_connection_negotiated_cipher | read_only_udm.network.tls.cipher | cs_connection_negotiated_cipher 欄位的值 |
cs_icap_status | read_only_udm.security_result.description | cs_icap_status 欄位的值 |
device_id | read_only_udm.target.resource.id、read_only_udm.target.resource.product_object_id | device_id 欄位的值 |
device_ip | read_only_udm.intermediary.ip、read_only_udm.intermediary.asset.ip | device_ip 欄位的值 |
device_time | read_only_udm.metadata.collected_timestamp、read_only_udm.metadata.event_timestamp | device_time 欄位的值會轉換為字串。如果 when 為空,則會對應至 read_only_udm.metadata.event_timestamp |
主機名稱 | read_only_udm.principal.hostname、read_only_udm.principal.asset.hostname | 主機名稱欄位的值 |
log_time | read_only_udm.metadata.event_timestamp | log_time 欄位的值已轉換為時間戳記。如果 when 和 device_time 為空,則會對應至 read_only_udm.metadata.event_timestamp |
msg_desc | read_only_udm.metadata.description | msg_desc 欄位的值 |
os_details | read_only_udm.target.asset.platform_software.platform、read_only_udm.target.asset.platform_software.platform_version | os_details 欄位的值。如果 os_details 不為空白,系統會剖析該值,以擷取 os_name 和 os_ver。如果 os_name 包含 Windows ,read_only_udm.target.asset.platform_software.platform 會設為 WINDOWS 。os_ver 會對應至 read_only_udm.target.asset.platform_software.platform_version |
product_data.cs(Referer) | read_only_udm.network.http.referral_url | product_data.cs(Referer) 欄位的值 |
product_data.r-supplier-country | read_only_udm.principal.location.country_or_region | product_data.r-supplier-country 欄位的值 |
product_data.s-supplier-ip | read_only_udm.intermediary.ip、read_only_udm.intermediary.asset.ip | product_data.s-supplier-ip 欄位的值 |
product_data.x-bluecoat-application-name | read_only_udm.target.application | product_data.x-bluecoat-application-name 欄位的值 |
product_data.x-bluecoat-transaction-uuid | read_only_udm.metadata.product_log_id | product_data.x-bluecoat-transaction-uuid 欄位的值 |
product_data.x-client-agent-sw | read_only_udm.observer.platform_version | product_data.x-client-agent-sw 欄位的值 |
product_data.x-client-agent-type | read_only_udm.observer.application | product_data.x-client-agent-type 欄位的值 |
product_data.x-client-device-id | read_only_udm.target.resource.type、read_only_udm.target.resource.id、read_only_udm.target.resource.product_object_id | 如果不是空白,read_only_udm.target.resource.type 會設為 DEVICE 。product_data.x-client-device-id 欄位的值會對應至 read_only_udm.target.resource.id 和 read_only_udm.target.resource.product_object_id |
product_data.x-client-device-name | read_only_udm.src.hostname、read_only_udm.src.asset.hostname | product_data.x-client-device-name 欄位的值 |
product_data.x-cs-client-ip-country | read_only_udm.target.location.country_or_region | product_data.x-cs-client-ip-country 欄位的值 |
product_data.x-cs-connection-negotiated-cipher | read_only_udm.network.tls.cipher | product_data.x-cs-connection-negotiated-cipher 欄位的值 |
product_data.x-cs-connection-negotiated-ssl-version | read_only_udm.network.tls.version_protocol | product_data.x-cs-connection-negotiated-ssl-version 欄位的值 |
product_data.x-exception-id | read_only_udm.security_result.summary | product_data.x-exception-id 欄位的值 |
product_data.x-rs-certificate-hostname | read_only_udm.network.tls.client.server_name | product_data.x-rs-certificate-hostname 欄位的值 |
product_data.x-rs-certificate-hostname-categories | read_only_udm.security_result.category_details | product_data.x-rs-certificate-hostname-categories 欄位的值 |
product_data.x-rs-certificate-observed-errors | read_only_udm.network.tls.server.certificate.issuer | product_data.x-rs-certificate-observed-errors 欄位的值 |
product_data.x-rs-certificate-validate-status | read_only_udm.network.tls.server.certificate.subject | product_data.x-rs-certificate-validate-status 欄位的值 |
product_name | read_only_udm.metadata.product_name | product_name 欄位的值 |
product_ver | read_only_udm.metadata.product_version | product_ver 欄位的值 |
proxy_connection.src_ip | read_only_udm.intermediary.ip、read_only_udm.intermediary.asset.ip | proxy_connection.src_ip 欄位的值 |
received_bytes | read_only_udm.network.received_bytes | received_bytes 欄位的值已轉換為整數 |
ref_uid | read_only_udm.metadata.product_log_id | ref_uid 欄位的值 |
s_action | read_only_udm.metadata.description | s_action 欄位的值 |
sent_bytes | read_only_udm.network.sent_bytes | sent_bytes 欄位的值已轉換為整數 |
severity_id | read_only_udm.security_result.severity | 如果 severity_id 為 1 或 2,read_only_udm.security_result.severity 會設為 LOW 。如果 severity_id 為 3 或 4,則 read_only_udm.security_result.severity 會設為 MEDIUM 。如果 severity_id 為 5 或 6,read_only_udm.security_result.severity 會設為 HIGH |
supplier_country | read_only_udm.principal.location.country_or_region | supplier_country 欄位的值 |
target_ip | read_only_udm.target.ip、read_only_udm.target.asset.ip | target_ip 欄位的值 |
user.full_name | read_only_udm.principal.user.user_display_name | user.full_name 欄位的值 |
user.name | read_only_udm.principal.user.user_display_name | user.name 欄位的值 |
user_name | read_only_udm.principal.user.user_display_name | user_name 欄位的值 |
uuid | read_only_udm.metadata.product_log_id | uuid 欄位的值 |
時間 | read_only_udm.metadata.event_timestamp | 欄位轉換為時間戳記時的值 |
read_only_udm.metadata.event_type | 如果主機名稱為空白,且 connection.dst_ip 不為空白,則設為 NETWORK_UNCATEGORIZED 。如果主機名稱不為空白,請設為 SCAN_NETWORK 。如果 has_principal 和 has_target 都是 true ,請設為 NETWORK_CONNECTION 。如果 has_principal 為 true 且 has_target 為 false ,請設為 STATUS_UPDATE 。如果 has_principal 和 has_target 都是 false ,請設為 GENERIC_EVENT |
|
read_only_udm.metadata.log_type | 一律設為「SYMANTEC_WSS 」 |
|
read_only_udm.metadata.vendor_name | 一律設為「SYMANTEC 」 |
|
read_only_udm.security_result.action | 如果 product_data.sc-filter_result 為 OBSERVED 或 PROXIED ,請設為 ALLOW 。如果 product_data.sc-filter_result 為 DENIED ,請設為 BLOCK |
|
read_only_udm.security_result.action_details | product_data.sc-filter_result 欄位的值 | |
read_only_udm.target.resource.type | 如果 product_data.x-client-device-id 不為空白,請設為 DEVICE |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。