Recolha registos de auditoria e problemas ao nível do grupo do Snyk

Compatível com:

Este guia explica como pode carregar registos de auditoria e problemas ao nível do grupo do Snyk para o Google Security Operations através do Amazon S3.

Antes de começar

Certifique-se de que tem os seguintes pré-requisitos:

  • Instância do Google SecOps
  • Acesso privilegiado ao grupo Snyk (token da API com acesso de leitura; ID do grupo)
  • Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)

Obtenha o ID do grupo e a chave da API da Snyk

  1. Na IU do Snyk, aceda a Definições da conta > Token de API e gere o token de API.
  2. Copie e guarde a chave num local seguro para usar mais tarde como SNYK_TOKEN.
  3. Mude para o grupo e abra as Definições do grupo.
  4. Copie e guarde o ID do grupo do URL (https://app.snyk.io/group/<GROUP_ID>/...) para usar mais tarde como GROUP_ID.
  5. Ponto final da API base: https://api.snyk.io (substitua por API_BASE, se necessário).
  6. Atribua a função de administrador do grupo ao utilizador com o token. (O utilizador tem de conseguir ver os Registos de auditoria de grupos e os Problemas do grupo).

Configure o contentor do AWS S3 e o IAM para o Google SecOps

  1. Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor
  2. Guarde o nome e a região do contentor para referência futura (por exemplo, snyk-group-logs).
  3. Crie um utilizador seguindo este guia do utilizador: Criar um utilizador do IAM.
  4. Selecione o utilizador criado.
  5. Selecione o separador Credenciais de segurança.
  6. Clique em Criar chave de acesso na secção Chaves de acesso.
  7. Selecione Serviço de terceiros como o Exemplo de utilização.
  8. Clicar em Seguinte.
  9. Opcional: adicione uma etiqueta de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para utilização posterior.
  12. Clique em Concluído.
  13. Selecione o separador Autorizações.
  14. Clique em Adicionar autorizações na secção Políticas de autorizações.
  15. Selecione Adicionar autorizações.
  16. Selecione Anexar políticas diretamente
  17. Pesquise e selecione a política AmazonS3FullAccess.
  18. Clicar em Seguinte.
  19. Clique em Adicionar autorizações.

Configure a política e a função de IAM para carregamentos do S3

  1. Na consola da AWS, aceda a IAM > Políticas > Criar política > separador JSON.
  2. Introduza a seguinte política (inclui acesso de escrita para todos os objetos no contentor e acesso de leitura ao ficheiro de estado que o Lambda usa):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "PutAllSnykGroupObjects",
          "Effect": "Allow",
          "Action": ["s3:PutObject", "s3:GetObject"],
          "Resource": "arn:aws:s3:::snyk-group-logs/*"
        }
      ]
    }
    
    • Substitua snyk-group-logs se tiver introduzido um nome de contentor diferente.
  3. Clique em Seguinte > Criar política.

  4. Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.

  5. Anexe a política criada recentemente.

  6. Dê o nome WriteSnykGroupToS3Role à função e clique em Criar função.

Crie a função Lambda

  1. Na consola da AWS, aceda a Lambda > Functions > Create function.
  2. Clique em Criar do zero.
  3. Faculte os seguintes detalhes de configuração:
Definição Valor
Nome snyk_group_audit_issues_to_s3
Runtime Python 3.13
Arquitetura x86_64
Função de execução WriteSnykGroupToS3Role
  1. Depois de criar a função, abra o separador Código, elimine o fragmento e introduza o seguinte código (snyk_group_audit_issues_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull Snyk Group-level Audit Logs + Issues to S3 (no transform)
    
    import os
    import json
    import time
    import urllib.parse
    from urllib.request import Request, urlopen
    from urllib.parse import urlparse, parse_qs
    from urllib.error import HTTPError
    import boto3
    
    API_BASE = os.environ.get("API_BASE", "https://api.snyk.io").rstrip("/")
    SNYK_TOKEN = os.environ["SNYK_TOKEN"].strip()
    GROUP_ID = os.environ["GROUP_ID"].strip()
    
    BUCKET = os.environ["S3_BUCKET"].strip()
    PREFIX = os.environ.get("S3_PREFIX", "snyk/group/").strip()
    STATE_KEY = os.environ.get("STATE_KEY", "snyk/group/state.json").strip()
    
    # Page sizes & limits
    AUDIT_SIZE = int(os.environ.get("AUDIT_PAGE_SIZE", "100"))       # audit uses 'size' (max 100)
    ISSUES_LIMIT = int(os.environ.get("ISSUES_PAGE_LIMIT", "200"))   # issues uses 'limit'
    MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))
    
    # API versions (Snyk REST requires a 'version' param)
    AUDIT_API_VERSION = os.environ.get("SNYK_AUDIT_API_VERSION", "2021-06-04").strip()
    ISSUES_API_VERSION = os.environ.get("SNYK_ISSUES_API_VERSION", "2024-10-15").strip()
    
    # First-run lookback for audit to avoid huge backfills
    LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600"))
    
    HDRS = {
        "Authorization": f"token {SNYK_TOKEN}",
        "Accept": "application/vnd.api+json",
    }
    
    s3 = boto3.client("s3")
    
    def _get_state() -> dict:
        try:
            obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read() or b"{}")
        except Exception:
            return {}
    
    def _put_state(state: dict):
        s3.put_object(
            Bucket=BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(state, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
    
    def _iso(ts: float) -> str:
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))
    
    def _http_get(url: str) -> dict:
        req = Request(url, method="GET", headers=HDRS)
        try:
            with urlopen(req, timeout=60) as r:
                return json.loads(r.read().decode("utf-8"))
        except HTTPError as e:
            if e.code in (429, 500, 502, 503, 504):
                delay = int(e.headers.get("Retry-After", "1"))
                time.sleep(max(1, delay))
                with urlopen(req, timeout=60) as r2:
                    return json.loads(r2.read().decode("utf-8"))
            raise
    
    def _write_page(kind: str, payload: dict) -> str:
        ts = time.gmtime()
        key = f"{PREFIX.rstrip('/')}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-snyk-{kind}.json"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def _next_href(links: dict | None) -> str | None:
        if not links:
            return None
        nxt = links.get("next")
        if not nxt:
            return None
        if isinstance(nxt, str):
            return nxt
        if isinstance(nxt, dict):
            return nxt.get("href")
        return None
    
    # -------- Audit Logs --------
    
    def pull_audit_logs(state: dict) -> dict:
        cursor = state.get("audit_cursor")
        pages = 0
        total = 0
    
        base = f"{API_BASE}/rest/groups/{GROUP_ID}/audit_logs/search"
        params: dict[str, object] = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE}
    
        if cursor:
            params["cursor"] = cursor
        else:
            now = time.time()
            params["from"] = _iso(now - LOOKBACK_SECONDS)
            params["to"] = _iso(now)
    
        while pages < MAX_PAGES:
            url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}"
            payload = _http_get(url)
            _write_page("audit", payload)
    
            data_items = (payload.get("data") or {}).get("items") or []
            if isinstance(data_items, list):
                total += len(data_items)
    
            nxt = _next_href(payload.get("links"))
            if not nxt:
                break
            q = parse_qs(urlparse(nxt).query)
            cur = (q.get("cursor") or [None])[0]
            if not cur:
                break
    
            params = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE, "cursor": cur}
            state["audit_cursor"] = cur
            pages += 1
    
        return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("audit_cursor")}
    
    # -------- Issues --------
    
    def pull_issues(state: dict) -> dict:
        cursor = state.get("issues_cursor")  # stores 'starting_after'
        pages = 0
        total = 0
    
        base = f"{API_BASE}/rest/groups/{GROUP_ID}/issues"
        params: dict[str, object] = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT}
        if cursor:
            params["starting_after"] = cursor
    
        while pages < MAX_PAGES:
            url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}"
            payload = _http_get(url)
            _write_page("issues", payload)
    
            data_items = payload.get("data") or []
            if isinstance(data_items, list):
                total += len(data_items)
    
            nxt = _next_href(payload.get("links"))
            if not nxt:
                break
            q = parse_qs(urlparse(nxt).query)
            cur = (q.get("starting_after") or [None])[0]
            if not cur:
                break
    
            params = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT, "starting_after": cur}
            state["issues_cursor"] = cur
            pages += 1
    
        return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("issues_cursor")}
    
    def lambda_handler(event=None, context=None):
        state = _get_state()
        audit_res = pull_audit_logs(state)
        issues_res = pull_issues(state)
        _put_state(state)
        return {"ok": True, "audit": audit_res, "issues": issues_res}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  2. Aceda a Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.

  3. Introduza as seguintes variáveis de ambiente, substituindo-as pelos seus valores:

    Chave Exemplo
    S3_BUCKET snyk-group-logs
    S3_PREFIX snyk/group/
    STATE_KEY snyk/group/state.json
    SNYK_TOKEN xxxxxxxx-xxxx-xxxx-xxxx-xxxx
    GROUP_ID <group_uuid>
    API_BASE https://api.snyk.io
    SNYK_AUDIT_API_VERSION 2021-06-04
    SNYK_ISSUES_API_VERSION 2024-10-15
    AUDIT_PAGE_SIZE 100
    ISSUES_PAGE_LIMIT 200
    MAX_PAGES 20
    LOOKBACK_SECONDS 3600
  4. Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > <your-function>).

  5. Selecione o separador Configuração.

  6. No painel Configuração geral, clique em Editar.

  7. Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.

Crie um horário do EventBridge

  1. Aceda a Amazon EventBridge > Scheduler > Create schedule.
  2. Indique os seguintes detalhes de configuração:
    • Agenda recorrente: Taxa (1 hour).
    • Destino: a sua função Lambda snyk_group_audit_issues_to_s3.
    • Nome: snyk-group-audit-issues-1h.
  3. Clique em Criar programação.

Opcional: crie um utilizador e chaves da IAM só de leitura para o Google SecOps

  1. Na consola da AWS, aceda a IAM > Utilizadores > Adicionar utilizadores.
  2. Clique em Adicionar utilizadores.
  3. Indique os seguintes detalhes de configuração:
    • Utilizador: secops-reader.
    • Tipo de acesso: chave de acesso – acesso programático.
  4. Clique em Criar utilizador.
  5. Anexe a política de leitura mínima (personalizada): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  6. No editor JSON, introduza a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::snyk-group-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::snyk-group-logs"
        }
      ]
    }
    
  7. Defina o nome como secops-reader-policy.

  8. Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.

  9. Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.

  10. Transfira o CSV (estes valores são introduzidos no feed).

Configure um feed no Google SecOps para carregar os registos de auditoria e problemas ao nível do grupo do Snyk

  1. Aceda a Definições do SIEM > Feeds.
  2. Clique em + Adicionar novo feed.
  3. No campo Nome do feed, introduza um nome para o feed (por exemplo, Snyk Group Audit/Issues).
  4. Selecione Amazon S3 V2 como o Tipo de origem.
  5. Selecione Registos de auditoria/problemas ao nível do grupo do Snyk como o Tipo de registo.
  6. Clicar em Seguinte.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://snyk-group-logs/snyk/group/
    • Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
    • Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. A predefinição é 180 dias.
    • ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
    • Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
    • Espaço de nomes de recursos: snyk.group
    • Etiquetas de carregamento: adicione-as, se quiser.
  8. Clicar em Seguinte.
  9. Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.