Recolha ficheiros IOC personalizados CSV

Compatível com:

Este documento explica como carregar ficheiros IOC personalizados CSV para o Google Security Operations através do Amazon S3. Em seguida, mapeia estes campos para o UDM, processando vários tipos de dados, como IPs, domínios e hashes, e enriquecendo o resultado com detalhes de ameaças, informações de entidades e níveis de gravidade.

Antes de começar

  • Instância do Google SecOps
  • Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)
  • Acesso a um ou mais URLs de feeds de IOCs CSV (HTTPS) ou a um ponto final interno que publique CSV

Configure o contentor do AWS S3 e o IAM para o Google SecOps

  1. Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor
  2. Guarde o nome e a região do contentor para referência futura (por exemplo, csv-ioc).
  3. Crie um utilizador seguindo este guia do utilizador: Criar um utilizador do IAM.
  4. Selecione o utilizador criado.
  5. Selecione o separador Credenciais de segurança.
  6. Clique em Criar chave de acesso na secção Chaves de acesso.
  7. Selecione Serviço de terceiros como o Exemplo de utilização.
  8. Clicar em Seguinte.
  9. Opcional: adicione uma etiqueta de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para utilização posterior.
  12. Clique em Concluído.
  13. Selecione o separador Autorizações.
  14. Clique em Adicionar autorizações na secção Políticas de autorizações.
  15. Selecione Adicionar autorizações.
  16. Selecione Anexar políticas diretamente
  17. Pesquise e selecione a política AmazonS3FullAccess.
  18. Clicar em Seguinte.
  19. Clique em Adicionar autorizações.

Configure a política e a função de IAM para carregamentos do S3

  1. Aceda a AWS console > IAM > Policies > Create policy > separador JSON.
  2. Introduza a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutCsvIocObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::csv-ioc/*"
        }
      ]
    }
    
    • Substitua csv-ioc se tiver introduzido um nome de contentor diferente.
  3. Clique em Seguinte > Criar política.

  4. Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.

  5. Anexe a política criada recentemente.

  6. Dê o nome WriteCsvIocToS3Role à função e clique em Criar função.

Crie a função Lambda

  1. Na consola da AWS, aceda a Lambda > Functions > Create function.
  2. Clique em Criar do zero.
  3. Faculte os seguintes detalhes de configuração:

    Definição Valor
    Nome csv_custom_ioc_to_s3
    Runtime Python 3.13
    Arquitetura x86_64
    Função de execução WriteCsvIocToS3Role
  4. Depois de criar a função, abra o separador Código, elimine o fragmento e introduza o seguinte código (csv_custom_ioc_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull CSV IOC feeds over HTTPS and write raw CSV to S3 (no transform)
    # - Multiple URLs (comma-separated)
    # - Optional auth header
    # - Retries for 429/5xx
    # - Unique filenames per page
    # - Sets ContentType=text/csv
    
    import os, time, json
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "csv-ioc/").strip("/")
    IOC_URLS = [u.strip() for u in os.environ.get("IOC_URLS", "").split(",") if u.strip()]
    AUTH_HEADER = os.environ.get("AUTH_HEADER", "")  # e.g., "Authorization: Bearer <token>" OR just "Bearer <token>"
    TIMEOUT = int(os.environ.get("TIMEOUT", "60"))
    
    s3 = boto3.client("s3")
    
    def _build_request(url: str) -> Request:
        if not url.lower().startswith("https://"):
            raise ValueError("Only HTTPS URLs are allowed in IOC_URLS")
        req = Request(url, method="GET")
        # Auth header: either "Header-Name: value" or just "Bearer token" -> becomes Authorization
        if AUTH_HEADER:
            if ":" in AUTH_HEADER:
                k, v = AUTH_HEADER.split(":", 1)
                req.add_header(k.strip(), v.strip())
            else:
                req.add_header("Authorization", AUTH_HEADER.strip())
        req.add_header("Accept", "text/csv, */*")
        return req
    
    def _http_bytes(req: Request, timeout: int = TIMEOUT, max_retries: int = 5) -> bytes:
        attempt, backoff = 0, 1.0
        while True:
            try:
                with urlopen(req, timeout=timeout) as r:
                    return r.read()
            except HTTPError as e:
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def _safe_name(url: str) -> str:
        # Create a short, filesystem-safe token for the URL
        return url.replace("://", "_").replace("/", "_").replace("?", "_").replace("&", "_")[:100]
    
    def _put_csv(blob: bytes, url: str, run_ts: int, idx: int) -> str:
        key = f"{PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts))}-url{idx:03d}-{_safe_name(url)}.csv"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=blob,
            ContentType="text/csv",
        )
        return key
    
    def lambda_handler(event=None, context=None):
        assert IOC_URLS, "IOC_URLS must contain at least one HTTPS URL"
        run_ts = int(time.time())
        written = []
        for i, url in enumerate(IOC_URLS):
            req = _build_request(url)
            data = _http_bytes(req)
            key = _put_csv(data, url, run_ts, i)
            written.append({"url": url, "s3_key": key, "bytes": len(data)})
        return {"ok": True, "written": written}
    
    if __name__ == "__main__":
        print(json.dumps(lambda_handler(), indent=2))
    
  5. Aceda a Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.

  6. Introduza as seguintes variáveis de ambiente, substituindo-as pelos seus valores:

    Chave Exemplo
    S3_BUCKET csv-ioc
    S3_PREFIX csv-ioc/
    IOC_URLS https://ioc.example.com/feed.csv,https://another.example.org/iocs.csv
    AUTH_HEADER Authorization: Bearer <token>
    TIMEOUT 60
  7. Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > your-function).

  8. Selecione o separador Configuração.

  9. No painel Configuração geral, clique em Editar.

  10. Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.

Crie um horário do EventBridge

  1. Aceda a Amazon EventBridge > Scheduler > Create schedule.
  2. Indique os seguintes detalhes de configuração:
    • Agenda recorrente: Taxa (1 hour).
    • Alvo: a sua função Lambda.
    • Nome: csv-custom-ioc-1h.
  3. Clique em Criar programação.

Opcional: crie um utilizador e chaves da IAM só de leitura para o Google SecOps

  1. Na consola da AWS, aceda a IAM > Users e, de seguida, clique em Add users.
  2. Indique os seguintes detalhes de configuração:
    • Utilizador: introduza um nome único (por exemplo, secops-reader)
    • Tipo de acesso: selecione Chave de acesso – Acesso programático
    • Clique em Criar utilizador.
  3. Anexe a política de leitura mínima (personalizada): Utilizadores > selecione secops-reader > Autorizações > Adicionar autorizações > Anexar políticas diretamente > Criar política
  4. No editor JSON, introduza a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Defina o nome como secops-reader-policy.

  6. Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.

  7. Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.

  8. Transfira o CSV (estes valores são introduzidos no feed).

Configure um feed no Google SecOps para carregar ficheiros CSV de IOCs personalizados

  1. Aceda a Definições do SIEM > Feeds.
  2. Clique em Adicionar novo feed.
  3. No campo Nome do feed, introduza um nome para o feed (por exemplo, CSV Custom IOC).
  4. Selecione Amazon S3 V2 como o Tipo de origem.
  5. Selecione IOC personalizado CSV como o Tipo de registo.
  6. Clicar em Seguinte.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://csv-ioc/csv-ioc/
    • Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
    • Idade máxima do ficheiro: predefinição de 180 dias.
    • ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
    • Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
    • Espaço de nomes do recurso: o espaço de nomes do recurso.
    • Etiquetas de carregamento: a etiqueta a aplicar aos eventos deste feed.
  8. Clicar em Seguinte.
  9. Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.

Tabela de mapeamento da UDM

Campo de registo Mapeamento de UDM Lógica
asn entity.metadata.threat.detection_fields.asn_label.value Mapeado diretamente a partir do campo "asn".
category entity.metadata.threat.category_details Mapeado diretamente a partir do campo "category".
classification entity.metadata.threat.category_details Anexado a "classification - " e mapeado para o campo "entity.metadata.threat.category_details".
column2 entity.entity.hostname Mapeado para "entity.entity.hostname" se [category] corresponder a ".?ip" ou ".?proxy" e [not_ip] for verdadeiro.
column2 entity.entity.ip Unido em "entity.entity.ip" se [category] corresponder a ".?ip" ou ".?proxy" e [not_ip] for falso.
confidence entity.metadata.threat.confidence_score Convertido em número de vírgula flutuante e mapeado para o campo "entity.metadata.threat.confidence_score".
country entity.entity.location.country_or_region Mapeado diretamente a partir do campo "country".
date_first entity.metadata.threat.first_discovered_time Analisado como ISO8601 e mapeado para o campo "entity.metadata.threat.first_discovered_time".
date_last entity.metadata.threat.last_updated_time Analisado como ISO8601 e mapeado para o campo "entity.metadata.threat.last_updated_time".
detail entity.metadata.threat.summary Mapeado diretamente a partir do campo "detail".
detail2 entity.metadata.threat.description Mapeado diretamente a partir do campo "detail2".
domain entity.entity.hostname Mapeado diretamente a partir do campo "domain".
email entity.entity.user.email_addresses Unido no campo "entity.entity.user.email_addresses".
id entity.metadata.product_entity_id Anexado a "id - " e mapeado para o campo "entity.metadata.product_entity_id".
import_session_id entity.metadata.threat.detection_fields.import_session_id_label.value Mapeado diretamente a partir do campo "import_session_id".
itype entity.metadata.threat.detection_fields.itype_label.value Mapeado diretamente a partir do campo "itype".
lat entity.entity.location.region_latitude Convertido em flutuante e mapeado para o campo "entity.entity.location.region_latitude".
lon entity.entity.location.region_longitude Convertido em float e mapeado para o campo "entity.entity.location.region_longitude".
maltype entity.metadata.threat.detection_fields.maltype_label.value Mapeado diretamente a partir do campo "maltype".
md5 entity.entity.file.md5 Mapeado diretamente a partir do campo "md5".
media entity.metadata.threat.detection_fields.media_label.value Mapeado diretamente a partir do campo "media".
media_type entity.metadata.threat.detection_fields.media_type_label.value Mapeado diretamente a partir do campo "media_type".
org entity.metadata.threat.detection_fields.org_label.value Mapeado diretamente a partir do campo "org".
resource_uri entity.entity.url Mapeado para "entity.entity.url" se [itype] não corresponder a "(ip
resource_uri entity.metadata.threat.url_back_to_product Mapeado para "entity.metadata.threat.url_back_to_product" se [itype] corresponder a "(ip
score entity.metadata.threat.confidence_details Mapeado diretamente a partir do campo "score".
severity entity.metadata.threat.severity Convertido para letras maiúsculas e mapeado para o campo "entity.metadata.threat.severity" se corresponder a "LOW", "MEDIUM", "HIGH" ou "CRITICAL".
source entity.metadata.threat.detection_fields.source_label.value Mapeado diretamente a partir do campo "source".
source_feed_id entity.metadata.threat.detection_fields.source_feed_id_label.value Mapeado diretamente a partir do campo "source_feed_id".
srcip entity.entity.ip Unido em "entity.entity.ip" se [srcip] não estiver vazio e não for igual a [value].
state entity.metadata.threat.detection_fields.state_label.value Mapeado diretamente a partir do campo "state".
trusted_circle_ids entity.metadata.threat.detection_fields.trusted_circle_ids_label.value Mapeado diretamente a partir do campo "trusted_circle_ids".
update_id entity.metadata.threat.detection_fields.update_id_label.value Mapeado diretamente a partir do campo "update_id".
value entity.entity.file.full_path Mapeado para "entity.entity.file.full_path" se [category] corresponder a ".*?file".
value entity.entity.file.md5 Mapeado para "entity.entity.file.md5" se [category] corresponder a ".*?md5" e [value] for uma string hexadecimal de 32 carateres.
value entity.entity.file.sha1 Mapeado para "entity.entity.file.sha1" se ([category] corresponder a ".?md5" e [value] for uma string hexadecimal de 40 carateres) ou ([category] corresponder a ".?sha1" e [value] for uma string hexadecimal de 40 carateres).
value entity.entity.file.sha256 Mapeado para "entity.entity.file.sha256" se ([category] corresponder a ".?md5" e [value] for uma string hexadecimal e [file_type] não for "md5") ou ([category] corresponder a ".?sha256" e [value] for uma string hexadecimal).
value entity.entity.hostname Mapeado para "entity.entity.hostname" se ([category] corresponder a ".?domain") ou ([category] corresponder a ".?ip" ou ".*?proxy" e [not_ip] for verdadeiro).
value entity.entity.url Mapeado para "entity.entity.url" se ([category] corresponder a ".*?url") ou ([category] corresponder a "url" e [resource_uri] não estiver vazio).
N/A entity.metadata.collected_timestamp Preenchida com a data/hora do evento.
N/A entity.metadata.interval.end_time Definido para um valor constante de 253402300799 segundos.
N/A entity.metadata.interval.start_time Preenchida com a data/hora do evento.
N/A entity.metadata.vendor_name Definido como um valor constante de "IOC personalizado".

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.