Recopila registros de auditoría y de problemas a nivel del grupo de Snyk

Compatible con:

En esta guía, se explica cómo puedes transferir los registros de auditoría y problemas a nivel del grupo de Snyk a Google Security Operations con Amazon S3.

Antes de comenzar

Asegúrate de cumplir con los siguientes requisitos previos:

  • Instancia de Google SecOps
  • Acceso privilegiado al grupo de Snyk (token de API con acceso de lectura; ID de grupo)
  • Acceso con privilegios a AWS (S3, IAM, Lambda, EventBridge)

Obtén el ID del grupo y el token de API de Snyk

  1. En la IU de Snyk, ve a Configuración de la cuenta > Token de API y genera el token de API.
  2. Copia y guarda el token en una ubicación segura para usarlo más adelante como SNYK_TOKEN.
  3. Cambia a tu grupo y abre Configuración del grupo.
  4. Copia y guarda el ID de grupo de la URL (https://app.snyk.io/group/<GROUP_ID>/...) para usarlo más adelante como GROUP_ID.
  5. Extremo de API base: https://api.snyk.io (anula con API_BASE si es necesario).
  6. Asigna el rol de administrador del grupo al usuario con el token. (El usuario debe poder ver los Registros de auditoría de grupos y los Problemas del grupo).

Configura el bucket de AWS S3 y el IAM para Google SecOps

  1. Crea un bucket de Amazon S3 siguiendo esta guía del usuario: Crea un bucket
  2. Guarda el Nombre y la Región del bucket para futuras referencias (por ejemplo, snyk-group-logs).
  3. Crea un usuario siguiendo esta guía del usuario: Cómo crear un usuario de IAM.
  4. Selecciona el usuario creado.
  5. Selecciona la pestaña Credenciales de seguridad.
  6. Haz clic en Crear clave de acceso en la sección Claves de acceso.
  7. Selecciona Servicio de terceros como el Caso de uso.
  8. Haz clic en Siguiente.
  9. Opcional: Agrega una etiqueta de descripción.
  10. Haz clic en Crear clave de acceso.
  11. Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para usarlas más adelante.
  12. Haz clic en Listo.
  13. Selecciona la pestaña Permisos.
  14. Haz clic en Agregar permisos en la sección Políticas de permisos.
  15. Selecciona Agregar permisos.
  16. Selecciona Adjuntar políticas directamente.
  17. Busca y selecciona la política AmazonS3FullAccess.
  18. Haz clic en Siguiente.
  19. Haz clic en Agregar permisos.

Configura la política y el rol de IAM para las cargas de S3

  1. En la consola de AWS, ve a IAM > Políticas > Crear política > pestaña JSON.
  2. Ingresa la siguiente política (incluye acceso de escritura para todos los objetos del bucket y acceso de lectura al archivo de estado que usa tu función Lambda):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "PutAllSnykGroupObjects",
          "Effect": "Allow",
          "Action": ["s3:PutObject", "s3:GetObject"],
          "Resource": "arn:aws:s3:::snyk-group-logs/*"
        }
      ]
    }
    
    • Reemplaza snyk-group-logs si ingresaste un nombre de bucket diferente.
  3. Haz clic en Siguiente > Crear política.

  4. Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.

  5. Adjunta la política recién creada.

  6. Asigna el nombre WriteSnykGroupToS3Role al rol y haz clic en Crear rol.

Crea la función Lambda

  1. En la consola de AWS, ve a Lambda > Functions > Create function.
  2. Haz clic en Crear desde cero.
  3. Proporciona los siguientes detalles de configuración:
Configuración Valor
Nombre snyk_group_audit_issues_to_s3
Tiempo de ejecución Python 3.13
Arquitectura x86_64
Rol de ejecución WriteSnykGroupToS3Role
  1. Después de crear la función, abre la pestaña Code, borra el código auxiliar y, luego, ingresa el siguiente código (snyk_group_audit_issues_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull Snyk Group-level Audit Logs + Issues to S3 (no transform)
    
    import os
    import json
    import time
    import urllib.parse
    from urllib.request import Request, urlopen
    from urllib.parse import urlparse, parse_qs
    from urllib.error import HTTPError
    import boto3
    
    API_BASE = os.environ.get("API_BASE", "https://api.snyk.io").rstrip("/")
    SNYK_TOKEN = os.environ["SNYK_TOKEN"].strip()
    GROUP_ID = os.environ["GROUP_ID"].strip()
    
    BUCKET = os.environ["S3_BUCKET"].strip()
    PREFIX = os.environ.get("S3_PREFIX", "snyk/group/").strip()
    STATE_KEY = os.environ.get("STATE_KEY", "snyk/group/state.json").strip()
    
    # Page sizes & limits
    AUDIT_SIZE = int(os.environ.get("AUDIT_PAGE_SIZE", "100"))       # audit uses 'size' (max 100)
    ISSUES_LIMIT = int(os.environ.get("ISSUES_PAGE_LIMIT", "200"))   # issues uses 'limit'
    MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))
    
    # API versions (Snyk REST requires a 'version' param)
    AUDIT_API_VERSION = os.environ.get("SNYK_AUDIT_API_VERSION", "2021-06-04").strip()
    ISSUES_API_VERSION = os.environ.get("SNYK_ISSUES_API_VERSION", "2024-10-15").strip()
    
    # First-run lookback for audit to avoid huge backfills
    LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600"))
    
    HDRS = {
        "Authorization": f"token {SNYK_TOKEN}",
        "Accept": "application/vnd.api+json",
    }
    
    s3 = boto3.client("s3")
    
    def _get_state() -> dict:
        try:
            obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read() or b"{}")
        except Exception:
            return {}
    
    def _put_state(state: dict):
        s3.put_object(
            Bucket=BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(state, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
    
    def _iso(ts: float) -> str:
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))
    
    def _http_get(url: str) -> dict:
        req = Request(url, method="GET", headers=HDRS)
        try:
            with urlopen(req, timeout=60) as r:
                return json.loads(r.read().decode("utf-8"))
        except HTTPError as e:
            if e.code in (429, 500, 502, 503, 504):
                delay = int(e.headers.get("Retry-After", "1"))
                time.sleep(max(1, delay))
                with urlopen(req, timeout=60) as r2:
                    return json.loads(r2.read().decode("utf-8"))
            raise
    
    def _write_page(kind: str, payload: dict) -> str:
        ts = time.gmtime()
        key = f"{PREFIX.rstrip('/')}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-snyk-{kind}.json"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def _next_href(links: dict | None) -> str | None:
        if not links:
            return None
        nxt = links.get("next")
        if not nxt:
            return None
        if isinstance(nxt, str):
            return nxt
        if isinstance(nxt, dict):
            return nxt.get("href")
        return None
    
    # -------- Audit Logs --------
    
    def pull_audit_logs(state: dict) -> dict:
        cursor = state.get("audit_cursor")
        pages = 0
        total = 0
    
        base = f"{API_BASE}/rest/groups/{GROUP_ID}/audit_logs/search"
        params: dict[str, object] = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE}
    
        if cursor:
            params["cursor"] = cursor
        else:
            now = time.time()
            params["from"] = _iso(now - LOOKBACK_SECONDS)
            params["to"] = _iso(now)
    
        while pages < MAX_PAGES:
            url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}"
            payload = _http_get(url)
            _write_page("audit", payload)
    
            data_items = (payload.get("data") or {}).get("items") or []
            if isinstance(data_items, list):
                total += len(data_items)
    
            nxt = _next_href(payload.get("links"))
            if not nxt:
                break
            q = parse_qs(urlparse(nxt).query)
            cur = (q.get("cursor") or [None])[0]
            if not cur:
                break
    
            params = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE, "cursor": cur}
            state["audit_cursor"] = cur
            pages += 1
    
        return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("audit_cursor")}
    
    # -------- Issues --------
    
    def pull_issues(state: dict) -> dict:
        cursor = state.get("issues_cursor")  # stores 'starting_after'
        pages = 0
        total = 0
    
        base = f"{API_BASE}/rest/groups/{GROUP_ID}/issues"
        params: dict[str, object] = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT}
        if cursor:
            params["starting_after"] = cursor
    
        while pages < MAX_PAGES:
            url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}"
            payload = _http_get(url)
            _write_page("issues", payload)
    
            data_items = payload.get("data") or []
            if isinstance(data_items, list):
                total += len(data_items)
    
            nxt = _next_href(payload.get("links"))
            if not nxt:
                break
            q = parse_qs(urlparse(nxt).query)
            cur = (q.get("starting_after") or [None])[0]
            if not cur:
                break
    
            params = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT, "starting_after": cur}
            state["issues_cursor"] = cur
            pages += 1
    
        return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("issues_cursor")}
    
    def lambda_handler(event=None, context=None):
        state = _get_state()
        audit_res = pull_audit_logs(state)
        issues_res = pull_issues(state)
        _put_state(state)
        return {"ok": True, "audit": audit_res, "issues": issues_res}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  2. Ve a Configuración > Variables de entorno > Editar > Agregar nueva variable de entorno.

  3. Ingresa las siguientes variables de entorno y reemplázalas por tus valores:

    Clave Ejemplo
    S3_BUCKET snyk-group-logs
    S3_PREFIX snyk/group/
    STATE_KEY snyk/group/state.json
    SNYK_TOKEN xxxxxxxx-xxxx-xxxx-xxxx-xxxx
    GROUP_ID <group_uuid>
    API_BASE https://api.snyk.io
    SNYK_AUDIT_API_VERSION 2021-06-04
    SNYK_ISSUES_API_VERSION 2024-10-15
    AUDIT_PAGE_SIZE 100
    ISSUES_PAGE_LIMIT 200
    MAX_PAGES 20
    LOOKBACK_SECONDS 3600
  4. Después de crear la función, permanece en su página (o abre Lambda > Functions > <your-function>).

  5. Selecciona la pestaña Configuración.

  6. En el panel Configuración general, haz clic en Editar.

  7. Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.

Crea una programación de EventBridge

  1. Ve a Amazon EventBridge > Scheduler > Create schedule.
  2. Proporciona los siguientes detalles de configuración:
    • Programación recurrente: Frecuencia (1 hour).
    • Destino: Tu función Lambda snyk_group_audit_issues_to_s3.
    • Nombre: snyk-group-audit-issues-1h.
  3. Haz clic en Crear programación.

Opcional: Crea un usuario y claves de IAM de solo lectura para Google SecOps

  1. En la consola de AWS, ve a IAM > Usuarios > Agregar usuarios.
  2. Haz clic en Agregar usuarios.
  3. Proporciona los siguientes detalles de configuración:
    • Usuario: secops-reader.
    • Tipo de acceso: Clave de acceso: Acceso programático
  4. Haz clic en Crear usuario.
  5. Adjunta la política de lectura mínima (personalizada): Usuarios > secops-reader > Permisos > Agregar permisos > Adjuntar políticas directamente > Crear política.
  6. En el editor de JSON, ingresa la siguiente política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::snyk-group-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::snyk-group-logs"
        }
      ]
    }
    
  7. Configura el nombre como secops-reader-policy.

  8. Ve a Crear política > busca o selecciona > Siguiente > Agregar permisos.

  9. Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.

  10. Descarga el archivo CSV (estos valores se ingresan en el feed).

Configura un feed en Google SecOps para transferir los registros de auditoría y de problemas a nivel del grupo de Snyk

  1. Ve a Configuración de SIEM > Feeds.
  2. Haz clic en + Agregar feed nuevo.
  3. En el campo Nombre del feed, ingresa un nombre para el feed (por ejemplo, Snyk Group Audit/Issues).
  4. Selecciona Amazon S3 V2 como el Tipo de fuente.
  5. Selecciona Snyk Group level audit/issues logs como el Tipo de registro.
  6. Haz clic en Siguiente.
  7. Especifica valores para los siguientes parámetros de entrada:
    • URI de S3: s3://snyk-group-logs/snyk/group/
    • Opciones de borrado de la fuente: Selecciona la opción de borrado según tu preferencia.
    • Antigüedad máxima del archivo: Incluye los archivos modificados en la cantidad de días especificada. El valor predeterminado es de 180 días.
    • ID de clave de acceso: Clave de acceso del usuario con acceso al bucket de S3.
    • Clave de acceso secreta: Clave secreta del usuario con acceso al bucket de S3.
    • Espacio de nombres del activo: snyk.group
    • Etiquetas de transferencia: Agrégalas si lo deseas.
  8. Haz clic en Siguiente.
  9. Revisa la nueva configuración del feed en la pantalla Finalizar y, luego, haz clic en Enviar.

¿Necesitas más ayuda? Obtén respuestas de miembros de la comunidad y profesionales de Google SecOps.