收集 Snyk 群组级审核日志
本文档介绍了如何使用 Amazon S3 将 Snyk 群组级审核日志注入到 Google Security Operations。解析器首先会清理原始日志中不必要的字段。然后,它会提取相关信息(例如用户详细信息、事件类型和时间戳),并将这些信息转换和映射到 Google SecOps UDM 架构,以便以标准化方式表示安全日志。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例
- 对 Snyk 的特权访问权限(群组管理员)以及对群组具有访问权限的 API 令牌
- 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限
收集 Snyk 组级审核日志的前提条件(ID、API 密钥、组织 ID、令牌)
- 在 Snyk 中,依次点击您的头像 > 账号设置 > API 令牌。
- 点击撤消并重新生成(或生成),然后复制令牌。
- 将此令牌保存为
SNYK_API_TOKEN
环境变量。
- 在 Snyk 中,切换到您的群组(左上角的切换器)。
- 前往群组设置。从网址
https://app.snyk.io/group/<GROUP_ID>/settings
中复制<GROUP_ID>
。 - 或者,使用 REST API:
GET https://api.snyk.io/rest/groups?version=2021-06-04
并选择id
。
- 前往群组设置。从网址
- 确保令牌用户拥有查看审核日志 (group.audit.read) 权限。
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶
- 保存存储桶名称和区域以供日后参考(例如
snyk-audit
)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击 Download CSV file(下载 CSV 文件),保存访问密钥和不公开的访问密钥以供日后使用。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策
- 搜索并选择 AmazonS3FullAccess 政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传配置 IAM 政策和角色
- 在 AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页。
输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutSnykAuditObjects", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject" ], "Resource": "arn:aws:s3:::snyk-audit/*" } ] }
依次点击下一步 > 创建政策。
依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda。
附加新创建的政策。
将角色命名为
WriteSnykAuditToS3Role
,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
- 提供以下配置详细信息:
设置 | 值 |
---|---|
名称 | snyk_group_audit_to_s3 |
运行时 | Python 3.13 |
架构 | x86_64 |
执行角色 | WriteSnykAuditToS3Role |
创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (
snyk_group_audit_to_s3.py
):# snyk_group_audit_to_s3.py #!/usr/bin/env python3 # Lambda: Pull Snyk Group-level Audit Logs (REST) to S3 (no transform) import os import json import time import urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError import boto3 BASE = os.environ.get("SNYK_API_BASE", "https://api.snyk.io").rstrip("/") GROUP_ID = os.environ["SNYK_GROUP_ID"].strip() API_TOKEN = os.environ["SNYK_API_TOKEN"].strip() BUCKET = os.environ["S3_BUCKET"].strip() PREFIX = os.environ.get("S3_PREFIX", "snyk/audit/").strip() SIZE = int(os.environ.get("SIZE", "100")) # max 100 per docs MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) STATE_KEY = os.environ.get("STATE_KEY", "snyk/audit/state.json") API_VERSION = os.environ.get("SNYK_API_VERSION", "2021-06-04").strip() # required by REST API LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600")) # used only when no cursor # Optional filters EVENTS_CSV = os.environ.get("EVENTS", "").strip() # e.g. "group.create,org.user.invited" EXCLUDE_EVENTS_CSV = os.environ.get("EXCLUDE_EVENTS", "").strip() s3 = boto3.client("s3") HDRS = { # REST authentication requires "token" scheme and vnd.api+json Accept "Authorization": f"token {API_TOKEN}", "Accept": "application/vnd.api+json", } def _get_state() -> str | None: try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()).get("cursor") except Exception: return None def _put_state(cursor: str): s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps({"cursor": cursor}).encode("utf-8")) def _write(payload: dict) -> str: ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime()) key = f"{PREFIX.rstrip('/')}/{ts}-snyk-group-audit.json" s3.put_object( Bucket=BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def _parse_next_cursor_from_links(links: dict | None) -> str | None: if not links: return None nxt = links.get("next") if not nxt: return None try: q = urllib.parse.urlparse(nxt).query params = urllib.parse.parse_qs(q) cur = params.get("cursor") return cur[0] if cur else None except Exception: return None def _http_get(url: str) -> dict: req = Request(url, method="GET", headers=HDRS) try: with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # Back off on rate limit or transient server errors; single retry if e.code in (429, 500, 502, 503, 504): delay = int(e.headers.get("Retry-After", "1")) time.sleep(max(1, delay)) with urlopen(req, timeout=60) as r2: return json.loads(r2.read().decode("utf-8")) raise def _as_list(csv_str: str) -> list[str]: return [x.strip() for x in csv_str.split(",") if x.strip()] def fetch_page(cursor: str | None, first_run_from_iso: str | None): base_path = f"/rest/groups/{GROUP_ID}/audit_logs/search" params: dict[str, object] = { "version": API_VERSION, "size": SIZE, } if cursor: params["cursor"] = cursor elif first_run_from_iso: params["from"] = first_run_from_iso # RFC3339 events = _as_list(EVENTS_CSV) exclude_events = _as_list(EXCLUDE_EVENTS_CSV) if events and exclude_events: # API does not allow both at the same time; prefer explicit include exclude_events = [] if events: params["events"] = events # will be encoded as repeated params if exclude_events: params["exclude_events"] = exclude_events url = f"{BASE}{base_path}?{urllib.parse.urlencode(params, doseq=True)}" return _http_get(url) def lambda_handler(event=None, context=None): cursor = _get_state() pages = 0 total = 0 last_cursor = cursor # Only for the very first run (no saved cursor), constrain the time window first_run_from_iso = None if not cursor and LOOKBACK_SECONDS > 0: first_run_from_iso = time.strftime( "%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time() - LOOKBACK_SECONDS) ) while pages < MAX_PAGES: payload = fetch_page(cursor, first_run_from_iso) _write(payload) # items are nested under data.items per Snyk docs data_obj = payload.get("data") or {} items = data_obj.get("items") or [] if isinstance(items, list): total += len(items) cursor = _parse_next_cursor_from_links(payload.get("links")) pages += 1 if not cursor: break # after first page, disable from-filter first_run_from_iso = None if cursor and cursor != last_cursor: _put_state(cursor) return {"ok": True, "pages": pages, "events": total, "next_cursor": cursor} if __name__ == "__main__": print(lambda_handler())
添加环境变量
- 依次前往配置 > 环境变量。
- 依次点击修改 > 添加新的环境变量。
输入以下环境变量,并替换为您的值:
键 示例 S3_BUCKET
snyk-audit
S3_PREFIX
snyk/audit/
STATE_KEY
snyk/audit/state.json
SNYK_GROUP_ID
<your_group_id>
SNYK_API_TOKEN
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
SNYK_API_BASE
https://api.snyk.io
(可选)SNYK_API_VERSION
2021-06-04
SIZE
100
MAX_PAGES
20
LOOKBACK_SECONDS
3600
EVENTS
(可选) group.create,org.user.add
EXCLUDE_EVENTS
(可选) api.access
创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。
选择配置标签页。
在常规配置面板中,点击修改。
将超时更改为 5 分钟(300 秒),然后点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度程序 > 创建计划。
- 提供以下配置详细信息:
- 周期性安排:费率 (
1 hour
)。 - 目标:您的 Lambda 函数。
- 名称:
snyk-group-audit-1h
。
- 周期性安排:费率 (
- 点击创建时间表。
可选:为 Google SecOps 创建只读 IAM 用户和密钥
- 在 AWS 控制台中,依次前往 IAM > 用户 > 添加用户。
- 点击 Add users(添加用户)。
- 提供以下配置详细信息:
- 用户:
secops-reader
。 - 访问类型:访问密钥 - 以程序化方式访问。
- 用户:
- 点击创建用户。
- 附加最低限度的读取政策(自定义):用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策。
在 JSON 编辑器中,输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::snyk-audit/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::snyk-audit" } ] }
将名称设置为
secops-reader-policy
。依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限。
依次前往安全凭据 > 访问密钥 > 创建访问密钥。
下载 CSV(这些值会输入到 Feed 中)。
在 Google SecOps 中配置 Feed 以注入 Snyk 组级审核日志
- 依次前往 SIEM 设置> Feed。
- 点击 + 添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Snyk Group Audit Logs
)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Snyk 群组级审核日志作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://snyk-audit/snyk/audit/
- 来源删除选项:根据您的偏好设置选择删除选项。
- 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
- 资源命名空间:
snyk.group_audit
- 提取标签:可根据需要添加。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
日志字段 | UDM 映射 | 逻辑 |
---|---|---|
content.url | principal.url | 直接从原始日志中的 content.url 字段映射。 |
已创建 | metadata.event_timestamp | 使用 ISO8601 格式从原始日志的 created 字段解析而来。 |
事件 | metadata.product_event_type | 直接从原始日志中的 event 字段映射。 |
groupId | principal.user.group_identifiers | 直接从原始日志中的 groupId 字段映射。 |
orgId | principal.user.attribute.labels.key | 设置为“orgId”。 |
orgId | principal.user.attribute.labels.value | 直接从原始日志中的 orgId 字段映射。 |
userId | principal.user.userid | 直接从原始日志中的 userId 字段映射。 |
不适用 | metadata.event_type | 在解析器代码中硬编码为“USER_UNCATEGORIZED”。 |
不适用 | metadata.log_type | 在解析器代码中硬编码为“SNYK_SDLC”。 |
不适用 | metadata.product_name | 在解析器代码中硬编码为“SNYK SDLC”。 |
不适用 | metadata.vendor_name | 在解析器代码中硬编码为“SNYK_SDLC”。 |
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。