Recolha registos de SSO da Delinea
Este documento explica como carregar registos de início de sessão único (SSO) da Delinea (anteriormente Centrify) para o Google Security Operations através do Amazon S3. O analisador extrai os registos, processando os formatos JSON e syslog. Analisa pares de chave-valor, datas/horas e outros campos relevantes, mapeando-os para o modelo de UDM, com uma lógica específica para processar falhas de início de sessão, agentes do utilizador, níveis de gravidade, mecanismos de autenticação e vários tipos de eventos. Prioriza FailUserName
em vez de NormalizedUser
para endereços de email de destino em eventos de falha.
Antes de começar
Certifique-se de que cumpre os seguintes pré-requisitos:
- Uma instância do Google SecOps.
- Acesso privilegiado ao inquilino do SSO da Delinea (Centrify).
- Acesso privilegiado à AWS (S3, Identity and Access Management [IAM], Lambda e EventBridge).
Recolha os pré-requisitos do SSO da Delinea (Centrify) (IDs, chaves da API, IDs da organização e tokens)
- Inicie sessão no portal de administração da Delinea.
- Aceda a Apps > Adicionar apps.
- Pesquise Cliente OAuth2 e clique em Adicionar.
- Clique em Sim na caixa de diálogo Adicionar app Web.
- Clique em Fechar na caixa de diálogo Adicionar apps Web.
- Na página Configuração da aplicação, configure o seguinte:
- Separador Geral:
- ID da aplicação: introduza um identificador exclusivo (por exemplo,
secops-oauth-client
) - Nome da aplicação: introduza um nome descritivo (por exemplo,
SecOps Data Export
) - Descrição da aplicação: introduza a descrição (por exemplo,
OAuth client for exporting audit events to SecOps
)
- ID da aplicação: introduza um identificador exclusivo (por exemplo,
- Separador Confiança:
- A aplicação é confidencial: selecione esta opção
- Tipo de ID de cliente: selecione Confidencial
- ID de cliente emitido: copie e guarde este valor
- Segredo do cliente emitido: copie e guarde este valor
- Separador Tokens:
- Métodos de autorização: selecione Credenciais do cliente
- Tipo de token: selecione Jwt RS256
- Separador Âmbito:
- Adicione o âmbito siem com a descrição Acesso à integração do SIEM.
- Adicione o âmbito redrock/query com a descrição Acesso à API de consultas.
- Separador Geral:
- Clique em Guardar para criar o cliente OAuth.
- Aceda a Serviços principais > Utilizadores > Adicionar utilizador.
- Configure o utilizador do serviço:
- Nome de início de sessão: introduza o ID de cliente do passo 6.
- Endereço de email: introduza um email válido (campo obrigatório).
- Nome a apresentar: introduza um nome descritivo (por exemplo,
SecOps Service User
). - Palavra-passe e Confirmar palavra-passe: introduza o segredo do cliente do passo 6
- Estado: selecione É um cliente confidencial OAuth.
- Clique em Criar utilizador.
- Aceda a Acesso > Funções e atribua o utilizador do serviço a uma função com as autorizações adequadas para consultar eventos de auditoria.
- Copie e guarde numa localização segura os seguintes detalhes:
- URL do inquilino: o URL do seu inquilino do Centrify (por exemplo,
https://yourtenant.my.centrify.com
) - ID de cliente: do passo 6
- Segredo do cliente: do passo 6
- ID da aplicação OAuth: a partir da configuração da aplicação
- URL do inquilino: o URL do seu inquilino do Centrify (por exemplo,
Configure o contentor do AWS S3 e o IAM para o Google SecOps
- Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor.
- Guarde o nome e a região do contentor para referência futura (por exemplo,
delinea-centrify-logs-bucket
). - Crie um utilizador seguindo este guia do utilizador: criar um utilizador do IAM.
- Selecione o utilizador criado.
- Selecione o separador Credenciais de segurança.
- Clique em Criar chave de acesso na secção Chaves de acesso.
- Selecione Serviço de terceiros como Exemplo de utilização.
- Clicar em Seguinte.
- Opcional: adicione uma etiqueta de descrição.
- Clique em Criar chave de acesso.
- Clique em Transferir ficheiro .CSV para guardar a chave de acesso e a chave de acesso secreta para referência futura.
- Clique em Concluído.
- Selecione o separador Autorizações.
- Clique em Adicionar autorizações na secção Políticas de autorizações.
- Selecione Adicionar autorizações.
- Selecione Anexar políticas diretamente.
- Pesquise a política AmazonS3FullAccess.
- Selecione a política.
- Clicar em Seguinte.
- Clique em Adicionar autorizações.
Configure a política e a função de IAM para carregamentos do S3
- Na consola da AWS, aceda a IAM > Políticas.
- Clique em Criar política > separador JSON.
- Copie e cole a seguinte política.
JSON da política (substitua
delinea-centrify-logs-bucket
se tiver introduzido um nome de contentor diferente):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/centrify-sso-logs/state.json" } ] }
Clique em Seguinte > Criar política.
Aceda a IAM > Funções.
Clique em Criar função > Serviço AWS > Lambda.
Anexe a política recém-criada e a política gerida AWSLambdaBasicExecutionRole (para registo do CloudWatch).
Dê o nome
CentrifySSOLogExportRole
à função e clique em Criar função.
Crie a função Lambda
- Na consola da AWS, aceda a Lambda > Functions > Create function.
- Clique em Criar do zero.
Faculte os seguintes detalhes de configuração:
Definição Valor Nome CentrifySSOLogExport
Runtime Python 3.13 Arquitetura x86_64 Função de execução CentrifySSOLogExportRole
Depois de criar a função, abra o separador Código, elimine o stub e cole o seguinte código (
CentrifySSOLogExport.py
).import json import boto3 import requests import base64 from datetime import datetime, timedelta import os from typing import Dict, List, Optional def lambda_handler(event, context): """ Lambda function to fetch Delinea Centrify SSO audit events and store them in S3 """ # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] # Centrify API credentials TENANT_URL = os.environ['TENANT_URL'] CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] OAUTH_APP_ID = os.environ['OAUTH_APP_ID'] # Optional parameters PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000')) MAX_PAGES = int(os.environ.get('MAX_PAGES', '10')) s3_client = boto3.client('s3') try: # Get last execution state last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY) # Get OAuth access token access_token = get_oauth_token(TENANT_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_APP_ID) # Fetch audit events events = fetch_audit_events(TENANT_URL, access_token, last_timestamp, PAGE_SIZE, MAX_PAGES) if events: # Store events in S3 current_timestamp = datetime.utcnow() filename = f"{S3_PREFIX}centrify-sso-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_s3(s3_client, S3_BUCKET, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp) print(f"Successfully processed {len(events)} events and stored to {filename}") else: print("No new events found") return { 'statusCode': 200, 'body': json.dumps(f'Successfully processed {len(events) if events else 0} events') } except Exception as e: print(f"Error processing Centrify SSO logs: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_oauth_token(tenant_url: str, client_id: str, client_secret: str, oauth_app_id: str) -> str: """ Get OAuth access token using client credentials flow """ # Create basic auth token credentials = f"{client_id}:{client_secret}" basic_auth = base64.b64encode(credentials.encode()).decode() token_url = f"{tenant_url}/oauth2/token/{oauth_app_id}" headers = { 'Authorization': f'Basic {basic_auth}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/x-www-form-urlencoded' } data = { 'grant_type': 'client_credentials', 'scope': 'siem redrock/query' } response = requests.post(token_url, headers=headers, data=data) response.raise_for_status() token_data = response.json() return token_data['access_token'] def fetch_audit_events(tenant_url: str, access_token: str, last_timestamp: str, page_size: int, max_pages: int) -> List[Dict]: """ Fetch audit events from Centrify using the Redrock/query API """ query_url = f"{tenant_url}/Redrock/query" headers = { 'Authorization': f'Bearer {access_token}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/json' } # Build SQL query with timestamp filter if last_timestamp: sql_query = f"Select * from Event where WhenOccurred > '{last_timestamp}' ORDER BY WhenOccurred ASC" else: # First run - get events from last 24 hours sql_query = "Select * from Event where WhenOccurred > datefunc('now', '-1') ORDER BY WhenOccurred ASC" payload = { "Script": sql_query, "args": { "PageSize": page_size, "Limit": page_size * max_pages, "Caching": -1 } } response = requests.post(query_url, headers=headers, json=payload) response.raise_for_status() response_data = response.json() if not response_data.get('success', False): raise Exception(f"API query failed: {response_data.get('Message', 'Unknown error')}") # Parse the response result = response_data.get('Result', {}) columns = {col['Name']: i for i, col in enumerate(result.get('Columns', []))} raw_results = result.get('Results', []) events = [] for raw_event in raw_results: event = {} row_data = raw_event.get('Row', {}) # Map column names to values for col_name, col_index in columns.items(): if col_name in row_data and row_data[col_name] is not None: event[col_name] = row_data[col_name] # Add metadata event['_source'] = 'centrify_sso' event['_collected_at'] = datetime.utcnow().isoformat() + 'Z' events.append(event) return events def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]: """ Get the last processed timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: print("No previous state found, starting from 24 hours ago") return None except Exception as e: print(f"Error reading state: {e}") return None def update_state(s3_client, bucket: str, state_key: str, timestamp: str): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data), ContentType='application/json' ) def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events) s3_client.put_object( Bucket=bucket, Key=key, Body=jsonl_content, ContentType='application/x-ndjson' ) def get_latest_event_timestamp(events: List[Dict]) -> str: """ Get the latest timestamp from the events for state tracking """ if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: when_occurred = event.get('WhenOccurred') if when_occurred: if latest is None or when_occurred > latest: latest = when_occurred return latest or datetime.utcnow().isoformat() + 'Z'
Aceda a Configuração > Variáveis de ambiente.
Clique em Editar > Adicionar nova variável de ambiente.
Introduza as variáveis de ambiente fornecidas na tabela seguinte, substituindo os valores de exemplo pelos seus valores.
Variáveis de ambiente
Chave Valor de exemplo S3_BUCKET
delinea-centrify-logs-bucket
S3_PREFIX
centrify-sso-logs/
STATE_KEY
centrify-sso-logs/state.json
TENANT_URL
https://yourtenant.my.centrify.com
CLIENT_ID
your-client-id
CLIENT_SECRET
your-client-secret
OAUTH_APP_ID
your-oauth-application-id
OAUTH_SCOPE
siem
PAGE_SIZE
1000
MAX_PAGES
10
Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > a sua função).
Selecione o separador Configuração.
No painel Configuração geral, clique em Editar.
Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.
Crie um horário do EventBridge
- Aceda a Amazon EventBridge > Scheduler > Create schedule.
- Indique os seguintes detalhes de configuração:
- Agenda recorrente: Taxa (
1 hour
). - Alvo: a sua função Lambda
CentrifySSOLogExport
. - Nome:
CentrifySSOLogExport-1h
.
- Agenda recorrente: Taxa (
- Clique em Criar programação.
(Opcional) Crie um utilizador e chaves da IAM só de leitura para o Google SecOps
- Na consola da AWS, aceda a IAM > Utilizadores.
- Clique em Adicionar utilizadores.
- Indique os seguintes detalhes de configuração:
- Utilizador: introduza
secops-reader
. - Tipo de acesso: selecione Chave de acesso – Acesso programático.
- Utilizador: introduza
- Clique em Criar utilizador.
- Anexe a política de leitura mínima (personalizada): Utilizadores > secops-reader > Autorizações.
- Clique em Adicionar autorizações > Anexar políticas diretamente.
- Selecione Criar política.
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket" } ] }
Nome =
secops-reader-policy
.Clique em Criar política > pesquise/selecione > Seguinte.
Clique em Adicionar autorizações.
Crie uma chave de acesso para
secops-reader
: Credenciais de segurança > Chaves de acesso.Clique em Criar chave de acesso.
Transfira o
.CSV
. (Vai colar estes valores no feed).
Configure um feed no Google SecOps para carregar registos de SSO da Delinea (Centrify)
- Aceda a Definições do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, introduza um nome para o feed (por exemplo,
Delinea Centrify SSO logs
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Centrify como o Tipo de registo.
- Clicar em Seguinte.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://delinea-centrify-logs-bucket/centrify-sso-logs/
- Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
- Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. A predefinição é 180 dias.
- ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
- Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
- Espaço de nomes do recurso: o espaço de nomes do recurso.
- Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
- URI do S3:
- Clicar em Seguinte.
- Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.
Tabela de mapeamento do UDM
Campo de registo | Mapeamento do UDM | Lógica |
---|---|---|
AccountID |
security_result.detection_fields.value |
O valor de AccountID do registo não processado é atribuído a um objeto security_result.detection_fields com key :Account ID . |
ApplicationName |
target.application |
O valor de ApplicationName do registo não processado é atribuído ao campo target.application . |
AuthorityFQDN |
target.asset.network_domain |
O valor de AuthorityFQDN do registo não processado é atribuído ao campo target.asset.network_domain . |
AuthorityID |
target.asset.asset_id |
O valor de AuthorityID do registo não processado é atribuído ao campo target.asset.asset_id , com o prefixo "AuthorityID:". |
AzDeploymentId |
security_result.detection_fields.value |
O valor de AzDeploymentId do registo não processado é atribuído a um objeto security_result.detection_fields com key :AzDeploymentId . |
AzRoleId |
additional.fields.value.string_value |
O valor de AzRoleId do registo não processado é atribuído a um objeto additional.fields com key :AzRole Id . |
AzRoleName |
target.user.attribute.roles.name |
O valor de AzRoleName do registo não processado é atribuído ao campo target.user.attribute.roles.name . |
ComputerFQDN |
principal.asset.network_domain |
O valor de ComputerFQDN do registo não processado é atribuído ao campo principal.asset.network_domain . |
ComputerID |
principal.asset.asset_id |
O valor de ComputerID do registo não processado é atribuído ao campo principal.asset.asset_id , com o prefixo "ComputerId:". |
ComputerName |
about.hostname |
O valor de ComputerName do registo não processado é atribuído ao campo about.hostname . |
CredentialId |
security_result.detection_fields.value |
O valor de CredentialId do registo não processado é atribuído a um objeto security_result.detection_fields com key :Credential Id . |
DirectoryServiceName |
security_result.detection_fields.value |
O valor de DirectoryServiceName do registo não processado é atribuído a um objeto security_result.detection_fields com key :Directory Service Name . |
DirectoryServiceNameLocalized |
security_result.detection_fields.value |
O valor de DirectoryServiceNameLocalized do registo não processado é atribuído a um objeto security_result.detection_fields com key :Directory Service Name Localized . |
DirectoryServiceUuid |
security_result.detection_fields.value |
O valor de DirectoryServiceUuid do registo não processado é atribuído a um objeto security_result.detection_fields com key :Directory Service Uuid . |
EventMessage |
security_result.summary |
O valor de EventMessage do registo não processado é atribuído ao campo security_result.summary . |
EventType |
metadata.product_event_type |
O valor de EventType do registo não processado é atribuído ao campo metadata.product_event_type . Também é usado para determinar o metadata.event_type . |
FailReason |
security_result.summary |
O valor de FailReason do registo não processado é atribuído ao campo security_result.summary quando presente. |
FailUserName |
target.user.email_addresses |
O valor de FailUserName do registo não processado é atribuído ao campo target.user.email_addresses quando presente. |
FromIPAddress |
principal.ip |
O valor de FromIPAddress do registo não processado é atribuído ao campo principal.ip . |
ID |
security_result.detection_fields.value |
O valor de ID do registo não processado é atribuído a um objeto security_result.detection_fields com key :ID . |
InternalTrackingID |
metadata.product_log_id |
O valor de InternalTrackingID do registo não processado é atribuído ao campo metadata.product_log_id . |
JumpType |
additional.fields.value.string_value |
O valor de JumpType do registo não processado é atribuído a um objeto additional.fields com key :Jump Type . |
NormalizedUser |
target.user.email_addresses |
O valor de NormalizedUser do registo não processado é atribuído ao campo target.user.email_addresses . |
OperationMode |
additional.fields.value.string_value |
O valor de OperationMode do registo não processado é atribuído a um objeto additional.fields com key :Operation Mode . |
ProxyId |
security_result.detection_fields.value |
O valor de ProxyId do registo não processado é atribuído a um objeto security_result.detection_fields com key :Proxy Id . |
RequestUserAgent |
network.http.user_agent |
O valor de RequestUserAgent do registo não processado é atribuído ao campo network.http.user_agent . |
SessionGuid |
network.session_id |
O valor de SessionGuid do registo não processado é atribuído ao campo network.session_id . |
Tenant |
additional.fields.value.string_value |
O valor de Tenant do registo não processado é atribuído a um objeto additional.fields com key :Tenant . |
ThreadType |
additional.fields.value.string_value |
O valor de ThreadType do registo não processado é atribuído a um objeto additional.fields com key :Thread Type . |
UserType |
principal.user.attribute.roles.name |
O valor de UserType do registo não processado é atribuído ao campo principal.user.attribute.roles.name . |
WhenOccurred |
metadata.event_timestamp |
O valor de WhenOccurred do registo não processado é analisado e atribuído ao campo metadata.event_timestamp . Este campo também preenche o campo timestamp de nível superior. Valor codificado "SSO". Determinado pelo campo EventType . A predefinição é STATUS_UPDATE se EventType não estiver presente ou não corresponder a nenhum critério específico. Pode ser USER_LOGIN , USER_CREATION , USER_RESOURCE_ACCESS , USER_LOGOUT ou USER_CHANGE_PASSWORD . Valor codificado "CENTRIFY_SSO". Valor codificado "SSO". Valor codificado "Centrify". Se o campo message contiver um ID da sessão, este é extraído e usado. Caso contrário, a predefinição é "1". Extraído do campo host , se disponível, que provém do cabeçalho syslog. Extraído do campo pid , se disponível, que provém do cabeçalho syslog. Se UserGuid estiver presente, é usado o respetivo valor. Caso contrário, se o campo message contiver um ID do utilizador, este é extraído e usado. Definido como "ALLOW" se Level for "Info" e "BLOCK" se FailReason estiver presente. Definido como "AUTH_VIOLATION" se FailReason estiver presente. Determinado pelo campo Level . Definido como "INFORMATIONAL" se Level for "Info", "MEDIUM" se Level for "Warning" e "ERROR" se Level for "Error". |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.