Recoger registros de BeyondTrust Endpoint Privilege Management (EPM)

Disponible en:

En este documento se explica cómo ingerir registros de BeyondTrust Endpoint Privilege Management (EPM) en Google Security Operations mediante dos enfoques diferentes: la recogida basada en EC2 y la recogida basada en AWS Lambda mediante Amazon S3. El analizador se centra en transformar los datos de registro JSON sin procesar de BeyondTrust Endpoint en un formato estructurado que se ajuste al UDM de Chronicle. Primero, inicializa los valores predeterminados de varios campos y, a continuación, analiza la carga útil de JSON. Después, asigna campos específicos del registro sin procesar a los campos de UDM correspondientes dentro del objeto event.idm.read_only_udm.

Antes de empezar

Asegúrate de que cumples los siguientes requisitos previos:

  • Instancia de Google SecOps
  • Acceso con privilegios al tenant o a la API de BeyondTrust Endpoint Privilege Management
  • Acceso privilegiado a AWS (S3, IAM, Lambda/EC2 y EventBridge)

Elige el método de integración

Puedes elegir entre dos métodos de integración:

  • Opción 1: Recogida basada en EC2: usa una instancia de EC2 con secuencias de comandos programadas para recoger registros.
  • Opción 2: Recogida basada en AWS Lambda: usa funciones Lambda sin servidor con la programación de EventBridge.

Opción 1: Recogida basada en EC2

Configurar la gestión de identidades y accesos de AWS para la ingestión de Google SecOps

  1. Crea un usuario siguiendo esta guía: Crear un usuario de gestión de identidades y accesos.
  2. Selecciona el Usuario creado.
  3. Selecciona la pestaña Credenciales de seguridad.
  4. En la sección Claves de acceso, haz clic en Crear clave de acceso.
  5. Selecciona Servicio de terceros en Caso práctico.
  6. Haz clic en Siguiente.
  7. Opcional: añade una etiqueta de descripción.
  8. Haz clic en Crear clave de acceso.
  9. Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para futuras consultas.
  10. Haz clic en Listo.
  11. Selecciona la pestaña Permisos.
  12. En la sección Políticas de permisos, haz clic en Añadir permisos.
  13. Selecciona Añadir permisos.
  14. Seleccione Adjuntar políticas directamente.
  15. Busca y selecciona la política AmazonS3FullAccess.
  16. Haz clic en Siguiente.
  17. Haz clic en Añadir permisos.

Configurar BeyondTrust EPM para el acceso a la API

  1. Inicia sesión en la consola web de gestión de privilegios de BeyondTrust como administrador.
  2. Ve a Configuración > Ajustes > Ajustes de la API.
  3. Haz clic en Create an API Account (Crear una cuenta de API).
  4. Proporcione los siguientes detalles de configuración:
    • Nombre: escribe Google SecOps Collector.
    • Acceso a la API: habilita Auditoría (lectura) y otros ámbitos según sea necesario.
  5. Copia y guarda el ID de cliente y el secreto de cliente.
  6. Copia la URL base de la API. Normalmente es https://<your-tenant>-services.pm.beyondtrustcloud.com (la usarás como BPT_API_URL).

Crear un segmento de AWS S3

  1. Inicia sesión en la consola de administración de AWS.
  2. Ve a Consola de AWS > Servicios > S3 > Crear un bucket.
  3. Proporcione los siguientes detalles de configuración:
    • Nombre del segmento: my-beyondtrust-logs.
    • Región: [tu elección] > Crear.

Crear un rol de gestión de identidades y accesos para EC2

  1. Inicia sesión en la consola de administración de AWS.
  2. Ve a Consola de AWS > Servicios > IAM > Roles > Crear rol.
  3. Proporcione los siguientes detalles de configuración:
    • Entidad de confianza: Servicio de AWS > EC2 > Siguiente.
    • Adjuntar permiso: AmazonS3FullAccess (o una política con ámbito a tu bucket) > Siguiente.
    • Nombre del rol: EC2-S3-BPT-Writer > Crear rol.

Lanzar y configurar la VM de EC2 Collector

  1. Inicia sesión en la consola de administración de AWS.
  2. Ve a Servicios.
  3. En la barra de búsqueda, escribe EC2 y selecciónalo.
  4. En el panel de control de EC2, haga clic en Instancias.
  5. Haz clic en Lanzar instancias.
  6. Proporcione los siguientes detalles de configuración:
    • Nombre: escribe BPT-Log-Collector.
    • AMI: selecciona Ubuntu Server 22.04 LTS.
    • Tipo de instancia: t3.micro (o un tipo más grande) y, a continuación, haz clic en Siguiente.
    • Red: asegúrate de que el ajuste Red esté configurado en tu VPC predeterminada.
    • Rol de gestión de identidades y accesos: selecciona el rol de gestión de identidades y accesos EC2-S3-BPT-Writer en el menú.
    • Asignar automáticamente IP pública: habilita esta opción (o asegúrate de que puedes acceder a ella mediante una VPN) > Siguiente.
    • Añadir almacenamiento: deje la configuración de almacenamiento predeterminada (8 GiB) y haga clic en Siguiente.
    • Selecciona Crear un grupo de seguridad.
    • Regla de entrada: haz clic en Añadir regla.
    • Tipo: selecciona SSH.
    • Puerto: 22.
    • Origen: tu IP
    • Haga clic en Revisar y publicar.
    • Seleccione o cree un par de claves.
    • Haz clic en Descargar par de claves.
    • Guarda el archivo PEM descargado. Necesitarás este archivo para conectarte a tu instancia mediante SSH.
  7. Conéctate a tu máquina virtual (VM) mediante SSH.

Instalar los requisitos previos del recopilador

  1. Ejecuta el siguiente comando:

    chmod 400 ~/Downloads/your-key.pem
    ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
    
  2. Actualiza el sistema e instala las dependencias:

    # Update OS
    sudo apt update && sudo apt upgrade -y
    # Install Python, Git
    sudo apt install -y python3 python3-venv python3-pip git
    # Create & activate virtualenv
    python3 -m venv ~/bpt-venv
    source ~/bpt-venv/bin/activate
    # Install libraries
    pip install requests boto3
    
  3. Crea un directorio y un archivo de estado:

    sudo mkdir -p /var/lib/bpt-collector
    sudo touch /var/lib/bpt-collector/last_run.txt
    sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
    
  4. Inicialízalo (por ejemplo, hace 1 hora):

    echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
    

Implementar la secuencia de comandos del recopilador de EPM de BeyondTrust

  1. Crea una carpeta de proyecto:

    mkdir ~/bpt-collector && cd ~/bpt-collector
    
  2. Exporta las variables de entorno necesarias (por ejemplo, en ~/.bashrc):

    export BPT_API_URL="https://<your-tenant>-services.pm.beyondtrustcloud.com"
    export BPT_CLIENT_ID="your-client-id"
    export BPT_CLIENT_SECRET="your-client-secret"
    export S3_BUCKET="my-beyondtrust-logs"
    export S3_PREFIX="bpt/"
    export STATE_FILE="/var/lib/bpt-collector/last_run.txt"
    export RECORD_SIZE="1000"
    
  3. Crea collector_bpt.py e introduce el siguiente código:

    #!/usr/bin/env python3
    import os, sys, json, boto3, requests
    from datetime import datetime, timezone, timedelta
    
    # ── UTILS ──────────────────────────────────────────────────────────────
    def must_env(var):
        val = os.getenv(var)
        if not val:
            print(f"ERROR: environment variable {var} is required", file=sys.stderr)
            sys.exit(1)
        return val
    
    def ensure_state_file(path):
        d = os.path.dirname(path)
        if not os.path.isdir(d):
            os.makedirs(d, exist_ok=True)
        if not os.path.isfile(path):
            ts = (datetime.now(timezone.utc) - timedelta(hours=1))
                .strftime("%Y-%m-%dT%H:%M:%SZ")
            with open(path, "w") as f:
                f.write(ts)
    
    # ── CONFIG ─────────────────────────────────────────────────────────────
    BPT_API_URL = must_env("BPT_API_URL")  # e.g., https://tenant-services.pm.beyondtrustcloud.com
    CLIENT_ID = must_env("BPT_CLIENT_ID")
    CLIENT_SECRET = must_env("BPT_CLIENT_SECRET")
    S3_BUCKET = must_env("S3_BUCKET")
    S3_PREFIX = os.getenv("S3_PREFIX", "")  # e.g., "bpt/"
    STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt")
    RECORD_SIZE = int(os.getenv("RECORD_SIZE", "1000"))
    
    # ── END CONFIG ─────────────────────────────────────────────────────────
    ensure_state_file(STATE_FILE)
    
    def read_last_run():
        with open(STATE_FILE, "r") as f:
            ts = f.read().strip()
        return datetime.fromisoformat(ts.replace("Z", "+00:00"))
    
    def write_last_run(dt):
        with open(STATE_FILE, "w") as f:
            f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ"))
    
    def get_oauth_token():
        """
        Get OAuth2 token using client credentials flow
        Scope: urn:management:api (for EPM Management API access)
        """
        resp = requests.post(
            f"{BPT_API_URL}/oauth/connect/token",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "grant_type": "client_credentials",
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "scope": "urn:management:api"
            }
        )
        resp.raise_for_status()
        return resp.json()["access_token"]
    
    def extract_event_timestamp(evt):
        """
        Extract timestamp from event, prioritizing event.ingested field
        """
        # Primary (documented) path: event.ingested
        if isinstance(evt, dict) and isinstance(evt.get("event"), dict):
            ts = evt["event"].get("ingested")
            if ts:
                return ts
    
        # Fallbacks for other timestamp fields
        timestamp_fields = ["timestamp", "eventTime", "dateTime", "whenOccurred", "date", "time"]
        for field in timestamp_fields:
            if field in evt and evt[field]:
                return evt[field]
    
        return None
    
    def parse_timestamp(ts):
        """
        Parse timestamp handling various formats
        """
        from datetime import datetime, timezone
    
        if isinstance(ts, (int, float)):
            # Handle milliseconds vs seconds
            return datetime.fromtimestamp(ts/1000 if ts > 1e12 else ts, tz=timezone.utc)
    
        if isinstance(ts, str):
            if ts.endswith("Z"):
                return datetime.fromisoformat(ts.replace("Z", "+00:00"))
            dt = datetime.fromisoformat(ts)
            return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
    
        raise ValueError(f"Unsupported timestamp: {ts!r}")
    
    def fetch_events(token, start_date_iso):
        """
        Fetch events using the correct EPM API endpoint: /management-api/v2/Events/FromStartDate
        This endpoint uses StartDate and RecordSize parameters, not startTime/endTime/limit/offset
        """
        headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
        all_events, current_start = [], start_date_iso
    
        # Enforce maximum RecordSize limit of 1000
        record_size_limited = min(RECORD_SIZE, 1000)
    
        for _ in range(10):  # MAX 10 iterations to prevent infinite loops
            # Use the correct endpoint and parameters
            params = {
                "StartDate": current_start_date,
                "RecordSize": RECORD_SIZE
            }
    
            resp = requests.get(
                f"{BPT_API_URL}/management-api/v2/Events/FromStartDate",
                headers=headers, 
                params={
                    "StartDate": current_start_date,
                    "RecordSize": min(RECORD_SIZE, 1000)
                },
                timeout=300
            )
            resp.raise_for_status()
    
            data = resp.json()
            events = data.get("events", [])
    
            if not events:
                break
    
            all_events.extend(events)
            iterations += 1
    
            # If we got fewer events than RECORD_SIZE, we're done
            if len(events) < RECORD_SIZE:
                break
    
            # For pagination, update StartDate to the timestamp of the last event
            last_event = events[-1]
            last_timestamp = extract_event_timestamp(last_event)
    
            if not last_timestamp:
                print("Warning: Could not find timestamp in last event for pagination")
                break
    
            # Convert to ISO format if needed and increment slightly to avoid duplicates
            try:
                dt = parse_timestamp(last_timestamp)
                # Add 1 second to avoid retrieving the same event again
                dt = dt + timedelta(seconds=1)
                current_start = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    
            except Exception as e:
                print(f"Error parsing timestamp {last_timestamp}: {e}")
                break
    
        return all_events
    
    def upload_to_s3(obj, key):
        boto3.client("s3").put_object(
            Bucket=S3_BUCKET, 
            Key=key,
            Body=json.dumps(obj).encode("utf-8"),
            ContentType="application/json"
        )
    
    def main():
        # 1) determine window
        start_dt = read_last_run()
        end_dt = datetime.now(timezone.utc)
        START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    
        print(f"Fetching events from {START} to {END}")
    
        # 2) authenticate and fetch
        try:
            token = get_oauth_token()
            events = fetch_events(token, START)
    
            # Filter events to only include those before our end time
            filtered_events = []
            for evt in events:
                evt_time = extract_event_timestamp(evt)
                if evt_time:
                    try:
                        evt_dt = parse_timestamp(evt_time)
                        if evt_dt <= end_dt:
                            filtered_events.append(evt)
                    except Exception as e:
                        print(f"Error parsing event timestamp {evt_time}: {e}")
                        # Include event anyway if timestamp parsing fails
                        filtered_events.append(evt)
                else:
                    # Include events without timestamps
                    filtered_events.append(evt)
    
            count = len(filtered_events)
    
            if count > 0:
                # Upload events to S3
                timestamp_str = end_dt.strftime('%Y%m%d_%H%M%S')
                for idx, evt in enumerate(filtered_events, start=1):
                    key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{timestamp_str}_{idx:06d}.json"
                    upload_to_s3(evt, key)
    
                print(f"Uploaded {count} events to S3")
            else:
                print("No events to upload")
    
            # 3) persist state
            write_last_run(end_dt)
    
        except Exception as e:
            print(f"Error: {e}")
            sys.exit(1)
    
    if __name__ == "__main__":
        main()
    
  4. Haz que sea ejecutable:

    chmod +x collector_bpt.py
    

Programar diariamente con Cron

  1. Ejecuta el siguiente comando:

    crontab -e
    
  2. Añade la tarea diaria a medianoche UTC:

    0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py
    

Opción 2: Recogida basada en AWS Lambda

Recopilar los requisitos previos de BeyondTrust EPM

  1. Inicia sesión en la consola web de gestión de privilegios de BeyondTrust como administrador.
  2. Ve a Configuración del sistema > API REST > Tokens.
  3. Haz clic en Add Token (Añadir token).
  4. Proporcione los siguientes detalles de configuración:
    • Nombre: escribe Google SecOps Collector.
    • Ámbitos: selecciona Audit:Read y otros ámbitos según sea necesario.
  5. Haz clic en Guardar y copia el valor del token.
  6. Copia y guarda en un lugar seguro los siguientes detalles:
    • URL base de la API: la URL de la API de BeyondTrust EPM (por ejemplo, https://yourtenant-services.pm.beyondtrustcloud.com).
    • ID de cliente: en la configuración de tu aplicación OAuth.
    • Secreto de cliente: en la configuración de tu aplicación OAuth.

Configurar un segmento de AWS S3 y IAM para Google SecOps

  1. Crea un segmento de Amazon S3 siguiendo esta guía de usuario: Crear un segmento.
  2. Guarda el nombre y la región del segmento para consultarlos más adelante (por ejemplo, beyondtrust-epm-logs-bucket).
  3. Crea un usuario siguiendo esta guía: Crear un usuario de gestión de identidades y accesos.
  4. Selecciona el Usuario creado.
  5. Selecciona la pestaña Credenciales de seguridad.
  6. En la sección Claves de acceso, haz clic en Crear clave de acceso.
  7. Selecciona Servicio de terceros como Caso práctico.
  8. Haz clic en Siguiente.
  9. Opcional: añade una etiqueta de descripción.
  10. Haz clic en Crear clave de acceso.
  11. Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para usarlas más adelante.
  12. Haz clic en Listo.
  13. Selecciona la pestaña Permisos.
  14. En la sección Políticas de permisos, haz clic en Añadir permisos.
  15. Selecciona Añadir permisos.
  16. Seleccione Adjuntar políticas directamente.
  17. Busca y selecciona la política AmazonS3FullAccess.
  18. Haz clic en Siguiente.
  19. Haz clic en Añadir permisos.

Configurar la política y el rol de gestión de identidades y accesos para las subidas de S3

  1. En la consola de AWS, ve a IAM > Policies > Create policy > pestaña JSON.
  2. Copia y pega la siguiente política:

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Sid": "AllowPutObjects",
        "Effect": "Allow",
        "Action": "s3:PutObject",
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*"
        },
        {
        "Sid": "AllowGetStateObject",
        "Effect": "Allow",
        "Action": "s3:GetObject",
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/beyondtrust-epm-logs/state.json"
        }
    ]
    }
    
    • Sustituye beyondtrust-epm-logs-bucket si has introducido otro nombre de segmento.
  3. Haz clic en Siguiente > Crear política.

  4. Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.

  5. Adjunta la política que acabas de crear y la política gestionada AWSLambdaBasicExecutionRole (para el registro de CloudWatch).

  6. Dale el nombre BeyondTrustEPMLogExportRole al rol y haz clic en Crear rol.

Crear la función Lambda

  1. En la consola de AWS, ve a Lambda > Funciones > Crear función.
  2. Haz clic en Crear desde cero.
  3. Proporciona los siguientes detalles de configuración:
Ajuste Valor
Nombre BeyondTrustEPMLogExport
Tiempo de ejecución Python 3.13
Arquitectura x86_64
Rol de ejecución BeyondTrustEPMLogExportRole
  1. Una vez creada la función, abra la pestaña Código, elimine el stub e introduzca el siguiente código (BeyondTrustEPMLogExport.py):

    import json
    import boto3
    import urllib3
    import base64
    from datetime import datetime, timedelta, timezone
    import os
    from typing import Dict, List, Optional
    
    # Initialize urllib3 pool manager
    http = urllib3.PoolManager()
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch BeyondTrust EPM audit events and store them in S3
        """
    
        # Environment variables
        S3_BUCKET = os.environ['S3_BUCKET']
        S3_PREFIX = os.environ['S3_PREFIX']
        STATE_KEY = os.environ['STATE_KEY']
    
        # BeyondTrust EPM API credentials
        BPT_API_URL = os.environ['BPT_API_URL']
        CLIENT_ID = os.environ['CLIENT_ID']
        CLIENT_SECRET = os.environ['CLIENT_SECRET']
        OAUTH_SCOPE = os.environ.get('OAUTH_SCOPE', 'urn:management:api')
    
        # Optional parameters
        RECORD_SIZE = int(os.environ.get('RECORD_SIZE', '1000'))
        MAX_ITERATIONS = int(os.environ.get('MAX_ITERATIONS', '10'))
    
        s3_client = boto3.client('s3')
    
        try:
            # Get last execution state
            last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY)
    
            # Get OAuth access token
            access_token = get_oauth_token(BPT_API_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_SCOPE)
    
            # Fetch audit events
            events = fetch_audit_events(BPT_API_URL, access_token, last_timestamp, RECORD_SIZE, MAX_ITERATIONS)
    
            if events:
                # Store events in S3
                current_timestamp = datetime.utcnow()
                filename = f"{S3_PREFIX}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json"
    
                store_events_to_s3(s3_client, S3_BUCKET, filename, events)
    
                # Update state with latest timestamp
                latest_timestamp = get_latest_event_timestamp(events)
                update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp)
    
                print(f"Successfully processed {len(events)} events and stored to {filename}")
            else:
                print("No new events found")
    
            return {
                'statusCode': 200,
                'body': json.dumps(f'Successfully processed {len(events) if events else 0} events')
            }
    
        except Exception as e:
            print(f"Error processing BeyondTrust EPM logs: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def get_oauth_token(api_url: str, client_id: str, client_secret: str, scope: str = "urn:management:api") -> str:
        """
        Get OAuth access token using client credentials flow for BeyondTrust EPM
        Uses the correct scope: urn:management:api and /oauth/connect/token endpoint
        """
    
        token_url = f"{api_url}/oauth/connect/token"
    
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    
        body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}"
    
        response = http.request('POST', token_url, headers=headers, body=body, timeout=urllib3.Timeout(60.0))
    
        if response.status != 200:
            raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}")
    
        token_data = json.loads(response.data.decode('utf-8'))
        return token_data['access_token']
    
    def fetch_audit_events(api_url: str, access_token: str, last_timestamp: Optional[str], record_size: int, max_iterations: int) -> List[Dict]:
        """
        Fetch audit events using the correct BeyondTrust EPM API endpoint:
        /management-api/v2/Events/FromStartDate with StartDate and RecordSize parameters
        """
    
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }
    
        all_events = []
        current_start_date = last_timestamp or (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ")
        iterations = 0
    
        # Enforce maximum RecordSize limit of 1000
        record_size_limited = min(record_size, 1000)
    
        while iterations < max_iterations:
            # Use the correct EPM API endpoint and parameters
            query_url = f"{api_url}/management-api/v2/Events/FromStartDate"
            params = {
                'StartDate': current_start_date,
                'RecordSize': record_size_limited
            }
    
            response = http.request('GET', query_url, headers=headers, fields=params, timeout=urllib3.Timeout(300.0))
    
            if response.status != 200:
                raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}")
    
            response_data = json.loads(response.data.decode('utf-8'))
            events = response_data.get('events', [])
    
            if not events:
                break
    
            all_events.extend(events)
            iterations += 1
    
            # If we got fewer events than RecordSize, we've reached the end
            if len(events) < record_size_limited:
                break
    
            # For pagination, update StartDate to the timestamp of the last event
            last_event = events[-1]
            last_timestamp = extract_event_timestamp(last_event)
    
            if not last_timestamp:
                print("Warning: Could not find timestamp in last event for pagination")
                break
    
            # Convert to datetime and add 1 second to avoid retrieving the same event again
            try:
                dt = parse_timestamp(last_timestamp)
                dt = dt + timedelta(seconds=1)
                current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
            except Exception as e:
                print(f"Error parsing timestamp {last_timestamp}: {e}")
                break
    
        return all_events
    
    def extract_event_timestamp(event: Dict) -> Optional[str]:
        """
        Extract timestamp from event, prioritizing event.ingested field
        """
        # Primary (documented) path: event.ingested
        if isinstance(event, dict) and isinstance(event.get("event"), dict):
            ts = event["event"].get("ingested")
            if ts:
                return ts
    
        # Fallbacks for other timestamp fields
        timestamp_fields = ['timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time']
        for field in timestamp_fields:
            if field in event and event[field]:
                return event[field]
    
        return None
    
    def parse_timestamp(timestamp_str: str) -> datetime:
        """
        Parse timestamp string to datetime object, handling various formats
        """
        if isinstance(timestamp_str, (int, float)):
            # Unix timestamp (in milliseconds or seconds)
            if timestamp_str > 1e12:  # Milliseconds
                return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc)
            else:  # Seconds
                return datetime.fromtimestamp(timestamp_str, tz=timezone.utc)
    
        if isinstance(timestamp_str, str):
            # Try different string formats
            try:
                # ISO format with Z
                if timestamp_str.endswith('Z'):
                    return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                # ISO format with timezone
                elif '+' in timestamp_str or timestamp_str.endswith('00:00'):
                    return datetime.fromisoformat(timestamp_str)
                # ISO format without timezone (assume UTC)
                else:
                    dt = datetime.fromisoformat(timestamp_str)
                    if dt.tzinfo is None:
                        dt = dt.replace(tzinfo=timezone.utc)
                    return dt
            except ValueError:
                pass
    
        raise ValueError(f"Could not parse timestamp: {timestamp_str}")
    
    def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]:
        """
        Get the last processed timestamp from S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            print("No previous state found, starting from 24 hours ago")
            return None
        except Exception as e:
            print(f"Error reading state: {e}")
            return None
    
    def update_state(s3_client, bucket: str, state_key: str, timestamp: str):
        """
        Update the state file with the latest processed timestamp
        """
        state_data = {
            'last_timestamp': timestamp,
            'updated_at': datetime.utcnow().isoformat() + 'Z'
        }
    
        s3_client.put_object(
            Bucket=bucket,
            Key=state_key,
            Body=json.dumps(state_data),
            ContentType='application/json'
        )
    
    def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]):
        """
        Store events as JSONL (one JSON object per line) in S3
        """
        # Convert to JSONL format (one JSON object per line)
        jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events)
    
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=jsonl_content,
            ContentType='application/x-ndjson'
        )
    
    def get_latest_event_timestamp(events: List[Dict]) -> str:
        """
        Get the latest timestamp from the events for state tracking
        """
        if not events:
            return datetime.utcnow().isoformat() + 'Z'
    
        latest = None
        for event in events:
            timestamp = extract_event_timestamp(event)
            if timestamp:
                try:
                    event_dt = parse_timestamp(timestamp)
                    event_iso = event_dt.isoformat() + 'Z'
                    if latest is None or event_iso > latest:
                        latest = event_iso
                except Exception as e:
                    print(f"Error parsing event timestamp {timestamp}: {e}")
                    continue
    
        return latest or datetime.utcnow().isoformat() + 'Z'
    
  2. Ve a Configuración > Variables de entorno > Editar > Añadir nueva variable de entorno.

  3. Introduce las siguientes variables de entorno y sustituye los valores por los tuyos.

    Clave Valor de ejemplo
    S3_BUCKET beyondtrust-epm-logs-bucket
    S3_PREFIX beyondtrust-epm-logs/
    STATE_KEY beyondtrust-epm-logs/state.json
    BPT_API_URL https://yourtenant-services.pm.beyondtrustcloud.com
    CLIENT_ID your-client-id
    CLIENT_SECRET your-client-secret
    OAUTH_SCOPE urn:management:api
    RECORD_SIZE 1000
    MAX_ITERATIONS 10
  4. Una vez creada la función, permanece en su página (o abre Lambda > Funciones > tu-función).

  5. Seleccione la pestaña Configuración.

  6. En el panel Configuración general, haz clic en Editar.

  7. Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.

Crear una programación de EventBridge

  1. Ve a Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Programador > Crear programación).
  2. Proporcione los siguientes detalles de configuración:
    • Programación periódica: Precio (1 hour).
    • Destino: tu función Lambda BeyondTrustEPMLogExport.
    • Nombre: BeyondTrustEPMLogExport-1h.
  3. Haz clic en Crear programación.

Opcional: Crear un usuario y claves de gestión de identidades y accesos de solo lectura para Google SecOps

  1. Ve a Consola de AWS > IAM > Usuarios > Añadir usuarios.
  2. Haz clic en Add users (Añadir usuarios).
  3. Proporcione los siguientes detalles de configuración:
    • Usuario: introduce secops-reader.
    • Tipo de acceso: selecciona Clave de acceso – Acceso programático.
  4. Haz clic en Crear usuario.
  5. Asigna una política de lectura mínima (personalizada): Usuarios > secops-reader > Permisos > Añadir permisos > Asignar políticas directamente > Crear política.
  6. En el editor de JSON, introduce la siguiente política:

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Effect": "Allow",
        "Action": ["s3:GetObject"],
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*"
        },
        {
        "Effect": "Allow",
        "Action": ["s3:ListBucket"],
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket"
        }
    ]
    }
    
  7. Asigna el nombre secops-reader-policy.

  8. Ve a Crear política > busca o selecciona > Siguiente > Añadir permisos.

  9. Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.

  10. Descarga el archivo CSV (estos valores se introducen en el feed).

Configurar feeds (ambas opciones)

Para configurar un feed, sigue estos pasos:

  1. Ve a Configuración de SIEM > Feeds.
  2. Haz clic en + Añadir nuevo feed.
  3. En el campo Nombre del feed, introduce un nombre para el feed (por ejemplo, BeyondTrust EPM logs).
  4. Selecciona Amazon S3 V2 como Tipo de fuente.
  5. Selecciona BeyondTrust Endpoint Privilege Management como Tipo de registro.
  6. Haz clic en Siguiente.
  7. Especifique los valores de los siguientes parámetros de entrada:
    • URI de S3: el URI del contenedor
      • s3://your-log-bucket-name/. Sustituye your-log-bucket-name por el nombre real del segmento.
    • Opciones de eliminación de la fuente: selecciona la opción de eliminación que prefieras.
    • Antigüedad máxima del archivo: incluye los archivos modificados en los últimos días. El valor predeterminado es 180 días.
    • ID de clave de acceso: clave de acceso de usuario con acceso al bucket de S3.
    • Clave de acceso secreta: clave secreta del usuario con acceso al bucket de S3.
    • Espacio de nombres de recursos: el espacio de nombres de recursos.
    • Etiquetas de ingestión: la etiqueta aplicada a los eventos de este feed.
  8. Haz clic en Siguiente.
  9. Revise la configuración de la nueva fuente en la pantalla Finalizar y, a continuación, haga clic en Enviar.

Tabla de asignación de UDM

Campo de registro Asignación de UDM Lógica
agent.id principal.asset.attribute.labels.value Asignado a la etiqueta con la clave agent_id
agent.version principal.asset.attribute.labels.value Asignado a la etiqueta con la clave agent_version
ecs.version principal.asset.attribute.labels.value Asignado a la etiqueta con la clave ecs_version
event_data.reason metadata.description Descripción del evento del registro sin procesar
event_datas.ActionId metadata.product_log_id Identificador de registro específico del producto
file.path principal.file.full_path Ruta de archivo completa del evento
headers.content_length additional.fields.value.string_value Asignado a la etiqueta con la clave content_length
headers.content_type additional.fields.value.string_value Asignado a la etiqueta con la clave content_type
headers.http_host additional.fields.value.string_value Asignado a la etiqueta con la clave http_host
headers.http_version network.application_protocol_version Versión del protocolo HTTP
headers.request_method network.http.method Método de solicitud HTTP
host.hostname principal.hostname Nombre de host principal
host.hostname principal.asset.hostname Nombre de host del recurso principal
host.ip principal.asset.ip Dirección IP del recurso principal
host.ip principal.ip Dirección IP principal
host.mac principal.mac Dirección MAC principal
host.os.platform principal.platform Se establece en MAC si es igual a macOS
host.os.version principal.platform_version Versión del sistema operativo
labels.related_item_id metadata.product_log_id Identificador de elemento relacionado
process.command_line principal.process.command_line Línea de comandos del proceso
process.name additional.fields.value.string_value Asignado a la etiqueta con la clave process_name
process.parent.name additional.fields.value.string_value Asignado a la etiqueta con la clave process_parent_name
process.parent.pid principal.process.parent_process.pid PID de proceso superior convertido en cadena
process.pid principal.process.pid PID de proceso convertido en cadena
user.id principal.user.userid Identificador de usuario
user.name principal.user.user_display_name Nombre visible del usuario
N/A metadata.event_timestamp Marca de tiempo del evento definida como marca de tiempo de la entrada de registro
N/A metadata.event_type GENERIC_EVENT si no hay principal; de lo contrario, STATUS_UPDATE
N/A network.application_protocol Se define como HTTP si el campo http_version contiene HTTP.

¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.