收集 Duo 電話記錄
本文說明如何使用 Amazon S3,將 Duo 電話記錄擷取至 Google Security Operations。剖析器會從記錄檔中擷取欄位,然後轉換並對應至統一資料模型 (UDM)。這項服務可處理各種 Duo 記錄格式、轉換時間戳記、擷取使用者資訊、網路詳細資料和安全性結果,最後將輸出內容轉換為標準化 UDM 格式。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體。
- 具有「擁有者」角色的 Duo 管理面板進階存取權。
- AWS 的特殊權限 (S3、Identity and Access Management (IAM)、Lambda、EventBridge)。
收集 Duo 必要條件 (API 憑證)
- 以具備「擁有者」角色的管理員身分登入 Duo 管理面板。
- 依序前往「應用程式」>「應用程式目錄」。
- 在目錄中找到「Admin API」項目。
- 按一下「+ 新增」即可建立應用程式。
- 複製下列詳細資料並存放在安全位置:
- 整合金鑰
- 密鑰
- API 主機名稱 (例如
api-yyyyyyyy.duosecurity.com
)
- 在「權限」部分,取消選取「授予讀取記錄」以外的所有權限選項。
- 按一下 [儲存變更]。
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南建立 Amazon S3 bucket:建立 bucket
- 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如
duo-telephony-logs
)。 - 請按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 .CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
- 按一下 [完成]。
- 選取 [權限] 分頁標籤。
- 在「權限政策」部分中,按一下「新增權限」。
- 選取「新增權限」。
- 選取「直接附加政策」。
- 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
- 選取政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 S3 上傳的身分與存取權管理政策和角色
- 在 AWS 控制台中,依序前往「IAM」>「Policies」(政策)。
- 按一下「建立政策」>「JSON」分頁。
- 複製並貼上下列政策。
政策 JSON (如果您輸入的 bucket 名稱不同,請替換
duo-telephony-logs
):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-telephony-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-telephony-logs/duo-telephony/state.json" } ] }
依序點選「下一步」>「建立政策」。
依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。
附加新建立的政策。
為角色命名
duo-telephony-lambda-role
,然後按一下「建立角色」。
建立 Lambda 函式
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」。
- 按一下「從頭開始撰寫」。
請提供下列設定詳細資料:
設定 值 名稱 duo-telephony-logs-collector
執行階段 Python 3.13 架構 x86_64 執行角色 duo-telephony-lambda-role
建立函式後,開啟「程式碼」分頁,刪除存根並貼上以下程式碼 (
duo-telephony-logs-collector.py
)。import json import boto3 import os import hmac import hashlib import base64 import urllib.parse import urllib.request import email.utils from datetime import datetime, timedelta, timezone from typing import Dict, Any, List, Optional from botocore.exceptions import ClientError s3 = boto3.client('s3') def lambda_handler(event, context): """ Lambda function to fetch Duo telephony logs and store them in S3. """ try: # Get configuration from environment variables bucket_name = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'].rstrip('/') state_key = os.environ['STATE_KEY'] integration_key = os.environ['DUO_IKEY'] secret_key = os.environ['DUO_SKEY'] api_hostname = os.environ['DUO_API_HOST'] # Load state state = load_state(bucket_name, state_key) # Calculate time range now = datetime.now(timezone.utc) if state.get('last_offset'): # Continue from last offset next_offset = state['last_offset'] logs = [] has_more = True else: # Start from last timestamp or 24 hours ago mintime = state.get('last_timestamp_ms', int((now - timedelta(hours=24)).timestamp() * 1000)) # Apply 2-minute delay as recommended by Duo maxtime = int((now - timedelta(minutes=2)).timestamp() * 1000) next_offset = None logs = [] has_more = True # Fetch logs with pagination total_fetched = 0 max_iterations = int(os.environ.get('MAX_ITERATIONS', '10')) while has_more and total_fetched < max_iterations: if next_offset: # Use offset for pagination params = { 'limit': '1000', 'next_offset': next_offset } else: # Initial request with time range params = { 'mintime': str(mintime), 'maxtime': str(maxtime), 'limit': '1000', 'sort': 'ts:asc' } # Make API request with retry logic response = duo_api_call_with_retry( 'GET', api_hostname, '/admin/v2/logs/telephony', params, integration_key, secret_key ) if 'items' in response: logs.extend(response['items']) total_fetched += 1 # Check for more data if 'metadata' in response and 'next_offset' in response['metadata']: next_offset = response['metadata']['next_offset'] state['last_offset'] = next_offset else: has_more = False state['last_offset'] = None # Update timestamp for next run if logs: # Get the latest timestamp from logs latest_ts = max([log.get('ts', '') for log in logs]) if latest_ts: # Convert ISO timestamp to milliseconds dt = datetime.fromisoformat(latest_ts.replace('Z', '+00:00')) state['last_timestamp_ms'] = int(dt.timestamp() * 1000) + 1 else: has_more = False # Save logs to S3 if any were fetched if logs: timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') key = f"{s3_prefix}/telephony_{timestamp}.json" # Format logs as newline-delimited JSON log_data = '\n'.join(json.dumps(log) for log in logs) s3.put_object( Bucket=bucket_name, Key=key, Body=log_data.encode('utf-8'), ContentType='application/x-ndjson' ) print(f"Saved {len(logs)} telephony logs to s3://{bucket_name}/{key}") else: print("No new telephony logs found") # Save state save_state(bucket_name, state_key, state) return { 'statusCode': 200, 'body': json.dumps({ 'message': f'Successfully processed {len(logs)} telephony logs', 'logs_count': len(logs) }) } except Exception as e: print(f"Error: {str(e)}") return { 'statusCode': 500, 'body': json.dumps({'error': str(e)}) } def duo_api_call_with_retry(method: str, host: str, path: str, params: Dict[str, str], ikey: str, skey: str, max_retries: int = 3) -> Dict[str, Any]: """ Make an authenticated API call to Duo Admin API with retry logic. """ for attempt in range(max_retries): try: return duo_api_call(method, host, path, params, ikey, skey) except Exception as e: if '429' in str(e) or '5' in str(e)[:1]: # Rate limit or server error if attempt < max_retries - 1: wait_time = (2 ** attempt) * 2 # Exponential backoff print(f"Retrying after {wait_time} seconds...") import time time.sleep(wait_time) continue raise def duo_api_call(method: str, host: str, path: str, params: Dict[str, str], ikey: str, skey: str) -> Dict[str, Any]: """ Make an authenticated API call to Duo Admin API. """ # Create canonical string for signing using RFC 2822 date format now = email.utils.formatdate() canon = [now, method.upper(), host.lower(), path] # Add parameters args = [] for key in sorted(params.keys()): val = params[key] args.append(f"{urllib.parse.quote(key, '~')}={urllib.parse.quote(val, '~')}") canon.append('&'.join(args)) canon_str = '\n'.join(canon) # Sign the request sig = hmac.new( skey.encode('utf-8'), canon_str.encode('utf-8'), hashlib.sha1 ).hexdigest() # Create authorization header auth = base64.b64encode(f"{ikey}:{sig}".encode('utf-8')).decode('utf-8') # Build URL url = f"https://{host}{path}" if params: url += '?' + '&'.join(args) # Make request req = urllib.request.Request(url) req.add_header('Authorization', f'Basic {auth}') req.add_header('Date', now) req.add_header('Host', host) req.add_header('User-Agent', 'duo-telephony-s3-ingestor/1.0') try: with urllib.request.urlopen(req, timeout=30) as response: data = json.loads(response.read().decode('utf-8')) if data.get('stat') == 'OK': return data.get('response', {}) else: raise Exception(f"API error: {data.get('message', 'Unknown error')}") except urllib.error.HTTPError as e: error_body = e.read().decode('utf-8') raise Exception(f"HTTP error {e.code}: {error_body}") def load_state(bucket: str, key: str) -> Dict[str, Any]: """Load state from S3.""" try: response = s3.get_object(Bucket=bucket, Key=key) return json.loads(response['Body'].read().decode('utf-8')) except ClientError as e: if e.response.get('Error', {}).get('Code') in ('NoSuchKey', '404'): return {} print(f"Error loading state: {e}") return {} except Exception as e: print(f"Error loading state: {e}") return {} def save_state(bucket: str, key: str, state: Dict[str, Any]): """Save state to S3.""" try: s3.put_object( Bucket=bucket, Key=key, Body=json.dumps(state).encode('utf-8'), ContentType='application/json' ) except Exception as e: print(f"Error saving state: {e}")
依序前往「設定」>「環境變數」。
依序點選「編輯」> 新增環境變數。
輸入下表提供的環境變數,並將範例值換成您的值。
環境變數
鍵 範例值 S3_BUCKET
duo-telephony-logs
S3_PREFIX
duo-telephony/
STATE_KEY
duo-telephony/state.json
DUO_IKEY
<your-integration-key>
DUO_SKEY
<your-secret-key>
DUO_API_HOST
api-yyyyyyyy.duosecurity.com
MAX_ITERATIONS
10
建立函式後,請留在該函式的頁面 (或依序開啟「Lambda」>「Functions」>「duo-telephony-logs-collector」)。
選取「設定」分頁標籤。
在「一般設定」面板中,按一下「編輯」。
將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)。
建立 EventBridge 排程
- 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」。
- 提供下列設定詳細資料:
- 週期性時間表:費率 (
1 hour
)。 - 目標:您的 Lambda 函式
duo-telephony-logs-collector
。 - 名稱:
duo-telephony-logs-1h
。
- 週期性時間表:費率 (
- 按一下「建立時間表」。
(選用) 為 Google SecOps 建立唯讀 IAM 使用者和金鑰
- 前往 AWS 控制台 > IAM > 使用者。
- 點選 [Add users] (新增使用者)。
- 提供下列設定詳細資料:
- 使用者:輸入
secops-reader
。 - 存取類型:選取「存取金鑰 - 程式輔助存取」。
- 使用者:輸入
- 按一下「建立使用者」。
- 附加最低讀取權限政策 (自訂):依序選取「Users」(使用者) >「secops-reader」>「Permissions」(權限) >「Add permissions」(新增權限) >「Attach policies directly」(直接附加政策) >「Create policy」(建立政策)。
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::duo-telephony-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::duo-telephony-logs" } ] }
Name =
secops-reader-policy
。依序點選「建立政策」> 搜尋/選取 >「下一步」>「新增權限」。
為
secops-reader
建立存取金鑰:依序點選「安全憑證」>「存取金鑰」。按一下「建立存取金鑰」。
下載
.CSV
。(您會將這些值貼到動態饋給中)。
在 Google SecOps 中設定動態饋給,以便擷取 Duo 電話記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Duo Telephony logs
)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「Duo 電話記錄」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:
s3://duo-telephony-logs/duo-telephony/
- 來源刪除選項:根據偏好設定選取刪除選項。
- 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:資產命名空間。
- 擷取標籤:套用至這個動態饋給事件的標籤。
- S3 URI:
- 點選「下一步」。
- 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」。
UDM 對應表
記錄欄位 | UDM 對應 | 邏輯 |
---|---|---|
context |
metadata.product_event_type |
直接從原始記錄中的 context 欄位對應。 |
credits |
security_result.detection_fields.value |
直接從原始記錄的 credits 欄位對應,以巢狀形式嵌入具有對應鍵 credits 的 detection_fields 物件底下。 |
eventtype |
security_result.detection_fields.value |
直接從原始記錄的 eventtype 欄位對應,以巢狀形式嵌入具有對應鍵 eventtype 的 detection_fields 物件底下。 |
host |
principal.hostname |
如果不是 IP 位址,則直接從原始記錄中的 host 欄位對應。在剖析器中設為「ALLOW」的靜態值。在剖析器中設為「MECHANISM_UNSPECIFIED」的靜態值。從原始記錄的 timestamp 欄位剖析而來,代表自 Epoch 起算的秒數。如果原始記錄中同時存在 context 和 host 欄位,請設為「USER_UNCATEGORIZED」。如果只有 host ,請設為「STATUS_UPDATE」。否則請設為「GENERIC_EVENT」。直接取自原始記錄的 log_type 欄位。在剖析器中設為「Telephony」的靜態值。在剖析器中設為「Duo」的靜態值。 |
phone |
principal.user.phone_numbers |
直接從原始記錄中的 phone 欄位對應。 |
phone |
principal.user.userid |
直接從原始記錄中的 phone 欄位對應。在剖析器中設為「INFORMATIONAL」的靜態值。在剖析器中設為「Duo Telephony」的靜態值。 |
timestamp |
metadata.event_timestamp |
從原始記錄的 timestamp 欄位剖析而來,代表自 Epoch 起算的秒數。 |
type |
security_result.summary |
直接從原始記錄中的 type 欄位對應。 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。