BeyondTrust Endpoint Privilege Management (EPM)-Logs erfassen

Unterstützt in:

In diesem Dokument wird beschrieben, wie Sie BeyondTrust Endpoint Privilege Management-Logs (EPM) in Google Security Operations aufnehmen. Dazu gibt es zwei verschiedene Ansätze: EC2-basierte Erfassung und AWS Lambda-basierte Erfassung mit Amazon S3. Der Parser konzentriert sich auf die Umwandlung von JSON-Rohlogdaten von BeyondTrust Endpoint in ein strukturiertes Format, das dem Chronicle UDM entspricht. Zuerst werden Standardwerte für verschiedene Felder initialisiert und dann die JSON-Nutzlast geparst. Anschließend werden bestimmte Felder aus dem Rohlog in entsprechende UDM-Felder im event.idm.read_only_udm-Objekt abgebildet.

Hinweise

Prüfen Sie, ob folgende Voraussetzungen erfüllt sind:

  • Google SecOps-Instanz
  • Privilegierter Zugriff auf den BeyondTrust Endpoint Privilege Management-Mandanten oder die API
  • Privilegierter Zugriff auf AWS (S3, IAM, Lambda/EC2, EventBridge)

Integrationsmethode auswählen

Sie können zwischen zwei Integrationsmethoden wählen:

  • Option 1: EC2-basierte Erfassung: Hier wird eine EC2-Instanz mit geplanten Skripten für die Protokollerfassung verwendet.
  • Option 2: Erfassung auf Basis von AWS Lambda: Hier werden serverlose Lambda-Funktionen mit EventBridge-Planung verwendet.

Option 1: EC2-basierte Erfassung

AWS IAM für die Google SecOps-Aufnahme konfigurieren

  1. Erstellen Sie einen Nutzer gemäß dieser Anleitung: IAM-Nutzer erstellen.
  2. Wählen Sie den erstellten Nutzer aus.
  3. Wählen Sie den Tab Sicherheitsanmeldedaten aus.
  4. Klicken Sie im Abschnitt Zugriffsschlüssel auf Zugriffsschlüssel erstellen.
  5. Wählen Sie Drittanbieterdienst als Anwendungsfall aus.
  6. Klicken Sie auf Weiter.
  7. Optional: Fügen Sie ein Beschreibungstag hinzu.
  8. Klicken Sie auf Zugriffsschlüssel erstellen.
  9. Klicken Sie auf CSV-Datei herunterladen, um den Access Key (Zugriffsschlüssel) und den Secret Access Key (geheimer Zugriffsschlüssel) für die zukünftige Verwendung zu speichern.
  10. Klicken Sie auf Fertig.
  11. Wählen Sie den Tab Berechtigungen aus.
  12. Klicken Sie im Bereich Berechtigungsrichtlinien auf Berechtigungen hinzufügen.
  13. Wählen Sie Berechtigungen hinzufügen aus.
  14. Wählen Sie Richtlinien direkt anhängen aus.
  15. Suchen Sie nach der Richtlinie AmazonS3FullAccess und wählen Sie sie aus.
  16. Klicken Sie auf Weiter.
  17. Klicken Sie auf Berechtigungen hinzufügen.

BeyondTrust EPM für den API-Zugriff konfigurieren

  1. Melden Sie sich als Administrator in der BeyondTrust Privilege Management-Webkonsole an.
  2. Rufen Sie Konfiguration > Einstellungen > API-Einstellungen auf.
  3. Klicken Sie auf API-Konto erstellen.
  4. Geben Sie die folgenden Konfigurationsdetails an:
    • Name: Geben Sie Google SecOps Collector ein.
    • API-Zugriff: Aktivieren Sie Audit (Read) und andere Bereiche nach Bedarf.
  5. Kopieren und speichern Sie die Client-ID und den Clientschlüssel.
  6. Kopieren Sie Ihre API-Basis-URL. Sie lautet in der Regel https://<your-tenant>-services.pm.beyondtrustcloud.com. Sie verwenden sie als BPT_API_URL.

AWS S3-Bucket erstellen

  1. Melden Sie sich bei der AWS Management Console an.
  2. Rufen Sie die AWS Console > Services > S3 > Create bucket auf.
  3. Geben Sie die folgenden Konfigurationsdetails an:
    • Bucket-Name: my-beyondtrust-logs.
    • Region: [Ihre Auswahl] > Erstellen.

IAM-Rolle für EC2 erstellen

  1. Melden Sie sich bei der AWS Management Console an.
  2. Rufen Sie die AWS-Konsole > Dienste > IAM > Rollen > Rolle erstellen auf.
  3. Geben Sie die folgenden Konfigurationsdetails an:
    • Vertrauenswürdige Identität: AWS-Dienst > EC2 > Weiter.
    • Berechtigung anhängen: AmazonS3FullAccess (oder eine Richtlinie mit eingeschränktem Umfang für Ihren Bucket) > Weiter.
    • Rollenname: EC2-S3-BPT-Writer > Rolle erstellen.

EC2 Collector-VM starten und konfigurieren

  1. Melden Sie sich bei der AWS Management Console an.
  2. Rufen Sie Dienste auf.
  3. Geben Sie in die Suchleiste EC2 ein und wählen Sie das Ergebnis aus.
  4. Klicken Sie im EC2-Dashboard auf Instances (Instanzen).
  5. Klicken Sie auf Instanzen starten.
  6. Geben Sie die folgenden Konfigurationsdetails an:
    • Name: Geben Sie BPT-Log-Collector ein.
    • AMI: Wählen Sie Ubuntu Server 22.04 LTS aus.
    • Instanztyp: t3.micro (oder größer). Klicken Sie dann auf Weiter.
    • Netzwerk: Achten Sie darauf, dass die Einstellung Netzwerk auf Ihre Standard-VPC festgelegt ist.
    • IAM-Rolle: Wählen Sie im Menü die IAM-Rolle EC2-S3-BPT-Writer aus.
    • Öffentliche IP automatisch zuweisen: Aktivieren Sie diese Option (oder stellen Sie sicher, dass Sie sie über VPN erreichen können) > Weiter.
    • Speicher hinzufügen: Behalten Sie die Standardspeicherkonfiguration (8 GiB) bei und klicken Sie auf Weiter.
    • Wählen Sie Neue Sicherheitsgruppe erstellen aus.
    • Regel für eingehenden Traffic: Klicken Sie auf Regel hinzufügen.
    • Type (Typ): Wählen Sie SSH aus.
    • Port: 22.
    • Quelle: Ihre IP-Adresse
    • Klicken Sie auf Überprüfen und starten.
    • Wählen Sie ein Schlüsselpaar aus oder erstellen Sie eines.
    • Klicken Sie auf Schlüsselpaar herunterladen.
    • Speichern Sie die heruntergeladene PEM-Datei. Sie benötigen diese Datei, um eine SSH-Verbindung zu Ihrer Instanz herzustellen.
  7. Stellen Sie über SSH eine Verbindung zu Ihrer VM her.

Erforderliche Komponenten für Collector installieren

  1. Führen Sie dazu diesen Befehl aus:

    chmod 400 ~/Downloads/your-key.pem
    ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
    
  2. Aktualisieren Sie das System und installieren Sie die Abhängigkeiten:

    # Update OS
    sudo apt update && sudo apt upgrade -y
    # Install Python, Git
    sudo apt install -y python3 python3-venv python3-pip git
    # Create & activate virtualenv
    python3 -m venv ~/bpt-venv
    source ~/bpt-venv/bin/activate
    # Install libraries
    pip install requests boto3
    
  3. Verzeichnis und Statusdatei erstellen:

    sudo mkdir -p /var/lib/bpt-collector
    sudo touch /var/lib/bpt-collector/last_run.txt
    sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
    
  4. Initialisieren Sie sie (z. B. auf vor einer Stunde):

    echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
    

BeyondTrust EPM Collector-Script bereitstellen

  1. Projektordner erstellen:

    mkdir ~/bpt-collector && cd ~/bpt-collector
    
  2. Exportieren Sie die erforderlichen Umgebungsvariablen (z. B. in ~/.bashrc):

    export BPT_API_URL="https://<your-tenant>-services.pm.beyondtrustcloud.com"
    export BPT_CLIENT_ID="your-client-id"
    export BPT_CLIENT_SECRET="your-client-secret"
    export S3_BUCKET="my-beyondtrust-logs"
    export S3_PREFIX="bpt/"
    export STATE_FILE="/var/lib/bpt-collector/last_run.txt"
    export RECORD_SIZE="1000"
    
  3. Erstellen Sie collector_bpt.py und geben Sie den folgenden Code ein:

    #!/usr/bin/env python3
    import os, sys, json, boto3, requests
    from datetime import datetime, timezone, timedelta
    
    # ── UTILS ──────────────────────────────────────────────────────────────
    def must_env(var):
        val = os.getenv(var)
        if not val:
            print(f"ERROR: environment variable {var} is required", file=sys.stderr)
            sys.exit(1)
        return val
    
    def ensure_state_file(path):
        d = os.path.dirname(path)
        if not os.path.isdir(d):
            os.makedirs(d, exist_ok=True)
        if not os.path.isfile(path):
            ts = (datetime.now(timezone.utc) - timedelta(hours=1))
                .strftime("%Y-%m-%dT%H:%M:%SZ")
            with open(path, "w") as f:
                f.write(ts)
    
    # ── CONFIG ─────────────────────────────────────────────────────────────
    BPT_API_URL = must_env("BPT_API_URL")  # e.g., https://tenant-services.pm.beyondtrustcloud.com
    CLIENT_ID = must_env("BPT_CLIENT_ID")
    CLIENT_SECRET = must_env("BPT_CLIENT_SECRET")
    S3_BUCKET = must_env("S3_BUCKET")
    S3_PREFIX = os.getenv("S3_PREFIX", "")  # e.g., "bpt/"
    STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt")
    RECORD_SIZE = int(os.getenv("RECORD_SIZE", "1000"))
    
    # ── END CONFIG ─────────────────────────────────────────────────────────
    ensure_state_file(STATE_FILE)
    
    def read_last_run():
        with open(STATE_FILE, "r") as f:
            ts = f.read().strip()
        return datetime.fromisoformat(ts.replace("Z", "+00:00"))
    
    def write_last_run(dt):
        with open(STATE_FILE, "w") as f:
            f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ"))
    
    def get_oauth_token():
        """
        Get OAuth2 token using client credentials flow
        Scope: urn:management:api (for EPM Management API access)
        """
        resp = requests.post(
            f"{BPT_API_URL}/oauth/connect/token",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "grant_type": "client_credentials",
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "scope": "urn:management:api"
            }
        )
        resp.raise_for_status()
        return resp.json()["access_token"]
    
    def extract_event_timestamp(evt):
        """
        Extract timestamp from event, prioritizing event.ingested field
        """
        # Primary (documented) path: event.ingested
        if isinstance(evt, dict) and isinstance(evt.get("event"), dict):
            ts = evt["event"].get("ingested")
            if ts:
                return ts
    
        # Fallbacks for other timestamp fields
        timestamp_fields = ["timestamp", "eventTime", "dateTime", "whenOccurred", "date", "time"]
        for field in timestamp_fields:
            if field in evt and evt[field]:
                return evt[field]
    
        return None
    
    def parse_timestamp(ts):
        """
        Parse timestamp handling various formats
        """
        from datetime import datetime, timezone
    
        if isinstance(ts, (int, float)):
            # Handle milliseconds vs seconds
            return datetime.fromtimestamp(ts/1000 if ts > 1e12 else ts, tz=timezone.utc)
    
        if isinstance(ts, str):
            if ts.endswith("Z"):
                return datetime.fromisoformat(ts.replace("Z", "+00:00"))
            dt = datetime.fromisoformat(ts)
            return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
    
        raise ValueError(f"Unsupported timestamp: {ts!r}")
    
    def fetch_events(token, start_date_iso):
        """
        Fetch events using the correct EPM API endpoint: /management-api/v2/Events/FromStartDate
        This endpoint uses StartDate and RecordSize parameters, not startTime/endTime/limit/offset
        """
        headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
        all_events, current_start = [], start_date_iso
    
        # Enforce maximum RecordSize limit of 1000
        record_size_limited = min(RECORD_SIZE, 1000)
    
        for _ in range(10):  # MAX 10 iterations to prevent infinite loops
            # Use the correct endpoint and parameters
            params = {
                "StartDate": current_start_date,
                "RecordSize": RECORD_SIZE
            }
    
            resp = requests.get(
                f"{BPT_API_URL}/management-api/v2/Events/FromStartDate",
                headers=headers, 
                params={
                    "StartDate": current_start_date,
                    "RecordSize": min(RECORD_SIZE, 1000)
                },
                timeout=300
            )
            resp.raise_for_status()
    
            data = resp.json()
            events = data.get("events", [])
    
            if not events:
                break
    
            all_events.extend(events)
            iterations += 1
    
            # If we got fewer events than RECORD_SIZE, we're done
            if len(events) < RECORD_SIZE:
                break
    
            # For pagination, update StartDate to the timestamp of the last event
            last_event = events[-1]
            last_timestamp = extract_event_timestamp(last_event)
    
            if not last_timestamp:
                print("Warning: Could not find timestamp in last event for pagination")
                break
    
            # Convert to ISO format if needed and increment slightly to avoid duplicates
            try:
                dt = parse_timestamp(last_timestamp)
                # Add 1 second to avoid retrieving the same event again
                dt = dt + timedelta(seconds=1)
                current_start = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    
            except Exception as e:
                print(f"Error parsing timestamp {last_timestamp}: {e}")
                break
    
        return all_events
    
    def upload_to_s3(obj, key):
        boto3.client("s3").put_object(
            Bucket=S3_BUCKET, 
            Key=key,
            Body=json.dumps(obj).encode("utf-8"),
            ContentType="application/json"
        )
    
    def main():
        # 1) determine window
        start_dt = read_last_run()
        end_dt = datetime.now(timezone.utc)
        START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    
        print(f"Fetching events from {START} to {END}")
    
        # 2) authenticate and fetch
        try:
            token = get_oauth_token()
            events = fetch_events(token, START)
    
            # Filter events to only include those before our end time
            filtered_events = []
            for evt in events:
                evt_time = extract_event_timestamp(evt)
                if evt_time:
                    try:
                        evt_dt = parse_timestamp(evt_time)
                        if evt_dt <= end_dt:
                            filtered_events.append(evt)
                    except Exception as e:
                        print(f"Error parsing event timestamp {evt_time}: {e}")
                        # Include event anyway if timestamp parsing fails
                        filtered_events.append(evt)
                else:
                    # Include events without timestamps
                    filtered_events.append(evt)
    
            count = len(filtered_events)
    
            if count > 0:
                # Upload events to S3
                timestamp_str = end_dt.strftime('%Y%m%d_%H%M%S')
                for idx, evt in enumerate(filtered_events, start=1):
                    key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{timestamp_str}_{idx:06d}.json"
                    upload_to_s3(evt, key)
    
                print(f"Uploaded {count} events to S3")
            else:
                print("No events to upload")
    
            # 3) persist state
            write_last_run(end_dt)
    
        except Exception as e:
            print(f"Error: {e}")
            sys.exit(1)
    
    if __name__ == "__main__":
        main()
    
  4. Machen Sie es ausführbar:

    chmod +x collector_bpt.py
    

Tägliche Ausführung mit Cron planen

  1. Führen Sie dazu diesen Befehl aus:

    crontab -e
    
  2. Fügen Sie den täglichen Job um Mitternacht UTC hinzu:

    0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py
    

Option 2: Erfassung auf Basis von AWS Lambda

Voraussetzungen für BeyondTrust EPM

  1. Melden Sie sich als Administrator in der BeyondTrust Privilege Management-Webkonsole an.
  2. Gehen Sie zu Systemkonfiguration > REST API > Tokens.
  3. Klicken Sie auf Token hinzufügen.
  4. Geben Sie die folgenden Konfigurationsdetails an:
    • Name: Geben Sie Google SecOps Collector ein.
    • Bereiche: Wählen Sie Audit:Read und andere Bereiche nach Bedarf aus.
  5. Klicken Sie auf Speichern und kopieren Sie den Tokenwert.
  6. Kopieren und speichern Sie die folgenden Details an einem sicheren Ort:
    • API-Basis-URL: Ihre BeyondTrust EPM API-URL (z. B. https://yourtenant-services.pm.beyondtrustcloud.com).
    • Client-ID: Aus der Konfiguration Ihrer OAuth-Anwendung.
    • Clientschlüssel: Aus der Konfiguration Ihrer OAuth-Anwendung.

AWS S3-Bucket und IAM für Google SecOps konfigurieren

  1. Erstellen Sie einen Amazon S3-Bucket. Folgen Sie dazu dieser Anleitung: Bucket erstellen.
  2. Speichern Sie den Namen und die Region des Buckets zur späteren Verwendung (z. B. beyondtrust-epm-logs-bucket).
  3. Erstellen Sie einen Nutzer gemäß dieser Anleitung: IAM-Nutzer erstellen.
  4. Wählen Sie den erstellten Nutzer aus.
  5. Wählen Sie den Tab Sicherheitsanmeldedaten aus.
  6. Klicken Sie im Abschnitt Zugriffsschlüssel auf Zugriffsschlüssel erstellen.
  7. Wählen Sie als Anwendungsfall Drittanbieterdienst aus.
  8. Klicken Sie auf Weiter.
  9. Optional: Fügen Sie ein Beschreibungstag hinzu.
  10. Klicken Sie auf Zugriffsschlüssel erstellen.
  11. Klicken Sie auf CSV-Datei herunterladen, um den Zugriffsschlüssel und den geheimen Zugriffsschlüssel zur späteren Verwendung zu speichern.
  12. Klicken Sie auf Fertig.
  13. Wählen Sie den Tab Berechtigungen aus.
  14. Klicken Sie im Bereich Berechtigungsrichtlinien auf Berechtigungen hinzufügen.
  15. Wählen Sie Berechtigungen hinzufügen aus.
  16. Wählen Sie Richtlinien direkt anhängen aus.
  17. Suchen Sie nach der Richtlinie AmazonS3FullAccess und wählen Sie sie aus.
  18. Klicken Sie auf Weiter.
  19. Klicken Sie auf Berechtigungen hinzufügen.

IAM-Richtlinie und -Rolle für S3-Uploads konfigurieren

  1. Rufen Sie in der AWS-Konsole IAM > Richtlinien > Richtlinie erstellen > JSON-Tab auf.
  2. Kopieren Sie die folgende Richtlinie und fügen Sie sie ein:

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Sid": "AllowPutObjects",
        "Effect": "Allow",
        "Action": "s3:PutObject",
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*"
        },
        {
        "Sid": "AllowGetStateObject",
        "Effect": "Allow",
        "Action": "s3:GetObject",
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/beyondtrust-epm-logs/state.json"
        }
    ]
    }
    
    • Ersetzen Sie beyondtrust-epm-logs-bucket, wenn Sie einen anderen Bucket-Namen eingegeben haben.
  3. Klicken Sie auf Weiter > Richtlinie erstellen.

  4. Rufen Sie IAM > Rollen > Rolle erstellen > AWS-Service > Lambda auf.

  5. Hängen Sie die neu erstellte Richtlinie und die verwaltete Richtlinie AWSLambdaBasicExecutionRole (für CloudWatch-Logging) an.

  6. Geben Sie der Rolle den Namen BeyondTrustEPMLogExportRole und klicken Sie auf Rolle erstellen.

Lambda-Funktion erstellen

  1. Rufen Sie in der AWS Console Lambda > Funktionen > Funktion erstellen auf.
  2. Klicken Sie auf Von Grund auf erstellen.
  3. Geben Sie die folgenden Konfigurationsdetails an:
Einstellung Wert
Name BeyondTrustEPMLogExport
Laufzeit Python 3.13
Architektur x86_64
Ausführungsrolle BeyondTrustEPMLogExportRole
  1. Nachdem die Funktion erstellt wurde, öffnen Sie den Tab Code, löschen Sie den Stub und geben Sie den folgenden Code ein (BeyondTrustEPMLogExport.py):

    import json
    import boto3
    import urllib3
    import base64
    from datetime import datetime, timedelta, timezone
    import os
    from typing import Dict, List, Optional
    
    # Initialize urllib3 pool manager
    http = urllib3.PoolManager()
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch BeyondTrust EPM audit events and store them in S3
        """
    
        # Environment variables
        S3_BUCKET = os.environ['S3_BUCKET']
        S3_PREFIX = os.environ['S3_PREFIX']
        STATE_KEY = os.environ['STATE_KEY']
    
        # BeyondTrust EPM API credentials
        BPT_API_URL = os.environ['BPT_API_URL']
        CLIENT_ID = os.environ['CLIENT_ID']
        CLIENT_SECRET = os.environ['CLIENT_SECRET']
        OAUTH_SCOPE = os.environ.get('OAUTH_SCOPE', 'urn:management:api')
    
        # Optional parameters
        RECORD_SIZE = int(os.environ.get('RECORD_SIZE', '1000'))
        MAX_ITERATIONS = int(os.environ.get('MAX_ITERATIONS', '10'))
    
        s3_client = boto3.client('s3')
    
        try:
            # Get last execution state
            last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY)
    
            # Get OAuth access token
            access_token = get_oauth_token(BPT_API_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_SCOPE)
    
            # Fetch audit events
            events = fetch_audit_events(BPT_API_URL, access_token, last_timestamp, RECORD_SIZE, MAX_ITERATIONS)
    
            if events:
                # Store events in S3
                current_timestamp = datetime.utcnow()
                filename = f"{S3_PREFIX}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json"
    
                store_events_to_s3(s3_client, S3_BUCKET, filename, events)
    
                # Update state with latest timestamp
                latest_timestamp = get_latest_event_timestamp(events)
                update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp)
    
                print(f"Successfully processed {len(events)} events and stored to {filename}")
            else:
                print("No new events found")
    
            return {
                'statusCode': 200,
                'body': json.dumps(f'Successfully processed {len(events) if events else 0} events')
            }
    
        except Exception as e:
            print(f"Error processing BeyondTrust EPM logs: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def get_oauth_token(api_url: str, client_id: str, client_secret: str, scope: str = "urn:management:api") -> str:
        """
        Get OAuth access token using client credentials flow for BeyondTrust EPM
        Uses the correct scope: urn:management:api and /oauth/connect/token endpoint
        """
    
        token_url = f"{api_url}/oauth/connect/token"
    
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    
        body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}"
    
        response = http.request('POST', token_url, headers=headers, body=body, timeout=urllib3.Timeout(60.0))
    
        if response.status != 200:
            raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}")
    
        token_data = json.loads(response.data.decode('utf-8'))
        return token_data['access_token']
    
    def fetch_audit_events(api_url: str, access_token: str, last_timestamp: Optional[str], record_size: int, max_iterations: int) -> List[Dict]:
        """
        Fetch audit events using the correct BeyondTrust EPM API endpoint:
        /management-api/v2/Events/FromStartDate with StartDate and RecordSize parameters
        """
    
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }
    
        all_events = []
        current_start_date = last_timestamp or (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ")
        iterations = 0
    
        # Enforce maximum RecordSize limit of 1000
        record_size_limited = min(record_size, 1000)
    
        while iterations < max_iterations:
            # Use the correct EPM API endpoint and parameters
            query_url = f"{api_url}/management-api/v2/Events/FromStartDate"
            params = {
                'StartDate': current_start_date,
                'RecordSize': record_size_limited
            }
    
            response = http.request('GET', query_url, headers=headers, fields=params, timeout=urllib3.Timeout(300.0))
    
            if response.status != 200:
                raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}")
    
            response_data = json.loads(response.data.decode('utf-8'))
            events = response_data.get('events', [])
    
            if not events:
                break
    
            all_events.extend(events)
            iterations += 1
    
            # If we got fewer events than RecordSize, we've reached the end
            if len(events) < record_size_limited:
                break
    
            # For pagination, update StartDate to the timestamp of the last event
            last_event = events[-1]
            last_timestamp = extract_event_timestamp(last_event)
    
            if not last_timestamp:
                print("Warning: Could not find timestamp in last event for pagination")
                break
    
            # Convert to datetime and add 1 second to avoid retrieving the same event again
            try:
                dt = parse_timestamp(last_timestamp)
                dt = dt + timedelta(seconds=1)
                current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
            except Exception as e:
                print(f"Error parsing timestamp {last_timestamp}: {e}")
                break
    
        return all_events
    
    def extract_event_timestamp(event: Dict) -> Optional[str]:
        """
        Extract timestamp from event, prioritizing event.ingested field
        """
        # Primary (documented) path: event.ingested
        if isinstance(event, dict) and isinstance(event.get("event"), dict):
            ts = event["event"].get("ingested")
            if ts:
                return ts
    
        # Fallbacks for other timestamp fields
        timestamp_fields = ['timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time']
        for field in timestamp_fields:
            if field in event and event[field]:
                return event[field]
    
        return None
    
    def parse_timestamp(timestamp_str: str) -> datetime:
        """
        Parse timestamp string to datetime object, handling various formats
        """
        if isinstance(timestamp_str, (int, float)):
            # Unix timestamp (in milliseconds or seconds)
            if timestamp_str > 1e12:  # Milliseconds
                return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc)
            else:  # Seconds
                return datetime.fromtimestamp(timestamp_str, tz=timezone.utc)
    
        if isinstance(timestamp_str, str):
            # Try different string formats
            try:
                # ISO format with Z
                if timestamp_str.endswith('Z'):
                    return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                # ISO format with timezone
                elif '+' in timestamp_str or timestamp_str.endswith('00:00'):
                    return datetime.fromisoformat(timestamp_str)
                # ISO format without timezone (assume UTC)
                else:
                    dt = datetime.fromisoformat(timestamp_str)
                    if dt.tzinfo is None:
                        dt = dt.replace(tzinfo=timezone.utc)
                    return dt
            except ValueError:
                pass
    
        raise ValueError(f"Could not parse timestamp: {timestamp_str}")
    
    def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]:
        """
        Get the last processed timestamp from S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            print("No previous state found, starting from 24 hours ago")
            return None
        except Exception as e:
            print(f"Error reading state: {e}")
            return None
    
    def update_state(s3_client, bucket: str, state_key: str, timestamp: str):
        """
        Update the state file with the latest processed timestamp
        """
        state_data = {
            'last_timestamp': timestamp,
            'updated_at': datetime.utcnow().isoformat() + 'Z'
        }
    
        s3_client.put_object(
            Bucket=bucket,
            Key=state_key,
            Body=json.dumps(state_data),
            ContentType='application/json'
        )
    
    def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]):
        """
        Store events as JSONL (one JSON object per line) in S3
        """
        # Convert to JSONL format (one JSON object per line)
        jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events)
    
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=jsonl_content,
            ContentType='application/x-ndjson'
        )
    
    def get_latest_event_timestamp(events: List[Dict]) -> str:
        """
        Get the latest timestamp from the events for state tracking
        """
        if not events:
            return datetime.utcnow().isoformat() + 'Z'
    
        latest = None
        for event in events:
            timestamp = extract_event_timestamp(event)
            if timestamp:
                try:
                    event_dt = parse_timestamp(timestamp)
                    event_iso = event_dt.isoformat() + 'Z'
                    if latest is None or event_iso > latest:
                        latest = event_iso
                except Exception as e:
                    print(f"Error parsing event timestamp {timestamp}: {e}")
                    continue
    
        return latest or datetime.utcnow().isoformat() + 'Z'
    
  2. Klicken Sie auf Konfiguration> Umgebungsvariablen> Bearbeiten> Neue Umgebungsvariable hinzufügen.

  3. Geben Sie die folgenden Umgebungsvariablen ein und ersetzen Sie die Platzhalter durch Ihre Werte.

    Schlüssel Beispielwert
    S3_BUCKET beyondtrust-epm-logs-bucket
    S3_PREFIX beyondtrust-epm-logs/
    STATE_KEY beyondtrust-epm-logs/state.json
    BPT_API_URL https://yourtenant-services.pm.beyondtrustcloud.com
    CLIENT_ID your-client-id
    CLIENT_SECRET your-client-secret
    OAUTH_SCOPE urn:management:api
    RECORD_SIZE 1000
    MAX_ITERATIONS 10
  4. Bleiben Sie nach dem Erstellen der Funktion auf der zugehörigen Seite oder öffnen Sie Lambda > Funktionen > Ihre Funktion.

  5. Wählen Sie den Tab Konfiguration aus.

  6. Klicken Sie im Bereich Allgemeine Konfiguration auf Bearbeiten.

  7. Ändern Sie Zeitlimit in 5 Minuten (300 Sekunden) und klicken Sie auf Speichern.

EventBridge-Zeitplan erstellen

  1. Gehen Sie zu Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Scheduler > Zeitplan erstellen).
  2. Geben Sie die folgenden Konfigurationsdetails an:
    • Wiederkehrender Zeitplan: Preis (1 hour).
    • Ziel: Ihre Lambda-Funktion BeyondTrustEPMLogExport.
    • Name: BeyondTrustEPMLogExport-1h.
  3. Klicken Sie auf Zeitplan erstellen.

Optional: IAM-Nutzer mit Lesezugriff und Schlüssel für Google SecOps erstellen

  1. Rufen Sie die AWS-Konsole > IAM > Nutzer > Nutzer hinzufügen auf.
  2. Klicken Sie auf Add users (Nutzer hinzufügen).
  3. Geben Sie die folgenden Konfigurationsdetails an:
    • Nutzer: Geben Sie secops-reader ein.
    • Zugriffstyp: Wählen Sie Zugriffsschlüssel – programmatischer Zugriff aus.
  4. Klicken Sie auf Nutzer erstellen.
  5. Minimale Leseberechtigung (benutzerdefiniert) anhängen: Nutzer > secops-reader > Berechtigungen > Berechtigungen hinzufügen > Richtlinien direkt anhängen > Richtlinie erstellen.
  6. Geben Sie im JSON-Editor die folgende Richtlinie ein:

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Effect": "Allow",
        "Action": ["s3:GetObject"],
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*"
        },
        {
        "Effect": "Allow",
        "Action": ["s3:ListBucket"],
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket"
        }
    ]
    }
    
  7. Legen Sie secops-reader-policy als Name fest.

  8. Gehen Sie zu Richtlinie erstellen> suchen/auswählen > Weiter > Berechtigungen hinzufügen.

  9. Rufen Sie Sicherheitsanmeldedaten > Zugriffsschlüssel > Zugriffsschlüssel erstellen auf.

  10. Laden Sie die CSV herunter (diese Werte werden in den Feed eingegeben).

Feeds einrichten (beide Optionen)

So konfigurieren Sie einen Feed:

  1. Rufen Sie die SIEM-Einstellungen > Feeds auf.
  2. Klicken Sie auf + Neuen Feed hinzufügen.
  3. Geben Sie im Feld Feed name (Feedname) einen Namen für den Feed ein, z. B. BeyondTrust EPM logs.
  4. Wählen Sie Amazon S3 V2 als Quelltyp aus.
  5. Wählen Sie BeyondTrust Endpoint Privilege Management als Logtyp aus.
  6. Klicken Sie auf Weiter.
  7. Geben Sie Werte für die folgenden Eingabeparameter an:
    • S3-URI: Der Bucket-URI
      • s3://your-log-bucket-name/. Ersetzen Sie your-log-bucket-name durch den tatsächlichen Namen des Buckets.
    • Optionen zum Löschen von Quellen: Wählen Sie die gewünschte Option zum Löschen aus.
    • Maximales Dateialter: Dateien einschließen, die in den letzten Tagen geändert wurden. Der Standardwert ist 180 Tage.
    • Zugriffsschlüssel-ID: Zugriffsschlüssel des Nutzers mit Zugriff auf den S3-Bucket.
    • Secret Access Key (Geheimer Zugriffsschlüssel): Geheimer Nutzersicherheitsschlüssel mit Zugriff auf den S3-Bucket.
    • Asset-Namespace: Der Asset-Namespace.
    • Aufnahmelabels: Das Label, das auf die Ereignisse aus diesem Feed angewendet wird.
  8. Klicken Sie auf Weiter.
  9. Prüfen Sie die neue Feedkonfiguration auf dem Bildschirm Abschließen und klicken Sie dann auf Senden.

UDM-Zuordnungstabelle

Logfeld UDM-Zuordnung Logik
agent.id principal.asset.attribute.labels.value Dem Label mit dem Schlüssel agent_id zugeordnet
agent.version principal.asset.attribute.labels.value Dem Label mit dem Schlüssel agent_version zugeordnet
ecs.version principal.asset.attribute.labels.value Dem Label mit dem Schlüssel ecs_version zugeordnet
event_data.reason metadata.description Ereignisbeschreibung aus dem Rohprotokoll
event_datas.ActionId metadata.product_log_id Produktspezifische Log-Kennzeichnung
file.path principal.file.full_path Vollständiger Dateipfad aus dem Ereignis
headers.content_length additional.fields.value.string_value Dem Label mit dem Schlüssel content_length zugeordnet
headers.content_type additional.fields.value.string_value Dem Label mit dem Schlüssel content_type zugeordnet
headers.http_host additional.fields.value.string_value Dem Label mit dem Schlüssel http_host zugeordnet
headers.http_version network.application_protocol_version HTTP-Protokollversion
headers.request_method network.http.method HTTP-Anfragemethode
host.hostname principal.hostname Principal-Hostname
host.hostname principal.asset.hostname Hostname des Hauptassets
host.ip principal.asset.ip IP-Adresse des Haupt-Assets
host.ip principal.ip IP-Adresse des Hauptkontos
host.mac principal.mac Primäre MAC-Adresse
host.os.platform principal.platform Auf MAC setzen, wenn gleich macOS
host.os.version principal.platform_version Betriebssystemversion
labels.related_item_id metadata.product_log_id Kennzeichnung ähnlicher Artikel
process.command_line principal.process.command_line Befehlszeile für den Prozess
process.name additional.fields.value.string_value Dem Label mit dem Schlüssel process_name zugeordnet
process.parent.name additional.fields.value.string_value Dem Label mit dem Schlüssel process_parent_name zugeordnet
process.parent.pid principal.process.parent_process.pid PID des übergeordneten Prozesses in String konvertiert
process.pid principal.process.pid Prozess-PID in String konvertiert
user.id principal.user.userid Nutzer-ID
user.name principal.user.user_display_name Anzeigename des Nutzers
metadata.event_timestamp Ereigniszeitstempel auf Zeitstempel des Logeintrags festgelegt
metadata.event_type GENERIC_EVENT, wenn kein Prinzipal vorhanden ist, andernfalls STATUS_UPDATE
network.application_protocol Auf HTTP festlegen, wenn das Feld „http_version“ HTTP enthält

Benötigen Sie weitere Hilfe? Antworten von Community-Mitgliedern und Google SecOps-Experten erhalten