Coletar registros de auditoria e problemas no nível do grupo do Snyk

Compatível com:

Este guia explica como ingerir registros de auditoria e problemas no nível do grupo do Snyk no Google Security Operations usando o Amazon S3.

Antes de começar

Verifique se você tem os pré-requisitos a seguir:

  • Instância do Google SecOps
  • Acesso privilegiado ao grupo Snyk (token de API com acesso de leitura; ID do grupo)
  • Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)

Receber o ID do grupo e o token da API do Snyk

  1. Na UI do Snyk, acesse Configurações da conta > Token da API e gere o token da API.
  2. Copie e salve o token em um local seguro para usar mais tarde como SNYK_TOKEN.
  3. Mude para seu Grupo e abra Configurações do grupo.
  4. Copie e salve o ID do grupo do URL (https://app.snyk.io/group/<GROUP_ID>/...) para usar depois como GROUP_ID.
  5. Endpoint de API de base: https://api.snyk.io (substitua por API_BASE se necessário).
  6. Atribua a função Administrador do grupo ao usuário com o token. O usuário precisa ter acesso aos Registros de auditoria de grupo e aos Problemas do grupo.

Configurar o bucket do AWS S3 e o IAM para o Google SecOps

  1. Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket
  2. Salve o Nome e a Região do bucket para referência futura (por exemplo, snyk-group-logs).
  3. Crie um usuário seguindo este guia: Como criar um usuário do IAM.
  4. Selecione o usuário criado.
  5. Selecione a guia Credenciais de segurança.
  6. Clique em Criar chave de acesso na seção Chaves de acesso.
  7. Selecione Serviço de terceiros como o Caso de uso.
  8. Clique em Próxima.
  9. Opcional: adicione uma tag de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Fazer o download do arquivo CSV para salvar a chave de acesso e a chave de acesso secreta para uso posterior.
  12. Clique em Concluído.
  13. Selecione a guia Permissões.
  14. Clique em Adicionar permissões na seção Políticas de permissões.
  15. Selecione Adicionar permissões.
  16. Selecione Anexar políticas diretamente.
  17. Pesquise e selecione a política AmazonS3FullAccess.
  18. Clique em Próxima.
  19. Clique em Adicionar permissões

Configurar a política e o papel do IAM para uploads do S3

  1. No console da AWS, acesse IAM > Políticas > Criar política > guia JSON.
  2. Insira a seguinte política (inclui acesso de gravação para todos os objetos no bucket e acesso de leitura ao arquivo de estado usado pela sua função do Lambda):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "PutAllSnykGroupObjects",
          "Effect": "Allow",
          "Action": ["s3:PutObject", "s3:GetObject"],
          "Resource": "arn:aws:s3:::snyk-group-logs/*"
        }
      ]
    }
    
    • Substitua snyk-group-logs se você tiver inserido um nome de bucket diferente.
  3. Clique em Próxima > Criar política.

  4. Acesse IAM > Funções > Criar função > Serviço da AWS > Lambda.

  5. Anexe a política recém-criada.

  6. Nomeie a função como WriteSnykGroupToS3Role e clique em Criar função.

Criar a função Lambda

  1. No console da AWS, acesse Lambda > Functions > Create function.
  2. Clique em Criar do zero.
  3. Informe os seguintes detalhes de configuração:
Configuração Valor
Nome snyk_group_audit_issues_to_s3
Ambiente de execução Python 3.13
Arquitetura x86_64
Função de execução WriteSnykGroupToS3Role
  1. Depois que a função for criada, abra a guia Código, exclua o stub e insira o seguinte código (snyk_group_audit_issues_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull Snyk Group-level Audit Logs + Issues to S3 (no transform)
    
    import os
    import json
    import time
    import urllib.parse
    from urllib.request import Request, urlopen
    from urllib.parse import urlparse, parse_qs
    from urllib.error import HTTPError
    import boto3
    
    API_BASE = os.environ.get("API_BASE", "https://api.snyk.io").rstrip("/")
    SNYK_TOKEN = os.environ["SNYK_TOKEN"].strip()
    GROUP_ID = os.environ["GROUP_ID"].strip()
    
    BUCKET = os.environ["S3_BUCKET"].strip()
    PREFIX = os.environ.get("S3_PREFIX", "snyk/group/").strip()
    STATE_KEY = os.environ.get("STATE_KEY", "snyk/group/state.json").strip()
    
    # Page sizes & limits
    AUDIT_SIZE = int(os.environ.get("AUDIT_PAGE_SIZE", "100"))       # audit uses 'size' (max 100)
    ISSUES_LIMIT = int(os.environ.get("ISSUES_PAGE_LIMIT", "200"))   # issues uses 'limit'
    MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))
    
    # API versions (Snyk REST requires a 'version' param)
    AUDIT_API_VERSION = os.environ.get("SNYK_AUDIT_API_VERSION", "2021-06-04").strip()
    ISSUES_API_VERSION = os.environ.get("SNYK_ISSUES_API_VERSION", "2024-10-15").strip()
    
    # First-run lookback for audit to avoid huge backfills
    LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600"))
    
    HDRS = {
        "Authorization": f"token {SNYK_TOKEN}",
        "Accept": "application/vnd.api+json",
    }
    
    s3 = boto3.client("s3")
    
    def _get_state() -> dict:
        try:
            obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read() or b"{}")
        except Exception:
            return {}
    
    def _put_state(state: dict):
        s3.put_object(
            Bucket=BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(state, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
    
    def _iso(ts: float) -> str:
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))
    
    def _http_get(url: str) -> dict:
        req = Request(url, method="GET", headers=HDRS)
        try:
            with urlopen(req, timeout=60) as r:
                return json.loads(r.read().decode("utf-8"))
        except HTTPError as e:
            if e.code in (429, 500, 502, 503, 504):
                delay = int(e.headers.get("Retry-After", "1"))
                time.sleep(max(1, delay))
                with urlopen(req, timeout=60) as r2:
                    return json.loads(r2.read().decode("utf-8"))
            raise
    
    def _write_page(kind: str, payload: dict) -> str:
        ts = time.gmtime()
        key = f"{PREFIX.rstrip('/')}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-snyk-{kind}.json"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def _next_href(links: dict | None) -> str | None:
        if not links:
            return None
        nxt = links.get("next")
        if not nxt:
            return None
        if isinstance(nxt, str):
            return nxt
        if isinstance(nxt, dict):
            return nxt.get("href")
        return None
    
    # -------- Audit Logs --------
    
    def pull_audit_logs(state: dict) -> dict:
        cursor = state.get("audit_cursor")
        pages = 0
        total = 0
    
        base = f"{API_BASE}/rest/groups/{GROUP_ID}/audit_logs/search"
        params: dict[str, object] = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE}
    
        if cursor:
            params["cursor"] = cursor
        else:
            now = time.time()
            params["from"] = _iso(now - LOOKBACK_SECONDS)
            params["to"] = _iso(now)
    
        while pages < MAX_PAGES:
            url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}"
            payload = _http_get(url)
            _write_page("audit", payload)
    
            data_items = (payload.get("data") or {}).get("items") or []
            if isinstance(data_items, list):
                total += len(data_items)
    
            nxt = _next_href(payload.get("links"))
            if not nxt:
                break
            q = parse_qs(urlparse(nxt).query)
            cur = (q.get("cursor") or [None])[0]
            if not cur:
                break
    
            params = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE, "cursor": cur}
            state["audit_cursor"] = cur
            pages += 1
    
        return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("audit_cursor")}
    
    # -------- Issues --------
    
    def pull_issues(state: dict) -> dict:
        cursor = state.get("issues_cursor")  # stores 'starting_after'
        pages = 0
        total = 0
    
        base = f"{API_BASE}/rest/groups/{GROUP_ID}/issues"
        params: dict[str, object] = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT}
        if cursor:
            params["starting_after"] = cursor
    
        while pages < MAX_PAGES:
            url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}"
            payload = _http_get(url)
            _write_page("issues", payload)
    
            data_items = payload.get("data") or []
            if isinstance(data_items, list):
                total += len(data_items)
    
            nxt = _next_href(payload.get("links"))
            if not nxt:
                break
            q = parse_qs(urlparse(nxt).query)
            cur = (q.get("starting_after") or [None])[0]
            if not cur:
                break
    
            params = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT, "starting_after": cur}
            state["issues_cursor"] = cur
            pages += 1
    
        return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("issues_cursor")}
    
    def lambda_handler(event=None, context=None):
        state = _get_state()
        audit_res = pull_audit_logs(state)
        issues_res = pull_issues(state)
        _put_state(state)
        return {"ok": True, "audit": audit_res, "issues": issues_res}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  2. Acesse Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.

  3. Insira as seguintes variáveis de ambiente, substituindo pelos seus valores:

    Chave Exemplo
    S3_BUCKET snyk-group-logs
    S3_PREFIX snyk/group/
    STATE_KEY snyk/group/state.json
    SNYK_TOKEN xxxxxxxx-xxxx-xxxx-xxxx-xxxx
    GROUP_ID <group_uuid>
    API_BASE https://api.snyk.io
    SNYK_AUDIT_API_VERSION 2021-06-04
    SNYK_ISSUES_API_VERSION 2024-10-15
    AUDIT_PAGE_SIZE 100
    ISSUES_PAGE_LIMIT 200
    MAX_PAGES 20
    LOOKBACK_SECONDS 3600
  4. Depois que a função for criada, permaneça na página dela ou abra Lambda > Funções > <your-function>.

  5. Selecione a guia Configuração.

  6. No painel Configuração geral, clique em Editar.

  7. Mude Tempo limite para 5 minutos (300 segundos) e clique em Salvar.

Criar uma programação do EventBridge

  1. Acesse Amazon EventBridge > Scheduler > Criar programação.
  2. Informe os seguintes detalhes de configuração:
    • Programação recorrente: Taxa (1 hour).
    • Destino: sua função Lambda snyk_group_audit_issues_to_s3.
    • Nome: snyk-group-audit-issues-1h.
  3. Clique em Criar programação.

Opcional: criar um usuário e chaves do IAM somente leitura para o Google SecOps

  1. No console da AWS, acesse IAM > Usuários > Adicionar usuários.
  2. Clique em Add users.
  3. Informe os seguintes detalhes de configuração:
    • Usuário: secops-reader.
    • Tipo de acesso: Chave de acesso — Acesso programático.
  4. Clique em Criar usuário.
  5. Anexe a política de leitura mínima (personalizada): Usuários > secops-reader > Permissões > Adicionar permissões > Anexar políticas diretamente > Criar política.
  6. No editor JSON, insira a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::snyk-group-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::snyk-group-logs"
        }
      ]
    }
    
  7. Defina o nome como secops-reader-policy.

  8. Acesse Criar política > pesquise/selecione > Próxima > Adicionar permissões.

  9. Acesse Credenciais de segurança > Chaves de acesso > Criar chave de acesso.

  10. Faça o download do CSV (esses valores são inseridos no feed).

Configurar um feed no Google SecOps para ingerir os registros de auditoria e problemas no nível do grupo do Snyk

  1. Acesse Configurações do SIEM > Feeds.
  2. Clique em + Adicionar novo feed.
  3. No campo Nome do feed, insira um nome para o feed (por exemplo, Snyk Group Audit/Issues).
  4. Selecione Amazon S3 V2 como o Tipo de origem.
  5. Selecione Registros de auditoria/problemas no nível do grupo do Snyk como o Tipo de registro.
  6. Clique em Próxima.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://snyk-group-logs/snyk/group/
    • Opções de exclusão de fontes: selecione a opção de exclusão de acordo com sua preferência.
    • Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
    • ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3.
    • Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3.
    • Namespace do recurso: snyk.group
    • Rótulos de ingestão: adicione se quiser.
  8. Clique em Próxima.
  9. Revise a nova configuração do feed na tela Finalizar e clique em Enviar.

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.