Recolha registos do PingOne Advanced Identity Cloud
Este documento explica como carregar registos do PingOne Advanced Identity Cloud para o Google Security Operations através do Amazon S3.
Antes de começar
- Instância do Google SecOps
- Acesso privilegiado ao inquilino do PingOne Advanced Identity Cloud
- Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)
Obtenha a chave da API do PingOne e o FQDN do inquilino
- Inicie sessão na consola do administrador do Advanced Identity Cloud.
- Clique no ícone do utilizador e, de seguida, em Definições do inquilino.
- No separador Definições globais, clique em Registar chaves da API.
- Clique em Nova chave da API de registo e indique um nome para a chave.
- Clique em Criar chave.
- Copie e guarde os valores api_key_id e api_key_secret num local seguro. O valor api_key_secret não é apresentado novamente.
- Clique em Concluído
- Aceda a Tenant Settings > Details e encontre o seu FQDN do inquilino (por exemplo,
example.tomcat.pingone.com
).
Configure o contentor do AWS S3 e o IAM para o Google SecOps
- Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor
- Guarde o nome e a região do contentor para referência futura (por exemplo,
pingone-aic-logs
). - Crie um utilizador seguindo este guia do utilizador: Criar um utilizador do IAM.
- Selecione o utilizador criado.
- Selecione o separador Credenciais de segurança.
- Clique em Criar chave de acesso na secção Chaves de acesso.
- Selecione Serviço de terceiros como o Exemplo de utilização.
- Clicar em Seguinte.
- Opcional: adicione uma etiqueta de descrição.
- Clique em Criar chave de acesso.
- Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para utilização posterior.
- Clique em Concluído.
- Selecione o separador Autorizações.
- Clique em Adicionar autorizações na secção Políticas de autorizações.
- Selecione Adicionar autorizações.
- Selecione Anexar políticas diretamente
- Pesquise e selecione a política AmazonS3FullAccess.
- Clicar em Seguinte.
- Clique em Adicionar autorizações.
Configure a política e a função de IAM para carregamentos do S3
- Na consola da AWS, aceda a IAM > Políticas > Criar política > separador JSON.
Introduza a seguinte política.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutPingOneAICObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::pingone-aic-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::pingone-aic-logs/pingone-aic/logs/state.json" } ] }
- Substitua
pingone-aic-logs
se tiver introduzido um nome de contentor diferente.
- Substitua
Clique em Seguinte > Criar política.
Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.
Anexe a política criada recentemente.
Dê o nome
WritePingOneAICToS3Role
à função e clique em Criar função.
Crie a função Lambda
- Na consola da AWS, aceda a Lambda > Functions > Create function.
- Clique em Criar do zero.
Faculte os seguintes detalhes de configuração:
Definição Valor Nome pingone_aic_to_s3
Runtime Python 3.13 Arquitetura x86_64 Função de execução WritePingOneAICToS3Role
Depois de criar a função, abra o separador Código, elimine o fragmento e introduza o seguinte código (
pingone_aic_to_s3.py
):#!/usr/bin/env python3 import os, json, time, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 FQDN = os.environ["AIC_TENANT_FQDN"].strip("/") API_KEY_ID = os.environ["AIC_API_KEY_ID"] API_KEY_SECRET = os.environ["AIC_API_SECRET"] S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "pingone-aic/logs/").strip("/") SOURCES = [s.strip() for s in os.environ.get("SOURCES", "am-everything,idm-everything").split(",") if s.strip()] PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "500")), 1000) # hard cap per docs MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) STATE_KEY = os.environ.get("STATE_KEY", "pingone-aic/logs/state.json") LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600")) s3 = boto3.client("s3") def _headers(): return {"x-api-key": API_KEY_ID, "x-api-secret": API_KEY_SECRET} def _iso(ts: float) -> str: return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts)) def _http_get(url: str, timeout: int = 60, max_retries: int = 5) -> dict: attempt, backoff = 0, 1.0 while True: req = Request(url, method="GET", headers=_headers()) try: with urlopen(req, timeout=timeout) as r: data = r.read() return json.loads(data.decode("utf-8")) except HTTPError as e: # 429: respect X-RateLimit-Reset (epoch seconds) if present if e.code == 429 and attempt < max_retries: reset = e.headers.get("X-RateLimit-Reset") now = int(time.time()) delay = max(1, int(reset) - now) if (reset and reset.isdigit()) else int(backoff) time.sleep(delay); attempt += 1; backoff *= 2; continue if 500 <= e.code <= 599 and attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise except URLError: if attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise def _load_state() -> dict: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()) except Exception: return {"sources": {}} def _save_state(state: dict): s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state, separators=(",", ":")).encode("utf-8"), ContentType="application/json") def _write_page(payload: dict, source: str) -> str: ts = time.gmtime() key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-pingone-aic-{source}.json" s3.put_object(Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json") return key def _bounded_begin_time(last_ts: str | None, now: float) -> str: # beginTime must be <= 24h before endTime (now if endTime omitted) # if last_ts older than 24h → cap to now-24h; else use last_ts; else lookback twenty_four_h_ago = now - 24*3600 if last_ts: try: t_struct = time.strptime(last_ts[:19] + "Z", "%Y-%m-%dT%H:%M:%SZ") t_epoch = int(time.mktime(t_struct)) except Exception: t_epoch = int(now - LOOKBACK_SECONDS) begin_epoch = max(t_epoch, int(twenty_four_h_ago)) else: begin_epoch = max(int(now - LOOKBACK_SECONDS), int(twenty_four_h_ago)) return _iso(begin_epoch) def fetch_source(source: str, last_ts: str | None): base = f"https://{FQDN}/monitoring/logs" now = time.time() params = { "source": source, "_pageSize": str(PAGE_SIZE), "_sortKeys": "timestamp", "beginTime": _bounded_begin_time(last_ts, now) } pages = 0 written = 0 newest_ts = last_ts cookie = None while pages < MAX_PAGES: if cookie: params["_pagedResultsCookie"] = cookie qs = urllib.parse.urlencode(params, quote_via=urllib.parse.quote) data = _http_get(f"{base}?{qs}") _write_page(data, source) results = data.get("result") or data.get("results") or [] for item in results: t = item.get("timestamp") or item.get("payload", {}).get("timestamp") if t and (newest_ts is None or t > newest_ts): newest_ts = t written += len(results) cookie = data.get("pagedResultsCookie") pages += 1 if not cookie: break return {"source": source, "pages": pages, "written": written, "newest_ts": newest_ts} def lambda_handler(event=None, context=None): state = _load_state() state.setdefault("sources", {}) summary = [] for source in SOURCES: last_ts = state["sources"].get(source, {}).get("last_ts") res = fetch_source(source, last_ts) if res.get("newest_ts"): state["sources"][source] = {"last_ts": res["newest_ts"]} summary.append(res) _save_state(state) return {"ok": True, "summary": summary} if __name__ == "__main__": print(lambda_handler())
Aceda a Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.
Introduza as seguintes variáveis de ambiente, substituindo-as pelos seus valores:
Chave Exemplo S3_BUCKET
pingone-aic-logs
S3_PREFIX
pingone-aic/logs/
STATE_KEY
pingone-aic/logs/state.json
AIC_TENANT_FQDN
example.tomcat.pingone.com
AIC_API_KEY_ID
<api_key_id>
AIC_API_SECRET
<api_key_secret>
SOURCES
am-everything,idm-everything
PAGE_SIZE
500
MAX_PAGES
20
LOOKBACK_SECONDS
3600
Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > your-function).
Selecione o separador Configuração.
No painel Configuração geral, clique em Editar.
Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.
Crie um horário do EventBridge
- Aceda a Amazon EventBridge > Scheduler > Create schedule.
- Indique os seguintes detalhes de configuração:
- Agenda recorrente: Taxa (
1 hour
). - Alvo: a sua função Lambda.
- Nome:
pingone-aic-1h
.
- Agenda recorrente: Taxa (
- Clique em Criar programação.
Opcional: crie um utilizador e chaves da IAM só de leitura para o Google SecOps
- Na consola da AWS, aceda a IAM > Users e, de seguida, clique em Add users.
- Indique os seguintes detalhes de configuração:
- Utilizador: introduza um nome único (por exemplo,
secops-reader
) - Tipo de acesso: selecione Chave de acesso – Acesso programático
- Clique em Criar utilizador.
- Utilizador: introduza um nome único (por exemplo,
- Anexe a política de leitura mínima (personalizada): Utilizadores > selecione
secops-reader
> Autorizações > Adicionar autorizações > Anexar políticas diretamente > Criar política No editor JSON, introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
Defina o nome como
secops-reader-policy
.Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.
Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.
Transfira o CSV (estes valores são introduzidos no feed).
Configure um feed no Google SecOps para carregar registos do PingOne Advanced Identity Cloud
- Aceda a Definições do SIEM > Feeds.
- Clique em Adicionar novo feed.
- No campo Nome do feed, introduza um nome para o feed (por exemplo,
PingOne Advanced Identity Cloud
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione PingOne Advanced Identity Cloud como o Tipo de registo.
- Clicar em Seguinte.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://pingone-aic-logs/pingone-aic/logs/
- Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
- Idade máxima do ficheiro: predefinição de 180 dias.
- ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
- Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
- Espaço de nomes do recurso: o espaço de nomes do recurso.
- Etiquetas de carregamento: a etiqueta a aplicar aos eventos deste feed.
- URI do S3:
- Clicar em Seguinte.
- Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.