收集 Duo 實體內容記錄

支援的國家/地區:

本文說明如何使用 Amazon S3,將 Duo 實體內容資料擷取至 Google Security Operations。剖析器會先從原始 JSON 擷取欄位,然後將這些欄位對應至 UDM 屬性,藉此將 JSON 記錄轉換為統合式資料模型 (UDM)。可處理各種資料情境,包括使用者和資產資訊、軟體詳細資料和安全性標籤,確保 UDM 架構中的全面呈現。

事前準備

  • Google SecOps 執行個體
  • Duo 租戶的特殊權限存取權 (Admin API 應用程式)
  • AWS 的特殊存取權 (S3、IAM、Lambda、EventBridge)

設定 Duo Admin API 應用程式

  1. 登入 Duo 管理面板
  2. 依序前往「應用程式」>「應用程式目錄」
  3. 新增 Admin API 應用程式。
  4. 記錄下列值:
    • 整合金鑰 (ikey)
    • 密鑰 (skey)
    • API 主機名稱 (例如 api-XXXXXXXX.duosecurity.com)
  5. 在「權限」中,啟用「授予資源 - 讀取」 (讀取使用者、群組、裝置/端點)。
  6. 儲存應用程式。

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照本使用指南建立 Amazon S3 值區建立值區
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 duo-context)。
  3. 按照這份使用者指南建立使用者:建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」,以供日後使用。
  12. 按一下 [完成]
  13. 選取 [權限] 分頁標籤。
  14. 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋並選取 AmazonS3FullAccess 政策。
  18. 點選「下一步」
  19. 按一下「Add permissions」。

設定 S3 上傳的身分與存取權管理政策和角色

  1. 依序前往 AWS 管理控制台 > IAM > 政策 > 建立政策 > JSON 分頁
  2. 輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDuoObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-context/*"
        }
      ]
    }
    
    • 如果您輸入了其他值區名稱,請替換 duo-context
  3. 依序點選「Next」>「Create policy」

  4. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」

  5. 附加新建立的政策。

  6. 為角色命名 WriteDuoToS3Role,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:

    設定
    名稱 duo_entity_context_to_s3
    執行階段 Python 3.13
    架構 x86_64
    執行角色 WriteDuoToS3Role
  4. 建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (duo_entity_context_to_s3.py):

    #!/usr/bin/env python3
    
    import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse
    from urllib.request import Request, urlopen
    import boto3
    
    # Env
    DUO_IKEY = os.environ["DUO_IKEY"]
    DUO_SKEY = os.environ["DUO_SKEY"]
    DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip()
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "duo/context/")
    # Default set can be adjusted via ENV
    RESOURCES = [r.strip() for r in os.environ.get(
        "RESOURCES",
        "users,groups,phones,endpoints,tokens,webauthncredentials,desktop_authenticators"
    ).split(",") if r.strip()]
    # Duo paging: default 100; max 500 for these endpoints
    LIMIT = int(os.environ.get("LIMIT", "500"))
    
    s3 = boto3.client("s3")
    
    def _canon_params(params: dict) -> str:
        """RFC3986 encoding with '~' unescaped, keys sorted lexicographically."""
        if not params:
            return ""
        parts = []
        for k in sorted(params.keys()):
            v = params[k]
            if v is None:
                continue
            ks = urllib.parse.quote(str(k), safe="~")
            vs = urllib.parse.quote(str(v), safe="~")
            parts.append(f"{ks}={vs}")
        return "&".join(parts)
    
    def _sign(method: str, host: str, path: str, params: dict) -> dict:
        """Construct Duo Admin API Authorization + Date headers (HMAC-SHA1)."""
        now = email.utils.formatdate()
        canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)])
        sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest()
        auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode("utf-8")).decode("utf-8")
        return {"Date": now, "Authorization": f"Basic {auth}"}
    
    def _call(method: str, path: str, params: dict) -> dict:
        host = DUO_API_HOSTNAME
        assert host.startswith("api-") and host.endswith(".duosecurity.com"), \
            "DUO_API_HOSTNAME must be e.g. api-XXXXXXXX.duosecurity.com"
        qs = _canon_params(params)
        url = f"https://{host}{path}" + (f"?{qs}" if method.upper() == "GET" and qs else "")
        req = Request(url, method=method.upper())
        for k, v in _sign(method, host, path, params).items():
            req.add_header(k, v)
        with urlopen(req, timeout=60) as r:
            return json.loads(r.read().decode("utf-8"))
    
    def _write_json(obj: dict, when: float, resource: str, page: int) -> str:
        prefix = S3_PREFIX.strip("/") + "/" if S3_PREFIX else ""
        key = f"{prefix}{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-{resource}-{page:05d}.json"
        s3.put_object(Bucket=S3_BUCKET, Key=key, Body=json.dumps(obj, separators=(",", ":")).encode("utf-8"))
        return key
    
    def _fetch_resource(resource: str) -> dict:
        """Fetch all pages for a list endpoint using limit/offset + metadata.next_offset."""
        path = f"/admin/v1/{resource}"
        offset = 0
        page = 0
        now = time.time()
        total_items = 0
    
        while True:
            params = {"limit": LIMIT, "offset": offset}
            data = _call("GET", path, params)
            _write_json(data, now, resource, page)
            page += 1
    
            resp = data.get("response")
            # most endpoints return a list; if not a list, count as 1 object page
            if isinstance(resp, list):
                total_items += len(resp)
            elif resp is not None:
                total_items += 1
    
            meta = data.get("metadata") or {}
            next_offset = meta.get("next_offset")
            if next_offset is None:
                break
    
            # Duo returns next_offset as int
            try:
                offset = int(next_offset)
            except Exception:
                break
    
        return {"resource": resource, "pages": page, "objects": total_items}
    
    def lambda_handler(event=None, context=None):
        results = []
        for res in RESOURCES:
            results.append(_fetch_resource(res))
        return {"ok": True, "results": results}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. 依序前往「Configuration」>「Environment variables」>「Edit」>「Add new environment variable」

  6. 輸入下列環境變數,並將 換成您的值。

    範例
    S3_BUCKET duo-context
    S3_PREFIX duo/context/
    DUO_IKEY DIXYZ...
    DUO_SKEY ****************
    DUO_API_HOSTNAME api-XXXXXXXX.duosecurity.com
    LIMIT 200
    RESOURCES users,groups,phones,endpoints,tokens,webauthncredentials
  7. 建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。

  8. 選取「設定」分頁標籤。

  9. 在「一般設定」面板中,按一下「編輯」

  10. 將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)

建立 EventBridge 排程

  1. 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」
  2. 提供下列設定詳細資料:
    • 週期性時間表費率 (1 hour)。
    • 目標:您的 Lambda 函式。
    • 名稱duo-entity-context-1h
  3. 按一下「建立時間表」

選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰

  1. 在 AWS 控制台中,依序前往「IAM」>「Users」,然後按一下「Add users」
  2. 提供下列設定詳細資料:
    • 使用者:輸入不重複的名稱 (例如 secops-reader)
    • 存取權類型:選取「存取金鑰 - 程式輔助存取」
    • 按一下「建立使用者」
  3. 附加最低讀取權限政策 (自訂):依序選取「使用者」secops-reader「權限」「新增權限」「直接附加政策」「建立政策」
  4. 在 JSON 編輯器中輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. 將名稱設為 secops-reader-policy

  6. 依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」

  7. 依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」

  8. 下載 CSV (這些值會輸入至動態饋給)。

在 Google SecOps 中設定動態饋給,擷取 Duo 實體內容資料

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「+ 新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Duo Entity Context)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「Duo Entity context data」做為「記錄類型」
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URIs3://duo-context/duo/context/
    • 來源刪除選項:根據偏好選取刪除選項。
    • 檔案存在時間上限:預設為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「Finalize」畫面上檢查新的動態饋給設定,然後按一下「Submit」

UDM 對應表

記錄欄位 UDM 對應 邏輯
已啟用 entity.asset.deployment_status 如果「activated」為 false,則設為「DECOMISSIONED」,否則設為「ACTIVE」。
browsers.browser_family entity.asset.software.name 從原始記錄的「browsers」陣列中擷取。
browsers.browser_version entity.asset.software.version 從原始記錄的「browsers」陣列中擷取。
device_name entity.asset.hostname 直接從原始記錄對應。
disk_encryption_status entity.asset.attribute.labels.key: "disk_encryption_status", entity.asset.attribute.labels.value: 直接從原始記錄對應,並轉換為小寫。
電子郵件 entity.user.email_addresses 如果原始記錄包含「@」,則直接對應;否則,如果「username」或「username1」包含「@」,則使用這些名稱。
已加密 entity.asset.attribute.labels.key: "Encrypted", entity.asset.attribute.labels.value: 直接從原始記錄對應,並轉換為小寫。
epkey entity.asset.product_object_id 如果存在,則做為「product_object_id」使用,否則使用「phone_id」或「token_id」。
指紋 entity.asset.attribute.labels.key: "Finger Print", entity.asset.attribute.labels.value: 直接從原始記錄對應,並轉換為小寫。
firewall_status entity.asset.attribute.labels.key: "firewall_status", entity.asset.attribute.labels.value: 直接從原始記錄對應,並轉換為小寫。
hardware_uuid entity.asset.asset_id 如果存在,則做為「asset_id」使用,否則使用「user_id」。
last_seen entity.asset.last_discover_time 系統會將其剖析為 ISO8601 時間戳記並對應。
模型 entity.asset.hardware.model 直接從原始記錄對應。
數字 entity.user.phone_numbers 直接從原始記錄對應。
os_family entity.asset.platform_software.platform 根據值 (不區分大小寫) 對應至「WINDOWS」、「LINUX」或「MAC」。
os_version entity.asset.platform_software.platform_version 直接從原始記錄對應。
password_status entity.asset.attribute.labels.key: "password_status", entity.asset.attribute.labels.value: 直接從原始記錄對應,並轉換為小寫。
phone_id entity.asset.product_object_id 如果沒有「epkey」,則會做為「product_object_id」使用,否則會使用「token_id」。
security_agents.security_agent entity.asset.software.name 從原始記錄的「security_agents」陣列中擷取。
security_agents.version entity.asset.software.version 從原始記錄的「security_agents」陣列中擷取。
時間戳記 entity.metadata.collected_timestamp 在「metadata」物件中填入「collected_timestamp」欄位。
token_id entity.asset.product_object_id 如果沒有「epkey」和「phone_id」,則會做為「product_object_id」使用。
trusted_endpoint entity.asset.attribute.labels.key: "trusted_endpoint", entity.asset.attribute.labels.value: 直接從原始記錄對應,並轉換為小寫。
類型 entity.asset.type 如果原始記錄的「類型」包含「mobile」(不區分大小寫),請設為「MOBILE」,否則設為「LAPTOP」。
user_id entity.asset.asset_id 如果沒有「hardware_uuid」,則會做為「asset_id」使用。
users.email entity.user.email_addresses 如果這是「users」陣列中的第一位使用者,且包含「@」,則會做為「email_addresses」使用。
users.username entity.user.userid 擷取「@」前的使用者名稱,並在「users」陣列中做為第一個使用者時,當做「userid」使用。
entity.metadata.vendor_name 「Duo」
entity.metadata.product_name 「Duo 實體內容資料」
entity.metadata.entity_type 資產
entity.relations.entity_type 使用者
entity.relations.relationship OWNS

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。