Raccogliere file CSV personalizzati di indicatori di compromissione

Supportato in:

Questo documento spiega come importare file CSV di indicatori di compromissione personalizzati in Google Security Operations utilizzando Amazon S3. Quindi, mappa questi campi all'UDM, gestendo vari tipi di dati come IP, domini e hash e arricchendo l'output con dettagli sulle minacce, informazioni sulle entità e livelli di gravità.

Prima di iniziare

  • Istanza Google SecOps
  • Accesso privilegiato ad AWS (S3, IAM, Lambda, EventBridge)
  • Accesso a uno o più URL feed IOC CSV (HTTPS) o a un endpoint interno che pubblica CSV

Configura il bucket AWS S3 e IAM per Google SecOps

  1. Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
  2. Salva il nome e la regione del bucket per riferimento futuro (ad esempio, csv-ioc).
  3. Crea un utente seguendo questa guida: Creazione di un utente IAM.
  4. Seleziona l'utente creato.
  5. Seleziona la scheda Credenziali di sicurezza.
  6. Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
  7. Seleziona Servizio di terze parti come Caso d'uso.
  8. Fai clic su Avanti.
  9. (Facoltativo) Aggiungi un tag di descrizione.
  10. Fai clic su Crea chiave di accesso.
  11. Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per un utilizzo successivo.
  12. Fai clic su Fine.
  13. Seleziona la scheda Autorizzazioni.
  14. Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
  15. Seleziona Aggiungi autorizzazioni.
  16. Seleziona Allega direttamente i criteri.
  17. Cerca e seleziona il criterio AmazonS3FullAccess.
  18. Fai clic su Avanti.
  19. Fai clic su Aggiungi autorizzazioni.

Configura il ruolo e il criterio IAM per i caricamenti S3

  1. Vai alla console AWS > IAM > Policy > Crea policy > scheda JSON.
  2. Inserisci la seguente policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutCsvIocObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::csv-ioc/*"
        }
      ]
    }
    
    • Sostituisci csv-ioc se hai inserito un nome bucket diverso.
  3. Fai clic su Avanti > Crea policy.

  4. Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.

  5. Allega il criterio appena creato.

  6. Assegna al ruolo il nome WriteCsvIocToS3Role e fai clic su Crea ruolo.

Crea la funzione Lambda

  1. Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
  2. Fai clic su Crea autore da zero.
  3. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome csv_custom_ioc_to_s3
    Tempo di esecuzione Python 3.13
    Architettura x86_64
    Ruolo di esecuzione WriteCsvIocToS3Role
  4. Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e inserisci il seguente codice (csv_custom_ioc_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull CSV IOC feeds over HTTPS and write raw CSV to S3 (no transform)
    # - Multiple URLs (comma-separated)
    # - Optional auth header
    # - Retries for 429/5xx
    # - Unique filenames per page
    # - Sets ContentType=text/csv
    
    import os, time, json
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "csv-ioc/").strip("/")
    IOC_URLS = [u.strip() for u in os.environ.get("IOC_URLS", "").split(",") if u.strip()]
    AUTH_HEADER = os.environ.get("AUTH_HEADER", "")  # e.g., "Authorization: Bearer <token>" OR just "Bearer <token>"
    TIMEOUT = int(os.environ.get("TIMEOUT", "60"))
    
    s3 = boto3.client("s3")
    
    def _build_request(url: str) -> Request:
        if not url.lower().startswith("https://"):
            raise ValueError("Only HTTPS URLs are allowed in IOC_URLS")
        req = Request(url, method="GET")
        # Auth header: either "Header-Name: value" or just "Bearer token" -> becomes Authorization
        if AUTH_HEADER:
            if ":" in AUTH_HEADER:
                k, v = AUTH_HEADER.split(":", 1)
                req.add_header(k.strip(), v.strip())
            else:
                req.add_header("Authorization", AUTH_HEADER.strip())
        req.add_header("Accept", "text/csv, */*")
        return req
    
    def _http_bytes(req: Request, timeout: int = TIMEOUT, max_retries: int = 5) -> bytes:
        attempt, backoff = 0, 1.0
        while True:
            try:
                with urlopen(req, timeout=timeout) as r:
                    return r.read()
            except HTTPError as e:
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def _safe_name(url: str) -> str:
        # Create a short, filesystem-safe token for the URL
        return url.replace("://", "_").replace("/", "_").replace("?", "_").replace("&", "_")[:100]
    
    def _put_csv(blob: bytes, url: str, run_ts: int, idx: int) -> str:
        key = f"{PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts))}-url{idx:03d}-{_safe_name(url)}.csv"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=blob,
            ContentType="text/csv",
        )
        return key
    
    def lambda_handler(event=None, context=None):
        assert IOC_URLS, "IOC_URLS must contain at least one HTTPS URL"
        run_ts = int(time.time())
        written = []
        for i, url in enumerate(IOC_URLS):
            req = _build_request(url)
            data = _http_bytes(req)
            key = _put_csv(data, url, run_ts, i)
            written.append({"url": url, "s3_key": key, "bytes": len(data)})
        return {"ok": True, "written": written}
    
    if __name__ == "__main__":
        print(json.dumps(lambda_handler(), indent=2))
    
  5. Vai a Configurazione > Variabili di ambiente > Modifica > Aggiungi nuova variabile di ambiente.

  6. Inserisci le seguenti variabili di ambiente, sostituendole con i tuoi valori:

    Chiave Esempio
    S3_BUCKET csv-ioc
    S3_PREFIX csv-ioc/
    IOC_URLS https://ioc.example.com/feed.csv,https://another.example.org/iocs.csv
    AUTH_HEADER Authorization: Bearer <token>
    TIMEOUT 60
  7. Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > la tua funzione).

  8. Seleziona la scheda Configurazione.

  9. Nel riquadro Configurazione generale, fai clic su Modifica.

  10. Modifica Timeout impostandolo su 5 minuti (300 secondi) e fai clic su Salva.

Creare una pianificazione EventBridge

  1. Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
  2. Fornisci i seguenti dettagli di configurazione:
    • Programma ricorrente: Tariffa (1 hour).
    • Destinazione: la tua funzione Lambda.
    • Nome: csv-custom-ioc-1h
  3. Fai clic su Crea pianificazione.

(Facoltativo) Crea chiavi e utente IAM di sola lettura per Google SecOps

  1. Nella console AWS, vai a IAM > Utenti, poi fai clic su Aggiungi utenti.
  2. Fornisci i seguenti dettagli di configurazione:
    • Utente: inserisci un nome univoco (ad esempio secops-reader)
    • Tipo di accesso: seleziona Chiave di accesso - Accesso programmatico
    • Fai clic su Crea utente.
  3. Allega criterio per la lettura minimi (personalizzati): Utenti > seleziona secops-reader > Autorizzazioni > Aggiungi autorizzazioni > Allega criteri direttamente > Crea criteri
  4. Nell'editor JSON, inserisci la seguente policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Imposta il nome su secops-reader-policy.

  6. Vai a Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.

  7. Vai a Credenziali di sicurezza > Chiavi di accesso > Crea chiave di accesso.

  8. Scarica il file CSV (questi valori vengono inseriti nel feed).

Configura un feed in Google SecOps per importare file CSV di indicatori di compromissione personalizzati

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su Aggiungi nuovo feed.
  3. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, CSV Custom IOC).
  4. Seleziona Amazon S3 V2 come Tipo di origine.
  5. Seleziona CSV Custom IOC come Tipo di log.
  6. Fai clic su Avanti.
  7. Specifica i valori per i seguenti parametri di input:
    • URI S3: s3://csv-ioc/csv-ioc/
    • Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
    • Durata massima del file: 180 giorni per impostazione predefinita.
    • ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
    • Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
    • Etichette di importazione: l'etichetta da applicare agli eventi di questo feed.
  8. Fai clic su Avanti.
  9. Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.

Tabella di mappatura UDM

Campo log Mappatura UDM Logic
asn entity.metadata.threat.detection_fields.asn_label.value Mappato direttamente dal campo "asn".
category entity.metadata.threat.category_details Mappata direttamente dal campo "categoria".
classification entity.metadata.threat.category_details Aggiunto a "classificazione - " e mappato al campo "entity.metadata.threat.category_details".
column2 entity.entity.hostname Mappato a "entity.entity.hostname" se [category] corrisponde a ".?ip" o ".?proxy" e [not_ip] è true.
column2 entity.entity.ip Unito a "entity.entity.ip" se [category] corrisponde a ".?ip" o ".?proxy" e [not_ip] è false.
confidence entity.metadata.threat.confidence_score Convertito in float e mappato al campo "entity.metadata.threat.confidence_score".
country entity.entity.location.country_or_region Mappato direttamente dal campo "Paese".
date_first entity.metadata.threat.first_discovered_time Analizzato come ISO8601 e mappato al campo "entity.metadata.threat.first_discovered_time".
date_last entity.metadata.threat.last_updated_time Analizzato come ISO8601 e mappato al campo "entity.metadata.threat.last_updated_time".
detail entity.metadata.threat.summary Mappato direttamente dal campo "Dettagli".
detail2 entity.metadata.threat.description Mappato direttamente dal campo "detail2".
domain entity.entity.hostname Mappato direttamente dal campo "dominio".
email entity.entity.user.email_addresses Unito al campo "entity.entity.user.email_addresses".
id entity.metadata.product_entity_id Aggiunto a "id - " e mappato al campo "entity.metadata.product_entity_id".
import_session_id entity.metadata.threat.detection_fields.import_session_id_label.value Mappato direttamente dal campo "import_session_id".
itype entity.metadata.threat.detection_fields.itype_label.value Mappato direttamente dal campo "itype".
lat entity.entity.location.region_latitude Convertito in float e mappato al campo "entity.entity.location.region_latitude".
lon entity.entity.location.region_longitude Convertito in float e mappato al campo "entity.entity.location.region_longitude".
maltype entity.metadata.threat.detection_fields.maltype_label.value Mappato direttamente dal campo "maltype".
md5 entity.entity.file.md5 Mappato direttamente dal campo "md5".
media entity.metadata.threat.detection_fields.media_label.value Mappato direttamente dal campo "media".
media_type entity.metadata.threat.detection_fields.media_type_label.value Mappato direttamente dal campo "media_type".
org entity.metadata.threat.detection_fields.org_label.value Mappato direttamente dal campo "org".
resource_uri entity.entity.url Mappato a "entity.entity.url" se [itype] non corrisponde a "(ip
resource_uri entity.metadata.threat.url_back_to_product Mappato a "entity.metadata.threat.url_back_to_product" se [itype] corrisponde a "(ip
score entity.metadata.threat.confidence_details Mappato direttamente dal campo "punteggio".
severity entity.metadata.threat.severity Convertito in maiuscolo e mappato al campo "entity.metadata.threat.severity" se corrisponde a "LOW", "MEDIUM", "HIGH" o "CRITICAL".
source entity.metadata.threat.detection_fields.source_label.value Mappato direttamente dal campo "source".
source_feed_id entity.metadata.threat.detection_fields.source_feed_id_label.value Mappato direttamente dal campo "source_feed_id".
srcip entity.entity.ip Unito a "entity.entity.ip" se [srcip] non è vuoto e non è uguale a [value].
state entity.metadata.threat.detection_fields.state_label.value Mappato direttamente dal campo "state".
trusted_circle_ids entity.metadata.threat.detection_fields.trusted_circle_ids_label.value Mappato direttamente dal campo "trusted_circle_ids".
update_id entity.metadata.threat.detection_fields.update_id_label.value Mappato direttamente dal campo "update_id".
value entity.entity.file.full_path Mappato a "entity.entity.file.full_path" se [category] corrisponde a ".*?file".
value entity.entity.file.md5 Mappato a "entity.entity.file.md5" se [category] corrisponde a ".*?md5" e [value] è una stringa esadecimale di 32 caratteri.
value entity.entity.file.sha1 Mappato a "entity.entity.file.sha1" se ([category] corrisponde a ".?md5" e [value] è una stringa esadecimale di 40 caratteri) o ([category] corrisponde a ".?sha1" e [value] è una stringa esadecimale di 40 caratteri).
value entity.entity.file.sha256 Mappato a "entity.entity.file.sha256" se ([category] corrisponde a ".?md5" e [value] è una stringa esadecimale e [file_type] non è "md5") o ([category] corrisponde a ".?sha256" e [value] è una stringa esadecimale).
value entity.entity.hostname Mappato a "entity.entity.hostname" se ([category] corrisponde a ".?domain") o ([category] corrisponde a ".?ip" o ".*?proxy" e [not_ip] è true).
value entity.entity.url Mappato a "entity.entity.url" se ([category] corrisponde a ".*?url") o ([category] corrisponde a "url" e [resource_uri] non è vuoto).
N/D entity.metadata.collected_timestamp Completato con il timestamp dell'evento.
N/D entity.metadata.interval.end_time Imposta un valore costante di 253402300799 secondi.
N/D entity.metadata.interval.start_time Completato con il timestamp dell'evento.
N/D entity.metadata.vendor_name Imposta un valore costante di "Custom IOC".

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.