Recopila registros de auditoría y de problemas a nivel del grupo de Snyk
En esta guía, se explica cómo puedes transferir los registros de auditoría y problemas a nivel del grupo de Snyk a Google Security Operations con Amazon S3.
Antes de comenzar
Asegúrate de cumplir con los siguientes requisitos previos:
- Instancia de Google SecOps
- Acceso privilegiado al grupo de Snyk (token de API con acceso de lectura; ID de grupo)
- Acceso con privilegios a AWS (S3, IAM, Lambda, EventBridge)
Obtén el ID del grupo y el token de API de Snyk
- En la IU de Snyk, ve a Configuración de la cuenta > Token de API y genera el token de API.
- Copia y guarda el token en una ubicación segura para usarlo más adelante como
SNYK_TOKEN
. - Cambia a tu grupo y abre Configuración del grupo.
- Copia y guarda el ID de grupo de la URL (
https://app.snyk.io/group/<GROUP_ID>/...
) para usarlo más adelante comoGROUP_ID
. - Extremo de API base:
https://api.snyk.io
(anula conAPI_BASE
si es necesario). - Asigna el rol de administrador del grupo al usuario con el token. (El usuario debe poder ver los Registros de auditoría de grupos y los Problemas del grupo).
Configura el bucket de AWS S3 y el IAM para Google SecOps
- Crea un bucket de Amazon S3 siguiendo esta guía del usuario: Crea un bucket
- Guarda el Nombre y la Región del bucket para futuras referencias (por ejemplo,
snyk-group-logs
). - Crea un usuario siguiendo esta guía del usuario: Cómo crear un usuario de IAM.
- Selecciona el usuario creado.
- Selecciona la pestaña Credenciales de seguridad.
- Haz clic en Crear clave de acceso en la sección Claves de acceso.
- Selecciona Servicio de terceros como el Caso de uso.
- Haz clic en Siguiente.
- Opcional: Agrega una etiqueta de descripción.
- Haz clic en Crear clave de acceso.
- Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para usarlas más adelante.
- Haz clic en Listo.
- Selecciona la pestaña Permisos.
- Haz clic en Agregar permisos en la sección Políticas de permisos.
- Selecciona Agregar permisos.
- Selecciona Adjuntar políticas directamente.
- Busca y selecciona la política AmazonS3FullAccess.
- Haz clic en Siguiente.
- Haz clic en Agregar permisos.
Configura la política y el rol de IAM para las cargas de S3
- En la consola de AWS, ve a IAM > Políticas > Crear política > pestaña JSON.
Ingresa la siguiente política (incluye acceso de escritura para todos los objetos del bucket y acceso de lectura al archivo de estado que usa tu función Lambda):
{ "Version": "2012-10-17", "Statement": [ { "Sid": "PutAllSnykGroupObjects", "Effect": "Allow", "Action": ["s3:PutObject", "s3:GetObject"], "Resource": "arn:aws:s3:::snyk-group-logs/*" } ] }
- Reemplaza
snyk-group-logs
si ingresaste un nombre de bucket diferente.
- Reemplaza
Haz clic en Siguiente > Crear política.
Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.
Adjunta la política recién creada.
Asigna el nombre
WriteSnykGroupToS3Role
al rol y haz clic en Crear rol.
Crea la función Lambda
- En la consola de AWS, ve a Lambda > Functions > Create function.
- Haz clic en Crear desde cero.
- Proporciona los siguientes detalles de configuración:
Configuración | Valor |
---|---|
Nombre | snyk_group_audit_issues_to_s3 |
Tiempo de ejecución | Python 3.13 |
Arquitectura | x86_64 |
Rol de ejecución | WriteSnykGroupToS3Role |
Después de crear la función, abre la pestaña Code, borra el código auxiliar y, luego, ingresa el siguiente código (
snyk_group_audit_issues_to_s3.py
):#!/usr/bin/env python3 # Lambda: Pull Snyk Group-level Audit Logs + Issues to S3 (no transform) import os import json import time import urllib.parse from urllib.request import Request, urlopen from urllib.parse import urlparse, parse_qs from urllib.error import HTTPError import boto3 API_BASE = os.environ.get("API_BASE", "https://api.snyk.io").rstrip("/") SNYK_TOKEN = os.environ["SNYK_TOKEN"].strip() GROUP_ID = os.environ["GROUP_ID"].strip() BUCKET = os.environ["S3_BUCKET"].strip() PREFIX = os.environ.get("S3_PREFIX", "snyk/group/").strip() STATE_KEY = os.environ.get("STATE_KEY", "snyk/group/state.json").strip() # Page sizes & limits AUDIT_SIZE = int(os.environ.get("AUDIT_PAGE_SIZE", "100")) # audit uses 'size' (max 100) ISSUES_LIMIT = int(os.environ.get("ISSUES_PAGE_LIMIT", "200")) # issues uses 'limit' MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) # API versions (Snyk REST requires a 'version' param) AUDIT_API_VERSION = os.environ.get("SNYK_AUDIT_API_VERSION", "2021-06-04").strip() ISSUES_API_VERSION = os.environ.get("SNYK_ISSUES_API_VERSION", "2024-10-15").strip() # First-run lookback for audit to avoid huge backfills LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600")) HDRS = { "Authorization": f"token {SNYK_TOKEN}", "Accept": "application/vnd.api+json", } s3 = boto3.client("s3") def _get_state() -> dict: try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read() or b"{}") except Exception: return {} def _put_state(state: dict): s3.put_object( Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps(state, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) def _iso(ts: float) -> str: return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts)) def _http_get(url: str) -> dict: req = Request(url, method="GET", headers=HDRS) try: with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: if e.code in (429, 500, 502, 503, 504): delay = int(e.headers.get("Retry-After", "1")) time.sleep(max(1, delay)) with urlopen(req, timeout=60) as r2: return json.loads(r2.read().decode("utf-8")) raise def _write_page(kind: str, payload: dict) -> str: ts = time.gmtime() key = f"{PREFIX.rstrip('/')}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-snyk-{kind}.json" s3.put_object( Bucket=BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def _next_href(links: dict | None) -> str | None: if not links: return None nxt = links.get("next") if not nxt: return None if isinstance(nxt, str): return nxt if isinstance(nxt, dict): return nxt.get("href") return None # -------- Audit Logs -------- def pull_audit_logs(state: dict) -> dict: cursor = state.get("audit_cursor") pages = 0 total = 0 base = f"{API_BASE}/rest/groups/{GROUP_ID}/audit_logs/search" params: dict[str, object] = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE} if cursor: params["cursor"] = cursor else: now = time.time() params["from"] = _iso(now - LOOKBACK_SECONDS) params["to"] = _iso(now) while pages < MAX_PAGES: url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}" payload = _http_get(url) _write_page("audit", payload) data_items = (payload.get("data") or {}).get("items") or [] if isinstance(data_items, list): total += len(data_items) nxt = _next_href(payload.get("links")) if not nxt: break q = parse_qs(urlparse(nxt).query) cur = (q.get("cursor") or [None])[0] if not cur: break params = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE, "cursor": cur} state["audit_cursor"] = cur pages += 1 return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("audit_cursor")} # -------- Issues -------- def pull_issues(state: dict) -> dict: cursor = state.get("issues_cursor") # stores 'starting_after' pages = 0 total = 0 base = f"{API_BASE}/rest/groups/{GROUP_ID}/issues" params: dict[str, object] = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT} if cursor: params["starting_after"] = cursor while pages < MAX_PAGES: url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}" payload = _http_get(url) _write_page("issues", payload) data_items = payload.get("data") or [] if isinstance(data_items, list): total += len(data_items) nxt = _next_href(payload.get("links")) if not nxt: break q = parse_qs(urlparse(nxt).query) cur = (q.get("starting_after") or [None])[0] if not cur: break params = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT, "starting_after": cur} state["issues_cursor"] = cur pages += 1 return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("issues_cursor")} def lambda_handler(event=None, context=None): state = _get_state() audit_res = pull_audit_logs(state) issues_res = pull_issues(state) _put_state(state) return {"ok": True, "audit": audit_res, "issues": issues_res} if __name__ == "__main__": print(lambda_handler())
Ve a Configuración > Variables de entorno > Editar > Agregar nueva variable de entorno.
Ingresa las siguientes variables de entorno y reemplázalas por tus valores:
Clave Ejemplo S3_BUCKET
snyk-group-logs
S3_PREFIX
snyk/group/
STATE_KEY
snyk/group/state.json
SNYK_TOKEN
xxxxxxxx-xxxx-xxxx-xxxx-xxxx
GROUP_ID
<group_uuid>
API_BASE
https://api.snyk.io
SNYK_AUDIT_API_VERSION
2021-06-04
SNYK_ISSUES_API_VERSION
2024-10-15
AUDIT_PAGE_SIZE
100
ISSUES_PAGE_LIMIT
200
MAX_PAGES
20
LOOKBACK_SECONDS
3600
Después de crear la función, permanece en su página (o abre Lambda > Functions >
<your-function>
).Selecciona la pestaña Configuración.
En el panel Configuración general, haz clic en Editar.
Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.
Crea una programación de EventBridge
- Ve a Amazon EventBridge > Scheduler > Create schedule.
- Proporciona los siguientes detalles de configuración:
- Programación recurrente: Frecuencia (
1 hour
). - Destino: Tu función Lambda
snyk_group_audit_issues_to_s3
. - Nombre:
snyk-group-audit-issues-1h
.
- Programación recurrente: Frecuencia (
- Haz clic en Crear programación.
Opcional: Crea un usuario y claves de IAM de solo lectura para Google SecOps
- En la consola de AWS, ve a IAM > Usuarios > Agregar usuarios.
- Haz clic en Agregar usuarios.
- Proporciona los siguientes detalles de configuración:
- Usuario:
secops-reader
. - Tipo de acceso: Clave de acceso: Acceso programático
- Usuario:
- Haz clic en Crear usuario.
- Adjunta la política de lectura mínima (personalizada): Usuarios > secops-reader > Permisos > Agregar permisos > Adjuntar políticas directamente > Crear política.
En el editor de JSON, ingresa la siguiente política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::snyk-group-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::snyk-group-logs" } ] }
Configura el nombre como
secops-reader-policy
.Ve a Crear política > busca o selecciona > Siguiente > Agregar permisos.
Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.
Descarga el archivo CSV (estos valores se ingresan en el feed).
Configura un feed en Google SecOps para transferir los registros de auditoría y de problemas a nivel del grupo de Snyk
- Ve a Configuración de SIEM > Feeds.
- Haz clic en + Agregar feed nuevo.
- En el campo Nombre del feed, ingresa un nombre para el feed (por ejemplo,
Snyk Group Audit/Issues
). - Selecciona Amazon S3 V2 como el Tipo de fuente.
- Selecciona Snyk Group level audit/issues logs como el Tipo de registro.
- Haz clic en Siguiente.
- Especifica valores para los siguientes parámetros de entrada:
- URI de S3:
s3://snyk-group-logs/snyk/group/
- Opciones de borrado de la fuente: Selecciona la opción de borrado según tu preferencia.
- Antigüedad máxima del archivo: Incluye los archivos modificados en la cantidad de días especificada. El valor predeterminado es de 180 días.
- ID de clave de acceso: Clave de acceso del usuario con acceso al bucket de S3.
- Clave de acceso secreta: Clave secreta del usuario con acceso al bucket de S3.
- Espacio de nombres del activo:
snyk.group
- Etiquetas de transferencia: Agrégalas si lo deseas.
- URI de S3:
- Haz clic en Siguiente.
- Revisa la nueva configuración del feed en la pantalla Finalizar y, luego, haz clic en Enviar.
¿Necesitas más ayuda? Obtén respuestas de miembros de la comunidad y profesionales de Google SecOps.