收集 BeyondTrust Endpoint Privilege Management (EPM) 記錄
本文說明如何使用兩種不同方法,將 BeyondTrust Endpoint Privilege Management (EPM) 記錄擷取至 Google Security Operations:以 EC2 為基礎的收集作業,以及以 Amazon S3 為基礎的 AWS Lambda 收集作業。剖析器會將 BeyondTrust Endpoint 的原始 JSON 記錄資料轉換為符合 Chronicle UDM 的結構化格式。這個函式會先初始化各種欄位的預設值,然後剖析 JSON 酬載,接著將原始記錄中的特定欄位對應至 event.idm.read_only_udm
物件中的對應 UDM 欄位。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體
- BeyondTrust Endpoint Privilege Management 租戶或 API 的特殊權限
- AWS 的具備權限存取權 (S3、IAM、Lambda/EC2、EventBridge)
選擇整合方式
您可以選擇下列其中一種整合方式:
- 選項 1:以 EC2 為基礎的收集作業:使用 EC2 執行個體搭配排定的指令碼,收集記錄
- 選項 2:以 AWS Lambda 為基礎的收集作業:使用無伺服器 Lambda 函式和 EventBridge 排程
選項 1:以 EC2 為基礎的集合
設定 AWS IAM,以便擷取 Google SecOps 資料
- 請按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
- 按一下 [完成]。
- 選取 [權限] 分頁標籤。
- 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)。
- 選取「新增權限」。
- 選取「直接附加政策」。
- 搜尋並選取 AmazonS3FullAccess 政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 BeyondTrust EPM 的 API 存取權
- 以管理員身分登入 BeyondTrust Privilege Management 網頁控制台。
- 依序前往「設定」>「設定」>「API 設定」。
- 按一下「建立 API 帳戶」。
- 提供下列設定詳細資料:
- 「Name」(名稱):輸入
Google SecOps Collector
。 - API 存取權:視需要啟用「稽核 (讀取)」和其他範圍。
- 「Name」(名稱):輸入
- 複製並儲存「用戶端 ID」和「用戶端密鑰」。
- 複製 API 基礎網址,通常是
https://<your-tenant>-services.pm.beyondtrustcloud.com
(您會將此網址做為 BPT_API_URL 使用)。
建立 AWS S3 儲存空間
- 登入 AWS 管理主控台。
- 前往 AWS 控制台 > 服務 > S3 > 建立 bucket。
- 提供下列設定詳細資料:
- Bucket name:
my-beyondtrust-logs
。 - 「Region」(地區):[你的選擇] >「Create」(建立)。
- Bucket name:
為 EC2 建立 IAM 角色
- 登入 AWS 管理主控台。
- 依序前往「AWS Console」>「Services」>「IAM」>「Roles」>「Create role」。
- 提供下列設定詳細資料:
- 信任的實體:AWS 服務 > EC2 > 下一步。
- 附加權限:AmazonS3FullAccess (或值區的範圍政策) >「下一步」。
- 角色名稱:
EC2-S3-BPT-Writer
> 建立角色。
啟動及設定 EC2 Collector VM
- 登入 AWS 管理主控台。
- 前往「服務」。
- 在搜尋列中輸入「EC2」EC2並選取。
- 在 EC2 資訊主頁中,按一下「Instances」(執行個體)。
- 按一下「啟動執行個體」。
- 提供下列設定詳細資料:
- 「Name」(名稱):輸入
BPT-Log-Collector
。 - AMI:選取「Ubuntu Server 22.04 LTS」。
- 執行個體類型:t3.micro (或更大),然後按一下「下一步」。
- 網路:確認「網路」設定已設為預設虛擬私有雲。
- IAM 角色:從選單中選取 EC2-S3-BPT-Writer IAM 角色。
- 自動指派公開 IP:啟用 (或確認可透過 VPN 連線) > 點選「下一步」。
- 新增儲存空間:保留預設儲存空間設定 (8 GiB),然後點選「下一步」。
- 選取「建立新的安全性群組」。
- 連入規則:按一下「新增規則」。
- 類型:選取「SSH」。
- 通訊埠:22。
- 來源:您的 IP
- 按一下「檢閱並啟動」。
- 選取或建立金鑰組。
- 按一下「下載金鑰配對」。
- 儲存下載的 PEM 檔案。您需要這個檔案,才能使用 SSH 連線至執行個體。
- 「Name」(名稱):輸入
- 使用 SSH 連線至虛擬機器 (VM)。
安裝收集器必備條件
執行下列指令:
chmod 400 ~/Downloads/your-key.pem ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
更新系統並安裝依附元件:
# Update OS sudo apt update && sudo apt upgrade -y # Install Python, Git sudo apt install -y python3 python3-venv python3-pip git # Create & activate virtualenv python3 -m venv ~/bpt-venv source ~/bpt-venv/bin/activate # Install libraries pip install requests boto3
建立目錄和狀態檔案:
sudo mkdir -p /var/lib/bpt-collector sudo touch /var/lib/bpt-collector/last_run.txt sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
初始化 (例如設為 1 小時前):
echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
部署 BeyondTrust EPM 收集器指令碼
建立專案資料夾:
mkdir ~/bpt-collector && cd ~/bpt-collector
匯出必要環境變數 (例如在
~/.bashrc
中):export BPT_API_URL="https://<your-tenant>-services.pm.beyondtrustcloud.com" export BPT_CLIENT_ID="your-client-id" export BPT_CLIENT_SECRET="your-client-secret" export S3_BUCKET="my-beyondtrust-logs" export S3_PREFIX="bpt/" export STATE_FILE="/var/lib/bpt-collector/last_run.txt" export RECORD_SIZE="1000"
建立
collector_bpt.py
並輸入下列程式碼:#!/usr/bin/env python3 import os, sys, json, boto3, requests from datetime import datetime, timezone, timedelta # ── UTILS ────────────────────────────────────────────────────────────── def must_env(var): val = os.getenv(var) if not val: print(f"ERROR: environment variable {var} is required", file=sys.stderr) sys.exit(1) return val def ensure_state_file(path): d = os.path.dirname(path) if not os.path.isdir(d): os.makedirs(d, exist_ok=True) if not os.path.isfile(path): ts = (datetime.now(timezone.utc) - timedelta(hours=1)) .strftime("%Y-%m-%dT%H:%M:%SZ") with open(path, "w") as f: f.write(ts) # ── CONFIG ───────────────────────────────────────────────────────────── BPT_API_URL = must_env("BPT_API_URL") # e.g., https://tenant-services.pm.beyondtrustcloud.com CLIENT_ID = must_env("BPT_CLIENT_ID") CLIENT_SECRET = must_env("BPT_CLIENT_SECRET") S3_BUCKET = must_env("S3_BUCKET") S3_PREFIX = os.getenv("S3_PREFIX", "") # e.g., "bpt/" STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt") RECORD_SIZE = int(os.getenv("RECORD_SIZE", "1000")) # ── END CONFIG ───────────────────────────────────────────────────────── ensure_state_file(STATE_FILE) def read_last_run(): with open(STATE_FILE, "r") as f: ts = f.read().strip() return datetime.fromisoformat(ts.replace("Z", "+00:00")) def write_last_run(dt): with open(STATE_FILE, "w") as f: f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ")) def get_oauth_token(): """ Get OAuth2 token using client credentials flow Scope: urn:management:api (for EPM Management API access) """ resp = requests.post( f"{BPT_API_URL}/oauth/connect/token", headers={"Content-Type": "application/x-www-form-urlencoded"}, data={ "grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "scope": "urn:management:api" } ) resp.raise_for_status() return resp.json()["access_token"] def extract_event_timestamp(evt): """ Extract timestamp from event, prioritizing event.ingested field """ # Primary (documented) path: event.ingested if isinstance(evt, dict) and isinstance(evt.get("event"), dict): ts = evt["event"].get("ingested") if ts: return ts # Fallbacks for other timestamp fields timestamp_fields = ["timestamp", "eventTime", "dateTime", "whenOccurred", "date", "time"] for field in timestamp_fields: if field in evt and evt[field]: return evt[field] return None def parse_timestamp(ts): """ Parse timestamp handling various formats """ from datetime import datetime, timezone if isinstance(ts, (int, float)): # Handle milliseconds vs seconds return datetime.fromtimestamp(ts/1000 if ts > 1e12 else ts, tz=timezone.utc) if isinstance(ts, str): if ts.endswith("Z"): return datetime.fromisoformat(ts.replace("Z", "+00:00")) dt = datetime.fromisoformat(ts) return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc) raise ValueError(f"Unsupported timestamp: {ts!r}") def fetch_events(token, start_date_iso): """ Fetch events using the correct EPM API endpoint: /management-api/v2/Events/FromStartDate This endpoint uses StartDate and RecordSize parameters, not startTime/endTime/limit/offset """ headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"} all_events, current_start = [], start_date_iso # Enforce maximum RecordSize limit of 1000 record_size_limited = min(RECORD_SIZE, 1000) for _ in range(10): # MAX 10 iterations to prevent infinite loops # Use the correct endpoint and parameters params = { "StartDate": current_start_date, "RecordSize": RECORD_SIZE } resp = requests.get( f"{BPT_API_URL}/management-api/v2/Events/FromStartDate", headers=headers, params={ "StartDate": current_start_date, "RecordSize": min(RECORD_SIZE, 1000) }, timeout=300 ) resp.raise_for_status() data = resp.json() events = data.get("events", []) if not events: break all_events.extend(events) iterations += 1 # If we got fewer events than RECORD_SIZE, we're done if len(events) < RECORD_SIZE: break # For pagination, update StartDate to the timestamp of the last event last_event = events[-1] last_timestamp = extract_event_timestamp(last_event) if not last_timestamp: print("Warning: Could not find timestamp in last event for pagination") break # Convert to ISO format if needed and increment slightly to avoid duplicates try: dt = parse_timestamp(last_timestamp) # Add 1 second to avoid retrieving the same event again dt = dt + timedelta(seconds=1) current_start = dt.strftime("%Y-%m-%dT%H:%M:%SZ") except Exception as e: print(f"Error parsing timestamp {last_timestamp}: {e}") break return all_events def upload_to_s3(obj, key): boto3.client("s3").put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(obj).encode("utf-8"), ContentType="application/json" ) def main(): # 1) determine window start_dt = read_last_run() end_dt = datetime.now(timezone.utc) START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ") END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ") print(f"Fetching events from {START} to {END}") # 2) authenticate and fetch try: token = get_oauth_token() events = fetch_events(token, START) # Filter events to only include those before our end time filtered_events = [] for evt in events: evt_time = extract_event_timestamp(evt) if evt_time: try: evt_dt = parse_timestamp(evt_time) if evt_dt <= end_dt: filtered_events.append(evt) except Exception as e: print(f"Error parsing event timestamp {evt_time}: {e}") # Include event anyway if timestamp parsing fails filtered_events.append(evt) else: # Include events without timestamps filtered_events.append(evt) count = len(filtered_events) if count > 0: # Upload events to S3 timestamp_str = end_dt.strftime('%Y%m%d_%H%M%S') for idx, evt in enumerate(filtered_events, start=1): key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{timestamp_str}_{idx:06d}.json" upload_to_s3(evt, key) print(f"Uploaded {count} events to S3") else: print("No events to upload") # 3) persist state write_last_run(end_dt) except Exception as e: print(f"Error: {e}") sys.exit(1) if __name__ == "__main__": main()
將其設為可執行:
chmod +x collector_bpt.py
使用 Cron 安排每日排程
執行下列指令:
crontab -e
在世界標準時間午夜新增每日工作:
0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py
選項 2:以 AWS Lambda 為基礎的收集作業
收集 BeyondTrust EPM 必備條件
- 以管理員身分登入 BeyondTrust Privilege Management 網頁控制台。
- 依序前往「System Configuration」>「REST API」>「Tokens」。
- 按一下「新增權杖」。
- 提供下列設定詳細資料:
- 「Name」(名稱):輸入
Google SecOps Collector
。 - 範圍:選取「Audit:Read」和其他必要範圍。
- 「Name」(名稱):輸入
- 按一下「儲存」,然後複製權杖值。
- 複製下列詳細資料並儲存於安全位置:
- API 基礎網址:您的 BeyondTrust EPM API 網址 (例如
https://yourtenant-services.pm.beyondtrustcloud.com
)。 - 用戶端 ID:來自 OAuth 應用程式設定。
- 用戶端密鑰:來自 OAuth 應用程式設定。
- API 基礎網址:您的 BeyondTrust EPM API 網址 (例如
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照本使用指南建立 Amazon S3 值區:建立值區。
- 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如
beyondtrust-epm-logs-bucket
)。 - 按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」,以供日後使用。
- 按一下 [完成]。
- 選取 [權限] 分頁標籤。
- 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)。
- 選取「新增權限」。
- 選取「直接附加政策」
- 搜尋並選取 AmazonS3FullAccess 政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 S3 上傳的身分與存取權管理政策和角色
- 在 AWS 管理控制台中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」分頁標籤。
複製並貼上下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/beyondtrust-epm-logs/state.json" } ] }
- 如果您輸入的值區名稱不同,請替換
beyondtrust-epm-logs-bucket
。
- 如果您輸入的值區名稱不同,請替換
依序點選「Next」>「Create policy」。
依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。
附加新建立的政策和代管政策 AWSLambdaBasicExecutionRole (適用於 CloudWatch 記錄)。
為角色命名
BeyondTrustEPMLogExportRole
,然後按一下「建立角色」。
建立 Lambda 函式
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」。
- 按一下「從頭開始撰寫」。
- 請提供下列設定詳細資料:
設定 | 值 |
---|---|
名稱 | BeyondTrustEPMLogExport |
執行階段 | Python 3.13 |
架構 | x86_64 |
執行角色 | BeyondTrustEPMLogExportRole |
建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (
BeyondTrustEPMLogExport.py
):import json import boto3 import urllib3 import base64 from datetime import datetime, timedelta, timezone import os from typing import Dict, List, Optional # Initialize urllib3 pool manager http = urllib3.PoolManager() def lambda_handler(event, context): """ Lambda function to fetch BeyondTrust EPM audit events and store them in S3 """ # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] # BeyondTrust EPM API credentials BPT_API_URL = os.environ['BPT_API_URL'] CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] OAUTH_SCOPE = os.environ.get('OAUTH_SCOPE', 'urn:management:api') # Optional parameters RECORD_SIZE = int(os.environ.get('RECORD_SIZE', '1000')) MAX_ITERATIONS = int(os.environ.get('MAX_ITERATIONS', '10')) s3_client = boto3.client('s3') try: # Get last execution state last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY) # Get OAuth access token access_token = get_oauth_token(BPT_API_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_SCOPE) # Fetch audit events events = fetch_audit_events(BPT_API_URL, access_token, last_timestamp, RECORD_SIZE, MAX_ITERATIONS) if events: # Store events in S3 current_timestamp = datetime.utcnow() filename = f"{S3_PREFIX}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_s3(s3_client, S3_BUCKET, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp) print(f"Successfully processed {len(events)} events and stored to {filename}") else: print("No new events found") return { 'statusCode': 200, 'body': json.dumps(f'Successfully processed {len(events) if events else 0} events') } except Exception as e: print(f"Error processing BeyondTrust EPM logs: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_oauth_token(api_url: str, client_id: str, client_secret: str, scope: str = "urn:management:api") -> str: """ Get OAuth access token using client credentials flow for BeyondTrust EPM Uses the correct scope: urn:management:api and /oauth/connect/token endpoint """ token_url = f"{api_url}/oauth/connect/token" headers = { 'Content-Type': 'application/x-www-form-urlencoded' } body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}" response = http.request('POST', token_url, headers=headers, body=body, timeout=urllib3.Timeout(60.0)) if response.status != 200: raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}") token_data = json.loads(response.data.decode('utf-8')) return token_data['access_token'] def fetch_audit_events(api_url: str, access_token: str, last_timestamp: Optional[str], record_size: int, max_iterations: int) -> List[Dict]: """ Fetch audit events using the correct BeyondTrust EPM API endpoint: /management-api/v2/Events/FromStartDate with StartDate and RecordSize parameters """ headers = { 'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json' } all_events = [] current_start_date = last_timestamp or (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ") iterations = 0 # Enforce maximum RecordSize limit of 1000 record_size_limited = min(record_size, 1000) while iterations < max_iterations: # Use the correct EPM API endpoint and parameters query_url = f"{api_url}/management-api/v2/Events/FromStartDate" params = { 'StartDate': current_start_date, 'RecordSize': record_size_limited } response = http.request('GET', query_url, headers=headers, fields=params, timeout=urllib3.Timeout(300.0)) if response.status != 200: raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}") response_data = json.loads(response.data.decode('utf-8')) events = response_data.get('events', []) if not events: break all_events.extend(events) iterations += 1 # If we got fewer events than RecordSize, we've reached the end if len(events) < record_size_limited: break # For pagination, update StartDate to the timestamp of the last event last_event = events[-1] last_timestamp = extract_event_timestamp(last_event) if not last_timestamp: print("Warning: Could not find timestamp in last event for pagination") break # Convert to datetime and add 1 second to avoid retrieving the same event again try: dt = parse_timestamp(last_timestamp) dt = dt + timedelta(seconds=1) current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ") except Exception as e: print(f"Error parsing timestamp {last_timestamp}: {e}") break return all_events def extract_event_timestamp(event: Dict) -> Optional[str]: """ Extract timestamp from event, prioritizing event.ingested field """ # Primary (documented) path: event.ingested if isinstance(event, dict) and isinstance(event.get("event"), dict): ts = event["event"].get("ingested") if ts: return ts # Fallbacks for other timestamp fields timestamp_fields = ['timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time'] for field in timestamp_fields: if field in event and event[field]: return event[field] return None def parse_timestamp(timestamp_str: str) -> datetime: """ Parse timestamp string to datetime object, handling various formats """ if isinstance(timestamp_str, (int, float)): # Unix timestamp (in milliseconds or seconds) if timestamp_str > 1e12: # Milliseconds return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc) else: # Seconds return datetime.fromtimestamp(timestamp_str, tz=timezone.utc) if isinstance(timestamp_str, str): # Try different string formats try: # ISO format with Z if timestamp_str.endswith('Z'): return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) # ISO format with timezone elif '+' in timestamp_str or timestamp_str.endswith('00:00'): return datetime.fromisoformat(timestamp_str) # ISO format without timezone (assume UTC) else: dt = datetime.fromisoformat(timestamp_str) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt except ValueError: pass raise ValueError(f"Could not parse timestamp: {timestamp_str}") def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]: """ Get the last processed timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: print("No previous state found, starting from 24 hours ago") return None except Exception as e: print(f"Error reading state: {e}") return None def update_state(s3_client, bucket: str, state_key: str, timestamp: str): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data), ContentType='application/json' ) def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events) s3_client.put_object( Bucket=bucket, Key=key, Body=jsonl_content, ContentType='application/x-ndjson' ) def get_latest_event_timestamp(events: List[Dict]) -> str: """ Get the latest timestamp from the events for state tracking """ if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: timestamp = extract_event_timestamp(event) if timestamp: try: event_dt = parse_timestamp(timestamp) event_iso = event_dt.isoformat() + 'Z' if latest is None or event_iso > latest: latest = event_iso except Exception as e: print(f"Error parsing event timestamp {timestamp}: {e}") continue return latest or datetime.utcnow().isoformat() + 'Z'
依序前往「Configuration」>「Environment variables」>「Edit」>「Add new environment variable」。
輸入下列環境變數,並將 換成您的值。
鍵 範例值 S3_BUCKET
beyondtrust-epm-logs-bucket
S3_PREFIX
beyondtrust-epm-logs/
STATE_KEY
beyondtrust-epm-logs/state.json
BPT_API_URL
https://yourtenant-services.pm.beyondtrustcloud.com
CLIENT_ID
your-client-id
CLIENT_SECRET
your-client-secret
OAUTH_SCOPE
urn:management:api
RECORD_SIZE
1000
MAX_ITERATIONS
10
建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。
選取「設定」分頁標籤。
在「一般設定」面板中,按一下「編輯」。
將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)。
建立 EventBridge 排程
- 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」。
- 提供下列設定詳細資料:
- 週期性時間表:費率 (
1 hour
)。 - 目標:您的 Lambda 函式
BeyondTrustEPMLogExport
。 - 名稱:
BeyondTrustEPMLogExport-1h
。
- 週期性時間表:費率 (
- 按一下「建立時間表」。
選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰
- 依序前往 AWS 管理中心 > IAM > 使用者 > 新增使用者。
- 點選 [Add users] (新增使用者)。
- 提供下列設定詳細資料:
- 使用者:輸入
secops-reader
。 - 存取類型:選取「存取金鑰 - 程式輔助存取」。
- 使用者:輸入
- 按一下「建立使用者」。
- 附加最低讀取權限政策 (自訂):依序選取「使用者」>「secops-reader」>「權限」>「新增權限」>「直接附加政策」>「建立政策」。
在 JSON 編輯器中輸入下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket" } ] }
將名稱設為
secops-reader-policy
。依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」。
依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」。
下載 CSV (這些值會輸入至動態饋給)。
設定動態饋給 (兩種方法)
如要設定動態消息,請按照下列步驟操作:
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
BeyondTrust EPM logs
)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「BeyondTrust Endpoint Privilege Management」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:值區 URI
s3://your-log-bucket-name/
。請將your-log-bucket-name
替換為實際值區名稱。
- 來源刪除選項:根據偏好設定選取刪除選項。
- 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:資產命名空間。
- 擷取標籤:套用至這個動態饋給事件的標籤。
- S3 URI:值區 URI
- 點選「下一步」。
- 在「Finalize」畫面上檢查新的動態饋給設定,然後按一下「Submit」。
UDM 對應表
記錄欄位 | UDM 對應 | 邏輯 |
---|---|---|
agent.id |
principal.asset.attribute.labels.value |
已對應至金鑰為「agent_id 」的標籤 |
agent.version |
principal.asset.attribute.labels.value |
已對應至金鑰為「agent_version 」的標籤 |
ecs.version |
principal.asset.attribute.labels.value |
已對應至金鑰為「ecs_version 」的標籤 |
event_data.reason |
metadata.description |
原始記錄中的事件說明 |
event_datas.ActionId |
metadata.product_log_id |
產品專屬記錄 ID |
file.path |
principal.file.full_path |
活動中的完整檔案路徑 |
headers.content_length |
additional.fields.value.string_value |
已對應至金鑰為「content_length 」的標籤 |
headers.content_type |
additional.fields.value.string_value |
已對應至金鑰為「content_type 」的標籤 |
headers.http_host |
additional.fields.value.string_value |
已對應至金鑰為「http_host 」的標籤 |
headers.http_version |
network.application_protocol_version |
HTTP 通訊協定版本 |
headers.request_method |
network.http.method |
HTTP 要求方法 |
host.hostname |
principal.hostname |
主要主機名稱 |
host.hostname |
principal.asset.hostname |
主要資產主機名稱 |
host.ip |
principal.asset.ip |
主要資產 IP 位址 |
host.ip |
principal.ip |
主體 IP 位址 |
host.mac |
principal.mac |
主要 MAC 位址 |
host.os.platform |
principal.platform |
如果等於 macOS,則設為 MAC |
host.os.version |
principal.platform_version |
作業系統版本 |
labels.related_item_id |
metadata.product_log_id |
相關項目 ID |
process.command_line |
principal.process.command_line |
程序指令列 |
process.name |
additional.fields.value.string_value |
已對應至金鑰為「process_name 」的標籤 |
process.parent.name |
additional.fields.value.string_value |
已對應至金鑰為「process_parent_name 」的標籤 |
process.parent.pid |
principal.process.parent_process.pid |
父項程序 PID 轉換為字串 |
process.pid |
principal.process.pid |
程序 PID 轉換為字串 |
user.id |
principal.user.userid |
使用者 ID |
user.name |
principal.user.user_display_name |
使用者顯示名稱 |
不適用 | metadata.event_timestamp |
事件時間戳記設為記錄項目時間戳記 |
不適用 | metadata.event_type |
如果沒有主體,則為 GENERIC_EVENT;否則為 STATUS_UPDATE |
不適用 | network.application_protocol |
如果 http_version 欄位包含 HTTP,請設為 HTTP |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。