Recolha ficheiros IOC personalizados CSV
Este documento explica como carregar ficheiros IOC personalizados CSV para o Google Security Operations através do Amazon S3. Em seguida, mapeia estes campos para o UDM, processando vários tipos de dados, como IPs, domínios e hashes, e enriquecendo o resultado com detalhes de ameaças, informações de entidades e níveis de gravidade.
Antes de começar
- Instância do Google SecOps
- Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)
- Acesso a um ou mais URLs de feeds de IOCs CSV (HTTPS) ou a um ponto final interno que publique CSV
Configure o contentor do AWS S3 e o IAM para o Google SecOps
- Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor
- Guarde o nome e a região do contentor para referência futura (por exemplo,
csv-ioc
). - Crie um utilizador seguindo este guia do utilizador: Criar um utilizador do IAM.
- Selecione o utilizador criado.
- Selecione o separador Credenciais de segurança.
- Clique em Criar chave de acesso na secção Chaves de acesso.
- Selecione Serviço de terceiros como o Exemplo de utilização.
- Clicar em Seguinte.
- Opcional: adicione uma etiqueta de descrição.
- Clique em Criar chave de acesso.
- Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para utilização posterior.
- Clique em Concluído.
- Selecione o separador Autorizações.
- Clique em Adicionar autorizações na secção Políticas de autorizações.
- Selecione Adicionar autorizações.
- Selecione Anexar políticas diretamente
- Pesquise e selecione a política AmazonS3FullAccess.
- Clicar em Seguinte.
- Clique em Adicionar autorizações.
Configure a política e a função de IAM para carregamentos do S3
- Aceda a AWS console > IAM > Policies > Create policy > separador JSON.
Introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutCsvIocObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::csv-ioc/*" } ] }
- Substitua
csv-ioc
se tiver introduzido um nome de contentor diferente.
- Substitua
Clique em Seguinte > Criar política.
Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.
Anexe a política criada recentemente.
Dê o nome
WriteCsvIocToS3Role
à função e clique em Criar função.
Crie a função Lambda
- Na consola da AWS, aceda a Lambda > Functions > Create function.
- Clique em Criar do zero.
Faculte os seguintes detalhes de configuração:
Definição Valor Nome csv_custom_ioc_to_s3
Runtime Python 3.13 Arquitetura x86_64 Função de execução WriteCsvIocToS3Role
Depois de criar a função, abra o separador Código, elimine o fragmento e introduza o seguinte código (
csv_custom_ioc_to_s3.py
):#!/usr/bin/env python3 # Lambda: Pull CSV IOC feeds over HTTPS and write raw CSV to S3 (no transform) # - Multiple URLs (comma-separated) # - Optional auth header # - Retries for 429/5xx # - Unique filenames per page # - Sets ContentType=text/csv import os, time, json from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "csv-ioc/").strip("/") IOC_URLS = [u.strip() for u in os.environ.get("IOC_URLS", "").split(",") if u.strip()] AUTH_HEADER = os.environ.get("AUTH_HEADER", "") # e.g., "Authorization: Bearer <token>" OR just "Bearer <token>" TIMEOUT = int(os.environ.get("TIMEOUT", "60")) s3 = boto3.client("s3") def _build_request(url: str) -> Request: if not url.lower().startswith("https://"): raise ValueError("Only HTTPS URLs are allowed in IOC_URLS") req = Request(url, method="GET") # Auth header: either "Header-Name: value" or just "Bearer token" -> becomes Authorization if AUTH_HEADER: if ":" in AUTH_HEADER: k, v = AUTH_HEADER.split(":", 1) req.add_header(k.strip(), v.strip()) else: req.add_header("Authorization", AUTH_HEADER.strip()) req.add_header("Accept", "text/csv, */*") return req def _http_bytes(req: Request, timeout: int = TIMEOUT, max_retries: int = 5) -> bytes: attempt, backoff = 0, 1.0 while True: try: with urlopen(req, timeout=timeout) as r: return r.read() except HTTPError as e: if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise except URLError: if attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise def _safe_name(url: str) -> str: # Create a short, filesystem-safe token for the URL return url.replace("://", "_").replace("/", "_").replace("?", "_").replace("&", "_")[:100] def _put_csv(blob: bytes, url: str, run_ts: int, idx: int) -> str: key = f"{PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts))}-url{idx:03d}-{_safe_name(url)}.csv" s3.put_object( Bucket=BUCKET, Key=key, Body=blob, ContentType="text/csv", ) return key def lambda_handler(event=None, context=None): assert IOC_URLS, "IOC_URLS must contain at least one HTTPS URL" run_ts = int(time.time()) written = [] for i, url in enumerate(IOC_URLS): req = _build_request(url) data = _http_bytes(req) key = _put_csv(data, url, run_ts, i) written.append({"url": url, "s3_key": key, "bytes": len(data)}) return {"ok": True, "written": written} if __name__ == "__main__": print(json.dumps(lambda_handler(), indent=2))
Aceda a Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.
Introduza as seguintes variáveis de ambiente, substituindo-as pelos seus valores:
Chave Exemplo S3_BUCKET
csv-ioc
S3_PREFIX
csv-ioc/
IOC_URLS
https://ioc.example.com/feed.csv,https://another.example.org/iocs.csv
AUTH_HEADER
Authorization: Bearer <token>
TIMEOUT
60
Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > your-function).
Selecione o separador Configuração.
No painel Configuração geral, clique em Editar.
Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.
Crie um horário do EventBridge
- Aceda a Amazon EventBridge > Scheduler > Create schedule.
- Indique os seguintes detalhes de configuração:
- Agenda recorrente: Taxa (
1 hour
). - Alvo: a sua função Lambda.
- Nome:
csv-custom-ioc-1h
.
- Agenda recorrente: Taxa (
- Clique em Criar programação.
Opcional: crie um utilizador e chaves da IAM só de leitura para o Google SecOps
- Na consola da AWS, aceda a IAM > Users e, de seguida, clique em Add users.
- Indique os seguintes detalhes de configuração:
- Utilizador: introduza um nome único (por exemplo,
secops-reader
) - Tipo de acesso: selecione Chave de acesso – Acesso programático
- Clique em Criar utilizador.
- Utilizador: introduza um nome único (por exemplo,
- Anexe a política de leitura mínima (personalizada): Utilizadores > selecione
secops-reader
> Autorizações > Adicionar autorizações > Anexar políticas diretamente > Criar política No editor JSON, introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
Defina o nome como
secops-reader-policy
.Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.
Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.
Transfira o CSV (estes valores são introduzidos no feed).
Configure um feed no Google SecOps para carregar ficheiros CSV de IOCs personalizados
- Aceda a Definições do SIEM > Feeds.
- Clique em Adicionar novo feed.
- No campo Nome do feed, introduza um nome para o feed (por exemplo,
CSV Custom IOC
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione IOC personalizado CSV como o Tipo de registo.
- Clicar em Seguinte.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://csv-ioc/csv-ioc/
- Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
- Idade máxima do ficheiro: predefinição de 180 dias.
- ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
- Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
- Espaço de nomes do recurso: o espaço de nomes do recurso.
- Etiquetas de carregamento: a etiqueta a aplicar aos eventos deste feed.
- URI do S3:
- Clicar em Seguinte.
- Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.
Tabela de mapeamento da UDM
Campo de registo | Mapeamento de UDM | Lógica |
---|---|---|
asn |
entity.metadata.threat.detection_fields.asn_label.value | Mapeado diretamente a partir do campo "asn". |
category |
entity.metadata.threat.category_details | Mapeado diretamente a partir do campo "category". |
classification |
entity.metadata.threat.category_details | Anexado a "classification - " e mapeado para o campo "entity.metadata.threat.category_details". |
column2 |
entity.entity.hostname | Mapeado para "entity.entity.hostname" se [category] corresponder a ".?ip" ou ".?proxy" e [not_ip] for verdadeiro. |
column2 |
entity.entity.ip | Unido em "entity.entity.ip" se [category] corresponder a ".?ip" ou ".?proxy" e [not_ip] for falso. |
confidence |
entity.metadata.threat.confidence_score | Convertido em número de vírgula flutuante e mapeado para o campo "entity.metadata.threat.confidence_score". |
country |
entity.entity.location.country_or_region | Mapeado diretamente a partir do campo "country". |
date_first |
entity.metadata.threat.first_discovered_time | Analisado como ISO8601 e mapeado para o campo "entity.metadata.threat.first_discovered_time". |
date_last |
entity.metadata.threat.last_updated_time | Analisado como ISO8601 e mapeado para o campo "entity.metadata.threat.last_updated_time". |
detail |
entity.metadata.threat.summary | Mapeado diretamente a partir do campo "detail". |
detail2 |
entity.metadata.threat.description | Mapeado diretamente a partir do campo "detail2". |
domain |
entity.entity.hostname | Mapeado diretamente a partir do campo "domain". |
email |
entity.entity.user.email_addresses | Unido no campo "entity.entity.user.email_addresses". |
id |
entity.metadata.product_entity_id | Anexado a "id - " e mapeado para o campo "entity.metadata.product_entity_id". |
import_session_id |
entity.metadata.threat.detection_fields.import_session_id_label.value | Mapeado diretamente a partir do campo "import_session_id". |
itype |
entity.metadata.threat.detection_fields.itype_label.value | Mapeado diretamente a partir do campo "itype". |
lat |
entity.entity.location.region_latitude | Convertido em flutuante e mapeado para o campo "entity.entity.location.region_latitude". |
lon |
entity.entity.location.region_longitude | Convertido em float e mapeado para o campo "entity.entity.location.region_longitude". |
maltype |
entity.metadata.threat.detection_fields.maltype_label.value | Mapeado diretamente a partir do campo "maltype". |
md5 |
entity.entity.file.md5 | Mapeado diretamente a partir do campo "md5". |
media |
entity.metadata.threat.detection_fields.media_label.value | Mapeado diretamente a partir do campo "media". |
media_type |
entity.metadata.threat.detection_fields.media_type_label.value | Mapeado diretamente a partir do campo "media_type". |
org |
entity.metadata.threat.detection_fields.org_label.value | Mapeado diretamente a partir do campo "org". |
resource_uri |
entity.entity.url | Mapeado para "entity.entity.url" se [itype] não corresponder a "(ip |
resource_uri |
entity.metadata.threat.url_back_to_product | Mapeado para "entity.metadata.threat.url_back_to_product" se [itype] corresponder a "(ip |
score |
entity.metadata.threat.confidence_details | Mapeado diretamente a partir do campo "score". |
severity |
entity.metadata.threat.severity | Convertido para letras maiúsculas e mapeado para o campo "entity.metadata.threat.severity" se corresponder a "LOW", "MEDIUM", "HIGH" ou "CRITICAL". |
source |
entity.metadata.threat.detection_fields.source_label.value | Mapeado diretamente a partir do campo "source". |
source_feed_id |
entity.metadata.threat.detection_fields.source_feed_id_label.value | Mapeado diretamente a partir do campo "source_feed_id". |
srcip |
entity.entity.ip | Unido em "entity.entity.ip" se [srcip] não estiver vazio e não for igual a [value]. |
state |
entity.metadata.threat.detection_fields.state_label.value | Mapeado diretamente a partir do campo "state". |
trusted_circle_ids |
entity.metadata.threat.detection_fields.trusted_circle_ids_label.value | Mapeado diretamente a partir do campo "trusted_circle_ids". |
update_id |
entity.metadata.threat.detection_fields.update_id_label.value | Mapeado diretamente a partir do campo "update_id". |
value |
entity.entity.file.full_path | Mapeado para "entity.entity.file.full_path" se [category] corresponder a ".*?file". |
value |
entity.entity.file.md5 | Mapeado para "entity.entity.file.md5" se [category] corresponder a ".*?md5" e [value] for uma string hexadecimal de 32 carateres. |
value |
entity.entity.file.sha1 | Mapeado para "entity.entity.file.sha1" se ([category] corresponder a ".?md5" e [value] for uma string hexadecimal de 40 carateres) ou ([category] corresponder a ".?sha1" e [value] for uma string hexadecimal de 40 carateres). |
value |
entity.entity.file.sha256 | Mapeado para "entity.entity.file.sha256" se ([category] corresponder a ".?md5" e [value] for uma string hexadecimal e [file_type] não for "md5") ou ([category] corresponder a ".?sha256" e [value] for uma string hexadecimal). |
value |
entity.entity.hostname | Mapeado para "entity.entity.hostname" se ([category] corresponder a ".?domain") ou ([category] corresponder a ".?ip" ou ".*?proxy" e [not_ip] for verdadeiro). |
value |
entity.entity.url | Mapeado para "entity.entity.url" se ([category] corresponder a ".*?url") ou ([category] corresponder a "url" e [resource_uri] não estiver vazio). |
N/A | entity.metadata.collected_timestamp | Preenchida com a data/hora do evento. |
N/A | entity.metadata.interval.end_time | Definido para um valor constante de 253402300799 segundos. |
N/A | entity.metadata.interval.start_time | Preenchida com a data/hora do evento. |
N/A | entity.metadata.vendor_name | Definido como um valor constante de "IOC personalizado". |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.