Coletar registros do BeyondTrust Endpoint Privilege Management (EPM)

Compatível com:

Este documento explica como ingerir registros do BeyondTrust Endpoint Privilege Management (EPM) no Google Security Operations usando duas abordagens diferentes: coleta baseada em EC2 e coleta baseada em AWS Lambda usando o Amazon S3. O analisador se concentra em transformar dados de registro JSON brutos do BeyondTrust Endpoint em um formato estruturado de acordo com o UDM do Chronicle. Primeiro, ele inicializa os valores padrão para vários campos e, em seguida, analisa o payload JSON, mapeando campos específicos do registro bruto para os campos correspondentes da UDM no objeto event.idm.read_only_udm.

Antes de começar

Verifique se você tem os pré-requisitos a seguir:

  • Instância do Google SecOps
  • Acesso privilegiado ao locatário ou à API do BeyondTrust Endpoint Privilege Management
  • Acesso privilegiado à AWS (S3, IAM, Lambda/EC2, EventBridge)

Escolher o método de integração

Você pode escolher entre dois métodos de integração:

  • Opção 1: coleta baseada no EC2: usa uma instância do EC2 com scripts programados para coleta de registros.
  • Opção 2: coleta baseada no AWS Lambda: usa funções Lambda sem servidor com programação do EventBridge.

Opção 1: coleta baseada no EC2

Configurar o IAM da AWS para a ingestão do Google SecOps

  1. Crie um usuário seguindo este guia do usuário: Como criar um usuário do IAM.
  2. Selecione o usuário criado.
  3. Selecione a guia Credenciais de segurança.
  4. Clique em Criar chave de acesso na seção Chaves de acesso.
  5. Selecione Serviço de terceiros como Caso de uso.
  6. Clique em Próxima.
  7. Opcional: adicione uma tag de descrição.
  8. Clique em Criar chave de acesso.
  9. Clique em Fazer o download do arquivo CSV para salvar a chave de acesso e a chave de acesso secreta para referência futura.
  10. Clique em Concluído.
  11. Selecione a guia Permissões.
  12. Clique em Adicionar permissões na seção Políticas de permissões.
  13. Selecione Adicionar permissões.
  14. Selecione Anexar políticas diretamente.
  15. Pesquise e selecione a política AmazonS3FullAccess.
  16. Clique em Próxima.
  17. Clique em Adicionar permissões

Configurar o BeyondTrust EPM para acesso à API

  1. Faça login no console da Web do BeyondTrust Privilege Management como administrador.
  2. Acesse Configuração > Configurações > Configurações da API.
  3. Clique em Criar uma conta de API.
  4. Informe os seguintes detalhes de configuração:
    • Nome: insira Google SecOps Collector.
    • Acesso à API: ative Auditoria (leitura) e outros escopos conforme necessário.
  5. Copie e salve o ID do cliente e a chave secreta do cliente.
  6. Copie o URL de base da API. Normalmente, ele é https://<your-tenant>-services.pm.beyondtrustcloud.com. Use-o como BPT_API_URL.

Criar um bucket do AWS S3

  1. Faça login no Console de Gerenciamento da AWS.
  2. Acesse Console da AWS > Serviços > S3 > Criar bucket.
  3. Informe os seguintes detalhes de configuração:
    • Nome do bucket: my-beyondtrust-logs.
    • Região: [sua escolha] > Criar.

Criar um papel do IAM para o EC2

  1. Faça login no Console de Gerenciamento da AWS.
  2. Acesse Console da AWS > Serviços > IAM > Funções > Criar função.
  3. Informe os seguintes detalhes de configuração:
    • Entidade confiável: Serviço da AWS > EC2 > Próxima.
    • Anexar permissão: AmazonS3FullAccess (ou uma política com escopo para seu bucket) > Próxima.
    • Nome da função: EC2-S3-BPT-Writer > Criar função.

Inicie e configure a VM do coletor do EC2

  1. Faça login no Console de Gerenciamento da AWS.
  2. Acesse Serviços.
  3. Na barra de pesquisa, digite EC2 e selecione essa opção.
  4. No painel do EC2, clique em Instâncias.
  5. Clique em Iniciar instâncias.
  6. Informe os seguintes detalhes de configuração:
    • Nome: insira BPT-Log-Collector.
    • AMI: selecione Ubuntu Server 22.04 LTS.
    • Tipo de instância: t3.micro (ou maior). Em seguida, clique em Próxima.
    • Rede: verifique se a configuração Rede está definida como sua VPC padrão.
    • Função do IAM: selecione a função do IAM EC2-S3-BPT-Writer no menu.
    • Atribuir IP público automaticamente: ative ou verifique se é possível acessar usando VPN > Próxima.
    • Adicionar armazenamento: deixe a configuração de armazenamento padrão (8 GiB) e clique em Próxima.
    • Selecione Criar um novo grupo de segurança.
    • Regra de entrada: clique em Adicionar regra.
    • Tipo: selecione SSH.
    • Porta: 22.
    • Origem: seu IP
    • Clique em Revisar e iniciar.
    • Selecione ou crie um par de chaves.
    • Clique em Fazer o download do par de chaves.
    • Salve o arquivo PEM baixado. Você vai precisar desse arquivo para se conectar à instância usando SSH.
  7. Conecte-se à sua máquina virtual (VM) usando SSH.

Instalar os pré-requisitos do coletor

  1. Execute este comando:

    chmod 400 ~/Downloads/your-key.pem
    ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
    
  2. Atualize o sistema e instale as dependências:

    # Update OS
    sudo apt update && sudo apt upgrade -y
    # Install Python, Git
    sudo apt install -y python3 python3-venv python3-pip git
    # Create & activate virtualenv
    python3 -m venv ~/bpt-venv
    source ~/bpt-venv/bin/activate
    # Install libraries
    pip install requests boto3
    
  3. Crie um diretório e um arquivo de estado:

    sudo mkdir -p /var/lib/bpt-collector
    sudo touch /var/lib/bpt-collector/last_run.txt
    sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
    
  4. Inicialize-o (por exemplo, para 1 hora atrás):

    echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
    

Implantar o script do coletor do BeyondTrust EPM

  1. Crie uma pasta para o projeto:

    mkdir ~/bpt-collector && cd ~/bpt-collector
    
  2. Exporte as variáveis de ambiente necessárias (por exemplo, em ~/.bashrc):

    export BPT_API_URL="https://<your-tenant>-services.pm.beyondtrustcloud.com"
    export BPT_CLIENT_ID="your-client-id"
    export BPT_CLIENT_SECRET="your-client-secret"
    export S3_BUCKET="my-beyondtrust-logs"
    export S3_PREFIX="bpt/"
    export STATE_FILE="/var/lib/bpt-collector/last_run.txt"
    export RECORD_SIZE="1000"
    
  3. Crie collector_bpt.py e insira o seguinte código:

    #!/usr/bin/env python3
    import os, sys, json, boto3, requests
    from datetime import datetime, timezone, timedelta
    
    # ── UTILS ──────────────────────────────────────────────────────────────
    def must_env(var):
        val = os.getenv(var)
        if not val:
            print(f"ERROR: environment variable {var} is required", file=sys.stderr)
            sys.exit(1)
        return val
    
    def ensure_state_file(path):
        d = os.path.dirname(path)
        if not os.path.isdir(d):
            os.makedirs(d, exist_ok=True)
        if not os.path.isfile(path):
            ts = (datetime.now(timezone.utc) - timedelta(hours=1))
                .strftime("%Y-%m-%dT%H:%M:%SZ")
            with open(path, "w") as f:
                f.write(ts)
    
    # ── CONFIG ─────────────────────────────────────────────────────────────
    BPT_API_URL = must_env("BPT_API_URL")  # e.g., https://tenant-services.pm.beyondtrustcloud.com
    CLIENT_ID = must_env("BPT_CLIENT_ID")
    CLIENT_SECRET = must_env("BPT_CLIENT_SECRET")
    S3_BUCKET = must_env("S3_BUCKET")
    S3_PREFIX = os.getenv("S3_PREFIX", "")  # e.g., "bpt/"
    STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt")
    RECORD_SIZE = int(os.getenv("RECORD_SIZE", "1000"))
    
    # ── END CONFIG ─────────────────────────────────────────────────────────
    ensure_state_file(STATE_FILE)
    
    def read_last_run():
        with open(STATE_FILE, "r") as f:
            ts = f.read().strip()
        return datetime.fromisoformat(ts.replace("Z", "+00:00"))
    
    def write_last_run(dt):
        with open(STATE_FILE, "w") as f:
            f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ"))
    
    def get_oauth_token():
        """
        Get OAuth2 token using client credentials flow
        Scope: urn:management:api (for EPM Management API access)
        """
        resp = requests.post(
            f"{BPT_API_URL}/oauth/connect/token",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "grant_type": "client_credentials",
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "scope": "urn:management:api"
            }
        )
        resp.raise_for_status()
        return resp.json()["access_token"]
    
    def extract_event_timestamp(evt):
        """
        Extract timestamp from event, prioritizing event.ingested field
        """
        # Primary (documented) path: event.ingested
        if isinstance(evt, dict) and isinstance(evt.get("event"), dict):
            ts = evt["event"].get("ingested")
            if ts:
                return ts
    
        # Fallbacks for other timestamp fields
        timestamp_fields = ["timestamp", "eventTime", "dateTime", "whenOccurred", "date", "time"]
        for field in timestamp_fields:
            if field in evt and evt[field]:
                return evt[field]
    
        return None
    
    def parse_timestamp(ts):
        """
        Parse timestamp handling various formats
        """
        from datetime import datetime, timezone
    
        if isinstance(ts, (int, float)):
            # Handle milliseconds vs seconds
            return datetime.fromtimestamp(ts/1000 if ts > 1e12 else ts, tz=timezone.utc)
    
        if isinstance(ts, str):
            if ts.endswith("Z"):
                return datetime.fromisoformat(ts.replace("Z", "+00:00"))
            dt = datetime.fromisoformat(ts)
            return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
    
        raise ValueError(f"Unsupported timestamp: {ts!r}")
    
    def fetch_events(token, start_date_iso):
        """
        Fetch events using the correct EPM API endpoint: /management-api/v2/Events/FromStartDate
        This endpoint uses StartDate and RecordSize parameters, not startTime/endTime/limit/offset
        """
        headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
        all_events, current_start = [], start_date_iso
    
        # Enforce maximum RecordSize limit of 1000
        record_size_limited = min(RECORD_SIZE, 1000)
    
        for _ in range(10):  # MAX 10 iterations to prevent infinite loops
            # Use the correct endpoint and parameters
            params = {
                "StartDate": current_start_date,
                "RecordSize": RECORD_SIZE
            }
    
            resp = requests.get(
                f"{BPT_API_URL}/management-api/v2/Events/FromStartDate",
                headers=headers, 
                params={
                    "StartDate": current_start_date,
                    "RecordSize": min(RECORD_SIZE, 1000)
                },
                timeout=300
            )
            resp.raise_for_status()
    
            data = resp.json()
            events = data.get("events", [])
    
            if not events:
                break
    
            all_events.extend(events)
            iterations += 1
    
            # If we got fewer events than RECORD_SIZE, we're done
            if len(events) < RECORD_SIZE:
                break
    
            # For pagination, update StartDate to the timestamp of the last event
            last_event = events[-1]
            last_timestamp = extract_event_timestamp(last_event)
    
            if not last_timestamp:
                print("Warning: Could not find timestamp in last event for pagination")
                break
    
            # Convert to ISO format if needed and increment slightly to avoid duplicates
            try:
                dt = parse_timestamp(last_timestamp)
                # Add 1 second to avoid retrieving the same event again
                dt = dt + timedelta(seconds=1)
                current_start = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    
            except Exception as e:
                print(f"Error parsing timestamp {last_timestamp}: {e}")
                break
    
        return all_events
    
    def upload_to_s3(obj, key):
        boto3.client("s3").put_object(
            Bucket=S3_BUCKET, 
            Key=key,
            Body=json.dumps(obj).encode("utf-8"),
            ContentType="application/json"
        )
    
    def main():
        # 1) determine window
        start_dt = read_last_run()
        end_dt = datetime.now(timezone.utc)
        START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    
        print(f"Fetching events from {START} to {END}")
    
        # 2) authenticate and fetch
        try:
            token = get_oauth_token()
            events = fetch_events(token, START)
    
            # Filter events to only include those before our end time
            filtered_events = []
            for evt in events:
                evt_time = extract_event_timestamp(evt)
                if evt_time:
                    try:
                        evt_dt = parse_timestamp(evt_time)
                        if evt_dt <= end_dt:
                            filtered_events.append(evt)
                    except Exception as e:
                        print(f"Error parsing event timestamp {evt_time}: {e}")
                        # Include event anyway if timestamp parsing fails
                        filtered_events.append(evt)
                else:
                    # Include events without timestamps
                    filtered_events.append(evt)
    
            count = len(filtered_events)
    
            if count > 0:
                # Upload events to S3
                timestamp_str = end_dt.strftime('%Y%m%d_%H%M%S')
                for idx, evt in enumerate(filtered_events, start=1):
                    key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{timestamp_str}_{idx:06d}.json"
                    upload_to_s3(evt, key)
    
                print(f"Uploaded {count} events to S3")
            else:
                print("No events to upload")
    
            # 3) persist state
            write_last_run(end_dt)
    
        except Exception as e:
            print(f"Error: {e}")
            sys.exit(1)
    
    if __name__ == "__main__":
        main()
    
  4. Torne-o executável:

    chmod +x collector_bpt.py
    

Programar diariamente com o Cron

  1. Execute este comando:

    crontab -e
    
  2. Adicione o job diário à meia-noite UTC:

    0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py
    

Opção 2: coleta baseada no AWS Lambda

Coletar pré-requisitos do BeyondTrust EPM

  1. Faça login no console da Web do BeyondTrust Privilege Management como administrador.
  2. Acesse Configuração do sistema > API REST > Tokens.
  3. Clique em Adicionar token.
  4. Informe os seguintes detalhes de configuração:
    • Nome: insira Google SecOps Collector.
    • Escopos: selecione Audit:Read e outros escopos conforme necessário.
  5. Clique em Salvar e copie o valor do token.
  6. Copie e salve em um local seguro os seguintes detalhes:
    • URL base da API: o URL da API EPM do BeyondTrust (por exemplo, https://yourtenant-services.pm.beyondtrustcloud.com).
    • ID do cliente: na configuração do aplicativo OAuth.
    • Chave secreta do cliente: na configuração do aplicativo OAuth.

Configurar o bucket do AWS S3 e o IAM para o Google SecOps

  1. Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket.
  2. Salve o Nome e a Região do bucket para referência futura (por exemplo, beyondtrust-epm-logs-bucket).
  3. Crie um usuário seguindo este guia: Como criar um usuário do IAM.
  4. Selecione o usuário criado.
  5. Selecione a guia Credenciais de segurança.
  6. Clique em Criar chave de acesso na seção Chaves de acesso.
  7. Selecione Serviço de terceiros como o Caso de uso.
  8. Clique em Próxima.
  9. Opcional: adicione uma tag de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Fazer o download do arquivo CSV para salvar a chave de acesso e a chave de acesso secreta para uso posterior.
  12. Clique em Concluído.
  13. Selecione a guia Permissões.
  14. Clique em Adicionar permissões na seção Políticas de permissões.
  15. Selecione Adicionar permissões.
  16. Selecione Anexar políticas diretamente.
  17. Pesquise e selecione a política AmazonS3FullAccess.
  18. Clique em Próxima.
  19. Clique em Adicionar permissões

Configurar a política e o papel do IAM para uploads do S3

  1. No console da AWS, acesse IAM > Políticas > Criar política > guia JSON.
  2. Copie e cole a seguinte política:

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Sid": "AllowPutObjects",
        "Effect": "Allow",
        "Action": "s3:PutObject",
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*"
        },
        {
        "Sid": "AllowGetStateObject",
        "Effect": "Allow",
        "Action": "s3:GetObject",
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/beyondtrust-epm-logs/state.json"
        }
    ]
    }
    
    • Substitua beyondtrust-epm-logs-bucket se você tiver inserido um nome de bucket diferente.
  3. Clique em Próxima > Criar política.

  4. Acesse IAM > Funções > Criar função > Serviço da AWS > Lambda.

  5. Anexe a política recém-criada e a política gerenciada AWSLambdaBasicExecutionRole (para geração de registros do CloudWatch).

  6. Nomeie a função como BeyondTrustEPMLogExportRole e clique em Criar função.

Criar a função Lambda

  1. No console da AWS, acesse Lambda > Functions > Create function.
  2. Clique em Criar do zero.
  3. Informe os seguintes detalhes de configuração:
Configuração Valor
Nome BeyondTrustEPMLogExport
Ambiente de execução Python 3.13
Arquitetura x86_64
Função de execução BeyondTrustEPMLogExportRole
  1. Depois que a função for criada, abra a guia Código, exclua o stub e insira o seguinte código (BeyondTrustEPMLogExport.py):

    import json
    import boto3
    import urllib3
    import base64
    from datetime import datetime, timedelta, timezone
    import os
    from typing import Dict, List, Optional
    
    # Initialize urllib3 pool manager
    http = urllib3.PoolManager()
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch BeyondTrust EPM audit events and store them in S3
        """
    
        # Environment variables
        S3_BUCKET = os.environ['S3_BUCKET']
        S3_PREFIX = os.environ['S3_PREFIX']
        STATE_KEY = os.environ['STATE_KEY']
    
        # BeyondTrust EPM API credentials
        BPT_API_URL = os.environ['BPT_API_URL']
        CLIENT_ID = os.environ['CLIENT_ID']
        CLIENT_SECRET = os.environ['CLIENT_SECRET']
        OAUTH_SCOPE = os.environ.get('OAUTH_SCOPE', 'urn:management:api')
    
        # Optional parameters
        RECORD_SIZE = int(os.environ.get('RECORD_SIZE', '1000'))
        MAX_ITERATIONS = int(os.environ.get('MAX_ITERATIONS', '10'))
    
        s3_client = boto3.client('s3')
    
        try:
            # Get last execution state
            last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY)
    
            # Get OAuth access token
            access_token = get_oauth_token(BPT_API_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_SCOPE)
    
            # Fetch audit events
            events = fetch_audit_events(BPT_API_URL, access_token, last_timestamp, RECORD_SIZE, MAX_ITERATIONS)
    
            if events:
                # Store events in S3
                current_timestamp = datetime.utcnow()
                filename = f"{S3_PREFIX}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json"
    
                store_events_to_s3(s3_client, S3_BUCKET, filename, events)
    
                # Update state with latest timestamp
                latest_timestamp = get_latest_event_timestamp(events)
                update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp)
    
                print(f"Successfully processed {len(events)} events and stored to {filename}")
            else:
                print("No new events found")
    
            return {
                'statusCode': 200,
                'body': json.dumps(f'Successfully processed {len(events) if events else 0} events')
            }
    
        except Exception as e:
            print(f"Error processing BeyondTrust EPM logs: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def get_oauth_token(api_url: str, client_id: str, client_secret: str, scope: str = "urn:management:api") -> str:
        """
        Get OAuth access token using client credentials flow for BeyondTrust EPM
        Uses the correct scope: urn:management:api and /oauth/connect/token endpoint
        """
    
        token_url = f"{api_url}/oauth/connect/token"
    
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    
        body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}"
    
        response = http.request('POST', token_url, headers=headers, body=body, timeout=urllib3.Timeout(60.0))
    
        if response.status != 200:
            raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}")
    
        token_data = json.loads(response.data.decode('utf-8'))
        return token_data['access_token']
    
    def fetch_audit_events(api_url: str, access_token: str, last_timestamp: Optional[str], record_size: int, max_iterations: int) -> List[Dict]:
        """
        Fetch audit events using the correct BeyondTrust EPM API endpoint:
        /management-api/v2/Events/FromStartDate with StartDate and RecordSize parameters
        """
    
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }
    
        all_events = []
        current_start_date = last_timestamp or (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ")
        iterations = 0
    
        # Enforce maximum RecordSize limit of 1000
        record_size_limited = min(record_size, 1000)
    
        while iterations < max_iterations:
            # Use the correct EPM API endpoint and parameters
            query_url = f"{api_url}/management-api/v2/Events/FromStartDate"
            params = {
                'StartDate': current_start_date,
                'RecordSize': record_size_limited
            }
    
            response = http.request('GET', query_url, headers=headers, fields=params, timeout=urllib3.Timeout(300.0))
    
            if response.status != 200:
                raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}")
    
            response_data = json.loads(response.data.decode('utf-8'))
            events = response_data.get('events', [])
    
            if not events:
                break
    
            all_events.extend(events)
            iterations += 1
    
            # If we got fewer events than RecordSize, we've reached the end
            if len(events) < record_size_limited:
                break
    
            # For pagination, update StartDate to the timestamp of the last event
            last_event = events[-1]
            last_timestamp = extract_event_timestamp(last_event)
    
            if not last_timestamp:
                print("Warning: Could not find timestamp in last event for pagination")
                break
    
            # Convert to datetime and add 1 second to avoid retrieving the same event again
            try:
                dt = parse_timestamp(last_timestamp)
                dt = dt + timedelta(seconds=1)
                current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
            except Exception as e:
                print(f"Error parsing timestamp {last_timestamp}: {e}")
                break
    
        return all_events
    
    def extract_event_timestamp(event: Dict) -> Optional[str]:
        """
        Extract timestamp from event, prioritizing event.ingested field
        """
        # Primary (documented) path: event.ingested
        if isinstance(event, dict) and isinstance(event.get("event"), dict):
            ts = event["event"].get("ingested")
            if ts:
                return ts
    
        # Fallbacks for other timestamp fields
        timestamp_fields = ['timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time']
        for field in timestamp_fields:
            if field in event and event[field]:
                return event[field]
    
        return None
    
    def parse_timestamp(timestamp_str: str) -> datetime:
        """
        Parse timestamp string to datetime object, handling various formats
        """
        if isinstance(timestamp_str, (int, float)):
            # Unix timestamp (in milliseconds or seconds)
            if timestamp_str > 1e12:  # Milliseconds
                return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc)
            else:  # Seconds
                return datetime.fromtimestamp(timestamp_str, tz=timezone.utc)
    
        if isinstance(timestamp_str, str):
            # Try different string formats
            try:
                # ISO format with Z
                if timestamp_str.endswith('Z'):
                    return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                # ISO format with timezone
                elif '+' in timestamp_str or timestamp_str.endswith('00:00'):
                    return datetime.fromisoformat(timestamp_str)
                # ISO format without timezone (assume UTC)
                else:
                    dt = datetime.fromisoformat(timestamp_str)
                    if dt.tzinfo is None:
                        dt = dt.replace(tzinfo=timezone.utc)
                    return dt
            except ValueError:
                pass
    
        raise ValueError(f"Could not parse timestamp: {timestamp_str}")
    
    def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]:
        """
        Get the last processed timestamp from S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            print("No previous state found, starting from 24 hours ago")
            return None
        except Exception as e:
            print(f"Error reading state: {e}")
            return None
    
    def update_state(s3_client, bucket: str, state_key: str, timestamp: str):
        """
        Update the state file with the latest processed timestamp
        """
        state_data = {
            'last_timestamp': timestamp,
            'updated_at': datetime.utcnow().isoformat() + 'Z'
        }
    
        s3_client.put_object(
            Bucket=bucket,
            Key=state_key,
            Body=json.dumps(state_data),
            ContentType='application/json'
        )
    
    def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]):
        """
        Store events as JSONL (one JSON object per line) in S3
        """
        # Convert to JSONL format (one JSON object per line)
        jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events)
    
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=jsonl_content,
            ContentType='application/x-ndjson'
        )
    
    def get_latest_event_timestamp(events: List[Dict]) -> str:
        """
        Get the latest timestamp from the events for state tracking
        """
        if not events:
            return datetime.utcnow().isoformat() + 'Z'
    
        latest = None
        for event in events:
            timestamp = extract_event_timestamp(event)
            if timestamp:
                try:
                    event_dt = parse_timestamp(timestamp)
                    event_iso = event_dt.isoformat() + 'Z'
                    if latest is None or event_iso > latest:
                        latest = event_iso
                except Exception as e:
                    print(f"Error parsing event timestamp {timestamp}: {e}")
                    continue
    
        return latest or datetime.utcnow().isoformat() + 'Z'
    
  2. Acesse Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.

  3. Insira as seguintes variáveis de ambiente, substituindo pelos seus valores.

    Chave Valor de exemplo
    S3_BUCKET beyondtrust-epm-logs-bucket
    S3_PREFIX beyondtrust-epm-logs/
    STATE_KEY beyondtrust-epm-logs/state.json
    BPT_API_URL https://yourtenant-services.pm.beyondtrustcloud.com
    CLIENT_ID your-client-id
    CLIENT_SECRET your-client-secret
    OAUTH_SCOPE urn:management:api
    RECORD_SIZE 1000
    MAX_ITERATIONS 10
  4. Depois que a função for criada, permaneça na página dela ou abra Lambda > Functions > sua-função.

  5. Selecione a guia Configuração.

  6. No painel Configuração geral, clique em Editar.

  7. Mude Tempo limite para 5 minutos (300 segundos) e clique em Salvar.

Criar uma programação do EventBridge

  1. Acesse Amazon EventBridge > Scheduler > Criar programação.
  2. Informe os seguintes detalhes de configuração:
    • Programação recorrente: Taxa (1 hour).
    • Destino: sua função Lambda BeyondTrustEPMLogExport.
    • Nome: BeyondTrustEPMLogExport-1h.
  3. Clique em Criar programação.

Opcional: criar um usuário e chaves do IAM somente leitura para o Google SecOps

  1. Acesse Console da AWS > IAM > Usuários > Adicionar usuários.
  2. Clique em Add users.
  3. Informe os seguintes detalhes de configuração:
    • Usuário: insira secops-reader.
    • Tipo de acesso: selecione Chave de acesso – Acesso programático.
  4. Clique em Criar usuário.
  5. Anexe a política de leitura mínima (personalizada): Usuários > secops-reader > Permissões > Adicionar permissões > Anexar políticas diretamente > Criar política.
  6. No editor JSON, insira a seguinte política:

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Effect": "Allow",
        "Action": ["s3:GetObject"],
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*"
        },
        {
        "Effect": "Allow",
        "Action": ["s3:ListBucket"],
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket"
        }
    ]
    }
    
  7. Defina o nome como secops-reader-policy.

  8. Acesse Criar política > pesquise/selecione > Próxima > Adicionar permissões.

  9. Acesse Credenciais de segurança > Chaves de acesso > Criar chave de acesso.

  10. Faça o download do CSV (esses valores são inseridos no feed).

Configurar feeds (ambas as opções)

Para configurar um feed, siga estas etapas:

  1. Acesse Configurações do SIEM > Feeds.
  2. Clique em + Adicionar novo feed.
  3. No campo Nome do feed, insira um nome para o feed (por exemplo, BeyondTrust EPM logs).
  4. Selecione Amazon S3 V2 como o Tipo de origem.
  5. Selecione BeyondTrust Endpoint Privilege Management como o Tipo de registro.
  6. Clique em Próxima.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: o URI do bucket
      • s3://your-log-bucket-name/. Substitua your-log-bucket-name pelo nome real do bucket.
    • Opções de exclusão de fontes: selecione a opção de exclusão de acordo com sua preferência.
    • Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
    • ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3.
    • Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3.
    • Namespace do recurso: o namespace do recurso.
    • Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.
  8. Clique em Próxima.
  9. Revise a nova configuração do feed na tela Finalizar e clique em Enviar.

Tabela de mapeamento da UDM

Campo de registro Mapeamento do UDM Lógica
agent.id principal.asset.attribute.labels.value Associado ao rótulo com a chave agent_id
agent.version principal.asset.attribute.labels.value Associado ao rótulo com a chave agent_version
ecs.version principal.asset.attribute.labels.value Associado ao rótulo com a chave ecs_version
event_data.reason metadata.description Descrição do evento do registro bruto
event_datas.ActionId metadata.product_log_id Identificador de registro específico do produto
file.path principal.file.full_path Caminho completo do arquivo do evento
headers.content_length additional.fields.value.string_value Associado ao rótulo com a chave content_length
headers.content_type additional.fields.value.string_value Associado ao rótulo com a chave content_type
headers.http_host additional.fields.value.string_value Associado ao rótulo com a chave http_host
headers.http_version network.application_protocol_version Versão do protocolo HTTP
headers.request_method network.http.method Método de solicitação HTTP
host.hostname principal.hostname Nome do host principal
host.hostname principal.asset.hostname Nome do host do recurso principal
host.ip principal.asset.ip Endereço IP do recurso principal
host.ip principal.ip Endereço IP principal
host.mac principal.mac Endereço MAC principal
host.os.platform principal.platform Definido como MAC se for igual a macOS
host.os.version principal.platform_version Versão do sistema operacional
labels.related_item_id metadata.product_log_id Identificador de item relacionado
process.command_line principal.process.command_line Linha de comando do processo
process.name additional.fields.value.string_value Associado ao rótulo com a chave process_name
process.parent.name additional.fields.value.string_value Associado ao rótulo com a chave process_parent_name
process.parent.pid principal.process.parent_process.pid PID do processo pai convertido em string
process.pid principal.process.pid PID do processo convertido em string
user.id principal.user.userid Identificador de usuário
user.name principal.user.user_display_name Nome de exibição do usuário
N/A metadata.event_timestamp Carimbo de data/hora do evento definido como o carimbo de data/hora da entrada de registro
N/A metadata.event_type GENERIC_EVENT se não houver principal, caso contrário, STATUS_UPDATE
N/A network.application_protocol Definido como HTTP se o campo "http_version" contiver HTTP

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.