Recopilar archivos CSV de IOC personalizados
En este documento se explica cómo ingerir archivos CSV de IOC personalizados en Google Security Operations mediante Amazon S3. A continuación, asigna estos campos al UDM, gestionando varios tipos de datos, como IPs, dominios y hashes, y enriqueciendo la salida con detalles de amenazas, información de entidades y niveles de gravedad.
Antes de empezar
- Instancia de Google SecOps
- Acceso privilegiado a AWS (S3, IAM, Lambda y EventBridge)
- Acceso a una o varias URLs de feeds de indicadores de compromiso (IOC) en formato CSV (HTTPS) o a un endpoint interno que proporcione CSV
Configurar un segmento de AWS S3 y IAM para Google SecOps
- Crea un segmento de Amazon S3 siguiendo esta guía de usuario: Crear un segmento.
- Guarda el nombre y la región del segmento para consultarlos más adelante (por ejemplo,
csv-ioc
). - Crea un usuario siguiendo esta guía: Crear un usuario de gestión de identidades y accesos.
- Selecciona el Usuario creado.
- Selecciona la pestaña Credenciales de seguridad.
- En la sección Claves de acceso, haz clic en Crear clave de acceso.
- Selecciona Servicio de terceros como Caso práctico.
- Haz clic en Siguiente.
- Opcional: añade una etiqueta de descripción.
- Haz clic en Crear clave de acceso.
- Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para usarlas más adelante.
- Haz clic en Listo.
- Selecciona la pestaña Permisos.
- En la sección Políticas de permisos, haz clic en Añadir permisos.
- Selecciona Añadir permisos.
- Seleccione Adjuntar políticas directamente.
- Busca y selecciona la política AmazonS3FullAccess.
- Haz clic en Siguiente.
- Haz clic en Añadir permisos.
Configurar la política y el rol de gestión de identidades y accesos para las subidas de S3
- Ve a la consola de AWS > IAM > Políticas > Crear política > pestaña JSON.
Introduce la siguiente política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutCsvIocObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::csv-ioc/*" } ] }
- Sustituye
csv-ioc
si has introducido otro nombre de segmento.
- Sustituye
Haz clic en Siguiente > Crear política.
Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.
Adjunte la política que acaba de crear.
Dale el nombre
WriteCsvIocToS3Role
al rol y haz clic en Crear rol.
Crear la función Lambda
- En la consola de AWS, ve a Lambda > Funciones > Crear función.
- Haz clic en Crear desde cero.
Proporciona los siguientes detalles de configuración:
Ajuste Valor Nombre csv_custom_ioc_to_s3
Tiempo de ejecución Python 3.13 Arquitectura x86_64 Rol de ejecución WriteCsvIocToS3Role
Una vez creada la función, abra la pestaña Código, elimine el stub e introduzca el siguiente código (
csv_custom_ioc_to_s3.py
):#!/usr/bin/env python3 # Lambda: Pull CSV IOC feeds over HTTPS and write raw CSV to S3 (no transform) # - Multiple URLs (comma-separated) # - Optional auth header # - Retries for 429/5xx # - Unique filenames per page # - Sets ContentType=text/csv import os, time, json from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "csv-ioc/").strip("/") IOC_URLS = [u.strip() for u in os.environ.get("IOC_URLS", "").split(",") if u.strip()] AUTH_HEADER = os.environ.get("AUTH_HEADER", "") # e.g., "Authorization: Bearer <token>" OR just "Bearer <token>" TIMEOUT = int(os.environ.get("TIMEOUT", "60")) s3 = boto3.client("s3") def _build_request(url: str) -> Request: if not url.lower().startswith("https://"): raise ValueError("Only HTTPS URLs are allowed in IOC_URLS") req = Request(url, method="GET") # Auth header: either "Header-Name: value" or just "Bearer token" -> becomes Authorization if AUTH_HEADER: if ":" in AUTH_HEADER: k, v = AUTH_HEADER.split(":", 1) req.add_header(k.strip(), v.strip()) else: req.add_header("Authorization", AUTH_HEADER.strip()) req.add_header("Accept", "text/csv, */*") return req def _http_bytes(req: Request, timeout: int = TIMEOUT, max_retries: int = 5) -> bytes: attempt, backoff = 0, 1.0 while True: try: with urlopen(req, timeout=timeout) as r: return r.read() except HTTPError as e: if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise except URLError: if attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise def _safe_name(url: str) -> str: # Create a short, filesystem-safe token for the URL return url.replace("://", "_").replace("/", "_").replace("?", "_").replace("&", "_")[:100] def _put_csv(blob: bytes, url: str, run_ts: int, idx: int) -> str: key = f"{PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts))}-url{idx:03d}-{_safe_name(url)}.csv" s3.put_object( Bucket=BUCKET, Key=key, Body=blob, ContentType="text/csv", ) return key def lambda_handler(event=None, context=None): assert IOC_URLS, "IOC_URLS must contain at least one HTTPS URL" run_ts = int(time.time()) written = [] for i, url in enumerate(IOC_URLS): req = _build_request(url) data = _http_bytes(req) key = _put_csv(data, url, run_ts, i) written.append({"url": url, "s3_key": key, "bytes": len(data)}) return {"ok": True, "written": written} if __name__ == "__main__": print(json.dumps(lambda_handler(), indent=2))
Ve a Configuración > Variables de entorno > Editar > Añadir nueva variable de entorno.
Introduce las siguientes variables de entorno y sustituye los valores por los tuyos:
Clave Ejemplo S3_BUCKET
csv-ioc
S3_PREFIX
csv-ioc/
IOC_URLS
https://ioc.example.com/feed.csv,https://another.example.org/iocs.csv
AUTH_HEADER
Authorization: Bearer <token>
TIMEOUT
60
Una vez creada la función, permanece en su página (o abre Lambda > Funciones > tu-función).
Seleccione la pestaña Configuración.
En el panel Configuración general, haz clic en Editar.
Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.
Crear una programación de EventBridge
- Ve a Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Programador > Crear programación).
- Proporcione los siguientes detalles de configuración:
- Programación periódica: Precio (
1 hour
). - Destino: tu función Lambda.
- Nombre:
csv-custom-ioc-1h
.
- Programación periódica: Precio (
- Haz clic en Crear programación.
Opcional: Crear un usuario y claves de gestión de identidades y accesos de solo lectura para Google SecOps
- En la consola de AWS, vaya a IAM > Usuarios y, a continuación, haga clic en Añadir usuarios.
- Proporcione los siguientes detalles de configuración:
- Usuario: introduce un nombre único (por ejemplo,
secops-reader
). - Tipo de acceso: selecciona Clave de acceso - Acceso programático.
- Haz clic en Crear usuario.
- Usuario: introduce un nombre único (por ejemplo,
- Asigna una política de lectura mínima (personalizada): Usuarios > selecciona
secops-reader
> Permisos > Añadir permisos > Asignar políticas directamente > Crear política. En el editor de JSON, introduce la siguiente política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
Asigna el nombre
secops-reader-policy
.Ve a Crear política > busca o selecciona > Siguiente > Añadir permisos.
Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.
Descarga el archivo CSV (estos valores se introducen en el feed).
Configurar un feed en Google SecOps para ingerir archivos CSV de IOC personalizados
- Ve a Configuración de SIEM > Feeds.
- Haz clic en Añadir feed.
- En el campo Nombre del feed, introduce un nombre para el feed (por ejemplo,
CSV Custom IOC
). - Selecciona Amazon S3 V2 como Tipo de fuente.
- Seleccione CSV Custom IOC (Indicador de compromiso personalizado en CSV) como Log type (Tipo de registro).
- Haz clic en Siguiente.
- Especifique los valores de los siguientes parámetros de entrada:
- URI de S3:
s3://csv-ioc/csv-ioc/
- Opciones de eliminación de la fuente: selecciona la opción de eliminación que prefieras.
- Antigüedad máxima de los archivos: 180 días de forma predeterminada.
- ID de clave de acceso: clave de acceso de usuario con acceso al bucket de S3.
- Clave de acceso secreta: clave secreta del usuario con acceso al bucket de S3.
- Espacio de nombres de recursos: el espacio de nombres de recursos.
- Etiquetas de ingestión: etiqueta que se aplicará a los eventos de este feed.
- URI de S3:
- Haz clic en Siguiente.
- Revise la configuración de la nueva fuente en la pantalla Finalizar y, a continuación, haga clic en Enviar.
Tabla de asignación de UDM
Campo de registro | Asignación de UDM | Lógica |
---|---|---|
asn |
entity.metadata.threat.detection_fields.asn_label.value | Se asigna directamente desde el campo "asn". |
category |
entity.metadata.threat.category_details | Se asigna directamente desde el campo "category". |
classification |
entity.metadata.threat.category_details | Se añade a "classification - " y se asigna al campo "entity.metadata.threat.category_details". |
column2 |
entity.entity.hostname | Se asigna a "entity.entity.hostname" si [category] coincide con ".?ip" o ".?proxy" y [not_ip] es true. |
column2 |
entity.entity.ip | Se combina en "entity.entity.ip" si [category] coincide con ".?ip" o ".?proxy" y [not_ip] es false. |
confidence |
entity.metadata.threat.confidence_score | Se ha convertido a float y se ha asignado al campo "entity.metadata.threat.confidence_score". |
country |
entity.entity.location.country_or_region | Se asigna directamente desde el campo "country". |
date_first |
entity.metadata.threat.first_discovered_time | Se analiza como ISO8601 y se asigna al campo "entity.metadata.threat.first_discovered_time". |
date_last |
entity.metadata.threat.last_updated_time | Se analiza como ISO8601 y se asigna al campo "entity.metadata.threat.last_updated_time". |
detail |
entity.metadata.threat.summary | Se asigna directamente desde el campo "detail". |
detail2 |
entity.metadata.threat.description | Se asigna directamente desde el campo "detail2". |
domain |
entity.entity.hostname | Se asigna directamente desde el campo "domain". |
email |
entity.entity.user.email_addresses | Se ha combinado en el campo "entity.entity.user.email_addresses". |
id |
entity.metadata.product_entity_id | Se añade a "id - " y se asigna al campo "entity.metadata.product_entity_id". |
import_session_id |
entity.metadata.threat.detection_fields.import_session_id_label.value | Se asigna directamente desde el campo "import_session_id". |
itype |
entity.metadata.threat.detection_fields.itype_label.value | Se asigna directamente desde el campo "itype". |
lat |
entity.entity.location.region_latitude | Se ha convertido a float y se ha asignado al campo "entity.entity.location.region_latitude". |
lon |
entity.entity.location.region_longitude | Se ha convertido a float y se ha asignado al campo "entity.entity.location.region_longitude". |
maltype |
entity.metadata.threat.detection_fields.maltype_label.value | Se asigna directamente desde el campo "maltype". |
md5 |
entity.entity.file.md5 | Se asigna directamente desde el campo "md5". |
media |
entity.metadata.threat.detection_fields.media_label.value | Se asigna directamente desde el campo "media". |
media_type |
entity.metadata.threat.detection_fields.media_type_label.value | Se asigna directamente desde el campo "media_type". |
org |
entity.metadata.threat.detection_fields.org_label.value | Se asigna directamente desde el campo "org". |
resource_uri |
entity.entity.url | Se asigna a "entity.entity.url" si [itype] no coincide con "(ip |
resource_uri |
entity.metadata.threat.url_back_to_product | Se asigna a "entity.metadata.threat.url_back_to_product" si [itype] coincide con "(ip |
score |
entity.metadata.threat.confidence_details | Se asigna directamente desde el campo "score". |
severity |
entity.metadata.threat.severity | Se convierte a mayúsculas y se asigna al campo "entity.metadata.threat.severity" si coincide con "LOW", "MEDIUM", "HIGH" o "CRITICAL". |
source |
entity.metadata.threat.detection_fields.source_label.value | Se asigna directamente desde el campo "source". |
source_feed_id |
entity.metadata.threat.detection_fields.source_feed_id_label.value | Se asigna directamente desde el campo "source_feed_id". |
srcip |
entity.entity.ip | Se combina con "entity.entity.ip" si [srcip] no está vacío y no es igual a [value]. |
state |
entity.metadata.threat.detection_fields.state_label.value | Se asigna directamente desde el campo "state". |
trusted_circle_ids |
entity.metadata.threat.detection_fields.trusted_circle_ids_label.value | Se asigna directamente desde el campo "trusted_circle_ids". |
update_id |
entity.metadata.threat.detection_fields.update_id_label.value | Se asigna directamente desde el campo "update_id". |
value |
entity.entity.file.full_path | Se asigna a "entity.entity.file.full_path" si [category] coincide con ".*?file". |
value |
entity.entity.file.md5 | Se asigna a "entity.entity.file.md5" si [category] coincide con ".*?md5" y [value] es una cadena hexadecimal de 32 caracteres. |
value |
entity.entity.file.sha1 | Se asigna a "entity.entity.file.sha1" si ([category] coincide con ".?md5" y [value] es una cadena hexadecimal de 40 caracteres) o ([category] coincide con ".?sha1" y [value] es una cadena hexadecimal de 40 caracteres). |
value |
entity.entity.file.sha256 | Se asigna a "entity.entity.file.sha256" si ([category] coincide con ".?md5" y [value] es una cadena hexadecimal y [file_type] no es "md5") o ([category] coincide con ".?sha256" y [value] es una cadena hexadecimal). |
value |
entity.entity.hostname | Se asigna a "entity.entity.hostname" si ([category] coincide con ".?domain") o ([category] coincide con ".?ip" o ".*?proxy" y [not_ip] es true). |
value |
entity.entity.url | Se asigna a "entity.entity.url" si ([category] coincide con ".*?url") o ([category] coincide con "url" y [resource_uri] no está vacío). |
N/A | entity.metadata.collected_timestamp | Se rellena con la marca de tiempo del evento. |
N/A | entity.metadata.interval.end_time | Se establece en un valor constante de 253402300799 segundos. |
N/A | entity.metadata.interval.start_time | Se rellena con la marca de tiempo del evento. |
N/A | entity.metadata.vendor_name | Se asigna el valor constante "Custom IOC". |
¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.