Audit-Logs auf Gruppenebene von Snyk erfassen
In diesem Dokument wird beschrieben, wie Sie Audit-Logs auf Gruppenebene von Snyk mithilfe von Amazon S3 in Google Security Operations aufnehmen. Der Parser bereinigt zuerst unnötige Felder aus den Rohlogs. Anschließend werden relevante Informationen wie Nutzerdetails, Ereignistyp und Zeitstempel extrahiert, transformiert und dem Google SecOps UDM-Schema zugeordnet, um eine standardisierte Darstellung von Sicherheitslogs zu ermöglichen.
Hinweise
Prüfen Sie, ob folgende Voraussetzungen erfüllt sind:
- Google SecOps-Instanz
- Privilegierter Zugriff auf Snyk (Gruppenadministrator) und ein API-Token mit Zugriff auf die Gruppe
- Privilegierter Zugriff auf AWS (S3, IAM, Lambda, EventBridge)
Voraussetzungen für das Erfassen von Audit-Logs auf Snyk-Gruppenebene (IDs, API-Schlüssel, Organisations-IDs, Tokens)
- Klicken Sie in Snyk auf Ihren Avatar > Kontoeinstellungen > API-Token.
- Klicken Sie auf Widerrufen und neu generieren (oder Generieren) und kopieren Sie das Token.
- Speichern Sie dieses Token als Umgebungsvariable
SNYK_API_TOKEN
.
- Wechseln Sie in Snyk zu Ihrer Gruppe (Auswahlfeld oben links).
- Rufen Sie die Gruppeneinstellungen auf. Kopieren Sie die
<GROUP_ID>
aus der URL:https://app.snyk.io/group/<GROUP_ID>/settings
. - Alternativ können Sie die REST API verwenden:
GET https://api.snyk.io/rest/groups?version=2021-06-04
undid
auswählen.
- Rufen Sie die Gruppeneinstellungen auf. Kopieren Sie die
- Prüfen Sie, ob der Tokennutzer die Berechtigung Audit-Logs ansehen (group.audit.read) hat.
AWS S3-Bucket und IAM für Google SecOps konfigurieren
- Erstellen Sie einen Amazon S3-Bucket. Folgen Sie dazu dieser Anleitung: Bucket erstellen.
- Speichern Sie den Namen und die Region des Buckets zur späteren Verwendung (z. B.
snyk-audit
). - Erstellen Sie einen Nutzer gemäß dieser Anleitung: IAM-Nutzer erstellen.
- Wählen Sie den erstellten Nutzer aus.
- Wählen Sie den Tab Sicherheitsanmeldedaten aus.
- Klicken Sie im Abschnitt Zugriffsschlüssel auf Zugriffsschlüssel erstellen.
- Wählen Sie als Anwendungsfall Drittanbieterdienst aus.
- Klicken Sie auf Weiter.
- Optional: Fügen Sie ein Beschreibungstag hinzu.
- Klicken Sie auf Zugriffsschlüssel erstellen.
- Klicken Sie auf CSV-Datei herunterladen, um den Access Key (Zugriffsschlüssel) und den Secret Access Key (geheimer Zugriffsschlüssel) zur späteren Verwendung zu speichern.
- Klicken Sie auf Fertig.
- Wählen Sie den Tab Berechtigungen aus.
- Klicken Sie im Bereich Berechtigungsrichtlinien auf Berechtigungen hinzufügen.
- Wählen Sie Berechtigungen hinzufügen aus.
- Wählen Sie Richtlinien direkt anhängen aus.
- Suchen Sie nach der Richtlinie AmazonS3FullAccess und wählen Sie sie aus.
- Klicken Sie auf Weiter.
- Klicken Sie auf Berechtigungen hinzufügen.
IAM-Richtlinie und ‑Rolle für S3-Uploads konfigurieren
- Rufen Sie in der AWS-Konsole IAM > Richtlinien > Richtlinie erstellen > JSON-Tab auf.
Geben Sie die folgende Richtlinie ein:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutSnykAuditObjects", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject" ], "Resource": "arn:aws:s3:::snyk-audit/*" } ] }
Klicken Sie auf Weiter > Richtlinie erstellen.
Rufen Sie IAM > Rollen > Rolle erstellen > AWS-Service > Lambda auf.
Hängen Sie die neu erstellte Richtlinie an.
Geben Sie der Rolle den Namen
WriteSnykAuditToS3Role
und klicken Sie auf Rolle erstellen.
Lambda-Funktion erstellen
- Rufen Sie in der AWS Console Lambda > Funktionen > Funktion erstellen auf.
- Klicken Sie auf Von Grund auf erstellen.
- Geben Sie die folgenden Konfigurationsdetails an:
Einstellung | Wert |
---|---|
Name | snyk_group_audit_to_s3 |
Laufzeit | Python 3.13 |
Architektur | x86_64 |
Ausführungsrolle | WriteSnykAuditToS3Role |
Nachdem die Funktion erstellt wurde, öffnen Sie den Tab Code, löschen Sie den Stub und geben Sie den folgenden Code ein (
snyk_group_audit_to_s3.py
):# snyk_group_audit_to_s3.py #!/usr/bin/env python3 # Lambda: Pull Snyk Group-level Audit Logs (REST) to S3 (no transform) import os import json import time import urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError import boto3 BASE = os.environ.get("SNYK_API_BASE", "https://api.snyk.io").rstrip("/") GROUP_ID = os.environ["SNYK_GROUP_ID"].strip() API_TOKEN = os.environ["SNYK_API_TOKEN"].strip() BUCKET = os.environ["S3_BUCKET"].strip() PREFIX = os.environ.get("S3_PREFIX", "snyk/audit/").strip() SIZE = int(os.environ.get("SIZE", "100")) # max 100 per docs MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) STATE_KEY = os.environ.get("STATE_KEY", "snyk/audit/state.json") API_VERSION = os.environ.get("SNYK_API_VERSION", "2021-06-04").strip() # required by REST API LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600")) # used only when no cursor # Optional filters EVENTS_CSV = os.environ.get("EVENTS", "").strip() # e.g. "group.create,org.user.invited" EXCLUDE_EVENTS_CSV = os.environ.get("EXCLUDE_EVENTS", "").strip() s3 = boto3.client("s3") HDRS = { # REST authentication requires "token" scheme and vnd.api+json Accept "Authorization": f"token {API_TOKEN}", "Accept": "application/vnd.api+json", } def _get_state() -> str | None: try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()).get("cursor") except Exception: return None def _put_state(cursor: str): s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps({"cursor": cursor}).encode("utf-8")) def _write(payload: dict) -> str: ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime()) key = f"{PREFIX.rstrip('/')}/{ts}-snyk-group-audit.json" s3.put_object( Bucket=BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def _parse_next_cursor_from_links(links: dict | None) -> str | None: if not links: return None nxt = links.get("next") if not nxt: return None try: q = urllib.parse.urlparse(nxt).query params = urllib.parse.parse_qs(q) cur = params.get("cursor") return cur[0] if cur else None except Exception: return None def _http_get(url: str) -> dict: req = Request(url, method="GET", headers=HDRS) try: with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # Back off on rate limit or transient server errors; single retry if e.code in (429, 500, 502, 503, 504): delay = int(e.headers.get("Retry-After", "1")) time.sleep(max(1, delay)) with urlopen(req, timeout=60) as r2: return json.loads(r2.read().decode("utf-8")) raise def _as_list(csv_str: str) -> list[str]: return [x.strip() for x in csv_str.split(",") if x.strip()] def fetch_page(cursor: str | None, first_run_from_iso: str | None): base_path = f"/rest/groups/{GROUP_ID}/audit_logs/search" params: dict[str, object] = { "version": API_VERSION, "size": SIZE, } if cursor: params["cursor"] = cursor elif first_run_from_iso: params["from"] = first_run_from_iso # RFC3339 events = _as_list(EVENTS_CSV) exclude_events = _as_list(EXCLUDE_EVENTS_CSV) if events and exclude_events: # API does not allow both at the same time; prefer explicit include exclude_events = [] if events: params["events"] = events # will be encoded as repeated params if exclude_events: params["exclude_events"] = exclude_events url = f"{BASE}{base_path}?{urllib.parse.urlencode(params, doseq=True)}" return _http_get(url) def lambda_handler(event=None, context=None): cursor = _get_state() pages = 0 total = 0 last_cursor = cursor # Only for the very first run (no saved cursor), constrain the time window first_run_from_iso = None if not cursor and LOOKBACK_SECONDS > 0: first_run_from_iso = time.strftime( "%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time() - LOOKBACK_SECONDS) ) while pages < MAX_PAGES: payload = fetch_page(cursor, first_run_from_iso) _write(payload) # items are nested under data.items per Snyk docs data_obj = payload.get("data") or {} items = data_obj.get("items") or [] if isinstance(items, list): total += len(items) cursor = _parse_next_cursor_from_links(payload.get("links")) pages += 1 if not cursor: break # after first page, disable from-filter first_run_from_iso = None if cursor and cursor != last_cursor: _put_state(cursor) return {"ok": True, "pages": pages, "events": total, "next_cursor": cursor} if __name__ == "__main__": print(lambda_handler())
Umgebungsvariablen hinzufügen
- Rufen Sie Konfiguration > Umgebungsvariablen auf.
- Klicken Sie auf Bearbeiten> Neue Umgebungsvariable hinzufügen.
Geben Sie die folgenden Umgebungsvariablen ein und ersetzen Sie die Platzhalter durch Ihre Werte:
Schlüssel Beispiel S3_BUCKET
snyk-audit
S3_PREFIX
snyk/audit/
STATE_KEY
snyk/audit/state.json
SNYK_GROUP_ID
<your_group_id>
SNYK_API_TOKEN
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
SNYK_API_BASE
https://api.snyk.io
(optional)SNYK_API_VERSION
2021-06-04
SIZE
100
MAX_PAGES
20
LOOKBACK_SECONDS
3600
EVENTS
(optional) group.create,org.user.add
EXCLUDE_EVENTS
(optional) api.access
Bleiben Sie nach dem Erstellen der Funktion auf der zugehörigen Seite oder öffnen Sie Lambda > Funktionen > Ihre Funktion.
Wählen Sie den Tab Konfiguration aus.
Klicken Sie im Bereich Allgemeine Konfiguration auf Bearbeiten.
Ändern Sie Zeitlimit in 5 Minuten (300 Sekunden) und klicken Sie auf Speichern.
EventBridge-Zeitplan erstellen
- Gehen Sie zu Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Scheduler > Zeitplan erstellen).
- Geben Sie die folgenden Konfigurationsdetails an:
- Wiederkehrender Zeitplan: Preis (
1 hour
). - Ziel: Ihre Lambda-Funktion.
- Name:
snyk-group-audit-1h
.
- Wiederkehrender Zeitplan: Preis (
- Klicken Sie auf Zeitplan erstellen.
Optional: IAM-Nutzer mit Lesezugriff und Schlüssel für Google SecOps erstellen
- Rufen Sie in der AWS-Konsole IAM > Nutzer > Nutzer hinzufügen auf.
- Klicken Sie auf Add users (Nutzer hinzufügen).
- Geben Sie die folgenden Konfigurationsdetails an:
- Nutzer:
secops-reader
. - Zugriffstyp: Zugriffsschlüssel – programmatischer Zugriff.
- Nutzer:
- Klicken Sie auf Nutzer erstellen.
- Minimale Leseberechtigung (benutzerdefiniert) anhängen: Nutzer > secops-reader > Berechtigungen > Berechtigungen hinzufügen > Richtlinien direkt anhängen > Richtlinie erstellen.
Geben Sie im JSON-Editor die folgende Richtlinie ein:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::snyk-audit/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::snyk-audit" } ] }
Legen Sie
secops-reader-policy
als Name fest.Gehen Sie zu Richtlinie erstellen> suchen/auswählen > Weiter > Berechtigungen hinzufügen.
Rufen Sie Sicherheitsanmeldedaten > Zugriffsschlüssel > Zugriffsschlüssel erstellen auf.
Laden Sie die CSV herunter (diese Werte werden in den Feed eingegeben).
Feed in Google SecOps konfigurieren, um Audit-Logs auf Snyk-Gruppenebene aufzunehmen
- Rufen Sie die SIEM-Einstellungen > Feeds auf.
- Klicken Sie auf + Neuen Feed hinzufügen.
- Geben Sie im Feld Feed name (Feedname) einen Namen für den Feed ein, z. B.
Snyk Group Audit Logs
. - Wählen Sie Amazon S3 V2 als Quelltyp aus.
- Wählen Sie Audit-Logs auf Snyk-Gruppenebene als Logtyp aus.
- Klicken Sie auf Weiter.
- Geben Sie Werte für die folgenden Eingabeparameter an:
- S3-URI:
s3://snyk-audit/snyk/audit/
- Optionen zum Löschen von Quellen: Wählen Sie die gewünschte Option zum Löschen aus.
- Maximales Dateialter: Dateien einschließen, die in den letzten Tagen geändert wurden. Der Standardwert ist 180 Tage.
- Zugriffsschlüssel-ID: Zugriffsschlüssel des Nutzers mit Zugriff auf den S3-Bucket.
- Secret Access Key (Geheimer Zugriffsschlüssel): Geheimer Nutzersicherheitsschlüssel mit Zugriff auf den S3-Bucket.
- Asset-Namespace:
snyk.group_audit
- Labels für die Aufnahme: Fügen Sie diese bei Bedarf hinzu.
- S3-URI:
- Klicken Sie auf Weiter.
- Prüfen Sie die neue Feedkonfiguration auf dem Bildschirm Abschließen und klicken Sie dann auf Senden.
UDM-Zuordnungstabelle
Logfeld | UDM-Zuordnung | Logik |
---|---|---|
content.url | principal.url | Direkt aus dem Feld content.url im Rohlog zugeordnet. |
erstellt | metadata.event_timestamp | Wird aus dem Feld created im Rohlog im ISO8601-Format geparst. |
Ereignis | metadata.product_event_type | Direkt aus dem Feld event im Rohlog zugeordnet. |
groupId | principal.user.group_identifiers | Direkt aus dem Feld groupId im Rohlog zugeordnet. |
orgId | principal.user.attribute.labels.key | Legen Sie diesen Wert auf „orgId“ fest. |
orgId | principal.user.attribute.labels.value | Direkt aus dem Feld orgId im Rohlog zugeordnet. |
userId | principal.user.userid | Direkt aus dem Feld userId im Rohlog zugeordnet. |
– | metadata.event_type | Im Parsercode ist „USER_UNCATEGORIZED“ fest codiert. |
– | metadata.log_type | Im Parsercode ist „SNYK_SDLC“ hartcodiert. |
– | metadata.product_name | Im Parsercode ist „SNYK SDLC“ hartcodiert. |
– | metadata.vendor_name | Im Parsercode ist „SNYK_SDLC“ hartcodiert. |
Benötigen Sie weitere Hilfe? Antworten von Community-Mitgliedern und Google SecOps-Experten erhalten