收集 BeyondTrust Endpoint Privilege Management (EPM) 記錄

支援的國家/地區:

本文說明如何使用兩種不同方法,將 BeyondTrust Endpoint Privilege Management (EPM) 記錄擷取至 Google Security Operations:以 EC2 為基礎的收集作業,以及以 Amazon S3 為基礎的 AWS Lambda 收集作業。剖析器會將 BeyondTrust Endpoint 的原始 JSON 記錄資料轉換為符合 Chronicle UDM 的結構化格式。這個函式會先初始化各種欄位的預設值,然後剖析 JSON 酬載,接著將原始記錄中的特定欄位對應至 event.idm.read_only_udm 物件中的對應 UDM 欄位。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體
  • BeyondTrust Endpoint Privilege Management 租戶或 API 的特殊權限
  • AWS 的具備權限存取權 (S3、IAM、Lambda/EC2、EventBridge)

選擇整合方式

您可以選擇下列其中一種整合方式:

  • 選項 1:以 EC2 為基礎的收集作業:使用 EC2 執行個體搭配排定的指令碼,收集記錄
  • 選項 2:以 AWS Lambda 為基礎的收集作業:使用無伺服器 Lambda 函式和 EventBridge 排程

選項 1:以 EC2 為基礎的集合

設定 AWS IAM,以便擷取 Google SecOps 資料

  1. 請按照這份使用者指南建立使用者建立 IAM 使用者
  2. 選取建立的「使用者」
  3. 選取「安全憑證」分頁標籤。
  4. 在「Access Keys」部分中,按一下「Create Access Key」
  5. 選取「第三方服務」做為「用途」
  6. 點選「下一步」
  7. 選用:新增說明標記。
  8. 按一下「建立存取金鑰」
  9. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
  10. 按一下 [完成]
  11. 選取 [權限] 分頁標籤。
  12. 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)
  13. 選取「新增權限」
  14. 選取「直接附加政策」
  15. 搜尋並選取 AmazonS3FullAccess 政策。
  16. 點選「下一步」
  17. 按一下「Add permissions」。

設定 BeyondTrust EPM 的 API 存取權

  1. 以管理員身分登入 BeyondTrust Privilege Management 網頁控制台
  2. 依序前往「設定」>「設定」>「API 設定」
  3. 按一下「建立 API 帳戶」
  4. 提供下列設定詳細資料:
    • 「Name」(名稱):輸入 Google SecOps Collector
    • API 存取權:視需要啟用「稽核 (讀取)」和其他範圍。
  5. 複製並儲存「用戶端 ID」和「用戶端密鑰」
  6. 複製 API 基礎網址,通常是 https://<your-tenant>-services.pm.beyondtrustcloud.com (您會將此網址做為 BPT_API_URL 使用)。

建立 AWS S3 儲存空間

  1. 登入 AWS 管理主控台
  2. 前往 AWS 控制台 > 服務 > S3 > 建立 bucket
  3. 提供下列設定詳細資料:
    • Bucket namemy-beyondtrust-logs
    • 「Region」(地區):[你的選擇] >「Create」(建立)

為 EC2 建立 IAM 角色

  1. 登入 AWS 管理主控台
  2. 依序前往「AWS Console」>「Services」>「IAM」>「Roles」>「Create role」
  3. 提供下列設定詳細資料:
    • 信任的實體AWS 服務 > EC2 > 下一步
    • 附加權限AmazonS3FullAccess (或值區的範圍政策) >「下一步」
    • 角色名稱EC2-S3-BPT-Writer > 建立角色

啟動及設定 EC2 Collector VM

  1. 登入 AWS 管理主控台
  2. 前往「服務」
  3. 在搜尋列中輸入「EC2」EC2並選取。
  4. EC2 資訊主頁中,按一下「Instances」(執行個體)
  5. 按一下「啟動執行個體」
  6. 提供下列設定詳細資料:
    • 「Name」(名稱):輸入 BPT-Log-Collector
    • AMI:選取「Ubuntu Server 22.04 LTS」。
    • 執行個體類型t3.micro (或更大),然後按一下「下一步」
    • 網路:確認「網路」設定已設為預設虛擬私有雲。
    • IAM 角色:從選單中選取 EC2-S3-BPT-Writer IAM 角色。
    • 自動指派公開 IP:啟用 (或確認可透過 VPN 連線) > 點選「下一步」
    • 新增儲存空間:保留預設儲存空間設定 (8 GiB),然後點選「下一步」
    • 選取「建立新的安全性群組」
    • 連入規則:按一下「新增規則」
    • 類型:選取「SSH」
    • 通訊埠22
    • 來源:您的 IP
    • 按一下「檢閱並啟動」
    • 選取或建立金鑰組。
    • 按一下「下載金鑰配對」
    • 儲存下載的 PEM 檔案。您需要這個檔案,才能使用 SSH 連線至執行個體。
  7. 使用 SSH 連線至虛擬機器 (VM)。

安裝收集器必備條件

  1. 執行下列指令:

    chmod 400 ~/Downloads/your-key.pem
    ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
    
  2. 更新系統並安裝依附元件:

    # Update OS
    sudo apt update && sudo apt upgrade -y
    # Install Python, Git
    sudo apt install -y python3 python3-venv python3-pip git
    # Create & activate virtualenv
    python3 -m venv ~/bpt-venv
    source ~/bpt-venv/bin/activate
    # Install libraries
    pip install requests boto3
    
  3. 建立目錄和狀態檔案:

    sudo mkdir -p /var/lib/bpt-collector
    sudo touch /var/lib/bpt-collector/last_run.txt
    sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
    
  4. 初始化 (例如設為 1 小時前):

    echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
    

部署 BeyondTrust EPM 收集器指令碼

  1. 建立專案資料夾:

    mkdir ~/bpt-collector && cd ~/bpt-collector
    
  2. 匯出必要環境變數 (例如在 ~/.bashrc 中):

    export BPT_API_URL="https://<your-tenant>-services.pm.beyondtrustcloud.com"
    export BPT_CLIENT_ID="your-client-id"
    export BPT_CLIENT_SECRET="your-client-secret"
    export S3_BUCKET="my-beyondtrust-logs"
    export S3_PREFIX="bpt/"
    export STATE_FILE="/var/lib/bpt-collector/last_run.txt"
    export RECORD_SIZE="1000"
    
  3. 建立 collector_bpt.py 並輸入下列程式碼:

    #!/usr/bin/env python3
    import os, sys, json, boto3, requests
    from datetime import datetime, timezone, timedelta
    
    # ── UTILS ──────────────────────────────────────────────────────────────
    def must_env(var):
        val = os.getenv(var)
        if not val:
            print(f"ERROR: environment variable {var} is required", file=sys.stderr)
            sys.exit(1)
        return val
    
    def ensure_state_file(path):
        d = os.path.dirname(path)
        if not os.path.isdir(d):
            os.makedirs(d, exist_ok=True)
        if not os.path.isfile(path):
            ts = (datetime.now(timezone.utc) - timedelta(hours=1))
                .strftime("%Y-%m-%dT%H:%M:%SZ")
            with open(path, "w") as f:
                f.write(ts)
    
    # ── CONFIG ─────────────────────────────────────────────────────────────
    BPT_API_URL = must_env("BPT_API_URL")  # e.g., https://tenant-services.pm.beyondtrustcloud.com
    CLIENT_ID = must_env("BPT_CLIENT_ID")
    CLIENT_SECRET = must_env("BPT_CLIENT_SECRET")
    S3_BUCKET = must_env("S3_BUCKET")
    S3_PREFIX = os.getenv("S3_PREFIX", "")  # e.g., "bpt/"
    STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt")
    RECORD_SIZE = int(os.getenv("RECORD_SIZE", "1000"))
    
    # ── END CONFIG ─────────────────────────────────────────────────────────
    ensure_state_file(STATE_FILE)
    
    def read_last_run():
        with open(STATE_FILE, "r") as f:
            ts = f.read().strip()
        return datetime.fromisoformat(ts.replace("Z", "+00:00"))
    
    def write_last_run(dt):
        with open(STATE_FILE, "w") as f:
            f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ"))
    
    def get_oauth_token():
        """
        Get OAuth2 token using client credentials flow
        Scope: urn:management:api (for EPM Management API access)
        """
        resp = requests.post(
            f"{BPT_API_URL}/oauth/connect/token",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "grant_type": "client_credentials",
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "scope": "urn:management:api"
            }
        )
        resp.raise_for_status()
        return resp.json()["access_token"]
    
    def extract_event_timestamp(evt):
        """
        Extract timestamp from event, prioritizing event.ingested field
        """
        # Primary (documented) path: event.ingested
        if isinstance(evt, dict) and isinstance(evt.get("event"), dict):
            ts = evt["event"].get("ingested")
            if ts:
                return ts
    
        # Fallbacks for other timestamp fields
        timestamp_fields = ["timestamp", "eventTime", "dateTime", "whenOccurred", "date", "time"]
        for field in timestamp_fields:
            if field in evt and evt[field]:
                return evt[field]
    
        return None
    
    def parse_timestamp(ts):
        """
        Parse timestamp handling various formats
        """
        from datetime import datetime, timezone
    
        if isinstance(ts, (int, float)):
            # Handle milliseconds vs seconds
            return datetime.fromtimestamp(ts/1000 if ts > 1e12 else ts, tz=timezone.utc)
    
        if isinstance(ts, str):
            if ts.endswith("Z"):
                return datetime.fromisoformat(ts.replace("Z", "+00:00"))
            dt = datetime.fromisoformat(ts)
            return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
    
        raise ValueError(f"Unsupported timestamp: {ts!r}")
    
    def fetch_events(token, start_date_iso):
        """
        Fetch events using the correct EPM API endpoint: /management-api/v2/Events/FromStartDate
        This endpoint uses StartDate and RecordSize parameters, not startTime/endTime/limit/offset
        """
        headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
        all_events, current_start = [], start_date_iso
    
        # Enforce maximum RecordSize limit of 1000
        record_size_limited = min(RECORD_SIZE, 1000)
    
        for _ in range(10):  # MAX 10 iterations to prevent infinite loops
            # Use the correct endpoint and parameters
            params = {
                "StartDate": current_start_date,
                "RecordSize": RECORD_SIZE
            }
    
            resp = requests.get(
                f"{BPT_API_URL}/management-api/v2/Events/FromStartDate",
                headers=headers, 
                params={
                    "StartDate": current_start_date,
                    "RecordSize": min(RECORD_SIZE, 1000)
                },
                timeout=300
            )
            resp.raise_for_status()
    
            data = resp.json()
            events = data.get("events", [])
    
            if not events:
                break
    
            all_events.extend(events)
            iterations += 1
    
            # If we got fewer events than RECORD_SIZE, we're done
            if len(events) < RECORD_SIZE:
                break
    
            # For pagination, update StartDate to the timestamp of the last event
            last_event = events[-1]
            last_timestamp = extract_event_timestamp(last_event)
    
            if not last_timestamp:
                print("Warning: Could not find timestamp in last event for pagination")
                break
    
            # Convert to ISO format if needed and increment slightly to avoid duplicates
            try:
                dt = parse_timestamp(last_timestamp)
                # Add 1 second to avoid retrieving the same event again
                dt = dt + timedelta(seconds=1)
                current_start = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    
            except Exception as e:
                print(f"Error parsing timestamp {last_timestamp}: {e}")
                break
    
        return all_events
    
    def upload_to_s3(obj, key):
        boto3.client("s3").put_object(
            Bucket=S3_BUCKET, 
            Key=key,
            Body=json.dumps(obj).encode("utf-8"),
            ContentType="application/json"
        )
    
    def main():
        # 1) determine window
        start_dt = read_last_run()
        end_dt = datetime.now(timezone.utc)
        START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    
        print(f"Fetching events from {START} to {END}")
    
        # 2) authenticate and fetch
        try:
            token = get_oauth_token()
            events = fetch_events(token, START)
    
            # Filter events to only include those before our end time
            filtered_events = []
            for evt in events:
                evt_time = extract_event_timestamp(evt)
                if evt_time:
                    try:
                        evt_dt = parse_timestamp(evt_time)
                        if evt_dt <= end_dt:
                            filtered_events.append(evt)
                    except Exception as e:
                        print(f"Error parsing event timestamp {evt_time}: {e}")
                        # Include event anyway if timestamp parsing fails
                        filtered_events.append(evt)
                else:
                    # Include events without timestamps
                    filtered_events.append(evt)
    
            count = len(filtered_events)
    
            if count > 0:
                # Upload events to S3
                timestamp_str = end_dt.strftime('%Y%m%d_%H%M%S')
                for idx, evt in enumerate(filtered_events, start=1):
                    key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{timestamp_str}_{idx:06d}.json"
                    upload_to_s3(evt, key)
    
                print(f"Uploaded {count} events to S3")
            else:
                print("No events to upload")
    
            # 3) persist state
            write_last_run(end_dt)
    
        except Exception as e:
            print(f"Error: {e}")
            sys.exit(1)
    
    if __name__ == "__main__":
        main()
    
  4. 將其設為可執行:

    chmod +x collector_bpt.py
    

使用 Cron 安排每日排程

  1. 執行下列指令:

    crontab -e
    
  2. 在世界標準時間午夜新增每日工作:

    0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py
    

選項 2:以 AWS Lambda 為基礎的收集作業

收集 BeyondTrust EPM 必備條件

  1. 以管理員身分登入 BeyondTrust Privilege Management 網頁控制台
  2. 依序前往「System Configuration」>「REST API」>「Tokens」
  3. 按一下「新增權杖」
  4. 提供下列設定詳細資料:
    • 「Name」(名稱):輸入 Google SecOps Collector
    • 範圍:選取「Audit:Read」和其他必要範圍。
  5. 按一下「儲存」,然後複製權杖值。
  6. 複製下列詳細資料並儲存於安全位置:
    • API 基礎網址:您的 BeyondTrust EPM API 網址 (例如 https://yourtenant-services.pm.beyondtrustcloud.com)。
    • 用戶端 ID:來自 OAuth 應用程式設定。
    • 用戶端密鑰:來自 OAuth 應用程式設定。

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照本使用指南建立 Amazon S3 值區建立值區
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 beyondtrust-epm-logs-bucket)。
  3. 按照這份使用者指南建立使用者:建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」,以供日後使用。
  12. 按一下 [完成]
  13. 選取 [權限] 分頁標籤。
  14. 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋並選取 AmazonS3FullAccess 政策。
  18. 點選「下一步」
  19. 按一下「Add permissions」。

設定 S3 上傳的身分與存取權管理政策和角色

  1. AWS 管理控制台中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」分頁標籤
  2. 複製並貼上下列政策:

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Sid": "AllowPutObjects",
        "Effect": "Allow",
        "Action": "s3:PutObject",
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*"
        },
        {
        "Sid": "AllowGetStateObject",
        "Effect": "Allow",
        "Action": "s3:GetObject",
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/beyondtrust-epm-logs/state.json"
        }
    ]
    }
    
    • 如果您輸入的值區名稱不同,請替換 beyondtrust-epm-logs-bucket
  3. 依序點選「Next」>「Create policy」

  4. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」

  5. 附加新建立的政策和代管政策 AWSLambdaBasicExecutionRole (適用於 CloudWatch 記錄)。

  6. 為角色命名 BeyondTrustEPMLogExportRole,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:
設定
名稱 BeyondTrustEPMLogExport
執行階段 Python 3.13
架構 x86_64
執行角色 BeyondTrustEPMLogExportRole
  1. 建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (BeyondTrustEPMLogExport.py):

    import json
    import boto3
    import urllib3
    import base64
    from datetime import datetime, timedelta, timezone
    import os
    from typing import Dict, List, Optional
    
    # Initialize urllib3 pool manager
    http = urllib3.PoolManager()
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch BeyondTrust EPM audit events and store them in S3
        """
    
        # Environment variables
        S3_BUCKET = os.environ['S3_BUCKET']
        S3_PREFIX = os.environ['S3_PREFIX']
        STATE_KEY = os.environ['STATE_KEY']
    
        # BeyondTrust EPM API credentials
        BPT_API_URL = os.environ['BPT_API_URL']
        CLIENT_ID = os.environ['CLIENT_ID']
        CLIENT_SECRET = os.environ['CLIENT_SECRET']
        OAUTH_SCOPE = os.environ.get('OAUTH_SCOPE', 'urn:management:api')
    
        # Optional parameters
        RECORD_SIZE = int(os.environ.get('RECORD_SIZE', '1000'))
        MAX_ITERATIONS = int(os.environ.get('MAX_ITERATIONS', '10'))
    
        s3_client = boto3.client('s3')
    
        try:
            # Get last execution state
            last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY)
    
            # Get OAuth access token
            access_token = get_oauth_token(BPT_API_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_SCOPE)
    
            # Fetch audit events
            events = fetch_audit_events(BPT_API_URL, access_token, last_timestamp, RECORD_SIZE, MAX_ITERATIONS)
    
            if events:
                # Store events in S3
                current_timestamp = datetime.utcnow()
                filename = f"{S3_PREFIX}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json"
    
                store_events_to_s3(s3_client, S3_BUCKET, filename, events)
    
                # Update state with latest timestamp
                latest_timestamp = get_latest_event_timestamp(events)
                update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp)
    
                print(f"Successfully processed {len(events)} events and stored to {filename}")
            else:
                print("No new events found")
    
            return {
                'statusCode': 200,
                'body': json.dumps(f'Successfully processed {len(events) if events else 0} events')
            }
    
        except Exception as e:
            print(f"Error processing BeyondTrust EPM logs: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def get_oauth_token(api_url: str, client_id: str, client_secret: str, scope: str = "urn:management:api") -> str:
        """
        Get OAuth access token using client credentials flow for BeyondTrust EPM
        Uses the correct scope: urn:management:api and /oauth/connect/token endpoint
        """
    
        token_url = f"{api_url}/oauth/connect/token"
    
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    
        body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}"
    
        response = http.request('POST', token_url, headers=headers, body=body, timeout=urllib3.Timeout(60.0))
    
        if response.status != 200:
            raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}")
    
        token_data = json.loads(response.data.decode('utf-8'))
        return token_data['access_token']
    
    def fetch_audit_events(api_url: str, access_token: str, last_timestamp: Optional[str], record_size: int, max_iterations: int) -> List[Dict]:
        """
        Fetch audit events using the correct BeyondTrust EPM API endpoint:
        /management-api/v2/Events/FromStartDate with StartDate and RecordSize parameters
        """
    
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }
    
        all_events = []
        current_start_date = last_timestamp or (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ")
        iterations = 0
    
        # Enforce maximum RecordSize limit of 1000
        record_size_limited = min(record_size, 1000)
    
        while iterations < max_iterations:
            # Use the correct EPM API endpoint and parameters
            query_url = f"{api_url}/management-api/v2/Events/FromStartDate"
            params = {
                'StartDate': current_start_date,
                'RecordSize': record_size_limited
            }
    
            response = http.request('GET', query_url, headers=headers, fields=params, timeout=urllib3.Timeout(300.0))
    
            if response.status != 200:
                raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}")
    
            response_data = json.loads(response.data.decode('utf-8'))
            events = response_data.get('events', [])
    
            if not events:
                break
    
            all_events.extend(events)
            iterations += 1
    
            # If we got fewer events than RecordSize, we've reached the end
            if len(events) < record_size_limited:
                break
    
            # For pagination, update StartDate to the timestamp of the last event
            last_event = events[-1]
            last_timestamp = extract_event_timestamp(last_event)
    
            if not last_timestamp:
                print("Warning: Could not find timestamp in last event for pagination")
                break
    
            # Convert to datetime and add 1 second to avoid retrieving the same event again
            try:
                dt = parse_timestamp(last_timestamp)
                dt = dt + timedelta(seconds=1)
                current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
            except Exception as e:
                print(f"Error parsing timestamp {last_timestamp}: {e}")
                break
    
        return all_events
    
    def extract_event_timestamp(event: Dict) -> Optional[str]:
        """
        Extract timestamp from event, prioritizing event.ingested field
        """
        # Primary (documented) path: event.ingested
        if isinstance(event, dict) and isinstance(event.get("event"), dict):
            ts = event["event"].get("ingested")
            if ts:
                return ts
    
        # Fallbacks for other timestamp fields
        timestamp_fields = ['timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time']
        for field in timestamp_fields:
            if field in event and event[field]:
                return event[field]
    
        return None
    
    def parse_timestamp(timestamp_str: str) -> datetime:
        """
        Parse timestamp string to datetime object, handling various formats
        """
        if isinstance(timestamp_str, (int, float)):
            # Unix timestamp (in milliseconds or seconds)
            if timestamp_str > 1e12:  # Milliseconds
                return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc)
            else:  # Seconds
                return datetime.fromtimestamp(timestamp_str, tz=timezone.utc)
    
        if isinstance(timestamp_str, str):
            # Try different string formats
            try:
                # ISO format with Z
                if timestamp_str.endswith('Z'):
                    return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                # ISO format with timezone
                elif '+' in timestamp_str or timestamp_str.endswith('00:00'):
                    return datetime.fromisoformat(timestamp_str)
                # ISO format without timezone (assume UTC)
                else:
                    dt = datetime.fromisoformat(timestamp_str)
                    if dt.tzinfo is None:
                        dt = dt.replace(tzinfo=timezone.utc)
                    return dt
            except ValueError:
                pass
    
        raise ValueError(f"Could not parse timestamp: {timestamp_str}")
    
    def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]:
        """
        Get the last processed timestamp from S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            print("No previous state found, starting from 24 hours ago")
            return None
        except Exception as e:
            print(f"Error reading state: {e}")
            return None
    
    def update_state(s3_client, bucket: str, state_key: str, timestamp: str):
        """
        Update the state file with the latest processed timestamp
        """
        state_data = {
            'last_timestamp': timestamp,
            'updated_at': datetime.utcnow().isoformat() + 'Z'
        }
    
        s3_client.put_object(
            Bucket=bucket,
            Key=state_key,
            Body=json.dumps(state_data),
            ContentType='application/json'
        )
    
    def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]):
        """
        Store events as JSONL (one JSON object per line) in S3
        """
        # Convert to JSONL format (one JSON object per line)
        jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events)
    
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=jsonl_content,
            ContentType='application/x-ndjson'
        )
    
    def get_latest_event_timestamp(events: List[Dict]) -> str:
        """
        Get the latest timestamp from the events for state tracking
        """
        if not events:
            return datetime.utcnow().isoformat() + 'Z'
    
        latest = None
        for event in events:
            timestamp = extract_event_timestamp(event)
            if timestamp:
                try:
                    event_dt = parse_timestamp(timestamp)
                    event_iso = event_dt.isoformat() + 'Z'
                    if latest is None or event_iso > latest:
                        latest = event_iso
                except Exception as e:
                    print(f"Error parsing event timestamp {timestamp}: {e}")
                    continue
    
        return latest or datetime.utcnow().isoformat() + 'Z'
    
  2. 依序前往「Configuration」>「Environment variables」>「Edit」>「Add new environment variable」

  3. 輸入下列環境變數,並將 換成您的值。

    範例值
    S3_BUCKET beyondtrust-epm-logs-bucket
    S3_PREFIX beyondtrust-epm-logs/
    STATE_KEY beyondtrust-epm-logs/state.json
    BPT_API_URL https://yourtenant-services.pm.beyondtrustcloud.com
    CLIENT_ID your-client-id
    CLIENT_SECRET your-client-secret
    OAUTH_SCOPE urn:management:api
    RECORD_SIZE 1000
    MAX_ITERATIONS 10
  4. 建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。

  5. 選取「設定」分頁標籤。

  6. 在「一般設定」面板中,按一下「編輯」

  7. 將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)

建立 EventBridge 排程

  1. 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」
  2. 提供下列設定詳細資料:
    • 週期性時間表費率 (1 hour)。
    • 目標:您的 Lambda 函式 BeyondTrustEPMLogExport
    • 名稱BeyondTrustEPMLogExport-1h
  3. 按一下「建立時間表」

選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰

  1. 依序前往 AWS 管理中心 > IAM > 使用者 > 新增使用者
  2. 點選 [Add users] (新增使用者)。
  3. 提供下列設定詳細資料:
    • 使用者:輸入 secops-reader
    • 存取類型:選取「存取金鑰 - 程式輔助存取」
  4. 按一下「建立使用者」
  5. 附加最低讀取權限政策 (自訂):依序選取「使用者」>「secops-reader」>「權限」>「新增權限」>「直接附加政策」>「建立政策」
  6. 在 JSON 編輯器中輸入下列政策:

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Effect": "Allow",
        "Action": ["s3:GetObject"],
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*"
        },
        {
        "Effect": "Allow",
        "Action": ["s3:ListBucket"],
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket"
        }
    ]
    }
    
  7. 將名稱設為 secops-reader-policy

  8. 依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」

  9. 依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」

  10. 下載 CSV (這些值會輸入至動態饋給)。

設定動態饋給 (兩種方法)

如要設定動態消息,請按照下列步驟操作:

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「+ 新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 BeyondTrust EPM logs)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「BeyondTrust Endpoint Privilege Management」做為「記錄類型」。
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URI:值區 URI
      • s3://your-log-bucket-name/。請將 your-log-bucket-name 替換為實際值區名稱。
    • 來源刪除選項:根據偏好設定選取刪除選項。
    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「Finalize」畫面上檢查新的動態饋給設定,然後按一下「Submit」

UDM 對應表

記錄欄位 UDM 對應 邏輯
agent.id principal.asset.attribute.labels.value 已對應至金鑰為「agent_id」的標籤
agent.version principal.asset.attribute.labels.value 已對應至金鑰為「agent_version」的標籤
ecs.version principal.asset.attribute.labels.value 已對應至金鑰為「ecs_version」的標籤
event_data.reason metadata.description 原始記錄中的事件說明
event_datas.ActionId metadata.product_log_id 產品專屬記錄 ID
file.path principal.file.full_path 活動中的完整檔案路徑
headers.content_length additional.fields.value.string_value 已對應至金鑰為「content_length」的標籤
headers.content_type additional.fields.value.string_value 已對應至金鑰為「content_type」的標籤
headers.http_host additional.fields.value.string_value 已對應至金鑰為「http_host」的標籤
headers.http_version network.application_protocol_version HTTP 通訊協定版本
headers.request_method network.http.method HTTP 要求方法
host.hostname principal.hostname 主要主機名稱
host.hostname principal.asset.hostname 主要資產主機名稱
host.ip principal.asset.ip 主要資產 IP 位址
host.ip principal.ip 主體 IP 位址
host.mac principal.mac 主要 MAC 位址
host.os.platform principal.platform 如果等於 macOS,則設為 MAC
host.os.version principal.platform_version 作業系統版本
labels.related_item_id metadata.product_log_id 相關項目 ID
process.command_line principal.process.command_line 程序指令列
process.name additional.fields.value.string_value 已對應至金鑰為「process_name」的標籤
process.parent.name additional.fields.value.string_value 已對應至金鑰為「process_parent_name」的標籤
process.parent.pid principal.process.parent_process.pid 父項程序 PID 轉換為字串
process.pid principal.process.pid 程序 PID 轉換為字串
user.id principal.user.userid 使用者 ID
user.name principal.user.user_display_name 使用者顯示名稱
不適用 metadata.event_timestamp 事件時間戳記設為記錄項目時間戳記
不適用 metadata.event_type 如果沒有主體,則為 GENERIC_EVENT;否則為 STATUS_UPDATE
不適用 network.application_protocol 如果 http_version 欄位包含 HTTP,請設為 HTTP

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。