收集 Duo 管理員記錄
本文說明如何使用 Amazon S3,將 Duo 管理員記錄擷取至 Google Security Operations。剖析器會從記錄 (JSON 格式) 擷取欄位,並將這些欄位對應至統合資料模型 (UDM)。系統會以不同方式處理各種 Duo action
類型 (登入、使用者管理、群組管理),並根據動作和可用資料 (包括使用者詳細資料、驗證因素和安全性結果) 填入相關 UDM 欄位。此外,這項服務還會執行資料轉換作業,例如合併 IP 位址、轉換時間戳記及處理錯誤。
事前準備
- Google SecOps 執行個體
- Duo 租戶的特殊權限存取權 (Admin API 應用程式)
- AWS 的特殊存取權 (S3、IAM、Lambda、EventBridge)
設定 Duo Admin API 應用程式
- 登入 Duo 管理面板。
- 依序前往「應用程式」>「應用程式目錄」。
- 新增 Admin API 應用程式。
- 記錄下列值:
- 整合金鑰 (ikey)
- 密鑰 (skey)
- API 主機名稱 (例如
api-XXXXXXXX.duosecurity.com
)
- 在「權限」中,啟用「授予讀取記錄」 (讀取管理員記錄)。
- 儲存應用程式。
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照本使用指南建立 Amazon S3 值區:建立值區
- 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如
duo-admin-logs
)。 - 按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」,以供日後使用。
- 按一下 [完成]。
- 選取 [權限] 分頁標籤。
- 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)。
- 選取「新增權限」。
- 選取「直接附加政策」
- 搜尋並選取 AmazonS3FullAccess 政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 S3 上傳的身分與存取權管理政策和角色
- 依序前往 AWS 管理控制台 > IAM > 政策 > 建立政策 > JSON 分頁。
輸入下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoAdminObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-admin-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-admin-logs/duo/admin/state.json" } ] }
- 如果您輸入了其他值區名稱,請替換
duo-admin-logs
:
- 如果您輸入了其他值區名稱,請替換
依序點選「Next」>「Create policy」。
依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。
附加新建立的政策。
為角色命名
WriteDuoAdminToS3Role
,然後按一下「建立角色」。
建立 Lambda 函式
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」。
- 按一下「從頭開始撰寫」。
請提供下列設定詳細資料:
設定 值 名稱 duo_admin_to_s3
執行階段 Python 3.13 架構 x86_64 執行角色 WriteDuoAdminToS3Role
建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (
duo_admin_to_s3.py
):#!/usr/bin/env python3 # Lambda: Pull Duo Admin API v1 Administrator Logs to S3 (raw JSON pages) import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError from datetime import datetime import boto3 DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/admin/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/admin/state.json") s3 = boto3.client("s3") def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return {"Date": now, "Authorization": f"Basic {auth}"} def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) hdrs = _sign(method, host, path, params) req.add_header("Accept", "application/json") for k, v in hdrs.items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # 429 or 5xx → exponential backoff if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise except URLError: if attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise def _read_state() -> int | None: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return int(json.loads(obj["Body"].read()).get("mintime")) except Exception: return None def _write_state(mintime: int): body = json.dumps({"mintime": mintime}).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _epoch_from_item(item: dict) -> int | None: # Prefer numeric 'timestamp' (seconds); fallback to ISO8601 'ts' ts_num = item.get("timestamp") if isinstance(ts_num, (int, float)): return int(ts_num) ts_iso = item.get("ts") if isinstance(ts_iso, str): try: # Accept "...Z" or with offset return int(datetime.fromisoformat(ts_iso.replace("Z", "+00:00")).timestamp()) except Exception: return None return None def _write_page(payload: dict, when: int, page: int) -> str: key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-admin-{page:05d}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def fetch_and_store(): now = int(time.time()) # Start from last checkpoint or now-3600 on first run mintime = _read_state() or (now - 3600) page = 0 total = 0 next_mintime = mintime max_seen_ts = mintime while True: data = _http("GET", "/admin/v1/logs/administrator", {"mintime": mintime}) _write_page(data, now, page) page += 1 # Extract items resp = data.get("response") items = resp if isinstance(resp, list) else (resp.get("items") if isinstance(resp, dict) else []) items = items or [] if not items: break total += len(items) # Track the newest timestamp in this batch for it in items: ts = _epoch_from_item(it) if ts and ts > max_seen_ts: max_seen_ts = ts # Duo returns only the 1000 earliest events; page by advancing mintime if len(items) >= 1000 and max_seen_ts >= mintime: mintime = max_seen_ts next_mintime = max_seen_ts continue else: break # Save checkpoint: newest seen ts, or "now" if nothing new if max_seen_ts > next_mintime: _write_state(max_seen_ts) next_state = max_seen_ts else: _write_state(now) next_state = now return {"ok": True, "pages": page, "events": total, "next_mintime": next_state} def lambda_handler(event=None, context=None): return fetch_and_store() if __name__ == "__main__": print(lambda_handler())
依序前往「Configuration」>「Environment variables」>「Edit」>「Add new environment variable」。
輸入下列環境變數,並將 換成您的值。
鍵 範例 S3_BUCKET
duo-admin-logs
S3_PREFIX
duo/admin/
STATE_KEY
duo/admin/state.json
DUO_IKEY
DIXYZ...
DUO_SKEY
****************
DUO_API_HOSTNAME
api-XXXXXXXX.duosecurity.com
建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。
選取「設定」分頁標籤。
在「一般設定」面板中,按一下「編輯」。
將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)。
建立 EventBridge 排程
- 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」。
- 提供下列設定詳細資料:
- 週期性時間表:費率 (
1 hour
)。 - 目標:您的 Lambda 函式。
- 名稱:
duo-admin-1h
。
- 週期性時間表:費率 (
- 按一下「建立時間表」。
選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰
- 在 AWS 控制台中,依序前往「IAM」>「Users」,然後按一下「Add users」。
- 提供下列設定詳細資料:
- 使用者:輸入不重複的名稱 (例如
secops-reader
) - 存取權類型:選取「存取金鑰 - 程式輔助存取」
- 按一下「建立使用者」。
- 使用者:輸入不重複的名稱 (例如
- 附加最低讀取權限政策 (自訂):依序選取「使用者」
secops-reader
「權限」「新增權限」「直接附加政策」「建立政策」 在 JSON 編輯器中輸入下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
將名稱設為
secops-reader-policy
。依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」。
依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」。
下載 CSV (這些值會輸入至動態饋給)。
在 Google SecOps 中設定動態饋給,擷取 Duo 管理員記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Duo Administrator Logs
)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「Duo 管理員記錄」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:
s3://duo-admin-logs/duo/admin/
- 來源刪除選項:根據偏好選取刪除選項。
- 檔案存在時間上限:預設為 180 天。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:資產命名空間。
- 擷取標籤:套用至這個動態饋給事件的標籤。
- S3 URI:
- 點選「下一步」。
- 在「Finalize」畫面上檢查新的動態饋給設定,然後按一下「Submit」。
UDM 對應表
記錄欄位 | UDM 對應 | 邏輯 |
---|---|---|
action |
metadata.product_event_type |
原始記錄中的 action 欄位值。 |
desc |
metadata.description |
原始記錄 description 物件的 desc 欄位值。 |
description._status |
target.group.attribute.labels.value |
原始記錄中 description 物件的 _status 欄位值,特別是在處理群組相關動作時。這個值會放在「labels」陣列中,並對應「status」的「key」。 |
description.desc |
metadata.description |
原始記錄 description 物件的 desc 欄位值。 |
description.email |
target.user.email_addresses |
原始記錄 description 物件的 email 欄位值。 |
description.error |
security_result.summary |
原始記錄 description 物件的 error 欄位值。 |
description.factor |
extensions.auth.auth_details |
原始記錄 description 物件的 factor 欄位值。 |
description.groups.0._status |
target.group.attribute.labels.value |
原始記錄 description 物件中 groups 陣列第一個元素的 _status 欄位值。這個值會放在「labels」陣列中,並對應「status」的「key」。 |
description.groups.0.name |
target.group.group_display_name |
原始記錄 description 物件中 groups 陣列第一個元素的 name 欄位值。 |
description.ip_address |
principal.ip |
原始記錄 description 物件的 ip_address 欄位值。 |
description.name |
target.group.group_display_name |
原始記錄 description 物件的 name 欄位值。 |
description.realname |
target.user.user_display_name |
原始記錄 description 物件的 realname 欄位值。 |
description.status |
target.user.attribute.labels.value |
原始記錄 description 物件的 status 欄位值。這個值會放在「labels」陣列中,並對應「status」的「key」。 |
description.uname |
target.user.email_addresses 或target.user.userid |
原始記錄 description 物件的 uname 欄位值。如果符合電子郵件地址格式,則會對應至 email_addresses ;否則會對應至 userid 。 |
host |
principal.hostname |
原始記錄中的 host 欄位值。 |
isotimestamp |
metadata.event_timestamp.seconds |
原始記錄中的 isotimestamp 欄位值,已轉換為 Epoch 秒數。 |
object |
target.group.group_display_name |
原始記錄中的 object 欄位值。 |
timestamp |
metadata.event_timestamp.seconds |
原始記錄中的 timestamp 欄位值。 |
username |
target.user.userid 或principal.user.userid |
如果 action 欄位包含「login」,則值會對應至 target.user.userid 。否則會對應至 principal.user.userid 。如果 action 欄位包含「login」,請設為「USERNAME_PASSWORD」。由剖析器根據 action 欄位判斷。可能的值:USER_LOGIN 、GROUP_CREATION 、USER_UNCATEGORIZED 、GROUP_DELETION 、USER_CREATION 、GROUP_MODIFICATION 、GENERIC_EVENT 。一律設為「DUO_ADMIN」。一律設為「MULTI-FACTOR_AUTHENTICATION」。一律設為「DUO_SECURITY」。如果 eventtype 欄位包含「admin」,請設為「ADMINISTRATOR」。由剖析器根據 action 欄位判斷。如果 action 欄位包含「error」,請設為「BLOCK」;否則請設為「ALLOW」。填入 target.group.attribute.labels 時,請一律設為「狀態」。填入 target.user.attribute.labels 時,請一律設為「狀態」。 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。