BeyondTrust Endpoint Privilege Management (EPM) 로그 수집

다음에서 지원:

이 문서에서는 Amazon S3를 사용하는 EC2 기반 수집과 AWS Lambda 기반 수집이라는 두 가지 접근 방식을 사용하여 BeyondTrust Endpoint Privilege Management (EPM) 로그를 Google Security Operations에 수집하는 방법을 설명합니다. 파서는 BeyondTrust Endpoint의 원시 JSON 로그 데이터를 Chronicle UDM을 준수하는 구조화된 형식으로 변환하는 데 중점을 둡니다. 먼저 다양한 필드의 기본값을 초기화한 다음 JSON 페이로드를 파싱하고, 이후 원시 로그의 특정 필드를 event.idm.read_only_udm 객체 내의 해당 UDM 필드에 매핑합니다.

시작하기 전에

다음 기본 요건이 충족되었는지 확인합니다.

  • Google SecOps 인스턴스
  • BeyondTrust Endpoint Privilege Management 테넌트 또는 API에 대한 권한 있는 액세스
  • AWS (S3, IAM, Lambda/EC2, EventBridge)에 대한 권한 액세스

통합 방법 선택

다음 두 가지 통합 방법 중에서 선택할 수 있습니다.

  • 옵션 1: EC2 기반 수집: 로그 수집을 위해 예약된 스크립트가 있는 EC2 인스턴스를 사용합니다.
  • 옵션 2: AWS Lambda 기반 수집: EventBridge 일정과 함께 서버리스 Lambda 함수를 사용합니다.

옵션 1: EC2 기반 수집

Google SecOps 수집을 위한 AWS IAM 구성

  1. 이 사용자 가이드(IAM 사용자 만들기)에 따라 사용자를 만듭니다.
  2. 생성된 사용자를 선택합니다.
  3. 보안 사용자 인증 정보 탭을 선택합니다.
  4. 액세스 키 섹션에서 액세스 키 만들기를 클릭합니다.
  5. 사용 사례서드 파티 서비스를 선택합니다.
  6. 다음을 클릭합니다.
  7. 선택사항: 설명 태그를 추가합니다.
  8. 액세스 키 만들기를 클릭합니다.
  9. CSV 파일 다운로드를 클릭하여 향후 참조할 수 있도록 액세스 키비밀 액세스 키를 저장합니다.
  10. 완료를 클릭합니다.
  11. 권한 탭을 선택합니다.
  12. 권한 정책 섹션에서 권한 추가를 클릭합니다.
  13. 권한 추가를 선택합니다.
  14. 정책 직접 연결을 선택합니다.
  15. AmazonS3FullAccess 정책을 검색하여 선택합니다.
  16. 다음을 클릭합니다.
  17. 권한 추가를 클릭합니다.

API 액세스를 위해 BeyondTrust EPM 구성

  1. 관리자로 BeyondTrust Privilege Management 웹 콘솔에 로그인합니다.
  2. 구성 > 설정 > API 설정으로 이동합니다.
  3. API 계정 만들기를 클릭합니다.
  4. 다음 구성 세부정보를 제공합니다.
    • 이름: Google SecOps Collector를 입력합니다.
    • API 액세스: 감사 (읽기) 및 기타 범위를 필요에 따라 사용 설정합니다.
  5. 클라이언트 ID클라이언트 보안 비밀번호를 복사하여 저장합니다.
  6. API 기본 URL을 복사합니다. 일반적으로 https://<your-tenant>-services.pm.beyondtrustcloud.com입니다 (BPT_API_URL로 사용됨).

AWS S3 버킷 만들기

  1. AWS 관리 콘솔에 로그인합니다.
  2. AWS 콘솔 > 서비스 > S3 > 버킷 만들기로 이동합니다.
  3. 다음 구성 세부정보를 제공합니다.
    • 버킷 이름: my-beyondtrust-logs
    • 리전: [선택] > 만들기

EC2용 IAM 역할 만들기

  1. AWS 관리 콘솔에 로그인합니다.
  2. AWS 콘솔 > 서비스 > IAM > 역할 > 역할 만들기로 이동합니다.
  3. 다음 구성 세부정보를 제공합니다.
    • 신뢰할 수 있는 엔티티: AWS 서비스 > EC2 > 다음
    • 권한 연결: AmazonS3FullAccess (또는 버킷에 대한 범위가 지정된 정책) > 다음
    • 역할 이름: EC2-S3-BPT-Writer > 역할 만들기

EC2 수집기 VM 실행 및 구성

  1. AWS 관리 콘솔에 로그인합니다.
  2. 서비스로 이동합니다.
  3. 검색창에 EC2를 입력하고 선택합니다.
  4. EC2 대시보드에서 인스턴스를 클릭합니다.
  5. 인스턴스 실행을 클릭합니다.
  6. 다음 구성 세부정보를 제공합니다.
    • 이름: BPT-Log-Collector를 입력합니다.
    • AMI: Ubuntu Server 22.04 LTS를 선택합니다.
    • 인스턴스 유형: t3.micro (또는 그 이상)를 선택하고 다음을 클릭합니다.
    • 네트워크: 네트워크 설정이 기본 VPC로 설정되어 있는지 확인합니다.
    • IAM 역할: 메뉴에서 EC2-S3-BPT-Writer IAM 역할을 선택합니다.
    • 공용 IP 자동 할당: 사용 설정하고 (또는 VPN을 사용하여 연결할 수 있는지 확인) > 다음을 클릭합니다.
    • 스토리지 추가: 기본 스토리지 구성 (8GiB)을 그대로 두고 다음을 클릭합니다.
    • 새 보안 그룹 만들기를 선택합니다.
    • 인바운드 규칙: 규칙 추가를 클릭합니다.
    • Type: SSH를 선택합니다.
    • 포트: 22.
    • 소스: 내 IP
    • 검토 및 실행을 클릭합니다.
    • 키 쌍을 선택하거나 만듭니다.
    • 키 쌍 다운로드를 클릭합니다.
    • 다운로드한 PEM 파일을 저장합니다. SSH를 사용하여 인스턴스에 연결하려면 이 파일이 필요합니다.
  7. SSH를 사용하여 가상 머신 (VM)에 연결합니다.

수집기 기본 요건 설치

  1. 다음 명령어를 실행합니다.

    chmod 400 ~/Downloads/your-key.pem
    ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
    
  2. 시스템을 업데이트하고 종속 항목을 설치합니다.

    # Update OS
    sudo apt update && sudo apt upgrade -y
    # Install Python, Git
    sudo apt install -y python3 python3-venv python3-pip git
    # Create & activate virtualenv
    python3 -m venv ~/bpt-venv
    source ~/bpt-venv/bin/activate
    # Install libraries
    pip install requests boto3
    
  3. 디렉터리 및 상태 파일을 만듭니다.

    sudo mkdir -p /var/lib/bpt-collector
    sudo touch /var/lib/bpt-collector/last_run.txt
    sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
    
  4. 초기화합니다 (예: 1시간 전).

    echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
    

BeyondTrust EPM Collector 스크립트 배포

  1. 프로젝트 폴더를 만듭니다.

    mkdir ~/bpt-collector && cd ~/bpt-collector
    
  2. 필수 환경 변수를 내보냅니다 (예: ~/.bashrc).

    export BPT_API_URL="https://<your-tenant>-services.pm.beyondtrustcloud.com"
    export BPT_CLIENT_ID="your-client-id"
    export BPT_CLIENT_SECRET="your-client-secret"
    export S3_BUCKET="my-beyondtrust-logs"
    export S3_PREFIX="bpt/"
    export STATE_FILE="/var/lib/bpt-collector/last_run.txt"
    export RECORD_SIZE="1000"
    
  3. collector_bpt.py를 만들고 다음 코드를 입력합니다.

    #!/usr/bin/env python3
    import os, sys, json, boto3, requests
    from datetime import datetime, timezone, timedelta
    
    # ── UTILS ──────────────────────────────────────────────────────────────
    def must_env(var):
        val = os.getenv(var)
        if not val:
            print(f"ERROR: environment variable {var} is required", file=sys.stderr)
            sys.exit(1)
        return val
    
    def ensure_state_file(path):
        d = os.path.dirname(path)
        if not os.path.isdir(d):
            os.makedirs(d, exist_ok=True)
        if not os.path.isfile(path):
            ts = (datetime.now(timezone.utc) - timedelta(hours=1))
                .strftime("%Y-%m-%dT%H:%M:%SZ")
            with open(path, "w") as f:
                f.write(ts)
    
    # ── CONFIG ─────────────────────────────────────────────────────────────
    BPT_API_URL = must_env("BPT_API_URL")  # e.g., https://tenant-services.pm.beyondtrustcloud.com
    CLIENT_ID = must_env("BPT_CLIENT_ID")
    CLIENT_SECRET = must_env("BPT_CLIENT_SECRET")
    S3_BUCKET = must_env("S3_BUCKET")
    S3_PREFIX = os.getenv("S3_PREFIX", "")  # e.g., "bpt/"
    STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt")
    RECORD_SIZE = int(os.getenv("RECORD_SIZE", "1000"))
    
    # ── END CONFIG ─────────────────────────────────────────────────────────
    ensure_state_file(STATE_FILE)
    
    def read_last_run():
        with open(STATE_FILE, "r") as f:
            ts = f.read().strip()
        return datetime.fromisoformat(ts.replace("Z", "+00:00"))
    
    def write_last_run(dt):
        with open(STATE_FILE, "w") as f:
            f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ"))
    
    def get_oauth_token():
        """
        Get OAuth2 token using client credentials flow
        Scope: urn:management:api (for EPM Management API access)
        """
        resp = requests.post(
            f"{BPT_API_URL}/oauth/connect/token",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "grant_type": "client_credentials",
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "scope": "urn:management:api"
            }
        )
        resp.raise_for_status()
        return resp.json()["access_token"]
    
    def extract_event_timestamp(evt):
        """
        Extract timestamp from event, prioritizing event.ingested field
        """
        # Primary (documented) path: event.ingested
        if isinstance(evt, dict) and isinstance(evt.get("event"), dict):
            ts = evt["event"].get("ingested")
            if ts:
                return ts
    
        # Fallbacks for other timestamp fields
        timestamp_fields = ["timestamp", "eventTime", "dateTime", "whenOccurred", "date", "time"]
        for field in timestamp_fields:
            if field in evt and evt[field]:
                return evt[field]
    
        return None
    
    def parse_timestamp(ts):
        """
        Parse timestamp handling various formats
        """
        from datetime import datetime, timezone
    
        if isinstance(ts, (int, float)):
            # Handle milliseconds vs seconds
            return datetime.fromtimestamp(ts/1000 if ts > 1e12 else ts, tz=timezone.utc)
    
        if isinstance(ts, str):
            if ts.endswith("Z"):
                return datetime.fromisoformat(ts.replace("Z", "+00:00"))
            dt = datetime.fromisoformat(ts)
            return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
    
        raise ValueError(f"Unsupported timestamp: {ts!r}")
    
    def fetch_events(token, start_date_iso):
        """
        Fetch events using the correct EPM API endpoint: /management-api/v2/Events/FromStartDate
        This endpoint uses StartDate and RecordSize parameters, not startTime/endTime/limit/offset
        """
        headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
        all_events, current_start = [], start_date_iso
    
        # Enforce maximum RecordSize limit of 1000
        record_size_limited = min(RECORD_SIZE, 1000)
    
        for _ in range(10):  # MAX 10 iterations to prevent infinite loops
            # Use the correct endpoint and parameters
            params = {
                "StartDate": current_start_date,
                "RecordSize": RECORD_SIZE
            }
    
            resp = requests.get(
                f"{BPT_API_URL}/management-api/v2/Events/FromStartDate",
                headers=headers, 
                params={
                    "StartDate": current_start_date,
                    "RecordSize": min(RECORD_SIZE, 1000)
                },
                timeout=300
            )
            resp.raise_for_status()
    
            data = resp.json()
            events = data.get("events", [])
    
            if not events:
                break
    
            all_events.extend(events)
            iterations += 1
    
            # If we got fewer events than RECORD_SIZE, we're done
            if len(events) < RECORD_SIZE:
                break
    
            # For pagination, update StartDate to the timestamp of the last event
            last_event = events[-1]
            last_timestamp = extract_event_timestamp(last_event)
    
            if not last_timestamp:
                print("Warning: Could not find timestamp in last event for pagination")
                break
    
            # Convert to ISO format if needed and increment slightly to avoid duplicates
            try:
                dt = parse_timestamp(last_timestamp)
                # Add 1 second to avoid retrieving the same event again
                dt = dt + timedelta(seconds=1)
                current_start = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    
            except Exception as e:
                print(f"Error parsing timestamp {last_timestamp}: {e}")
                break
    
        return all_events
    
    def upload_to_s3(obj, key):
        boto3.client("s3").put_object(
            Bucket=S3_BUCKET, 
            Key=key,
            Body=json.dumps(obj).encode("utf-8"),
            ContentType="application/json"
        )
    
    def main():
        # 1) determine window
        start_dt = read_last_run()
        end_dt = datetime.now(timezone.utc)
        START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    
        print(f"Fetching events from {START} to {END}")
    
        # 2) authenticate and fetch
        try:
            token = get_oauth_token()
            events = fetch_events(token, START)
    
            # Filter events to only include those before our end time
            filtered_events = []
            for evt in events:
                evt_time = extract_event_timestamp(evt)
                if evt_time:
                    try:
                        evt_dt = parse_timestamp(evt_time)
                        if evt_dt <= end_dt:
                            filtered_events.append(evt)
                    except Exception as e:
                        print(f"Error parsing event timestamp {evt_time}: {e}")
                        # Include event anyway if timestamp parsing fails
                        filtered_events.append(evt)
                else:
                    # Include events without timestamps
                    filtered_events.append(evt)
    
            count = len(filtered_events)
    
            if count > 0:
                # Upload events to S3
                timestamp_str = end_dt.strftime('%Y%m%d_%H%M%S')
                for idx, evt in enumerate(filtered_events, start=1):
                    key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{timestamp_str}_{idx:06d}.json"
                    upload_to_s3(evt, key)
    
                print(f"Uploaded {count} events to S3")
            else:
                print("No events to upload")
    
            # 3) persist state
            write_last_run(end_dt)
    
        except Exception as e:
            print(f"Error: {e}")
            sys.exit(1)
    
    if __name__ == "__main__":
        main()
    
  4. 실행 파일로 만듭니다.

    chmod +x collector_bpt.py
    

크론으로 매일 예약

  1. 다음 명령어를 실행합니다.

    crontab -e
    
  2. 자정(UTC)에 일일 작업을 추가합니다.

    0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py
    

옵션 2: AWS Lambda 기반 수집

BeyondTrust EPM 필수 구성요소 수집

  1. 관리자로 BeyondTrust Privilege Management 웹 콘솔에 로그인합니다.
  2. 시스템 구성 > REST API > 토큰으로 이동합니다.
  3. 토큰 추가를 클릭합니다.
  4. 다음 구성 세부정보를 제공합니다.
    • 이름: Google SecOps Collector를 입력합니다.
    • 범위: 감사:읽기 및 기타 필요한 범위를 선택합니다.
  5. 저장을 클릭하고 토큰 값을 복사합니다.
  6. 다음 세부정보를 복사하여 안전한 위치에 저장합니다.
    • API 기본 URL: BeyondTrust EPM API URL (예: https://yourtenant-services.pm.beyondtrustcloud.com)
    • 클라이언트 ID: OAuth 애플리케이션 구성에서 가져옵니다.
    • 클라이언트 보안 비밀번호: OAuth 애플리케이션 구성에서 가져옵니다.

Google SecOps용 AWS S3 버킷 및 IAM 구성

  1. 이 사용자 가이드(버킷 만들기)에 따라 Amazon S3 버킷을 만듭니다.
  2. 나중에 참조할 수 있도록 버킷 이름리전을 저장합니다 (예: beyondtrust-epm-logs-bucket).
  3. 이 사용자 가이드(IAM 사용자 만들기)에 따라 사용자를 만듭니다.
  4. 생성된 사용자를 선택합니다.
  5. 보안 사용자 인증 정보 탭을 선택합니다.
  6. 액세스 키 섹션에서 액세스 키 만들기를 클릭합니다.
  7. 사용 사례서드 파티 서비스를 선택합니다.
  8. 다음을 클릭합니다.
  9. 선택사항: 설명 태그를 추가합니다.
  10. 액세스 키 만들기를 클릭합니다.
  11. CSV 파일 다운로드를 클릭하여 나중에 사용할 수 있도록 액세스 키비밀 액세스 키를 저장합니다.
  12. 완료를 클릭합니다.
  13. 권한 탭을 선택합니다.
  14. 권한 정책 섹션에서 권한 추가를 클릭합니다.
  15. 권한 추가를 선택합니다.
  16. 정책 직접 연결을 선택합니다.
  17. AmazonS3FullAccess 정책을 검색하여 선택합니다.
  18. 다음을 클릭합니다.
  19. 권한 추가를 클릭합니다.

S3 업로드용 IAM 정책 및 역할 구성

  1. AWS 콘솔에서 IAM > 정책 > 정책 만들기 > JSON 탭으로 이동합니다.
  2. 다음 정책을 복사하여 붙여넣습니다.

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Sid": "AllowPutObjects",
        "Effect": "Allow",
        "Action": "s3:PutObject",
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*"
        },
        {
        "Sid": "AllowGetStateObject",
        "Effect": "Allow",
        "Action": "s3:GetObject",
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/beyondtrust-epm-logs/state.json"
        }
    ]
    }
    
    • 다른 버킷 이름을 입력한 경우 beyondtrust-epm-logs-bucket을 해당 이름으로 바꿉니다.
  3. 다음 > 정책 만들기를 클릭합니다.

  4. IAM > 역할 > 역할 생성 > AWS 서비스 > Lambda로 이동합니다.

  5. 새로 만든 정책과 관리형 정책 AWSLambdaBasicExecutionRole (CloudWatch 로깅용)을 연결합니다.

  6. 역할 이름을 BeyondTrustEPMLogExportRole로 지정하고 역할 만들기를 클릭합니다.

Lambda 함수 만들기

  1. AWS 콘솔에서 Lambda > 함수 > 함수 만들기로 이동합니다.
  2. 처음부터 작성을 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.
설정
이름 BeyondTrustEPMLogExport
런타임 Python 3.13
아키텍처 x86_64
실행 역할 BeyondTrustEPMLogExportRole
  1. 함수가 생성되면 코드 탭을 열고 스텁을 삭제한 후 다음 코드를 입력합니다 (BeyondTrustEPMLogExport.py).

    import json
    import boto3
    import urllib3
    import base64
    from datetime import datetime, timedelta, timezone
    import os
    from typing import Dict, List, Optional
    
    # Initialize urllib3 pool manager
    http = urllib3.PoolManager()
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch BeyondTrust EPM audit events and store them in S3
        """
    
        # Environment variables
        S3_BUCKET = os.environ['S3_BUCKET']
        S3_PREFIX = os.environ['S3_PREFIX']
        STATE_KEY = os.environ['STATE_KEY']
    
        # BeyondTrust EPM API credentials
        BPT_API_URL = os.environ['BPT_API_URL']
        CLIENT_ID = os.environ['CLIENT_ID']
        CLIENT_SECRET = os.environ['CLIENT_SECRET']
        OAUTH_SCOPE = os.environ.get('OAUTH_SCOPE', 'urn:management:api')
    
        # Optional parameters
        RECORD_SIZE = int(os.environ.get('RECORD_SIZE', '1000'))
        MAX_ITERATIONS = int(os.environ.get('MAX_ITERATIONS', '10'))
    
        s3_client = boto3.client('s3')
    
        try:
            # Get last execution state
            last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY)
    
            # Get OAuth access token
            access_token = get_oauth_token(BPT_API_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_SCOPE)
    
            # Fetch audit events
            events = fetch_audit_events(BPT_API_URL, access_token, last_timestamp, RECORD_SIZE, MAX_ITERATIONS)
    
            if events:
                # Store events in S3
                current_timestamp = datetime.utcnow()
                filename = f"{S3_PREFIX}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json"
    
                store_events_to_s3(s3_client, S3_BUCKET, filename, events)
    
                # Update state with latest timestamp
                latest_timestamp = get_latest_event_timestamp(events)
                update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp)
    
                print(f"Successfully processed {len(events)} events and stored to {filename}")
            else:
                print("No new events found")
    
            return {
                'statusCode': 200,
                'body': json.dumps(f'Successfully processed {len(events) if events else 0} events')
            }
    
        except Exception as e:
            print(f"Error processing BeyondTrust EPM logs: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def get_oauth_token(api_url: str, client_id: str, client_secret: str, scope: str = "urn:management:api") -> str:
        """
        Get OAuth access token using client credentials flow for BeyondTrust EPM
        Uses the correct scope: urn:management:api and /oauth/connect/token endpoint
        """
    
        token_url = f"{api_url}/oauth/connect/token"
    
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    
        body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}"
    
        response = http.request('POST', token_url, headers=headers, body=body, timeout=urllib3.Timeout(60.0))
    
        if response.status != 200:
            raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}")
    
        token_data = json.loads(response.data.decode('utf-8'))
        return token_data['access_token']
    
    def fetch_audit_events(api_url: str, access_token: str, last_timestamp: Optional[str], record_size: int, max_iterations: int) -> List[Dict]:
        """
        Fetch audit events using the correct BeyondTrust EPM API endpoint:
        /management-api/v2/Events/FromStartDate with StartDate and RecordSize parameters
        """
    
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }
    
        all_events = []
        current_start_date = last_timestamp or (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ")
        iterations = 0
    
        # Enforce maximum RecordSize limit of 1000
        record_size_limited = min(record_size, 1000)
    
        while iterations < max_iterations:
            # Use the correct EPM API endpoint and parameters
            query_url = f"{api_url}/management-api/v2/Events/FromStartDate"
            params = {
                'StartDate': current_start_date,
                'RecordSize': record_size_limited
            }
    
            response = http.request('GET', query_url, headers=headers, fields=params, timeout=urllib3.Timeout(300.0))
    
            if response.status != 200:
                raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}")
    
            response_data = json.loads(response.data.decode('utf-8'))
            events = response_data.get('events', [])
    
            if not events:
                break
    
            all_events.extend(events)
            iterations += 1
    
            # If we got fewer events than RecordSize, we've reached the end
            if len(events) < record_size_limited:
                break
    
            # For pagination, update StartDate to the timestamp of the last event
            last_event = events[-1]
            last_timestamp = extract_event_timestamp(last_event)
    
            if not last_timestamp:
                print("Warning: Could not find timestamp in last event for pagination")
                break
    
            # Convert to datetime and add 1 second to avoid retrieving the same event again
            try:
                dt = parse_timestamp(last_timestamp)
                dt = dt + timedelta(seconds=1)
                current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
            except Exception as e:
                print(f"Error parsing timestamp {last_timestamp}: {e}")
                break
    
        return all_events
    
    def extract_event_timestamp(event: Dict) -> Optional[str]:
        """
        Extract timestamp from event, prioritizing event.ingested field
        """
        # Primary (documented) path: event.ingested
        if isinstance(event, dict) and isinstance(event.get("event"), dict):
            ts = event["event"].get("ingested")
            if ts:
                return ts
    
        # Fallbacks for other timestamp fields
        timestamp_fields = ['timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time']
        for field in timestamp_fields:
            if field in event and event[field]:
                return event[field]
    
        return None
    
    def parse_timestamp(timestamp_str: str) -> datetime:
        """
        Parse timestamp string to datetime object, handling various formats
        """
        if isinstance(timestamp_str, (int, float)):
            # Unix timestamp (in milliseconds or seconds)
            if timestamp_str > 1e12:  # Milliseconds
                return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc)
            else:  # Seconds
                return datetime.fromtimestamp(timestamp_str, tz=timezone.utc)
    
        if isinstance(timestamp_str, str):
            # Try different string formats
            try:
                # ISO format with Z
                if timestamp_str.endswith('Z'):
                    return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                # ISO format with timezone
                elif '+' in timestamp_str or timestamp_str.endswith('00:00'):
                    return datetime.fromisoformat(timestamp_str)
                # ISO format without timezone (assume UTC)
                else:
                    dt = datetime.fromisoformat(timestamp_str)
                    if dt.tzinfo is None:
                        dt = dt.replace(tzinfo=timezone.utc)
                    return dt
            except ValueError:
                pass
    
        raise ValueError(f"Could not parse timestamp: {timestamp_str}")
    
    def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]:
        """
        Get the last processed timestamp from S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            print("No previous state found, starting from 24 hours ago")
            return None
        except Exception as e:
            print(f"Error reading state: {e}")
            return None
    
    def update_state(s3_client, bucket: str, state_key: str, timestamp: str):
        """
        Update the state file with the latest processed timestamp
        """
        state_data = {
            'last_timestamp': timestamp,
            'updated_at': datetime.utcnow().isoformat() + 'Z'
        }
    
        s3_client.put_object(
            Bucket=bucket,
            Key=state_key,
            Body=json.dumps(state_data),
            ContentType='application/json'
        )
    
    def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]):
        """
        Store events as JSONL (one JSON object per line) in S3
        """
        # Convert to JSONL format (one JSON object per line)
        jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events)
    
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=jsonl_content,
            ContentType='application/x-ndjson'
        )
    
    def get_latest_event_timestamp(events: List[Dict]) -> str:
        """
        Get the latest timestamp from the events for state tracking
        """
        if not events:
            return datetime.utcnow().isoformat() + 'Z'
    
        latest = None
        for event in events:
            timestamp = extract_event_timestamp(event)
            if timestamp:
                try:
                    event_dt = parse_timestamp(timestamp)
                    event_iso = event_dt.isoformat() + 'Z'
                    if latest is None or event_iso > latest:
                        latest = event_iso
                except Exception as e:
                    print(f"Error parsing event timestamp {timestamp}: {e}")
                    continue
    
        return latest or datetime.utcnow().isoformat() + 'Z'
    
  2. 구성 > 환경 변수 > 수정 > 새 환경 변수 추가로 이동합니다.

  3. 다음 환경 변수를 입력하고 사용자 값으로 바꿉니다.

    예시 값
    S3_BUCKET beyondtrust-epm-logs-bucket
    S3_PREFIX beyondtrust-epm-logs/
    STATE_KEY beyondtrust-epm-logs/state.json
    BPT_API_URL https://yourtenant-services.pm.beyondtrustcloud.com
    CLIENT_ID your-client-id
    CLIENT_SECRET your-client-secret
    OAUTH_SCOPE urn:management:api
    RECORD_SIZE 1000
    MAX_ITERATIONS 10
  4. 함수가 생성된 후 해당 페이지에 머무르거나 Lambda > 함수 > your-function을 엽니다.

  5. 구성 탭을 선택합니다.

  6. 일반 구성 패널에서 수정을 클릭합니다.

  7. 제한 시간5분 (300초)으로 변경하고 저장을 클릭합니다.

EventBridge 일정 만들기

  1. Amazon EventBridge > 스케줄러 > 일정 만들기로 이동합니다.
  2. 다음 구성 세부정보를 제공합니다.
    • 반복 일정: 요율 (1 hour)
    • 타겟: Lambda 함수 BeyondTrustEPMLogExport
    • 이름: BeyondTrustEPMLogExport-1h.
  3. 일정 만들기를 클릭합니다.

선택사항: Google SecOps용 읽기 전용 IAM 사용자 및 키 만들기

  1. AWS 콘솔 > IAM > 사용자 > 사용자 추가로 이동합니다.
  2. Add users를 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.
    • 사용자: secops-reader를 입력합니다.
    • 액세스 유형: 액세스 키 – 프로그래매틱 액세스를 선택합니다.
  4. 사용자 만들기를 클릭합니다.
  5. 최소 읽기 정책 (맞춤) 연결: 사용자 > secops-reader > 권한 > 권한 추가 > 정책 직접 연결 > 정책 만들기
  6. JSON 편집기에서 다음 정책을 입력합니다.

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Effect": "Allow",
        "Action": ["s3:GetObject"],
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*"
        },
        {
        "Effect": "Allow",
        "Action": ["s3:ListBucket"],
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket"
        }
    ]
    }
    
  7. 이름을 secops-reader-policy로 설정합니다.

  8. 정책 만들기 > 검색/선택 > 다음 > 권한 추가로 이동합니다.

  9. 보안 사용자 인증 정보> 액세스 키> 액세스 키 만들기로 이동합니다.

  10. CSV를 다운로드합니다 (이러한 값은 피드에 입력됨).

피드 설정 (두 옵션 모두)

피드를 구성하려면 다음 단계를 따르세요.

  1. SIEM 설정> 피드로 이동합니다.
  2. + 새 피드 추가를 클릭합니다.
  3. 피드 이름 필드에 피드 이름을 입력합니다 (예: BeyondTrust EPM logs).
  4. 소스 유형으로 Amazon S3 V2를 선택합니다.
  5. 로그 유형으로 BeyondTrust Endpoint Privilege Management를 선택합니다.
  6. 다음을 클릭합니다.
  7. 다음 입력 파라미터의 값을 지정합니다.
    • S3 URI: 버킷 URI
      • s3://your-log-bucket-name/ your-log-bucket-name을 실제 버킷 이름으로 바꿉니다.
    • 소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.
    • 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.
    • 액세스 키 ID: S3 버킷에 액세스할 수 있는 사용자 액세스 키입니다.
    • 보안 비밀 액세스 키: S3 버킷에 액세스할 수 있는 사용자 보안 비밀 키입니다.
    • 애셋 네임스페이스: 애셋 네임스페이스입니다.
    • 수집 라벨: 이 피드의 이벤트에 적용된 라벨입니다.
  8. 다음을 클릭합니다.
  9. 확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.

UDM 매핑 테이블

로그 필드 UDM 매핑 논리
agent.id principal.asset.attribute.labels.value 키가 agent_id인 라벨에 매핑됨
agent.version principal.asset.attribute.labels.value 키가 agent_version인 라벨에 매핑됨
ecs.version principal.asset.attribute.labels.value 키가 ecs_version인 라벨에 매핑됨
event_data.reason metadata.description 원시 로그의 이벤트 설명
event_datas.ActionId metadata.product_log_id 제품별 로그 식별자
file.path principal.file.full_path 이벤트의 전체 파일 경로
headers.content_length additional.fields.value.string_value 키가 content_length인 라벨에 매핑됨
headers.content_type additional.fields.value.string_value 키가 content_type인 라벨에 매핑됨
headers.http_host additional.fields.value.string_value 키가 http_host인 라벨에 매핑됨
headers.http_version network.application_protocol_version HTTP 프로토콜 버전
headers.request_method network.http.method HTTP 요청 메서드
host.hostname principal.hostname 기본 호스트 이름
host.hostname principal.asset.hostname 기본 애셋 호스트 이름
host.ip principal.asset.ip 기본 애셋 IP 주소
host.ip principal.ip 기본 IP 주소
host.mac principal.mac 기본 MAC 주소
host.os.platform principal.platform macOS와 같은 경우 MAC으로 설정
host.os.version principal.platform_version 운영체제 버전
labels.related_item_id metadata.product_log_id 관련 상품 식별자
process.command_line principal.process.command_line 프로세스 명령줄
process.name additional.fields.value.string_value 키가 process_name인 라벨에 매핑됨
process.parent.name additional.fields.value.string_value 키가 process_parent_name인 라벨에 매핑됨
process.parent.pid principal.process.parent_process.pid 상위 프로세스 PID가 문자열로 변환됨
process.pid principal.process.pid 문자열로 변환된 프로세스 PID
user.id principal.user.userid 사용자 식별자
user.name principal.user.user_display_name 사용자 표시 이름
해당 사항 없음 metadata.event_timestamp 로그 항목 타임스탬프로 설정된 이벤트 타임스탬프
해당 사항 없음 metadata.event_type 주체가 없는 경우 GENERIC_EVENT, 그렇지 않은 경우 STATUS_UPDATE
해당 사항 없음 network.application_protocol http_version 필드에 HTTP가 포함된 경우 HTTP로 설정

도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가로부터 답변을 받으세요.