Recopilar archivos CSV de IOC personalizados

Disponible en:

En este documento se explica cómo ingerir archivos CSV de IOC personalizados en Google Security Operations mediante Amazon S3. A continuación, asigna estos campos al UDM, gestionando varios tipos de datos, como IPs, dominios y hashes, y enriqueciendo la salida con detalles de amenazas, información de entidades y niveles de gravedad.

Antes de empezar

  • Instancia de Google SecOps
  • Acceso privilegiado a AWS (S3, IAM, Lambda y EventBridge)
  • Acceso a una o varias URLs de feeds de indicadores de compromiso (IOC) en formato CSV (HTTPS) o a un endpoint interno que proporcione CSV

Configurar un segmento de AWS S3 y IAM para Google SecOps

  1. Crea un segmento de Amazon S3 siguiendo esta guía de usuario: Crear un segmento.
  2. Guarda el nombre y la región del segmento para consultarlos más adelante (por ejemplo, csv-ioc).
  3. Crea un usuario siguiendo esta guía: Crear un usuario de gestión de identidades y accesos.
  4. Selecciona el Usuario creado.
  5. Selecciona la pestaña Credenciales de seguridad.
  6. En la sección Claves de acceso, haz clic en Crear clave de acceso.
  7. Selecciona Servicio de terceros como Caso práctico.
  8. Haz clic en Siguiente.
  9. Opcional: añade una etiqueta de descripción.
  10. Haz clic en Crear clave de acceso.
  11. Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para usarlas más adelante.
  12. Haz clic en Listo.
  13. Selecciona la pestaña Permisos.
  14. En la sección Políticas de permisos, haz clic en Añadir permisos.
  15. Selecciona Añadir permisos.
  16. Seleccione Adjuntar políticas directamente.
  17. Busca y selecciona la política AmazonS3FullAccess.
  18. Haz clic en Siguiente.
  19. Haz clic en Añadir permisos.

Configurar la política y el rol de gestión de identidades y accesos para las subidas de S3

  1. Ve a la consola de AWS > IAM > Políticas > Crear política > pestaña JSON.
  2. Introduce la siguiente política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutCsvIocObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::csv-ioc/*"
        }
      ]
    }
    
    • Sustituye csv-ioc si has introducido otro nombre de segmento.
  3. Haz clic en Siguiente > Crear política.

  4. Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.

  5. Adjunte la política que acaba de crear.

  6. Dale el nombre WriteCsvIocToS3Role al rol y haz clic en Crear rol.

Crear la función Lambda

  1. En la consola de AWS, ve a Lambda > Funciones > Crear función.
  2. Haz clic en Crear desde cero.
  3. Proporciona los siguientes detalles de configuración:

    Ajuste Valor
    Nombre csv_custom_ioc_to_s3
    Tiempo de ejecución Python 3.13
    Arquitectura x86_64
    Rol de ejecución WriteCsvIocToS3Role
  4. Una vez creada la función, abra la pestaña Código, elimine el stub e introduzca el siguiente código (csv_custom_ioc_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull CSV IOC feeds over HTTPS and write raw CSV to S3 (no transform)
    # - Multiple URLs (comma-separated)
    # - Optional auth header
    # - Retries for 429/5xx
    # - Unique filenames per page
    # - Sets ContentType=text/csv
    
    import os, time, json
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "csv-ioc/").strip("/")
    IOC_URLS = [u.strip() for u in os.environ.get("IOC_URLS", "").split(",") if u.strip()]
    AUTH_HEADER = os.environ.get("AUTH_HEADER", "")  # e.g., "Authorization: Bearer <token>" OR just "Bearer <token>"
    TIMEOUT = int(os.environ.get("TIMEOUT", "60"))
    
    s3 = boto3.client("s3")
    
    def _build_request(url: str) -> Request:
        if not url.lower().startswith("https://"):
            raise ValueError("Only HTTPS URLs are allowed in IOC_URLS")
        req = Request(url, method="GET")
        # Auth header: either "Header-Name: value" or just "Bearer token" -> becomes Authorization
        if AUTH_HEADER:
            if ":" in AUTH_HEADER:
                k, v = AUTH_HEADER.split(":", 1)
                req.add_header(k.strip(), v.strip())
            else:
                req.add_header("Authorization", AUTH_HEADER.strip())
        req.add_header("Accept", "text/csv, */*")
        return req
    
    def _http_bytes(req: Request, timeout: int = TIMEOUT, max_retries: int = 5) -> bytes:
        attempt, backoff = 0, 1.0
        while True:
            try:
                with urlopen(req, timeout=timeout) as r:
                    return r.read()
            except HTTPError as e:
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def _safe_name(url: str) -> str:
        # Create a short, filesystem-safe token for the URL
        return url.replace("://", "_").replace("/", "_").replace("?", "_").replace("&", "_")[:100]
    
    def _put_csv(blob: bytes, url: str, run_ts: int, idx: int) -> str:
        key = f"{PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts))}-url{idx:03d}-{_safe_name(url)}.csv"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=blob,
            ContentType="text/csv",
        )
        return key
    
    def lambda_handler(event=None, context=None):
        assert IOC_URLS, "IOC_URLS must contain at least one HTTPS URL"
        run_ts = int(time.time())
        written = []
        for i, url in enumerate(IOC_URLS):
            req = _build_request(url)
            data = _http_bytes(req)
            key = _put_csv(data, url, run_ts, i)
            written.append({"url": url, "s3_key": key, "bytes": len(data)})
        return {"ok": True, "written": written}
    
    if __name__ == "__main__":
        print(json.dumps(lambda_handler(), indent=2))
    
  5. Ve a Configuración > Variables de entorno > Editar > Añadir nueva variable de entorno.

  6. Introduce las siguientes variables de entorno y sustituye los valores por los tuyos:

    Clave Ejemplo
    S3_BUCKET csv-ioc
    S3_PREFIX csv-ioc/
    IOC_URLS https://ioc.example.com/feed.csv,https://another.example.org/iocs.csv
    AUTH_HEADER Authorization: Bearer <token>
    TIMEOUT 60
  7. Una vez creada la función, permanece en su página (o abre Lambda > Funciones > tu-función).

  8. Seleccione la pestaña Configuración.

  9. En el panel Configuración general, haz clic en Editar.

  10. Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.

Crear una programación de EventBridge

  1. Ve a Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Programador > Crear programación).
  2. Proporcione los siguientes detalles de configuración:
    • Programación periódica: Precio (1 hour).
    • Destino: tu función Lambda.
    • Nombre: csv-custom-ioc-1h.
  3. Haz clic en Crear programación.

Opcional: Crear un usuario y claves de gestión de identidades y accesos de solo lectura para Google SecOps

  1. En la consola de AWS, vaya a IAM > Usuarios y, a continuación, haga clic en Añadir usuarios.
  2. Proporcione los siguientes detalles de configuración:
    • Usuario: introduce un nombre único (por ejemplo, secops-reader).
    • Tipo de acceso: selecciona Clave de acceso - Acceso programático.
    • Haz clic en Crear usuario.
  3. Asigna una política de lectura mínima (personalizada): Usuarios > selecciona secops-reader > Permisos > Añadir permisos > Asignar políticas directamente > Crear política.
  4. En el editor de JSON, introduce la siguiente política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Asigna el nombre secops-reader-policy.

  6. Ve a Crear política > busca o selecciona > Siguiente > Añadir permisos.

  7. Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.

  8. Descarga el archivo CSV (estos valores se introducen en el feed).

Configurar un feed en Google SecOps para ingerir archivos CSV de IOC personalizados

  1. Ve a Configuración de SIEM > Feeds.
  2. Haz clic en Añadir feed.
  3. En el campo Nombre del feed, introduce un nombre para el feed (por ejemplo, CSV Custom IOC).
  4. Selecciona Amazon S3 V2 como Tipo de fuente.
  5. Seleccione CSV Custom IOC (Indicador de compromiso personalizado en CSV) como Log type (Tipo de registro).
  6. Haz clic en Siguiente.
  7. Especifique los valores de los siguientes parámetros de entrada:
    • URI de S3: s3://csv-ioc/csv-ioc/
    • Opciones de eliminación de la fuente: selecciona la opción de eliminación que prefieras.
    • Antigüedad máxima de los archivos: 180 días de forma predeterminada.
    • ID de clave de acceso: clave de acceso de usuario con acceso al bucket de S3.
    • Clave de acceso secreta: clave secreta del usuario con acceso al bucket de S3.
    • Espacio de nombres de recursos: el espacio de nombres de recursos.
    • Etiquetas de ingestión: etiqueta que se aplicará a los eventos de este feed.
  8. Haz clic en Siguiente.
  9. Revise la configuración de la nueva fuente en la pantalla Finalizar y, a continuación, haga clic en Enviar.

Tabla de asignación de UDM

Campo de registro Asignación de UDM Lógica
asn entity.metadata.threat.detection_fields.asn_label.value Se asigna directamente desde el campo "asn".
category entity.metadata.threat.category_details Se asigna directamente desde el campo "category".
classification entity.metadata.threat.category_details Se añade a "classification - " y se asigna al campo "entity.metadata.threat.category_details".
column2 entity.entity.hostname Se asigna a "entity.entity.hostname" si [category] coincide con ".?ip" o ".?proxy" y [not_ip] es true.
column2 entity.entity.ip Se combina en "entity.entity.ip" si [category] coincide con ".?ip" o ".?proxy" y [not_ip] es false.
confidence entity.metadata.threat.confidence_score Se ha convertido a float y se ha asignado al campo "entity.metadata.threat.confidence_score".
country entity.entity.location.country_or_region Se asigna directamente desde el campo "country".
date_first entity.metadata.threat.first_discovered_time Se analiza como ISO8601 y se asigna al campo "entity.metadata.threat.first_discovered_time".
date_last entity.metadata.threat.last_updated_time Se analiza como ISO8601 y se asigna al campo "entity.metadata.threat.last_updated_time".
detail entity.metadata.threat.summary Se asigna directamente desde el campo "detail".
detail2 entity.metadata.threat.description Se asigna directamente desde el campo "detail2".
domain entity.entity.hostname Se asigna directamente desde el campo "domain".
email entity.entity.user.email_addresses Se ha combinado en el campo "entity.entity.user.email_addresses".
id entity.metadata.product_entity_id Se añade a "id - " y se asigna al campo "entity.metadata.product_entity_id".
import_session_id entity.metadata.threat.detection_fields.import_session_id_label.value Se asigna directamente desde el campo "import_session_id".
itype entity.metadata.threat.detection_fields.itype_label.value Se asigna directamente desde el campo "itype".
lat entity.entity.location.region_latitude Se ha convertido a float y se ha asignado al campo "entity.entity.location.region_latitude".
lon entity.entity.location.region_longitude Se ha convertido a float y se ha asignado al campo "entity.entity.location.region_longitude".
maltype entity.metadata.threat.detection_fields.maltype_label.value Se asigna directamente desde el campo "maltype".
md5 entity.entity.file.md5 Se asigna directamente desde el campo "md5".
media entity.metadata.threat.detection_fields.media_label.value Se asigna directamente desde el campo "media".
media_type entity.metadata.threat.detection_fields.media_type_label.value Se asigna directamente desde el campo "media_type".
org entity.metadata.threat.detection_fields.org_label.value Se asigna directamente desde el campo "org".
resource_uri entity.entity.url Se asigna a "entity.entity.url" si [itype] no coincide con "(ip
resource_uri entity.metadata.threat.url_back_to_product Se asigna a "entity.metadata.threat.url_back_to_product" si [itype] coincide con "(ip
score entity.metadata.threat.confidence_details Se asigna directamente desde el campo "score".
severity entity.metadata.threat.severity Se convierte a mayúsculas y se asigna al campo "entity.metadata.threat.severity" si coincide con "LOW", "MEDIUM", "HIGH" o "CRITICAL".
source entity.metadata.threat.detection_fields.source_label.value Se asigna directamente desde el campo "source".
source_feed_id entity.metadata.threat.detection_fields.source_feed_id_label.value Se asigna directamente desde el campo "source_feed_id".
srcip entity.entity.ip Se combina con "entity.entity.ip" si [srcip] no está vacío y no es igual a [value].
state entity.metadata.threat.detection_fields.state_label.value Se asigna directamente desde el campo "state".
trusted_circle_ids entity.metadata.threat.detection_fields.trusted_circle_ids_label.value Se asigna directamente desde el campo "trusted_circle_ids".
update_id entity.metadata.threat.detection_fields.update_id_label.value Se asigna directamente desde el campo "update_id".
value entity.entity.file.full_path Se asigna a "entity.entity.file.full_path" si [category] coincide con ".*?file".
value entity.entity.file.md5 Se asigna a "entity.entity.file.md5" si [category] coincide con ".*?md5" y [value] es una cadena hexadecimal de 32 caracteres.
value entity.entity.file.sha1 Se asigna a "entity.entity.file.sha1" si ([category] coincide con ".?md5" y [value] es una cadena hexadecimal de 40 caracteres) o ([category] coincide con ".?sha1" y [value] es una cadena hexadecimal de 40 caracteres).
value entity.entity.file.sha256 Se asigna a "entity.entity.file.sha256" si ([category] coincide con ".?md5" y [value] es una cadena hexadecimal y [file_type] no es "md5") o ([category] coincide con ".?sha256" y [value] es una cadena hexadecimal).
value entity.entity.hostname Se asigna a "entity.entity.hostname" si ([category] coincide con ".?domain") o ([category] coincide con ".?ip" o ".*?proxy" y [not_ip] es true).
value entity.entity.url Se asigna a "entity.entity.url" si ([category] coincide con ".*?url") o ([category] coincide con "url" y [resource_uri] no está vacío).
N/A entity.metadata.collected_timestamp Se rellena con la marca de tiempo del evento.
N/A entity.metadata.interval.end_time Se establece en un valor constante de 253402300799 segundos.
N/A entity.metadata.interval.start_time Se rellena con la marca de tiempo del evento.
N/A entity.metadata.vendor_name Se asigna el valor constante "Custom IOC".

¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.