Recolha registos de auditoria ao nível do grupo do Snyk
Este documento explica como carregar registos de auditoria ao nível do grupo do Snyk para o Google Security Operations através do Amazon S3. O analisador primeiro limpa os campos desnecessários dos registos não processados. Em seguida, extrai informações relevantes, como detalhes do utilizador, tipo de evento e datas/horas, transformando-as e mapeando-as para o esquema UDM do Google SecOps para uma representação padronizada do registo de segurança.
Antes de começar
Certifique-se de que tem os seguintes pré-requisitos:
- Instância do Google SecOps
- Acesso privilegiado ao Snyk (administrador do grupo) e um token da API com acesso ao grupo
- Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)
Recolha os pré-requisitos dos registos de auditoria ao nível do grupo do Snyk (IDs, chaves da API, IDs da organização e tokens)
- No Snyk, clique no seu avatar > Definições da conta > Token da API.
- Clique em Revogar e voltar a gerar (ou Gerar) e copie o token.
- Guarde este token como a variável de ambiente
SNYK_API_TOKEN
.
- No Snyk, mude para o seu grupo (comutador na parte superior esquerda).
- Aceda a Definições do grupo. Copie o
<GROUP_ID>
do URL:https://app.snyk.io/group/<GROUP_ID>/settings
. - Em alternativa, use a API REST:
GET https://api.snyk.io/rest/groups?version=2021-06-04
e escolhaid
.
- Aceda a Definições do grupo. Copie o
- Certifique-se de que o utilizador do token tem a autorização Ver registos de auditoria (group.audit.read).
Configure o contentor do AWS S3 e o IAM para o Google SecOps
- Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor
- Guarde o nome e a região do contentor para referência futura (por exemplo,
snyk-audit
). - Crie um utilizador seguindo este guia do utilizador: Criar um utilizador do IAM.
- Selecione o utilizador criado.
- Selecione o separador Credenciais de segurança.
- Clique em Criar chave de acesso na secção Chaves de acesso.
- Selecione Serviço de terceiros como o Exemplo de utilização.
- Clicar em Seguinte.
- Opcional: adicione uma etiqueta de descrição.
- Clique em Criar chave de acesso.
- Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para utilização posterior.
- Clique em Concluído.
- Selecione o separador Autorizações.
- Clique em Adicionar autorizações na secção Políticas de autorizações.
- Selecione Adicionar autorizações.
- Selecione Anexar políticas diretamente
- Pesquise e selecione a política AmazonS3FullAccess.
- Clicar em Seguinte.
- Clique em Adicionar autorizações.
Configure a política e a função de IAM para carregamentos do S3
- Na consola da AWS, aceda a IAM > Políticas > Criar política > separador JSON.
Introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutSnykAuditObjects", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject" ], "Resource": "arn:aws:s3:::snyk-audit/*" } ] }
Clique em Seguinte > Criar política.
Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.
Anexe a política criada recentemente.
Dê o nome
WriteSnykAuditToS3Role
à função e clique em Criar função.
Crie a função Lambda
- Na consola da AWS, aceda a Lambda > Functions > Create function.
- Clique em Criar do zero.
- Faculte os seguintes detalhes de configuração:
Definição | Valor |
---|---|
Nome | snyk_group_audit_to_s3 |
Runtime | Python 3.13 |
Arquitetura | x86_64 |
Função de execução | WriteSnykAuditToS3Role |
Depois de criar a função, abra o separador Código, elimine o fragmento e introduza o seguinte código (
snyk_group_audit_to_s3.py
):# snyk_group_audit_to_s3.py #!/usr/bin/env python3 # Lambda: Pull Snyk Group-level Audit Logs (REST) to S3 (no transform) import os import json import time import urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError import boto3 BASE = os.environ.get("SNYK_API_BASE", "https://api.snyk.io").rstrip("/") GROUP_ID = os.environ["SNYK_GROUP_ID"].strip() API_TOKEN = os.environ["SNYK_API_TOKEN"].strip() BUCKET = os.environ["S3_BUCKET"].strip() PREFIX = os.environ.get("S3_PREFIX", "snyk/audit/").strip() SIZE = int(os.environ.get("SIZE", "100")) # max 100 per docs MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) STATE_KEY = os.environ.get("STATE_KEY", "snyk/audit/state.json") API_VERSION = os.environ.get("SNYK_API_VERSION", "2021-06-04").strip() # required by REST API LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600")) # used only when no cursor # Optional filters EVENTS_CSV = os.environ.get("EVENTS", "").strip() # e.g. "group.create,org.user.invited" EXCLUDE_EVENTS_CSV = os.environ.get("EXCLUDE_EVENTS", "").strip() s3 = boto3.client("s3") HDRS = { # REST authentication requires "token" scheme and vnd.api+json Accept "Authorization": f"token {API_TOKEN}", "Accept": "application/vnd.api+json", } def _get_state() -> str | None: try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()).get("cursor") except Exception: return None def _put_state(cursor: str): s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps({"cursor": cursor}).encode("utf-8")) def _write(payload: dict) -> str: ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime()) key = f"{PREFIX.rstrip('/')}/{ts}-snyk-group-audit.json" s3.put_object( Bucket=BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def _parse_next_cursor_from_links(links: dict | None) -> str | None: if not links: return None nxt = links.get("next") if not nxt: return None try: q = urllib.parse.urlparse(nxt).query params = urllib.parse.parse_qs(q) cur = params.get("cursor") return cur[0] if cur else None except Exception: return None def _http_get(url: str) -> dict: req = Request(url, method="GET", headers=HDRS) try: with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # Back off on rate limit or transient server errors; single retry if e.code in (429, 500, 502, 503, 504): delay = int(e.headers.get("Retry-After", "1")) time.sleep(max(1, delay)) with urlopen(req, timeout=60) as r2: return json.loads(r2.read().decode("utf-8")) raise def _as_list(csv_str: str) -> list[str]: return [x.strip() for x in csv_str.split(",") if x.strip()] def fetch_page(cursor: str | None, first_run_from_iso: str | None): base_path = f"/rest/groups/{GROUP_ID}/audit_logs/search" params: dict[str, object] = { "version": API_VERSION, "size": SIZE, } if cursor: params["cursor"] = cursor elif first_run_from_iso: params["from"] = first_run_from_iso # RFC3339 events = _as_list(EVENTS_CSV) exclude_events = _as_list(EXCLUDE_EVENTS_CSV) if events and exclude_events: # API does not allow both at the same time; prefer explicit include exclude_events = [] if events: params["events"] = events # will be encoded as repeated params if exclude_events: params["exclude_events"] = exclude_events url = f"{BASE}{base_path}?{urllib.parse.urlencode(params, doseq=True)}" return _http_get(url) def lambda_handler(event=None, context=None): cursor = _get_state() pages = 0 total = 0 last_cursor = cursor # Only for the very first run (no saved cursor), constrain the time window first_run_from_iso = None if not cursor and LOOKBACK_SECONDS > 0: first_run_from_iso = time.strftime( "%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time() - LOOKBACK_SECONDS) ) while pages < MAX_PAGES: payload = fetch_page(cursor, first_run_from_iso) _write(payload) # items are nested under data.items per Snyk docs data_obj = payload.get("data") or {} items = data_obj.get("items") or [] if isinstance(items, list): total += len(items) cursor = _parse_next_cursor_from_links(payload.get("links")) pages += 1 if not cursor: break # after first page, disable from-filter first_run_from_iso = None if cursor and cursor != last_cursor: _put_state(cursor) return {"ok": True, "pages": pages, "events": total, "next_cursor": cursor} if __name__ == "__main__": print(lambda_handler())
Adicione variáveis de ambiente
- Aceda a Configuração > Variáveis de ambiente.
- Clique em Editar > Adicionar nova variável de ambiente.
Introduza as seguintes variáveis de ambiente, substituindo-as pelos seus valores:
Chave Exemplo S3_BUCKET
snyk-audit
S3_PREFIX
snyk/audit/
STATE_KEY
snyk/audit/state.json
SNYK_GROUP_ID
<your_group_id>
SNYK_API_TOKEN
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
SNYK_API_BASE
https://api.snyk.io
(opcional)SNYK_API_VERSION
2021-06-04
SIZE
100
MAX_PAGES
20
LOOKBACK_SECONDS
3600
EVENTS
(opcional) group.create,org.user.add
EXCLUDE_EVENTS
(opcional) api.access
Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > your-function).
Selecione o separador Configuração.
No painel Configuração geral, clique em Editar.
Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.
Crie um horário do EventBridge
- Aceda a Amazon EventBridge > Scheduler > Create schedule.
- Indique os seguintes detalhes de configuração:
- Agenda recorrente: Taxa (
1 hour
). - Alvo: a sua função Lambda.
- Nome:
snyk-group-audit-1h
.
- Agenda recorrente: Taxa (
- Clique em Criar programação.
Opcional: crie um utilizador e chaves da IAM só de leitura para o Google SecOps
- Na consola da AWS, aceda a IAM > Utilizadores > Adicionar utilizadores.
- Clique em Adicionar utilizadores.
- Indique os seguintes detalhes de configuração:
- Utilizador:
secops-reader
. - Tipo de acesso: chave de acesso – acesso programático.
- Utilizador:
- Clique em Criar utilizador.
- Anexe a política de leitura mínima (personalizada): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
No editor JSON, introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::snyk-audit/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::snyk-audit" } ] }
Defina o nome como
secops-reader-policy
.Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.
Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.
Transfira o CSV (estes valores são introduzidos no feed).
Configure um feed no Google SecOps para carregar registos de auditoria ao nível do grupo do Snyk
- Aceda a Definições do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, introduza um nome para o feed (por exemplo,
Snyk Group Audit Logs
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Registos de auditoria ao nível do grupo do Snyk como o Tipo de registo.
- Clicar em Seguinte.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://snyk-audit/snyk/audit/
- Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
- Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. A predefinição é 180 dias.
- ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
- Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
- Espaço de nomes de recursos:
snyk.group_audit
- Etiquetas de carregamento: adicione-as, se quiser.
- URI do S3:
- Clicar em Seguinte.
- Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.
Tabela de mapeamento da UDM
Campo de registo | Mapeamento de UDM | Lógica |
---|---|---|
content.url | principal.url | Mapeado diretamente a partir do campo content.url no registo não processado. |
criado | metadata.event_timestamp | Analisado a partir do campo created no registo não processado através do formato ISO8601. |
evento | metadata.product_event_type | Mapeado diretamente a partir do campo event no registo não processado. |
groupId | principal.user.group_identifiers | Mapeado diretamente a partir do campo groupId no registo não processado. |
orgId | principal.user.attribute.labels.key | Definido como "orgId". |
orgId | principal.user.attribute.labels.value | Mapeado diretamente a partir do campo orgId no registo não processado. |
userId | principal.user.userid | Mapeado diretamente a partir do campo userId no registo não processado. |
N/A | metadata.event_type | Codificado de forma rígida como "USER_UNCATEGORIZED" no código do analisador. |
N/A | metadata.log_type | Codificado como "SNYK_SDLC" no código do analisador. |
N/A | metadata.product_name | Codificado como "SNYK SDLC" no código do analisador. |
N/A | metadata.vendor_name | Codificado como "SNYK_SDLC" no código do analisador. |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.