收集 Snyk 群組層級稽核記錄
本文說明如何使用 Amazon S3,將 Snyk 群組層級稽核記錄擷取至 Google Security Operations。剖析器會先清除原始記錄中不必要的欄位,接著,系統會擷取相關資訊 (例如使用者詳細資料、事件類型和時間戳記),並轉換及對應至 Google SecOps UDM 結構定義,以標準化安全記錄表示法。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體
- Snyk 的特殊存取權 (群組管理員) 和具有群組存取權的 API 權杖
- AWS (S3、IAM、Lambda、EventBridge) 的具備權限存取權
收集 Snyk Group 層級稽核記錄的必要條件 (ID、API 金鑰、機構 ID、權杖)
- 在 Snyk 中,依序點選「你的虛擬人偶」>「帳戶設定」>「API 權杖」。
- 按一下「撤銷並重新產生」 (或「產生」),然後複製權杖。
- 將這個權杖儲存為
SNYK_API_TOKEN
環境變數。
- 在 Snyk 中,切換至您的群組 (左上方的切換器)。
- 前往「群組設定」。從網址複製
<GROUP_ID>
:https://app.snyk.io/group/<GROUP_ID>/settings
。 - 或是使用 REST API:
GET https://api.snyk.io/rest/groups?version=2021-06-04
並選取id
。
- 前往「群組設定」。從網址複製
- 確認權杖使用者具備「查看稽核記錄 (group.audit.read)」權限。
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南建立 Amazon S3 值區:建立值區
- 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如
snyk-audit
)。 - 按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後使用。
- 按一下 [完成]。
- 選取 [權限] 分頁標籤。
- 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)。
- 選取「新增權限」。
- 選取「直接附加政策」
- 搜尋並選取 AmazonS3FullAccess 政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 S3 上傳的身分與存取權管理政策和角色
- 在 AWS 控制台中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」分頁標籤。
輸入下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutSnykAuditObjects", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject" ], "Resource": "arn:aws:s3:::snyk-audit/*" } ] }
依序點選「Next」>「Create policy」。
依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。
附加新建立的政策。
為角色命名
WriteSnykAuditToS3Role
,然後按一下「建立角色」。
建立 Lambda 函式
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」。
- 按一下「從頭開始撰寫」。
- 請提供下列設定詳細資料:
設定 | 值 |
---|---|
名稱 | snyk_group_audit_to_s3 |
執行階段 | Python 3.13 |
架構 | x86_64 |
執行角色 | WriteSnykAuditToS3Role |
建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (
snyk_group_audit_to_s3.py
):# snyk_group_audit_to_s3.py #!/usr/bin/env python3 # Lambda: Pull Snyk Group-level Audit Logs (REST) to S3 (no transform) import os import json import time import urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError import boto3 BASE = os.environ.get("SNYK_API_BASE", "https://api.snyk.io").rstrip("/") GROUP_ID = os.environ["SNYK_GROUP_ID"].strip() API_TOKEN = os.environ["SNYK_API_TOKEN"].strip() BUCKET = os.environ["S3_BUCKET"].strip() PREFIX = os.environ.get("S3_PREFIX", "snyk/audit/").strip() SIZE = int(os.environ.get("SIZE", "100")) # max 100 per docs MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) STATE_KEY = os.environ.get("STATE_KEY", "snyk/audit/state.json") API_VERSION = os.environ.get("SNYK_API_VERSION", "2021-06-04").strip() # required by REST API LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600")) # used only when no cursor # Optional filters EVENTS_CSV = os.environ.get("EVENTS", "").strip() # e.g. "group.create,org.user.invited" EXCLUDE_EVENTS_CSV = os.environ.get("EXCLUDE_EVENTS", "").strip() s3 = boto3.client("s3") HDRS = { # REST authentication requires "token" scheme and vnd.api+json Accept "Authorization": f"token {API_TOKEN}", "Accept": "application/vnd.api+json", } def _get_state() -> str | None: try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()).get("cursor") except Exception: return None def _put_state(cursor: str): s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps({"cursor": cursor}).encode("utf-8")) def _write(payload: dict) -> str: ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime()) key = f"{PREFIX.rstrip('/')}/{ts}-snyk-group-audit.json" s3.put_object( Bucket=BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def _parse_next_cursor_from_links(links: dict | None) -> str | None: if not links: return None nxt = links.get("next") if not nxt: return None try: q = urllib.parse.urlparse(nxt).query params = urllib.parse.parse_qs(q) cur = params.get("cursor") return cur[0] if cur else None except Exception: return None def _http_get(url: str) -> dict: req = Request(url, method="GET", headers=HDRS) try: with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # Back off on rate limit or transient server errors; single retry if e.code in (429, 500, 502, 503, 504): delay = int(e.headers.get("Retry-After", "1")) time.sleep(max(1, delay)) with urlopen(req, timeout=60) as r2: return json.loads(r2.read().decode("utf-8")) raise def _as_list(csv_str: str) -> list[str]: return [x.strip() for x in csv_str.split(",") if x.strip()] def fetch_page(cursor: str | None, first_run_from_iso: str | None): base_path = f"/rest/groups/{GROUP_ID}/audit_logs/search" params: dict[str, object] = { "version": API_VERSION, "size": SIZE, } if cursor: params["cursor"] = cursor elif first_run_from_iso: params["from"] = first_run_from_iso # RFC3339 events = _as_list(EVENTS_CSV) exclude_events = _as_list(EXCLUDE_EVENTS_CSV) if events and exclude_events: # API does not allow both at the same time; prefer explicit include exclude_events = [] if events: params["events"] = events # will be encoded as repeated params if exclude_events: params["exclude_events"] = exclude_events url = f"{BASE}{base_path}?{urllib.parse.urlencode(params, doseq=True)}" return _http_get(url) def lambda_handler(event=None, context=None): cursor = _get_state() pages = 0 total = 0 last_cursor = cursor # Only for the very first run (no saved cursor), constrain the time window first_run_from_iso = None if not cursor and LOOKBACK_SECONDS > 0: first_run_from_iso = time.strftime( "%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time() - LOOKBACK_SECONDS) ) while pages < MAX_PAGES: payload = fetch_page(cursor, first_run_from_iso) _write(payload) # items are nested under data.items per Snyk docs data_obj = payload.get("data") or {} items = data_obj.get("items") or [] if isinstance(items, list): total += len(items) cursor = _parse_next_cursor_from_links(payload.get("links")) pages += 1 if not cursor: break # after first page, disable from-filter first_run_from_iso = None if cursor and cursor != last_cursor: _put_state(cursor) return {"ok": True, "pages": pages, "events": total, "next_cursor": cursor} if __name__ == "__main__": print(lambda_handler())
新增環境變數
- 依序前往「設定」>「環境變數」。
- 依序點選「編輯」> 新增環境變數。
輸入下列環境變數,並將 換成您的值:
鍵 範例 S3_BUCKET
snyk-audit
S3_PREFIX
snyk/audit/
STATE_KEY
snyk/audit/state.json
SNYK_GROUP_ID
<your_group_id>
SNYK_API_TOKEN
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
SNYK_API_BASE
https://api.snyk.io
(選填)SNYK_API_VERSION
2021-06-04
SIZE
100
MAX_PAGES
20
LOOKBACK_SECONDS
3600
EVENTS
(選填) group.create,org.user.add
EXCLUDE_EVENTS
(選填) api.access
建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。
選取「設定」分頁標籤。
在「一般設定」面板中,按一下「編輯」。
將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)。
建立 EventBridge 排程
- 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」。
- 提供下列設定詳細資料:
- 週期性時間表:費率 (
1 hour
)。 - 目標:您的 Lambda 函式。
- 名稱:
snyk-group-audit-1h
。
- 週期性時間表:費率 (
- 按一下「建立時間表」。
選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰
- 在 AWS 控制台中,依序前往「IAM」>「Users」>「Add users」。
- 點選 [Add users] (新增使用者)。
- 提供下列設定詳細資料:
- 使用者:
secops-reader
。 - 存取類型:存取金鑰 - 程式輔助存取。
- 使用者:
- 按一下「建立使用者」。
- 附加最低讀取權限政策 (自訂):依序選取「使用者」>「secops-reader」>「權限」>「新增權限」>「直接附加政策」>「建立政策」。
在 JSON 編輯器中輸入下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::snyk-audit/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::snyk-audit" } ] }
將名稱設為
secops-reader-policy
。依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」。
依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」。
下載 CSV (這些值會輸入至動態饋給)。
在 Google SecOps 中設定資訊提供,擷取 Snyk 群組層級的稽核記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Snyk Group Audit Logs
)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「Snyk Group level audit Logs」(Snyk 群組層級稽核記錄) 做為「Log type」(記錄類型)。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:
s3://snyk-audit/snyk/audit/
- 來源刪除選項:根據偏好設定選取刪除選項。
- 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:
snyk.group_audit
- 擷取標籤:視需要新增。
- S3 URI:
- 點選「下一步」。
- 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」。
UDM 對應表
記錄欄位 | UDM 對應 | 邏輯 |
---|---|---|
content.url | principal.url | 直接從原始記錄中的 content.url 欄位對應。 |
已建立 | metadata.event_timestamp | 使用 ISO8601 格式,從原始記錄的 created 欄位剖析而來。 |
活動 | metadata.product_event_type | 直接從原始記錄中的 event 欄位對應。 |
groupId | principal.user.group_identifiers | 直接從原始記錄中的 groupId 欄位對應。 |
orgId | principal.user.attribute.labels.key | 設為「orgId」。 |
orgId | principal.user.attribute.labels.value | 直接從原始記錄中的 orgId 欄位對應。 |
userId | principal.user.userid | 直接從原始記錄中的 userId 欄位對應。 |
不適用 | metadata.event_type | 在剖析器程式碼中,以硬式編碼方式設為「USER_UNCATEGORIZED」。 |
不適用 | metadata.log_type | 在剖析器程式碼中,以硬式編碼方式設為「SNYK_SDLC」。 |
不適用 | metadata.product_name | 在剖析器程式碼中,硬式編碼為「SNYK SDLC」。 |
不適用 | metadata.vendor_name | 在剖析器程式碼中,以硬式編碼方式設為「SNYK_SDLC」。 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。