Recopila archivos CSV personalizados de IOC

Compatible con:

En este documento, se explica cómo transferir archivos CSV personalizados de IOC a Google Security Operations con Amazon S3. Luego, asigna estos campos al UDM, controla varios tipos de datos, como IPs, dominios y hashes, y enriquece el resultado con detalles de amenazas, información de entidades y niveles de gravedad.

Antes de comenzar

  • Instancia de Google SecOps
  • Acceso con privilegios a AWS (S3, IAM, Lambda, EventBridge)
  • Acceso a una o más URLs de feeds de IOC en formato CSV (HTTPS) o a un extremo interno que proporcione CSV

Configura el bucket de AWS S3 y el IAM para Google SecOps

  1. Crea un bucket de Amazon S3 siguiendo esta guía del usuario: Crea un bucket
  2. Guarda el Nombre y la Región del bucket para futuras referencias (por ejemplo, csv-ioc).
  3. Crea un usuario siguiendo esta guía del usuario: Cómo crear un usuario de IAM.
  4. Selecciona el usuario creado.
  5. Selecciona la pestaña Credenciales de seguridad.
  6. Haz clic en Crear clave de acceso en la sección Claves de acceso.
  7. Selecciona Servicio de terceros como el Caso de uso.
  8. Haz clic en Siguiente.
  9. Opcional: Agrega una etiqueta de descripción.
  10. Haz clic en Crear clave de acceso.
  11. Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para usarlas más adelante.
  12. Haz clic en Listo.
  13. Selecciona la pestaña Permisos.
  14. Haz clic en Agregar permisos en la sección Políticas de permisos.
  15. Selecciona Agregar permisos.
  16. Selecciona Adjuntar políticas directamente.
  17. Busca y selecciona la política AmazonS3FullAccess.
  18. Haz clic en Siguiente.
  19. Haz clic en Agregar permisos.

Configura la política y el rol de IAM para las cargas de S3

  1. Ve a la consola de AWS > IAM > Políticas > Crear política > pestaña JSON.
  2. Ingresa la siguiente política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutCsvIocObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::csv-ioc/*"
        }
      ]
    }
    
    • Reemplaza csv-ioc si ingresaste un nombre de bucket diferente.
  3. Haz clic en Siguiente > Crear política.

  4. Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.

  5. Adjunta la política recién creada.

  6. Asigna el nombre WriteCsvIocToS3Role al rol y haz clic en Crear rol.

Crea la función Lambda

  1. En la consola de AWS, ve a Lambda > Functions > Create function.
  2. Haz clic en Crear desde cero.
  3. Proporciona los siguientes detalles de configuración:

    Configuración Valor
    Nombre csv_custom_ioc_to_s3
    Tiempo de ejecución Python 3.13
    Arquitectura x86_64
    Rol de ejecución WriteCsvIocToS3Role
  4. Después de crear la función, abre la pestaña Code, borra el código auxiliar y, luego, ingresa el siguiente código (csv_custom_ioc_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull CSV IOC feeds over HTTPS and write raw CSV to S3 (no transform)
    # - Multiple URLs (comma-separated)
    # - Optional auth header
    # - Retries for 429/5xx
    # - Unique filenames per page
    # - Sets ContentType=text/csv
    
    import os, time, json
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "csv-ioc/").strip("/")
    IOC_URLS = [u.strip() for u in os.environ.get("IOC_URLS", "").split(",") if u.strip()]
    AUTH_HEADER = os.environ.get("AUTH_HEADER", "")  # e.g., "Authorization: Bearer <token>" OR just "Bearer <token>"
    TIMEOUT = int(os.environ.get("TIMEOUT", "60"))
    
    s3 = boto3.client("s3")
    
    def _build_request(url: str) -> Request:
        if not url.lower().startswith("https://"):
            raise ValueError("Only HTTPS URLs are allowed in IOC_URLS")
        req = Request(url, method="GET")
        # Auth header: either "Header-Name: value" or just "Bearer token" -> becomes Authorization
        if AUTH_HEADER:
            if ":" in AUTH_HEADER:
                k, v = AUTH_HEADER.split(":", 1)
                req.add_header(k.strip(), v.strip())
            else:
                req.add_header("Authorization", AUTH_HEADER.strip())
        req.add_header("Accept", "text/csv, */*")
        return req
    
    def _http_bytes(req: Request, timeout: int = TIMEOUT, max_retries: int = 5) -> bytes:
        attempt, backoff = 0, 1.0
        while True:
            try:
                with urlopen(req, timeout=timeout) as r:
                    return r.read()
            except HTTPError as e:
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def _safe_name(url: str) -> str:
        # Create a short, filesystem-safe token for the URL
        return url.replace("://", "_").replace("/", "_").replace("?", "_").replace("&", "_")[:100]
    
    def _put_csv(blob: bytes, url: str, run_ts: int, idx: int) -> str:
        key = f"{PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts))}-url{idx:03d}-{_safe_name(url)}.csv"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=blob,
            ContentType="text/csv",
        )
        return key
    
    def lambda_handler(event=None, context=None):
        assert IOC_URLS, "IOC_URLS must contain at least one HTTPS URL"
        run_ts = int(time.time())
        written = []
        for i, url in enumerate(IOC_URLS):
            req = _build_request(url)
            data = _http_bytes(req)
            key = _put_csv(data, url, run_ts, i)
            written.append({"url": url, "s3_key": key, "bytes": len(data)})
        return {"ok": True, "written": written}
    
    if __name__ == "__main__":
        print(json.dumps(lambda_handler(), indent=2))
    
  5. Ve a Configuración > Variables de entorno > Editar > Agregar nueva variable de entorno.

  6. Ingresa las siguientes variables de entorno y reemplaza los valores por los tuyos:

    Clave Ejemplo
    S3_BUCKET csv-ioc
    S3_PREFIX csv-ioc/
    IOC_URLS https://ioc.example.com/feed.csv,https://another.example.org/iocs.csv
    AUTH_HEADER Authorization: Bearer <token>
    TIMEOUT 60
  7. Después de crear la función, permanece en su página (o abre Lambda > Functions > tu-función).

  8. Selecciona la pestaña Configuración.

  9. En el panel Configuración general, haz clic en Editar.

  10. Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.

Crea una programación de EventBridge

  1. Ve a Amazon EventBridge > Scheduler > Create schedule.
  2. Proporciona los siguientes detalles de configuración:
    • Programación recurrente: Frecuencia (1 hour).
    • Destino: Tu función Lambda.
    • Nombre: csv-custom-ioc-1h.
  3. Haz clic en Crear programación.

Opcional: Crea un usuario y claves de IAM de solo lectura para Google SecOps

  1. En la consola de AWS, ve a IAM > Usuarios y, luego, haz clic en Agregar usuarios.
  2. Proporciona los siguientes detalles de configuración:
    • Usuario: Ingresa un nombre único (por ejemplo, secops-reader).
    • Tipo de acceso: Selecciona Clave de acceso: Acceso programático.
    • Haz clic en Crear usuario.
  3. Adjunta una política de lectura mínima (personalizada): Usuarios > selecciona secops-reader > Permisos > Agregar permisos > Adjuntar políticas directamente > Crear política
  4. En el editor de JSON, ingresa la siguiente política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Configura el nombre como secops-reader-policy.

  6. Ve a Crear política > busca o selecciona > Siguiente > Agregar permisos.

  7. Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.

  8. Descarga el archivo CSV (estos valores se ingresan en el feed).

Configura un feed en Google SecOps para transferir archivos CSV de IOC personalizados

  1. Ve a SIEM Settings > Feeds.
  2. Haz clic en Agregar feed nuevo.
  3. En el campo Nombre del feed, ingresa un nombre para el feed (por ejemplo, CSV Custom IOC).
  4. Selecciona Amazon S3 V2 como el Tipo de fuente.
  5. Selecciona IOC personalizado en CSV como el Tipo de registro.
  6. Haz clic en Siguiente.
  7. Especifica valores para los siguientes parámetros de entrada:
    • URI de S3: s3://csv-ioc/csv-ioc/
    • Opciones de borrado de la fuente: Selecciona la opción de borrado según tu preferencia.
    • Antigüedad máxima del archivo: 180 días de forma predeterminada
    • ID de clave de acceso: Clave de acceso del usuario con acceso al bucket de S3.
    • Clave de acceso secreta: Clave secreta del usuario con acceso al bucket de S3.
    • Espacio de nombres del recurso: Es el espacio de nombres del recurso.
    • Etiquetas de transmisión: Es la etiqueta que se aplicará a los eventos de este feed.
  8. Haz clic en Siguiente.
  9. Revisa la nueva configuración del feed en la pantalla Finalizar y, luego, haz clic en Enviar.

Tabla de asignación de UDM

Campo de registro Asignación de UDM Lógica
asn entity.metadata.threat.detection_fields.asn_label.value Se asigna directamente desde el campo "asn".
category entity.metadata.threat.category_details Se asigna directamente desde el campo "categoría".
classification entity.metadata.threat.category_details Se agrega a "classification - " y se asigna al campo "entity.metadata.threat.category_details".
column2 entity.entity.hostname Se asigna a "entity.entity.hostname" si [category] coincide con ".?ip" o ".?proxy" y [not_ip] es verdadero.
column2 entity.entity.ip Se combina en "entity.entity.ip" si [category] coincide con ".?ip" o ".?proxy" y [not_ip] es falso.
confidence entity.metadata.threat.confidence_score Se convirtió a un número de punto flotante y se asignó al campo "entity.metadata.threat.confidence_score".
country entity.entity.location.country_or_region Se asigna directamente desde el campo "país".
date_first entity.metadata.threat.first_discovered_time Se analiza como ISO8601 y se asigna al campo "entity.metadata.threat.first_discovered_time".
date_last entity.metadata.threat.last_updated_time Se analiza como ISO8601 y se asigna al campo "entity.metadata.threat.last_updated_time".
detail entity.metadata.threat.summary Se asigna directamente desde el campo "detalle".
detail2 entity.metadata.threat.description Se asigna directamente desde el campo "detail2".
domain entity.entity.hostname Se asigna directamente desde el campo "dominio".
email entity.entity.user.email_addresses Se combinó en el campo "entity.entity.user.email_addresses".
id entity.metadata.product_entity_id Se agrega a "id - " y se asigna al campo "entity.metadata.product_entity_id".
import_session_id entity.metadata.threat.detection_fields.import_session_id_label.value Se asigna directamente desde el campo "import_session_id".
itype entity.metadata.threat.detection_fields.itype_label.value Se asigna directamente desde el campo "itype".
lat entity.entity.location.region_latitude Se convirtió a un número de punto flotante y se asignó al campo "entity.entity.location.region_latitude".
lon entity.entity.location.region_longitude Se convirtió a un número de punto flotante y se asignó al campo "entity.entity.location.region_longitude".
maltype entity.metadata.threat.detection_fields.maltype_label.value Se asigna directamente desde el campo "maltype".
md5 entity.entity.file.md5 Se asigna directamente desde el campo "md5".
media entity.metadata.threat.detection_fields.media_label.value Se asigna directamente desde el campo "media".
media_type entity.metadata.threat.detection_fields.media_type_label.value Se asigna directamente desde el campo "media_type".
org entity.metadata.threat.detection_fields.org_label.value Se asigna directamente desde el campo "org".
resource_uri entity.entity.url Se asigna a "entity.entity.url" si [itype] no coincide con "(ip
resource_uri entity.metadata.threat.url_back_to_product Se asigna a "entity.metadata.threat.url_back_to_product" si [itype] coincide con "(ip
score entity.metadata.threat.confidence_details Se asigna directamente desde el campo "score".
severity entity.metadata.threat.severity Se convierte a mayúsculas y se asigna al campo "entity.metadata.threat.severity" si coincide con "LOW", "MEDIUM", "HIGH" o "CRITICAL".
source entity.metadata.threat.detection_fields.source_label.value Se asigna directamente desde el campo "source".
source_feed_id entity.metadata.threat.detection_fields.source_feed_id_label.value Se asigna directamente desde el campo "source_feed_id".
srcip entity.entity.ip Se combina en "entity.entity.ip" si [srcip] no está vacío y no es igual a [value].
state entity.metadata.threat.detection_fields.state_label.value Se asigna directamente desde el campo "state".
trusted_circle_ids entity.metadata.threat.detection_fields.trusted_circle_ids_label.value Se asigna directamente desde el campo "trusted_circle_ids".
update_id entity.metadata.threat.detection_fields.update_id_label.value Se asigna directamente desde el campo "update_id".
value entity.entity.file.full_path Se asigna a "entity.entity.file.full_path" si [category] coincide con ".*?file".
value entity.entity.file.md5 Se asigna a "entity.entity.file.md5" si [category] coincide con ".*?md5" y [value] es una cadena hexadecimal de 32 caracteres.
value entity.entity.file.sha1 Se asigna a "entity.entity.file.sha1" si ([category] coincide con ".?md5" y [value] es una cadena hexadecimal de 40 caracteres) o ([category] coincide con ".?sha1" y [value] es una cadena hexadecimal de 40 caracteres).
value entity.entity.file.sha256 Se asigna a "entity.entity.file.sha256" si ([category] coincide con ".?md5" y [value] es una cadena hexadecimal y [file_type] no es "md5") o ([category] coincide con ".?sha256" y [value] es una cadena hexadecimal).
value entity.entity.hostname Se asigna a "entity.entity.hostname" si ([category] coincide con ".?domain") o ([category] coincide con ".?ip" o ".*?proxy" y [not_ip] es verdadero).
value entity.entity.url Se asigna a "entity.entity.url" si ([category] coincide con ".*?url") o ([category] coincide con "url" y [resource_uri] no está vacío).
N/A entity.metadata.collected_timestamp Se propaga con la marca de tiempo del evento.
N/A entity.metadata.interval.end_time Se establece en un valor constante de 253402300799 segundos.
N/A entity.metadata.interval.start_time Se propaga con la marca de tiempo del evento.
N/A entity.metadata.vendor_name Se establece en un valor constante de "IOC personalizado".

¿Necesitas más ayuda? Obtén respuestas de miembros de la comunidad y profesionales de Google SecOps.