Recolha registos de administrador do Duo
Este documento explica como carregar registos de administrador do Duo para o Google Security Operations através do Amazon S3. O analisador extrai campos dos registos (formato JSON) e mapeia-os para o modelo de dados unificado (UDM). Processa vários tipos de action
do Duo (início de sessão, gestão de utilizadores, gestão de grupos) de forma diferente, preenchendo os campos UDM relevantes com base na ação e nos dados disponíveis, incluindo detalhes do utilizador, fatores de autenticação e resultados de segurança. Também realiza transformações de dados, como a união de endereços IP, a conversão de datas/horas e o processamento de erros.
Antes de começar
- Instância do Google SecOps
- Acesso privilegiado ao inquilino do Duo (aplicação da API Admin)
- Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)
Configure a aplicação da API Duo Admin
- Inicie sessão no painel de administração do Duo.
- Aceda a Aplicações > Catálogo de aplicações.
- Adicione a aplicação API Admin.
- Registe os seguintes valores:
- Chave de integração (ikey)
- Chave secreta (skey)
- Nome de anfitrião da API (por exemplo,
api-XXXXXXXX.duosecurity.com
)
- Em Autorizações, ative a opção Conceder registo de leitura (para ler os registos de administrador).
- Guarde a aplicação.
Configure o contentor do AWS S3 e o IAM para o Google SecOps
- Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor
- Guarde o nome e a região do contentor para referência futura (por exemplo,
duo-admin-logs
). - Crie um utilizador seguindo este guia do utilizador: Criar um utilizador do IAM.
- Selecione o utilizador criado.
- Selecione o separador Credenciais de segurança.
- Clique em Criar chave de acesso na secção Chaves de acesso.
- Selecione Serviço de terceiros como o Exemplo de utilização.
- Clicar em Seguinte.
- Opcional: adicione uma etiqueta de descrição.
- Clique em Criar chave de acesso.
- Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para utilização posterior.
- Clique em Concluído.
- Selecione o separador Autorizações.
- Clique em Adicionar autorizações na secção Políticas de autorizações.
- Selecione Adicionar autorizações.
- Selecione Anexar políticas diretamente
- Pesquise e selecione a política AmazonS3FullAccess.
- Clicar em Seguinte.
- Clique em Adicionar autorizações.
Configure a política e a função de IAM para carregamentos do S3
- Aceda a AWS console > IAM > Policies > Create policy > separador JSON.
Introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoAdminObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-admin-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-admin-logs/duo/admin/state.json" } ] }
- Substitua
duo-admin-logs
se tiver introduzido um nome de contentor diferente:
- Substitua
Clique em Seguinte > Criar política.
Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.
Anexe a política criada recentemente.
Dê o nome
WriteDuoAdminToS3Role
à função e clique em Criar função.
Crie a função Lambda
- Na consola da AWS, aceda a Lambda > Functions > Create function.
- Clique em Criar do zero.
Faculte os seguintes detalhes de configuração:
Definição Valor Nome duo_admin_to_s3
Runtime Python 3.13 Arquitetura x86_64 Função de execução WriteDuoAdminToS3Role
Depois de criar a função, abra o separador Código, elimine o fragmento e introduza o seguinte código (
duo_admin_to_s3.py
):#!/usr/bin/env python3 # Lambda: Pull Duo Admin API v1 Administrator Logs to S3 (raw JSON pages) import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError from datetime import datetime import boto3 DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/admin/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/admin/state.json") s3 = boto3.client("s3") def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return {"Date": now, "Authorization": f"Basic {auth}"} def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) hdrs = _sign(method, host, path, params) req.add_header("Accept", "application/json") for k, v in hdrs.items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # 429 or 5xx → exponential backoff if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise except URLError: if attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise def _read_state() -> int | None: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return int(json.loads(obj["Body"].read()).get("mintime")) except Exception: return None def _write_state(mintime: int): body = json.dumps({"mintime": mintime}).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _epoch_from_item(item: dict) -> int | None: # Prefer numeric 'timestamp' (seconds); fallback to ISO8601 'ts' ts_num = item.get("timestamp") if isinstance(ts_num, (int, float)): return int(ts_num) ts_iso = item.get("ts") if isinstance(ts_iso, str): try: # Accept "...Z" or with offset return int(datetime.fromisoformat(ts_iso.replace("Z", "+00:00")).timestamp()) except Exception: return None return None def _write_page(payload: dict, when: int, page: int) -> str: key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-admin-{page:05d}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def fetch_and_store(): now = int(time.time()) # Start from last checkpoint or now-3600 on first run mintime = _read_state() or (now - 3600) page = 0 total = 0 next_mintime = mintime max_seen_ts = mintime while True: data = _http("GET", "/admin/v1/logs/administrator", {"mintime": mintime}) _write_page(data, now, page) page += 1 # Extract items resp = data.get("response") items = resp if isinstance(resp, list) else (resp.get("items") if isinstance(resp, dict) else []) items = items or [] if not items: break total += len(items) # Track the newest timestamp in this batch for it in items: ts = _epoch_from_item(it) if ts and ts > max_seen_ts: max_seen_ts = ts # Duo returns only the 1000 earliest events; page by advancing mintime if len(items) >= 1000 and max_seen_ts >= mintime: mintime = max_seen_ts next_mintime = max_seen_ts continue else: break # Save checkpoint: newest seen ts, or "now" if nothing new if max_seen_ts > next_mintime: _write_state(max_seen_ts) next_state = max_seen_ts else: _write_state(now) next_state = now return {"ok": True, "pages": page, "events": total, "next_mintime": next_state} def lambda_handler(event=None, context=None): return fetch_and_store() if __name__ == "__main__": print(lambda_handler())
Aceda a Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.
Introduza as seguintes variáveis de ambiente, substituindo-as pelos seus valores.
Chave Exemplo S3_BUCKET
duo-admin-logs
S3_PREFIX
duo/admin/
STATE_KEY
duo/admin/state.json
DUO_IKEY
DIXYZ...
DUO_SKEY
****************
DUO_API_HOSTNAME
api-XXXXXXXX.duosecurity.com
Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > your-function).
Selecione o separador Configuração.
No painel Configuração geral, clique em Editar.
Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.
Crie um horário do EventBridge
- Aceda a Amazon EventBridge > Scheduler > Create schedule.
- Indique os seguintes detalhes de configuração:
- Agenda recorrente: Taxa (
1 hour
). - Alvo: a sua função Lambda.
- Nome:
duo-admin-1h
.
- Agenda recorrente: Taxa (
- Clique em Criar programação.
Opcional: crie um utilizador e chaves da IAM só de leitura para o Google SecOps
- Na consola da AWS, aceda a IAM > Users e, de seguida, clique em Add users.
- Indique os seguintes detalhes de configuração:
- Utilizador: introduza um nome único (por exemplo,
secops-reader
) - Tipo de acesso: selecione Chave de acesso – Acesso programático
- Clique em Criar utilizador.
- Utilizador: introduza um nome único (por exemplo,
- Anexe a política de leitura mínima (personalizada): Utilizadores > selecione
secops-reader
> Autorizações > Adicionar autorizações > Anexar políticas diretamente > Criar política No editor JSON, introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
Defina o nome como
secops-reader-policy
.Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.
Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.
Transfira o CSV (estes valores são introduzidos no feed).
Configure um feed no Google SecOps para carregar registos de administrador do Duo
- Aceda a Definições do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, introduza um nome para o feed (por exemplo,
Duo Administrator Logs
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Registos de administrador do Duo como o Tipo de registo.
- Clicar em Seguinte.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://duo-admin-logs/duo/admin/
- Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
- Idade máxima do ficheiro: predefinição de 180 dias.
- ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
- Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
- Espaço de nomes do recurso: o espaço de nomes do recurso.
- Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
- URI do S3:
- Clicar em Seguinte.
- Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.
Tabela de mapeamento da UDM
Campo de registo | Mapeamento de UDM | Lógica |
---|---|---|
action |
metadata.product_event_type |
O valor do campo action do registo não processado. |
desc |
metadata.description |
O valor do campo desc do objeto description do registo não processado. |
description._status |
target.group.attribute.labels.value |
O valor do campo _status no objeto description do registo não processado, especificamente quando processa ações relacionadas com grupos. Este valor é colocado numa matriz "labels" com uma "chave" correspondente de "status". |
description.desc |
metadata.description |
O valor do campo desc do objeto description do registo não processado. |
description.email |
target.user.email_addresses |
O valor do campo email do objeto description do registo não processado. |
description.error |
security_result.summary |
O valor do campo error do objeto description do registo não processado. |
description.factor |
extensions.auth.auth_details |
O valor do campo factor do objeto description do registo não processado. |
description.groups.0._status |
target.group.attribute.labels.value |
O valor do campo _status do primeiro elemento na matriz groups no objeto description do registo não processado. Este valor é colocado numa matriz "labels" com uma "chave" correspondente de "status". |
description.groups.0.name |
target.group.group_display_name |
O valor do campo name do primeiro elemento na matriz groups no objeto description do registo não processado. |
description.ip_address |
principal.ip |
O valor do campo ip_address do objeto description do registo não processado. |
description.name |
target.group.group_display_name |
O valor do campo name do objeto description do registo não processado. |
description.realname |
target.user.user_display_name |
O valor do campo realname do objeto description do registo não processado. |
description.status |
target.user.attribute.labels.value |
O valor do campo status do objeto description do registo não processado. Este valor é colocado numa matriz "labels" com uma "chave" correspondente de "status". |
description.uname |
target.user.email_addresses ou target.user.userid |
O valor do campo uname do objeto description do registo não processado. Se corresponder a um formato de endereço de email, é mapeado para email_addresses ; caso contrário, é mapeado para userid . |
host |
principal.hostname |
O valor do campo host do registo não processado. |
isotimestamp |
metadata.event_timestamp.seconds |
O valor do campo isotimestamp do registo não processado, convertido em segundos desde epoch. |
object |
target.group.group_display_name |
O valor do campo object do registo não processado. |
timestamp |
metadata.event_timestamp.seconds |
O valor do campo timestamp do registo não processado. |
username |
target.user.userid ou principal.user.userid |
Se o campo action contiver "login", o valor é mapeado para target.user.userid . Caso contrário, é mapeado para principal.user.userid . Definido como "USERNAME_PASSWORD" se o campo action contiver "login". Determinado pelo analisador com base no campo action . Valores possíveis: USER_LOGIN , GROUP_CREATION , USER_UNCATEGORIZED , GROUP_DELETION , USER_CREATION , GROUP_MODIFICATION e GENERIC_EVENT . Está sempre definido como "DUO_ADMIN". Está sempre definido como "MULTI-FACTOR_AUTHENTICATION". Está sempre definido como "DUO_SECURITY". Definido como "ADMINISTRATOR" se o campo eventtype contiver "admin". Determinado pelo analisador com base no campo action . Definido como "BLOCK" se o campo action contiver "error"; caso contrário, definido como "ALLOW". Definido sempre como "status" quando preenche target.group.attribute.labels . Definido sempre como "status" quando preenche target.user.attribute.labels . |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.