Mengumpulkan log autentikasi Duo

Didukung di:

Dokumen ini menjelaskan cara menyerap log autentikasi Duo ke Google Security Operations menggunakan Amazon S3. Parser mengekstrak log dari pesan berformat JSON. Proses ini mengubah data log mentah menjadi Model Data Terpadu (UDM), memetakan kolom seperti pengguna, perangkat, aplikasi, lokasi, dan detail autentikasi, sekaligus menangani berbagai faktor dan hasil autentikasi untuk mengategorikan peristiwa keamanan. Parser juga melakukan pembersihan data, konversi jenis, dan penanganan error untuk memastikan kualitas dan konsistensi data.

Sebelum memulai

  • Instance Google SecOps
  • Akses istimewa ke tenant Duo (aplikasi Admin API)
  • Akses istimewa ke AWS (S3, IAM, Lambda, EventBridge)

Mengonfigurasi aplikasi Duo Admin API

  1. Login ke Panel Admin Duo.
  2. Buka Aplikasi > Lindungi Aplikasi.
  3. Tambahkan aplikasi Admin API.
  4. Salin dan simpan nilai berikut ke lokasi yang aman:
    • Kunci integrasi (ikey)
    • Kunci rahasia (skey)
    • Nama host API (misalnya, api-XXXXXXXX.duosecurity.com)
  5. Di Izin, aktifkan Berikan log baca (untuk membaca log autentikasi).
  6. Simpan aplikasi.

Mengonfigurasi bucket AWS S3 dan IAM untuk Google SecOps

  1. Buat bucket Amazon S3 dengan mengikuti panduan pengguna ini: Membuat bucket
  2. Simpan Name dan Region bucket untuk referensi di masa mendatang (misalnya, duo-auth-logs).
  3. Buat pengguna dengan mengikuti panduan pengguna ini: Membuat pengguna IAM.
  4. Pilih Pengguna yang dibuat.
  5. Pilih tab Kredensial keamanan.
  6. Klik Create Access Key di bagian Access Keys.
  7. Pilih Layanan pihak ketiga sebagai Kasus penggunaan.
  8. Klik Berikutnya.
  9. Opsional: tambahkan tag deskripsi.
  10. Klik Create access key.
  11. Klik Download CSV file untuk menyimpan Access Key dan Secret Access Key untuk digunakan nanti.
  12. Klik Selesai.
  13. Pilih tab Izin.
  14. Klik Tambahkan izin di bagian Kebijakan izin.
  15. Pilih Tambahkan izin.
  16. Pilih Lampirkan kebijakan secara langsung
  17. Telusuri dan pilih kebijakan AmazonS3FullAccess.
  18. Klik Berikutnya.
  19. Klik Add permissions.

Mengonfigurasi kebijakan dan peran IAM untuk upload S3

  1. Buka konsol AWS > IAM > Policies > Create policy > tab JSON.
  2. Masukkan kebijakan berikut:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDuoAuthObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-auth-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::duo-auth-logs/duo/auth/state.json"
        }
      ]
    }
    
    
    • Ganti duo-auth-logs jika Anda memasukkan nama bucket yang berbeda.
  3. Klik Berikutnya > Buat kebijakan.

  4. Buka IAM > Roles > Create role > AWS service > Lambda.

  5. Lampirkan kebijakan yang baru dibuat.

  6. Beri nama peran WriteDuoAuthToS3Role, lalu klik Buat peran.

Buat fungsi Lambda

  1. Di Konsol AWS, buka Lambda > Functions > Create function.
  2. Klik Buat dari awal.
  3. Berikan detail konfigurasi berikut:

    Setelan Nilai
    Nama duo_auth_to_s3
    Runtime Python 3.13
    Arsitektur x86_64
    Peran eksekusi WriteDuoAuthToS3Role
  4. Setelah fungsi dibuat, buka tab Code, hapus stub, lalu masukkan kode berikut (duo_auth_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull Duo Admin API v2 Authentication Logs to S3 (raw JSON pages)
    # Notes:
    # - Duo v2 requires mintime/maxtime in *milliseconds* (13-digit epoch).
    # - Pagination via metadata.next_offset ("<millis>,<txid>").
    # - We save state (mintime_ms) in ms to resume next run without gaps.
    
    import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    DUO_IKEY = os.environ["DUO_IKEY"]
    DUO_SKEY = os.environ["DUO_SKEY"]
    DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip()
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "duo/auth/").strip("/")
    STATE_KEY = os.environ.get("STATE_KEY", "duo/auth/state.json")
    LIMIT = min(int(os.environ.get("LIMIT", "500")), 1000)  # default 100, max 1000
    
    s3 = boto3.client("s3")
    
    def _canon_params(params: dict) -> str:
        parts = []
        for k in sorted(params.keys()):
            v = params[k]
            if v is None:
                continue
            parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}")
        return "&".join(parts)
    
    def _sign(method: str, host: str, path: str, params: dict) -> dict:
        now = email.utils.formatdate()
        canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)])
        sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest()
        auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode()
        return {"Date": now, "Authorization": f"Basic {auth}"}
    
    def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict:
        host = DUO_API_HOSTNAME
        assert host.startswith("api-") and host.endswith(".duosecurity.com"), \
            "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com"
        qs = _canon_params(params)
        url = f"https://{host}{path}" + (f"?{qs}" if qs else "")
    
        attempt, backoff = 0, 1.0
        while True:
            req = Request(url, method=method.upper())
            req.add_header("Accept", "application/json")
            for k, v in _sign(method, host, path, params).items():
                req.add_header(k, v)
            try:
                with urlopen(req, timeout=timeout) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def _read_state_ms() -> int | None:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            val = json.loads(obj["Body"].read()).get("mintime")
            if val is None:
                return None
            # Backward safety: if seconds were stored, convert to ms
            return int(val) * 1000 if len(str(int(val))) <= 10 else int(val)
        except Exception:
            return None
    
    def _write_state_ms(mintime_ms: int):
        body = json.dumps({"mintime": int(mintime_ms)}).encode("utf-8")
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def _write_page(payload: dict, when_epoch_s: int, page: int) -> str:
        key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when_epoch_s))}/duo-auth-{page:05d}.json"
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def fetch_and_store():
        now_s = int(time.time())
        # Duo recommends a ~2-minute delay buffer; use maxtime = now - 120 seconds (in ms)
        maxtime_ms = (now_s - 120) * 1000
        mintime_ms = _read_state_ms() or (maxtime_ms - 3600 * 1000)  # 1 hour on first run
    
        page = 0
        total = 0
        next_offset = None
    
        while True:
            params = {"mintime": mintime_ms, "maxtime": maxtime_ms, "limit": LIMIT}
            if next_offset:
                params["next_offset"] = next_offset
    
            data = _http("GET", "/admin/v2/logs/authentication", params)
            _write_page(data, maxtime_ms // 1000, page)
            page += 1
    
            resp = data.get("response")
            items = resp if isinstance(resp, list) else []
            total += len(items)
    
            meta = data.get("metadata") or {}
            next_offset = meta.get("next_offset")
            if not next_offset:
                break
    
        # Advance window to maxtime_ms for next run
        _write_state_ms(maxtime_ms)
        return {"ok": True, "pages": page, "events": total, "next_mintime_ms": maxtime_ms}
    
    def lambda_handler(event=None, context=None):
        return fetch_and_store()
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. Buka Configuration > Environment variables > Edit > Add new environment variable.

  6. Masukkan variabel lingkungan berikut yang disediakan, lalu ganti dengan nilai Anda:

    Kunci Contoh
    S3_BUCKET duo-auth-logs
    S3_PREFIX duo/auth/
    STATE_KEY duo/auth/state.json
    DUO_IKEY DIXYZ...
    DUO_SKEY ****************
    DUO_API_HOSTNAME api-XXXXXXXX.duosecurity.com
    LIMIT 500
  7. Setelah fungsi dibuat, tetap buka halamannya (atau buka Lambda > Functions > your‑function).

  8. Pilih tab Configuration

  9. Di panel General configuration, klik Edit.

  10. Ubah Waktu tunggu menjadi 5 menit (300 detik), lalu klik Simpan.

Membuat jadwal EventBridge

  1. Buka Amazon EventBridge > Scheduler > Create schedule.
  2. Berikan detail konfigurasi berikut:
    • Jadwal berulang: Tarif (1 hour).
    • Target: fungsi Lambda Anda.
    • Name: duo-auth-1h.
  3. Klik Buat jadwal.

Opsional: Buat pengguna & kunci IAM hanya baca untuk Google SecOps

  1. Di Konsol AWS, buka IAM > Users, lalu klik Add users.
  2. Berikan detail konfigurasi berikut:
    • Pengguna: Masukkan nama unik (misalnya, secops-reader)
    • Jenis akses: Pilih Kunci akses - Akses terprogram
    • Klik Buat pengguna.
  3. Lampirkan kebijakan baca minimal (kustom): Pengguna > pilih secops-reader > Izin > Tambahkan izin > Lampirkan kebijakan secara langsung > Buat kebijakan
  4. Di editor JSON, masukkan kebijakan berikut:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Tetapkan nama ke secops-reader-policy.

  6. Buka Buat kebijakan > telusuri/pilih > Berikutnya > Tambahkan izin.

  7. Buka Kredensial keamanan > Kunci akses > Buat kunci akses.

  8. Download CSV (nilai ini dimasukkan ke dalam feed).

Mengonfigurasi feed di Google SecOps untuk menyerap Log Autentikasi Duo

  1. Buka Setelan SIEM > Feed.
  2. Klik + Tambahkan Feed Baru.
  3. Di kolom Nama feed, masukkan nama untuk feed (misalnya, Duo Authentication Logs).
  4. Pilih Amazon S3 V2 sebagai Jenis sumber.
  5. Pilih Duo Auth sebagai Jenis log.
  6. Klik Berikutnya.
  7. Tentukan nilai untuk parameter input berikut:
    • URI S3: s3://duo-auth-logs/duo/auth/
    • Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda.
    • Usia File Maksimum: Default 180 Hari.
    • ID Kunci Akses: Kunci akses pengguna dengan akses ke bucket S3.
    • Kunci Akses Rahasia: Kunci rahasia pengguna dengan akses ke bucket S3.
    • Namespace aset: namespace aset.
    • Label penyerapan: label yang diterapkan ke peristiwa dari feed ini.
  8. Klik Berikutnya.
  9. Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.

Tabel Pemetaan UDM

Kolom Log Pemetaan UDM Logika
access_device.browser target.resource.attribute.labels.value Jika access_device.browser ada, nilainya dipetakan ke UDM.
access_device.hostname principal.hostname Jika access_device.hostname ada dan tidak kosong, nilainya dipetakan ke UDM. Jika kosong dan event_type adalah USER_CREATION, event_type akan diubah menjadi USER_UNCATEGORIZED. Jika access_device.hostname kosong dan kolom hostname ada, nilai hostname akan digunakan.
access_device.ip principal.ip Jika access_device.ip ada dan merupakan alamat IPv4 yang valid, nilainya akan dipetakan ke UDM. Jika bukan alamat IPv4 yang valid, alamat tersebut akan ditambahkan sebagai nilai string ke additional.fields dengan kunci access_device.ip.
access_device.location.city principal.location.city Jika ada, nilai dipetakan ke UDM.
access_device.location.country principal.location.country_or_region Jika ada, nilai dipetakan ke UDM.
access_device.location.state principal.location.state Jika ada, nilai dipetakan ke UDM.
access_device.os principal.platform Jika ada, nilai akan diterjemahkan ke nilai UDM yang sesuai (MAC, WINDOWS, LINUX).
access_device.os_version principal.platform_version Jika ada, nilai dipetakan ke UDM.
application.key target.resource.id Jika ada, nilai dipetakan ke UDM.
application.name target.application Jika ada, nilai dipetakan ke UDM.
auth_device.ip target.ip Jika ada dan bukan "None", nilai akan dipetakan ke UDM.
auth_device.location.city target.location.city Jika ada, nilai dipetakan ke UDM.
auth_device.location.country target.location.country_or_region Jika ada, nilai dipetakan ke UDM.
auth_device.location.state target.location.state Jika ada, nilai dipetakan ke UDM.
auth_device.name target.hostname ATAU target.user.phone_numbers Jika auth_device.name ada dan merupakan nomor telepon (setelah normalisasi), nomor tersebut akan ditambahkan ke target.user.phone_numbers. Jika tidak, nilai ini dipetakan ke target.hostname.
client_ip target.ip Jika ada dan bukan "None", nilai akan dipetakan ke UDM.
client_section target.resource.attribute.labels.value Jika client_section ada, nilainya akan dipetakan ke UDM dengan kunci client_section.
dn target.user.userid Jika dn ada dan user.name serta username tidak ada, userid akan diekstrak dari kolom dn menggunakan grok dan dipetakan ke UDM. event_type disetel ke USER_LOGIN.
event_type metadata.product_event_type DAN metadata.event_type Nilai dipetakan ke metadata.product_event_type. Tindakan ini juga digunakan untuk menentukan metadata.event_type: "authentication" menjadi USER_LOGIN, "enrollment" menjadi USER_CREATION, dan jika kosong atau bukan salah satunya, maka akan menjadi GENERIC_EVENT.
factor extensions.auth.mechanism DAN extensions.auth.auth_details Nilai diterjemahkan ke nilai UDM auth.mechanism yang sesuai (HARDWARE_KEY, REMOTE_INTERACTIVE, LOCAL, OTP). Nilai asli juga dipetakan ke extensions.auth.auth_details.
hostname principal.hostname Jika ada dan access_device.hostname kosong, nilai dipetakan ke UDM.
log_format target.resource.attribute.labels.value Jika log_format ada, nilainya akan dipetakan ke UDM dengan kunci log_format.
log_level.__class_uuid__ target.resource.attribute.labels.value Jika log_level.__class_uuid__ ada, nilainya akan dipetakan ke UDM dengan kunci __class_uuid__.
log_level.name target.resource.attribute.labels.value DAN security_result.severity Jika log_level.name ada, nilainya akan dipetakan ke UDM dengan kunci name. Jika nilainya adalah "info", security_result.severity ditetapkan ke INFORMATIONAL.
log_logger.unpersistable target.resource.attribute.labels.value Jika log_logger.unpersistable ada, nilainya akan dipetakan ke UDM dengan kunci unpersistable.
log_namespace target.resource.attribute.labels.value Jika log_namespace ada, nilainya akan dipetakan ke UDM dengan kunci log_namespace.
log_source target.resource.attribute.labels.value Jika log_source ada, nilainya akan dipetakan ke UDM dengan kunci log_source.
msg security_result.summary Jika ada dan reason kosong, nilai dipetakan ke UDM.
reason security_result.summary Jika ada, nilai dipetakan ke UDM.
result security_result.action_details DAN security_result.action Jika ada, nilai dipetakan ke security_result.action_details. "success" atau "SUCCESS" diterjemahkan menjadi security_result.action ALLOW, jika tidak, BLOCK.
server_section target.resource.attribute.labels.value Jika server_section ada, nilainya akan dipetakan ke UDM dengan kunci server_section.
server_section_ikey target.resource.attribute.labels.value Jika server_section_ikey ada, nilainya akan dipetakan ke UDM dengan kunci server_section_ikey.
status security_result.action_details DAN security_result.action Jika ada, nilai dipetakan ke security_result.action_details. "Izinkan" diterjemahkan menjadi security_result.action IZINKAN, "Tolak" diterjemahkan menjadi BLOKIR.
timestamp metadata.event_timestamp DAN event.timestamp Nilai dikonversi menjadi stempel waktu dan dipetakan ke metadata.event_timestamp dan event.timestamp.
txid metadata.product_log_id DAN network.session_id Nilai dipetakan ke metadata.product_log_id dan network.session_id.
user.groups target.user.group_identifiers Semua nilai dalam array ditambahkan ke target.user.group_identifiers.
user.key target.user.product_object_id Jika ada, nilai dipetakan ke UDM.
user.name target.user.userid Jika ada, nilai dipetakan ke UDM.
username target.user.userid Jika ada dan user.name tidak ada, nilai akan dipetakan ke UDM. event_type disetel ke USER_LOGIN.
(Logika Parser) metadata.vendor_name Selalu ditetapkan ke "DUO_SECURITY".
(Logika Parser) metadata.product_name Selalu ditetapkan ke "MULTI-FACTOR_AUTHENTICATION".
(Logika Parser) metadata.log_type Diambil dari kolom log_type tingkat teratas log mentah.
(Logika Parser) extensions.auth.type Selalu ditetapkan ke "SSO".

Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.