Recoger registros de BeyondTrust Endpoint Privilege Management (EPM)
En este documento se explica cómo ingerir registros de BeyondTrust Endpoint Privilege Management (EPM) en Google Security Operations mediante dos enfoques diferentes: la recogida basada en EC2 y la recogida basada en AWS Lambda mediante Amazon S3. El analizador se centra en transformar los datos de registro JSON sin procesar de BeyondTrust Endpoint en un formato estructurado que se ajuste al UDM de Chronicle. Primero, inicializa los valores predeterminados de varios campos y, a continuación, analiza la carga útil de JSON. Después, asigna campos específicos del registro sin procesar a los campos de UDM correspondientes dentro del objeto event.idm.read_only_udm
.
Antes de empezar
Asegúrate de que cumples los siguientes requisitos previos:
- Instancia de Google SecOps
- Acceso con privilegios al tenant o a la API de BeyondTrust Endpoint Privilege Management
- Acceso privilegiado a AWS (S3, IAM, Lambda/EC2 y EventBridge)
Elige el método de integración
Puedes elegir entre dos métodos de integración:
- Opción 1: Recogida basada en EC2: usa una instancia de EC2 con secuencias de comandos programadas para recoger registros.
- Opción 2: Recogida basada en AWS Lambda: usa funciones Lambda sin servidor con la programación de EventBridge.
Opción 1: Recogida basada en EC2
Configurar la gestión de identidades y accesos de AWS para la ingestión de Google SecOps
- Crea un usuario siguiendo esta guía: Crear un usuario de gestión de identidades y accesos.
- Selecciona el Usuario creado.
- Selecciona la pestaña Credenciales de seguridad.
- En la sección Claves de acceso, haz clic en Crear clave de acceso.
- Selecciona Servicio de terceros en Caso práctico.
- Haz clic en Siguiente.
- Opcional: añade una etiqueta de descripción.
- Haz clic en Crear clave de acceso.
- Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para futuras consultas.
- Haz clic en Listo.
- Selecciona la pestaña Permisos.
- En la sección Políticas de permisos, haz clic en Añadir permisos.
- Selecciona Añadir permisos.
- Seleccione Adjuntar políticas directamente.
- Busca y selecciona la política AmazonS3FullAccess.
- Haz clic en Siguiente.
- Haz clic en Añadir permisos.
Configurar BeyondTrust EPM para el acceso a la API
- Inicia sesión en la consola web de gestión de privilegios de BeyondTrust como administrador.
- Ve a Configuración > Ajustes > Ajustes de la API.
- Haz clic en Create an API Account (Crear una cuenta de API).
- Proporcione los siguientes detalles de configuración:
- Nombre: escribe
Google SecOps Collector
. - Acceso a la API: habilita Auditoría (lectura) y otros ámbitos según sea necesario.
- Nombre: escribe
- Copia y guarda el ID de cliente y el secreto de cliente.
- Copia la URL base de la API. Normalmente es
https://<your-tenant>-services.pm.beyondtrustcloud.com
(la usarás como BPT_API_URL).
Crear un segmento de AWS S3
- Inicia sesión en la consola de administración de AWS.
- Ve a Consola de AWS > Servicios > S3 > Crear un bucket.
- Proporcione los siguientes detalles de configuración:
- Nombre del segmento:
my-beyondtrust-logs
. - Región: [tu elección] > Crear.
- Nombre del segmento:
Crear un rol de gestión de identidades y accesos para EC2
- Inicia sesión en la consola de administración de AWS.
- Ve a Consola de AWS > Servicios > IAM > Roles > Crear rol.
- Proporcione los siguientes detalles de configuración:
- Entidad de confianza: Servicio de AWS > EC2 > Siguiente.
- Adjuntar permiso: AmazonS3FullAccess (o una política con ámbito a tu bucket) > Siguiente.
- Nombre del rol:
EC2-S3-BPT-Writer
> Crear rol.
Lanzar y configurar la VM de EC2 Collector
- Inicia sesión en la consola de administración de AWS.
- Ve a Servicios.
- En la barra de búsqueda, escribe EC2 y selecciónalo.
- En el panel de control de EC2, haga clic en Instancias.
- Haz clic en Lanzar instancias.
- Proporcione los siguientes detalles de configuración:
- Nombre: escribe
BPT-Log-Collector
. - AMI: selecciona Ubuntu Server 22.04 LTS.
- Tipo de instancia: t3.micro (o un tipo más grande) y, a continuación, haz clic en Siguiente.
- Red: asegúrate de que el ajuste Red esté configurado en tu VPC predeterminada.
- Rol de gestión de identidades y accesos: selecciona el rol de gestión de identidades y accesos EC2-S3-BPT-Writer en el menú.
- Asignar automáticamente IP pública: habilita esta opción (o asegúrate de que puedes acceder a ella mediante una VPN) > Siguiente.
- Añadir almacenamiento: deje la configuración de almacenamiento predeterminada (8 GiB) y haga clic en Siguiente.
- Selecciona Crear un grupo de seguridad.
- Regla de entrada: haz clic en Añadir regla.
- Tipo: selecciona SSH.
- Puerto: 22.
- Origen: tu IP
- Haga clic en Revisar y publicar.
- Seleccione o cree un par de claves.
- Haz clic en Descargar par de claves.
- Guarda el archivo PEM descargado. Necesitarás este archivo para conectarte a tu instancia mediante SSH.
- Nombre: escribe
- Conéctate a tu máquina virtual (VM) mediante SSH.
Instalar los requisitos previos del recopilador
Ejecuta el siguiente comando:
chmod 400 ~/Downloads/your-key.pem ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
Actualiza el sistema e instala las dependencias:
# Update OS sudo apt update && sudo apt upgrade -y # Install Python, Git sudo apt install -y python3 python3-venv python3-pip git # Create & activate virtualenv python3 -m venv ~/bpt-venv source ~/bpt-venv/bin/activate # Install libraries pip install requests boto3
Crea un directorio y un archivo de estado:
sudo mkdir -p /var/lib/bpt-collector sudo touch /var/lib/bpt-collector/last_run.txt sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
Inicialízalo (por ejemplo, hace 1 hora):
echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
Implementar la secuencia de comandos del recopilador de EPM de BeyondTrust
Crea una carpeta de proyecto:
mkdir ~/bpt-collector && cd ~/bpt-collector
Exporta las variables de entorno necesarias (por ejemplo, en
~/.bashrc
):export BPT_API_URL="https://<your-tenant>-services.pm.beyondtrustcloud.com" export BPT_CLIENT_ID="your-client-id" export BPT_CLIENT_SECRET="your-client-secret" export S3_BUCKET="my-beyondtrust-logs" export S3_PREFIX="bpt/" export STATE_FILE="/var/lib/bpt-collector/last_run.txt" export RECORD_SIZE="1000"
Crea
collector_bpt.py
e introduce el siguiente código:#!/usr/bin/env python3 import os, sys, json, boto3, requests from datetime import datetime, timezone, timedelta # ── UTILS ────────────────────────────────────────────────────────────── def must_env(var): val = os.getenv(var) if not val: print(f"ERROR: environment variable {var} is required", file=sys.stderr) sys.exit(1) return val def ensure_state_file(path): d = os.path.dirname(path) if not os.path.isdir(d): os.makedirs(d, exist_ok=True) if not os.path.isfile(path): ts = (datetime.now(timezone.utc) - timedelta(hours=1)) .strftime("%Y-%m-%dT%H:%M:%SZ") with open(path, "w") as f: f.write(ts) # ── CONFIG ───────────────────────────────────────────────────────────── BPT_API_URL = must_env("BPT_API_URL") # e.g., https://tenant-services.pm.beyondtrustcloud.com CLIENT_ID = must_env("BPT_CLIENT_ID") CLIENT_SECRET = must_env("BPT_CLIENT_SECRET") S3_BUCKET = must_env("S3_BUCKET") S3_PREFIX = os.getenv("S3_PREFIX", "") # e.g., "bpt/" STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt") RECORD_SIZE = int(os.getenv("RECORD_SIZE", "1000")) # ── END CONFIG ───────────────────────────────────────────────────────── ensure_state_file(STATE_FILE) def read_last_run(): with open(STATE_FILE, "r") as f: ts = f.read().strip() return datetime.fromisoformat(ts.replace("Z", "+00:00")) def write_last_run(dt): with open(STATE_FILE, "w") as f: f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ")) def get_oauth_token(): """ Get OAuth2 token using client credentials flow Scope: urn:management:api (for EPM Management API access) """ resp = requests.post( f"{BPT_API_URL}/oauth/connect/token", headers={"Content-Type": "application/x-www-form-urlencoded"}, data={ "grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "scope": "urn:management:api" } ) resp.raise_for_status() return resp.json()["access_token"] def extract_event_timestamp(evt): """ Extract timestamp from event, prioritizing event.ingested field """ # Primary (documented) path: event.ingested if isinstance(evt, dict) and isinstance(evt.get("event"), dict): ts = evt["event"].get("ingested") if ts: return ts # Fallbacks for other timestamp fields timestamp_fields = ["timestamp", "eventTime", "dateTime", "whenOccurred", "date", "time"] for field in timestamp_fields: if field in evt and evt[field]: return evt[field] return None def parse_timestamp(ts): """ Parse timestamp handling various formats """ from datetime import datetime, timezone if isinstance(ts, (int, float)): # Handle milliseconds vs seconds return datetime.fromtimestamp(ts/1000 if ts > 1e12 else ts, tz=timezone.utc) if isinstance(ts, str): if ts.endswith("Z"): return datetime.fromisoformat(ts.replace("Z", "+00:00")) dt = datetime.fromisoformat(ts) return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc) raise ValueError(f"Unsupported timestamp: {ts!r}") def fetch_events(token, start_date_iso): """ Fetch events using the correct EPM API endpoint: /management-api/v2/Events/FromStartDate This endpoint uses StartDate and RecordSize parameters, not startTime/endTime/limit/offset """ headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"} all_events, current_start = [], start_date_iso # Enforce maximum RecordSize limit of 1000 record_size_limited = min(RECORD_SIZE, 1000) for _ in range(10): # MAX 10 iterations to prevent infinite loops # Use the correct endpoint and parameters params = { "StartDate": current_start_date, "RecordSize": RECORD_SIZE } resp = requests.get( f"{BPT_API_URL}/management-api/v2/Events/FromStartDate", headers=headers, params={ "StartDate": current_start_date, "RecordSize": min(RECORD_SIZE, 1000) }, timeout=300 ) resp.raise_for_status() data = resp.json() events = data.get("events", []) if not events: break all_events.extend(events) iterations += 1 # If we got fewer events than RECORD_SIZE, we're done if len(events) < RECORD_SIZE: break # For pagination, update StartDate to the timestamp of the last event last_event = events[-1] last_timestamp = extract_event_timestamp(last_event) if not last_timestamp: print("Warning: Could not find timestamp in last event for pagination") break # Convert to ISO format if needed and increment slightly to avoid duplicates try: dt = parse_timestamp(last_timestamp) # Add 1 second to avoid retrieving the same event again dt = dt + timedelta(seconds=1) current_start = dt.strftime("%Y-%m-%dT%H:%M:%SZ") except Exception as e: print(f"Error parsing timestamp {last_timestamp}: {e}") break return all_events def upload_to_s3(obj, key): boto3.client("s3").put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(obj).encode("utf-8"), ContentType="application/json" ) def main(): # 1) determine window start_dt = read_last_run() end_dt = datetime.now(timezone.utc) START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ") END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ") print(f"Fetching events from {START} to {END}") # 2) authenticate and fetch try: token = get_oauth_token() events = fetch_events(token, START) # Filter events to only include those before our end time filtered_events = [] for evt in events: evt_time = extract_event_timestamp(evt) if evt_time: try: evt_dt = parse_timestamp(evt_time) if evt_dt <= end_dt: filtered_events.append(evt) except Exception as e: print(f"Error parsing event timestamp {evt_time}: {e}") # Include event anyway if timestamp parsing fails filtered_events.append(evt) else: # Include events without timestamps filtered_events.append(evt) count = len(filtered_events) if count > 0: # Upload events to S3 timestamp_str = end_dt.strftime('%Y%m%d_%H%M%S') for idx, evt in enumerate(filtered_events, start=1): key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{timestamp_str}_{idx:06d}.json" upload_to_s3(evt, key) print(f"Uploaded {count} events to S3") else: print("No events to upload") # 3) persist state write_last_run(end_dt) except Exception as e: print(f"Error: {e}") sys.exit(1) if __name__ == "__main__": main()
Haz que sea ejecutable:
chmod +x collector_bpt.py
Programar diariamente con Cron
Ejecuta el siguiente comando:
crontab -e
Añade la tarea diaria a medianoche UTC:
0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py
Opción 2: Recogida basada en AWS Lambda
Recopilar los requisitos previos de BeyondTrust EPM
- Inicia sesión en la consola web de gestión de privilegios de BeyondTrust como administrador.
- Ve a Configuración del sistema > API REST > Tokens.
- Haz clic en Add Token (Añadir token).
- Proporcione los siguientes detalles de configuración:
- Nombre: escribe
Google SecOps Collector
. - Ámbitos: selecciona Audit:Read y otros ámbitos según sea necesario.
- Nombre: escribe
- Haz clic en Guardar y copia el valor del token.
- Copia y guarda en un lugar seguro los siguientes detalles:
- URL base de la API: la URL de la API de BeyondTrust EPM (por ejemplo,
https://yourtenant-services.pm.beyondtrustcloud.com
). - ID de cliente: en la configuración de tu aplicación OAuth.
- Secreto de cliente: en la configuración de tu aplicación OAuth.
- URL base de la API: la URL de la API de BeyondTrust EPM (por ejemplo,
Configurar un segmento de AWS S3 y IAM para Google SecOps
- Crea un segmento de Amazon S3 siguiendo esta guía de usuario: Crear un segmento.
- Guarda el nombre y la región del segmento para consultarlos más adelante (por ejemplo,
beyondtrust-epm-logs-bucket
). - Crea un usuario siguiendo esta guía: Crear un usuario de gestión de identidades y accesos.
- Selecciona el Usuario creado.
- Selecciona la pestaña Credenciales de seguridad.
- En la sección Claves de acceso, haz clic en Crear clave de acceso.
- Selecciona Servicio de terceros como Caso práctico.
- Haz clic en Siguiente.
- Opcional: añade una etiqueta de descripción.
- Haz clic en Crear clave de acceso.
- Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para usarlas más adelante.
- Haz clic en Listo.
- Selecciona la pestaña Permisos.
- En la sección Políticas de permisos, haz clic en Añadir permisos.
- Selecciona Añadir permisos.
- Seleccione Adjuntar políticas directamente.
- Busca y selecciona la política AmazonS3FullAccess.
- Haz clic en Siguiente.
- Haz clic en Añadir permisos.
Configurar la política y el rol de gestión de identidades y accesos para las subidas de S3
- En la consola de AWS, ve a IAM > Policies > Create policy > pestaña JSON.
Copia y pega la siguiente política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/beyondtrust-epm-logs/state.json" } ] }
- Sustituye
beyondtrust-epm-logs-bucket
si has introducido otro nombre de segmento.
- Sustituye
Haz clic en Siguiente > Crear política.
Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.
Adjunta la política que acabas de crear y la política gestionada AWSLambdaBasicExecutionRole (para el registro de CloudWatch).
Dale el nombre
BeyondTrustEPMLogExportRole
al rol y haz clic en Crear rol.
Crear la función Lambda
- En la consola de AWS, ve a Lambda > Funciones > Crear función.
- Haz clic en Crear desde cero.
- Proporciona los siguientes detalles de configuración:
Ajuste | Valor |
---|---|
Nombre | BeyondTrustEPMLogExport |
Tiempo de ejecución | Python 3.13 |
Arquitectura | x86_64 |
Rol de ejecución | BeyondTrustEPMLogExportRole |
Una vez creada la función, abra la pestaña Código, elimine el stub e introduzca el siguiente código (
BeyondTrustEPMLogExport.py
):import json import boto3 import urllib3 import base64 from datetime import datetime, timedelta, timezone import os from typing import Dict, List, Optional # Initialize urllib3 pool manager http = urllib3.PoolManager() def lambda_handler(event, context): """ Lambda function to fetch BeyondTrust EPM audit events and store them in S3 """ # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] # BeyondTrust EPM API credentials BPT_API_URL = os.environ['BPT_API_URL'] CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] OAUTH_SCOPE = os.environ.get('OAUTH_SCOPE', 'urn:management:api') # Optional parameters RECORD_SIZE = int(os.environ.get('RECORD_SIZE', '1000')) MAX_ITERATIONS = int(os.environ.get('MAX_ITERATIONS', '10')) s3_client = boto3.client('s3') try: # Get last execution state last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY) # Get OAuth access token access_token = get_oauth_token(BPT_API_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_SCOPE) # Fetch audit events events = fetch_audit_events(BPT_API_URL, access_token, last_timestamp, RECORD_SIZE, MAX_ITERATIONS) if events: # Store events in S3 current_timestamp = datetime.utcnow() filename = f"{S3_PREFIX}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_s3(s3_client, S3_BUCKET, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp) print(f"Successfully processed {len(events)} events and stored to {filename}") else: print("No new events found") return { 'statusCode': 200, 'body': json.dumps(f'Successfully processed {len(events) if events else 0} events') } except Exception as e: print(f"Error processing BeyondTrust EPM logs: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_oauth_token(api_url: str, client_id: str, client_secret: str, scope: str = "urn:management:api") -> str: """ Get OAuth access token using client credentials flow for BeyondTrust EPM Uses the correct scope: urn:management:api and /oauth/connect/token endpoint """ token_url = f"{api_url}/oauth/connect/token" headers = { 'Content-Type': 'application/x-www-form-urlencoded' } body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}" response = http.request('POST', token_url, headers=headers, body=body, timeout=urllib3.Timeout(60.0)) if response.status != 200: raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}") token_data = json.loads(response.data.decode('utf-8')) return token_data['access_token'] def fetch_audit_events(api_url: str, access_token: str, last_timestamp: Optional[str], record_size: int, max_iterations: int) -> List[Dict]: """ Fetch audit events using the correct BeyondTrust EPM API endpoint: /management-api/v2/Events/FromStartDate with StartDate and RecordSize parameters """ headers = { 'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json' } all_events = [] current_start_date = last_timestamp or (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ") iterations = 0 # Enforce maximum RecordSize limit of 1000 record_size_limited = min(record_size, 1000) while iterations < max_iterations: # Use the correct EPM API endpoint and parameters query_url = f"{api_url}/management-api/v2/Events/FromStartDate" params = { 'StartDate': current_start_date, 'RecordSize': record_size_limited } response = http.request('GET', query_url, headers=headers, fields=params, timeout=urllib3.Timeout(300.0)) if response.status != 200: raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}") response_data = json.loads(response.data.decode('utf-8')) events = response_data.get('events', []) if not events: break all_events.extend(events) iterations += 1 # If we got fewer events than RecordSize, we've reached the end if len(events) < record_size_limited: break # For pagination, update StartDate to the timestamp of the last event last_event = events[-1] last_timestamp = extract_event_timestamp(last_event) if not last_timestamp: print("Warning: Could not find timestamp in last event for pagination") break # Convert to datetime and add 1 second to avoid retrieving the same event again try: dt = parse_timestamp(last_timestamp) dt = dt + timedelta(seconds=1) current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ") except Exception as e: print(f"Error parsing timestamp {last_timestamp}: {e}") break return all_events def extract_event_timestamp(event: Dict) -> Optional[str]: """ Extract timestamp from event, prioritizing event.ingested field """ # Primary (documented) path: event.ingested if isinstance(event, dict) and isinstance(event.get("event"), dict): ts = event["event"].get("ingested") if ts: return ts # Fallbacks for other timestamp fields timestamp_fields = ['timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time'] for field in timestamp_fields: if field in event and event[field]: return event[field] return None def parse_timestamp(timestamp_str: str) -> datetime: """ Parse timestamp string to datetime object, handling various formats """ if isinstance(timestamp_str, (int, float)): # Unix timestamp (in milliseconds or seconds) if timestamp_str > 1e12: # Milliseconds return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc) else: # Seconds return datetime.fromtimestamp(timestamp_str, tz=timezone.utc) if isinstance(timestamp_str, str): # Try different string formats try: # ISO format with Z if timestamp_str.endswith('Z'): return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) # ISO format with timezone elif '+' in timestamp_str or timestamp_str.endswith('00:00'): return datetime.fromisoformat(timestamp_str) # ISO format without timezone (assume UTC) else: dt = datetime.fromisoformat(timestamp_str) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt except ValueError: pass raise ValueError(f"Could not parse timestamp: {timestamp_str}") def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]: """ Get the last processed timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: print("No previous state found, starting from 24 hours ago") return None except Exception as e: print(f"Error reading state: {e}") return None def update_state(s3_client, bucket: str, state_key: str, timestamp: str): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data), ContentType='application/json' ) def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events) s3_client.put_object( Bucket=bucket, Key=key, Body=jsonl_content, ContentType='application/x-ndjson' ) def get_latest_event_timestamp(events: List[Dict]) -> str: """ Get the latest timestamp from the events for state tracking """ if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: timestamp = extract_event_timestamp(event) if timestamp: try: event_dt = parse_timestamp(timestamp) event_iso = event_dt.isoformat() + 'Z' if latest is None or event_iso > latest: latest = event_iso except Exception as e: print(f"Error parsing event timestamp {timestamp}: {e}") continue return latest or datetime.utcnow().isoformat() + 'Z'
Ve a Configuración > Variables de entorno > Editar > Añadir nueva variable de entorno.
Introduce las siguientes variables de entorno y sustituye los valores por los tuyos.
Clave Valor de ejemplo S3_BUCKET
beyondtrust-epm-logs-bucket
S3_PREFIX
beyondtrust-epm-logs/
STATE_KEY
beyondtrust-epm-logs/state.json
BPT_API_URL
https://yourtenant-services.pm.beyondtrustcloud.com
CLIENT_ID
your-client-id
CLIENT_SECRET
your-client-secret
OAUTH_SCOPE
urn:management:api
RECORD_SIZE
1000
MAX_ITERATIONS
10
Una vez creada la función, permanece en su página (o abre Lambda > Funciones > tu-función).
Seleccione la pestaña Configuración.
En el panel Configuración general, haz clic en Editar.
Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.
Crear una programación de EventBridge
- Ve a Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Programador > Crear programación).
- Proporcione los siguientes detalles de configuración:
- Programación periódica: Precio (
1 hour
). - Destino: tu función Lambda
BeyondTrustEPMLogExport
. - Nombre:
BeyondTrustEPMLogExport-1h
.
- Programación periódica: Precio (
- Haz clic en Crear programación.
Opcional: Crear un usuario y claves de gestión de identidades y accesos de solo lectura para Google SecOps
- Ve a Consola de AWS > IAM > Usuarios > Añadir usuarios.
- Haz clic en Add users (Añadir usuarios).
- Proporcione los siguientes detalles de configuración:
- Usuario: introduce
secops-reader
. - Tipo de acceso: selecciona Clave de acceso – Acceso programático.
- Usuario: introduce
- Haz clic en Crear usuario.
- Asigna una política de lectura mínima (personalizada): Usuarios > secops-reader > Permisos > Añadir permisos > Asignar políticas directamente > Crear política.
En el editor de JSON, introduce la siguiente política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket" } ] }
Asigna el nombre
secops-reader-policy
.Ve a Crear política > busca o selecciona > Siguiente > Añadir permisos.
Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.
Descarga el archivo CSV (estos valores se introducen en el feed).
Configurar feeds (ambas opciones)
Para configurar un feed, sigue estos pasos:
- Ve a Configuración de SIEM > Feeds.
- Haz clic en + Añadir nuevo feed.
- En el campo Nombre del feed, introduce un nombre para el feed (por ejemplo,
BeyondTrust EPM logs
). - Selecciona Amazon S3 V2 como Tipo de fuente.
- Selecciona BeyondTrust Endpoint Privilege Management como Tipo de registro.
- Haz clic en Siguiente.
- Especifique los valores de los siguientes parámetros de entrada:
- URI de S3: el URI del contenedor
s3://your-log-bucket-name/
. Sustituyeyour-log-bucket-name
por el nombre real del segmento.
- Opciones de eliminación de la fuente: selecciona la opción de eliminación que prefieras.
- Antigüedad máxima del archivo: incluye los archivos modificados en los últimos días. El valor predeterminado es 180 días.
- ID de clave de acceso: clave de acceso de usuario con acceso al bucket de S3.
- Clave de acceso secreta: clave secreta del usuario con acceso al bucket de S3.
- Espacio de nombres de recursos: el espacio de nombres de recursos.
- Etiquetas de ingestión: la etiqueta aplicada a los eventos de este feed.
- URI de S3: el URI del contenedor
- Haz clic en Siguiente.
- Revise la configuración de la nueva fuente en la pantalla Finalizar y, a continuación, haga clic en Enviar.
Tabla de asignación de UDM
Campo de registro | Asignación de UDM | Lógica |
---|---|---|
agent.id |
principal.asset.attribute.labels.value |
Asignado a la etiqueta con la clave agent_id |
agent.version |
principal.asset.attribute.labels.value |
Asignado a la etiqueta con la clave agent_version |
ecs.version |
principal.asset.attribute.labels.value |
Asignado a la etiqueta con la clave ecs_version |
event_data.reason |
metadata.description |
Descripción del evento del registro sin procesar |
event_datas.ActionId |
metadata.product_log_id |
Identificador de registro específico del producto |
file.path |
principal.file.full_path |
Ruta de archivo completa del evento |
headers.content_length |
additional.fields.value.string_value |
Asignado a la etiqueta con la clave content_length |
headers.content_type |
additional.fields.value.string_value |
Asignado a la etiqueta con la clave content_type |
headers.http_host |
additional.fields.value.string_value |
Asignado a la etiqueta con la clave http_host |
headers.http_version |
network.application_protocol_version |
Versión del protocolo HTTP |
headers.request_method |
network.http.method |
Método de solicitud HTTP |
host.hostname |
principal.hostname |
Nombre de host principal |
host.hostname |
principal.asset.hostname |
Nombre de host del recurso principal |
host.ip |
principal.asset.ip |
Dirección IP del recurso principal |
host.ip |
principal.ip |
Dirección IP principal |
host.mac |
principal.mac |
Dirección MAC principal |
host.os.platform |
principal.platform |
Se establece en MAC si es igual a macOS |
host.os.version |
principal.platform_version |
Versión del sistema operativo |
labels.related_item_id |
metadata.product_log_id |
Identificador de elemento relacionado |
process.command_line |
principal.process.command_line |
Línea de comandos del proceso |
process.name |
additional.fields.value.string_value |
Asignado a la etiqueta con la clave process_name |
process.parent.name |
additional.fields.value.string_value |
Asignado a la etiqueta con la clave process_parent_name |
process.parent.pid |
principal.process.parent_process.pid |
PID de proceso superior convertido en cadena |
process.pid |
principal.process.pid |
PID de proceso convertido en cadena |
user.id |
principal.user.userid |
Identificador de usuario |
user.name |
principal.user.user_display_name |
Nombre visible del usuario |
N/A | metadata.event_timestamp |
Marca de tiempo del evento definida como marca de tiempo de la entrada de registro |
N/A | metadata.event_type |
GENERIC_EVENT si no hay principal; de lo contrario, STATUS_UPDATE |
N/A | network.application_protocol |
Se define como HTTP si el campo http_version contiene HTTP. |
¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.