Coletar registros de auditoria no nível do grupo do Snyk
Este documento explica como ingerir registros de auditoria no nível do grupo do Snyk no Google Security Operations usando o Amazon S3. Primeiro, o analisador limpa os campos desnecessários dos registros brutos. Em seguida, ele extrai informações relevantes, como detalhes do usuário, tipo de evento e carimbos de data/hora, transformando e mapeando esses dados no esquema UDM do Google SecOps para representação padronizada de registros de segurança.
Antes de começar
Verifique se você tem os pré-requisitos a seguir:
- Instância do Google SecOps
- Acesso privilegiado ao Snyk (administrador do grupo) e um token da API com acesso ao grupo
- Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)
Coletar pré-requisitos de registros de auditoria no nível do grupo do Snyk (IDs, chaves de API, IDs de organização, tokens)
- No Snyk, clique no seu avatar > Configurações da conta > Token da API.
- Clique em Revogar e gerar novamente (ou Gerar) e copie o token.
- Salve esse token como a variável de ambiente
SNYK_API_TOKEN
.
- No Snyk, mude para seu grupo (seletor no canto superior esquerdo).
- Acesse Configurações do grupo. Copie o
<GROUP_ID>
do URL:https://app.snyk.io/group/<GROUP_ID>/settings
. - Ou use a API REST:
GET https://api.snyk.io/rest/groups?version=2021-06-04
e escolha oid
.
- Acesse Configurações do grupo. Copie o
- Verifique se o usuário do token tem a permissão Ler registros de auditoria (group.audit.read).
Configurar o bucket do AWS S3 e o IAM para o Google SecOps
- Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket
- Salve o Nome e a Região do bucket para referência futura (por exemplo,
snyk-audit
). - Crie um usuário seguindo este guia: Como criar um usuário do IAM.
- Selecione o usuário criado.
- Selecione a guia Credenciais de segurança.
- Clique em Criar chave de acesso na seção Chaves de acesso.
- Selecione Serviço de terceiros como o Caso de uso.
- Clique em Próxima.
- Opcional: adicione uma tag de descrição.
- Clique em Criar chave de acesso.
- Clique em Fazer o download do arquivo CSV para salvar a chave de acesso e a chave de acesso secreta para uso posterior.
- Clique em Concluído.
- Selecione a guia Permissões.
- Clique em Adicionar permissões na seção Políticas de permissões.
- Selecione Adicionar permissões.
- Selecione Anexar políticas diretamente.
- Pesquise e selecione a política AmazonS3FullAccess.
- Clique em Próxima.
- Clique em Adicionar permissões
Configurar a política e o papel do IAM para uploads do S3
- No console da AWS, acesse IAM > Políticas > Criar política > guia JSON.
Insira a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutSnykAuditObjects", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject" ], "Resource": "arn:aws:s3:::snyk-audit/*" } ] }
Clique em Próxima > Criar política.
Acesse IAM > Funções > Criar função > Serviço da AWS > Lambda.
Anexe a política recém-criada.
Nomeie a função como
WriteSnykAuditToS3Role
e clique em Criar função.
Criar a função Lambda
- No console da AWS, acesse Lambda > Functions > Create function.
- Clique em Criar do zero.
- Informe os seguintes detalhes de configuração:
Configuração | Valor |
---|---|
Nome | snyk_group_audit_to_s3 |
Ambiente de execução | Python 3.13 |
Arquitetura | x86_64 |
Função de execução | WriteSnykAuditToS3Role |
Depois que a função for criada, abra a guia Código, exclua o stub e insira o seguinte código (
snyk_group_audit_to_s3.py
):# snyk_group_audit_to_s3.py #!/usr/bin/env python3 # Lambda: Pull Snyk Group-level Audit Logs (REST) to S3 (no transform) import os import json import time import urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError import boto3 BASE = os.environ.get("SNYK_API_BASE", "https://api.snyk.io").rstrip("/") GROUP_ID = os.environ["SNYK_GROUP_ID"].strip() API_TOKEN = os.environ["SNYK_API_TOKEN"].strip() BUCKET = os.environ["S3_BUCKET"].strip() PREFIX = os.environ.get("S3_PREFIX", "snyk/audit/").strip() SIZE = int(os.environ.get("SIZE", "100")) # max 100 per docs MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) STATE_KEY = os.environ.get("STATE_KEY", "snyk/audit/state.json") API_VERSION = os.environ.get("SNYK_API_VERSION", "2021-06-04").strip() # required by REST API LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600")) # used only when no cursor # Optional filters EVENTS_CSV = os.environ.get("EVENTS", "").strip() # e.g. "group.create,org.user.invited" EXCLUDE_EVENTS_CSV = os.environ.get("EXCLUDE_EVENTS", "").strip() s3 = boto3.client("s3") HDRS = { # REST authentication requires "token" scheme and vnd.api+json Accept "Authorization": f"token {API_TOKEN}", "Accept": "application/vnd.api+json", } def _get_state() -> str | None: try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()).get("cursor") except Exception: return None def _put_state(cursor: str): s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps({"cursor": cursor}).encode("utf-8")) def _write(payload: dict) -> str: ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime()) key = f"{PREFIX.rstrip('/')}/{ts}-snyk-group-audit.json" s3.put_object( Bucket=BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def _parse_next_cursor_from_links(links: dict | None) -> str | None: if not links: return None nxt = links.get("next") if not nxt: return None try: q = urllib.parse.urlparse(nxt).query params = urllib.parse.parse_qs(q) cur = params.get("cursor") return cur[0] if cur else None except Exception: return None def _http_get(url: str) -> dict: req = Request(url, method="GET", headers=HDRS) try: with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # Back off on rate limit or transient server errors; single retry if e.code in (429, 500, 502, 503, 504): delay = int(e.headers.get("Retry-After", "1")) time.sleep(max(1, delay)) with urlopen(req, timeout=60) as r2: return json.loads(r2.read().decode("utf-8")) raise def _as_list(csv_str: str) -> list[str]: return [x.strip() for x in csv_str.split(",") if x.strip()] def fetch_page(cursor: str | None, first_run_from_iso: str | None): base_path = f"/rest/groups/{GROUP_ID}/audit_logs/search" params: dict[str, object] = { "version": API_VERSION, "size": SIZE, } if cursor: params["cursor"] = cursor elif first_run_from_iso: params["from"] = first_run_from_iso # RFC3339 events = _as_list(EVENTS_CSV) exclude_events = _as_list(EXCLUDE_EVENTS_CSV) if events and exclude_events: # API does not allow both at the same time; prefer explicit include exclude_events = [] if events: params["events"] = events # will be encoded as repeated params if exclude_events: params["exclude_events"] = exclude_events url = f"{BASE}{base_path}?{urllib.parse.urlencode(params, doseq=True)}" return _http_get(url) def lambda_handler(event=None, context=None): cursor = _get_state() pages = 0 total = 0 last_cursor = cursor # Only for the very first run (no saved cursor), constrain the time window first_run_from_iso = None if not cursor and LOOKBACK_SECONDS > 0: first_run_from_iso = time.strftime( "%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time() - LOOKBACK_SECONDS) ) while pages < MAX_PAGES: payload = fetch_page(cursor, first_run_from_iso) _write(payload) # items are nested under data.items per Snyk docs data_obj = payload.get("data") or {} items = data_obj.get("items") or [] if isinstance(items, list): total += len(items) cursor = _parse_next_cursor_from_links(payload.get("links")) pages += 1 if not cursor: break # after first page, disable from-filter first_run_from_iso = None if cursor and cursor != last_cursor: _put_state(cursor) return {"ok": True, "pages": pages, "events": total, "next_cursor": cursor} if __name__ == "__main__": print(lambda_handler())
Adicionar variáveis de ambiente
- Acesse Configuração > Variáveis de ambiente.
- Clique em Editar > Adicionar nova variável de ambiente.
Insira as seguintes variáveis de ambiente, substituindo pelos seus valores:
Chave Exemplo S3_BUCKET
snyk-audit
S3_PREFIX
snyk/audit/
STATE_KEY
snyk/audit/state.json
SNYK_GROUP_ID
<your_group_id>
SNYK_API_TOKEN
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
SNYK_API_BASE
https://api.snyk.io
(opcional)SNYK_API_VERSION
2021-06-04
SIZE
100
MAX_PAGES
20
LOOKBACK_SECONDS
3600
EVENTS
(opcional) group.create,org.user.add
EXCLUDE_EVENTS
(opcional) api.access
Depois que a função for criada, permaneça na página dela ou abra Lambda > Functions > sua-função.
Selecione a guia Configuração.
No painel Configuração geral, clique em Editar.
Mude Tempo limite para 5 minutos (300 segundos) e clique em Salvar.
Criar uma programação do EventBridge
- Acesse Amazon EventBridge > Scheduler > Criar programação.
- Informe os seguintes detalhes de configuração:
- Programação recorrente: Taxa (
1 hour
). - Destino: sua função Lambda.
- Nome:
snyk-group-audit-1h
.
- Programação recorrente: Taxa (
- Clique em Criar programação.
Opcional: criar um usuário e chaves do IAM somente leitura para o Google SecOps
- No console da AWS, acesse IAM > Usuários > Adicionar usuários.
- Clique em Add users.
- Informe os seguintes detalhes de configuração:
- Usuário:
secops-reader
. - Tipo de acesso: Chave de acesso — Acesso programático.
- Usuário:
- Clique em Criar usuário.
- Anexe a política de leitura mínima (personalizada): Usuários > secops-reader > Permissões > Adicionar permissões > Anexar políticas diretamente > Criar política.
No editor JSON, insira a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::snyk-audit/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::snyk-audit" } ] }
Defina o nome como
secops-reader-policy
.Acesse Criar política > pesquise/selecione > Próxima > Adicionar permissões.
Acesse Credenciais de segurança > Chaves de acesso > Criar chave de acesso.
Faça o download do CSV (esses valores são inseridos no feed).
Configure um feed no Google SecOps para ingerir registros de auditoria no nível do grupo do Snyk
- Acesse Configurações do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Snyk Group Audit Logs
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Registros de auditoria no nível do grupo do Snyk como o Tipo de registro.
- Clique em Próxima.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://snyk-audit/snyk/audit/
- Opções de exclusão de fontes: selecione a opção de exclusão de acordo com sua preferência.
- Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
- ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3.
- Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3.
- Namespace do recurso:
snyk.group_audit
- Rótulos de ingestão: adicione se quiser.
- URI do S3:
- Clique em Próxima.
- Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Tabela de mapeamento do UDM
Campo de registro | Mapeamento do UDM | Lógica |
---|---|---|
content.url | principal.url | Mapeado diretamente do campo content.url no registro bruto. |
created | metadata.event_timestamp | Analisado do campo created no registro bruto usando o formato ISO8601. |
evento | metadata.product_event_type | Mapeado diretamente do campo event no registro bruto. |
groupId | principal.user.group_identifiers | Mapeado diretamente do campo groupId no registro bruto. |
orgId | principal.user.attribute.labels.key | Defina como "orgId". |
orgId | principal.user.attribute.labels.value | Mapeado diretamente do campo orgId no registro bruto. |
userId | principal.user.userid | Mapeado diretamente do campo userId no registro bruto. |
N/A | metadata.event_type | Codificado como "USER_UNCATEGORIZED" no código do analisador. |
N/A | metadata.log_type | Fixado no código do analisador como "SNYK_SDLC". |
N/A | metadata.product_name | Fixado no código do analisador como "SNYK SDLC". |
N/A | metadata.vendor_name | Fixado no código do analisador como "SNYK_SDLC". |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.