收集 Symantec WSS 日志
本文档介绍了如何使用 Amazon S3 将 Symantec Web Security Service (WSS) 日志注入到 Google Security Operations。解析器首先尝试将日志消息解析为 JSON。如果失败,它会使用一系列越来越具体的 grok 模式从原始文本中提取字段,最终将提取的数据映射到统一数据模型 (UDM)。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例。
- 对 Symantec Web Security Service 的特权访问权限。
- 对 AWS(S3、Identity and Access Management (IAM)、Lambda、EventBridge)的特权访问权限。
收集 Symantec WSS 前提条件(ID、API 密钥、组织 ID、令牌)
- 以管理员身份登录 Symantec Web Security Service 门户网站。
- 前往账号 > API 凭据。
- 点击 Add(添加)。
- 提供以下配置详细信息:
- API Name:输入描述性名称(例如
Google SecOps Integration
)。 - 说明:输入 API 凭据的说明。
- API Name:输入描述性名称(例如
- 点击保存,然后安全地复制生成的 API 凭据。
- 记录您的 WSS 门户网址和 Sync API 端点。
- 复制以下详细信息并将其保存在安全的位置:
- WSS_API_USERNAME。
- WSS_API_PASSWORD。
- WSS_SYNC_URL。
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶
- 保存存储桶名称和区域以供日后参考(例如
symantec-wss-logs
)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击下载 CSV 文件,保存访问密钥和秘密访问密钥,以供日后参考。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策。
- 搜索 AmazonS3FullAccess 政策。
- 选择相应政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传配置 IAM 政策和角色
- 在 AWS 控制台中,依次前往 IAM > 政策。
- 依次点击创建政策 > JSON 标签页。
- 复制并粘贴以下政策。
政策 JSON(如果您输入了其他存储桶名称,请替换
symantec-wss-logs
):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::symantec-wss-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::symantec-wss-logs/symantec/wss/state.json" } ] }
依次点击下一步 > 创建政策。
依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda。
附加新创建的政策。
将角色命名为
SymantecWssToS3Role
,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
提供以下配置详细信息:
设置 值 名称 symantec_wss_to_s3
运行时 Python 3.13 架构 x86_64 执行角色 SymantecWssToS3Role
创建函数后,打开 Code 标签页,删除桩代码并粘贴以下代码 (
symantec_wss_to_s3.py
)。#!/usr/bin/env python3 # Lambda: Pull Symantec WSS logs and store raw payloads to S3 # - Time window via millisecond timestamps for WSS Sync API. # - Preserves vendor-native format (CSV/JSON/ZIP). # - Retries with exponential backoff; unique S3 keys to avoid overwrites. import os, json, time, uuid from urllib.request import Request, urlopen from urllib.error import URLError, HTTPError import boto3 S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "symantec/wss/") STATE_KEY = os.environ.get("STATE_KEY", "symantec/wss/state.json") WINDOW_SEC = int(os.environ.get("WINDOW_SECONDS", "3600")) # default 1h HTTP_TIMEOUT= int(os.environ.get("HTTP_TIMEOUT", "60")) WSS_SYNC_URL = os.environ.get("WSS_SYNC_URL", "https://portal.threatpulse.com/reportpod/logs/sync") API_USERNAME = os.environ["WSS_API_USERNAME"] API_PASSWORD = os.environ["WSS_API_PASSWORD"] TOKEN_PARAM = os.environ.get("WSS_TOKEN_PARAM", "none") MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3")) USER_AGENT = os.environ.get("USER_AGENT", "symantec-wss-to-s3/1.0") s3 = boto3.client("s3") def _load_state(): try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()) except Exception: return {} def _save_state(st): s3.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(st, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) def _ms_timestamp(ts: float) -> int: """Convert Unix timestamp to milliseconds for WSS API""" return int(ts * 1000) def _fetch_wss_logs(start_ms: int, end_ms: int) -> tuple[bytes, str, str]: # WSS Sync API parameters params = f"startDate={start_ms}&endDate={end_ms}&token={TOKEN_PARAM}" url = f"{WSS_SYNC_URL}?{params}" attempt = 0 while True: req = Request(url, method="GET") req.add_header("User-Agent", USER_AGENT) req.add_header("X-APIUsername", API_USERNAME) req.add_header("X-APIPassword", API_PASSWORD) try: with urlopen(req, timeout=HTTP_TIMEOUT) as r: blob = r.read() content_type = r.headers.get("Content-Type", "application/octet-stream") content_encoding = r.headers.get("Content-Encoding", "") return blob, content_type, content_encoding except (HTTPError, URLError) as e: attempt += 1 print(f"HTTP error on attempt {attempt}: {e}") if attempt > MAX_RETRIES: raise # exponential backoff with jitter time.sleep(min(60, 2 ** attempt) + (time.time() % 1)) def _determine_extension(content_type: str, content_encoding: str) -> str: """Determine file extension based on content type and encoding""" if "zip" in content_type.lower(): return ".zip" if "gzip" in content_type.lower() or content_encoding.lower() == "gzip": return ".gz" if "json" in content_type.lower(): return ".json" if "csv" in content_type.lower(): return ".csv" return ".bin" def _put_wss_data(blob: bytes, content_type: str, content_encoding: str, from_ts: float, to_ts: float) -> str: # Create unique S3 key for WSS data ts_path = time.strftime("%Y/%m/%d", time.gmtime(to_ts)) uniq = f"{int(time.time()*1e6)}_{uuid.uuid4().hex[:8]}" ext = _determine_extension(content_type, content_encoding) key = f"{S3_PREFIX}{ts_path}/symantec_wss_{int(from_ts)}_{int(to_ts)}_{uniq}{ext}" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=blob, ContentType=content_type, Metadata={ 'source': 'symantec-wss', 'from_timestamp': str(int(from_ts)), 'to_timestamp': str(int(to_ts)), 'content_encoding': content_encoding } ) return key def lambda_handler(event=None, context=None): st = _load_state() now = time.time() from_ts = float(st.get("last_to_ts") or (now - WINDOW_SEC)) to_ts = now # Convert to milliseconds for WSS API start_ms = _ms_timestamp(from_ts) end_ms = _ms_timestamp(to_ts) print(f"Fetching Symantec WSS logs from {start_ms} to {end_ms}") blob, content_type, content_encoding = _fetch_wss_logs(start_ms, end_ms) print(f"Retrieved {len(blob)} bytes with content-type: {content_type}") if content_encoding: print(f"Content encoding: {content_encoding}") key = _put_wss_data(blob, content_type, content_encoding, from_ts, to_ts) st["last_to_ts"] = to_ts st["last_successful_run"] = now _save_state(st) return { "statusCode": 200, "body": { "success": True, "s3_key": key, "content_type": content_type, "content_encoding": content_encoding, "from_timestamp": from_ts, "to_timestamp": to_ts, "bytes_retrieved": len(blob) } } if __name__ == "__main__": print(lambda_handler())
依次前往配置 > 环境变量。
依次点击修改 > 添加新的环境变量。
输入下表中提供的环境变量,并将示例值替换为您的值。
环境变量
键 示例值 S3_BUCKET
symantec-wss-logs
S3_PREFIX
symantec/wss/
STATE_KEY
symantec/wss/state.json
WINDOW_SECONDS
3600
HTTP_TIMEOUT
60
MAX_RETRIES
3
USER_AGENT
symantec-wss-to-s3/1.0
WSS_SYNC_URL
https://portal.threatpulse.com/reportpod/logs/sync
WSS_API_USERNAME
your-api-username
(来自第 2 步)WSS_API_PASSWORD
your-api-password
(来自第 2 步)WSS_TOKEN_PARAM
none
创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。
选择配置标签页。
在常规配置面板中,点击修改。
将超时更改为 5 分钟(300 秒),然后点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度器 > 创建调度。
- 提供以下配置详细信息:
- 周期性安排:费率 (
1 hour
)。 - 目标:您的 Lambda 函数
symantec_wss_to_s3
。 - 名称:
symantec-wss-1h
。
- 周期性安排:费率 (
- 点击创建时间表。
(可选)为 Google SecOps 创建只读 IAM 用户和密钥
- 在 AWS 控制台中,依次前往 IAM > 用户。
- 点击 Add users(添加用户)。
- 提供以下配置详细信息:
- 用户:输入
secops-reader
。 - 访问类型:选择访问密钥 - 以程序化方式访问。
- 用户:输入
- 点击创建用户。
- 附加最低限度的读取政策(自定义):依次选择用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策。
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::symantec-wss-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::symantec-wss-logs" } ] }
名称 =
secops-reader-policy
。依次点击创建政策 > 搜索/选择 > 下一步 > 添加权限。
为
secops-reader
创建访问密钥:安全凭据 > 访问密钥。点击创建访问密钥。
下载 CSV 文件。(您需要将这些值粘贴到 Feed 中)。
在 Google SecOps 中配置 Feed 以注入 Symantec WSS 日志
- 依次前往 SIEM 设置> Feed。
- 点击 + 添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Symantec WSS logs
)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Symantec WSS 作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://symantec-wss-logs/symantec/wss/
- 来源删除选项:根据您的偏好选择删除选项。
- 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
- 资产命名空间:资产命名空间。
- 注入标签:应用于此 Feed 中事件的标签。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
日志字段 | UDM 映射 | 逻辑 |
---|---|---|
category_id | read_only_udm.metadata.product_event_type | 如果 category_id 为 1,则 read_only_udm.metadata.product_event_type 设置为 Security 。如果 category_id 为 5,则 read_only_udm.metadata.product_event_type 设置为 Policy |
collector_device_ip | read_only_udm.principal.ip, read_only_udm.principal.asset.ip | collector_device_ip 字段的值 |
connection.bytes_download | read_only_udm.network.received_bytes | connection.bytes_download 字段的值转换为整数 |
connection.bytes_upload | read_only_udm.network.sent_bytes | connection.bytes_upload 字段的值转换为整数 |
connection.dst_ip | read_only_udm.target.ip | connection.dst_ip 字段的值 |
connection.dst_location.country | read_only_udm.target.location.country_or_region | connection.dst_location.country 字段的值 |
connection.dst_name | read_only_udm.target.hostname | connection.dst_name 字段的值 |
connection.dst_port | read_only_udm.target.port | connection.dst_port 字段的值转换为整数 |
connection.http_status | read_only_udm.network.http.response_code | 将 connection.http_status 字段的值转换为整数 |
connection.http_user_agent | read_only_udm.network.http.user_agent | connection.http_user_agent 字段的值 |
connection.src_ip | read_only_udm.principal.ip、read_only_udm.src.ip | connection.src_ip 字段的值。如果 src_ip 或 collector_device_ip 不为空,则会映射到 read_only_udm.src.ip |
connection.tls.version | read_only_udm.network.tls.version_protocol | connection.tls.version 字段的值 |
connection.url.host | read_only_udm.target.hostname | connection.url.host 字段的值 |
connection.url.method | read_only_udm.network.http.method | connection.url.method 字段的值 |
connection.url.path | read_only_udm.target.url | connection.url.path 字段的值 |
connection.url.text | read_only_udm.target.url | connection.url.text 字段的值 |
cs_connection_negotiated_cipher | read_only_udm.network.tls.cipher | cs_connection_negotiated_cipher 字段的值 |
cs_icap_status | read_only_udm.security_result.description | cs_icap_status 字段的值 |
device_id | read_only_udm.target.resource.id, read_only_udm.target.resource.product_object_id | device_id 字段的值 |
device_ip | read_only_udm.intermediary.ip, read_only_udm.intermediary.asset.ip | device_ip 字段的值 |
device_time | read_only_udm.metadata.collected_timestamp, read_only_udm.metadata.event_timestamp | device_time 字段的值转换为字符串。如果 when 为空,则映射到 read_only_udm.metadata.event_timestamp |
主机名 | read_only_udm.principal.hostname、read_only_udm.principal.asset.hostname | 主机名字段的值 |
log_time | read_only_udm.metadata.event_timestamp | log_time 字段的值已转换为时间戳。如果 when 和 device_time 为空,则映射到 read_only_udm.metadata.event_timestamp |
msg_desc | read_only_udm.metadata.description | msg_desc 字段的值 |
os_details | read_only_udm.target.asset.platform_software.platform, read_only_udm.target.asset.platform_software.platform_version | os_details 字段的值。如果 os_details 不为空,则会对其进行解析以提取 os_name 和 os_ver。如果 os_name 包含 Windows ,则将 read_only_udm.target.asset.platform_software.platform 设置为 WINDOWS 。os_ver 映射到 read_only_udm.target.asset.platform_software.platform_version |
product_data.cs(Referer) | read_only_udm.network.http.referral_url | product_data.cs(Referer) 字段的值 |
product_data.r-supplier-country | read_only_udm.principal.location.country_or_region | product_data.r-supplier-country 字段的值 |
product_data.s-supplier-ip | read_only_udm.intermediary.ip, read_only_udm.intermediary.asset.ip | product_data.s-supplier-ip 字段的值 |
product_data.x-bluecoat-application-name | read_only_udm.target.application | product_data.x-bluecoat-application-name 字段的值 |
product_data.x-bluecoat-transaction-uuid | read_only_udm.metadata.product_log_id | product_data.x-bluecoat-transaction-uuid 字段的值 |
product_data.x-client-agent-sw | read_only_udm.observer.platform_version | product_data.x-client-agent-sw 字段的值 |
product_data.x-client-agent-type | read_only_udm.observer.application | product_data.x-client-agent-type 字段的值 |
product_data.x-client-device-id | read_only_udm.target.resource.type, read_only_udm.target.resource.id, read_only_udm.target.resource.product_object_id | 如果不为空,则将 read_only_udm.target.resource.type 设置为 DEVICE 。product_data.x-client-device-id 字段的值会映射到 read_only_udm.target.resource.id 和 read_only_udm.target.resource.product_object_id |
product_data.x-client-device-name | read_only_udm.src.hostname、read_only_udm.src.asset.hostname | product_data.x-client-device-name 字段的值 |
product_data.x-cs-client-ip-country | read_only_udm.target.location.country_or_region | product_data.x-cs-client-ip-country 字段的值 |
product_data.x-cs-connection-negotiated-cipher | read_only_udm.network.tls.cipher | product_data.x-cs-connection-negotiated-cipher 字段的值 |
product_data.x-cs-connection-negotiated-ssl-version | read_only_udm.network.tls.version_protocol | product_data.x-cs-connection-negotiated-ssl-version 字段的值 |
product_data.x-exception-id | read_only_udm.security_result.summary | product_data.x-exception-id 字段的值 |
product_data.x-rs-certificate-hostname | read_only_udm.network.tls.client.server_name | product_data.x-rs-certificate-hostname 字段的值 |
product_data.x-rs-certificate-hostname-categories | read_only_udm.security_result.category_details | product_data.x-rs-certificate-hostname-categories 字段的值 |
product_data.x-rs-certificate-observed-errors | read_only_udm.network.tls.server.certificate.issuer | product_data.x-rs-certificate-observed-errors 字段的值 |
product_data.x-rs-certificate-validate-status | read_only_udm.network.tls.server.certificate.subject | product_data.x-rs-certificate-validate-status 字段的值 |
product_name | read_only_udm.metadata.product_name | product_name 字段的值 |
product_ver | read_only_udm.metadata.product_version | product_ver 字段的值 |
proxy_connection.src_ip | read_only_udm.intermediary.ip, read_only_udm.intermediary.asset.ip | proxy_connection.src_ip 字段的值 |
received_bytes | read_only_udm.network.received_bytes | received_bytes 字段的值已转换为整数 |
ref_uid | read_only_udm.metadata.product_log_id | ref_uid 字段的值 |
s_action | read_only_udm.metadata.description | s_action 字段的值 |
sent_bytes | read_only_udm.network.sent_bytes | sent_bytes 字段的值已转换为整数 |
severity_id | read_only_udm.security_result.severity | 如果 severity_id 为 1 或 2,则将 read_only_udm.security_result.severity 设置为 LOW 。如果 severity_id 为 3 或 4,则将 read_only_udm.security_result.severity 设置为 MEDIUM 。如果 severity_id 为 5 或 6,则将 read_only_udm.security_result.severity 设置为 HIGH |
supplier_country | read_only_udm.principal.location.country_or_region | supplier_country 字段的值 |
target_ip | read_only_udm.target.ip, read_only_udm.target.asset.ip | target_ip 字段的值 |
user.full_name | read_only_udm.principal.user.user_display_name | user.full_name 字段的值 |
user.name | read_only_udm.principal.user.user_display_name | user.name 字段的值 |
user_name | read_only_udm.principal.user.user_display_name | user_name 字段的值 |
uuid | read_only_udm.metadata.product_log_id | UUID 字段的值 |
when | read_only_udm.metadata.event_timestamp | 字段转换为时间戳时的值 |
read_only_udm.metadata.event_type | 如果主机名为空且 connection.dst_ip 不为空,则设置为 NETWORK_UNCATEGORIZED 。如果主机名不为空,则设置为 SCAN_NETWORK 。如果 has_principal 和 has_target 均为 true ,则设置为 NETWORK_CONNECTION 。如果 has_principal 为 true 且 has_target 为 false ,则设置为 STATUS_UPDATE 。如果 has_principal 和 has_target 为 false ,则设置为 GENERIC_EVENT |
|
read_only_udm.metadata.log_type | 始终设置为 SYMANTEC_WSS |
|
read_only_udm.metadata.vendor_name | 始终设置为 SYMANTEC |
|
read_only_udm.security_result.action | 如果 product_data.sc-filter_result 为 OBSERVED 或 PROXIED ,则设置为 ALLOW 。如果 product_data.sc-filter_result 为 DENIED ,则设置为 BLOCK |
|
read_only_udm.security_result.action_details | product_data.sc-filter_result 字段的值 | |
read_only_udm.target.resource.type | 如果 product_data.x-client-device-id 不为空,则设置为 DEVICE |
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。