Recolha registos de autenticação do Duo
Este documento explica como carregar registos de autenticação do Duo para o Google Security Operations através do Amazon S3. O analisador extrai os registos de mensagens formatadas em JSON. Transforma os dados de registo não processados no modelo de dados unificado (UDM), mapeando campos como utilizador, dispositivo, aplicação, localização e detalhes de autenticação, ao mesmo tempo que processa vários fatores e resultados de autenticação para categorizar eventos de segurança. O analisador também realiza a limpeza de dados, a conversão de tipos e o processamento de erros para garantir a qualidade e a consistência dos dados.
Antes de começar
- Instância do Google SecOps
- Acesso privilegiado ao inquilino do Duo (aplicação da API Admin)
- Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)
Configure a aplicação da API Duo Admin
- Inicie sessão no painel de administração do Duo.
- Aceda a Aplicações > Proteger uma aplicação.
- Adicione a aplicação API Admin.
- Copie e guarde os seguintes valores numa localização segura:
- Chave de integração (ikey)
- Chave secreta (skey)
- Nome de anfitrião da API (por exemplo,
api-XXXXXXXX.duosecurity.com
)
- Em Autorizações, ative a opção Conceder registo de leitura (para ler registos de autenticação).
- Guarde a aplicação.
Configure o contentor do AWS S3 e o IAM para o Google SecOps
- Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor
- Guarde o nome e a região do contentor para referência futura (por exemplo,
duo-auth-logs
). - Crie um utilizador seguindo este guia do utilizador: Criar um utilizador do IAM.
- Selecione o utilizador criado.
- Selecione o separador Credenciais de segurança.
- Clique em Criar chave de acesso na secção Chaves de acesso.
- Selecione Serviço de terceiros como o Exemplo de utilização.
- Clicar em Seguinte.
- Opcional: adicione uma etiqueta de descrição.
- Clique em Criar chave de acesso.
- Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para utilização posterior.
- Clique em Concluído.
- Selecione o separador Autorizações.
- Clique em Adicionar autorizações na secção Políticas de autorizações.
- Selecione Adicionar autorizações.
- Selecione Anexar políticas diretamente
- Pesquise e selecione a política AmazonS3FullAccess.
- Clicar em Seguinte.
- Clique em Adicionar autorizações.
Configure a política e a função de IAM para carregamentos do S3
- Aceda a AWS console > IAM > Policies > Create policy > separador JSON.
Introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoAuthObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-auth-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-auth-logs/duo/auth/state.json" } ] }
- Substitua
duo-auth-logs
se tiver introduzido um nome de contentor diferente.
- Substitua
Clique em Seguinte > Criar política.
Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.
Anexe a política criada recentemente.
Dê o nome
WriteDuoAuthToS3Role
à função e clique em Criar função.
Crie a função Lambda
- Na consola da AWS, aceda a Lambda > Functions > Create function.
- Clique em Criar do zero.
Faculte os seguintes detalhes de configuração:
Definição Valor Nome duo_auth_to_s3
Runtime Python 3.13 Arquitetura x86_64 Função de execução WriteDuoAuthToS3Role
Depois de criar a função, abra o separador Código, elimine o fragmento e introduza o seguinte código (
duo_auth_to_s3.py
):#!/usr/bin/env python3 # Lambda: Pull Duo Admin API v2 Authentication Logs to S3 (raw JSON pages) # Notes: # - Duo v2 requires mintime/maxtime in *milliseconds* (13-digit epoch). # - Pagination via metadata.next_offset ("<millis>,<txid>"). # - We save state (mintime_ms) in ms to resume next run without gaps. import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/auth/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/auth/state.json") LIMIT = min(int(os.environ.get("LIMIT", "500")), 1000) # default 100, max 1000 s3 = boto3.client("s3") def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return {"Date": now, "Authorization": f"Basic {auth}"} def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) req.add_header("Accept", "application/json") for k, v in _sign(method, host, path, params).items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise except URLError: if attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise def _read_state_ms() -> int | None: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) val = json.loads(obj["Body"].read()).get("mintime") if val is None: return None # Backward safety: if seconds were stored, convert to ms return int(val) * 1000 if len(str(int(val))) <= 10 else int(val) except Exception: return None def _write_state_ms(mintime_ms: int): body = json.dumps({"mintime": int(mintime_ms)}).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _write_page(payload: dict, when_epoch_s: int, page: int) -> str: key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when_epoch_s))}/duo-auth-{page:05d}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def fetch_and_store(): now_s = int(time.time()) # Duo recommends a ~2-minute delay buffer; use maxtime = now - 120 seconds (in ms) maxtime_ms = (now_s - 120) * 1000 mintime_ms = _read_state_ms() or (maxtime_ms - 3600 * 1000) # 1 hour on first run page = 0 total = 0 next_offset = None while True: params = {"mintime": mintime_ms, "maxtime": maxtime_ms, "limit": LIMIT} if next_offset: params["next_offset"] = next_offset data = _http("GET", "/admin/v2/logs/authentication", params) _write_page(data, maxtime_ms // 1000, page) page += 1 resp = data.get("response") items = resp if isinstance(resp, list) else [] total += len(items) meta = data.get("metadata") or {} next_offset = meta.get("next_offset") if not next_offset: break # Advance window to maxtime_ms for next run _write_state_ms(maxtime_ms) return {"ok": True, "pages": page, "events": total, "next_mintime_ms": maxtime_ms} def lambda_handler(event=None, context=None): return fetch_and_store() if __name__ == "__main__": print(lambda_handler())
Aceda a Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.
Introduza as seguintes variáveis de ambiente fornecidas, substituindo-as pelos seus valores:
Chave Exemplo S3_BUCKET
duo-auth-logs
S3_PREFIX
duo/auth/
STATE_KEY
duo/auth/state.json
DUO_IKEY
DIXYZ...
DUO_SKEY
****************
DUO_API_HOSTNAME
api-XXXXXXXX.duosecurity.com
LIMIT
500
Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > your‑function).
Selecione o separador Configuração.
No painel Configuração geral, clique em Editar.
Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.
Crie um horário do EventBridge
- Aceda a Amazon EventBridge > Scheduler > Create schedule.
- Indique os seguintes detalhes de configuração:
- Agenda recorrente: Taxa (
1 hour
). - Alvo: a sua função Lambda.
- Nome:
duo-auth-1h
.
- Agenda recorrente: Taxa (
- Clique em Criar programação.
Opcional: crie um utilizador e chaves da IAM só de leitura para o Google SecOps
- Na consola da AWS, aceda a IAM > Users e, de seguida, clique em Add users.
- Indique os seguintes detalhes de configuração:
- Utilizador: introduza um nome único (por exemplo,
secops-reader
) - Tipo de acesso: selecione Chave de acesso – Acesso programático
- Clique em Criar utilizador.
- Utilizador: introduza um nome único (por exemplo,
- Anexe a política de leitura mínima (personalizada): Utilizadores > selecione
secops-reader
> Autorizações > Adicionar autorizações > Anexar políticas diretamente > Criar política No editor JSON, introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
Defina o nome como
secops-reader-policy
.Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.
Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.
Transfira o CSV (estes valores são introduzidos no feed).
Configure um feed no Google SecOps para carregar registos de autenticação do Duo
- Aceda a Definições do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, introduza um nome para o feed (por exemplo,
Duo Authentication Logs
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Autenticação Duo como o Tipo de registo.
- Clicar em Seguinte.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://duo-auth-logs/duo/auth/
- Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
- Idade máxima do ficheiro: predefinição de 180 dias.
- ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
- Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
- Espaço de nomes do recurso: o espaço de nomes do recurso.
- Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
- URI do S3:
- Clicar em Seguinte.
- Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.
Tabela de mapeamento da UDM
Campo de registo | Mapeamento de UDM | Lógica |
---|---|---|
access_device.browser |
target.resource.attribute.labels.value |
Se access_device.browser estiver presente, o respetivo valor é mapeado para o UDM. |
access_device.hostname |
principal.hostname |
Se access_device.hostname estiver presente e não estiver vazio, o respetivo valor é mapeado para o UDM. Se estiver vazio e o event_type for USER_CREATION, o event_type é alterado para USER_UNCATEGORIZED. Se access_device.hostname estiver vazio e o campo hostname existir, é usado o valor de hostname . |
access_device.ip |
principal.ip |
Se access_device.ip existir e for um endereço IPv4 válido, o respetivo valor é mapeado para o UDM. Se não for um endereço IPv4 válido, é adicionado como um valor de string a additional.fields com a chave access_device.ip . |
access_device.location.city |
principal.location.city |
Se estiver presente, o valor é mapeado para a UDM. |
access_device.location.country |
principal.location.country_or_region |
Se estiver presente, o valor é mapeado para a UDM. |
access_device.location.state |
principal.location.state |
Se estiver presente, o valor é mapeado para a UDM. |
access_device.os |
principal.platform |
Se estiver presente, o valor é traduzido para o valor UDM correspondente (MAC, WINDOWS, LINUX). |
access_device.os_version |
principal.platform_version |
Se estiver presente, o valor é mapeado para a UDM. |
application.key |
target.resource.id |
Se estiver presente, o valor é mapeado para a UDM. |
application.name |
target.application |
Se estiver presente, o valor é mapeado para a UDM. |
auth_device.ip |
target.ip |
Se estiver presente e não for "Nenhum", o valor é mapeado para o UDM. |
auth_device.location.city |
target.location.city |
Se estiver presente, o valor é mapeado para a UDM. |
auth_device.location.country |
target.location.country_or_region |
Se estiver presente, o valor é mapeado para a UDM. |
auth_device.location.state |
target.location.state |
Se estiver presente, o valor é mapeado para a UDM. |
auth_device.name |
target.hostname OU target.user.phone_numbers |
Se auth_device.name estiver presente e for um número de telefone (após a normalização), é adicionado a target.user.phone_numbers . Caso contrário, é mapeado para target.hostname . |
client_ip |
target.ip |
Se estiver presente e não for "Nenhum", o valor é mapeado para o UDM. |
client_section |
target.resource.attribute.labels.value |
Se client_section estiver presente, o respetivo valor é mapeado para o UDM com a chave client_section . |
dn |
target.user.userid |
Se dn estiver presente e user.name e username não estiverem, o userid é extraído do campo dn através do grok e mapeado para o UDM. O event_type está definido como USER_LOGIN. |
event_type |
metadata.product_event_type E metadata.event_type |
O valor está mapeado para metadata.product_event_type . Também é usado para determinar o metadata.event_type : "authentication" torna-se USER_LOGIN, "enrollment" torna-se USER_CREATION e, se estiver vazio ou não for nenhum destes, torna-se GENERIC_EVENT. |
factor |
extensions.auth.mechanism E extensions.auth.auth_details |
O valor é traduzido para o valor UDM auth.mechanism correspondente (HARDWARE_KEY, REMOTE_INTERACTIVE, LOCAL, OTP). O valor original também está mapeado para extensions.auth.auth_details . |
hostname |
principal.hostname |
Se estiver presente e access_device.hostname estiver vazio, o valor é mapeado para os dados do utilizador. |
log_format |
target.resource.attribute.labels.value |
Se log_format estiver presente, o respetivo valor é mapeado para o UDM com a chave log_format . |
log_level.__class_uuid__ |
target.resource.attribute.labels.value |
Se log_level.__class_uuid__ estiver presente, o respetivo valor é mapeado para o UDM com a chave __class_uuid__ . |
log_level.name |
target.resource.attribute.labels.value E security_result.severity |
Se log_level.name estiver presente, o respetivo valor é mapeado para o UDM com a chave name . Se o valor for "info", security_result.severity é definido como INFORMATIONAL. |
log_logger.unpersistable |
target.resource.attribute.labels.value |
Se log_logger.unpersistable estiver presente, o respetivo valor é mapeado para o UDM com a chave unpersistable . |
log_namespace |
target.resource.attribute.labels.value |
Se log_namespace estiver presente, o respetivo valor é mapeado para o UDM com a chave log_namespace . |
log_source |
target.resource.attribute.labels.value |
Se log_source estiver presente, o respetivo valor é mapeado para o UDM com a chave log_source . |
msg |
security_result.summary |
Se estiver presente e reason estiver vazio, o valor é mapeado para os dados do utilizador. |
reason |
security_result.summary |
Se estiver presente, o valor é mapeado para a UDM. |
result |
security_result.action_details E security_result.action |
Se estiver presente, o valor é mapeado para security_result.action_details . "success" ou "SUCCESS" traduz-se em security_result.action ALLOW. Caso contrário, BLOCK. |
server_section |
target.resource.attribute.labels.value |
Se server_section estiver presente, o respetivo valor é mapeado para o UDM com a chave server_section . |
server_section_ikey |
target.resource.attribute.labels.value |
Se server_section_ikey estiver presente, o respetivo valor é mapeado para o UDM com a chave server_section_ikey . |
status |
security_result.action_details E security_result.action |
Se estiver presente, o valor é mapeado para security_result.action_details . "Permitir" é traduzido como security_result.action ALLOW e "Rejeitar" é traduzido como BLOCK. |
timestamp |
metadata.event_timestamp E event.timestamp |
O valor é convertido numa indicação de tempo e mapeado para metadata.event_timestamp e event.timestamp . |
txid |
metadata.product_log_id E network.session_id |
O valor é mapeado para metadata.product_log_id e network.session_id . |
user.groups |
target.user.group_identifiers |
Todos os valores na matriz são adicionados a target.user.group_identifiers . |
user.key |
target.user.product_object_id |
Se estiver presente, o valor é mapeado para a UDM. |
user.name |
target.user.userid |
Se estiver presente, o valor é mapeado para a UDM. |
username |
target.user.userid |
Se estiver presente e user.name não estiver, o valor é mapeado para o UDM. O event_type está definido como USER_LOGIN. |
(Lógica do analisador) | metadata.vendor_name |
Está sempre definido como "DUO_SECURITY". |
(Lógica do analisador) | metadata.product_name |
Está sempre definido como "MULTI-FACTOR_AUTHENTICATION". |
(Lógica do analisador) | metadata.log_type |
Extraído do campo log_type de nível superior do registo não processado. |
(Lógica do analisador) | extensions.auth.type |
Está sempre definido como "SSO". |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.