Raccogliere file CSV personalizzati di indicatori di compromissione
Questo documento spiega come importare file CSV di indicatori di compromissione personalizzati in Google Security Operations utilizzando Amazon S3. Quindi, mappa questi campi all'UDM, gestendo vari tipi di dati come IP, domini e hash e arricchendo l'output con dettagli sulle minacce, informazioni sulle entità e livelli di gravità.
Prima di iniziare
- Istanza Google SecOps
- Accesso privilegiato ad AWS (S3, IAM, Lambda, EventBridge)
- Accesso a uno o più URL feed IOC CSV (HTTPS) o a un endpoint interno che pubblica CSV
Configura il bucket AWS S3 e IAM per Google SecOps
- Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
- Salva il nome e la regione del bucket per riferimento futuro (ad esempio,
csv-ioc
). - Crea un utente seguendo questa guida: Creazione di un utente IAM.
- Seleziona l'utente creato.
- Seleziona la scheda Credenziali di sicurezza.
- Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
- Seleziona Servizio di terze parti come Caso d'uso.
- Fai clic su Avanti.
- (Facoltativo) Aggiungi un tag di descrizione.
- Fai clic su Crea chiave di accesso.
- Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per un utilizzo successivo.
- Fai clic su Fine.
- Seleziona la scheda Autorizzazioni.
- Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
- Seleziona Aggiungi autorizzazioni.
- Seleziona Allega direttamente i criteri.
- Cerca e seleziona il criterio AmazonS3FullAccess.
- Fai clic su Avanti.
- Fai clic su Aggiungi autorizzazioni.
Configura il ruolo e il criterio IAM per i caricamenti S3
- Vai alla console AWS > IAM > Policy > Crea policy > scheda JSON.
Inserisci la seguente policy:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutCsvIocObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::csv-ioc/*" } ] }
- Sostituisci
csv-ioc
se hai inserito un nome bucket diverso.
- Sostituisci
Fai clic su Avanti > Crea policy.
Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.
Allega il criterio appena creato.
Assegna al ruolo il nome
WriteCsvIocToS3Role
e fai clic su Crea ruolo.
Crea la funzione Lambda
- Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
- Fai clic su Crea autore da zero.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome csv_custom_ioc_to_s3
Tempo di esecuzione Python 3.13 Architettura x86_64 Ruolo di esecuzione WriteCsvIocToS3Role
Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e inserisci il seguente codice (
csv_custom_ioc_to_s3.py
):#!/usr/bin/env python3 # Lambda: Pull CSV IOC feeds over HTTPS and write raw CSV to S3 (no transform) # - Multiple URLs (comma-separated) # - Optional auth header # - Retries for 429/5xx # - Unique filenames per page # - Sets ContentType=text/csv import os, time, json from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "csv-ioc/").strip("/") IOC_URLS = [u.strip() for u in os.environ.get("IOC_URLS", "").split(",") if u.strip()] AUTH_HEADER = os.environ.get("AUTH_HEADER", "") # e.g., "Authorization: Bearer <token>" OR just "Bearer <token>" TIMEOUT = int(os.environ.get("TIMEOUT", "60")) s3 = boto3.client("s3") def _build_request(url: str) -> Request: if not url.lower().startswith("https://"): raise ValueError("Only HTTPS URLs are allowed in IOC_URLS") req = Request(url, method="GET") # Auth header: either "Header-Name: value" or just "Bearer token" -> becomes Authorization if AUTH_HEADER: if ":" in AUTH_HEADER: k, v = AUTH_HEADER.split(":", 1) req.add_header(k.strip(), v.strip()) else: req.add_header("Authorization", AUTH_HEADER.strip()) req.add_header("Accept", "text/csv, */*") return req def _http_bytes(req: Request, timeout: int = TIMEOUT, max_retries: int = 5) -> bytes: attempt, backoff = 0, 1.0 while True: try: with urlopen(req, timeout=timeout) as r: return r.read() except HTTPError as e: if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise except URLError: if attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise def _safe_name(url: str) -> str: # Create a short, filesystem-safe token for the URL return url.replace("://", "_").replace("/", "_").replace("?", "_").replace("&", "_")[:100] def _put_csv(blob: bytes, url: str, run_ts: int, idx: int) -> str: key = f"{PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts))}-url{idx:03d}-{_safe_name(url)}.csv" s3.put_object( Bucket=BUCKET, Key=key, Body=blob, ContentType="text/csv", ) return key def lambda_handler(event=None, context=None): assert IOC_URLS, "IOC_URLS must contain at least one HTTPS URL" run_ts = int(time.time()) written = [] for i, url in enumerate(IOC_URLS): req = _build_request(url) data = _http_bytes(req) key = _put_csv(data, url, run_ts, i) written.append({"url": url, "s3_key": key, "bytes": len(data)}) return {"ok": True, "written": written} if __name__ == "__main__": print(json.dumps(lambda_handler(), indent=2))
Vai a Configurazione > Variabili di ambiente > Modifica > Aggiungi nuova variabile di ambiente.
Inserisci le seguenti variabili di ambiente, sostituendole con i tuoi valori:
Chiave Esempio S3_BUCKET
csv-ioc
S3_PREFIX
csv-ioc/
IOC_URLS
https://ioc.example.com/feed.csv,https://another.example.org/iocs.csv
AUTH_HEADER
Authorization: Bearer <token>
TIMEOUT
60
Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > la tua funzione).
Seleziona la scheda Configurazione.
Nel riquadro Configurazione generale, fai clic su Modifica.
Modifica Timeout impostandolo su 5 minuti (300 secondi) e fai clic su Salva.
Creare una pianificazione EventBridge
- Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
- Fornisci i seguenti dettagli di configurazione:
- Programma ricorrente: Tariffa (
1 hour
). - Destinazione: la tua funzione Lambda.
- Nome:
csv-custom-ioc-1h
- Programma ricorrente: Tariffa (
- Fai clic su Crea pianificazione.
(Facoltativo) Crea chiavi e utente IAM di sola lettura per Google SecOps
- Nella console AWS, vai a IAM > Utenti, poi fai clic su Aggiungi utenti.
- Fornisci i seguenti dettagli di configurazione:
- Utente: inserisci un nome univoco (ad esempio
secops-reader
) - Tipo di accesso: seleziona Chiave di accesso - Accesso programmatico
- Fai clic su Crea utente.
- Utente: inserisci un nome univoco (ad esempio
- Allega criterio per la lettura minimi (personalizzati): Utenti > seleziona
secops-reader
> Autorizzazioni > Aggiungi autorizzazioni > Allega criteri direttamente > Crea criteri Nell'editor JSON, inserisci la seguente policy:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
Imposta il nome su
secops-reader-policy
.Vai a Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.
Vai a Credenziali di sicurezza > Chiavi di accesso > Crea chiave di accesso.
Scarica il file CSV (questi valori vengono inseriti nel feed).
Configura un feed in Google SecOps per importare file CSV di indicatori di compromissione personalizzati
- Vai a Impostazioni SIEM > Feed.
- Fai clic su Aggiungi nuovo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
CSV Custom IOC
). - Seleziona Amazon S3 V2 come Tipo di origine.
- Seleziona CSV Custom IOC come Tipo di log.
- Fai clic su Avanti.
- Specifica i valori per i seguenti parametri di input:
- URI S3:
s3://csv-ioc/csv-ioc/
- Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
- Durata massima del file: 180 giorni per impostazione predefinita.
- ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
- Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
- Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
- Etichette di importazione: l'etichetta da applicare agli eventi di questo feed.
- URI S3:
- Fai clic su Avanti.
- Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.
Tabella di mappatura UDM
Campo log | Mappatura UDM | Logic |
---|---|---|
asn |
entity.metadata.threat.detection_fields.asn_label.value | Mappato direttamente dal campo "asn". |
category |
entity.metadata.threat.category_details | Mappata direttamente dal campo "categoria". |
classification |
entity.metadata.threat.category_details | Aggiunto a "classificazione - " e mappato al campo "entity.metadata.threat.category_details". |
column2 |
entity.entity.hostname | Mappato a "entity.entity.hostname" se [category] corrisponde a ".?ip" o ".?proxy" e [not_ip] è true. |
column2 |
entity.entity.ip | Unito a "entity.entity.ip" se [category] corrisponde a ".?ip" o ".?proxy" e [not_ip] è false. |
confidence |
entity.metadata.threat.confidence_score | Convertito in float e mappato al campo "entity.metadata.threat.confidence_score". |
country |
entity.entity.location.country_or_region | Mappato direttamente dal campo "Paese". |
date_first |
entity.metadata.threat.first_discovered_time | Analizzato come ISO8601 e mappato al campo "entity.metadata.threat.first_discovered_time". |
date_last |
entity.metadata.threat.last_updated_time | Analizzato come ISO8601 e mappato al campo "entity.metadata.threat.last_updated_time". |
detail |
entity.metadata.threat.summary | Mappato direttamente dal campo "Dettagli". |
detail2 |
entity.metadata.threat.description | Mappato direttamente dal campo "detail2". |
domain |
entity.entity.hostname | Mappato direttamente dal campo "dominio". |
email |
entity.entity.user.email_addresses | Unito al campo "entity.entity.user.email_addresses". |
id |
entity.metadata.product_entity_id | Aggiunto a "id - " e mappato al campo "entity.metadata.product_entity_id". |
import_session_id |
entity.metadata.threat.detection_fields.import_session_id_label.value | Mappato direttamente dal campo "import_session_id". |
itype |
entity.metadata.threat.detection_fields.itype_label.value | Mappato direttamente dal campo "itype". |
lat |
entity.entity.location.region_latitude | Convertito in float e mappato al campo "entity.entity.location.region_latitude". |
lon |
entity.entity.location.region_longitude | Convertito in float e mappato al campo "entity.entity.location.region_longitude". |
maltype |
entity.metadata.threat.detection_fields.maltype_label.value | Mappato direttamente dal campo "maltype". |
md5 |
entity.entity.file.md5 | Mappato direttamente dal campo "md5". |
media |
entity.metadata.threat.detection_fields.media_label.value | Mappato direttamente dal campo "media". |
media_type |
entity.metadata.threat.detection_fields.media_type_label.value | Mappato direttamente dal campo "media_type". |
org |
entity.metadata.threat.detection_fields.org_label.value | Mappato direttamente dal campo "org". |
resource_uri |
entity.entity.url | Mappato a "entity.entity.url" se [itype] non corrisponde a "(ip |
resource_uri |
entity.metadata.threat.url_back_to_product | Mappato a "entity.metadata.threat.url_back_to_product" se [itype] corrisponde a "(ip |
score |
entity.metadata.threat.confidence_details | Mappato direttamente dal campo "punteggio". |
severity |
entity.metadata.threat.severity | Convertito in maiuscolo e mappato al campo "entity.metadata.threat.severity" se corrisponde a "LOW", "MEDIUM", "HIGH" o "CRITICAL". |
source |
entity.metadata.threat.detection_fields.source_label.value | Mappato direttamente dal campo "source". |
source_feed_id |
entity.metadata.threat.detection_fields.source_feed_id_label.value | Mappato direttamente dal campo "source_feed_id". |
srcip |
entity.entity.ip | Unito a "entity.entity.ip" se [srcip] non è vuoto e non è uguale a [value]. |
state |
entity.metadata.threat.detection_fields.state_label.value | Mappato direttamente dal campo "state". |
trusted_circle_ids |
entity.metadata.threat.detection_fields.trusted_circle_ids_label.value | Mappato direttamente dal campo "trusted_circle_ids". |
update_id |
entity.metadata.threat.detection_fields.update_id_label.value | Mappato direttamente dal campo "update_id". |
value |
entity.entity.file.full_path | Mappato a "entity.entity.file.full_path" se [category] corrisponde a ".*?file". |
value |
entity.entity.file.md5 | Mappato a "entity.entity.file.md5" se [category] corrisponde a ".*?md5" e [value] è una stringa esadecimale di 32 caratteri. |
value |
entity.entity.file.sha1 | Mappato a "entity.entity.file.sha1" se ([category] corrisponde a ".?md5" e [value] è una stringa esadecimale di 40 caratteri) o ([category] corrisponde a ".?sha1" e [value] è una stringa esadecimale di 40 caratteri). |
value |
entity.entity.file.sha256 | Mappato a "entity.entity.file.sha256" se ([category] corrisponde a ".?md5" e [value] è una stringa esadecimale e [file_type] non è "md5") o ([category] corrisponde a ".?sha256" e [value] è una stringa esadecimale). |
value |
entity.entity.hostname | Mappato a "entity.entity.hostname" se ([category] corrisponde a ".?domain") o ([category] corrisponde a ".?ip" o ".*?proxy" e [not_ip] è true). |
value |
entity.entity.url | Mappato a "entity.entity.url" se ([category] corrisponde a ".*?url") o ([category] corrisponde a "url" e [resource_uri] non è vuoto). |
N/D | entity.metadata.collected_timestamp | Completato con il timestamp dell'evento. |
N/D | entity.metadata.interval.end_time | Imposta un valore costante di 253402300799 secondi. |
N/D | entity.metadata.interval.start_time | Completato con il timestamp dell'evento. |
N/D | entity.metadata.vendor_name | Imposta un valore costante di "Custom IOC". |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.