Coletar registros de administrador do Duo

Compatível com:

Este documento explica como ingerir registros de administrador do Duo no Google Security Operations usando o Amazon S3. O analisador extrai campos dos registros (formato JSON) e os mapeia para o modelo de dados unificado (UDM). Ele processa vários tipos de action do Duo (login, gerenciamento de usuários, gerenciamento de grupos) de maneira diferente, preenchendo os campos relevantes do UDM com base na ação e nos dados disponíveis, incluindo detalhes do usuário, fatores de autenticação e resultados de segurança. Ele também realiza transformações de dados, como mesclar endereços IP, converter carimbos de data/hora e processar erros.

Antes de começar

  • Instância do Google SecOps
  • Acesso privilegiado ao locatário do Duo (aplicativo da API Admin)
  • Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)

Configurar o aplicativo da API Admin do Duo

  1. Faça login no painel de administração do Duo.
  2. Acesse Aplicativos > Catálogo de aplicativos.
  3. Adicione o aplicativo API Admin.
  4. Anote os seguintes valores:
    • Chave de integração (ikey)
    • Chave secreta (skey)
    • Nome do host da API (por exemplo, api-XXXXXXXX.duosecurity.com)
  5. Em Permissões, ative Conceder registro de leitura (para ler registros de administrador).
  6. Salve o aplicativo.

Configurar o bucket do AWS S3 e o IAM para o Google SecOps

  1. Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket
  2. Salve o Nome e a Região do bucket para referência futura (por exemplo, duo-admin-logs).
  3. Crie um usuário seguindo este guia: Como criar um usuário do IAM.
  4. Selecione o usuário criado.
  5. Selecione a guia Credenciais de segurança.
  6. Clique em Criar chave de acesso na seção Chaves de acesso.
  7. Selecione Serviço de terceiros como o Caso de uso.
  8. Clique em Próxima.
  9. Opcional: adicione uma tag de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Fazer o download do arquivo CSV para salvar a chave de acesso e a chave de acesso secreta para uso posterior.
  12. Clique em Concluído.
  13. Selecione a guia Permissões.
  14. Clique em Adicionar permissões na seção Políticas de permissões.
  15. Selecione Adicionar permissões.
  16. Selecione Anexar políticas diretamente.
  17. Pesquise e selecione a política AmazonS3FullAccess.
  18. Clique em Próxima.
  19. Clique em Adicionar permissões

Configurar a política e o papel do IAM para uploads do S3

  1. Acesse Console da AWS > IAM > Políticas > Criar política > guia JSON.
  2. Insira a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDuoAdminObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-admin-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::duo-admin-logs/duo/admin/state.json"
        }
      ]
    }
    
    
    • Substitua duo-admin-logs se você inseriu um nome de bucket diferente:
  3. Clique em Próxima > Criar política.

  4. Acesse IAM > Funções > Criar função > Serviço da AWS > Lambda.

  5. Anexe a política recém-criada.

  6. Nomeie a função como WriteDuoAdminToS3Role e clique em Criar função.

Criar a função Lambda

  1. No console da AWS, acesse Lambda > Functions > Create function.
  2. Clique em Criar do zero.
  3. Informe os seguintes detalhes de configuração:

    Configuração Valor
    Nome duo_admin_to_s3
    Ambiente de execução Python 3.13
    Arquitetura x86_64
    Função de execução WriteDuoAdminToS3Role
  4. Depois que a função for criada, abra a guia Código, exclua o stub e insira o seguinte código (duo_admin_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull Duo Admin API v1 Administrator Logs to S3 (raw JSON pages)
    
    import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    from datetime import datetime
    import boto3
    
    DUO_IKEY = os.environ["DUO_IKEY"]
    DUO_SKEY = os.environ["DUO_SKEY"]
    DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip()
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "duo/admin/").strip("/")
    STATE_KEY = os.environ.get("STATE_KEY", "duo/admin/state.json")
    
    s3 = boto3.client("s3")
    
    def _canon_params(params: dict) -> str:
        parts = []
        for k in sorted(params.keys()):
            v = params[k]
            if v is None:
                continue
            parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}")
        return "&".join(parts)
    
    def _sign(method: str, host: str, path: str, params: dict) -> dict:
        now = email.utils.formatdate()
        canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)])
        sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest()
        auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode()
        return {"Date": now, "Authorization": f"Basic {auth}"}
    
    def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict:
        host = DUO_API_HOSTNAME
        assert host.startswith("api-") and host.endswith(".duosecurity.com"), \
            "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com"
    
        qs = _canon_params(params)
        url = f"https://{host}{path}" + (f"?{qs}" if qs else "")
        attempt, backoff = 0, 1.0
    
        while True:
            req = Request(url, method=method.upper())
            hdrs = _sign(method, host, path, params)
            req.add_header("Accept", "application/json")
            for k, v in hdrs.items():
                req.add_header(k, v)
            try:
                with urlopen(req, timeout=timeout) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                # 429 or 5xx → exponential backoff
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise
    
    def _read_state() -> int | None:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return int(json.loads(obj["Body"].read()).get("mintime"))
        except Exception:
            return None
    
    def _write_state(mintime: int):
        body = json.dumps({"mintime": mintime}).encode("utf-8")
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def _epoch_from_item(item: dict) -> int | None:
        # Prefer numeric 'timestamp' (seconds); fallback to ISO8601 'ts'
        ts_num = item.get("timestamp")
        if isinstance(ts_num, (int, float)):
            return int(ts_num)
        ts_iso = item.get("ts")
        if isinstance(ts_iso, str):
            try:
                # Accept "...Z" or with offset
                return int(datetime.fromisoformat(ts_iso.replace("Z", "+00:00")).timestamp())
            except Exception:
                return None
        return None
    
    def _write_page(payload: dict, when: int, page: int) -> str:
        key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-admin-{page:05d}.json"
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def fetch_and_store():
        now = int(time.time())
        # Start from last checkpoint or now-3600 on first run
        mintime = _read_state() or (now - 3600)
    
        page = 0
        total = 0
        next_mintime = mintime
        max_seen_ts = mintime
    
        while True:
            data = _http("GET", "/admin/v1/logs/administrator", {"mintime": mintime})
            _write_page(data, now, page)
            page += 1
    
            # Extract items
            resp = data.get("response")
            items = resp if isinstance(resp, list) else (resp.get("items") if isinstance(resp, dict) else [])
            items = items or []
    
            if not items:
                break
    
            total += len(items)
            # Track the newest timestamp in this batch
            for it in items:
                ts = _epoch_from_item(it)
                if ts and ts > max_seen_ts:
                    max_seen_ts = ts
    
            # Duo returns only the 1000 earliest events; page by advancing mintime
            if len(items) >= 1000 and max_seen_ts >= mintime:
                mintime = max_seen_ts
                next_mintime = max_seen_ts
                continue
            else:
                break
    
        # Save checkpoint: newest seen ts, or "now" if nothing new
        if max_seen_ts > next_mintime:
            _write_state(max_seen_ts)
            next_state = max_seen_ts
        else:
            _write_state(now)
            next_state = now
    
        return {"ok": True, "pages": page, "events": total, "next_mintime": next_state}
    
    def lambda_handler(event=None, context=None):
        return fetch_and_store()
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. Acesse Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.

  6. Insira as seguintes variáveis de ambiente, substituindo pelos seus valores.

    Chave Exemplo
    S3_BUCKET duo-admin-logs
    S3_PREFIX duo/admin/
    STATE_KEY duo/admin/state.json
    DUO_IKEY DIXYZ...
    DUO_SKEY ****************
    DUO_API_HOSTNAME api-XXXXXXXX.duosecurity.com
  7. Depois que a função for criada, permaneça na página dela ou abra Lambda > Functions > sua-função.

  8. Selecione a guia Configuração.

  9. No painel Configuração geral, clique em Editar.

  10. Mude Tempo limite para 5 minutos (300 segundos) e clique em Salvar.

Criar uma programação do EventBridge

  1. Acesse Amazon EventBridge > Scheduler > Criar programação.
  2. Informe os seguintes detalhes de configuração:
    • Programação recorrente: Taxa (1 hour).
    • Destino: sua função Lambda.
    • Nome: duo-admin-1h.
  3. Clique em Criar programação.

Opcional: criar um usuário e chaves do IAM somente leitura para o Google SecOps

  1. No console da AWS, acesse IAM > Usuários e clique em Adicionar usuários.
  2. Informe os seguintes detalhes de configuração:
    • Usuário: insira um nome exclusivo (por exemplo, secops-reader)
    • Tipo de acesso: selecione Chave de acesso - Acesso programático.
    • Clique em Criar usuário.
  3. Anexe a política de leitura mínima (personalizada): Usuários > selecione secops-reader > Permissões > Adicionar permissões > Anexar políticas diretamente > Criar política
  4. No editor JSON, insira a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Defina o nome como secops-reader-policy.

  6. Acesse Criar política > pesquise/selecione > Próxima > Adicionar permissões.

  7. Acesse Credenciais de segurança > Chaves de acesso > Criar chave de acesso.

  8. Faça o download do CSV (esses valores são inseridos no feed).

Configurar um feed no Google SecOps para ingerir registros de administrador do Duo

  1. Acesse Configurações do SIEM > Feeds.
  2. Clique em + Adicionar novo feed.
  3. No campo Nome do feed, insira um nome para o feed (por exemplo, Duo Administrator Logs).
  4. Selecione Amazon S3 V2 como o Tipo de origem.
  5. Selecione Registros do administrador do Duo como o Tipo de registro.
  6. Clique em Próxima.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://duo-admin-logs/duo/admin/
    • Opções de exclusão de fontes: selecione a opção de exclusão de acordo com sua preferência.
    • Idade máxima do arquivo: padrão de 180 dias.
    • ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3.
    • Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3.
    • Namespace do recurso: o namespace do recurso.
    • Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.
  8. Clique em Próxima.
  9. Revise a nova configuração do feed na tela Finalizar e clique em Enviar.

Tabela de mapeamento da UDM

Campo de registro Mapeamento do UDM Lógica
action metadata.product_event_type O valor do campo action do registro bruto.
desc metadata.description O valor do campo desc do objeto description do registro bruto.
description._status target.group.attribute.labels.value O valor do campo _status no objeto description do registro bruto, especificamente ao processar ações relacionadas a grupos. Esse valor é colocado em uma matriz "labels" com uma "key" correspondente de "status".
description.desc metadata.description O valor do campo desc do objeto description do registro bruto.
description.email target.user.email_addresses O valor do campo email do objeto description do registro bruto.
description.error security_result.summary O valor do campo error do objeto description do registro bruto.
description.factor extensions.auth.auth_details O valor do campo factor do objeto description do registro bruto.
description.groups.0._status target.group.attribute.labels.value O valor do campo _status do primeiro elemento na matriz groups dentro do objeto description do registro bruto. Esse valor é colocado em uma matriz "labels" com uma "key" correspondente de "status".
description.groups.0.name target.group.group_display_name O valor do campo name do primeiro elemento na matriz groups dentro do objeto description do registro bruto.
description.ip_address principal.ip O valor do campo ip_address do objeto description do registro bruto.
description.name target.group.group_display_name O valor do campo name do objeto description do registro bruto.
description.realname target.user.user_display_name O valor do campo realname do objeto description do registro bruto.
description.status target.user.attribute.labels.value O valor do campo status do objeto description do registro bruto. Esse valor é colocado em uma matriz "labels" com uma "key" correspondente de "status".
description.uname target.user.email_addresses ou target.user.userid O valor do campo uname do objeto description do registro bruto. Se ele corresponder a um formato de endereço de e-mail, será mapeado para email_addresses. Caso contrário, será mapeado para userid.
host principal.hostname O valor do campo host do registro bruto.
isotimestamp metadata.event_timestamp.seconds O valor do campo isotimestamp do registro bruto, convertido em segundos de época.
object target.group.group_display_name O valor do campo object do registro bruto.
timestamp metadata.event_timestamp.seconds O valor do campo timestamp do registro bruto.
username target.user.userid ou principal.user.userid Se o campo action contiver "login", o valor será mapeado para target.user.userid. Caso contrário, ele será mapeado para principal.user.userid. Defina como "USERNAME_PASSWORD" se o campo action contiver "login". Determinado pelo analisador com base no campo action. Valores possíveis: USER_LOGIN, GROUP_CREATION, USER_UNCATEGORIZED, GROUP_DELETION, USER_CREATION, GROUP_MODIFICATION, GENERIC_EVENT. Sempre definido como "DUO_ADMIN". Sempre definido como "MULTI-FACTOR_AUTHENTICATION". Sempre definido como "DUO_SECURITY". Definido como "ADMINISTRATOR" se o campo eventtype contiver "admin". Determinado pelo analisador com base no campo action. Defina como "BLOCK" se o campo action contiver "error". Caso contrário, defina como "ALLOW". Sempre defina como "status" ao preencher target.group.attribute.labels. Sempre defina como "status" ao preencher target.user.attribute.labels.

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.