Recolha registos de SSO da Delinea

Compatível com:

Este documento explica como carregar registos de início de sessão único (SSO) da Delinea (anteriormente Centrify) para o Google Security Operations através do Amazon S3. O analisador extrai os registos, processando os formatos JSON e syslog. Analisa pares de chave-valor, datas/horas e outros campos relevantes, mapeando-os para o modelo de UDM, com uma lógica específica para processar falhas de início de sessão, agentes do utilizador, níveis de gravidade, mecanismos de autenticação e vários tipos de eventos. Prioriza FailUserName em vez de NormalizedUser para endereços de email de destino em eventos de falha.

Antes de começar

Certifique-se de que cumpre os seguintes pré-requisitos:

  • Uma instância do Google SecOps.
  • Acesso privilegiado ao inquilino do SSO da Delinea (Centrify).
  • Acesso privilegiado à AWS (S3, Identity and Access Management [IAM], Lambda e EventBridge).

Recolha os pré-requisitos do SSO da Delinea (Centrify) (IDs, chaves da API, IDs da organização e tokens)

  1. Inicie sessão no portal de administração da Delinea.
  2. Aceda a Apps > Adicionar apps.
  3. Pesquise Cliente OAuth2 e clique em Adicionar.
  4. Clique em Sim na caixa de diálogo Adicionar app Web.
  5. Clique em Fechar na caixa de diálogo Adicionar apps Web.
  6. Na página Configuração da aplicação, configure o seguinte:
    • Separador Geral:
      • ID da aplicação: introduza um identificador exclusivo (por exemplo, secops-oauth-client)
      • Nome da aplicação: introduza um nome descritivo (por exemplo, SecOps Data Export)
      • Descrição da aplicação: introduza a descrição (por exemplo, OAuth client for exporting audit events to SecOps)
    • Separador Confiança:
      • A aplicação é confidencial: selecione esta opção
      • Tipo de ID de cliente: selecione Confidencial
      • ID de cliente emitido: copie e guarde este valor
      • Segredo do cliente emitido: copie e guarde este valor
    • Separador Tokens:
      • Métodos de autorização: selecione Credenciais do cliente
      • Tipo de token: selecione Jwt RS256
    • Separador Âmbito:
      • Adicione o âmbito siem com a descrição Acesso à integração do SIEM.
      • Adicione o âmbito redrock/query com a descrição Acesso à API de consultas.
  7. Clique em Guardar para criar o cliente OAuth.
  8. Aceda a Serviços principais > Utilizadores > Adicionar utilizador.
  9. Configure o utilizador do serviço:
    • Nome de início de sessão: introduza o ID de cliente do passo 6.
    • Endereço de email: introduza um email válido (campo obrigatório).
    • Nome a apresentar: introduza um nome descritivo (por exemplo, SecOps Service User).
    • Palavra-passe e Confirmar palavra-passe: introduza o segredo do cliente do passo 6
    • Estado: selecione É um cliente confidencial OAuth.
  10. Clique em Criar utilizador.
  11. Aceda a Acesso > Funções e atribua o utilizador do serviço a uma função com as autorizações adequadas para consultar eventos de auditoria.
  12. Copie e guarde numa localização segura os seguintes detalhes:
    • URL do inquilino: o URL do seu inquilino do Centrify (por exemplo, https://yourtenant.my.centrify.com)
    • ID de cliente: do passo 6
    • Segredo do cliente: do passo 6
    • ID da aplicação OAuth: a partir da configuração da aplicação

Configure o contentor do AWS S3 e o IAM para o Google SecOps

  1. Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor.
  2. Guarde o nome e a região do contentor para referência futura (por exemplo, delinea-centrify-logs-bucket).
  3. Crie um utilizador seguindo este guia do utilizador: criar um utilizador do IAM.
  4. Selecione o utilizador criado.
  5. Selecione o separador Credenciais de segurança.
  6. Clique em Criar chave de acesso na secção Chaves de acesso.
  7. Selecione Serviço de terceiros como Exemplo de utilização.
  8. Clicar em Seguinte.
  9. Opcional: adicione uma etiqueta de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Transferir ficheiro .CSV para guardar a chave de acesso e a chave de acesso secreta para referência futura.
  12. Clique em Concluído.
  13. Selecione o separador Autorizações.
  14. Clique em Adicionar autorizações na secção Políticas de autorizações.
  15. Selecione Adicionar autorizações.
  16. Selecione Anexar políticas diretamente.
  17. Pesquise a política AmazonS3FullAccess.
  18. Selecione a política.
  19. Clicar em Seguinte.
  20. Clique em Adicionar autorizações.

Configure a política e a função de IAM para carregamentos do S3

  1. Na consola da AWS, aceda a IAM > Políticas.
  2. Clique em Criar política > separador JSON.
  3. Copie e cole a seguinte política.
  4. JSON da política (substitua delinea-centrify-logs-bucket se tiver introduzido um nome de contentor diferente):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/centrify-sso-logs/state.json"
        }
      ]
    }
    
  5. Clique em Seguinte > Criar política.

  6. Aceda a IAM > Funções.

  7. Clique em Criar função > Serviço AWS > Lambda.

  8. Anexe a política recém-criada e a política gerida AWSLambdaBasicExecutionRole (para registo do CloudWatch).

  9. Dê o nome CentrifySSOLogExportRole à função e clique em Criar função.

Crie a função Lambda

  1. Na consola da AWS, aceda a Lambda > Functions > Create function.
  2. Clique em Criar do zero.
  3. Faculte os seguintes detalhes de configuração:

    Definição Valor
    Nome CentrifySSOLogExport
    Runtime Python 3.13
    Arquitetura x86_64
    Função de execução CentrifySSOLogExportRole
  4. Depois de criar a função, abra o separador Código, elimine o stub e cole o seguinte código (CentrifySSOLogExport.py).

    import json
    import boto3
    import requests
    import base64
    from datetime import datetime, timedelta
    import os
    from typing import Dict, List, Optional
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch Delinea Centrify SSO audit events and store them in S3
        """
    
        # Environment variables
        S3_BUCKET = os.environ['S3_BUCKET']
        S3_PREFIX = os.environ['S3_PREFIX']
        STATE_KEY = os.environ['STATE_KEY']
    
        # Centrify API credentials
        TENANT_URL = os.environ['TENANT_URL']
        CLIENT_ID = os.environ['CLIENT_ID']
        CLIENT_SECRET = os.environ['CLIENT_SECRET']
        OAUTH_APP_ID = os.environ['OAUTH_APP_ID']
    
        # Optional parameters
        PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000'))
        MAX_PAGES = int(os.environ.get('MAX_PAGES', '10'))
    
        s3_client = boto3.client('s3')
    
        try:
            # Get last execution state
            last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY)
    
            # Get OAuth access token
            access_token = get_oauth_token(TENANT_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_APP_ID)
    
            # Fetch audit events
            events = fetch_audit_events(TENANT_URL, access_token, last_timestamp, PAGE_SIZE, MAX_PAGES)
    
            if events:
                # Store events in S3
                current_timestamp = datetime.utcnow()
                filename = f"{S3_PREFIX}centrify-sso-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json"
    
                store_events_to_s3(s3_client, S3_BUCKET, filename, events)
    
                # Update state with latest timestamp
                latest_timestamp = get_latest_event_timestamp(events)
                update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp)
    
                print(f"Successfully processed {len(events)} events and stored to {filename}")
            else:
                print("No new events found")
    
            return {
                'statusCode': 200,
                'body': json.dumps(f'Successfully processed {len(events) if events else 0} events')
            }
    
        except Exception as e:
            print(f"Error processing Centrify SSO logs: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def get_oauth_token(tenant_url: str, client_id: str, client_secret: str, oauth_app_id: str) -> str:
        """
        Get OAuth access token using client credentials flow
        """
    
        # Create basic auth token
        credentials = f"{client_id}:{client_secret}"
        basic_auth = base64.b64encode(credentials.encode()).decode()
    
        token_url = f"{tenant_url}/oauth2/token/{oauth_app_id}"
    
        headers = {
            'Authorization': f'Basic {basic_auth}',
            'X-CENTRIFY-NATIVE-CLIENT': 'True',
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    
        data = {
            'grant_type': 'client_credentials',
            'scope': 'siem redrock/query'
        }
    
        response = requests.post(token_url, headers=headers, data=data)
        response.raise_for_status()
    
        token_data = response.json()
        return token_data['access_token']
    
    def fetch_audit_events(tenant_url: str, access_token: str, last_timestamp: str, page_size: int, max_pages: int) -> List[Dict]:
        """
        Fetch audit events from Centrify using the Redrock/query API
        """
    
        query_url = f"{tenant_url}/Redrock/query"
    
        headers = {
            'Authorization': f'Bearer {access_token}',
            'X-CENTRIFY-NATIVE-CLIENT': 'True',
            'Content-Type': 'application/json'
        }
    
        # Build SQL query with timestamp filter
        if last_timestamp:
            sql_query = f"Select * from Event where WhenOccurred > '{last_timestamp}' ORDER BY WhenOccurred ASC"
        else:
            # First run - get events from last 24 hours
            sql_query = "Select * from Event where WhenOccurred > datefunc('now', '-1') ORDER BY WhenOccurred ASC"
    
        payload = {
            "Script": sql_query,
            "args": {
                "PageSize": page_size,
                "Limit": page_size * max_pages,
                "Caching": -1
            }
        }
    
        response = requests.post(query_url, headers=headers, json=payload)
        response.raise_for_status()
    
        response_data = response.json()
    
        if not response_data.get('success', False):
            raise Exception(f"API query failed: {response_data.get('Message', 'Unknown error')}")
    
        # Parse the response
        result = response_data.get('Result', {})
        columns = {col['Name']: i for i, col in enumerate(result.get('Columns', []))}
        raw_results = result.get('Results', [])
    
        events = []
        for raw_event in raw_results:
            event = {}
            row_data = raw_event.get('Row', {})
    
            # Map column names to values
            for col_name, col_index in columns.items():
                if col_name in row_data and row_data[col_name] is not None:
                    event[col_name] = row_data[col_name]
    
            # Add metadata
            event['_source'] = 'centrify_sso'
            event['_collected_at'] = datetime.utcnow().isoformat() + 'Z'
    
            events.append(event)
    
        return events
    
    def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]:
        """
        Get the last processed timestamp from S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            print("No previous state found, starting from 24 hours ago")
            return None
        except Exception as e:
            print(f"Error reading state: {e}")
            return None
    
    def update_state(s3_client, bucket: str, state_key: str, timestamp: str):
        """
        Update the state file with the latest processed timestamp
        """
        state_data = {
            'last_timestamp': timestamp,
            'updated_at': datetime.utcnow().isoformat() + 'Z'
        }
    
        s3_client.put_object(
            Bucket=bucket,
            Key=state_key,
            Body=json.dumps(state_data),
            ContentType='application/json'
        )
    
    def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]):
        """
        Store events as JSONL (one JSON object per line) in S3
        """
        # Convert to JSONL format (one JSON object per line)
        jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events)
    
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=jsonl_content,
            ContentType='application/x-ndjson'
        )
    
    def get_latest_event_timestamp(events: List[Dict]) -> str:
        """
        Get the latest timestamp from the events for state tracking
        """
        if not events:
            return datetime.utcnow().isoformat() + 'Z'
    
        latest = None
        for event in events:
            when_occurred = event.get('WhenOccurred')
            if when_occurred:
                if latest is None or when_occurred > latest:
                    latest = when_occurred
    
        return latest or datetime.utcnow().isoformat() + 'Z'
    
  5. Aceda a Configuração > Variáveis de ambiente.

  6. Clique em Editar > Adicionar nova variável de ambiente.

  7. Introduza as variáveis de ambiente fornecidas na tabela seguinte, substituindo os valores de exemplo pelos seus valores.

    Variáveis de ambiente

    Chave Valor de exemplo
    S3_BUCKET delinea-centrify-logs-bucket
    S3_PREFIX centrify-sso-logs/
    STATE_KEY centrify-sso-logs/state.json
    TENANT_URL https://yourtenant.my.centrify.com
    CLIENT_ID your-client-id
    CLIENT_SECRET your-client-secret
    OAUTH_APP_ID your-oauth-application-id
    OAUTH_SCOPE siem
    PAGE_SIZE 1000
    MAX_PAGES 10
  8. Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > a sua função).

  9. Selecione o separador Configuração.

  10. No painel Configuração geral, clique em Editar.

  11. Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.

Crie um horário do EventBridge

  1. Aceda a Amazon EventBridge > Scheduler > Create schedule.
  2. Indique os seguintes detalhes de configuração:
    • Agenda recorrente: Taxa (1 hour).
    • Alvo: a sua função Lambda CentrifySSOLogExport.
    • Nome: CentrifySSOLogExport-1h.
  3. Clique em Criar programação.

(Opcional) Crie um utilizador e chaves da IAM só de leitura para o Google SecOps

  1. Na consola da AWS, aceda a IAM > Utilizadores.
  2. Clique em Adicionar utilizadores.
  3. Indique os seguintes detalhes de configuração:
    • Utilizador: introduza secops-reader.
    • Tipo de acesso: selecione Chave de acesso – Acesso programático.
  4. Clique em Criar utilizador.
  5. Anexe a política de leitura mínima (personalizada): Utilizadores > secops-reader > Autorizações.
  6. Clique em Adicionar autorizações > Anexar políticas diretamente.
  7. Selecione Criar política.
  8. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket"
        }
      ]
    }
    
  9. Nome = secops-reader-policy.

  10. Clique em Criar política > pesquise/selecione > Seguinte.

  11. Clique em Adicionar autorizações.

  12. Crie uma chave de acesso para secops-reader: Credenciais de segurança > Chaves de acesso.

  13. Clique em Criar chave de acesso.

  14. Transfira o .CSV. (Vai colar estes valores no feed).

Configure um feed no Google SecOps para carregar registos de SSO da Delinea (Centrify)

  1. Aceda a Definições do SIEM > Feeds.
  2. Clique em + Adicionar novo feed.
  3. No campo Nome do feed, introduza um nome para o feed (por exemplo, Delinea Centrify SSO logs).
  4. Selecione Amazon S3 V2 como o Tipo de origem.
  5. Selecione Centrify como o Tipo de registo.
  6. Clicar em Seguinte.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://delinea-centrify-logs-bucket/centrify-sso-logs/
    • Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
    • Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. A predefinição é 180 dias.
    • ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
    • Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
    • Espaço de nomes do recurso: o espaço de nomes do recurso.
    • Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
  8. Clicar em Seguinte.
  9. Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.

Tabela de mapeamento do UDM

Campo de registo Mapeamento do UDM Lógica
AccountID security_result.detection_fields.value O valor de AccountID do registo não processado é atribuído a um objeto security_result.detection_fields com key:Account ID.
ApplicationName target.application O valor de ApplicationName do registo não processado é atribuído ao campo target.application.
AuthorityFQDN target.asset.network_domain O valor de AuthorityFQDN do registo não processado é atribuído ao campo target.asset.network_domain.
AuthorityID target.asset.asset_id O valor de AuthorityID do registo não processado é atribuído ao campo target.asset.asset_id, com o prefixo "AuthorityID:".
AzDeploymentId security_result.detection_fields.value O valor de AzDeploymentId do registo não processado é atribuído a um objeto security_result.detection_fields com key:AzDeploymentId.
AzRoleId additional.fields.value.string_value O valor de AzRoleId do registo não processado é atribuído a um objeto additional.fields com key:AzRole Id.
AzRoleName target.user.attribute.roles.name O valor de AzRoleName do registo não processado é atribuído ao campo target.user.attribute.roles.name.
ComputerFQDN principal.asset.network_domain O valor de ComputerFQDN do registo não processado é atribuído ao campo principal.asset.network_domain.
ComputerID principal.asset.asset_id O valor de ComputerID do registo não processado é atribuído ao campo principal.asset.asset_id, com o prefixo "ComputerId:".
ComputerName about.hostname O valor de ComputerName do registo não processado é atribuído ao campo about.hostname.
CredentialId security_result.detection_fields.value O valor de CredentialId do registo não processado é atribuído a um objeto security_result.detection_fields com key:Credential Id.
DirectoryServiceName security_result.detection_fields.value O valor de DirectoryServiceName do registo não processado é atribuído a um objeto security_result.detection_fields com key:Directory Service Name.
DirectoryServiceNameLocalized security_result.detection_fields.value O valor de DirectoryServiceNameLocalized do registo não processado é atribuído a um objeto security_result.detection_fields com key:Directory Service Name Localized.
DirectoryServiceUuid security_result.detection_fields.value O valor de DirectoryServiceUuid do registo não processado é atribuído a um objeto security_result.detection_fields com key:Directory Service Uuid.
EventMessage security_result.summary O valor de EventMessage do registo não processado é atribuído ao campo security_result.summary.
EventType metadata.product_event_type O valor de EventType do registo não processado é atribuído ao campo metadata.product_event_type. Também é usado para determinar o metadata.event_type.
FailReason security_result.summary O valor de FailReason do registo não processado é atribuído ao campo security_result.summary quando presente.
FailUserName target.user.email_addresses O valor de FailUserName do registo não processado é atribuído ao campo target.user.email_addresses quando presente.
FromIPAddress principal.ip O valor de FromIPAddress do registo não processado é atribuído ao campo principal.ip.
ID security_result.detection_fields.value O valor de ID do registo não processado é atribuído a um objeto security_result.detection_fields com key:ID.
InternalTrackingID metadata.product_log_id O valor de InternalTrackingID do registo não processado é atribuído ao campo metadata.product_log_id.
JumpType additional.fields.value.string_value O valor de JumpType do registo não processado é atribuído a um objeto additional.fields com key:Jump Type.
NormalizedUser target.user.email_addresses O valor de NormalizedUser do registo não processado é atribuído ao campo target.user.email_addresses.
OperationMode additional.fields.value.string_value O valor de OperationMode do registo não processado é atribuído a um objeto additional.fields com key:Operation Mode.
ProxyId security_result.detection_fields.value O valor de ProxyId do registo não processado é atribuído a um objeto security_result.detection_fields com key:Proxy Id.
RequestUserAgent network.http.user_agent O valor de RequestUserAgent do registo não processado é atribuído ao campo network.http.user_agent.
SessionGuid network.session_id O valor de SessionGuid do registo não processado é atribuído ao campo network.session_id.
Tenant additional.fields.value.string_value O valor de Tenant do registo não processado é atribuído a um objeto additional.fields com key:Tenant.
ThreadType additional.fields.value.string_value O valor de ThreadType do registo não processado é atribuído a um objeto additional.fields com key:Thread Type.
UserType principal.user.attribute.roles.name O valor de UserType do registo não processado é atribuído ao campo principal.user.attribute.roles.name.
WhenOccurred metadata.event_timestamp O valor de WhenOccurred do registo não processado é analisado e atribuído ao campo metadata.event_timestamp. Este campo também preenche o campo timestamp de nível superior. Valor codificado "SSO". Determinado pelo campo EventType. A predefinição é STATUS_UPDATE se EventType não estiver presente ou não corresponder a nenhum critério específico. Pode ser USER_LOGIN, USER_CREATION, USER_RESOURCE_ACCESS, USER_LOGOUT ou USER_CHANGE_PASSWORD. Valor codificado "CENTRIFY_SSO". Valor codificado "SSO". Valor codificado "Centrify". Se o campo message contiver um ID da sessão, este é extraído e usado. Caso contrário, a predefinição é "1". Extraído do campo host, se disponível, que provém do cabeçalho syslog. Extraído do campo pid, se disponível, que provém do cabeçalho syslog. Se UserGuid estiver presente, é usado o respetivo valor. Caso contrário, se o campo message contiver um ID do utilizador, este é extraído e usado. Definido como "ALLOW" se Level for "Info" e "BLOCK" se FailReason estiver presente. Definido como "AUTH_VIOLATION" se FailReason estiver presente. Determinado pelo campo Level. Definido como "INFORMATIONAL" se Level for "Info", "MEDIUM" se Level for "Warning" e "ERROR" se Level for "Error".

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.