Coletar registros de autenticação do Duo
Este documento explica como ingerir registros de autenticação do Duo no Google Security Operations usando o Amazon S3. O analisador extrai os registros de mensagens formatadas em JSON. Ele transforma os dados de registro brutos no Modelo Unificado de Dados (UDM, na sigla em inglês), mapeando campos como usuário, dispositivo, aplicativo, local e detalhes de autenticação, além de processar vários fatores e resultados de autenticação para categorizar eventos de segurança. O analisador também realiza limpeza de dados, conversão de tipos e tratamento de erros para garantir a qualidade e a consistência dos dados.
Antes de começar
- Instância do Google SecOps
- Acesso privilegiado ao locatário do Duo (aplicativo da API Admin)
- Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)
Configurar o aplicativo da API Admin do Duo
- Faça login no painel de administração do Duo.
- Acesse Aplicativos > Proteger um aplicativo.
- Adicione o aplicativo API Admin.
- Copie e salve os seguintes valores em um local seguro:
- Chave de integração (ikey)
- Chave secreta (skey)
- Nome do host da API (por exemplo,
api-XXXXXXXX.duosecurity.com
)
- Em Permissões, ative Conceder registro de leitura (para ler registros de autenticação).
- Salve o aplicativo.
Configurar o bucket do AWS S3 e o IAM para o Google SecOps
- Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket
- Salve o Nome e a Região do bucket para referência futura (por exemplo,
duo-auth-logs
). - Crie um usuário seguindo este guia: Como criar um usuário do IAM.
- Selecione o usuário criado.
- Selecione a guia Credenciais de segurança.
- Clique em Criar chave de acesso na seção Chaves de acesso.
- Selecione Serviço de terceiros como o Caso de uso.
- Clique em Próxima.
- Opcional: adicione uma tag de descrição.
- Clique em Criar chave de acesso.
- Clique em Fazer o download do arquivo CSV para salvar a chave de acesso e a chave de acesso secreta para uso posterior.
- Clique em Concluído.
- Selecione a guia Permissões.
- Clique em Adicionar permissões na seção Políticas de permissões.
- Selecione Adicionar permissões.
- Selecione Anexar políticas diretamente.
- Pesquise e selecione a política AmazonS3FullAccess.
- Clique em Próxima.
- Clique em Adicionar permissões
Configurar a política e o papel do IAM para uploads do S3
- Acesse Console da AWS > IAM > Políticas > Criar política > guia JSON.
Insira a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoAuthObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-auth-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-auth-logs/duo/auth/state.json" } ] }
- Substitua
duo-auth-logs
se você tiver inserido um nome de bucket diferente.
- Substitua
Clique em Próxima > Criar política.
Acesse IAM > Funções > Criar função > Serviço da AWS > Lambda.
Anexe a política recém-criada.
Nomeie a função como
WriteDuoAuthToS3Role
e clique em Criar função.
Criar a função Lambda
- No console da AWS, acesse Lambda > Functions > Create function.
- Clique em Criar do zero.
Informe os seguintes detalhes de configuração:
Configuração Valor Nome duo_auth_to_s3
Ambiente de execução Python 3.13 Arquitetura x86_64 Função de execução WriteDuoAuthToS3Role
Depois que a função for criada, abra a guia Código, exclua o stub e insira o seguinte código (
duo_auth_to_s3.py
):#!/usr/bin/env python3 # Lambda: Pull Duo Admin API v2 Authentication Logs to S3 (raw JSON pages) # Notes: # - Duo v2 requires mintime/maxtime in *milliseconds* (13-digit epoch). # - Pagination via metadata.next_offset ("<millis>,<txid>"). # - We save state (mintime_ms) in ms to resume next run without gaps. import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/auth/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/auth/state.json") LIMIT = min(int(os.environ.get("LIMIT", "500")), 1000) # default 100, max 1000 s3 = boto3.client("s3") def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return {"Date": now, "Authorization": f"Basic {auth}"} def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) req.add_header("Accept", "application/json") for k, v in _sign(method, host, path, params).items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise except URLError: if attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise def _read_state_ms() -> int | None: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) val = json.loads(obj["Body"].read()).get("mintime") if val is None: return None # Backward safety: if seconds were stored, convert to ms return int(val) * 1000 if len(str(int(val))) <= 10 else int(val) except Exception: return None def _write_state_ms(mintime_ms: int): body = json.dumps({"mintime": int(mintime_ms)}).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _write_page(payload: dict, when_epoch_s: int, page: int) -> str: key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when_epoch_s))}/duo-auth-{page:05d}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def fetch_and_store(): now_s = int(time.time()) # Duo recommends a ~2-minute delay buffer; use maxtime = now - 120 seconds (in ms) maxtime_ms = (now_s - 120) * 1000 mintime_ms = _read_state_ms() or (maxtime_ms - 3600 * 1000) # 1 hour on first run page = 0 total = 0 next_offset = None while True: params = {"mintime": mintime_ms, "maxtime": maxtime_ms, "limit": LIMIT} if next_offset: params["next_offset"] = next_offset data = _http("GET", "/admin/v2/logs/authentication", params) _write_page(data, maxtime_ms // 1000, page) page += 1 resp = data.get("response") items = resp if isinstance(resp, list) else [] total += len(items) meta = data.get("metadata") or {} next_offset = meta.get("next_offset") if not next_offset: break # Advance window to maxtime_ms for next run _write_state_ms(maxtime_ms) return {"ok": True, "pages": page, "events": total, "next_mintime_ms": maxtime_ms} def lambda_handler(event=None, context=None): return fetch_and_store() if __name__ == "__main__": print(lambda_handler())
Acesse Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.
Insira as seguintes variáveis de ambiente fornecidas, substituindo pelos seus valores:
Chave Exemplo S3_BUCKET
duo-auth-logs
S3_PREFIX
duo/auth/
STATE_KEY
duo/auth/state.json
DUO_IKEY
DIXYZ...
DUO_SKEY
****************
DUO_API_HOSTNAME
api-XXXXXXXX.duosecurity.com
LIMIT
500
Depois que a função for criada, permaneça na página dela ou abra Lambda > Functions > sua‑função.
Selecione a guia Configuração.
No painel Configuração geral, clique em Editar.
Mude Tempo limite para 5 minutos (300 segundos) e clique em Salvar.
Criar uma programação do EventBridge
- Acesse Amazon EventBridge > Scheduler > Criar programação.
- Informe os seguintes detalhes de configuração:
- Programação recorrente: Taxa (
1 hour
). - Destino: sua função Lambda.
- Nome:
duo-auth-1h
.
- Programação recorrente: Taxa (
- Clique em Criar programação.
Opcional: criar um usuário e chaves do IAM somente leitura para o Google SecOps
- No console da AWS, acesse IAM > Usuários e clique em Adicionar usuários.
- Informe os seguintes detalhes de configuração:
- Usuário: insira um nome exclusivo (por exemplo,
secops-reader
) - Tipo de acesso: selecione Chave de acesso - Acesso programático.
- Clique em Criar usuário.
- Usuário: insira um nome exclusivo (por exemplo,
- Anexe a política de leitura mínima (personalizada): Usuários > selecione
secops-reader
> Permissões > Adicionar permissões > Anexar políticas diretamente > Criar política No editor JSON, insira a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
Defina o nome como
secops-reader-policy
.Acesse Criar política > pesquise/selecione > Próxima > Adicionar permissões.
Acesse Credenciais de segurança > Chaves de acesso > Criar chave de acesso.
Faça o download do CSV (esses valores são inseridos no feed).
Configurar um feed no Google SecOps para ingerir registros de autenticação do Duo
- Acesse Configurações do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Duo Authentication Logs
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Autenticação do Duo como o Tipo de registro.
- Clique em Próxima.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://duo-auth-logs/duo/auth/
- Opções de exclusão de fontes: selecione a opção de exclusão de acordo com sua preferência.
- Idade máxima do arquivo: padrão de 180 dias.
- ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3.
- Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3.
- Namespace do recurso: o namespace do recurso.
- Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.
- URI do S3:
- Clique em Próxima.
- Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Tabela de mapeamento da UDM
Campo de registro | Mapeamento do UDM | Lógica |
---|---|---|
access_device.browser |
target.resource.attribute.labels.value |
Se access_device.browser estiver presente, o valor dele será mapeado para a UDM. |
access_device.hostname |
principal.hostname |
Se access_device.hostname estiver presente e não estiver vazio, o valor será mapeado para a UDM. Se estiver vazio e o event_type for USER_CREATION, o event_type será mudado para USER_UNCATEGORIZED. Se access_device.hostname estiver vazio e o campo hostname existir, o valor de hostname será usado. |
access_device.ip |
principal.ip |
Se access_device.ip existir e for um endereço IPv4 válido, o valor dele será mapeado para a UDM. Se não for um endereço IPv4 válido, ele será adicionado como um valor de string a additional.fields com a chave access_device.ip . |
access_device.location.city |
principal.location.city |
Se presente, o valor será mapeado para a UDM. |
access_device.location.country |
principal.location.country_or_region |
Se presente, o valor será mapeado para a UDM. |
access_device.location.state |
principal.location.state |
Se presente, o valor será mapeado para a UDM. |
access_device.os |
principal.platform |
Se presente, o valor será traduzido para o valor correspondente do UDM (MAC, WINDOWS, LINUX). |
access_device.os_version |
principal.platform_version |
Se presente, o valor será mapeado para a UDM. |
application.key |
target.resource.id |
Se presente, o valor será mapeado para a UDM. |
application.name |
target.application |
Se presente, o valor será mapeado para a UDM. |
auth_device.ip |
target.ip |
Se estiver presente e não for "None", o valor será mapeado para a UDM. |
auth_device.location.city |
target.location.city |
Se presente, o valor será mapeado para a UDM. |
auth_device.location.country |
target.location.country_or_region |
Se presente, o valor será mapeado para a UDM. |
auth_device.location.state |
target.location.state |
Se presente, o valor será mapeado para a UDM. |
auth_device.name |
target.hostname OU target.user.phone_numbers |
Se auth_device.name estiver presente e for um número de telefone (após a normalização), ele será adicionado a target.user.phone_numbers . Caso contrário, ele será mapeado para target.hostname . |
client_ip |
target.ip |
Se estiver presente e não for "None", o valor será mapeado para a UDM. |
client_section |
target.resource.attribute.labels.value |
Se client_section estiver presente, o valor dele será mapeado para a UDM com a chave client_section . |
dn |
target.user.userid |
Se dn estiver presente e user.name e username não estiverem, o userid será extraído do campo dn usando grok e mapeado para a UDM. O event_type é definido como USER_LOGIN. |
event_type |
metadata.product_event_type E metadata.event_type |
O valor é mapeado para metadata.product_event_type . Ele também é usado para determinar o metadata.event_type : "authentication" se torna USER_LOGIN, "enrollment" se torna USER_CREATION e, se estiver vazio ou nenhum dos dois, se torna GENERIC_EVENT. |
factor |
extensions.auth.mechanism E extensions.auth.auth_details |
O valor é traduzido para o valor auth.mechanism correspondente da UDM (HARDWARE_KEY, REMOTE_INTERACTIVE, LOCAL, OTP). O valor original também é mapeado para extensions.auth.auth_details . |
hostname |
principal.hostname |
Se estiver presente e access_device.hostname estiver vazio, o valor será mapeado para a UDM. |
log_format |
target.resource.attribute.labels.value |
Se log_format estiver presente, o valor dele será mapeado para a UDM com a chave log_format . |
log_level.__class_uuid__ |
target.resource.attribute.labels.value |
Se log_level.__class_uuid__ estiver presente, o valor dele será mapeado para a UDM com a chave __class_uuid__ . |
log_level.name |
target.resource.attribute.labels.value E security_result.severity |
Se log_level.name estiver presente, o valor dele será mapeado para a UDM com a chave name . Se o valor for "info", security_result.severity será definido como INFORMATIONAL. |
log_logger.unpersistable |
target.resource.attribute.labels.value |
Se log_logger.unpersistable estiver presente, o valor dele será mapeado para a UDM com a chave unpersistable . |
log_namespace |
target.resource.attribute.labels.value |
Se log_namespace estiver presente, o valor dele será mapeado para a UDM com a chave log_namespace . |
log_source |
target.resource.attribute.labels.value |
Se log_source estiver presente, o valor dele será mapeado para a UDM com a chave log_source . |
msg |
security_result.summary |
Se estiver presente e reason estiver vazio, o valor será mapeado para a UDM. |
reason |
security_result.summary |
Se presente, o valor será mapeado para a UDM. |
result |
security_result.action_details E security_result.action |
Se presente, o valor será mapeado para security_result.action_details . "success" ou "SUCCESS" significa security_result.action ALLOW. Caso contrário, BLOCK. |
server_section |
target.resource.attribute.labels.value |
Se server_section estiver presente, o valor dele será mapeado para a UDM com a chave server_section . |
server_section_ikey |
target.resource.attribute.labels.value |
Se server_section_ikey estiver presente, o valor dele será mapeado para a UDM com a chave server_section_ikey . |
status |
security_result.action_details E security_result.action |
Se presente, o valor será mapeado para security_result.action_details . "Permitir" significa security_result.action ALLOW, e "Rejeitar" significa BLOCK. |
timestamp |
metadata.event_timestamp E event.timestamp |
O valor é convertido em um carimbo de data/hora e mapeado para metadata.event_timestamp e event.timestamp . |
txid |
metadata.product_log_id E network.session_id |
O valor é mapeado para metadata.product_log_id e network.session_id . |
user.groups |
target.user.group_identifiers |
Todos os valores na matriz são adicionados a target.user.group_identifiers . |
user.key |
target.user.product_object_id |
Se presente, o valor será mapeado para a UDM. |
user.name |
target.user.userid |
Se presente, o valor será mapeado para a UDM. |
username |
target.user.userid |
Se estiver presente e user.name não estiver, o valor será mapeado para a UDM. O event_type é definido como USER_LOGIN. |
(Lógica do analisador) | metadata.vendor_name |
Sempre definido como "DUO_SECURITY". |
(Lógica do analisador) | metadata.product_name |
Sempre definido como "MULTI-FACTOR_AUTHENTICATION". |
(Lógica do analisador) | metadata.log_type |
Extraído do campo log_type de nível superior do registro bruto. |
(Lógica do analisador) | extensions.auth.type |
Sempre definido como "SSO". |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.