Collecter les journaux d'audit et de problèmes au niveau du groupe Snyk
Ce guide explique comment ingérer les journaux d'audit et de problèmes au niveau du groupe Snyk dans Google Security Operations à l'aide d'Amazon S3.
Avant de commencer
Assurez-vous de remplir les conditions suivantes :
- Instance Google SecOps
- Accès privilégié au groupe Snyk (jeton d'API avec accès en lecture ; ID de groupe)
- Accès privilégié à AWS (S3, IAM, Lambda, EventBridge)
Obtenir l'ID de groupe et le jeton d'API Snyk
- Dans l'UI Snyk, accédez à Paramètres du compte > Jeton d'API et générez le jeton d'API.
- Copiez et enregistrez le jeton dans un emplacement sécurisé pour l'utiliser ultérieurement comme
SNYK_TOKEN
. - Passez à votre groupe, puis ouvrez les paramètres du groupe.
- Copiez l'ID du groupe à partir de l'URL (
https://app.snyk.io/group/<GROUP_ID>/...
) pour l'utiliser ultérieurement commeGROUP_ID
. - Point de terminaison de l'API de base :
https://api.snyk.io
(remplacez-le parAPI_BASE
si nécessaire). - Attribuez le rôle d'administrateur de groupe à l'utilisateur disposant du jeton. (L'utilisateur doit pouvoir consulter les journaux d'audit des groupes et les problèmes liés aux groupes.)
Configurer un bucket AWS S3 et IAM pour Google SecOps
- Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
- Enregistrez le nom et la région du bucket pour référence ultérieure (par exemple,
snyk-group-logs
). - Créez un utilisateur en suivant ce guide : Créer un utilisateur IAM.
- Sélectionnez l'utilisateur créé.
- Sélectionnez l'onglet Informations d'identification de sécurité.
- Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
- Sélectionnez Service tiers comme Cas d'utilisation.
- Cliquez sur Suivant.
- Facultatif : ajoutez un tag de description.
- Cliquez sur Créer une clé d'accès.
- Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour une utilisation ultérieure.
- Cliquez sur OK.
- Sélectionnez l'onglet Autorisations.
- Cliquez sur Ajouter des autorisations dans la section Règles d'autorisation.
- Sélectionnez Ajouter des autorisations.
- Sélectionnez Joindre directement des règles.
- Recherchez et sélectionnez la règle AmazonS3FullAccess.
- Cliquez sur Suivant.
- Cliquez sur Ajouter des autorisations.
Configurer la stratégie et le rôle IAM pour les importations S3
- Dans la console AWS, accédez à IAM > Stratégies > Créer une stratégie > Onglet JSON.
Saisissez le règlement suivant (qui inclut l'accès en écriture pour tous les objets du bucket et l'accès en lecture au fichier d'état utilisé par votre fonction Lambda) :
{ "Version": "2012-10-17", "Statement": [ { "Sid": "PutAllSnykGroupObjects", "Effect": "Allow", "Action": ["s3:PutObject", "s3:GetObject"], "Resource": "arn:aws:s3:::snyk-group-logs/*" } ] }
- Remplacez
snyk-group-logs
si vous avez saisi un autre nom de bucket.
- Remplacez
Cliquez sur Suivant > Créer une règle.
Accédez à IAM > Rôles > Créer un rôle > Service AWS > Lambda.
Associez la règle que vous venez de créer.
Nommez le rôle
WriteSnykGroupToS3Role
, puis cliquez sur Créer un rôle.
Créer la fonction Lambda
- Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
- Cliquez sur Créer à partir de zéro.
- Fournissez les informations de configuration suivantes :
Paramètre | Valeur |
---|---|
Nom | snyk_group_audit_issues_to_s3 |
Durée d'exécution | Python 3.13 |
Architecture | x86_64 |
Rôle d'exécution | WriteSnykGroupToS3Role |
Une fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et saisissez le code suivant (
snyk_group_audit_issues_to_s3.py
) :#!/usr/bin/env python3 # Lambda: Pull Snyk Group-level Audit Logs + Issues to S3 (no transform) import os import json import time import urllib.parse from urllib.request import Request, urlopen from urllib.parse import urlparse, parse_qs from urllib.error import HTTPError import boto3 API_BASE = os.environ.get("API_BASE", "https://api.snyk.io").rstrip("/") SNYK_TOKEN = os.environ["SNYK_TOKEN"].strip() GROUP_ID = os.environ["GROUP_ID"].strip() BUCKET = os.environ["S3_BUCKET"].strip() PREFIX = os.environ.get("S3_PREFIX", "snyk/group/").strip() STATE_KEY = os.environ.get("STATE_KEY", "snyk/group/state.json").strip() # Page sizes & limits AUDIT_SIZE = int(os.environ.get("AUDIT_PAGE_SIZE", "100")) # audit uses 'size' (max 100) ISSUES_LIMIT = int(os.environ.get("ISSUES_PAGE_LIMIT", "200")) # issues uses 'limit' MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) # API versions (Snyk REST requires a 'version' param) AUDIT_API_VERSION = os.environ.get("SNYK_AUDIT_API_VERSION", "2021-06-04").strip() ISSUES_API_VERSION = os.environ.get("SNYK_ISSUES_API_VERSION", "2024-10-15").strip() # First-run lookback for audit to avoid huge backfills LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600")) HDRS = { "Authorization": f"token {SNYK_TOKEN}", "Accept": "application/vnd.api+json", } s3 = boto3.client("s3") def _get_state() -> dict: try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read() or b"{}") except Exception: return {} def _put_state(state: dict): s3.put_object( Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps(state, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) def _iso(ts: float) -> str: return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts)) def _http_get(url: str) -> dict: req = Request(url, method="GET", headers=HDRS) try: with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: if e.code in (429, 500, 502, 503, 504): delay = int(e.headers.get("Retry-After", "1")) time.sleep(max(1, delay)) with urlopen(req, timeout=60) as r2: return json.loads(r2.read().decode("utf-8")) raise def _write_page(kind: str, payload: dict) -> str: ts = time.gmtime() key = f"{PREFIX.rstrip('/')}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-snyk-{kind}.json" s3.put_object( Bucket=BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def _next_href(links: dict | None) -> str | None: if not links: return None nxt = links.get("next") if not nxt: return None if isinstance(nxt, str): return nxt if isinstance(nxt, dict): return nxt.get("href") return None # -------- Audit Logs -------- def pull_audit_logs(state: dict) -> dict: cursor = state.get("audit_cursor") pages = 0 total = 0 base = f"{API_BASE}/rest/groups/{GROUP_ID}/audit_logs/search" params: dict[str, object] = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE} if cursor: params["cursor"] = cursor else: now = time.time() params["from"] = _iso(now - LOOKBACK_SECONDS) params["to"] = _iso(now) while pages < MAX_PAGES: url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}" payload = _http_get(url) _write_page("audit", payload) data_items = (payload.get("data") or {}).get("items") or [] if isinstance(data_items, list): total += len(data_items) nxt = _next_href(payload.get("links")) if not nxt: break q = parse_qs(urlparse(nxt).query) cur = (q.get("cursor") or [None])[0] if not cur: break params = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE, "cursor": cur} state["audit_cursor"] = cur pages += 1 return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("audit_cursor")} # -------- Issues -------- def pull_issues(state: dict) -> dict: cursor = state.get("issues_cursor") # stores 'starting_after' pages = 0 total = 0 base = f"{API_BASE}/rest/groups/{GROUP_ID}/issues" params: dict[str, object] = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT} if cursor: params["starting_after"] = cursor while pages < MAX_PAGES: url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}" payload = _http_get(url) _write_page("issues", payload) data_items = payload.get("data") or [] if isinstance(data_items, list): total += len(data_items) nxt = _next_href(payload.get("links")) if not nxt: break q = parse_qs(urlparse(nxt).query) cur = (q.get("starting_after") or [None])[0] if not cur: break params = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT, "starting_after": cur} state["issues_cursor"] = cur pages += 1 return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("issues_cursor")} def lambda_handler(event=None, context=None): state = _get_state() audit_res = pull_audit_logs(state) issues_res = pull_issues(state) _put_state(state) return {"ok": True, "audit": audit_res, "issues": issues_res} if __name__ == "__main__": print(lambda_handler())
Accédez à Configuration> Variables d'environnement> Modifier> Ajouter une variable d'environnement.
Saisissez les variables d'environnement suivantes en remplaçant les valeurs par les vôtres :
Clé Exemple S3_BUCKET
snyk-group-logs
S3_PREFIX
snyk/group/
STATE_KEY
snyk/group/state.json
SNYK_TOKEN
xxxxxxxx-xxxx-xxxx-xxxx-xxxx
GROUP_ID
<group_uuid>
API_BASE
https://api.snyk.io
SNYK_AUDIT_API_VERSION
2021-06-04
SNYK_ISSUES_API_VERSION
2024-10-15
AUDIT_PAGE_SIZE
100
ISSUES_PAGE_LIMIT
200
MAX_PAGES
20
LOOKBACK_SECONDS
3600
Une fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Fonctions >
<your-function>
).Accédez à l'onglet Configuration.
Dans le panneau Configuration générale, cliquez sur Modifier.
Définissez le délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.
Créer une programmation EventBridge
- Accédez à Amazon EventBridge> Scheduler> Create schedule (Créer une programmation).
- Fournissez les informations de configuration suivantes :
- Planning récurrent : Tarif (
1 hour
). - Cible : votre fonction Lambda
snyk_group_audit_issues_to_s3
. - Nom :
snyk-group-audit-issues-1h
.
- Planning récurrent : Tarif (
- Cliquez sur Créer la programmation.
Facultatif : Créez un utilisateur et des clés IAM en lecture seule pour Google SecOps
- Dans la console AWS, accédez à IAM > Utilisateurs > Ajouter des utilisateurs.
- Cliquez sur Add users (Ajouter des utilisateurs).
- Fournissez les informations de configuration suivantes :
- Utilisateur :
secops-reader
. - Type d'accès : Clé d'accès – Accès programmatique.
- Utilisateur :
- Cliquez sur Créer un utilisateur.
- Associez une stratégie de lecture minimale (personnalisée) : Utilisateurs > secops-reader > Autorisations > Ajouter des autorisations > Associer des stratégies directement > Créer une stratégie.
Dans l'éditeur JSON, saisissez la stratégie suivante :
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::snyk-group-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::snyk-group-logs" } ] }
Définissez le nom sur
secops-reader-policy
.Accédez à Créer une règle > recherchez/sélectionnez > Suivant > Ajouter des autorisations.
Accédez à Identifiants de sécurité > Clés d'accès > Créer une clé d'accès.
Téléchargez le CSV (ces valeurs sont saisies dans le flux).
Configurer un flux dans Google SecOps pour ingérer les journaux d'audit et de problèmes au niveau du groupe Snyk
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur + Ajouter un flux.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple,
Snyk Group Audit/Issues
). - Sélectionnez Amazon S3 V2 comme type de source.
- Sélectionnez Journaux d'audit/d'incidents au niveau du groupe Snyk pour le Type de journal.
- Cliquez sur Suivant.
- Spécifiez les valeurs des paramètres d'entrée suivants :
- URI S3 :
s3://snyk-group-logs/snyk/group/
- Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
- Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
- ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
- Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
- Espace de noms de l'asset :
snyk.group
- Libellés d'ingestion : ajoutez-en si vous le souhaitez.
- URI S3 :
- Cliquez sur Suivant.
- Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.