Raccogli i log di controllo e dei problemi a livello di gruppo Snyk
Questa guida spiega come importare i log di controllo e dei problemi a livello di gruppo di Snyk in Google Security Operations utilizzando Amazon S3.
Prima di iniziare
Assicurati di soddisfare i seguenti prerequisiti:
- Istanza Google SecOps
- Accesso privilegiato al gruppo Snyk (token API con accesso in lettura; ID gruppo)
- Accesso con privilegi ad AWS (S3, IAM, Lambda, EventBridge)
Recuperare l'ID gruppo e il token API di Snyk
- Nell'interfaccia utente di Snyk, vai a Impostazioni account > Token API e genera il token API.
- Copia e salva il token in una posizione sicura per utilizzarlo in un secondo momento come
SNYK_TOKEN
. - Passa al tuo Gruppo e apri le Impostazioni del gruppo.
- Copia e salva l'ID gruppo dall'URL (
https://app.snyk.io/group/<GROUP_ID>/...
) per utilizzarlo in un secondo momento comeGROUP_ID
. - Endpoint API di base:
https://api.snyk.io
(esegui l'override conAPI_BASE
se necessario). - Assegna il ruolo Amministratore del gruppo all'utente con il token. L'utente deve essere in grado di visualizzare i log di controllo dei gruppi e i problemi dei gruppi.
Configura il bucket AWS S3 e IAM per Google SecOps
- Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
- Salva il nome e la regione del bucket per riferimento futuro (ad esempio,
snyk-group-logs
). - Crea un utente seguendo questa guida: Creazione di un utente IAM.
- Seleziona l'utente creato.
- Seleziona la scheda Credenziali di sicurezza.
- Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
- Seleziona Servizio di terze parti come Caso d'uso.
- Fai clic su Avanti.
- (Facoltativo) Aggiungi un tag di descrizione.
- Fai clic su Crea chiave di accesso.
- Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per un utilizzo successivo.
- Fai clic su Fine.
- Seleziona la scheda Autorizzazioni.
- Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
- Seleziona Aggiungi autorizzazioni.
- Seleziona Allega direttamente i criteri.
- Cerca e seleziona il criterio AmazonS3FullAccess.
- Fai clic su Avanti.
- Fai clic su Aggiungi autorizzazioni.
Configura il ruolo e il criterio IAM per i caricamenti S3
- Nella console AWS, vai a IAM > Policy > Crea policy > scheda JSON.
Inserisci il seguente criterio (include l'accesso in scrittura per tutti gli oggetti nel bucket e l'accesso in lettura al file di stato utilizzato dalla tua funzione Lambda):
{ "Version": "2012-10-17", "Statement": [ { "Sid": "PutAllSnykGroupObjects", "Effect": "Allow", "Action": ["s3:PutObject", "s3:GetObject"], "Resource": "arn:aws:s3:::snyk-group-logs/*" } ] }
- Sostituisci
snyk-group-logs
se hai inserito un nome bucket diverso.
- Sostituisci
Fai clic su Avanti > Crea criterio.
Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.
Allega il criterio appena creato.
Assegna al ruolo il nome
WriteSnykGroupToS3Role
e fai clic su Crea ruolo.
Crea la funzione Lambda
- Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
- Fai clic su Crea autore da zero.
- Fornisci i seguenti dettagli di configurazione:
Impostazione | Valore |
---|---|
Nome | snyk_group_audit_issues_to_s3 |
Tempo di esecuzione | Python 3.13 |
Architettura | x86_64 |
Ruolo di esecuzione | WriteSnykGroupToS3Role |
Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e inserisci il seguente codice (
snyk_group_audit_issues_to_s3.py
):#!/usr/bin/env python3 # Lambda: Pull Snyk Group-level Audit Logs + Issues to S3 (no transform) import os import json import time import urllib.parse from urllib.request import Request, urlopen from urllib.parse import urlparse, parse_qs from urllib.error import HTTPError import boto3 API_BASE = os.environ.get("API_BASE", "https://api.snyk.io").rstrip("/") SNYK_TOKEN = os.environ["SNYK_TOKEN"].strip() GROUP_ID = os.environ["GROUP_ID"].strip() BUCKET = os.environ["S3_BUCKET"].strip() PREFIX = os.environ.get("S3_PREFIX", "snyk/group/").strip() STATE_KEY = os.environ.get("STATE_KEY", "snyk/group/state.json").strip() # Page sizes & limits AUDIT_SIZE = int(os.environ.get("AUDIT_PAGE_SIZE", "100")) # audit uses 'size' (max 100) ISSUES_LIMIT = int(os.environ.get("ISSUES_PAGE_LIMIT", "200")) # issues uses 'limit' MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) # API versions (Snyk REST requires a 'version' param) AUDIT_API_VERSION = os.environ.get("SNYK_AUDIT_API_VERSION", "2021-06-04").strip() ISSUES_API_VERSION = os.environ.get("SNYK_ISSUES_API_VERSION", "2024-10-15").strip() # First-run lookback for audit to avoid huge backfills LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600")) HDRS = { "Authorization": f"token {SNYK_TOKEN}", "Accept": "application/vnd.api+json", } s3 = boto3.client("s3") def _get_state() -> dict: try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read() or b"{}") except Exception: return {} def _put_state(state: dict): s3.put_object( Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps(state, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) def _iso(ts: float) -> str: return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts)) def _http_get(url: str) -> dict: req = Request(url, method="GET", headers=HDRS) try: with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: if e.code in (429, 500, 502, 503, 504): delay = int(e.headers.get("Retry-After", "1")) time.sleep(max(1, delay)) with urlopen(req, timeout=60) as r2: return json.loads(r2.read().decode("utf-8")) raise def _write_page(kind: str, payload: dict) -> str: ts = time.gmtime() key = f"{PREFIX.rstrip('/')}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-snyk-{kind}.json" s3.put_object( Bucket=BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def _next_href(links: dict | None) -> str | None: if not links: return None nxt = links.get("next") if not nxt: return None if isinstance(nxt, str): return nxt if isinstance(nxt, dict): return nxt.get("href") return None # -------- Audit Logs -------- def pull_audit_logs(state: dict) -> dict: cursor = state.get("audit_cursor") pages = 0 total = 0 base = f"{API_BASE}/rest/groups/{GROUP_ID}/audit_logs/search" params: dict[str, object] = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE} if cursor: params["cursor"] = cursor else: now = time.time() params["from"] = _iso(now - LOOKBACK_SECONDS) params["to"] = _iso(now) while pages < MAX_PAGES: url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}" payload = _http_get(url) _write_page("audit", payload) data_items = (payload.get("data") or {}).get("items") or [] if isinstance(data_items, list): total += len(data_items) nxt = _next_href(payload.get("links")) if not nxt: break q = parse_qs(urlparse(nxt).query) cur = (q.get("cursor") or [None])[0] if not cur: break params = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE, "cursor": cur} state["audit_cursor"] = cur pages += 1 return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("audit_cursor")} # -------- Issues -------- def pull_issues(state: dict) -> dict: cursor = state.get("issues_cursor") # stores 'starting_after' pages = 0 total = 0 base = f"{API_BASE}/rest/groups/{GROUP_ID}/issues" params: dict[str, object] = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT} if cursor: params["starting_after"] = cursor while pages < MAX_PAGES: url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}" payload = _http_get(url) _write_page("issues", payload) data_items = payload.get("data") or [] if isinstance(data_items, list): total += len(data_items) nxt = _next_href(payload.get("links")) if not nxt: break q = parse_qs(urlparse(nxt).query) cur = (q.get("starting_after") or [None])[0] if not cur: break params = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT, "starting_after": cur} state["issues_cursor"] = cur pages += 1 return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("issues_cursor")} def lambda_handler(event=None, context=None): state = _get_state() audit_res = pull_audit_logs(state) issues_res = pull_issues(state) _put_state(state) return {"ok": True, "audit": audit_res, "issues": issues_res} if __name__ == "__main__": print(lambda_handler())
Vai a Configurazione > Variabili di ambiente > Modifica > Aggiungi nuova variabile di ambiente.
Inserisci le seguenti variabili di ambiente, sostituendole con i tuoi valori:
Chiave Esempio S3_BUCKET
snyk-group-logs
S3_PREFIX
snyk/group/
STATE_KEY
snyk/group/state.json
SNYK_TOKEN
xxxxxxxx-xxxx-xxxx-xxxx-xxxx
GROUP_ID
<group_uuid>
API_BASE
https://api.snyk.io
SNYK_AUDIT_API_VERSION
2021-06-04
SNYK_ISSUES_API_VERSION
2024-10-15
AUDIT_PAGE_SIZE
100
ISSUES_PAGE_LIMIT
200
MAX_PAGES
20
LOOKBACK_SECONDS
3600
Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni >
<your-function>
).Seleziona la scheda Configurazione.
Nel riquadro Configurazione generale, fai clic su Modifica.
Modifica Timeout impostandolo su 5 minuti (300 secondi) e fai clic su Salva.
Creare una pianificazione EventBridge
- Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
- Fornisci i seguenti dettagli di configurazione:
- Programma ricorrente: Tariffa (
1 hour
). - Target: la tua funzione Lambda
snyk_group_audit_issues_to_s3
. - Nome:
snyk-group-audit-issues-1h
- Programma ricorrente: Tariffa (
- Fai clic su Crea pianificazione.
(Facoltativo) Crea chiavi e utenti IAM di sola lettura per Google SecOps
- Nella console AWS, vai a IAM > Utenti > Aggiungi utenti.
- Fai clic su Add users (Aggiungi utenti).
- Fornisci i seguenti dettagli di configurazione:
- Utente:
secops-reader
. - Tipo di accesso: Chiave di accesso - Accesso programmatico.
- Utente:
- Fai clic su Crea utente.
- Collega la criterio per la lettura minima (personalizzata): Utenti > secops-reader > Autorizzazioni > Aggiungi autorizzazioni > Collega le norme direttamente > Crea norma.
Nell'editor JSON, inserisci la seguente policy:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::snyk-group-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::snyk-group-logs" } ] }
Imposta il nome su
secops-reader-policy
.Vai a Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.
Vai a Credenziali di sicurezza > Chiavi di accesso > Crea chiave di accesso.
Scarica il file CSV (questi valori vengono inseriti nel feed).
Configura un feed in Google SecOps per importare i log di controllo e dei problemi a livello di gruppo Snyk
- Vai a Impostazioni SIEM > Feed.
- Fai clic su + Aggiungi nuovo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
Snyk Group Audit/Issues
). - Seleziona Amazon S3 V2 come Tipo di origine.
- Seleziona Log di controllo/problemi a livello di gruppo Snyk come Tipo di log.
- Fai clic su Avanti.
- Specifica i valori per i seguenti parametri di input:
- URI S3:
s3://snyk-group-logs/snyk/group/
- Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
- Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
- ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
- Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
- Spazio dei nomi dell'asset:
snyk.group
- Etichette di importazione: aggiungile se vuoi.
- URI S3:
- Fai clic su Avanti.
- Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.