Mengumpulkan log audit DigiCert

Didukung di:

Dokumen ini menjelaskan cara menyerap log audit DigiCert ke Google Security Operations menggunakan Amazon S3.

Sebelum memulai

  • Instance Google SecOps
  • Akses istimewa ke DigiCert CertCentral (kunci API dengan peran Administrator)
  • Akses istimewa ke AWS (S3, IAM, Lambda, EventBridge)

Mendapatkan kunci API DigiCert dan ID laporan

  1. Di CertCentral, buka Account > API Keys, lalu buat kunci API (X-DC-DEVKEY).
  2. Di Laporan > Pustaka Laporan, buat laporan Log audit dengan format JSON dan catat ID Laporan (UUID).
  3. Anda juga dapat menemukan ID laporan yang ada menggunakan histori laporan.

Mengonfigurasi bucket AWS S3 dan IAM untuk Google SecOps

  1. Buat bucket Amazon S3 dengan mengikuti panduan pengguna ini: Membuat bucket
  2. Simpan Name dan Region bucket untuk referensi di masa mendatang (misalnya, digicert-logs).
  3. Buat pengguna dengan mengikuti panduan pengguna ini: Membuat pengguna IAM.
  4. Pilih Pengguna yang dibuat.
  5. Pilih tab Kredensial keamanan.
  6. Klik Create Access Key di bagian Access Keys.
  7. Pilih Layanan pihak ketiga sebagai Kasus penggunaan.
  8. Klik Berikutnya.
  9. Opsional: tambahkan tag deskripsi.
  10. Klik Create access key.
  11. Klik Download CSV file untuk menyimpan Access Key dan Secret Access Key untuk digunakan nanti.
  12. Klik Selesai.
  13. Pilih tab Izin.
  14. Klik Tambahkan izin di bagian Kebijakan izin.
  15. Pilih Tambahkan izin.
  16. Pilih Lampirkan kebijakan secara langsung
  17. Telusuri dan pilih kebijakan AmazonS3FullAccess.
  18. Klik Berikutnya.
  19. Klik Add permissions.

Mengonfigurasi kebijakan dan peran IAM untuk upload S3

  1. Buka konsol AWS > IAM > Policies > Create policy > tab JSON.
  2. Masukkan kebijakan berikut:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDigiCertObjects",
          "Effect": "Allow",
          "Action": ["s3:PutObject"],
          "Resource": "arn:aws:s3:::digicert-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::digicert-logs/digicert/logs/state.json"
        }
      ]
    }
    
    
    • Ganti digicert-logs jika Anda memasukkan nama bucket yang berbeda.
  3. Klik Berikutnya > Buat kebijakan.

  4. Buka IAM > Roles > Create role > AWS service > Lambda.

  5. Lampirkan kebijakan yang baru dibuat.

  6. Beri nama peran WriteDigicertToS3Role, lalu klik Buat peran.

Buat fungsi Lambda

  1. Di Konsol AWS, buka Lambda > Functions > Create function.
  2. Klik Buat dari awal.
  3. Berikan detail konfigurasi berikut:

    Setelan Nilai
    Nama digicert_audit_logs_to_s3
    Runtime Python 3.13
    Arsitektur x86_64
    Peran eksekusi WriteDigicertToS3Role
  4. Setelah fungsi dibuat, buka tab Code, hapus stub, lalu masukkan kode berikut (digicert_audit_logs_to_s3.py):

    #!/usr/bin/env python3
    
    import datetime as dt, gzip, io, json, os, time, uuid, zipfile
    from typing import Any, Dict, Iterable, List, Tuple
    from urllib import request, parse, error
    import boto3
    from botocore.exceptions import ClientError
    
    API_BASE = "https://api.digicert.com/reports/v1"
    USER_AGENT = "secops-digicert-reports/1.0"
    s3 = boto3.client("s3")
    
    def _now() -> dt.datetime:
        return dt.datetime.now(dt.timezone.utc)
    
    def _http(method: str, url: str, api_key: str, body: bytes | None = None,
              timeout: int = 30, max_retries: int = 5) -> Tuple[int, Dict[str,str], bytes]:
        headers = {"X-DC-DEVKEY": api_key, "Content-Type": "application/json", "User-Agent": USER_AGENT}
        attempt, backoff = 0, 1.0
        while True:
            req = request.Request(url=url, method=method, headers=headers, data=body)
            try:
                with request.urlopen(req, timeout=timeout) as resp:
                    status, h = resp.status, {k.lower(): v for k, v in resp.headers.items()}
                    data = resp.read()
                    if 500 <= status <= 599 and attempt < max_retries:
                        attempt += 1; time.sleep(backoff); backoff *= 2; continue
                    return status, h, data
            except error.HTTPError as e:
                status, h = e.code, {k.lower(): v for k, v in (e.headers or {}).items()}
                if status == 429 and attempt < max_retries:
                    ra = h.get("retry-after"); delay = float(ra) if ra and ra.isdigit() else backoff
                    attempt += 1; time.sleep(delay); backoff *= 2; continue
                if 500 <= status <= 599 and attempt < max_retries:
                    attempt += 1; time.sleep(backoff); backoff *= 2; continue
                raise
            except error.URLError:
                if attempt < max_retries:
                    attempt += 1; time.sleep(backoff); backoff *= 2; continue
                raise
    
    def start_report_run(api_key: str, report_id: str, timeout: int) -> None:
        st, _, body = _http("POST", f"{API_BASE}/report/{report_id}/run", api_key, b"{}", timeout)
        if st not in (200, 201): raise RuntimeError(f"Start run failed: {st} {body[:200]!r}")
    
    def list_report_history(api_key: str, *, status_filter: str | None = None, report_type: str | None = None,
                            limit: int = 100, sort_by: str = "report_start_date", sort_direction: str = "DESC",
                            timeout: int = 30, offset: int = 0) -> Dict[str, Any]:
        qs = {"limit": str(limit), "offset": str(offset), "sort_by": sort_by, "sort_direction": sort_direction}
        if status_filter: qs["status"] = status_filter
        if report_type: qs["report_type"] = report_type
        st, _, body = _http("GET", f"{API_BASE}/report/history?{parse.urlencode(qs)}", api_key, timeout=timeout)
        if st != 200: raise RuntimeError(f"History failed: {st} {body[:200]!r}")
        return json.loads(body.decode("utf-8"))
    
    def find_ready_run(api_key: str, report_id: str, started_not_before: dt.datetime,
                      timeout: int, max_wait_seconds: int, poll_interval: int) -> str:
        deadline = time.time() + max_wait_seconds
        while time.time() < deadline:
            hist = list_report_history(api_key, status_filter="READY", report_type="audit-logs",
                                      limit=200, timeout=timeout).get("report_history", [])
            for it in hist:
                if it.get("report_identifier") != report_id or not it.get("report_run_identifier"): continue
                try:
                    rsd = dt.datetime.strptime(it.get("report_start_date",""), "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.timezone.utc)
                except Exception:
                    rsd = started_not_before
                if rsd + dt.timedelta(seconds=60) >= started_not_before:
                    return it["report_run_identifier"]
            time.sleep(poll_interval)
        raise TimeoutError("READY run not found in time")
    
    def get_json_rows(api_key: str, report_id: str, run_id: str, timeout: int) -> List[Dict[str, Any]]:
        st, h, body = _http("GET", f"{API_BASE}/report/{report_id}/{run_id}/json", api_key, timeout=timeout)
        if st != 200: raise RuntimeError(f"Get JSON failed: {st} {body[:200]!r}")
        if "application/zip" in h.get("content-type","").lower() or body[:2] == b"PK":
            with zipfile.ZipFile(io.BytesIO(body)) as zf:
                name = next((n for n in zf.namelist() if n.lower().endswith(".json")), None)
                if not name: raise RuntimeError("ZIP has no JSON")
                rows = json.loads(zf.read(name).decode("utf-8"))
        else:
            rows = json.loads(body.decode("utf-8"))
        if not isinstance(rows, list): raise RuntimeError("Unexpected JSON format")
        return rows
    
    def load_state(bucket: str, key: str) -> Dict[str, Any]:
        try:
            return json.loads(s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8"))
        except ClientError as e:
            if e.response["Error"]["Code"] in ("NoSuchKey","404"): return {}
            raise
    
    def save_state(bucket: str, key: str, state: Dict[str, Any]) -> None:
        s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(state).encode("utf-8"), ContentType="application/json")
    
    def write_ndjson_gz(bucket: str, prefix: str, rows: Iterable[Dict[str, Any]], run_id: str) -> str:
        ts = _now().strftime("%Y/%m/%d/%H%M%S")
        key = f"{prefix}/{ts}-digicert-audit-{run_id[:8]}-{uuid.uuid4().hex}.json.gz"
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
            for r in rows:
                gz.write((json.dumps(r, separators=(',',':')) + "\n").encode("utf-8"))
        s3.put_object(Bucket=bucket, Key=key, Body=buf.getvalue(),
                      ContentType="application/x-ndjson", ContentEncoding="gzip")
        return key
    
    def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
        api_key  = os.environ["DIGICERT_API_KEY"]
        report_id = os.environ["DIGICERT_REPORT_ID"]
        bucket   = os.environ["S3_BUCKET"]
        prefix   = os.environ.get("S3_PREFIX", "digicert/logs").rstrip("/")
        state_key = os.environ.get("STATE_KEY", f"{prefix}/state.json")
        max_wait = int(os.environ.get("MAX_WAIT_SECONDS", "300"))
        poll_int = int(os.environ.get("POLL_INTERVAL", "10"))
        timeout  = int(os.environ.get("REQUEST_TIMEOUT", "30"))
    
        state = load_state(bucket, state_key) if state_key else {}
        last_run = state.get("last_run_id")
    
        started = _now()
        start_report_run(api_key, report_id, timeout)
        run_id = find_ready_run(api_key, report_id, started, timeout, max_wait, poll_int)
        if last_run and last_run == run_id:
            return {"status":"skip", "report_run_identifier": run_id}
    
        rows = get_json_rows(api_key, report_id, run_id, timeout)
        key = write_ndjson_gz(bucket, prefix, rows, run_id)
        if state_key:
            save_state(bucket, state_key, {"last_run_id": run_id, "last_success_at": _now().isoformat(),
                                          "last_s3_key": key, "rows_count": len(rows)})
        return {"status":"ok", "report_identifier": report_id, "report_run_identifier": run_id,
                "rows": len(rows), "s3_key": key}
    
  5. Buka Configuration > Environment variables > Edit > Add new environment variable.

  6. Masukkan variabel lingkungan berikut, ganti dengan nilai Anda:

    Kunci Contoh
    S3_BUCKET digicert-logs
    S3_PREFIX digicert/logs/
    STATE_KEY digicert/logs/state.json
    DIGICERT_API_KEY xxxxxxxxxxxxxxxxxxxxxxxx
    DIGICERT_REPORT_ID 88de5e19-ec57-4d70-865d-df953b062574
    REQUEST_TIMEOUT 30
    POLL_INTERVAL 10
    MAX_WAIT_SECONDS 300
  7. Setelah fungsi dibuat, tetap buka halamannya (atau buka Lambda > Functions > your-function).

  8. Pilih tab Configuration

  9. Di panel General configuration, klik Edit.

  10. Ubah Waktu tunggu menjadi 15 menit (900 detik), lalu klik Simpan.

Membuat jadwal EventBridge

  1. Buka Amazon EventBridge > Scheduler > Create schedule.
  2. Berikan detail konfigurasi berikut:
    • Jadwal berulang: Tarif (1 hour).
    • Target: fungsi Lambda Anda.
    • Name: digicert-audit-1h.
  3. Klik Buat jadwal.

Opsional: Buat pengguna & kunci IAM hanya baca untuk Google SecOps

  1. Di Konsol AWS, buka IAM > Users, lalu klik Add users.
  2. Berikan detail konfigurasi berikut:
    • Pengguna: Masukkan nama unik (misalnya, secops-reader)
    • Jenis akses: Pilih Kunci akses - Akses terprogram
    • Klik Buat pengguna.
  3. Lampirkan kebijakan baca minimal (kustom): Pengguna > pilih secops-reader > Izin > Tambahkan izin > Lampirkan kebijakan secara langsung > Buat kebijakan
  4. Di editor JSON, masukkan kebijakan berikut:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Nama = secops-reader-policy.

  6. Klik Buat kebijakan > cari/pilih > Berikutnya > Tambahkan izin.

  7. Buat kunci akses untuk secops-reader: Security credentials > Access keys > Create access key Create access key**.

  8. Download CSV (nilai ini dimasukkan ke dalam feed).

Mengonfigurasi feed di Google SecOps untuk menyerap log DigiCert

  1. Buka Setelan SIEM > Feed.
  2. Klik Tambahkan Feed Baru.
  3. Di kolom Nama feed, masukkan nama untuk feed (misalnya, DigiCert Audit Logs).
  4. Pilih Amazon S3 V2 sebagai Jenis sumber.
  5. Pilih Digicert sebagai Jenis log.
  6. Klik Berikutnya.
  7. Tentukan nilai untuk parameter input berikut:
    • URI S3: s3://digicert-logs/digicert/logs/
    • Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda.
    • Usia File Maksimum: Default 180 Hari.
    • ID Kunci Akses: Kunci akses pengguna dengan akses ke bucket S3.
    • Kunci Akses Rahasia: Kunci rahasia pengguna dengan akses ke bucket S3.
    • Namespace aset: Namespace aset.
    • Label penyerapan: Label yang akan diterapkan ke peristiwa dari feed ini.
  8. Klik Berikutnya.
  9. Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.

Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.