收集 Snyk 群组级审核日志和问题日志

支持的语言:

本指南介绍了如何使用 Amazon S3 将 Snyk 群组级审核和问题日志注入到 Google Security Operations。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例
  • Snyk 群组的特权访问权限(具有读取权限的 API 令牌;群组 ID)
  • AWS(S3、IAM、Lambda、EventBridge)的特权访问权限

获取 Snyk Group ID 和 API 令牌

  1. Snyk 界面中,依次前往账号设置 > API 令牌,然后生成 API 令牌
  2. 复制令牌并将其保存在安全的位置,以便稍后用作 SNYK_TOKEN
  3. 切换到您的群组,然后打开群组设置
  4. 复制并保存网址 (https://app.snyk.io/group/<GROUP_ID>/...) 中的群组 ID,以供日后用作 GROUP_ID
  5. 基本 API 端点:https://api.snyk.io(如果需要,可使用 API_BASE 进行替换)。
  6. 为拥有令牌的用户分配群组管理员角色。(用户必须能够查看群组审核日志群组问题)。

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 snyk-group-logs)。
  3. 按照以下用户指南创建用户:创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击 Download CSV file(下载 CSV 文件),保存访问密钥不公开的访问密钥以供日后使用。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索并选择 AmazonS3FullAccess 政策。
  18. 点击下一步
  19. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页
  2. 输入以下政策(包括对存储桶 Lambda 使用的状态文件的写入权限和读取权限):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "PutAllSnykGroupObjects",
          "Effect": "Allow",
          "Action": ["s3:PutObject", "s3:GetObject"],
          "Resource": "arn:aws:s3:::snyk-group-logs/*"
        }
      ]
    }
    
    • 如果您输入了其他存储桶名称,请替换 snyk-group-logs
  3. 依次点击下一步 > 创建政策

  4. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  5. 附加新创建的政策。

  6. 将角色命名为 WriteSnykGroupToS3Role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:
设置
名称 snyk_group_audit_issues_to_s3
运行时 Python 3.13
架构 x86_64
执行角色 WriteSnykGroupToS3Role
  1. 创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (snyk_group_audit_issues_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull Snyk Group-level Audit Logs + Issues to S3 (no transform)
    
    import os
    import json
    import time
    import urllib.parse
    from urllib.request import Request, urlopen
    from urllib.parse import urlparse, parse_qs
    from urllib.error import HTTPError
    import boto3
    
    API_BASE = os.environ.get("API_BASE", "https://api.snyk.io").rstrip("/")
    SNYK_TOKEN = os.environ["SNYK_TOKEN"].strip()
    GROUP_ID = os.environ["GROUP_ID"].strip()
    
    BUCKET = os.environ["S3_BUCKET"].strip()
    PREFIX = os.environ.get("S3_PREFIX", "snyk/group/").strip()
    STATE_KEY = os.environ.get("STATE_KEY", "snyk/group/state.json").strip()
    
    # Page sizes & limits
    AUDIT_SIZE = int(os.environ.get("AUDIT_PAGE_SIZE", "100"))       # audit uses 'size' (max 100)
    ISSUES_LIMIT = int(os.environ.get("ISSUES_PAGE_LIMIT", "200"))   # issues uses 'limit'
    MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))
    
    # API versions (Snyk REST requires a 'version' param)
    AUDIT_API_VERSION = os.environ.get("SNYK_AUDIT_API_VERSION", "2021-06-04").strip()
    ISSUES_API_VERSION = os.environ.get("SNYK_ISSUES_API_VERSION", "2024-10-15").strip()
    
    # First-run lookback for audit to avoid huge backfills
    LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600"))
    
    HDRS = {
        "Authorization": f"token {SNYK_TOKEN}",
        "Accept": "application/vnd.api+json",
    }
    
    s3 = boto3.client("s3")
    
    def _get_state() -> dict:
        try:
            obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read() or b"{}")
        except Exception:
            return {}
    
    def _put_state(state: dict):
        s3.put_object(
            Bucket=BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(state, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
    
    def _iso(ts: float) -> str:
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))
    
    def _http_get(url: str) -> dict:
        req = Request(url, method="GET", headers=HDRS)
        try:
            with urlopen(req, timeout=60) as r:
                return json.loads(r.read().decode("utf-8"))
        except HTTPError as e:
            if e.code in (429, 500, 502, 503, 504):
                delay = int(e.headers.get("Retry-After", "1"))
                time.sleep(max(1, delay))
                with urlopen(req, timeout=60) as r2:
                    return json.loads(r2.read().decode("utf-8"))
            raise
    
    def _write_page(kind: str, payload: dict) -> str:
        ts = time.gmtime()
        key = f"{PREFIX.rstrip('/')}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-snyk-{kind}.json"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def _next_href(links: dict | None) -> str | None:
        if not links:
            return None
        nxt = links.get("next")
        if not nxt:
            return None
        if isinstance(nxt, str):
            return nxt
        if isinstance(nxt, dict):
            return nxt.get("href")
        return None
    
    # -------- Audit Logs --------
    
    def pull_audit_logs(state: dict) -> dict:
        cursor = state.get("audit_cursor")
        pages = 0
        total = 0
    
        base = f"{API_BASE}/rest/groups/{GROUP_ID}/audit_logs/search"
        params: dict[str, object] = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE}
    
        if cursor:
            params["cursor"] = cursor
        else:
            now = time.time()
            params["from"] = _iso(now - LOOKBACK_SECONDS)
            params["to"] = _iso(now)
    
        while pages < MAX_PAGES:
            url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}"
            payload = _http_get(url)
            _write_page("audit", payload)
    
            data_items = (payload.get("data") or {}).get("items") or []
            if isinstance(data_items, list):
                total += len(data_items)
    
            nxt = _next_href(payload.get("links"))
            if not nxt:
                break
            q = parse_qs(urlparse(nxt).query)
            cur = (q.get("cursor") or [None])[0]
            if not cur:
                break
    
            params = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE, "cursor": cur}
            state["audit_cursor"] = cur
            pages += 1
    
        return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("audit_cursor")}
    
    # -------- Issues --------
    
    def pull_issues(state: dict) -> dict:
        cursor = state.get("issues_cursor")  # stores 'starting_after'
        pages = 0
        total = 0
    
        base = f"{API_BASE}/rest/groups/{GROUP_ID}/issues"
        params: dict[str, object] = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT}
        if cursor:
            params["starting_after"] = cursor
    
        while pages < MAX_PAGES:
            url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}"
            payload = _http_get(url)
            _write_page("issues", payload)
    
            data_items = payload.get("data") or []
            if isinstance(data_items, list):
                total += len(data_items)
    
            nxt = _next_href(payload.get("links"))
            if not nxt:
                break
            q = parse_qs(urlparse(nxt).query)
            cur = (q.get("starting_after") or [None])[0]
            if not cur:
                break
    
            params = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT, "starting_after": cur}
            state["issues_cursor"] = cur
            pages += 1
    
        return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("issues_cursor")}
    
    def lambda_handler(event=None, context=None):
        state = _get_state()
        audit_res = pull_audit_logs(state)
        issues_res = pull_issues(state)
        _put_state(state)
        return {"ok": True, "audit": audit_res, "issues": issues_res}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  2. 依次前往配置 > 环境变量 > 修改 > 添加新的环境变量

  3. 输入以下环境变量,并替换为您的值:

    示例
    S3_BUCKET snyk-group-logs
    S3_PREFIX snyk/group/
    STATE_KEY snyk/group/state.json
    SNYK_TOKEN xxxxxxxx-xxxx-xxxx-xxxx-xxxx
    GROUP_ID <group_uuid>
    API_BASE https://api.snyk.io
    SNYK_AUDIT_API_VERSION 2021-06-04
    SNYK_ISSUES_API_VERSION 2024-10-15
    AUDIT_PAGE_SIZE 100
    ISSUES_PAGE_LIMIT 200
    MAX_PAGES 20
    LOOKBACK_SECONDS 3600
  4. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > <your-function>)。

  5. 选择配置标签页。

  6. 常规配置面板中,点击修改

  7. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度程序 > 创建计划
  2. 提供以下配置详细信息:
    • 周期性安排费率 (1 hour)。
    • 目标:您的 Lambda 函数 snyk_group_audit_issues_to_s3
    • 名称snyk-group-audit-issues-1h
  3. 点击创建时间表

可选:为 Google SecOps 创建只读 IAM 用户和密钥

  1. AWS 控制台中,依次前往 IAM > 用户 > 添加用户
  2. 点击 Add users(添加用户)。
  3. 提供以下配置详细信息:
    • 用户secops-reader
    • 访问类型访问密钥 - 以程序化方式访问
  4. 点击创建用户
  5. 附加最低限度的读取政策(自定义):用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策
  6. 在 JSON 编辑器中,输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::snyk-group-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::snyk-group-logs"
        }
      ]
    }
    
  7. 将名称设置为 secops-reader-policy

  8. 依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限

  9. 依次前往安全凭据 > 访问密钥 > 创建访问密钥

  10. 下载 CSV(这些值会输入到 Feed 中)。

在 Google SecOps 中配置 Feed 以注入 Snyk 组级审核日志和问题日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击 + 添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 Snyk Group Audit/Issues)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Snyk 群组级审核/问题日志作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://snyk-group-logs/snyk/group/
    • 来源删除选项:根据您的偏好设置选择删除选项。
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
    • 资源命名空间snyk.group
    • 提取标签:可根据需要添加。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。