收集 BeyondTrust Endpoint Privilege Management (EPM) 日志
本文档介绍了如何使用两种不同的方法将 BeyondTrust Endpoint Privilege Management (EPM) 日志注入到 Google Security Operations:基于 EC2 的收集和基于 AWS Lambda 的收集(使用 Amazon S3)。该解析器专注于将 BeyondTrust Endpoint 的原始 JSON 日志数据转换为符合 Chronicle UDM 的结构化格式。它首先初始化各种字段的默认值,然后解析 JSON 载荷,随后将原始日志中的特定字段映射到 event.idm.read_only_udm 对象中的相应 UDM 字段。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例
 - 对 BeyondTrust Endpoint Privilege Management 租户或 API 的特权访问权限
 - 对 AWS(S3、IAM、Lambda/EC2、EventBridge)的特权访问权限
 
选择集成方法
您可以选择以下两种集成方法之一:
- 方法 1:基于 EC2 的收集:使用具有预定脚本的 EC2 实例进行日志收集
 - 选项 2:基于 AWS Lambda 的收集:使用无服务器 Lambda 函数和 EventBridge 调度
 
选项 1:基于 EC2 的集合
为 Google SecOps 注入配置 AWS IAM
- 按照以下用户指南创建用户:创建 IAM 用户。
 - 选择创建的用户。
 - 选择安全凭据标签页。
 - 在访问密钥部分中,点击创建访问密钥。
 - 选择第三方服务作为使用情形。
 - 点击下一步。
 - 可选:添加说明标记。
 - 点击创建访问密钥。
 - 点击下载 CSV 文件,保存访问密钥和秘密访问密钥,以供日后参考。
 - 点击完成。
 - 选择权限标签页。
 - 在权限政策部分中,点击添加权限。
 - 选择添加权限。
 - 选择直接附加政策。
 - 搜索并选择 AmazonS3FullAccess 政策。
 - 点击下一步。
 - 点击添加权限。
 
配置 BeyondTrust EPM 以实现 API 访问
- 以管理员身份登录 BeyondTrust Privilege Management Web 控制台。
 - 依次前往配置 > 设置 > API 设置。
 - 点击 Create an API Account。
 - 提供以下配置详细信息:
- 名称:输入 
Google SecOps Collector。 - API 访问权限:根据需要启用审核(读取)和其他范围。
 
 - 名称:输入 
 - 复制并保存客户端 ID 和客户端密钥。
 - 复制您的 API 基准网址;该网址通常为 
https://<your-tenant>-services.pm.beyondtrustcloud.com(您将使用此网址作为 BPT_API_URL)。 
创建 AWS S3 存储分区
- 登录 AWS Management Console。
 - 前往 AWS 控制台 > 服务 > S3 > 创建存储桶。
 - 提供以下配置详细信息:
- 存储分区名称:
my-beyondtrust-logs。 - 区域:[您的选择] > 创建。
 
 - 存储分区名称:
 
为 EC2 创建 IAM 角色
- 登录 AWS Management Console。
 - 依次前往 AWS 控制台 > 服务 > IAM > 角色 > 创建角色。
 - 提供以下配置详细信息:
- 可信实体:AWS 服务 > EC2 > 下一步。
 - 附加权限:AmazonS3FullAccess(或您存储桶的范围限定政策)> 下一步。
 - 角色名称:
EC2-S3-BPT-Writer> 创建角色。 
 
启动并配置 EC2 收集器虚拟机
- 登录 AWS Management Console。
 - 前往服务。
 - 在搜索栏中,输入 EC2,然后选择该选项。
 - 在 EC2 信息中心中,点击实例。
 - 点击启动实例。
 - 提供以下配置详细信息:
- 名称:输入 
BPT-Log-Collector。 - AMI:选择 Ubuntu Server 22.04 LTS。
 - 实例类型:t3.micro(或更大),然后点击下一步。
 - 网络:确保网络设置已设置为您的默认 VPC。
 - IAM 角色:从菜单中选择 EC2-S3-BPT-Writer IAM 角色。
 - 自动分配公共 IP:启用(或确保您可以使用 VPN 访问)> 下一步。
 - 添加存储空间:保留默认存储空间配置 (8 GiB),然后点击下一步。
 - 选择创建新的安全组。
 - 入站规则:点击添加规则。
 - 类型:选择 SSH。
 - 端口:22。
 - 来源:您的 IP
 - 点击查看并发布。
 - 选择或创建密钥对。
 - 点击下载密钥对。
 - 保存下载的 PEM 文件。您需要使用此文件通过 SSH 连接到实例。
 
 - 名称:输入 
 - 使用 SSH 连接到您的虚拟机 (VM)。
 
安装收集器前提条件
运行以下命令:
chmod 400 ~/Downloads/your-key.pem ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>更新系统并安装依赖项:
# Update OS sudo apt update && sudo apt upgrade -y # Install Python, Git sudo apt install -y python3 python3-venv python3-pip git # Create & activate virtualenv python3 -m venv ~/bpt-venv source ~/bpt-venv/bin/activate # Install libraries pip install requests boto3创建目录和状态文件:
sudo mkdir -p /var/lib/bpt-collector sudo touch /var/lib/bpt-collector/last_run.txt sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt对其进行初始化(例如,初始化为 1 小时前):
echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
部署 BeyondTrust EPM 收集器脚本
创建项目文件夹:
mkdir ~/bpt-collector && cd ~/bpt-collector导出所需的环境变量(例如,在
~/.bashrc中):export BPT_API_URL="https://<your-tenant>-services.pm.beyondtrustcloud.com" export BPT_CLIENT_ID="your-client-id" export BPT_CLIENT_SECRET="your-client-secret" export S3_BUCKET="my-beyondtrust-logs" export S3_PREFIX="bpt/" export STATE_FILE="/var/lib/bpt-collector/last_run.txt" export RECORD_SIZE="1000"创建
collector_bpt.py并输入以下代码:#!/usr/bin/env python3 import os, sys, json, boto3, requests from datetime import datetime, timezone, timedelta # ── UTILS ────────────────────────────────────────────────────────────── def must_env(var): val = os.getenv(var) if not val: print(f"ERROR: environment variable {var} is required", file=sys.stderr) sys.exit(1) return val def ensure_state_file(path): d = os.path.dirname(path) if not os.path.isdir(d): os.makedirs(d, exist_ok=True) if not os.path.isfile(path): ts = (datetime.now(timezone.utc) - timedelta(hours=1)) .strftime("%Y-%m-%dT%H:%M:%SZ") with open(path, "w") as f: f.write(ts) # ── CONFIG ───────────────────────────────────────────────────────────── BPT_API_URL = must_env("BPT_API_URL") # e.g., https://tenant-services.pm.beyondtrustcloud.com CLIENT_ID = must_env("BPT_CLIENT_ID") CLIENT_SECRET = must_env("BPT_CLIENT_SECRET") S3_BUCKET = must_env("S3_BUCKET") S3_PREFIX = os.getenv("S3_PREFIX", "") # e.g., "bpt/" STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt") RECORD_SIZE = int(os.getenv("RECORD_SIZE", "1000")) # ── END CONFIG ───────────────────────────────────────────────────────── ensure_state_file(STATE_FILE) def read_last_run(): with open(STATE_FILE, "r") as f: ts = f.read().strip() return datetime.fromisoformat(ts.replace("Z", "+00:00")) def write_last_run(dt): with open(STATE_FILE, "w") as f: f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ")) def get_oauth_token(): """ Get OAuth2 token using client credentials flow Scope: urn:management:api (for EPM Management API access) """ resp = requests.post( f"{BPT_API_URL}/oauth/connect/token", headers={"Content-Type": "application/x-www-form-urlencoded"}, data={ "grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "scope": "urn:management:api" } ) resp.raise_for_status() return resp.json()["access_token"] def extract_event_timestamp(evt): """ Extract timestamp from event, prioritizing event.ingested field """ # Primary (documented) path: event.ingested if isinstance(evt, dict) and isinstance(evt.get("event"), dict): ts = evt["event"].get("ingested") if ts: return ts # Fallbacks for other timestamp fields timestamp_fields = ["timestamp", "eventTime", "dateTime", "whenOccurred", "date", "time"] for field in timestamp_fields: if field in evt and evt[field]: return evt[field] return None def parse_timestamp(ts): """ Parse timestamp handling various formats """ from datetime import datetime, timezone if isinstance(ts, (int, float)): # Handle milliseconds vs seconds return datetime.fromtimestamp(ts/1000 if ts > 1e12 else ts, tz=timezone.utc) if isinstance(ts, str): if ts.endswith("Z"): return datetime.fromisoformat(ts.replace("Z", "+00:00")) dt = datetime.fromisoformat(ts) return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc) raise ValueError(f"Unsupported timestamp: {ts!r}") def fetch_events(token, start_date_iso): """ Fetch events using the correct EPM API endpoint: /management-api/v2/Events/FromStartDate This endpoint uses StartDate and RecordSize parameters, not startTime/endTime/limit/offset """ headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"} all_events, current_start = [], start_date_iso # Enforce maximum RecordSize limit of 1000 record_size_limited = min(RECORD_SIZE, 1000) for _ in range(10): # MAX 10 iterations to prevent infinite loops # Use the correct endpoint and parameters params = { "StartDate": current_start_date, "RecordSize": RECORD_SIZE } resp = requests.get( f"{BPT_API_URL}/management-api/v2/Events/FromStartDate", headers=headers, params={ "StartDate": current_start_date, "RecordSize": min(RECORD_SIZE, 1000) }, timeout=300 ) resp.raise_for_status() data = resp.json() events = data.get("events", []) if not events: break all_events.extend(events) iterations += 1 # If we got fewer events than RECORD_SIZE, we're done if len(events) < RECORD_SIZE: break # For pagination, update StartDate to the timestamp of the last event last_event = events[-1] last_timestamp = extract_event_timestamp(last_event) if not last_timestamp: print("Warning: Could not find timestamp in last event for pagination") break # Convert to ISO format if needed and increment slightly to avoid duplicates try: dt = parse_timestamp(last_timestamp) # Add 1 second to avoid retrieving the same event again dt = dt + timedelta(seconds=1) current_start = dt.strftime("%Y-%m-%dT%H:%M:%SZ") except Exception as e: print(f"Error parsing timestamp {last_timestamp}: {e}") break return all_events def upload_to_s3(obj, key): boto3.client("s3").put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(obj).encode("utf-8"), ContentType="application/json" ) def main(): # 1) determine window start_dt = read_last_run() end_dt = datetime.now(timezone.utc) START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ") END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ") print(f"Fetching events from {START} to {END}") # 2) authenticate and fetch try: token = get_oauth_token() events = fetch_events(token, START) # Filter events to only include those before our end time filtered_events = [] for evt in events: evt_time = extract_event_timestamp(evt) if evt_time: try: evt_dt = parse_timestamp(evt_time) if evt_dt <= end_dt: filtered_events.append(evt) except Exception as e: print(f"Error parsing event timestamp {evt_time}: {e}") # Include event anyway if timestamp parsing fails filtered_events.append(evt) else: # Include events without timestamps filtered_events.append(evt) count = len(filtered_events) if count > 0: # Upload events to S3 timestamp_str = end_dt.strftime('%Y%m%d_%H%M%S') for idx, evt in enumerate(filtered_events, start=1): key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{timestamp_str}_{idx:06d}.json" upload_to_s3(evt, key) print(f"Uploaded {count} events to S3") else: print("No events to upload") # 3) persist state write_last_run(end_dt) except Exception as e: print(f"Error: {e}") sys.exit(1) if __name__ == "__main__": main()使其可执行:
chmod +x collector_bpt.py
使用 Cron 安排每日任务
运行以下命令:
crontab -e添加世界协调时间每天零点运行的作业:
0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py
选项 2:基于 AWS Lambda 的收集
收集 BeyondTrust EPM 前提条件
- 以管理员身份登录 BeyondTrust Privilege Management Web 控制台。
 - 依次前往系统配置 > REST API > 令牌。
 - 点击添加令牌。
 - 提供以下配置详细信息:
- 名称:输入 
Google SecOps Collector。 - 范围:选择 Audit:Read 和其他所需范围。
 
 - 名称:输入 
 - 点击保存,然后复制令牌值。
 - 复制以下详细信息并将其保存在安全的位置:
- API 基准网址:您的 BeyondTrust EPM API 网址(例如 
https://yourtenant-services.pm.beyondtrustcloud.com)。 - 客户端 ID:来自您的 OAuth 应用配置。
 - 客户端密钥:来自您的 OAuth 应用配置。
 
 - API 基准网址:您的 BeyondTrust EPM API 网址(例如 
 
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶。
 - 保存存储桶名称和区域以供日后参考(例如 
beyondtrust-epm-logs-bucket)。 - 按照以下用户指南创建用户:创建 IAM 用户。
 - 选择创建的用户。
 - 选择安全凭据标签页。
 - 在访问密钥部分中,点击创建访问密钥。
 - 选择第三方服务作为使用情形。
 - 点击下一步。
 - 可选:添加说明标记。
 - 点击创建访问密钥。
 - 点击 Download CSV file(下载 CSV 文件),保存访问密钥和不公开的访问密钥以供日后使用。
 - 点击完成。
 - 选择权限标签页。
 - 在权限政策部分中,点击添加权限。
 - 选择添加权限。
 - 选择直接附加政策
 - 搜索并选择 AmazonS3FullAccess 政策。
 - 点击下一步。
 - 点击添加权限。
 
为 S3 上传配置 IAM 政策和角色
- 在 AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页。
 复制并粘贴以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/beyondtrust-epm-logs/state.json" } ] }- 如果您输入了其他存储桶名称,请替换 
beyondtrust-epm-logs-bucket。 
- 如果您输入了其他存储桶名称,请替换 
 依次点击下一步 > 创建政策。
依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda。
附加新创建的政策和受管政策 AWSLambdaBasicExecutionRole(用于 CloudWatch 日志记录)。
将角色命名为
BeyondTrustEPMLogExportRole,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
 - 点击从头开始创作。
 - 提供以下配置详细信息:
 
| 设置 | 值 | 
|---|---|
| 名称 | BeyondTrustEPMLogExport | 
| 运行时 | Python 3.13 | 
| 架构 | x86_64 | 
| 执行角色 | BeyondTrustEPMLogExportRole | 
创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (
BeyondTrustEPMLogExport.py):import json import boto3 import urllib3 import base64 from datetime import datetime, timedelta, timezone import os from typing import Dict, List, Optional # Initialize urllib3 pool manager http = urllib3.PoolManager() def lambda_handler(event, context): """ Lambda function to fetch BeyondTrust EPM audit events and store them in S3 """ # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] # BeyondTrust EPM API credentials BPT_API_URL = os.environ['BPT_API_URL'] CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] OAUTH_SCOPE = os.environ.get('OAUTH_SCOPE', 'urn:management:api') # Optional parameters RECORD_SIZE = int(os.environ.get('RECORD_SIZE', '1000')) MAX_ITERATIONS = int(os.environ.get('MAX_ITERATIONS', '10')) s3_client = boto3.client('s3') try: # Get last execution state last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY) # Get OAuth access token access_token = get_oauth_token(BPT_API_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_SCOPE) # Fetch audit events events = fetch_audit_events(BPT_API_URL, access_token, last_timestamp, RECORD_SIZE, MAX_ITERATIONS) if events: # Store events in S3 current_timestamp = datetime.utcnow() filename = f"{S3_PREFIX}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_s3(s3_client, S3_BUCKET, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp) print(f"Successfully processed {len(events)} events and stored to {filename}") else: print("No new events found") return { 'statusCode': 200, 'body': json.dumps(f'Successfully processed {len(events) if events else 0} events') } except Exception as e: print(f"Error processing BeyondTrust EPM logs: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_oauth_token(api_url: str, client_id: str, client_secret: str, scope: str = "urn:management:api") -> str: """ Get OAuth access token using client credentials flow for BeyondTrust EPM Uses the correct scope: urn:management:api and /oauth/connect/token endpoint """ token_url = f"{api_url}/oauth/connect/token" headers = { 'Content-Type': 'application/x-www-form-urlencoded' } body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}" response = http.request('POST', token_url, headers=headers, body=body, timeout=urllib3.Timeout(60.0)) if response.status != 200: raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}") token_data = json.loads(response.data.decode('utf-8')) return token_data['access_token'] def fetch_audit_events(api_url: str, access_token: str, last_timestamp: Optional[str], record_size: int, max_iterations: int) -> List[Dict]: """ Fetch audit events using the correct BeyondTrust EPM API endpoint: /management-api/v2/Events/FromStartDate with StartDate and RecordSize parameters """ headers = { 'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json' } all_events = [] current_start_date = last_timestamp or (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ") iterations = 0 # Enforce maximum RecordSize limit of 1000 record_size_limited = min(record_size, 1000) while iterations < max_iterations: # Use the correct EPM API endpoint and parameters query_url = f"{api_url}/management-api/v2/Events/FromStartDate" params = { 'StartDate': current_start_date, 'RecordSize': record_size_limited } response = http.request('GET', query_url, headers=headers, fields=params, timeout=urllib3.Timeout(300.0)) if response.status != 200: raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}") response_data = json.loads(response.data.decode('utf-8')) events = response_data.get('events', []) if not events: break all_events.extend(events) iterations += 1 # If we got fewer events than RecordSize, we've reached the end if len(events) < record_size_limited: break # For pagination, update StartDate to the timestamp of the last event last_event = events[-1] last_timestamp = extract_event_timestamp(last_event) if not last_timestamp: print("Warning: Could not find timestamp in last event for pagination") break # Convert to datetime and add 1 second to avoid retrieving the same event again try: dt = parse_timestamp(last_timestamp) dt = dt + timedelta(seconds=1) current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ") except Exception as e: print(f"Error parsing timestamp {last_timestamp}: {e}") break return all_events def extract_event_timestamp(event: Dict) -> Optional[str]: """ Extract timestamp from event, prioritizing event.ingested field """ # Primary (documented) path: event.ingested if isinstance(event, dict) and isinstance(event.get("event"), dict): ts = event["event"].get("ingested") if ts: return ts # Fallbacks for other timestamp fields timestamp_fields = ['timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time'] for field in timestamp_fields: if field in event and event[field]: return event[field] return None def parse_timestamp(timestamp_str: str) -> datetime: """ Parse timestamp string to datetime object, handling various formats """ if isinstance(timestamp_str, (int, float)): # Unix timestamp (in milliseconds or seconds) if timestamp_str > 1e12: # Milliseconds return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc) else: # Seconds return datetime.fromtimestamp(timestamp_str, tz=timezone.utc) if isinstance(timestamp_str, str): # Try different string formats try: # ISO format with Z if timestamp_str.endswith('Z'): return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) # ISO format with timezone elif '+' in timestamp_str or timestamp_str.endswith('00:00'): return datetime.fromisoformat(timestamp_str) # ISO format without timezone (assume UTC) else: dt = datetime.fromisoformat(timestamp_str) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt except ValueError: pass raise ValueError(f"Could not parse timestamp: {timestamp_str}") def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]: """ Get the last processed timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: print("No previous state found, starting from 24 hours ago") return None except Exception as e: print(f"Error reading state: {e}") return None def update_state(s3_client, bucket: str, state_key: str, timestamp: str): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data), ContentType='application/json' ) def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events) s3_client.put_object( Bucket=bucket, Key=key, Body=jsonl_content, ContentType='application/x-ndjson' ) def get_latest_event_timestamp(events: List[Dict]) -> str: """ Get the latest timestamp from the events for state tracking """ if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: timestamp = extract_event_timestamp(event) if timestamp: try: event_dt = parse_timestamp(timestamp) event_iso = event_dt.isoformat() + 'Z' if latest is None or event_iso > latest: latest = event_iso except Exception as e: print(f"Error parsing event timestamp {timestamp}: {e}") continue return latest or datetime.utcnow().isoformat() + 'Z'依次前往配置 > 环境变量 > 修改 > 添加新的环境变量。
输入以下环境变量,并将其替换为您的值。
键 示例值 S3_BUCKETbeyondtrust-epm-logs-bucketS3_PREFIXbeyondtrust-epm-logs/STATE_KEYbeyondtrust-epm-logs/state.jsonBPT_API_URLhttps://yourtenant-services.pm.beyondtrustcloud.comCLIENT_IDyour-client-idCLIENT_SECRETyour-client-secretOAUTH_SCOPEurn:management:apiRECORD_SIZE1000MAX_ITERATIONS10创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > your-function)。
选择配置标签页。
在常规配置面板中,点击修改。
将超时更改为 5 分钟(300 秒),然后点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度程序 > 创建计划。
 - 提供以下配置详细信息:
- 周期性安排:费率 (
1 hour)。 - 目标:您的 Lambda 函数 
BeyondTrustEPMLogExport。 - 名称:
BeyondTrustEPMLogExport-1h。 
 - 周期性安排:费率 (
 - 点击创建时间表。
 
可选:为 Google SecOps 创建只读 IAM 用户和密钥
- 依次前往 AWS 控制台 > IAM > 用户 > 添加用户。
 - 点击 Add users(添加用户)。
 - 提供以下配置详细信息:
- 用户:输入 
secops-reader。 - 访问类型:选择访问密钥 - 以程序化方式访问。
 
 - 用户:输入 
 - 点击创建用户。
 - 附加最低限度的读取政策(自定义):用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策。
 在 JSON 编辑器中,输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket" } ] }将名称设置为
secops-reader-policy。依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限。
依次前往安全凭据 > 访问密钥 > 创建访问密钥。
下载 CSV(这些值将输入到 Feed 中)。
设置 Feed(两种方式)
如需配置 Feed,请按以下步骤操作:
- 依次前往 SIEM 设置> Feed。
 - 点击 + 添加新 Feed。
 - 在Feed 名称字段中,输入 Feed 的名称(例如 
BeyondTrust EPM logs)。 - 选择 Amazon S3 V2 作为来源类型。
 - 选择 BeyondTrust Endpoint Privilege Management 作为日志类型。
 - 点击下一步。
 - 为以下输入参数指定值:
- S3 URI:存储桶 URI 
s3://your-log-bucket-name/。 将your-log-bucket-name替换为存储桶的实际名称。
 - 来源删除选项:根据您的偏好设置选择删除选项。
 - 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
 - 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
 - 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
 - 资产命名空间:资产命名空间。
 - 注入标签:应用于此 Feed 中事件的标签。
 
 - S3 URI:存储桶 URI 
 - 点击下一步。
 - 在最终确定界面中查看新的 Feed 配置,然后点击提交。
 
UDM 映射表
| 日志字段 | UDM 映射 | 逻辑 | 
|---|---|---|
agent.id | 
principal.asset.attribute.labels.value | 
已映射到键为 agent_id 的标签 | 
agent.version | 
principal.asset.attribute.labels.value | 
已映射到键为 agent_version 的标签 | 
ecs.version | 
principal.asset.attribute.labels.value | 
已映射到键为 ecs_version 的标签 | 
event_data.reason | 
metadata.description | 
原始日志中的事件说明 | 
event_datas.ActionId | 
metadata.product_log_id | 
特定于产品的日志标识符 | 
file.path | 
principal.file.full_path | 
活动中的完整文件路径 | 
headers.content_length | 
additional.fields.value.string_value | 
已映射到键为 content_length 的标签 | 
headers.content_type | 
additional.fields.value.string_value | 
已映射到键为 content_type 的标签 | 
headers.http_host | 
additional.fields.value.string_value | 
已映射到键为 http_host 的标签 | 
headers.http_version | 
network.application_protocol_version | 
HTTP 协议版本 | 
headers.request_method | 
network.http.method | 
HTTP 请求方法 | 
host.hostname | 
principal.hostname | 
主账号主机名 | 
host.hostname | 
principal.asset.hostname | 
主要资产主机名 | 
host.ip | 
principal.asset.ip | 
主要资产 IP 地址 | 
host.ip | 
principal.ip | 
主 IP 地址 | 
host.mac | 
principal.mac | 
主要 MAC 地址 | 
host.os.platform | 
principal.platform | 
如果等于 macOS,则设置为 MAC | 
host.os.version | 
principal.platform_version | 
操作系统版本 | 
labels.related_item_id | 
metadata.product_log_id | 
相关商品标识符 | 
process.command_line | 
principal.process.command_line | 
进程命令行 | 
process.name | 
additional.fields.value.string_value | 
已映射到键为 process_name 的标签 | 
process.parent.name | 
additional.fields.value.string_value | 
已映射到键为 process_parent_name 的标签 | 
process.parent.pid | 
principal.process.parent_process.pid | 
父级进程 PID 已转换为字符串 | 
process.pid | 
principal.process.pid | 
进程 PID 已转换为字符串 | 
user.id | 
principal.user.userid | 
用户标识符 | 
user.name | 
principal.user.user_display_name | 
用户显示名称 | 
| 不适用 | metadata.event_timestamp | 
事件时间戳设置为日志条目时间戳 | 
| 不适用 | metadata.event_type | 
如果没有正文,则为 GENERIC_EVENT;否则为 STATUS_UPDATE | 
| 不适用 | network.application_protocol | 
如果 http_version 字段包含 HTTP,则设置为 HTTP | 
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。