Coletar registros do SSO da Delinea

Compatível com:

Este documento explica como ingerir registros de logon único (SSO) da Delinea (antiga Centrify) no Google Security Operations usando o Amazon S3. O analisador extrai os registros, processando os formatos JSON e syslog. Ele analisa pares de chave-valor, carimbos de data/hora e outros campos relevantes, mapeando-os para o modelo da UDM, com uma lógica específica para lidar com falhas de login, user agents, níveis de gravidade, mecanismos de autenticação e vários tipos de eventos. Ele prioriza FailUserName em vez de NormalizedUser para endereços de e-mail de destino em eventos de falha.

Antes de começar

Verifique se você tem os pré-requisitos a seguir:

  • Uma instância do Google SecOps.
  • Acesso privilegiado ao locatário do SSO da Delinea (Centrify).
  • Acesso privilegiado à AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge).

Coletar os pré-requisitos do SSO da Delinea (Centrify) (IDs, chaves de API, IDs da organização, tokens)

  1. Faça login no Portal do administrador da Delinea.
  2. Acesse Apps > Adicionar apps.
  3. Pesquise Cliente OAuth2 e clique em Adicionar.
  4. Clique em Sim na caixa de diálogo Adicionar app da Web.
  5. Clique em Fechar na caixa de diálogo Adicionar apps da Web.
  6. Na página Configuração do aplicativo, configure o seguinte:
    • Guia Geral:
      • ID do aplicativo: insira um identificador exclusivo (por exemplo, secops-oauth-client).
      • Nome do aplicativo: insira um nome descritivo, por exemplo, SecOps Data Export.
      • Descrição do aplicativo: insira uma descrição (por exemplo, OAuth client for exporting audit events to SecOps)
    • Guia Confiança:
      • A inscrição é confidencial: marque essa opção
      • Tipo de ID do cliente: selecione Confidencial
      • ID do cliente emitido: copie e salve esse valor.
      • Chave secreta do cliente emitida: copie e salve esse valor.
    • Guia Tokens:
      • Métodos de autenticação: selecione Credenciais do cliente.
      • Tipo de token: selecione Jwt RS256.
    • Guia Escopo:
      • Adicione o escopo siem com a descrição Acesso à integração do SIEM.
      • Adicione o escopo redrock/query com a descrição Acesso à API Query.
  7. Clique em Salvar para criar o cliente OAuth.
  8. Acesse Serviços principais > Usuários > Adicionar usuário.
  9. Configure o usuário de serviço:
    • Nome de login: insira o ID do cliente da etapa 6.
    • Endereço de e-mail: insira um e-mail válido (campo obrigatório).
    • Nome de exibição: insira um nome descritivo, por exemplo, SecOps Service User.
    • Senha e Confirmar senha: insira a Chave secreta do cliente da etapa 6.
    • Status: selecione É um cliente OAuth confidencial.
  10. Clique em Create User.
  11. Acesse Acesso > Papéis e atribua ao usuário de serviço um papel com as permissões adequadas para consultar eventos de auditoria.
  12. Copie e salve em um local seguro os seguintes detalhes:
    • URL do locatário: o URL do locatário do Centrify (por exemplo, https://yourtenant.my.centrify.com)
    • ID do cliente: da etapa 6
    • Chave secreta do cliente: da etapa 6
    • ID do aplicativo OAuth: na configuração do aplicativo

Configurar o bucket do AWS S3 e o IAM para o Google SecOps

  1. Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket.
  2. Salve o Nome e a Região do bucket para referência futura (por exemplo, delinea-centrify-logs-bucket).
  3. Crie um usuário seguindo este guia: Como criar um usuário do IAM.
  4. Selecione o usuário criado.
  5. Selecione a guia Credenciais de segurança.
  6. Clique em Criar chave de acesso na seção Chaves de acesso.
  7. Selecione Serviço de terceiros como Caso de uso.
  8. Clique em Próxima.
  9. Opcional: adicione uma tag de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Fazer o download do arquivo .CSV para salvar a chave de acesso e a chave de acesso secreta para referência futura.
  12. Clique em Concluído.
  13. Selecione a guia Permissões.
  14. Clique em Adicionar permissões na seção Políticas de permissões.
  15. Selecione Adicionar permissões.
  16. Selecione Anexar políticas diretamente.
  17. Pesquise a política AmazonS3FullAccess.
  18. Selecione a política.
  19. Clique em Próxima.
  20. Clique em Adicionar permissões

Configurar a política e o papel do IAM para uploads do S3

  1. No console da AWS, acesse IAM > Políticas.
  2. Clique em Criar política > guia JSON.
  3. Copie e cole a política a seguir.
  4. JSON da política (substitua delinea-centrify-logs-bucket se você tiver inserido um nome de bucket diferente):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/centrify-sso-logs/state.json"
        }
      ]
    }
    
  5. Clique em Próxima > Criar política.

  6. Acesse IAM > Papéis.

  7. Clique em Criar função > Serviço da AWS > Lambda.

  8. Anexe a política recém-criada e a política gerenciada AWSLambdaBasicExecutionRole (para geração de registros do CloudWatch).

  9. Nomeie a função como CentrifySSOLogExportRole e clique em Criar função.

Criar a função Lambda

  1. No console da AWS, acesse Lambda > Functions > Create function.
  2. Clique em Criar do zero.
  3. Informe os seguintes detalhes de configuração:

    Configuração Valor
    Nome CentrifySSOLogExport
    Ambiente de execução Python 3.13
    Arquitetura x86_64
    Função de execução CentrifySSOLogExportRole
  4. Depois que a função for criada, abra a guia Código, exclua o stub e cole o código a seguir (CentrifySSOLogExport.py).

    import json
    import boto3
    import requests
    import base64
    from datetime import datetime, timedelta
    import os
    from typing import Dict, List, Optional
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch Delinea Centrify SSO audit events and store them in S3
        """
    
        # Environment variables
        S3_BUCKET = os.environ['S3_BUCKET']
        S3_PREFIX = os.environ['S3_PREFIX']
        STATE_KEY = os.environ['STATE_KEY']
    
        # Centrify API credentials
        TENANT_URL = os.environ['TENANT_URL']
        CLIENT_ID = os.environ['CLIENT_ID']
        CLIENT_SECRET = os.environ['CLIENT_SECRET']
        OAUTH_APP_ID = os.environ['OAUTH_APP_ID']
    
        # Optional parameters
        PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000'))
        MAX_PAGES = int(os.environ.get('MAX_PAGES', '10'))
    
        s3_client = boto3.client('s3')
    
        try:
            # Get last execution state
            last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY)
    
            # Get OAuth access token
            access_token = get_oauth_token(TENANT_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_APP_ID)
    
            # Fetch audit events
            events = fetch_audit_events(TENANT_URL, access_token, last_timestamp, PAGE_SIZE, MAX_PAGES)
    
            if events:
                # Store events in S3
                current_timestamp = datetime.utcnow()
                filename = f"{S3_PREFIX}centrify-sso-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json"
    
                store_events_to_s3(s3_client, S3_BUCKET, filename, events)
    
                # Update state with latest timestamp
                latest_timestamp = get_latest_event_timestamp(events)
                update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp)
    
                print(f"Successfully processed {len(events)} events and stored to {filename}")
            else:
                print("No new events found")
    
            return {
                'statusCode': 200,
                'body': json.dumps(f'Successfully processed {len(events) if events else 0} events')
            }
    
        except Exception as e:
            print(f"Error processing Centrify SSO logs: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def get_oauth_token(tenant_url: str, client_id: str, client_secret: str, oauth_app_id: str) -> str:
        """
        Get OAuth access token using client credentials flow
        """
    
        # Create basic auth token
        credentials = f"{client_id}:{client_secret}"
        basic_auth = base64.b64encode(credentials.encode()).decode()
    
        token_url = f"{tenant_url}/oauth2/token/{oauth_app_id}"
    
        headers = {
            'Authorization': f'Basic {basic_auth}',
            'X-CENTRIFY-NATIVE-CLIENT': 'True',
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    
        data = {
            'grant_type': 'client_credentials',
            'scope': 'siem redrock/query'
        }
    
        response = requests.post(token_url, headers=headers, data=data)
        response.raise_for_status()
    
        token_data = response.json()
        return token_data['access_token']
    
    def fetch_audit_events(tenant_url: str, access_token: str, last_timestamp: str, page_size: int, max_pages: int) -> List[Dict]:
        """
        Fetch audit events from Centrify using the Redrock/query API
        """
    
        query_url = f"{tenant_url}/Redrock/query"
    
        headers = {
            'Authorization': f'Bearer {access_token}',
            'X-CENTRIFY-NATIVE-CLIENT': 'True',
            'Content-Type': 'application/json'
        }
    
        # Build SQL query with timestamp filter
        if last_timestamp:
            sql_query = f"Select * from Event where WhenOccurred > '{last_timestamp}' ORDER BY WhenOccurred ASC"
        else:
            # First run - get events from last 24 hours
            sql_query = "Select * from Event where WhenOccurred > datefunc('now', '-1') ORDER BY WhenOccurred ASC"
    
        payload = {
            "Script": sql_query,
            "args": {
                "PageSize": page_size,
                "Limit": page_size * max_pages,
                "Caching": -1
            }
        }
    
        response = requests.post(query_url, headers=headers, json=payload)
        response.raise_for_status()
    
        response_data = response.json()
    
        if not response_data.get('success', False):
            raise Exception(f"API query failed: {response_data.get('Message', 'Unknown error')}")
    
        # Parse the response
        result = response_data.get('Result', {})
        columns = {col['Name']: i for i, col in enumerate(result.get('Columns', []))}
        raw_results = result.get('Results', [])
    
        events = []
        for raw_event in raw_results:
            event = {}
            row_data = raw_event.get('Row', {})
    
            # Map column names to values
            for col_name, col_index in columns.items():
                if col_name in row_data and row_data[col_name] is not None:
                    event[col_name] = row_data[col_name]
    
            # Add metadata
            event['_source'] = 'centrify_sso'
            event['_collected_at'] = datetime.utcnow().isoformat() + 'Z'
    
            events.append(event)
    
        return events
    
    def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]:
        """
        Get the last processed timestamp from S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            print("No previous state found, starting from 24 hours ago")
            return None
        except Exception as e:
            print(f"Error reading state: {e}")
            return None
    
    def update_state(s3_client, bucket: str, state_key: str, timestamp: str):
        """
        Update the state file with the latest processed timestamp
        """
        state_data = {
            'last_timestamp': timestamp,
            'updated_at': datetime.utcnow().isoformat() + 'Z'
        }
    
        s3_client.put_object(
            Bucket=bucket,
            Key=state_key,
            Body=json.dumps(state_data),
            ContentType='application/json'
        )
    
    def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]):
        """
        Store events as JSONL (one JSON object per line) in S3
        """
        # Convert to JSONL format (one JSON object per line)
        jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events)
    
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=jsonl_content,
            ContentType='application/x-ndjson'
        )
    
    def get_latest_event_timestamp(events: List[Dict]) -> str:
        """
        Get the latest timestamp from the events for state tracking
        """
        if not events:
            return datetime.utcnow().isoformat() + 'Z'
    
        latest = None
        for event in events:
            when_occurred = event.get('WhenOccurred')
            if when_occurred:
                if latest is None or when_occurred > latest:
                    latest = when_occurred
    
        return latest or datetime.utcnow().isoformat() + 'Z'
    
  5. Acesse Configuração > Variáveis de ambiente.

  6. Clique em Editar > Adicionar nova variável de ambiente.

  7. Insira as variáveis de ambiente fornecidas na tabela a seguir, substituindo os valores de exemplo pelos seus.

    Variáveis de ambiente

    Chave Valor de exemplo
    S3_BUCKET delinea-centrify-logs-bucket
    S3_PREFIX centrify-sso-logs/
    STATE_KEY centrify-sso-logs/state.json
    TENANT_URL https://yourtenant.my.centrify.com
    CLIENT_ID your-client-id
    CLIENT_SECRET your-client-secret
    OAUTH_APP_ID your-oauth-application-id
    OAUTH_SCOPE siem
    PAGE_SIZE 1000
    MAX_PAGES 10
  8. Depois que a função for criada, permaneça na página dela ou abra Lambda > Functions > sua-função.

  9. Selecione a guia Configuração.

  10. No painel Configuração geral, clique em Editar.

  11. Mude Tempo limite para 5 minutos (300 segundos) e clique em Salvar.

Criar uma programação do EventBridge

  1. Acesse Amazon EventBridge > Scheduler > Criar programação.
  2. Informe os seguintes detalhes de configuração:
    • Programação recorrente: Taxa (1 hour).
    • Destino: sua função Lambda CentrifySSOLogExport.
    • Nome: CentrifySSOLogExport-1h.
  3. Clique em Criar programação.

(Opcional) Criar um usuário e chaves do IAM somente leitura para o Google SecOps

  1. No console da AWS, acesse IAM > Usuários.
  2. Clique em Add users.
  3. Informe os seguintes detalhes de configuração:
    • Usuário: insira secops-reader.
    • Tipo de acesso: selecione Chave de acesso – Acesso programático.
  4. Clique em Criar usuário.
  5. Anexe a política de leitura mínima (personalizada): Usuários > secops-reader > Permissões.
  6. Clique em Adicionar permissões > Anexar políticas diretamente.
  7. Selecione Criar política.
  8. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket"
        }
      ]
    }
    
  9. Name = secops-reader-policy.

  10. Clique em Criar política > pesquise/selecione > Próxima.

  11. Clique em Adicionar permissões

  12. Crie uma chave de acesso para secops-reader: Credenciais de segurança > Chaves de acesso.

  13. Clique em Criar chave de acesso.

  14. Faça o download do .CSV. Cole esses valores no feed.

Configurar um feed no Google SecOps para ingerir registros de SSO da Delinea (Centrify)

  1. Acesse Configurações do SIEM > Feeds.
  2. Clique em + Adicionar novo feed.
  3. No campo Nome do feed, insira um nome para o feed (por exemplo, Delinea Centrify SSO logs).
  4. Selecione Amazon S3 V2 como o Tipo de origem.
  5. Selecione Centrify como o Tipo de registro.
  6. Clique em Próxima.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://delinea-centrify-logs-bucket/centrify-sso-logs/
    • Opções de exclusão de fontes: selecione a opção de exclusão de acordo com sua preferência.
    • Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
    • ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3.
    • Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3.
    • Namespace do recurso: o namespace do recurso.
    • Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.
  8. Clique em Próxima.
  9. Revise a nova configuração do feed na tela Finalizar e clique em Enviar.

Tabela de mapeamento do UDM

Campo de registro Mapeamento do UDM Lógica
AccountID security_result.detection_fields.value O valor de AccountID do registro bruto é atribuído a um objeto security_result.detection_fields com key:Account ID.
ApplicationName target.application O valor de ApplicationName do registro bruto é atribuído ao campo target.application.
AuthorityFQDN target.asset.network_domain O valor de AuthorityFQDN do registro bruto é atribuído ao campo target.asset.network_domain.
AuthorityID target.asset.asset_id O valor de AuthorityID do registro bruto é atribuído ao campo target.asset.asset_id, com o prefixo "AuthorityID:".
AzDeploymentId security_result.detection_fields.value O valor de AzDeploymentId do registro bruto é atribuído a um objeto security_result.detection_fields com key:AzDeploymentId.
AzRoleId additional.fields.value.string_value O valor de AzRoleId do registro bruto é atribuído a um objeto additional.fields com key:AzRole Id.
AzRoleName target.user.attribute.roles.name O valor de AzRoleName do registro bruto é atribuído ao campo target.user.attribute.roles.name.
ComputerFQDN principal.asset.network_domain O valor de ComputerFQDN do registro bruto é atribuído ao campo principal.asset.network_domain.
ComputerID principal.asset.asset_id O valor de ComputerID do registro bruto é atribuído ao campo principal.asset.asset_id, com o prefixo "ComputerId:".
ComputerName about.hostname O valor de ComputerName do registro bruto é atribuído ao campo about.hostname.
CredentialId security_result.detection_fields.value O valor de CredentialId do registro bruto é atribuído a um objeto security_result.detection_fields com key:Credential Id.
DirectoryServiceName security_result.detection_fields.value O valor de DirectoryServiceName do registro bruto é atribuído a um objeto security_result.detection_fields com key:Directory Service Name.
DirectoryServiceNameLocalized security_result.detection_fields.value O valor de DirectoryServiceNameLocalized do registro bruto é atribuído a um objeto security_result.detection_fields com key:Directory Service Name Localized.
DirectoryServiceUuid security_result.detection_fields.value O valor de DirectoryServiceUuid do registro bruto é atribuído a um objeto security_result.detection_fields com key:Directory Service Uuid.
EventMessage security_result.summary O valor de EventMessage do registro bruto é atribuído ao campo security_result.summary.
EventType metadata.product_event_type O valor de EventType do registro bruto é atribuído ao campo metadata.product_event_type. Ele também é usado para determinar o metadata.event_type.
FailReason security_result.summary O valor de FailReason do registro bruto é atribuído ao campo security_result.summary quando presente.
FailUserName target.user.email_addresses O valor de FailUserName do registro bruto é atribuído ao campo target.user.email_addresses quando presente.
FromIPAddress principal.ip O valor de FromIPAddress do registro bruto é atribuído ao campo principal.ip.
ID security_result.detection_fields.value O valor de ID do registro bruto é atribuído a um objeto security_result.detection_fields com key:ID.
InternalTrackingID metadata.product_log_id O valor de InternalTrackingID do registro bruto é atribuído ao campo metadata.product_log_id.
JumpType additional.fields.value.string_value O valor de JumpType do registro bruto é atribuído a um objeto additional.fields com key:Jump Type.
NormalizedUser target.user.email_addresses O valor de NormalizedUser do registro bruto é atribuído ao campo target.user.email_addresses.
OperationMode additional.fields.value.string_value O valor de OperationMode do registro bruto é atribuído a um objeto additional.fields com key:Operation Mode.
ProxyId security_result.detection_fields.value O valor de ProxyId do registro bruto é atribuído a um objeto security_result.detection_fields com key:Proxy Id.
RequestUserAgent network.http.user_agent O valor de RequestUserAgent do registro bruto é atribuído ao campo network.http.user_agent.
SessionGuid network.session_id O valor de SessionGuid do registro bruto é atribuído ao campo network.session_id.
Tenant additional.fields.value.string_value O valor de Tenant do registro bruto é atribuído a um objeto additional.fields com key:Tenant.
ThreadType additional.fields.value.string_value O valor de ThreadType do registro bruto é atribuído a um objeto additional.fields com key:Thread Type.
UserType principal.user.attribute.roles.name O valor de UserType do registro bruto é atribuído ao campo principal.user.attribute.roles.name.
WhenOccurred metadata.event_timestamp O valor de WhenOccurred do registro bruto é analisado e atribuído ao campo metadata.event_timestamp. Esse campo também preenche o campo timestamp de nível superior. Valor codificado "SSO". Determinado pelo campo EventType. O padrão será STATUS_UPDATE se EventType não estiver presente ou não corresponder a nenhum critério específico. Pode ser USER_LOGIN, USER_CREATION, USER_RESOURCE_ACCESS, USER_LOGOUT ou USER_CHANGE_PASSWORD. Valor codificado "CENTRIFY_SSO". Valor codificado "SSO". Valor codificado "Centrify". Se o campo message contiver um ID de sessão, ele será extraído e usado. Caso contrário, o padrão é "1". Extraído do campo host, se disponível, que vem do cabeçalho do syslog. Extraído do campo pid, se disponível, que vem do cabeçalho do syslog. Se UserGuid estiver presente, o valor dele será usado. Caso contrário, se o campo message contiver um ID de usuário, ele será extraído e usado. Definido como "ALLOW" se Level for "Info" e "BLOCK" se FailReason estiver presente. Definido como "AUTH_VIOLATION" se FailReason estiver presente. Determinado pelo campo Level. Definido como "INFORMATIONAL" se Level for "Info", "MEDIUM" se Level for "Warning" e "ERROR" se Level for "Error".

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.