Coletar registros do IAM do SailPoint

Compatível com:

Este documento explica como ingerir registros do Identity and Access Management (IAM) do SailPoint no Google Security Operations usando o Amazon S3. O analisador processa os registros nos formatos JSON e XML, transformando-os no modelo de dados unificado (UDM). Ele diferencia entre eventos únicos da UDM (ProvisioningPlan, AccountRequest, SOAP-ENV), eventos múltiplos da UDM (ProvisioningProject) e entidades da UDM (Identity), aplicando uma lógica de análise e mapeamentos de campo específicos para cada um, incluindo o processamento genérico de eventos para dados não XML.

Antes de começar

Verifique se você tem os pré-requisitos a seguir:

  • Uma instância do Google SecOps.
  • Acesso privilegiado ao SailPoint Identity Security Cloud.
  • Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge).

Coletar pré-requisitos do IAM do SailPoint (IDs, chaves de API, IDs da organização, tokens)

  1. Faça login no Console de administração do SailPoint Identity Security Cloud como administrador.
  2. Acesse Global > Configurações de segurança > Gerenciamento de API.
  3. Clique em Criar cliente de API.
  4. Selecione Credenciais do cliente como o tipo de concessão.
  5. Informe os seguintes detalhes de configuração:
    • Nome: insira um nome descritivo, por exemplo, Google SecOps Export API.
    • Descrição: insira uma descrição para o cliente de API.
    • Escopos: selecione sp:scopes:all.
  6. Clique em Criar e salve as credenciais de API geradas em um local seguro.
  7. Registre o URL base do locatário do SailPoint (por exemplo, https://tenant.api.identitynow.com).
  8. Copie e salve em um local seguro os seguintes detalhes:
    • IDN_CLIENT_ID.
    • IDN_CLIENT_SECRET.
    • IDN_BASE.

Configurar o bucket do AWS S3 e o IAM para o Google SecOps

  1. Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket
  2. Salve o Nome e a Região do bucket para referência futura (por exemplo, sailpoint-iam-logs).
  3. Crie um usuário seguindo este guia: Como criar um usuário do IAM.
  4. Selecione o usuário criado.
  5. Selecione a guia Credenciais de segurança.
  6. Clique em Criar chave de acesso na seção Chaves de acesso.
  7. Selecione Serviço de terceiros como Caso de uso.
  8. Clique em Próxima.
  9. Opcional: adicione uma tag de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Fazer o download do arquivo CSV para salvar a chave de acesso e a chave de acesso secreta para referência futura.
  12. Clique em Concluído.
  13. Selecione a guia Permissões.
  14. Clique em Adicionar permissões na seção Políticas de permissões.
  15. Selecione Adicionar permissões.
  16. Selecione Anexar políticas diretamente.
  17. Pesquise a política AmazonS3FullAccess.
  18. Selecione a política.
  19. Clique em Próxima.
  20. Clique em Adicionar permissões

Configurar a política e o papel do IAM para uploads do S3

  1. No console da AWS, acesse IAM > Políticas.
  2. Clique em Criar política > guia JSON.
  3. Copie e cole a política a seguir.
  4. JSON da política (substitua sailpoint-iam-logs se você tiver inserido um nome de bucket diferente):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::sailpoint-iam-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::sailpoint-iam-logs/sailpoint/iam/state.json"
        }
      ]
    }
    
  5. Clique em Próxima > Criar política.

  6. Acesse IAM > Funções > Criar função > Serviço da AWS > Lambda.

  7. Anexe a política recém-criada.

  8. Nomeie a função como SailPointIamToS3Role e clique em Criar função.

Criar a função Lambda

  1. No console da AWS, acesse Lambda > Functions > Create function.
  2. Clique em Criar do zero.
  3. Informe os seguintes detalhes de configuração:

    Configuração Valor
    Nome sailpoint_iam_to_s3
    Ambiente de execução Python 3.13
    Arquitetura x86_64
    Função de execução SailPointIamToS3Role
  4. Depois que a função for criada, abra a guia Código, exclua o stub e cole o código a seguir (sailpoint_iam_to_s3.py).

    #!/usr/bin/env python3
    # Lambda: Pull SailPoint Identity Security Cloud audit events and store raw JSON payloads to S3
    # - Uses /v3/search API with pagination for audit events.
    # - Preserves vendor-native JSON format for identity events.
    # - Retries with exponential backoff; unique S3 keys to avoid overwrites.
    
    import os, json, time, uuid, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import URLError, HTTPError
    
    import boto3
    
    S3_BUCKET   = os.environ["S3_BUCKET"]
    S3_PREFIX   = os.environ.get("S3_PREFIX", "sailpoint/iam/")
    STATE_KEY   = os.environ.get("STATE_KEY", "sailpoint/iam/state.json")
    WINDOW_SEC  = int(os.environ.get("WINDOW_SECONDS", "3600"))  # default 1h
    HTTP_TIMEOUT= int(os.environ.get("HTTP_TIMEOUT", "60"))
    IDN_BASE    = os.environ["IDN_BASE"]  # e.g. https://tenant.api.identitynow.com
    CLIENT_ID   = os.environ["IDN_CLIENT_ID"]
    CLIENT_SECRET = os.environ["IDN_CLIENT_SECRET"]
    SCOPE       = os.environ.get("IDN_SCOPE", "sp:scopes:all")
    PAGE_SIZE   = int(os.environ.get("PAGE_SIZE", "250"))
    MAX_PAGES   = int(os.environ.get("MAX_PAGES", "20"))
    MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3"))
    USER_AGENT  = os.environ.get("USER_AGENT", "sailpoint-iam-to-s3/1.0")
    
    s3 = boto3.client("s3")
    
    def _load_state():
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read())
        except Exception:
            return {}
    
    def _save_state(st):
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(st, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
    
    def _iso(ts: float) -> str:
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))
    
    def _get_oauth_token() -> str:
        """Get OAuth2 access token using Client Credentials flow"""
        token_url = f"{IDN_BASE.rstrip('/')}/oauth/token"
    
        data = urllib.parse.urlencode({
            'grant_type': 'client_credentials',
            'client_id': CLIENT_ID,
            'client_secret': CLIENT_SECRET,
            'scope': SCOPE
        }).encode('utf-8')
    
        req = Request(token_url, data=data, method="POST")
        req.add_header("Content-Type", "application/x-www-form-urlencoded")
        req.add_header("User-Agent", USER_AGENT)
    
        with urlopen(req, timeout=HTTP_TIMEOUT) as r:
            response = json.loads(r.read())
            return response["access_token"]
    
    def _search_events(access_token: str, created_from: str, search_after: list = None) -> list:
        """Search for audit events using SailPoint's /v3/search API"""
        search_url = f"{IDN_BASE.rstrip('/')}/v3/search"
    
        # Build search query for events created after specified time
        query_str = f'created:">={created_from}"'
    
        payload = {
            "indices": ["events"],
            "query": {"query": query_str},
            "sort": ["created", "+id"],
            "limit": PAGE_SIZE
        }
    
        if search_after:
            payload["searchAfter"] = search_after
    
        attempt = 0
        while True:
            req = Request(search_url, data=json.dumps(payload).encode('utf-8'), method="POST")
            req.add_header("Content-Type", "application/json")
            req.add_header("Accept", "application/json")
            req.add_header("Authorization", f"Bearer {access_token}")
            req.add_header("User-Agent", USER_AGENT)
    
            try:
                with urlopen(req, timeout=HTTP_TIMEOUT) as r:
                    response = json.loads(r.read())
                    # Handle different response formats
                    if isinstance(response, list):
                        return response
                    return response.get("results", response.get("data", []))
            except (HTTPError, URLError) as e:
                attempt += 1
                print(f"HTTP error on attempt {attempt}: {e}")
                if attempt > MAX_RETRIES:
                    raise
                # exponential backoff with jitter
                time.sleep(min(60, 2 ** attempt) + (time.time() % 1))
    
    def _put_events_data(events: list, from_ts: float, to_ts: float, page_num: int) -> str:
        # Create unique S3 key for events data
        ts_path = time.strftime("%Y/%m/%d", time.gmtime(to_ts))
        uniq = f"{int(time.time()*1e6)}_{uuid.uuid4().hex[:8]}"
        key = f"{S3_PREFIX}{ts_path}/sailpoint_iam_{int(from_ts)}_{int(to_ts)}_p{page_num:03d}_{uniq}.json"
    
        s3.put_object(
            Bucket=S3_BUCKET, 
            Key=key, 
            Body=json.dumps(events, separators=(",", ":")).encode("utf-8"), 
            ContentType="application/json",
            Metadata={
                'source': 'sailpoint-iam',
                'from_timestamp': str(int(from_ts)),
                'to_timestamp': str(int(to_ts)),
                'page_number': str(page_num),
                'events_count': str(len(events))
            }
        )
        return key
    
    def _get_item_id(item: dict) -> str:
        """Extract ID from event item, trying multiple possible fields"""
        for field in ("id", "uuid", "eventId", "_id"):
            if field in item and item[field]:
                return str(item[field])
        return ""
    
    def lambda_handler(event=None, context=None):
        st = _load_state()
        now = time.time()
        from_ts = float(st.get("last_to_ts") or (now - WINDOW_SEC))
        to_ts = now
    
        # Get OAuth token
        access_token = _get_oauth_token()
    
        created_from = _iso(from_ts)
        print(f"Fetching SailPoint IAM events from: {created_from}")
    
        # Handle pagination state
        last_created = st.get("last_created")
        last_id = st.get("last_id")
        search_after = [last_created, last_id] if (last_created and last_id) else None
    
        pages = 0
        total_events = 0
        written_keys = []
        newest_created = last_created or created_from
        newest_id = last_id or ""
    
        while pages < MAX_PAGES:
            events = _search_events(access_token, created_from, search_after)
    
            if not events:
                break
    
            # Write page to S3
            key = _put_events_data(events, from_ts, to_ts, pages + 1)
            written_keys.append(key)
            total_events += len(events)
    
            # Update pagination state from last item
            last_event = events[-1]
            last_event_created = last_event.get("created") or last_event.get("metadata", {}).get("created")
            last_event_id = _get_item_id(last_event)
    
            if last_event_created:
                newest_created = last_event_created
            if last_event_id:
                newest_id = last_event_id
    
            search_after = [newest_created, newest_id]
            pages += 1
    
            # If we got less than page size, we're done
            if len(events) < PAGE_SIZE:
                break
    
        print(f"Successfully retrieved {total_events} events across {pages} pages")
    
        # Save state for next run
        st["last_to_ts"] = to_ts
        st["last_created"] = newest_created
        st["last_id"] = newest_id
        st["last_successful_run"] = now
        _save_state(st)
    
        return {
            "statusCode": 200,
            "body": {
                "success": True,
                "pages": pages,
                "total_events": total_events,
                "s3_keys": written_keys,
                "from_timestamp": from_ts,
                "to_timestamp": to_ts,
                "last_created": newest_created,
                "last_id": newest_id
            }
        }
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  5. Acesse Configuração > Variáveis de ambiente.

  6. Clique em Editar > Adicionar nova variável de ambiente.

  7. Insira as variáveis de ambiente fornecidas na tabela a seguir, substituindo os valores de exemplo pelos seus.

    Variáveis de ambiente

    Chave Valor de exemplo
    S3_BUCKET sailpoint-iam-logs
    S3_PREFIX sailpoint/iam/
    STATE_KEY sailpoint/iam/state.json
    WINDOW_SECONDS 3600
    HTTP_TIMEOUT 60
    MAX_RETRIES 3
    USER_AGENT sailpoint-iam-to-s3/1.0
    IDN_BASE https://tenant.api.identitynow.com
    IDN_CLIENT_ID your-client-id (da etapa 2)
    IDN_CLIENT_SECRET your-client-secret (da etapa 2)
    IDN_SCOPE sp:scopes:all
    PAGE_SIZE 250
    MAX_PAGES 20
  8. Depois que a função for criada, permaneça na página dela ou abra Lambda > Functions > sua-função.

  9. Selecione a guia Configuração.

  10. No painel Configuração geral, clique em Editar.

  11. Mude Tempo limite para 5 minutos (300 segundos) e clique em Salvar.

Criar uma programação do EventBridge

  1. Acesse Amazon EventBridge > Scheduler > Criar programação.
  2. Informe os seguintes detalhes de configuração:
    • Programação recorrente: Taxa (1 hour).
    • Destino: sua função Lambda sailpoint_iam_to_s3.
    • Nome: sailpoint-iam-1h.
  3. Clique em Criar programação.

(Opcional) Criar um usuário e chaves do IAM somente leitura para o Google SecOps

  1. Acesse Console da AWS > IAM > Usuários.
  2. Clique em Add users.
  3. Informe os seguintes detalhes de configuração:
    • Usuário: insira secops-reader.
    • Tipo de acesso: selecione Chave de acesso – Acesso programático.
  4. Clique em Criar usuário.
  5. Anexe a política de leitura mínima (personalizada): Usuários > secops-reader > Permissões > Adicionar permissões > Anexar políticas diretamente > Criar política.
  6. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::sailpoint-iam-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::sailpoint-iam-logs"
        }
      ]
    }
    
  7. Name = secops-reader-policy.

  8. Clique em Criar política > pesquisar/selecionar > Próxima > Adicionar permissões.

  9. Crie uma chave de acesso para secops-reader: Credenciais de segurança > Chaves de acesso.

  10. Clique em Criar chave de acesso.

  11. Faça o download do .CSV. Cole esses valores no feed.

Configurar um feed no Google SecOps para ingerir registros do IAM do SailPoint

  1. Acesse Configurações do SIEM > Feeds.
  2. Clique em + Adicionar novo feed.
  3. No campo Nome do feed, insira um nome para o feed (por exemplo, SailPoint IAM logs).
  4. Selecione Amazon S3 V2 como o Tipo de origem.
  5. Selecione SailPoint IAM como o Tipo de registro.
  6. Clique em Próxima.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://sailpoint-iam-logs/sailpoint/iam/
    • Opções de exclusão de fontes: selecione a opção de exclusão de acordo com sua preferência.
    • Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
    • ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3.
    • Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3.
    • Namespace do recurso: o namespace do recurso.
    • Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.
  8. Clique em Próxima.
  9. Revise a nova configuração do feed na tela Finalizar e clique em Enviar.

Tabela de mapeamento do UDM

Campo de registro Mapeamento do UDM Lógica
action metadata.description O valor do campo action do registro bruto.
actor.name principal.user.user_display_name O valor do campo actor.name do registro bruto.
attributes.accountName principal.user.group_identifiers O valor do campo attributes.accountName do registro bruto.
attributes.appId target.asset_id "ID do app: " concatenado com o valor do campo attributes.appId do registro bruto.
attributes.attributeName additional.fields[0].value.string_value O valor do campo attributes.attributeName do registro bruto, colocado em um objeto additional.fields. A chave é definida como "Nome do atributo".
attributes.attributeValue additional.fields[1].value.string_value O valor do campo attributes.attributeValue do registro bruto, colocado em um objeto additional.fields. A chave é definida como "Valor do atributo".
attributes.cloudAppName target.application O valor do campo attributes.cloudAppName do registro bruto.
attributes.hostName target.hostname, target.asset.hostname O valor do campo attributes.hostName do registro bruto.
attributes.interface additional.fields[2].value.string_value O valor do campo attributes.interface do registro bruto, colocado em um objeto additional.fields. A chave é definida como "Interface".
attributes.operation security_result.action_details O valor do campo attributes.operation do registro bruto.
attributes.previousValue additional.fields[3].value.string_value O valor do campo attributes.previousValue do registro bruto, colocado em um objeto additional.fields. A chave é definida como "Valor anterior".
attributes.provisioningResult security_result.detection_fields.value O valor do campo attributes.provisioningResult do registro bruto, colocado em um objeto security_result.detection_fields. A chave é definida como "Provisioning Result".
attributes.sourceId principal.labels[0].value O valor do campo attributes.sourceId do registro bruto, colocado em um objeto principal.labels. A chave é definida como "ID da origem".
attributes.sourceName principal.labels[1].value O valor do campo attributes.sourceName do registro bruto, colocado em um objeto principal.labels. A chave é definida como "Nome da origem".
auditClassName metadata.product_event_type O valor do campo auditClassName do registro bruto.
created metadata.event_timestamp.seconds, metadata.event_timestamp.nanos O valor do campo created do registro bruto, convertido em carimbo de data/hora se instant.epochSecond não estiver presente.
id metadata.product_log_id O valor do campo id do registro bruto.
instant.epochSecond metadata.event_timestamp.seconds O valor do campo instant.epochSecond do registro bruto, usado para carimbo de data/hora.
ipAddress principal.asset.ip, principal.ip O valor do campo ipAddress do registro bruto.
interface additional.fields[0].value.string_value O valor do campo interface do registro bruto, colocado em um objeto additional.fields. A chave é definida como "interface".
loggerName intermediary.application O valor do campo loggerName do registro bruto.
message metadata.description, security_result.description Usado para várias finalidades, incluindo definir a descrição em metadados e security_result, além de extrair conteúdo XML.
name security_result.description O valor do campo name do registro bruto.
operation target.resource.attribute.labels[0].value, metadata.product_event_type O valor do campo operation do registro bruto, colocado em um objeto target.resource.attribute.labels. A chave é definida como "operation". Também usado para metadata.product_event_type.
org principal.administrative_domain O valor do campo org do registro bruto.
pod principal.location.name O valor do campo pod do registro bruto.
referenceClass additional.fields[1].value.string_value O valor do campo referenceClass do registro bruto, colocado em um objeto additional.fields. A chave é definida como "referenceClass".
referenceId additional.fields[2].value.string_value O valor do campo referenceId do registro bruto, colocado em um objeto additional.fields. A chave é definida como "referenceId".
sailPointObjectName additional.fields[3].value.string_value O valor do campo sailPointObjectName do registro bruto, colocado em um objeto additional.fields. A chave é definida como "sailPointObjectName".
serverHost principal.hostname, principal.asset.hostname O valor do campo serverHost do registro bruto.
stack additional.fields[4].value.string_value O valor do campo stack do registro bruto, colocado em um objeto additional.fields. A chave é definida como "Stack".
status security_result.severity_details O valor do campo status do registro bruto.
target additional.fields[4].value.string_value O valor do campo target do registro bruto, colocado em um objeto additional.fields. A chave é definida como "target".
target.name principal.user.userid O valor do campo target.name do registro bruto.
technicalName security_result.summary O valor do campo technicalName do registro bruto.
thrown.cause.message xml_body, detailed_message O valor do campo thrown.cause.message do registro bruto, usado para extrair conteúdo XML.
thrown.message xml_body, detailed_message O valor do campo thrown.message do registro bruto, usado para extrair conteúdo XML.
trackingNumber additional.fields[5].value.string_value O valor do campo trackingNumber do registro bruto, colocado em um objeto additional.fields. A chave é definida como "Número de rastreamento".
type metadata.product_event_type O valor do campo type do registro bruto.
_version metadata.product_version O valor do campo _version do registro bruto.
N/A metadata.event_timestamp Derivado dos campos instant.epochSecond ou created.
N/A metadata.event_type Determinado pela lógica do analisador com base em vários campos, incluindo has_principal_user, has_target_application, technicalName e action. O valor padrão é "GENERIC_EVENT".
N/A metadata.log_type Defina como "SAILPOINT_IAM".
N/A metadata.product_name Defina como IAM.
N/A metadata.vendor_name Defina como "SAILPOINT".
N/A extensions.auth.type Definido como "AUTHTYPE_UNSPECIFIED" em determinadas condições.
N/A target.resource.attribute.labels[0].key Defina como "operation".

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.