收集 CSV 自訂 IOC 檔案

支援的國家/地區:

本文說明如何使用 Amazon S3,將 CSV 自訂 IOC 檔案擷取至 Google Security Operations。接著,系統會將這些欄位對應至 UDM,處理各種資料類型 (例如 IP、網域和雜湊),並在輸出內容中加入威脅詳細資料、實體資訊和嚴重程度。

事前準備

  • Google SecOps 執行個體
  • AWS 的特殊存取權 (S3、IAM、Lambda、EventBridge)
  • 存取一或多個 CSV IOC 動態饋給網址 (HTTPS) 或提供 CSV 的內部端點

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照本使用指南建立 Amazon S3 值區建立值區
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 csv-ioc)。
  3. 按照這份使用者指南建立使用者:建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」,以供日後使用。
  12. 按一下 [完成]
  13. 選取 [權限] 分頁標籤。
  14. 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋並選取 AmazonS3FullAccess 政策。
  18. 點選「下一步」
  19. 按一下「Add permissions」。

設定 S3 上傳的身分與存取權管理政策和角色

  1. 依序前往 AWS 管理控制台 > IAM > 政策 > 建立政策 > JSON 分頁
  2. 輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutCsvIocObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::csv-ioc/*"
        }
      ]
    }
    
    • 如果您輸入的值區名稱不同,請替換 csv-ioc
  3. 依序點選「Next」>「Create policy」

  4. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」

  5. 附加新建立的政策。

  6. 為角色命名 WriteCsvIocToS3Role,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:

    設定
    名稱 csv_custom_ioc_to_s3
    執行階段 Python 3.13
    架構 x86_64
    執行角色 WriteCsvIocToS3Role
  4. 建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (csv_custom_ioc_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull CSV IOC feeds over HTTPS and write raw CSV to S3 (no transform)
    # - Multiple URLs (comma-separated)
    # - Optional auth header
    # - Retries for 429/5xx
    # - Unique filenames per page
    # - Sets ContentType=text/csv
    
    import os, time, json
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "csv-ioc/").strip("/")
    IOC_URLS = [u.strip() for u in os.environ.get("IOC_URLS", "").split(",") if u.strip()]
    AUTH_HEADER = os.environ.get("AUTH_HEADER", "")  # e.g., "Authorization: Bearer <token>" OR just "Bearer <token>"
    TIMEOUT = int(os.environ.get("TIMEOUT", "60"))
    
    s3 = boto3.client("s3")
    
    def _build_request(url: str) -> Request:
        if not url.lower().startswith("https://"):
            raise ValueError("Only HTTPS URLs are allowed in IOC_URLS")
        req = Request(url, method="GET")
        # Auth header: either "Header-Name: value" or just "Bearer token" -> becomes Authorization
        if AUTH_HEADER:
            if ":" in AUTH_HEADER:
                k, v = AUTH_HEADER.split(":", 1)
                req.add_header(k.strip(), v.strip())
            else:
                req.add_header("Authorization", AUTH_HEADER.strip())
        req.add_header("Accept", "text/csv, */*")
        return req
    
    def _http_bytes(req: Request, timeout: int = TIMEOUT, max_retries: int = 5) -> bytes:
        attempt, backoff = 0, 1.0
        while True:
            try:
                with urlopen(req, timeout=timeout) as r:
                    return r.read()
            except HTTPError as e:
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def _safe_name(url: str) -> str:
        # Create a short, filesystem-safe token for the URL
        return url.replace("://", "_").replace("/", "_").replace("?", "_").replace("&", "_")[:100]
    
    def _put_csv(blob: bytes, url: str, run_ts: int, idx: int) -> str:
        key = f"{PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts))}-url{idx:03d}-{_safe_name(url)}.csv"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=blob,
            ContentType="text/csv",
        )
        return key
    
    def lambda_handler(event=None, context=None):
        assert IOC_URLS, "IOC_URLS must contain at least one HTTPS URL"
        run_ts = int(time.time())
        written = []
        for i, url in enumerate(IOC_URLS):
            req = _build_request(url)
            data = _http_bytes(req)
            key = _put_csv(data, url, run_ts, i)
            written.append({"url": url, "s3_key": key, "bytes": len(data)})
        return {"ok": True, "written": written}
    
    if __name__ == "__main__":
        print(json.dumps(lambda_handler(), indent=2))
    
  5. 依序前往「Configuration」>「Environment variables」>「Edit」>「Add new environment variable」

  6. 輸入下列環境變數,並將 換成您的值:

    範例
    S3_BUCKET csv-ioc
    S3_PREFIX csv-ioc/
    IOC_URLS https://ioc.example.com/feed.csv,https://another.example.org/iocs.csv
    AUTH_HEADER Authorization: Bearer <token>
    TIMEOUT 60
  7. 建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。

  8. 選取「設定」分頁標籤。

  9. 在「一般設定」面板中,按一下「編輯」

  10. 將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)

建立 EventBridge 排程

  1. 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」
  2. 提供下列設定詳細資料:
    • 週期性時間表費率 (1 hour)。
    • 目標:您的 Lambda 函式。
    • 名稱csv-custom-ioc-1h
  3. 按一下「建立時間表」

選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰

  1. 在 AWS 控制台中,依序前往「IAM」>「Users」,然後按一下「Add users」
  2. 提供下列設定詳細資料:
    • 使用者:輸入不重複的名稱 (例如 secops-reader)
    • 存取權類型:選取「存取金鑰 - 程式輔助存取」
    • 按一下「建立使用者」
  3. 附加最低讀取權限政策 (自訂):依序選取「使用者」secops-reader「權限」「新增權限」「直接附加政策」「建立政策」
  4. 在 JSON 編輯器中輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. 將名稱設為 secops-reader-policy

  6. 依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」

  7. 依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」

  8. 下載 CSV (這些值會輸入至動態饋給)。

在 Google SecOps 中設定動態饋給,以便擷取 CSV 自訂 IOC 檔案

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 CSV Custom IOC)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「CSV Custom IOC」做為「記錄類型」
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URIs3://csv-ioc/csv-ioc/
    • 來源刪除選項:根據偏好選取刪除選項。
    • 檔案存在時間上限:預設為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:要套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「Finalize」畫面上檢查新的動態饋給設定,然後按一下「Submit」

UDM 對應表

記錄欄位 UDM 對應 邏輯
asn entity.metadata.threat.detection_fields.asn_label.value 直接從「asn」欄位對應。
category entity.metadata.threat.category_details 直接從「類別」欄位對應。
classification entity.metadata.threat.category_details 附加至「classification - 」,並對應至「entity.metadata.threat.category_details」欄位。
column2 entity.entity.hostname 如果 [category] 符合「.?ip」或「.?proxy」,且 [not_ip] 為 true,則對應至「entity.entity.hostname」。
column2 entity.entity.ip 如果 [category] 符合「.?ip」或「.?proxy」,且 [not_ip] 為 false,則會合併至「entity.entity.ip」。
confidence entity.metadata.threat.confidence_score 轉換為浮點數,並對應至「entity.metadata.threat.confidence_score」欄位。
country entity.entity.location.country_or_region 直接從「國家/地區」欄位對應。
date_first entity.metadata.threat.first_discovered_time 系統會將此值剖析為 ISO8601,並對應至「entity.metadata.threat.first_discovered_time」欄位。
date_last entity.metadata.threat.last_updated_time 系統會將這個值剖析為 ISO8601,並對應至「entity.metadata.threat.last_updated_time」欄位。
detail entity.metadata.threat.summary 直接從「詳細資料」欄位對應。
detail2 entity.metadata.threat.description 直接從「detail2」欄位對應。
domain entity.entity.hostname 直接從「網域」欄位對應。
email entity.entity.user.email_addresses 已併入「entity.entity.user.email_addresses」欄位。
id entity.metadata.product_entity_id 附加至「id - 」,並對應至「entity.metadata.product_entity_id」欄位。
import_session_id entity.metadata.threat.detection_fields.import_session_id_label.value 直接從「import_session_id」欄位對應。
itype entity.metadata.threat.detection_fields.itype_label.value 直接從「itype」欄位對應。
lat entity.entity.location.region_latitude 轉換為浮點數,並對應至「entity.entity.location.region_latitude」欄位。
lon entity.entity.location.region_longitude 轉換為浮點數,並對應至「entity.entity.location.region_longitude」欄位。
maltype entity.metadata.threat.detection_fields.maltype_label.value 直接從「maltype」欄位對應。
md5 entity.entity.file.md5 直接從「md5」欄位對應。
media entity.metadata.threat.detection_fields.media_label.value 直接從「媒體」欄位對應。
media_type entity.metadata.threat.detection_fields.media_type_label.value 直接從「media_type」欄位對應。
org entity.metadata.threat.detection_fields.org_label.value 直接從「org」欄位對應。
resource_uri entity.entity.url 如果 [itype] 不符合「(ip
resource_uri entity.metadata.threat.url_back_to_product 如果 [itype] 符合「(ip
score entity.metadata.threat.confidence_details 直接從「分數」欄位對應。
severity entity.metadata.threat.severity 如果符合「LOW」、「MEDIUM」、「HIGH」或「CRITICAL」,則會轉換為大寫,並對應至「entity.metadata.threat.severity」欄位。
source entity.metadata.threat.detection_fields.source_label.value 直接從「來源」欄位對應。
source_feed_id entity.metadata.threat.detection_fields.source_feed_id_label.value 直接從「source_feed_id」欄位對應。
srcip entity.entity.ip 如果 [srcip] 不為空白且不等於 [value],則會合併至「entity.entity.ip」。
state entity.metadata.threat.detection_fields.state_label.value 直接從「state」欄位對應。
trusted_circle_ids entity.metadata.threat.detection_fields.trusted_circle_ids_label.value 直接從「trusted_circle_ids」欄位對應。
update_id entity.metadata.threat.detection_fields.update_id_label.value 直接從「update_id」欄位對應。
value entity.entity.file.full_path 如果 [category] 符合「.*?file」,則會對應至「entity.entity.file.full_path」。
value entity.entity.file.md5 如果 [category] 符合「.*?md5」,且 [value] 是 32 個字元的十六進位字串,則會對應至「entity.entity.file.md5」。
value entity.entity.file.sha1 如果 [category] 符合「.?md5」,且 [value] 是 40 個字元的十六進位字串,或如果 [category] 符合「.?sha1」,且 [value] 是 40 個字元的十六進位字串,則會對應至「entity.entity.file.sha1」。
value entity.entity.file.sha256 如果 ([category] 符合「.?md5」且 [value] 是十六進位字串,且 [file_type] 不是「md5」) 或 ([category] 符合「.?sha256」且 [value] 是十六進位字串),則會對應至「entity.entity.file.sha256」。
value entity.entity.hostname 如果 ([category] 符合「.?domain」),或 ([category] 符合「.?ip」或「.*?proxy」,且 [not_ip] 為 true),則對應至「entity.entity.hostname」。
value entity.entity.url 如果 ([category] 符合「.*?url」) 或 ([category] 符合「url」且 [resource_uri] 不為空白),則對應至「entity.entity.url」。
不適用 entity.metadata.collected_timestamp 填入事件時間戳記。
不適用 entity.metadata.interval.end_time 設為 253402300799 秒的常數值。
不適用 entity.metadata.interval.start_time 填入事件時間戳記。
不適用 entity.metadata.vendor_name 設為「Custom IOC」常數值。

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。