Mengumpulkan log administrator Duo
Dokumen ini menjelaskan cara menyerap log administrator Duo ke Google Security Operations menggunakan Amazon S3. Parser mengekstrak kolom dari log (format JSON) dan memetakannya ke Model Data Terpadu (UDM). UDM menangani berbagai jenis action
Duo (login, pengelolaan pengguna, pengelolaan grup) secara berbeda, dengan mengisi kolom UDM yang relevan berdasarkan tindakan dan data yang tersedia, termasuk detail pengguna, faktor autentikasi, dan hasil keamanan. Selain itu, layanan ini juga melakukan transformasi data, seperti menggabungkan alamat IP, mengonversi stempel waktu, dan menangani error.
Sebelum memulai
- Instance Google SecOps
- Akses istimewa ke tenant Duo (aplikasi Admin API)
- Akses istimewa ke AWS (S3, IAM, Lambda, EventBridge)
Mengonfigurasi aplikasi Duo Admin API
- Login ke Panel Admin Duo.
- Buka Applications > Application Catalog.
- Tambahkan aplikasi Admin API.
- Catat nilai berikut:
- Kunci integrasi (ikey)
- Kunci rahasia (skey)
- Nama host API (misalnya,
api-XXXXXXXX.duosecurity.com
)
- Di Izin, aktifkan Berikan log baca (untuk membaca log administrator).
- Simpan aplikasi.
Mengonfigurasi bucket AWS S3 dan IAM untuk Google SecOps
- Buat bucket Amazon S3 dengan mengikuti panduan pengguna ini: Membuat bucket
- Simpan Name dan Region bucket untuk referensi di masa mendatang (misalnya,
duo-admin-logs
). - Buat pengguna dengan mengikuti panduan pengguna ini: Membuat pengguna IAM.
- Pilih Pengguna yang dibuat.
- Pilih tab Kredensial keamanan.
- Klik Create Access Key di bagian Access Keys.
- Pilih Layanan pihak ketiga sebagai Kasus penggunaan.
- Klik Berikutnya.
- Opsional: tambahkan tag deskripsi.
- Klik Create access key.
- Klik Download CSV file untuk menyimpan Access Key dan Secret Access Key untuk digunakan nanti.
- Klik Selesai.
- Pilih tab Izin.
- Klik Tambahkan izin di bagian Kebijakan izin.
- Pilih Tambahkan izin.
- Pilih Lampirkan kebijakan secara langsung
- Telusuri dan pilih kebijakan AmazonS3FullAccess.
- Klik Berikutnya.
- Klik Add permissions.
Mengonfigurasi kebijakan dan peran IAM untuk upload S3
- Buka konsol AWS > IAM > Policies > Create policy > tab JSON.
Masukkan kebijakan berikut:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoAdminObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-admin-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-admin-logs/duo/admin/state.json" } ] }
- Ganti
duo-admin-logs
jika Anda memasukkan nama bucket yang berbeda:
- Ganti
Klik Berikutnya > Buat kebijakan.
Buka IAM > Roles > Create role > AWS service > Lambda.
Lampirkan kebijakan yang baru dibuat.
Beri nama peran
WriteDuoAdminToS3Role
, lalu klik Buat peran.
Buat fungsi Lambda
- Di Konsol AWS, buka Lambda > Functions > Create function.
- Klik Buat dari awal.
Berikan detail konfigurasi berikut:
Setelan Nilai Nama duo_admin_to_s3
Runtime Python 3.13 Arsitektur x86_64 Peran eksekusi WriteDuoAdminToS3Role
Setelah fungsi dibuat, buka tab Code, hapus stub, lalu masukkan kode berikut (
duo_admin_to_s3.py
):#!/usr/bin/env python3 # Lambda: Pull Duo Admin API v1 Administrator Logs to S3 (raw JSON pages) import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError from datetime import datetime import boto3 DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/admin/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/admin/state.json") s3 = boto3.client("s3") def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return {"Date": now, "Authorization": f"Basic {auth}"} def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) hdrs = _sign(method, host, path, params) req.add_header("Accept", "application/json") for k, v in hdrs.items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # 429 or 5xx → exponential backoff if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise except URLError: if attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise def _read_state() -> int | None: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return int(json.loads(obj["Body"].read()).get("mintime")) except Exception: return None def _write_state(mintime: int): body = json.dumps({"mintime": mintime}).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _epoch_from_item(item: dict) -> int | None: # Prefer numeric 'timestamp' (seconds); fallback to ISO8601 'ts' ts_num = item.get("timestamp") if isinstance(ts_num, (int, float)): return int(ts_num) ts_iso = item.get("ts") if isinstance(ts_iso, str): try: # Accept "...Z" or with offset return int(datetime.fromisoformat(ts_iso.replace("Z", "+00:00")).timestamp()) except Exception: return None return None def _write_page(payload: dict, when: int, page: int) -> str: key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-admin-{page:05d}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def fetch_and_store(): now = int(time.time()) # Start from last checkpoint or now-3600 on first run mintime = _read_state() or (now - 3600) page = 0 total = 0 next_mintime = mintime max_seen_ts = mintime while True: data = _http("GET", "/admin/v1/logs/administrator", {"mintime": mintime}) _write_page(data, now, page) page += 1 # Extract items resp = data.get("response") items = resp if isinstance(resp, list) else (resp.get("items") if isinstance(resp, dict) else []) items = items or [] if not items: break total += len(items) # Track the newest timestamp in this batch for it in items: ts = _epoch_from_item(it) if ts and ts > max_seen_ts: max_seen_ts = ts # Duo returns only the 1000 earliest events; page by advancing mintime if len(items) >= 1000 and max_seen_ts >= mintime: mintime = max_seen_ts next_mintime = max_seen_ts continue else: break # Save checkpoint: newest seen ts, or "now" if nothing new if max_seen_ts > next_mintime: _write_state(max_seen_ts) next_state = max_seen_ts else: _write_state(now) next_state = now return {"ok": True, "pages": page, "events": total, "next_mintime": next_state} def lambda_handler(event=None, context=None): return fetch_and_store() if __name__ == "__main__": print(lambda_handler())
Buka Configuration > Environment variables > Edit > Add new environment variable.
Masukkan variabel lingkungan berikut, lalu ganti dengan nilai Anda.
Kunci Contoh S3_BUCKET
duo-admin-logs
S3_PREFIX
duo/admin/
STATE_KEY
duo/admin/state.json
DUO_IKEY
DIXYZ...
DUO_SKEY
****************
DUO_API_HOSTNAME
api-XXXXXXXX.duosecurity.com
Setelah fungsi dibuat, tetap buka halamannya (atau buka Lambda > Functions > your-function).
Pilih tab Configuration
Di panel General configuration, klik Edit.
Ubah Waktu tunggu menjadi 5 menit (300 detik), lalu klik Simpan.
Membuat jadwal EventBridge
- Buka Amazon EventBridge > Scheduler > Create schedule.
- Berikan detail konfigurasi berikut:
- Jadwal berulang: Tarif (
1 hour
). - Target: fungsi Lambda Anda.
- Name:
duo-admin-1h
.
- Jadwal berulang: Tarif (
- Klik Buat jadwal.
Opsional: Buat pengguna & kunci IAM hanya baca untuk Google SecOps
- Di Konsol AWS, buka IAM > Users, lalu klik Add users.
- Berikan detail konfigurasi berikut:
- Pengguna: Masukkan nama unik (misalnya,
secops-reader
) - Jenis akses: Pilih Kunci akses - Akses terprogram
- Klik Buat pengguna.
- Pengguna: Masukkan nama unik (misalnya,
- Lampirkan kebijakan baca minimal (kustom): Pengguna > pilih
secops-reader
> Izin > Tambahkan izin > Lampirkan kebijakan secara langsung > Buat kebijakan Di editor JSON, masukkan kebijakan berikut:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
Tetapkan nama ke
secops-reader-policy
.Buka Buat kebijakan > telusuri/pilih > Berikutnya > Tambahkan izin.
Buka Kredensial keamanan > Kunci akses > Buat kunci akses.
Download CSV (nilai ini dimasukkan ke dalam feed).
Mengonfigurasi feed di Google SecOps untuk memproses Log Administrator Duo
- Buka Setelan SIEM > Feed.
- Klik + Tambahkan Feed Baru.
- Di kolom Nama feed, masukkan nama untuk feed (misalnya,
Duo Administrator Logs
). - Pilih Amazon S3 V2 sebagai Jenis sumber.
- Pilih Log Administrator Duo sebagai Jenis log.
- Klik Berikutnya.
- Tentukan nilai untuk parameter input berikut:
- URI S3:
s3://duo-admin-logs/duo/admin/
- Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda.
- Usia File Maksimum: Default 180 Hari.
- ID Kunci Akses: Kunci akses pengguna dengan akses ke bucket S3.
- Kunci Akses Rahasia: Kunci rahasia pengguna dengan akses ke bucket S3.
- Namespace aset: namespace aset.
- Label penyerapan: label yang diterapkan ke peristiwa dari feed ini.
- URI S3:
- Klik Berikutnya.
- Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.
Tabel Pemetaan UDM
Kolom Log | Pemetaan UDM | Logika |
---|---|---|
action |
metadata.product_event_type |
Nilai kolom action dari log mentah. |
desc |
metadata.description |
Nilai kolom desc dari objek description log mentah. |
description._status |
target.group.attribute.labels.value |
Nilai kolom _status dalam objek description dari log mentah, khususnya saat memproses tindakan terkait grup. Nilai ini ditempatkan dalam array "labels" dengan "key" yang sesuai, yaitu "status". |
description.desc |
metadata.description |
Nilai kolom desc dari objek description log mentah. |
description.email |
target.user.email_addresses |
Nilai kolom email dari objek description log mentah. |
description.error |
security_result.summary |
Nilai kolom error dari objek description log mentah. |
description.factor |
extensions.auth.auth_details |
Nilai kolom factor dari objek description log mentah. |
description.groups.0._status |
target.group.attribute.labels.value |
Nilai kolom _status dari elemen pertama dalam array groups dalam objek description log mentah. Nilai ini ditempatkan dalam array "labels" dengan "key" yang sesuai, yaitu "status". |
description.groups.0.name |
target.group.group_display_name |
Nilai kolom name dari elemen pertama dalam array groups dalam objek description log mentah. |
description.ip_address |
principal.ip |
Nilai kolom ip_address dari objek description log mentah. |
description.name |
target.group.group_display_name |
Nilai kolom name dari objek description log mentah. |
description.realname |
target.user.user_display_name |
Nilai kolom realname dari objek description log mentah. |
description.status |
target.user.attribute.labels.value |
Nilai kolom status dari objek description log mentah. Nilai ini ditempatkan dalam array "labels" dengan "key" yang sesuai, yaitu "status". |
description.uname |
target.user.email_addresses atau target.user.userid |
Nilai kolom uname dari objek description log mentah. Jika cocok dengan format alamat email, maka akan dipetakan ke email_addresses ; jika tidak, akan dipetakan ke userid . |
host |
principal.hostname |
Nilai kolom host dari log mentah. |
isotimestamp |
metadata.event_timestamp.seconds |
Nilai kolom isotimestamp dari log mentah, dikonversi menjadi detik epoch. |
object |
target.group.group_display_name |
Nilai kolom object dari log mentah. |
timestamp |
metadata.event_timestamp.seconds |
Nilai kolom timestamp dari log mentah. |
username |
target.user.userid atau principal.user.userid |
Jika kolom action berisi "login", nilai akan dipetakan ke target.user.userid . Jika tidak, nilai ini dipetakan ke principal.user.userid . Tetapkan ke "USERNAME_PASSWORD" jika kolom action berisi "login". Ditentukan oleh parser berdasarkan kolom action . Nilai yang mungkin: USER_LOGIN , GROUP_CREATION , USER_UNCATEGORIZED , GROUP_DELETION , USER_CREATION , GROUP_MODIFICATION , GENERIC_EVENT . Selalu ditetapkan ke "DUO_ADMIN". Selalu ditetapkan ke "MULTI-FACTOR_AUTHENTICATION". Selalu ditetapkan ke "DUO_SECURITY". Tetapkan ke "ADMINISTRATOR" jika kolom eventtype berisi "admin". Ditentukan oleh parser berdasarkan kolom action . Tetapkan ke "BLOCK" jika kolom action berisi "error"; jika tidak, tetapkan ke "ALLOW". Selalu ditetapkan ke "status" saat mengisi target.group.attribute.labels . Selalu ditetapkan ke "status" saat mengisi target.user.attribute.labels . |
Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.