收集 Duo 驗證記錄

支援的國家/地區:

本文說明如何使用 Amazon S3,將 Duo 驗證記錄擷取至 Google Security Operations。剖析器會從 JSON 格式的訊息中擷取記錄。這項服務會將原始記錄資料轉換為統一資料模型 (UDM),對應使用者、裝置、應用程式、位置和驗證詳細資料等欄位,同時處理各種驗證因素和結果,將安全事件分類。剖析器也會執行資料清理、型別轉換和錯誤處理,確保資料品質和一致性。

事前準備

  • Google SecOps 執行個體
  • Duo 租戶的特殊權限存取權 (Admin API 應用程式)
  • AWS 的特殊存取權 (S3、IAM、Lambda、EventBridge)

設定 Duo Admin API 應用程式

  1. 登入 Duo 管理面板
  2. 依序前往「Applications」>「Protect an Application」
  3. 新增 Admin API 應用程式。
  4. 複製並將下列值儲存至安全位置:
    • 整合金鑰 (ikey)
    • 密鑰 (skey)
    • API 主機名稱 (例如 api-XXXXXXXX.duosecurity.com)
  5. 在「Permissions」(權限) 中,啟用「Grant read log」(授予讀取記錄權限) (讀取驗證記錄)。
  6. 儲存應用程式。

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照本使用指南建立 Amazon S3 值區建立值區
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 duo-auth-logs)。
  3. 按照這份使用者指南建立使用者:建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」,以供日後使用。
  12. 按一下 [完成]
  13. 選取 [權限] 分頁標籤。
  14. 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋並選取 AmazonS3FullAccess 政策。
  18. 點選「下一步」
  19. 按一下「Add permissions」。

設定 S3 上傳的身分與存取權管理政策和角色

  1. 依序前往 AWS 管理控制台 > IAM > 政策 > 建立政策 > JSON 分頁
  2. 輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDuoAuthObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-auth-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::duo-auth-logs/duo/auth/state.json"
        }
      ]
    }
    
    
    • 如果您輸入的值區名稱不同,請替換 duo-auth-logs
  3. 依序點選「Next」>「Create policy」

  4. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」

  5. 附加新建立的政策。

  6. 為角色命名 WriteDuoAuthToS3Role,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:

    設定
    名稱 duo_auth_to_s3
    執行階段 Python 3.13
    架構 x86_64
    執行角色 WriteDuoAuthToS3Role
  4. 建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (duo_auth_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull Duo Admin API v2 Authentication Logs to S3 (raw JSON pages)
    # Notes:
    # - Duo v2 requires mintime/maxtime in *milliseconds* (13-digit epoch).
    # - Pagination via metadata.next_offset ("<millis>,<txid>").
    # - We save state (mintime_ms) in ms to resume next run without gaps.
    
    import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    DUO_IKEY = os.environ["DUO_IKEY"]
    DUO_SKEY = os.environ["DUO_SKEY"]
    DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip()
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "duo/auth/").strip("/")
    STATE_KEY = os.environ.get("STATE_KEY", "duo/auth/state.json")
    LIMIT = min(int(os.environ.get("LIMIT", "500")), 1000)  # default 100, max 1000
    
    s3 = boto3.client("s3")
    
    def _canon_params(params: dict) -> str:
        parts = []
        for k in sorted(params.keys()):
            v = params[k]
            if v is None:
                continue
            parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}")
        return "&".join(parts)
    
    def _sign(method: str, host: str, path: str, params: dict) -> dict:
        now = email.utils.formatdate()
        canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)])
        sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest()
        auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode()
        return {"Date": now, "Authorization": f"Basic {auth}"}
    
    def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict:
        host = DUO_API_HOSTNAME
        assert host.startswith("api-") and host.endswith(".duosecurity.com"), \
            "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com"
        qs = _canon_params(params)
        url = f"https://{host}{path}" + (f"?{qs}" if qs else "")
    
        attempt, backoff = 0, 1.0
        while True:
            req = Request(url, method=method.upper())
            req.add_header("Accept", "application/json")
            for k, v in _sign(method, host, path, params).items():
                req.add_header(k, v)
            try:
                with urlopen(req, timeout=timeout) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def _read_state_ms() -> int | None:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            val = json.loads(obj["Body"].read()).get("mintime")
            if val is None:
                return None
            # Backward safety: if seconds were stored, convert to ms
            return int(val) * 1000 if len(str(int(val))) <= 10 else int(val)
        except Exception:
            return None
    
    def _write_state_ms(mintime_ms: int):
        body = json.dumps({"mintime": int(mintime_ms)}).encode("utf-8")
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def _write_page(payload: dict, when_epoch_s: int, page: int) -> str:
        key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when_epoch_s))}/duo-auth-{page:05d}.json"
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def fetch_and_store():
        now_s = int(time.time())
        # Duo recommends a ~2-minute delay buffer; use maxtime = now - 120 seconds (in ms)
        maxtime_ms = (now_s - 120) * 1000
        mintime_ms = _read_state_ms() or (maxtime_ms - 3600 * 1000)  # 1 hour on first run
    
        page = 0
        total = 0
        next_offset = None
    
        while True:
            params = {"mintime": mintime_ms, "maxtime": maxtime_ms, "limit": LIMIT}
            if next_offset:
                params["next_offset"] = next_offset
    
            data = _http("GET", "/admin/v2/logs/authentication", params)
            _write_page(data, maxtime_ms // 1000, page)
            page += 1
    
            resp = data.get("response")
            items = resp if isinstance(resp, list) else []
            total += len(items)
    
            meta = data.get("metadata") or {}
            next_offset = meta.get("next_offset")
            if not next_offset:
                break
    
        # Advance window to maxtime_ms for next run
        _write_state_ms(maxtime_ms)
        return {"ok": True, "pages": page, "events": total, "next_mintime_ms": maxtime_ms}
    
    def lambda_handler(event=None, context=None):
        return fetch_and_store()
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. 依序前往「Configuration」>「Environment variables」>「Edit」>「Add new environment variable」

  6. 輸入下列環境變數,並將 換成您的值:

    範例
    S3_BUCKET duo-auth-logs
    S3_PREFIX duo/auth/
    STATE_KEY duo/auth/state.json
    DUO_IKEY DIXYZ...
    DUO_SKEY ****************
    DUO_API_HOSTNAME api-XXXXXXXX.duosecurity.com
    LIMIT 500
  7. 建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your‑function」)。

  8. 選取「設定」分頁標籤。

  9. 在「一般設定」面板中,按一下「編輯」

  10. 將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)

建立 EventBridge 排程

  1. 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」
  2. 提供下列設定詳細資料:
    • 週期性時間表費率 (1 hour)。
    • 目標:您的 Lambda 函式。
    • 名稱duo-auth-1h
  3. 按一下「建立時間表」

選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰

  1. 在 AWS 控制台中,依序前往「IAM」>「Users」,然後按一下「Add users」
  2. 提供下列設定詳細資料:
    • 使用者:輸入不重複的名稱 (例如 secops-reader)
    • 存取權類型:選取「存取金鑰 - 程式輔助存取」
    • 按一下「建立使用者」
  3. 附加最低讀取權限政策 (自訂):依序選取「使用者」secops-reader「權限」「新增權限」「直接附加政策」「建立政策」
  4. 在 JSON 編輯器中輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. 將名稱設為 secops-reader-policy

  6. 依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」

  7. 依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」

  8. 下載 CSV (這些值會輸入至動態饋給)。

在 Google SecOps 中設定動態饋給,擷取 Duo 驗證記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「+ 新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Duo Authentication Logs)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「Duo Auth」做為「記錄類型」
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URIs3://duo-auth-logs/duo/auth/
    • 來源刪除選項:根據偏好選取刪除選項。
    • 檔案存在時間上限:預設為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「Finalize」畫面上檢查新的動態饋給設定,然後按一下「Submit」

UDM 對應表

記錄欄位 UDM 對應 邏輯
access_device.browser target.resource.attribute.labels.value 如果存在 access_device.browser,其值會對應至 UDM。
access_device.hostname principal.hostname 如果 access_device.hostname 存在且不為空白,系統會將其值對應至 UDM。如果為空值且 event_type 為 USER_CREATION,則 event_type 會變更為 USER_UNCATEGORIZED。如果 access_device.hostname 為空值,且 hostname 欄位存在,系統會使用 hostname 的值。
access_device.ip principal.ip 如果 access_device.ip 存在且為有效的 IPv4 位址,系統會將其值對應至 UDM。如果不是有效的 IPv4 位址,系統會將其做為字串值新增至 additional.fields,並使用鍵 access_device.ip
access_device.location.city principal.location.city 如有,則該值會對應至 UDM。
access_device.location.country principal.location.country_or_region 如有,則該值會對應至 UDM。
access_device.location.state principal.location.state 如有,則該值會對應至 UDM。
access_device.os principal.platform 如果存在,該值會轉換為對應的 UDM 值 (MAC、WINDOWS、LINUX)。
access_device.os_version principal.platform_version 如有,則該值會對應至 UDM。
application.key target.resource.id 如有,則該值會對應至 UDM。
application.name target.application 如有,則該值會對應至 UDM。
auth_device.ip target.ip 如果存在且不是「無」,則值會對應至 UDM。
auth_device.location.city target.location.city 如有,則該值會對應至 UDM。
auth_device.location.country target.location.country_or_region 如有,則該值會對應至 UDM。
auth_device.location.state target.location.state 如有,則該值會對應至 UDM。
auth_device.name target.hostnametarget.user.phone_numbers 如果 auth_device.name 存在且是電話號碼 (正規化後),系統會將其新增至 target.user.phone_numbers。否則會對應至 target.hostname
client_ip target.ip 如果存在且不是「無」,則值會對應至 UDM。
client_section target.resource.attribute.labels.value 如果存在 client_section,系統會將其值對應至 UDM,並使用 client_section 做為鍵。
dn target.user.userid 如果存在 dn,但沒有 user.nameusername,系統會使用 grok 從 dn 欄位擷取 userid,並對應至 UDM。event_type 設為 USER_LOGIN。
event_type metadata.product_event_type」和「metadata.event_type 該值會對應至 metadata.product_event_type。這項屬性也用於判斷 metadata.event_type:「驗證」會變成 USER_LOGIN,「註冊」會變成 USER_CREATION,如果屬性空白或不是上述任一值,則會變成 GENERIC_EVENT。
factor extensions.auth.mechanism」和「extensions.auth.auth_details 這個值會轉換為對應的 UDM auth.mechanism 值 (HARDWARE_KEY、REMOTE_INTERACTIVE、LOCAL、OTP)。原始值也會對應至 extensions.auth.auth_details
hostname principal.hostname 如果存在且 access_device.hostname 為空,則值會對應至 UDM。
log_format target.resource.attribute.labels.value 如果存在 log_format,系統會將其值對應至 UDM,並使用 log_format 做為鍵。
log_level.__class_uuid__ target.resource.attribute.labels.value 如果存在 log_level.__class_uuid__,其值會對應至 UDM,並以 __class_uuid__ 做為鍵。
log_level.name target.resource.attribute.labels.value」和「security_result.severity 如果存在 log_level.name,其值會對應至 UDM,並以 name 做為鍵。如果值為「info」,security_result.severity 會設為 INFORMATIONAL。
log_logger.unpersistable target.resource.attribute.labels.value 如果存在 log_logger.unpersistable,其值會對應至 UDM,並以 unpersistable 做為鍵。
log_namespace target.resource.attribute.labels.value 如果存在 log_namespace,系統會將其值對應至 UDM,並使用 log_namespace 做為鍵。
log_source target.resource.attribute.labels.value 如果存在 log_source,系統會將其值對應至 UDM,並使用 log_source 做為鍵。
msg security_result.summary 如果存在且 reason 為空,則值會對應至 UDM。
reason security_result.summary 如有,則該值會對應至 UDM。
result security_result.action_details」和「security_result.action 如有,則值會對應至 security_result.action_details。「success」或「SUCCESS」會轉譯為 security_result.action ALLOW,否則為 BLOCK。
server_section target.resource.attribute.labels.value 如果存在 server_section,系統會將其值對應至 UDM,並使用 server_section 做為鍵。
server_section_ikey target.resource.attribute.labels.value 如果存在 server_section_ikey,系統會將其值對應至 UDM,並使用 server_section_ikey 做為鍵。
status security_result.action_details」和「security_result.action 如有的話,該值會對應至 security_result.action_details。「允許」會轉譯為 security_result.action ALLOW,「拒絕」則會轉譯為 BLOCK。
timestamp metadata.event_timestamp」和「event.timestamp 這個值會轉換為時間戳記,並對應至 metadata.event_timestampevent.timestamp
txid metadata.product_log_id」和「network.session_id 這個值會同時對應至 metadata.product_log_idnetwork.session_id
user.groups target.user.group_identifiers 陣列中的所有值都會加到 target.user.group_identifiers
user.key target.user.product_object_id 如有,則該值會對應至 UDM。
user.name target.user.userid 如有,則該值會對應至 UDM。
username target.user.userid 如果存在且不是 user.name,則值會對應至 UDM。event_type 設為 USER_LOGIN。
(剖析器邏輯) metadata.vendor_name 一律設為「DUO_SECURITY」。
(剖析器邏輯) metadata.product_name 一律設為「MULTI-FACTOR_AUTHENTICATION」。
(剖析器邏輯) metadata.log_type 取自原始記錄檔的頂層 log_type 欄位。
(剖析器邏輯) extensions.auth.type 一律設為「SSO」。

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。