Recolha registos de autenticação do Duo

Compatível com:

Este documento explica como carregar registos de autenticação do Duo para o Google Security Operations através do Amazon S3. O analisador extrai os registos de mensagens formatadas em JSON. Transforma os dados de registo não processados no modelo de dados unificado (UDM), mapeando campos como utilizador, dispositivo, aplicação, localização e detalhes de autenticação, ao mesmo tempo que processa vários fatores e resultados de autenticação para categorizar eventos de segurança. O analisador também realiza a limpeza de dados, a conversão de tipos e o processamento de erros para garantir a qualidade e a consistência dos dados.

Antes de começar

  • Instância do Google SecOps
  • Acesso privilegiado ao inquilino do Duo (aplicação da API Admin)
  • Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)

Configure a aplicação da API Duo Admin

  1. Inicie sessão no painel de administração do Duo.
  2. Aceda a Aplicações > Proteger uma aplicação.
  3. Adicione a aplicação API Admin.
  4. Copie e guarde os seguintes valores numa localização segura:
    • Chave de integração (ikey)
    • Chave secreta (skey)
    • Nome de anfitrião da API (por exemplo, api-XXXXXXXX.duosecurity.com)
  5. Em Autorizações, ative a opção Conceder registo de leitura (para ler registos de autenticação).
  6. Guarde a aplicação.

Configure o contentor do AWS S3 e o IAM para o Google SecOps

  1. Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor
  2. Guarde o nome e a região do contentor para referência futura (por exemplo, duo-auth-logs).
  3. Crie um utilizador seguindo este guia do utilizador: Criar um utilizador do IAM.
  4. Selecione o utilizador criado.
  5. Selecione o separador Credenciais de segurança.
  6. Clique em Criar chave de acesso na secção Chaves de acesso.
  7. Selecione Serviço de terceiros como o Exemplo de utilização.
  8. Clicar em Seguinte.
  9. Opcional: adicione uma etiqueta de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para utilização posterior.
  12. Clique em Concluído.
  13. Selecione o separador Autorizações.
  14. Clique em Adicionar autorizações na secção Políticas de autorizações.
  15. Selecione Adicionar autorizações.
  16. Selecione Anexar políticas diretamente
  17. Pesquise e selecione a política AmazonS3FullAccess.
  18. Clicar em Seguinte.
  19. Clique em Adicionar autorizações.

Configure a política e a função de IAM para carregamentos do S3

  1. Aceda a AWS console > IAM > Policies > Create policy > separador JSON.
  2. Introduza a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDuoAuthObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-auth-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::duo-auth-logs/duo/auth/state.json"
        }
      ]
    }
    
    
    • Substitua duo-auth-logs se tiver introduzido um nome de contentor diferente.
  3. Clique em Seguinte > Criar política.

  4. Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.

  5. Anexe a política criada recentemente.

  6. Dê o nome WriteDuoAuthToS3Role à função e clique em Criar função.

Crie a função Lambda

  1. Na consola da AWS, aceda a Lambda > Functions > Create function.
  2. Clique em Criar do zero.
  3. Faculte os seguintes detalhes de configuração:

    Definição Valor
    Nome duo_auth_to_s3
    Runtime Python 3.13
    Arquitetura x86_64
    Função de execução WriteDuoAuthToS3Role
  4. Depois de criar a função, abra o separador Código, elimine o fragmento e introduza o seguinte código (duo_auth_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull Duo Admin API v2 Authentication Logs to S3 (raw JSON pages)
    # Notes:
    # - Duo v2 requires mintime/maxtime in *milliseconds* (13-digit epoch).
    # - Pagination via metadata.next_offset ("<millis>,<txid>").
    # - We save state (mintime_ms) in ms to resume next run without gaps.
    
    import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    DUO_IKEY = os.environ["DUO_IKEY"]
    DUO_SKEY = os.environ["DUO_SKEY"]
    DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip()
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "duo/auth/").strip("/")
    STATE_KEY = os.environ.get("STATE_KEY", "duo/auth/state.json")
    LIMIT = min(int(os.environ.get("LIMIT", "500")), 1000)  # default 100, max 1000
    
    s3 = boto3.client("s3")
    
    def _canon_params(params: dict) -> str:
        parts = []
        for k in sorted(params.keys()):
            v = params[k]
            if v is None:
                continue
            parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}")
        return "&".join(parts)
    
    def _sign(method: str, host: str, path: str, params: dict) -> dict:
        now = email.utils.formatdate()
        canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)])
        sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest()
        auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode()
        return {"Date": now, "Authorization": f"Basic {auth}"}
    
    def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict:
        host = DUO_API_HOSTNAME
        assert host.startswith("api-") and host.endswith(".duosecurity.com"), \
            "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com"
        qs = _canon_params(params)
        url = f"https://{host}{path}" + (f"?{qs}" if qs else "")
    
        attempt, backoff = 0, 1.0
        while True:
            req = Request(url, method=method.upper())
            req.add_header("Accept", "application/json")
            for k, v in _sign(method, host, path, params).items():
                req.add_header(k, v)
            try:
                with urlopen(req, timeout=timeout) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def _read_state_ms() -> int | None:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            val = json.loads(obj["Body"].read()).get("mintime")
            if val is None:
                return None
            # Backward safety: if seconds were stored, convert to ms
            return int(val) * 1000 if len(str(int(val))) <= 10 else int(val)
        except Exception:
            return None
    
    def _write_state_ms(mintime_ms: int):
        body = json.dumps({"mintime": int(mintime_ms)}).encode("utf-8")
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def _write_page(payload: dict, when_epoch_s: int, page: int) -> str:
        key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when_epoch_s))}/duo-auth-{page:05d}.json"
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def fetch_and_store():
        now_s = int(time.time())
        # Duo recommends a ~2-minute delay buffer; use maxtime = now - 120 seconds (in ms)
        maxtime_ms = (now_s - 120) * 1000
        mintime_ms = _read_state_ms() or (maxtime_ms - 3600 * 1000)  # 1 hour on first run
    
        page = 0
        total = 0
        next_offset = None
    
        while True:
            params = {"mintime": mintime_ms, "maxtime": maxtime_ms, "limit": LIMIT}
            if next_offset:
                params["next_offset"] = next_offset
    
            data = _http("GET", "/admin/v2/logs/authentication", params)
            _write_page(data, maxtime_ms // 1000, page)
            page += 1
    
            resp = data.get("response")
            items = resp if isinstance(resp, list) else []
            total += len(items)
    
            meta = data.get("metadata") or {}
            next_offset = meta.get("next_offset")
            if not next_offset:
                break
    
        # Advance window to maxtime_ms for next run
        _write_state_ms(maxtime_ms)
        return {"ok": True, "pages": page, "events": total, "next_mintime_ms": maxtime_ms}
    
    def lambda_handler(event=None, context=None):
        return fetch_and_store()
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. Aceda a Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.

  6. Introduza as seguintes variáveis de ambiente fornecidas, substituindo-as pelos seus valores:

    Chave Exemplo
    S3_BUCKET duo-auth-logs
    S3_PREFIX duo/auth/
    STATE_KEY duo/auth/state.json
    DUO_IKEY DIXYZ...
    DUO_SKEY ****************
    DUO_API_HOSTNAME api-XXXXXXXX.duosecurity.com
    LIMIT 500
  7. Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > your‑function).

  8. Selecione o separador Configuração.

  9. No painel Configuração geral, clique em Editar.

  10. Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.

Crie um horário do EventBridge

  1. Aceda a Amazon EventBridge > Scheduler > Create schedule.
  2. Indique os seguintes detalhes de configuração:
    • Agenda recorrente: Taxa (1 hour).
    • Alvo: a sua função Lambda.
    • Nome: duo-auth-1h.
  3. Clique em Criar programação.

Opcional: crie um utilizador e chaves da IAM só de leitura para o Google SecOps

  1. Na consola da AWS, aceda a IAM > Users e, de seguida, clique em Add users.
  2. Indique os seguintes detalhes de configuração:
    • Utilizador: introduza um nome único (por exemplo, secops-reader)
    • Tipo de acesso: selecione Chave de acesso – Acesso programático
    • Clique em Criar utilizador.
  3. Anexe a política de leitura mínima (personalizada): Utilizadores > selecione secops-reader > Autorizações > Adicionar autorizações > Anexar políticas diretamente > Criar política
  4. No editor JSON, introduza a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Defina o nome como secops-reader-policy.

  6. Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.

  7. Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.

  8. Transfira o CSV (estes valores são introduzidos no feed).

Configure um feed no Google SecOps para carregar registos de autenticação do Duo

  1. Aceda a Definições do SIEM > Feeds.
  2. Clique em + Adicionar novo feed.
  3. No campo Nome do feed, introduza um nome para o feed (por exemplo, Duo Authentication Logs).
  4. Selecione Amazon S3 V2 como o Tipo de origem.
  5. Selecione Autenticação Duo como o Tipo de registo.
  6. Clicar em Seguinte.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://duo-auth-logs/duo/auth/
    • Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
    • Idade máxima do ficheiro: predefinição de 180 dias.
    • ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
    • Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
    • Espaço de nomes do recurso: o espaço de nomes do recurso.
    • Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
  8. Clicar em Seguinte.
  9. Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.

Tabela de mapeamento da UDM

Campo de registo Mapeamento de UDM Lógica
access_device.browser target.resource.attribute.labels.value Se access_device.browser estiver presente, o respetivo valor é mapeado para o UDM.
access_device.hostname principal.hostname Se access_device.hostname estiver presente e não estiver vazio, o respetivo valor é mapeado para o UDM. Se estiver vazio e o event_type for USER_CREATION, o event_type é alterado para USER_UNCATEGORIZED. Se access_device.hostname estiver vazio e o campo hostname existir, é usado o valor de hostname.
access_device.ip principal.ip Se access_device.ip existir e for um endereço IPv4 válido, o respetivo valor é mapeado para o UDM. Se não for um endereço IPv4 válido, é adicionado como um valor de string a additional.fields com a chave access_device.ip.
access_device.location.city principal.location.city Se estiver presente, o valor é mapeado para a UDM.
access_device.location.country principal.location.country_or_region Se estiver presente, o valor é mapeado para a UDM.
access_device.location.state principal.location.state Se estiver presente, o valor é mapeado para a UDM.
access_device.os principal.platform Se estiver presente, o valor é traduzido para o valor UDM correspondente (MAC, WINDOWS, LINUX).
access_device.os_version principal.platform_version Se estiver presente, o valor é mapeado para a UDM.
application.key target.resource.id Se estiver presente, o valor é mapeado para a UDM.
application.name target.application Se estiver presente, o valor é mapeado para a UDM.
auth_device.ip target.ip Se estiver presente e não for "Nenhum", o valor é mapeado para o UDM.
auth_device.location.city target.location.city Se estiver presente, o valor é mapeado para a UDM.
auth_device.location.country target.location.country_or_region Se estiver presente, o valor é mapeado para a UDM.
auth_device.location.state target.location.state Se estiver presente, o valor é mapeado para a UDM.
auth_device.name target.hostname OU target.user.phone_numbers Se auth_device.name estiver presente e for um número de telefone (após a normalização), é adicionado a target.user.phone_numbers. Caso contrário, é mapeado para target.hostname.
client_ip target.ip Se estiver presente e não for "Nenhum", o valor é mapeado para o UDM.
client_section target.resource.attribute.labels.value Se client_section estiver presente, o respetivo valor é mapeado para o UDM com a chave client_section.
dn target.user.userid Se dn estiver presente e user.name e username não estiverem, o userid é extraído do campo dn através do grok e mapeado para o UDM. O event_type está definido como USER_LOGIN.
event_type metadata.product_event_type E metadata.event_type O valor está mapeado para metadata.product_event_type. Também é usado para determinar o metadata.event_type: "authentication" torna-se USER_LOGIN, "enrollment" torna-se USER_CREATION e, se estiver vazio ou não for nenhum destes, torna-se GENERIC_EVENT.
factor extensions.auth.mechanism E extensions.auth.auth_details O valor é traduzido para o valor UDM auth.mechanism correspondente (HARDWARE_KEY, REMOTE_INTERACTIVE, LOCAL, OTP). O valor original também está mapeado para extensions.auth.auth_details.
hostname principal.hostname Se estiver presente e access_device.hostname estiver vazio, o valor é mapeado para os dados do utilizador.
log_format target.resource.attribute.labels.value Se log_format estiver presente, o respetivo valor é mapeado para o UDM com a chave log_format.
log_level.__class_uuid__ target.resource.attribute.labels.value Se log_level.__class_uuid__ estiver presente, o respetivo valor é mapeado para o UDM com a chave __class_uuid__.
log_level.name target.resource.attribute.labels.value E security_result.severity Se log_level.name estiver presente, o respetivo valor é mapeado para o UDM com a chave name. Se o valor for "info", security_result.severity é definido como INFORMATIONAL.
log_logger.unpersistable target.resource.attribute.labels.value Se log_logger.unpersistable estiver presente, o respetivo valor é mapeado para o UDM com a chave unpersistable.
log_namespace target.resource.attribute.labels.value Se log_namespace estiver presente, o respetivo valor é mapeado para o UDM com a chave log_namespace.
log_source target.resource.attribute.labels.value Se log_source estiver presente, o respetivo valor é mapeado para o UDM com a chave log_source.
msg security_result.summary Se estiver presente e reason estiver vazio, o valor é mapeado para os dados do utilizador.
reason security_result.summary Se estiver presente, o valor é mapeado para a UDM.
result security_result.action_details E security_result.action Se estiver presente, o valor é mapeado para security_result.action_details. "success" ou "SUCCESS" traduz-se em security_result.action ALLOW. Caso contrário, BLOCK.
server_section target.resource.attribute.labels.value Se server_section estiver presente, o respetivo valor é mapeado para o UDM com a chave server_section.
server_section_ikey target.resource.attribute.labels.value Se server_section_ikey estiver presente, o respetivo valor é mapeado para o UDM com a chave server_section_ikey.
status security_result.action_details E security_result.action Se estiver presente, o valor é mapeado para security_result.action_details. "Permitir" é traduzido como security_result.action ALLOW e "Rejeitar" é traduzido como BLOCK.
timestamp metadata.event_timestamp E event.timestamp O valor é convertido numa indicação de tempo e mapeado para metadata.event_timestamp e event.timestamp.
txid metadata.product_log_id E network.session_id O valor é mapeado para metadata.product_log_id e network.session_id.
user.groups target.user.group_identifiers Todos os valores na matriz são adicionados a target.user.group_identifiers.
user.key target.user.product_object_id Se estiver presente, o valor é mapeado para a UDM.
user.name target.user.userid Se estiver presente, o valor é mapeado para a UDM.
username target.user.userid Se estiver presente e user.name não estiver, o valor é mapeado para o UDM. O event_type está definido como USER_LOGIN.
(Lógica do analisador) metadata.vendor_name Está sempre definido como "DUO_SECURITY".
(Lógica do analisador) metadata.product_name Está sempre definido como "MULTI-FACTOR_AUTHENTICATION".
(Lógica do analisador) metadata.log_type Extraído do campo log_type de nível superior do registo não processado.
(Lógica do analisador) extensions.auth.type Está sempre definido como "SSO".

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.