Recolha registos do Digital Shadows SearchLight
Este documento explica como carregar registos do Digital Shadows SearchLight para o Google Security Operations através do Amazon S3. O analisador extrai dados de eventos de segurança dos registos JSON. Inicializa os campos do modelo de dados unificado (UDM), analisa a carga útil JSON, mapeia os campos relevantes para o esquema do UDM, extrai entidades como o email e o nome do anfitrião através de padrões grok e cria os objetos security_result
e metadata
no evento do UDM.
Antes de começar
Certifique-se de que cumpre os seguintes pré-requisitos:
- Uma instância do Google SecOps.
- Acesso privilegiado ao inquilino do Digital Shadows SearchLight.
- Acesso privilegiado à AWS (S3, Identity and Access Management [IAM], Lambda e EventBridge).
Recolha os pré-requisitos do Digital Shadows SearchLight (IDs, chaves da API, IDs da organização, tokens)
- Inicie sessão no portal do Digital Shadows SearchLight.
- Aceda a Definições > Credenciais da API.
- Crie um novo cliente da API ou um par de chaves.
- Copie e guarde numa localização segura os seguintes detalhes:
- Chave da API
- API Secret
- ID da conta
- URL base da API:
https://api.searchlight.app/v1
ouhttps://portal-digitalshadows.com/api/v1
Configure o contentor do AWS S3 e o IAM para o Google SecOps
- Crie um contentor do Amazon S3 seguindo este guia do utilizador: Criar um contentor
- Guarde o nome e a região do contentor para referência futura (por exemplo,
digital-shadows-logs
). - Crie um utilizador seguindo este guia do utilizador: criar um utilizador do IAM.
- Selecione o utilizador criado.
- Selecione o separador Credenciais de segurança.
- Clique em Criar chave de acesso na secção Chaves de acesso.
- Selecione Serviço de terceiros como Exemplo de utilização.
- Clicar em Seguinte.
- Opcional: adicione uma etiqueta de descrição.
- Clique em Criar chave de acesso.
- Clique em Transferir ficheiro .CSV para guardar a chave de acesso e a chave de acesso secreta para referência futura.
- Clique em Concluído.
- Selecione o separador Autorizações.
- Clique em Adicionar autorizações na secção Políticas de autorizações.
- Selecione Adicionar autorizações.
- Selecione Anexar políticas diretamente.
- Pesquise a política AmazonS3FullAccess.
- Selecione a política.
- Clicar em Seguinte.
- Clique em Adicionar autorizações.
Configure a política e a função de IAM para carregamentos do S3
- Na consola da AWS, aceda a IAM > Políticas.
- Clique em Criar política > separador JSON.
- Copie e cole a seguinte política.
JSON da política (substitua
digital-shadows-logs
se tiver introduzido um nome de contentor diferente):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::digital-shadows-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows-searchlight/state.json" } ] }
Clique em Seguinte > Criar política.
Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.
Anexe a política criada recentemente.
Dê o nome
digital-shadows-lambda-role
à função e clique em Criar função.
Crie a função Lambda
- Na consola da AWS, aceda a Lambda > Functions > Create function.
- Clique em Criar do zero.
Faculte os seguintes detalhes de configuração:
Definição Valor Nome digital-shadows-collector
Runtime Python 3.13 Arquitetura x86_64 Função de execução digital-shadows-lambda-role
Depois de criar a função, abra o separador Código, elimine o fragmento e cole o seguinte código (
digital-shadows-collector.py
).import json import os import base64 import logging import time from datetime import datetime, timedelta, timezone from urllib.parse import urlencode import boto3 import urllib3 logger = logging.getLogger() logger.setLevel(logging.INFO) HTTP = urllib3.PoolManager(retries=False) def _basic_auth_header(key: str, secret: str) -> str: token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8") return f"Basic {token}" def _load_state(s3, bucket, key, default_days=30) -> str: """Return ISO8601 checkpoint (UTC).""" try: obj = s3.get_object(Bucket=bucket, Key=key) state = json.loads(obj["Body"].read().decode("utf-8")) ts = state.get("last_timestamp") if ts: return ts except s3.exceptions.NoSuchKey: pass except Exception as e: logger.warning(f"State read error: {e}") return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat() def _save_state(s3, bucket, key, ts: str) -> None: s3.put_object( Bucket=bucket, Key=key, Body=json.dumps({"last_timestamp": ts}).encode("utf-8"), ContentType="application/json", ) def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict: qs = f"?{urlencode(params)}" if params else "" for attempt in range(max_retries): r = HTTP.request("GET", f"{url}{qs}", headers=headers) if r.status == 200: return json.loads(r.data.decode("utf-8")) if r.status in (429, 500, 502, 503, 504): wait = backoff_s * (2 ** attempt) logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s") time.sleep(wait) continue raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}") raise RuntimeError("Exceeded retry budget for DS API") def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param): items = [] for page in range(max_pages): params = { "limit": page_size, "offset": page * page_size, time_param: since_ts, } if account_id: params["account-id"] = account_id data = _get_json(f"{api_base}/{path}", headers, params) batch = data.get("items") or data.get("data") or [] if not batch: break items.extend(batch) if len(batch) < page_size: break return items def lambda_handler(event, context): # Required s3_bucket = os.environ["S3_BUCKET"] api_key = os.environ["DS_API_KEY"] api_secret = os.environ["DS_API_SECRET"] # Optional / defaults s3_prefix = os.environ.get("S3_PREFIX", "digital-shadows-searchlight/") state_key = os.environ.get("STATE_KEY", "digital-shadows-searchlight/state.json") api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1") account_id = os.environ.get("DS_ACCOUNT_ID", "") page_size = int(os.environ.get("PAGE_SIZE", "100")) max_pages = int(os.environ.get("MAX_PAGES", "10")) s3 = boto3.client("s3") last_ts = _load_state(s3, s3_bucket, state_key) logger.info(f"Checkpoint: {last_ts}") headers = { "Authorization": _basic_auth_header(api_key, api_secret), "Accept": "application/json", "User-Agent": "Chronicle-DigitalShadows-S3/1.0", } records = [] # Incidents (time filter often 'published-after' or 'updated-since' depending on tenancy) incidents = _collect(api_base, headers, "incidents", last_ts, account_id, page_size, max_pages, time_param="published-after") for incident in incidents: incident['_source_type'] = 'incident' records.extend(incidents) # Intelligence incidents (alerts) intel_incidents = _collect(api_base, headers, "intel-incidents", last_ts, account_id, page_size, max_pages, time_param="published-after") for intel in intel_incidents: intel['_source_type'] = 'intelligence_incident' records.extend(intel_incidents) # Indicators (IOCs) indicators = _collect(api_base, headers, "indicators", last_ts, account_id, page_size, max_pages, time_param="lastUpdated-after") for indicator in indicators: indicator['_source_type'] = 'ioc' records.extend(indicators) if records: # Choose newest timestamp seen in this batch newest = max( (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts) for r in records ) key = f"{s3_prefix}digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json" body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records).encode("utf-8") s3.put_object( Bucket=s3_bucket, Key=key, Body=body, ContentType="application/x-ndjson", ) _save_state(s3, s3_bucket, state_key, newest) msg = f"Wrote {len(records)} records to s3://{s3_bucket}/{key}" else: msg = "No new records" logger.info(msg) return {"statusCode": 200, "body": msg}
Aceda a Configuração > Variáveis de ambiente.
Clique em Editar > Adicionar nova variável de ambiente.
Introduza as variáveis de ambiente fornecidas na tabela seguinte, substituindo os valores de exemplo pelos seus valores.
Variáveis de ambiente
Chave Valor de exemplo S3_BUCKET
digital-shadows-logs
S3_PREFIX
digital-shadows-searchlight/
STATE_KEY
digital-shadows-searchlight/state.json
DS_API_KEY
<your-6-character-api-key>
DS_API_SECRET
<your-32-character-api-secret>
API_BASE
https://api.searchlight.app/v1
(ouhttps://portal-digitalshadows.com/api/v1
)DS_ACCOUNT_ID
<your-account-id>
(obrigatório para a maioria dos inquilinos)PAGE_SIZE
100
MAX_PAGES
10
Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > a sua função).
Selecione o separador Configuração.
No painel Configuração geral, clique em Editar.
Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.
Crie um horário do EventBridge
- Aceda a Amazon EventBridge > Scheduler > Create schedule.
- Indique os seguintes detalhes de configuração:
- Agenda recorrente: Taxa (
1 hour
). - Alvo: a sua função Lambda
digital-shadows-collector
. - Nome:
digital-shadows-collector-1h
.
- Agenda recorrente: Taxa (
- Clique em Criar programação.
(Opcional) Crie um utilizador e chaves da IAM só de leitura para o Google SecOps
- Aceda a AWS Console > IAM > Users.
- Clique em Adicionar utilizadores.
- Indique os seguintes detalhes de configuração:
- Utilizador: introduza
secops-reader
. - Tipo de acesso: selecione Chave de acesso – Acesso programático.
- Utilizador: introduza
- Clique em Criar utilizador.
- Anexe a política de leitura mínima (personalizada): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::digital-shadows-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::digital-shadows-logs" } ] }
Nome =
secops-reader-policy
.Clique em Criar política > procure/selecione > Seguinte > Adicionar autorizações.
Crie uma chave de acesso para
secops-reader
: Credenciais de segurança > Chaves de acesso.Clique em Criar chave de acesso.
Transfira o
.CSV
. (Vai colar estes valores no feed).
Configure um feed no Google SecOps para carregar registos do Digital Shadows SearchLight
- Aceda a Definições do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, introduza um nome para o feed (por exemplo,
Digital Shadows SearchLight logs
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Digital Shadows SearchLight como o Tipo de registo.
- Clicar em Seguinte.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://digital-shadows-logs/digital-shadows-searchlight/
- Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
- Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. A predefinição é 180 dias.
- ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
- Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
- Espaço de nomes do recurso: o espaço de nomes do recurso.
- Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
- URI do S3:
- Clicar em Seguinte.
- Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.