Collecter les journaux d'audit et de problèmes au niveau du groupe Snyk

Compatible avec :

Ce guide explique comment ingérer les journaux d'audit et de problèmes au niveau du groupe Snyk dans Google Security Operations à l'aide d'Amazon S3.

Avant de commencer

Assurez-vous de remplir les conditions suivantes :

  • Instance Google SecOps
  • Accès privilégié au groupe Snyk (jeton d'API avec accès en lecture ; ID de groupe)
  • Accès privilégié à AWS (S3, IAM, Lambda, EventBridge)

Obtenir l'ID de groupe et le jeton d'API Snyk

  1. Dans l'UI Snyk, accédez à Paramètres du compte > Jeton d'API et générez le jeton d'API.
  2. Copiez et enregistrez le jeton dans un emplacement sécurisé pour l'utiliser ultérieurement comme SNYK_TOKEN.
  3. Passez à votre groupe, puis ouvrez les paramètres du groupe.
  4. Copiez l'ID du groupe à partir de l'URL (https://app.snyk.io/group/<GROUP_ID>/...) pour l'utiliser ultérieurement comme GROUP_ID.
  5. Point de terminaison de l'API de base : https://api.snyk.io (remplacez-le par API_BASE si nécessaire).
  6. Attribuez le rôle d'administrateur de groupe à l'utilisateur disposant du jeton. (L'utilisateur doit pouvoir consulter les journaux d'audit des groupes et les problèmes liés aux groupes.)

Configurer un bucket AWS S3 et IAM pour Google SecOps

  1. Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
  2. Enregistrez le nom et la région du bucket pour référence ultérieure (par exemple, snyk-group-logs).
  3. Créez un utilisateur en suivant ce guide : Créer un utilisateur IAM.
  4. Sélectionnez l'utilisateur créé.
  5. Sélectionnez l'onglet Informations d'identification de sécurité.
  6. Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
  7. Sélectionnez Service tiers comme Cas d'utilisation.
  8. Cliquez sur Suivant.
  9. Facultatif : ajoutez un tag de description.
  10. Cliquez sur Créer une clé d'accès.
  11. Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour une utilisation ultérieure.
  12. Cliquez sur OK.
  13. Sélectionnez l'onglet Autorisations.
  14. Cliquez sur Ajouter des autorisations dans la section Règles d'autorisation.
  15. Sélectionnez Ajouter des autorisations.
  16. Sélectionnez Joindre directement des règles.
  17. Recherchez et sélectionnez la règle AmazonS3FullAccess.
  18. Cliquez sur Suivant.
  19. Cliquez sur Ajouter des autorisations.

Configurer la stratégie et le rôle IAM pour les importations S3

  1. Dans la console AWS, accédez à IAM > Stratégies > Créer une stratégie > Onglet JSON.
  2. Saisissez le règlement suivant (qui inclut l'accès en écriture pour tous les objets du bucket et l'accès en lecture au fichier d'état utilisé par votre fonction Lambda) :

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "PutAllSnykGroupObjects",
          "Effect": "Allow",
          "Action": ["s3:PutObject", "s3:GetObject"],
          "Resource": "arn:aws:s3:::snyk-group-logs/*"
        }
      ]
    }
    
    • Remplacez snyk-group-logs si vous avez saisi un autre nom de bucket.
  3. Cliquez sur Suivant > Créer une règle.

  4. Accédez à IAM > Rôles > Créer un rôle > Service AWS > Lambda.

  5. Associez la règle que vous venez de créer.

  6. Nommez le rôle WriteSnykGroupToS3Role, puis cliquez sur Créer un rôle.

Créer la fonction Lambda

  1. Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
  2. Cliquez sur Créer à partir de zéro.
  3. Fournissez les informations de configuration suivantes :
Paramètre Valeur
Nom snyk_group_audit_issues_to_s3
Durée d'exécution Python 3.13
Architecture x86_64
Rôle d'exécution WriteSnykGroupToS3Role
  1. Une fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et saisissez le code suivant (snyk_group_audit_issues_to_s3.py) :

    #!/usr/bin/env python3
    # Lambda: Pull Snyk Group-level Audit Logs + Issues to S3 (no transform)
    
    import os
    import json
    import time
    import urllib.parse
    from urllib.request import Request, urlopen
    from urllib.parse import urlparse, parse_qs
    from urllib.error import HTTPError
    import boto3
    
    API_BASE = os.environ.get("API_BASE", "https://api.snyk.io").rstrip("/")
    SNYK_TOKEN = os.environ["SNYK_TOKEN"].strip()
    GROUP_ID = os.environ["GROUP_ID"].strip()
    
    BUCKET = os.environ["S3_BUCKET"].strip()
    PREFIX = os.environ.get("S3_PREFIX", "snyk/group/").strip()
    STATE_KEY = os.environ.get("STATE_KEY", "snyk/group/state.json").strip()
    
    # Page sizes & limits
    AUDIT_SIZE = int(os.environ.get("AUDIT_PAGE_SIZE", "100"))       # audit uses 'size' (max 100)
    ISSUES_LIMIT = int(os.environ.get("ISSUES_PAGE_LIMIT", "200"))   # issues uses 'limit'
    MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))
    
    # API versions (Snyk REST requires a 'version' param)
    AUDIT_API_VERSION = os.environ.get("SNYK_AUDIT_API_VERSION", "2021-06-04").strip()
    ISSUES_API_VERSION = os.environ.get("SNYK_ISSUES_API_VERSION", "2024-10-15").strip()
    
    # First-run lookback for audit to avoid huge backfills
    LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600"))
    
    HDRS = {
        "Authorization": f"token {SNYK_TOKEN}",
        "Accept": "application/vnd.api+json",
    }
    
    s3 = boto3.client("s3")
    
    def _get_state() -> dict:
        try:
            obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read() or b"{}")
        except Exception:
            return {}
    
    def _put_state(state: dict):
        s3.put_object(
            Bucket=BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(state, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
    
    def _iso(ts: float) -> str:
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))
    
    def _http_get(url: str) -> dict:
        req = Request(url, method="GET", headers=HDRS)
        try:
            with urlopen(req, timeout=60) as r:
                return json.loads(r.read().decode("utf-8"))
        except HTTPError as e:
            if e.code in (429, 500, 502, 503, 504):
                delay = int(e.headers.get("Retry-After", "1"))
                time.sleep(max(1, delay))
                with urlopen(req, timeout=60) as r2:
                    return json.loads(r2.read().decode("utf-8"))
            raise
    
    def _write_page(kind: str, payload: dict) -> str:
        ts = time.gmtime()
        key = f"{PREFIX.rstrip('/')}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-snyk-{kind}.json"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def _next_href(links: dict | None) -> str | None:
        if not links:
            return None
        nxt = links.get("next")
        if not nxt:
            return None
        if isinstance(nxt, str):
            return nxt
        if isinstance(nxt, dict):
            return nxt.get("href")
        return None
    
    # -------- Audit Logs --------
    
    def pull_audit_logs(state: dict) -> dict:
        cursor = state.get("audit_cursor")
        pages = 0
        total = 0
    
        base = f"{API_BASE}/rest/groups/{GROUP_ID}/audit_logs/search"
        params: dict[str, object] = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE}
    
        if cursor:
            params["cursor"] = cursor
        else:
            now = time.time()
            params["from"] = _iso(now - LOOKBACK_SECONDS)
            params["to"] = _iso(now)
    
        while pages < MAX_PAGES:
            url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}"
            payload = _http_get(url)
            _write_page("audit", payload)
    
            data_items = (payload.get("data") or {}).get("items") or []
            if isinstance(data_items, list):
                total += len(data_items)
    
            nxt = _next_href(payload.get("links"))
            if not nxt:
                break
            q = parse_qs(urlparse(nxt).query)
            cur = (q.get("cursor") or [None])[0]
            if not cur:
                break
    
            params = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE, "cursor": cur}
            state["audit_cursor"] = cur
            pages += 1
    
        return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("audit_cursor")}
    
    # -------- Issues --------
    
    def pull_issues(state: dict) -> dict:
        cursor = state.get("issues_cursor")  # stores 'starting_after'
        pages = 0
        total = 0
    
        base = f"{API_BASE}/rest/groups/{GROUP_ID}/issues"
        params: dict[str, object] = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT}
        if cursor:
            params["starting_after"] = cursor
    
        while pages < MAX_PAGES:
            url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}"
            payload = _http_get(url)
            _write_page("issues", payload)
    
            data_items = payload.get("data") or []
            if isinstance(data_items, list):
                total += len(data_items)
    
            nxt = _next_href(payload.get("links"))
            if not nxt:
                break
            q = parse_qs(urlparse(nxt).query)
            cur = (q.get("starting_after") or [None])[0]
            if not cur:
                break
    
            params = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT, "starting_after": cur}
            state["issues_cursor"] = cur
            pages += 1
    
        return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("issues_cursor")}
    
    def lambda_handler(event=None, context=None):
        state = _get_state()
        audit_res = pull_audit_logs(state)
        issues_res = pull_issues(state)
        _put_state(state)
        return {"ok": True, "audit": audit_res, "issues": issues_res}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  2. Accédez à Configuration> Variables d'environnement> Modifier> Ajouter une variable d'environnement.

  3. Saisissez les variables d'environnement suivantes en remplaçant les valeurs par les vôtres :

    Clé Exemple
    S3_BUCKET snyk-group-logs
    S3_PREFIX snyk/group/
    STATE_KEY snyk/group/state.json
    SNYK_TOKEN xxxxxxxx-xxxx-xxxx-xxxx-xxxx
    GROUP_ID <group_uuid>
    API_BASE https://api.snyk.io
    SNYK_AUDIT_API_VERSION 2021-06-04
    SNYK_ISSUES_API_VERSION 2024-10-15
    AUDIT_PAGE_SIZE 100
    ISSUES_PAGE_LIMIT 200
    MAX_PAGES 20
    LOOKBACK_SECONDS 3600
  4. Une fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Fonctions > <your-function>).

  5. Accédez à l'onglet Configuration.

  6. Dans le panneau Configuration générale, cliquez sur Modifier.

  7. Définissez le délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.

Créer une programmation EventBridge

  1. Accédez à Amazon EventBridge> Scheduler> Create schedule (Créer une programmation).
  2. Fournissez les informations de configuration suivantes :
    • Planning récurrent : Tarif (1 hour).
    • Cible : votre fonction Lambda snyk_group_audit_issues_to_s3.
    • Nom : snyk-group-audit-issues-1h.
  3. Cliquez sur Créer la programmation.

Facultatif : Créez un utilisateur et des clés IAM en lecture seule pour Google SecOps

  1. Dans la console AWS, accédez à IAM > Utilisateurs > Ajouter des utilisateurs.
  2. Cliquez sur Add users (Ajouter des utilisateurs).
  3. Fournissez les informations de configuration suivantes :
    • Utilisateur : secops-reader.
    • Type d'accès : Clé d'accès – Accès programmatique.
  4. Cliquez sur Créer un utilisateur.
  5. Associez une stratégie de lecture minimale (personnalisée) : Utilisateurs > secops-reader > Autorisations > Ajouter des autorisations > Associer des stratégies directement > Créer une stratégie.
  6. Dans l'éditeur JSON, saisissez la stratégie suivante :

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::snyk-group-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::snyk-group-logs"
        }
      ]
    }
    
  7. Définissez le nom sur secops-reader-policy.

  8. Accédez à Créer une règle > recherchez/sélectionnez > Suivant > Ajouter des autorisations.

  9. Accédez à Identifiants de sécurité > Clés d'accès > Créer une clé d'accès.

  10. Téléchargez le CSV (ces valeurs sont saisies dans le flux).

Configurer un flux dans Google SecOps pour ingérer les journaux d'audit et de problèmes au niveau du groupe Snyk

  1. Accédez à Paramètres SIEM> Flux.
  2. Cliquez sur + Ajouter un flux.
  3. Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple, Snyk Group Audit/Issues).
  4. Sélectionnez Amazon S3 V2 comme type de source.
  5. Sélectionnez Journaux d'audit/d'incidents au niveau du groupe Snyk pour le Type de journal.
  6. Cliquez sur Suivant.
  7. Spécifiez les valeurs des paramètres d'entrée suivants :
    • URI S3 : s3://snyk-group-logs/snyk/group/
    • Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
    • Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
    • ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
    • Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
    • Espace de noms de l'asset : snyk.group
    • Libellés d'ingestion : ajoutez-en si vous le souhaitez.
  8. Cliquez sur Suivant.
  9. Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.

Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.