Coletar registros de administrador do Duo
Este documento explica como ingerir registros de administrador do Duo no Google Security Operations usando o Amazon S3. O analisador extrai campos dos registros (formato JSON) e os mapeia para o modelo de dados unificado (UDM). Ele processa vários tipos de action
do Duo (login, gerenciamento de usuários, gerenciamento de grupos) de maneira diferente, preenchendo os campos relevantes do UDM com base na ação e nos dados disponíveis, incluindo detalhes do usuário, fatores de autenticação e resultados de segurança. Ele também realiza transformações de dados, como mesclar endereços IP, converter carimbos de data/hora e processar erros.
Antes de começar
- Instância do Google SecOps
- Acesso privilegiado ao locatário do Duo (aplicativo da API Admin)
- Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)
Configurar o aplicativo da API Admin do Duo
- Faça login no painel de administração do Duo.
- Acesse Aplicativos > Catálogo de aplicativos.
- Adicione o aplicativo API Admin.
- Anote os seguintes valores:
- Chave de integração (ikey)
- Chave secreta (skey)
- Nome do host da API (por exemplo,
api-XXXXXXXX.duosecurity.com
)
- Em Permissões, ative Conceder registro de leitura (para ler registros de administrador).
- Salve o aplicativo.
Configurar o bucket do AWS S3 e o IAM para o Google SecOps
- Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket
- Salve o Nome e a Região do bucket para referência futura (por exemplo,
duo-admin-logs
). - Crie um usuário seguindo este guia: Como criar um usuário do IAM.
- Selecione o usuário criado.
- Selecione a guia Credenciais de segurança.
- Clique em Criar chave de acesso na seção Chaves de acesso.
- Selecione Serviço de terceiros como o Caso de uso.
- Clique em Próxima.
- Opcional: adicione uma tag de descrição.
- Clique em Criar chave de acesso.
- Clique em Fazer o download do arquivo CSV para salvar a chave de acesso e a chave de acesso secreta para uso posterior.
- Clique em Concluído.
- Selecione a guia Permissões.
- Clique em Adicionar permissões na seção Políticas de permissões.
- Selecione Adicionar permissões.
- Selecione Anexar políticas diretamente.
- Pesquise e selecione a política AmazonS3FullAccess.
- Clique em Próxima.
- Clique em Adicionar permissões
Configurar a política e o papel do IAM para uploads do S3
- Acesse Console da AWS > IAM > Políticas > Criar política > guia JSON.
Insira a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoAdminObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-admin-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-admin-logs/duo/admin/state.json" } ] }
- Substitua
duo-admin-logs
se você inseriu um nome de bucket diferente:
- Substitua
Clique em Próxima > Criar política.
Acesse IAM > Funções > Criar função > Serviço da AWS > Lambda.
Anexe a política recém-criada.
Nomeie a função como
WriteDuoAdminToS3Role
e clique em Criar função.
Criar a função Lambda
- No console da AWS, acesse Lambda > Functions > Create function.
- Clique em Criar do zero.
Informe os seguintes detalhes de configuração:
Configuração Valor Nome duo_admin_to_s3
Ambiente de execução Python 3.13 Arquitetura x86_64 Função de execução WriteDuoAdminToS3Role
Depois que a função for criada, abra a guia Código, exclua o stub e insira o seguinte código (
duo_admin_to_s3.py
):#!/usr/bin/env python3 # Lambda: Pull Duo Admin API v1 Administrator Logs to S3 (raw JSON pages) import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError from datetime import datetime import boto3 DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/admin/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/admin/state.json") s3 = boto3.client("s3") def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return {"Date": now, "Authorization": f"Basic {auth}"} def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) hdrs = _sign(method, host, path, params) req.add_header("Accept", "application/json") for k, v in hdrs.items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # 429 or 5xx → exponential backoff if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise except URLError: if attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise def _read_state() -> int | None: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return int(json.loads(obj["Body"].read()).get("mintime")) except Exception: return None def _write_state(mintime: int): body = json.dumps({"mintime": mintime}).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _epoch_from_item(item: dict) -> int | None: # Prefer numeric 'timestamp' (seconds); fallback to ISO8601 'ts' ts_num = item.get("timestamp") if isinstance(ts_num, (int, float)): return int(ts_num) ts_iso = item.get("ts") if isinstance(ts_iso, str): try: # Accept "...Z" or with offset return int(datetime.fromisoformat(ts_iso.replace("Z", "+00:00")).timestamp()) except Exception: return None return None def _write_page(payload: dict, when: int, page: int) -> str: key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-admin-{page:05d}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def fetch_and_store(): now = int(time.time()) # Start from last checkpoint or now-3600 on first run mintime = _read_state() or (now - 3600) page = 0 total = 0 next_mintime = mintime max_seen_ts = mintime while True: data = _http("GET", "/admin/v1/logs/administrator", {"mintime": mintime}) _write_page(data, now, page) page += 1 # Extract items resp = data.get("response") items = resp if isinstance(resp, list) else (resp.get("items") if isinstance(resp, dict) else []) items = items or [] if not items: break total += len(items) # Track the newest timestamp in this batch for it in items: ts = _epoch_from_item(it) if ts and ts > max_seen_ts: max_seen_ts = ts # Duo returns only the 1000 earliest events; page by advancing mintime if len(items) >= 1000 and max_seen_ts >= mintime: mintime = max_seen_ts next_mintime = max_seen_ts continue else: break # Save checkpoint: newest seen ts, or "now" if nothing new if max_seen_ts > next_mintime: _write_state(max_seen_ts) next_state = max_seen_ts else: _write_state(now) next_state = now return {"ok": True, "pages": page, "events": total, "next_mintime": next_state} def lambda_handler(event=None, context=None): return fetch_and_store() if __name__ == "__main__": print(lambda_handler())
Acesse Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.
Insira as seguintes variáveis de ambiente, substituindo pelos seus valores.
Chave Exemplo S3_BUCKET
duo-admin-logs
S3_PREFIX
duo/admin/
STATE_KEY
duo/admin/state.json
DUO_IKEY
DIXYZ...
DUO_SKEY
****************
DUO_API_HOSTNAME
api-XXXXXXXX.duosecurity.com
Depois que a função for criada, permaneça na página dela ou abra Lambda > Functions > sua-função.
Selecione a guia Configuração.
No painel Configuração geral, clique em Editar.
Mude Tempo limite para 5 minutos (300 segundos) e clique em Salvar.
Criar uma programação do EventBridge
- Acesse Amazon EventBridge > Scheduler > Criar programação.
- Informe os seguintes detalhes de configuração:
- Programação recorrente: Taxa (
1 hour
). - Destino: sua função Lambda.
- Nome:
duo-admin-1h
.
- Programação recorrente: Taxa (
- Clique em Criar programação.
Opcional: criar um usuário e chaves do IAM somente leitura para o Google SecOps
- No console da AWS, acesse IAM > Usuários e clique em Adicionar usuários.
- Informe os seguintes detalhes de configuração:
- Usuário: insira um nome exclusivo (por exemplo,
secops-reader
) - Tipo de acesso: selecione Chave de acesso - Acesso programático.
- Clique em Criar usuário.
- Usuário: insira um nome exclusivo (por exemplo,
- Anexe a política de leitura mínima (personalizada): Usuários > selecione
secops-reader
> Permissões > Adicionar permissões > Anexar políticas diretamente > Criar política No editor JSON, insira a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
Defina o nome como
secops-reader-policy
.Acesse Criar política > pesquise/selecione > Próxima > Adicionar permissões.
Acesse Credenciais de segurança > Chaves de acesso > Criar chave de acesso.
Faça o download do CSV (esses valores são inseridos no feed).
Configurar um feed no Google SecOps para ingerir registros de administrador do Duo
- Acesse Configurações do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Duo Administrator Logs
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Registros do administrador do Duo como o Tipo de registro.
- Clique em Próxima.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://duo-admin-logs/duo/admin/
- Opções de exclusão de fontes: selecione a opção de exclusão de acordo com sua preferência.
- Idade máxima do arquivo: padrão de 180 dias.
- ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3.
- Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3.
- Namespace do recurso: o namespace do recurso.
- Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.
- URI do S3:
- Clique em Próxima.
- Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Tabela de mapeamento da UDM
Campo de registro | Mapeamento do UDM | Lógica |
---|---|---|
action |
metadata.product_event_type |
O valor do campo action do registro bruto. |
desc |
metadata.description |
O valor do campo desc do objeto description do registro bruto. |
description._status |
target.group.attribute.labels.value |
O valor do campo _status no objeto description do registro bruto, especificamente ao processar ações relacionadas a grupos. Esse valor é colocado em uma matriz "labels" com uma "key" correspondente de "status". |
description.desc |
metadata.description |
O valor do campo desc do objeto description do registro bruto. |
description.email |
target.user.email_addresses |
O valor do campo email do objeto description do registro bruto. |
description.error |
security_result.summary |
O valor do campo error do objeto description do registro bruto. |
description.factor |
extensions.auth.auth_details |
O valor do campo factor do objeto description do registro bruto. |
description.groups.0._status |
target.group.attribute.labels.value |
O valor do campo _status do primeiro elemento na matriz groups dentro do objeto description do registro bruto. Esse valor é colocado em uma matriz "labels" com uma "key" correspondente de "status". |
description.groups.0.name |
target.group.group_display_name |
O valor do campo name do primeiro elemento na matriz groups dentro do objeto description do registro bruto. |
description.ip_address |
principal.ip |
O valor do campo ip_address do objeto description do registro bruto. |
description.name |
target.group.group_display_name |
O valor do campo name do objeto description do registro bruto. |
description.realname |
target.user.user_display_name |
O valor do campo realname do objeto description do registro bruto. |
description.status |
target.user.attribute.labels.value |
O valor do campo status do objeto description do registro bruto. Esse valor é colocado em uma matriz "labels" com uma "key" correspondente de "status". |
description.uname |
target.user.email_addresses ou target.user.userid |
O valor do campo uname do objeto description do registro bruto. Se ele corresponder a um formato de endereço de e-mail, será mapeado para email_addresses . Caso contrário, será mapeado para userid . |
host |
principal.hostname |
O valor do campo host do registro bruto. |
isotimestamp |
metadata.event_timestamp.seconds |
O valor do campo isotimestamp do registro bruto, convertido em segundos de época. |
object |
target.group.group_display_name |
O valor do campo object do registro bruto. |
timestamp |
metadata.event_timestamp.seconds |
O valor do campo timestamp do registro bruto. |
username |
target.user.userid ou principal.user.userid |
Se o campo action contiver "login", o valor será mapeado para target.user.userid . Caso contrário, ele será mapeado para principal.user.userid . Defina como "USERNAME_PASSWORD" se o campo action contiver "login". Determinado pelo analisador com base no campo action . Valores possíveis: USER_LOGIN , GROUP_CREATION , USER_UNCATEGORIZED , GROUP_DELETION , USER_CREATION , GROUP_MODIFICATION , GENERIC_EVENT . Sempre definido como "DUO_ADMIN". Sempre definido como "MULTI-FACTOR_AUTHENTICATION". Sempre definido como "DUO_SECURITY". Definido como "ADMINISTRATOR" se o campo eventtype contiver "admin". Determinado pelo analisador com base no campo action . Defina como "BLOCK" se o campo action contiver "error". Caso contrário, defina como "ALLOW". Sempre defina como "status" ao preencher target.group.attribute.labels . Sempre defina como "status" ao preencher target.user.attribute.labels . |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.