Audit-Logs auf Gruppenebene von Snyk erfassen

Unterstützt in:

In diesem Dokument wird beschrieben, wie Sie Audit-Logs auf Gruppenebene von Snyk mithilfe von Amazon S3 in Google Security Operations aufnehmen. Der Parser bereinigt zuerst unnötige Felder aus den Rohlogs. Anschließend werden relevante Informationen wie Nutzerdetails, Ereignistyp und Zeitstempel extrahiert, transformiert und dem Google SecOps UDM-Schema zugeordnet, um eine standardisierte Darstellung von Sicherheitslogs zu ermöglichen.

Hinweise

Prüfen Sie, ob folgende Voraussetzungen erfüllt sind:

  • Google SecOps-Instanz
  • Privilegierter Zugriff auf Snyk (Gruppenadministrator) und ein API-Token mit Zugriff auf die Gruppe
  • Privilegierter Zugriff auf AWS (S3, IAM, Lambda, EventBridge)

Voraussetzungen für das Erfassen von Audit-Logs auf Snyk-Gruppenebene (IDs, API-Schlüssel, Organisations-IDs, Tokens)

  1. Klicken Sie in Snyk auf Ihren Avatar > Kontoeinstellungen > API-Token.
    • Klicken Sie auf Widerrufen und neu generieren (oder Generieren) und kopieren Sie das Token.
    • Speichern Sie dieses Token als Umgebungsvariable SNYK_API_TOKEN.
  2. Wechseln Sie in Snyk zu Ihrer Gruppe (Auswahlfeld oben links).
    • Rufen Sie die Gruppeneinstellungen auf. Kopieren Sie die <GROUP_ID> aus der URL: https://app.snyk.io/group/<GROUP_ID>/settings.
    • Alternativ können Sie die REST API verwenden: GET https://api.snyk.io/rest/groups?version=2021-06-04 und id auswählen.
  3. Prüfen Sie, ob der Tokennutzer die Berechtigung Audit-Logs ansehen (group.audit.read) hat.

AWS S3-Bucket und IAM für Google SecOps konfigurieren

  1. Erstellen Sie einen Amazon S3-Bucket. Folgen Sie dazu dieser Anleitung: Bucket erstellen.
  2. Speichern Sie den Namen und die Region des Buckets zur späteren Verwendung (z. B. snyk-audit).
  3. Erstellen Sie einen Nutzer gemäß dieser Anleitung: IAM-Nutzer erstellen.
  4. Wählen Sie den erstellten Nutzer aus.
  5. Wählen Sie den Tab Sicherheitsanmeldedaten aus.
  6. Klicken Sie im Abschnitt Zugriffsschlüssel auf Zugriffsschlüssel erstellen.
  7. Wählen Sie als Anwendungsfall Drittanbieterdienst aus.
  8. Klicken Sie auf Weiter.
  9. Optional: Fügen Sie ein Beschreibungstag hinzu.
  10. Klicken Sie auf Zugriffsschlüssel erstellen.
  11. Klicken Sie auf CSV-Datei herunterladen, um den Access Key (Zugriffsschlüssel) und den Secret Access Key (geheimer Zugriffsschlüssel) zur späteren Verwendung zu speichern.
  12. Klicken Sie auf Fertig.
  13. Wählen Sie den Tab Berechtigungen aus.
  14. Klicken Sie im Bereich Berechtigungsrichtlinien auf Berechtigungen hinzufügen.
  15. Wählen Sie Berechtigungen hinzufügen aus.
  16. Wählen Sie Richtlinien direkt anhängen aus.
  17. Suchen Sie nach der Richtlinie AmazonS3FullAccess und wählen Sie sie aus.
  18. Klicken Sie auf Weiter.
  19. Klicken Sie auf Berechtigungen hinzufügen.

IAM-Richtlinie und ‑Rolle für S3-Uploads konfigurieren

  1. Rufen Sie in der AWS-Konsole IAM > Richtlinien > Richtlinie erstellen > JSON-Tab auf.
  2. Geben Sie die folgende Richtlinie ein:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutSnykAuditObjects",
          "Effect": "Allow",
          "Action": [
            "s3:PutObject",
            "s3:GetObject"
          ],
          "Resource": "arn:aws:s3:::snyk-audit/*"
        }
      ]
    }
    
  3. Klicken Sie auf Weiter > Richtlinie erstellen.

  4. Rufen Sie IAM > Rollen > Rolle erstellen > AWS-Service > Lambda auf.

  5. Hängen Sie die neu erstellte Richtlinie an.

  6. Geben Sie der Rolle den Namen WriteSnykAuditToS3Role und klicken Sie auf Rolle erstellen.

Lambda-Funktion erstellen

  1. Rufen Sie in der AWS Console Lambda > Funktionen > Funktion erstellen auf.
  2. Klicken Sie auf Von Grund auf erstellen.
  3. Geben Sie die folgenden Konfigurationsdetails an:
Einstellung Wert
Name snyk_group_audit_to_s3
Laufzeit Python 3.13
Architektur x86_64
Ausführungsrolle WriteSnykAuditToS3Role
  1. Nachdem die Funktion erstellt wurde, öffnen Sie den Tab Code, löschen Sie den Stub und geben Sie den folgenden Code ein (snyk_group_audit_to_s3.py):

    # snyk_group_audit_to_s3.py
    #!/usr/bin/env python3
    # Lambda: Pull Snyk Group-level Audit Logs (REST) to S3 (no transform)
    
    import os
    import json
    import time
    import urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError
    import boto3
    
    BASE = os.environ.get("SNYK_API_BASE", "https://api.snyk.io").rstrip("/")
    GROUP_ID = os.environ["SNYK_GROUP_ID"].strip()
    API_TOKEN = os.environ["SNYK_API_TOKEN"].strip()
    BUCKET = os.environ["S3_BUCKET"].strip()
    PREFIX = os.environ.get("S3_PREFIX", "snyk/audit/").strip()
    SIZE = int(os.environ.get("SIZE", "100"))  # max 100 per docs
    MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))
    STATE_KEY = os.environ.get("STATE_KEY", "snyk/audit/state.json")
    API_VERSION = os.environ.get("SNYK_API_VERSION", "2021-06-04").strip()  # required by REST API
    LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600"))  # used only when no cursor
    
    # Optional filters
    EVENTS_CSV = os.environ.get("EVENTS", "").strip()            # e.g. "group.create,org.user.invited"
    EXCLUDE_EVENTS_CSV = os.environ.get("EXCLUDE_EVENTS", "").strip()
    
    s3 = boto3.client("s3")
    
    HDRS = {
        # REST authentication requires "token" scheme and vnd.api+json Accept
        "Authorization": f"token {API_TOKEN}",
        "Accept": "application/vnd.api+json",
    }
    
    def _get_state() -> str | None:
        try:
            obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read()).get("cursor")
        except Exception:
            return None
    
    def _put_state(cursor: str):
        s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps({"cursor": cursor}).encode("utf-8"))
    
    def _write(payload: dict) -> str:
        ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime())
        key = f"{PREFIX.rstrip('/')}/{ts}-snyk-group-audit.json"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def _parse_next_cursor_from_links(links: dict | None) -> str | None:
        if not links:
            return None
        nxt = links.get("next")
        if not nxt:
            return None
        try:
            q = urllib.parse.urlparse(nxt).query
            params = urllib.parse.parse_qs(q)
            cur = params.get("cursor")
            return cur[0] if cur else None
        except Exception:
            return None
    
    def _http_get(url: str) -> dict:
        req = Request(url, method="GET", headers=HDRS)
        try:
            with urlopen(req, timeout=60) as r:
                return json.loads(r.read().decode("utf-8"))
        except HTTPError as e:
            # Back off on rate limit or transient server errors; single retry
            if e.code in (429, 500, 502, 503, 504):
                delay = int(e.headers.get("Retry-After", "1"))
                time.sleep(max(1, delay))
                with urlopen(req, timeout=60) as r2:
                    return json.loads(r2.read().decode("utf-8"))
            raise
    
    def _as_list(csv_str: str) -> list[str]:
        return [x.strip() for x in csv_str.split(",") if x.strip()]
    
    def fetch_page(cursor: str | None, first_run_from_iso: str | None):
        base_path = f"/rest/groups/{GROUP_ID}/audit_logs/search"
        params: dict[str, object] = {
            "version": API_VERSION,
            "size": SIZE,
        }
        if cursor:
            params["cursor"] = cursor
        elif first_run_from_iso:
            params["from"] = first_run_from_iso  # RFC3339
    
        events = _as_list(EVENTS_CSV)
        exclude_events = _as_list(EXCLUDE_EVENTS_CSV)
        if events and exclude_events:
            # API does not allow both at the same time; prefer explicit include
            exclude_events = []
        if events:
            params["events"] = events  # will be encoded as repeated params
        if exclude_events:
            params["exclude_events"] = exclude_events
    
        url = f"{BASE}{base_path}?{urllib.parse.urlencode(params, doseq=True)}"
        return _http_get(url)
    
    def lambda_handler(event=None, context=None):
        cursor = _get_state()
        pages = 0
        total = 0
        last_cursor = cursor
    
        # Only for the very first run (no saved cursor), constrain the time window
        first_run_from_iso = None
        if not cursor and LOOKBACK_SECONDS > 0:
            first_run_from_iso = time.strftime(
                "%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time() - LOOKBACK_SECONDS)
            )
    
        while pages < MAX_PAGES:
            payload = fetch_page(cursor, first_run_from_iso)
            _write(payload)
    
            # items are nested under data.items per Snyk docs
            data_obj = payload.get("data") or {}
            items = data_obj.get("items") or []
            if isinstance(items, list):
                total += len(items)
    
            cursor = _parse_next_cursor_from_links(payload.get("links"))
            pages += 1
            if not cursor:
                break
    
            # after first page, disable from-filter
            first_run_from_iso = None
    
        if cursor and cursor != last_cursor:
            _put_state(cursor)
    
        return {"ok": True, "pages": pages, "events": total, "next_cursor": cursor}
    
    if __name__ == "__main__":
        print(lambda_handler())
    

Umgebungsvariablen hinzufügen

  1. Rufen Sie Konfiguration > Umgebungsvariablen auf.
  2. Klicken Sie auf Bearbeiten> Neue Umgebungsvariable hinzufügen.
  3. Geben Sie die folgenden Umgebungsvariablen ein und ersetzen Sie die Platzhalter durch Ihre Werte:

    Schlüssel Beispiel
    S3_BUCKET snyk-audit
    S3_PREFIX snyk/audit/
    STATE_KEY snyk/audit/state.json
    SNYK_GROUP_ID <your_group_id>
    SNYK_API_TOKEN xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    SNYK_API_BASE https://api.snyk.io (optional)
    SNYK_API_VERSION 2021-06-04
    SIZE 100
    MAX_PAGES 20
    LOOKBACK_SECONDS 3600
    EVENTS (optional) group.create,org.user.add
    EXCLUDE_EVENTS (optional) api.access
  4. Bleiben Sie nach dem Erstellen der Funktion auf der zugehörigen Seite oder öffnen Sie Lambda > Funktionen > Ihre Funktion.

  5. Wählen Sie den Tab Konfiguration aus.

  6. Klicken Sie im Bereich Allgemeine Konfiguration auf Bearbeiten.

  7. Ändern Sie Zeitlimit in 5 Minuten (300 Sekunden) und klicken Sie auf Speichern.

EventBridge-Zeitplan erstellen

  1. Gehen Sie zu Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Scheduler > Zeitplan erstellen).
  2. Geben Sie die folgenden Konfigurationsdetails an:
    • Wiederkehrender Zeitplan: Preis (1 hour).
    • Ziel: Ihre Lambda-Funktion.
    • Name: snyk-group-audit-1h.
  3. Klicken Sie auf Zeitplan erstellen.

Optional: IAM-Nutzer mit Lesezugriff und Schlüssel für Google SecOps erstellen

  1. Rufen Sie in der AWS-Konsole IAM > Nutzer > Nutzer hinzufügen auf.
  2. Klicken Sie auf Add users (Nutzer hinzufügen).
  3. Geben Sie die folgenden Konfigurationsdetails an:
    • Nutzer: secops-reader.
    • Zugriffstyp: Zugriffsschlüssel – programmatischer Zugriff.
  4. Klicken Sie auf Nutzer erstellen.
  5. Minimale Leseberechtigung (benutzerdefiniert) anhängen: Nutzer > secops-reader > Berechtigungen > Berechtigungen hinzufügen > Richtlinien direkt anhängen > Richtlinie erstellen.
  6. Geben Sie im JSON-Editor die folgende Richtlinie ein:

    {
      "Version": "2012-10-17",
      "Statement": [
        { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::snyk-audit/*" },
        { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::snyk-audit" }
      ]
    }
    
  7. Legen Sie secops-reader-policy als Name fest.

  8. Gehen Sie zu Richtlinie erstellen> suchen/auswählen > Weiter > Berechtigungen hinzufügen.

  9. Rufen Sie Sicherheitsanmeldedaten > Zugriffsschlüssel > Zugriffsschlüssel erstellen auf.

  10. Laden Sie die CSV herunter (diese Werte werden in den Feed eingegeben).

Feed in Google SecOps konfigurieren, um Audit-Logs auf Snyk-Gruppenebene aufzunehmen

  1. Rufen Sie die SIEM-Einstellungen > Feeds auf.
  2. Klicken Sie auf + Neuen Feed hinzufügen.
  3. Geben Sie im Feld Feed name (Feedname) einen Namen für den Feed ein, z. B. Snyk Group Audit Logs.
  4. Wählen Sie Amazon S3 V2 als Quelltyp aus.
  5. Wählen Sie Audit-Logs auf Snyk-Gruppenebene als Logtyp aus.
  6. Klicken Sie auf Weiter.
  7. Geben Sie Werte für die folgenden Eingabeparameter an:
    • S3-URI: s3://snyk-audit/snyk/audit/
    • Optionen zum Löschen von Quellen: Wählen Sie die gewünschte Option zum Löschen aus.
    • Maximales Dateialter: Dateien einschließen, die in den letzten Tagen geändert wurden. Der Standardwert ist 180 Tage.
    • Zugriffsschlüssel-ID: Zugriffsschlüssel des Nutzers mit Zugriff auf den S3-Bucket.
    • Secret Access Key (Geheimer Zugriffsschlüssel): Geheimer Nutzersicherheitsschlüssel mit Zugriff auf den S3-Bucket.
    • Asset-Namespace: snyk.group_audit
    • Labels für die Aufnahme: Fügen Sie diese bei Bedarf hinzu.
  8. Klicken Sie auf Weiter.
  9. Prüfen Sie die neue Feedkonfiguration auf dem Bildschirm Abschließen und klicken Sie dann auf Senden.

UDM-Zuordnungstabelle

Logfeld UDM-Zuordnung Logik
content.url principal.url Direkt aus dem Feld content.url im Rohlog zugeordnet.
erstellt metadata.event_timestamp Wird aus dem Feld created im Rohlog im ISO8601-Format geparst.
Ereignis metadata.product_event_type Direkt aus dem Feld event im Rohlog zugeordnet.
groupId principal.user.group_identifiers Direkt aus dem Feld groupId im Rohlog zugeordnet.
orgId principal.user.attribute.labels.key Legen Sie diesen Wert auf „orgId“ fest.
orgId principal.user.attribute.labels.value Direkt aus dem Feld orgId im Rohlog zugeordnet.
userId principal.user.userid Direkt aus dem Feld userId im Rohlog zugeordnet.
metadata.event_type Im Parsercode ist „USER_UNCATEGORIZED“ fest codiert.
metadata.log_type Im Parsercode ist „SNYK_SDLC“ hartcodiert.
metadata.product_name Im Parsercode ist „SNYK SDLC“ hartcodiert.
metadata.vendor_name Im Parsercode ist „SNYK_SDLC“ hartcodiert.

Benötigen Sie weitere Hilfe? Antworten von Community-Mitgliedern und Google SecOps-Experten erhalten