Snyk 그룹 수준 감사 및 문제 로그 수집
이 가이드에서는 Amazon S3를 사용하여 Snyk 그룹 수준 감사 및 문제 로그를 Google Security Operations에 수집하는 방법을 설명합니다.
시작하기 전에
다음 기본 요건이 충족되었는지 확인합니다.
- Google SecOps 인스턴스
- Snyk 그룹에 대한 권한 있는 액세스 (읽기 액세스 권한이 있는 API 토큰, 그룹 ID)
- AWS (S3, IAM, Lambda, EventBridge)에 대한 권한 액세스
Snyk 그룹 ID 및 API 토큰 가져오기
- Snyk UI에서 Account settings(계정 설정) > API token(API 토큰)으로 이동하여 API token을 생성합니다.
- 나중에
SNYK_TOKEN
로 사용할 수 있도록 토큰을 복사하여 안전한 위치에 저장합니다. - 그룹으로 전환하고 그룹 설정을 엽니다.
- URL (
https://app.snyk.io/group/<GROUP_ID>/...
)에서 그룹 ID를 복사하여 나중에GROUP_ID
로 사용할 수 있도록 저장합니다. - 기본 API 엔드포인트:
https://api.snyk.io
(필요한 경우API_BASE
로 재정의). - 토큰이 있는 사용자에게 그룹 관리자 역할을 할당합니다. (사용자가 그룹 감사 로그 및 그룹 문제를 볼 수 있어야 함)
Google SecOps용 AWS S3 버킷 및 IAM 구성
- 이 사용자 가이드(버킷 만들기)에 따라 Amazon S3 버킷을 만듭니다.
- 나중에 참조할 수 있도록 버킷 이름과 리전을 저장합니다 (예:
snyk-group-logs
). - 이 사용자 가이드(IAM 사용자 만들기)에 따라 사용자를 만듭니다.
- 생성된 사용자를 선택합니다.
- 보안 사용자 인증 정보 탭을 선택합니다.
- 액세스 키 섹션에서 액세스 키 만들기를 클릭합니다.
- 사용 사례로 서드 파티 서비스를 선택합니다.
- 다음을 클릭합니다.
- 선택사항: 설명 태그를 추가합니다.
- 액세스 키 만들기를 클릭합니다.
- CSV 파일 다운로드를 클릭하여 나중에 사용할 수 있도록 액세스 키와 비밀 액세스 키를 저장합니다.
- 완료를 클릭합니다.
- 권한 탭을 선택합니다.
- 권한 정책 섹션에서 권한 추가를 클릭합니다.
- 권한 추가를 선택합니다.
- 정책 직접 연결을 선택합니다.
- AmazonS3FullAccess 정책을 검색하여 선택합니다.
- 다음을 클릭합니다.
- 권한 추가를 클릭합니다.
S3 업로드용 IAM 정책 및 역할 구성
- AWS 콘솔에서 IAM > 정책 > 정책 만들기 > JSON 탭으로 이동합니다.
다음 정책을 입력합니다 (버킷 아래의 모든 객체에 대한 쓰기 액세스 및 Lambda가 사용하는 상태 파일에 대한 읽기 액세스 포함).
{ "Version": "2012-10-17", "Statement": [ { "Sid": "PutAllSnykGroupObjects", "Effect": "Allow", "Action": ["s3:PutObject", "s3:GetObject"], "Resource": "arn:aws:s3:::snyk-group-logs/*" } ] }
- 다른 버킷 이름을 입력한 경우
snyk-group-logs
을 해당 이름으로 바꿉니다.
- 다른 버킷 이름을 입력한 경우
다음 > 정책 만들기를 클릭합니다.
IAM > 역할 > 역할 생성 > AWS 서비스 > Lambda로 이동합니다.
새로 만든 정책을 연결합니다.
역할 이름을
WriteSnykGroupToS3Role
로 지정하고 역할 만들기를 클릭합니다.
Lambda 함수 만들기
- AWS 콘솔에서 Lambda > 함수 > 함수 만들기로 이동합니다.
- 처음부터 작성을 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
설정 | 값 |
---|---|
이름 | snyk_group_audit_issues_to_s3 |
런타임 | Python 3.13 |
아키텍처 | x86_64 |
실행 역할 | WriteSnykGroupToS3Role |
함수가 생성되면 코드 탭을 열고 스텁을 삭제하고 다음 코드를 입력합니다 (
snyk_group_audit_issues_to_s3.py
).#!/usr/bin/env python3 # Lambda: Pull Snyk Group-level Audit Logs + Issues to S3 (no transform) import os import json import time import urllib.parse from urllib.request import Request, urlopen from urllib.parse import urlparse, parse_qs from urllib.error import HTTPError import boto3 API_BASE = os.environ.get("API_BASE", "https://api.snyk.io").rstrip("/") SNYK_TOKEN = os.environ["SNYK_TOKEN"].strip() GROUP_ID = os.environ["GROUP_ID"].strip() BUCKET = os.environ["S3_BUCKET"].strip() PREFIX = os.environ.get("S3_PREFIX", "snyk/group/").strip() STATE_KEY = os.environ.get("STATE_KEY", "snyk/group/state.json").strip() # Page sizes & limits AUDIT_SIZE = int(os.environ.get("AUDIT_PAGE_SIZE", "100")) # audit uses 'size' (max 100) ISSUES_LIMIT = int(os.environ.get("ISSUES_PAGE_LIMIT", "200")) # issues uses 'limit' MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) # API versions (Snyk REST requires a 'version' param) AUDIT_API_VERSION = os.environ.get("SNYK_AUDIT_API_VERSION", "2021-06-04").strip() ISSUES_API_VERSION = os.environ.get("SNYK_ISSUES_API_VERSION", "2024-10-15").strip() # First-run lookback for audit to avoid huge backfills LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600")) HDRS = { "Authorization": f"token {SNYK_TOKEN}", "Accept": "application/vnd.api+json", } s3 = boto3.client("s3") def _get_state() -> dict: try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read() or b"{}") except Exception: return {} def _put_state(state: dict): s3.put_object( Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps(state, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) def _iso(ts: float) -> str: return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts)) def _http_get(url: str) -> dict: req = Request(url, method="GET", headers=HDRS) try: with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: if e.code in (429, 500, 502, 503, 504): delay = int(e.headers.get("Retry-After", "1")) time.sleep(max(1, delay)) with urlopen(req, timeout=60) as r2: return json.loads(r2.read().decode("utf-8")) raise def _write_page(kind: str, payload: dict) -> str: ts = time.gmtime() key = f"{PREFIX.rstrip('/')}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-snyk-{kind}.json" s3.put_object( Bucket=BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def _next_href(links: dict | None) -> str | None: if not links: return None nxt = links.get("next") if not nxt: return None if isinstance(nxt, str): return nxt if isinstance(nxt, dict): return nxt.get("href") return None # -------- Audit Logs -------- def pull_audit_logs(state: dict) -> dict: cursor = state.get("audit_cursor") pages = 0 total = 0 base = f"{API_BASE}/rest/groups/{GROUP_ID}/audit_logs/search" params: dict[str, object] = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE} if cursor: params["cursor"] = cursor else: now = time.time() params["from"] = _iso(now - LOOKBACK_SECONDS) params["to"] = _iso(now) while pages < MAX_PAGES: url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}" payload = _http_get(url) _write_page("audit", payload) data_items = (payload.get("data") or {}).get("items") or [] if isinstance(data_items, list): total += len(data_items) nxt = _next_href(payload.get("links")) if not nxt: break q = parse_qs(urlparse(nxt).query) cur = (q.get("cursor") or [None])[0] if not cur: break params = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE, "cursor": cur} state["audit_cursor"] = cur pages += 1 return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("audit_cursor")} # -------- Issues -------- def pull_issues(state: dict) -> dict: cursor = state.get("issues_cursor") # stores 'starting_after' pages = 0 total = 0 base = f"{API_BASE}/rest/groups/{GROUP_ID}/issues" params: dict[str, object] = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT} if cursor: params["starting_after"] = cursor while pages < MAX_PAGES: url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}" payload = _http_get(url) _write_page("issues", payload) data_items = payload.get("data") or [] if isinstance(data_items, list): total += len(data_items) nxt = _next_href(payload.get("links")) if not nxt: break q = parse_qs(urlparse(nxt).query) cur = (q.get("starting_after") or [None])[0] if not cur: break params = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT, "starting_after": cur} state["issues_cursor"] = cur pages += 1 return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("issues_cursor")} def lambda_handler(event=None, context=None): state = _get_state() audit_res = pull_audit_logs(state) issues_res = pull_issues(state) _put_state(state) return {"ok": True, "audit": audit_res, "issues": issues_res} if __name__ == "__main__": print(lambda_handler())
구성 > 환경 변수 > 수정 > 새 환경 변수 추가로 이동합니다.
다음 환경 변수를 입력하고 값으로 바꿉니다.
키 예 S3_BUCKET
snyk-group-logs
S3_PREFIX
snyk/group/
STATE_KEY
snyk/group/state.json
SNYK_TOKEN
xxxxxxxx-xxxx-xxxx-xxxx-xxxx
GROUP_ID
<group_uuid>
API_BASE
https://api.snyk.io
SNYK_AUDIT_API_VERSION
2021-06-04
SNYK_ISSUES_API_VERSION
2024-10-15
AUDIT_PAGE_SIZE
100
ISSUES_PAGE_LIMIT
200
MAX_PAGES
20
LOOKBACK_SECONDS
3600
함수가 생성된 후 해당 페이지에 머무르거나 Lambda > 함수 >
<your-function>
를 엽니다.구성 탭을 선택합니다.
일반 구성 패널에서 수정을 클릭합니다.
제한 시간을 5분 (300초)으로 변경하고 저장을 클릭합니다.
EventBridge 일정 만들기
- Amazon EventBridge > 스케줄러 > 일정 만들기로 이동합니다.
- 다음 구성 세부정보를 제공합니다.
- 반복 일정: 요율 (
1 hour
) - 타겟: Lambda 함수
snyk_group_audit_issues_to_s3
- 이름:
snyk-group-audit-issues-1h
.
- 반복 일정: 요율 (
- 일정 만들기를 클릭합니다.
선택사항: Google SecOps용 읽기 전용 IAM 사용자 및 키 만들기
- AWS 콘솔에서 IAM > 사용자 > 사용자 추가로 이동합니다.
- Add users를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 사용자:
secops-reader
- 액세스 유형: 액세스 키 — 프로그래매틱 액세스
- 사용자:
- 사용자 만들기를 클릭합니다.
- 최소 읽기 정책 (맞춤) 연결: 사용자 > secops-reader > 권한 > 권한 추가 > 정책 직접 연결 > 정책 만들기
JSON 편집기에서 다음 정책을 입력합니다.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::snyk-group-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::snyk-group-logs" } ] }
이름을
secops-reader-policy
로 설정합니다.정책 만들기 > 검색/선택 > 다음 > 권한 추가로 이동합니다.
보안 사용자 인증 정보> 액세스 키> 액세스 키 만들기로 이동합니다.
CSV를 다운로드합니다 (이러한 값은 피드에 입력됨).
Snyk 그룹 수준 감사 및 문제 로그를 수집하도록 Google SecOps에서 피드 구성
- SIEM 설정> 피드로 이동합니다.
- + 새 피드 추가를 클릭합니다.
- 피드 이름 필드에 피드 이름을 입력합니다 (예:
Snyk Group Audit/Issues
). - 소스 유형으로 Amazon S3 V2를 선택합니다.
- 로그 유형으로 Snyk 그룹 수준 감사/문제 로그를 선택합니다.
- 다음을 클릭합니다.
- 다음 입력 파라미터의 값을 지정합니다.
- S3 URI:
s3://snyk-group-logs/snyk/group/
- 소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.
- 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.
- 액세스 키 ID: S3 버킷에 대한 액세스 권한이 있는 사용자 액세스 키입니다.
- 보안 비밀 액세스 키: S3 버킷에 액세스할 수 있는 사용자 보안 비밀 키입니다.
- 애셋 네임스페이스:
snyk.group
- 수집 라벨: 원하는 경우 추가합니다.
- S3 URI:
- 다음을 클릭합니다.
- 확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.
도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가로부터 답변을 받으세요.