Mengumpulkan log PingOne Advanced Identity Cloud

Didukung di:

Dokumen ini menjelaskan cara menyerap log PingOne Advanced Identity Cloud ke Google Security Operations menggunakan Amazon S3.

Sebelum memulai

  • Instance Google SecOps
  • Akses istimewa ke tenant PingOne Advanced Identity Cloud
  • Akses istimewa ke AWS (S3, IAM, Lambda, EventBridge)

Mendapatkan kunci API PingOne dan FQDN tenant

  1. Login ke konsol admin Advanced Identity Cloud.
  2. Klik ikon pengguna, lalu klik Tenant Settings.
  3. Di tab Setelan Global, klik Kunci API Log.
  4. Klik New Log API Key, berikan nama untuk kunci.
  5. Klik Buat Kunci.
  6. Salin dan simpan nilai api_key_id dan api_key_secret di lokasi yang aman. Nilai api_key_secret tidak akan ditampilkan lagi.
  7. Klik Done
  8. Buka Tenant Settings > Details, lalu temukan FQDN tenant Anda (misalnya, example.tomcat.pingone.com).

Mengonfigurasi bucket AWS S3 dan IAM untuk Google SecOps

  1. Buat bucket Amazon S3 dengan mengikuti panduan pengguna ini: Membuat bucket
  2. Simpan Name dan Region bucket untuk referensi di masa mendatang (misalnya, pingone-aic-logs).
  3. Buat pengguna dengan mengikuti panduan pengguna ini: Membuat pengguna IAM.
  4. Pilih Pengguna yang dibuat.
  5. Pilih tab Kredensial keamanan.
  6. Klik Create Access Key di bagian Access Keys.
  7. Pilih Layanan pihak ketiga sebagai Kasus penggunaan.
  8. Klik Berikutnya.
  9. Opsional: tambahkan tag deskripsi.
  10. Klik Create access key.
  11. Klik Download CSV file untuk menyimpan Access Key dan Secret Access Key untuk digunakan nanti.
  12. Klik Selesai.
  13. Pilih tab Izin.
  14. Klik Tambahkan izin di bagian Kebijakan izin.
  15. Pilih Tambahkan izin.
  16. Pilih Lampirkan kebijakan secara langsung
  17. Telusuri dan pilih kebijakan AmazonS3FullAccess.
  18. Klik Berikutnya.
  19. Klik Add permissions.

Mengonfigurasi kebijakan dan peran IAM untuk upload S3

  1. Di konsol AWS, buka IAM > Policies > Create policy > JSON tab.
  2. Masukkan kebijakan berikut.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutPingOneAICObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::pingone-aic-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::pingone-aic-logs/pingone-aic/logs/state.json"
        }
      ]
    }
    
    • Ganti pingone-aic-logs jika Anda memasukkan nama bucket yang berbeda.
  3. Klik Berikutnya > Buat kebijakan.

  4. Buka IAM > Roles > Create role > AWS service > Lambda.

  5. Lampirkan kebijakan yang baru dibuat.

  6. Beri nama peran WritePingOneAICToS3Role, lalu klik Buat peran.

Buat fungsi Lambda

  1. Di Konsol AWS, buka Lambda > Functions > Create function.
  2. Klik Buat dari awal.
  3. Berikan detail konfigurasi berikut:

    Setelan Nilai
    Nama pingone_aic_to_s3
    Runtime Python 3.13
    Arsitektur x86_64
    Peran eksekusi WritePingOneAICToS3Role
  4. Setelah fungsi dibuat, buka tab Code, hapus stub, lalu masukkan kode berikut (pingone_aic_to_s3.py):

    #!/usr/bin/env python3
    
    import os, json, time, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    FQDN = os.environ["AIC_TENANT_FQDN"].strip("/")
    API_KEY_ID = os.environ["AIC_API_KEY_ID"]
    API_KEY_SECRET = os.environ["AIC_API_SECRET"]
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "pingone-aic/logs/").strip("/")
    SOURCES = [s.strip() for s in os.environ.get("SOURCES", "am-everything,idm-everything").split(",") if s.strip()]
    PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "500")), 1000)  # hard cap per docs
    MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))
    STATE_KEY = os.environ.get("STATE_KEY", "pingone-aic/logs/state.json")
    LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600"))
    
    s3 = boto3.client("s3")
    
    def _headers():
        return {"x-api-key": API_KEY_ID, "x-api-secret": API_KEY_SECRET}
    
    def _iso(ts: float) -> str:
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))
    
    def _http_get(url: str, timeout: int = 60, max_retries: int = 5) -> dict:
        attempt, backoff = 0, 1.0
        while True:
            req = Request(url, method="GET", headers=_headers())
            try:
                with urlopen(req, timeout=timeout) as r:
                    data = r.read()
                    return json.loads(data.decode("utf-8"))
            except HTTPError as e:
                # 429: respect X-RateLimit-Reset (epoch seconds) if present
                if e.code == 429 and attempt < max_retries:
                    reset = e.headers.get("X-RateLimit-Reset")
                    now = int(time.time())
                    delay = max(1, int(reset) - now) if (reset and reset.isdigit()) else int(backoff)
                    time.sleep(delay); attempt += 1; backoff *= 2; continue
                if 500 <= e.code <= 599 and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def _load_state() -> dict:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read())
        except Exception:
            return {"sources": {}}
    
    def _save_state(state: dict):
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY,
                      Body=json.dumps(state, separators=(",", ":")).encode("utf-8"),
                      ContentType="application/json")
    
    def _write_page(payload: dict, source: str) -> str:
        ts = time.gmtime()
        key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-pingone-aic-{source}.json"
        s3.put_object(Bucket=S3_BUCKET, Key=key,
                      Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
                      ContentType="application/json")
        return key
    
    def _bounded_begin_time(last_ts: str | None, now: float) -> str:
        # beginTime must be <= 24h before endTime (now if endTime omitted)
        # if last_ts older than 24h → cap to now-24h; else use last_ts; else lookback
        twenty_four_h_ago = now - 24*3600
        if last_ts:
            try:
                t_struct = time.strptime(last_ts[:19] + "Z", "%Y-%m-%dT%H:%M:%SZ")
                t_epoch = int(time.mktime(t_struct))
            except Exception:
                t_epoch = int(now - LOOKBACK_SECONDS)
            begin_epoch = max(t_epoch, int(twenty_four_h_ago))
        else:
            begin_epoch = max(int(now - LOOKBACK_SECONDS), int(twenty_four_h_ago))
        return _iso(begin_epoch)
    
    def fetch_source(source: str, last_ts: str | None):
        base = f"https://{FQDN}/monitoring/logs"
        now = time.time()
        params = {
            "source": source,
            "_pageSize": str(PAGE_SIZE),
            "_sortKeys": "timestamp",
            "beginTime": _bounded_begin_time(last_ts, now)
        }
    
        pages = 0
        written = 0
        newest_ts = last_ts
        cookie = None
    
        while pages < MAX_PAGES:
            if cookie:
                params["_pagedResultsCookie"] = cookie
            qs = urllib.parse.urlencode(params, quote_via=urllib.parse.quote)
            data = _http_get(f"{base}?{qs}")
            _write_page(data, source)
    
            results = data.get("result") or data.get("results") or []
            for item in results:
                t = item.get("timestamp") or item.get("payload", {}).get("timestamp")
                if t and (newest_ts is None or t > newest_ts):
                    newest_ts = t
    
            written += len(results)
            cookie = data.get("pagedResultsCookie")
            pages += 1
            if not cookie:
                break
    
        return {"source": source, "pages": pages, "written": written, "newest_ts": newest_ts}
    
    def lambda_handler(event=None, context=None):
        state = _load_state()
        state.setdefault("sources", {})
        summary = []
        for source in SOURCES:
            last_ts = state["sources"].get(source, {}).get("last_ts")
            res = fetch_source(source, last_ts)
            if res.get("newest_ts"):
                state["sources"][source] = {"last_ts": res["newest_ts"]}
            summary.append(res)
        _save_state(state)
        return {"ok": True, "summary": summary}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. Buka Configuration > Environment variables > Edit > Add new environment variable.

  6. Masukkan variabel lingkungan berikut, ganti dengan nilai Anda:

    Kunci Contoh
    S3_BUCKET pingone-aic-logs
    S3_PREFIX pingone-aic/logs/
    STATE_KEY pingone-aic/logs/state.json
    AIC_TENANT_FQDN example.tomcat.pingone.com
    AIC_API_KEY_ID <api_key_id>
    AIC_API_SECRET <api_key_secret>
    SOURCES am-everything,idm-everything
    PAGE_SIZE 500
    MAX_PAGES 20
    LOOKBACK_SECONDS 3600
  7. Setelah fungsi dibuat, tetap buka halamannya (atau buka Lambda > Functions > your-function).

  8. Pilih tab Configuration

  9. Di panel General configuration, klik Edit.

  10. Ubah Waktu tunggu menjadi 5 menit (300 detik), lalu klik Simpan.

Membuat jadwal EventBridge

  1. Buka Amazon EventBridge > Scheduler > Create schedule.
  2. Berikan detail konfigurasi berikut:
    • Jadwal berulang: Tarif (1 hour).
    • Target: fungsi Lambda Anda.
    • Name: pingone-aic-1h.
  3. Klik Buat jadwal.

Opsional: Buat pengguna & kunci IAM hanya baca untuk Google SecOps

  1. Di Konsol AWS, buka IAM > Users, lalu klik Add users.
  2. Berikan detail konfigurasi berikut:
    • Pengguna: Masukkan nama unik (misalnya, secops-reader)
    • Jenis akses: Pilih Kunci akses - Akses terprogram
    • Klik Buat pengguna.
  3. Lampirkan kebijakan baca minimal (kustom): Pengguna > pilih secops-reader > Izin > Tambahkan izin > Lampirkan kebijakan secara langsung > Buat kebijakan
  4. Di editor JSON, masukkan kebijakan berikut:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Tetapkan nama ke secops-reader-policy.

  6. Buka Buat kebijakan > telusuri/pilih > Berikutnya > Tambahkan izin.

  7. Buka Kredensial keamanan > Kunci akses > Buat kunci akses.

  8. Download CSV (nilai ini dimasukkan ke dalam feed).

Mengonfigurasi feed di Google SecOps untuk menyerap log PingOne Advanced Identity Cloud

  1. Buka Setelan SIEM > Feed.
  2. Klik Tambahkan Feed Baru.
  3. Di kolom Nama feed, masukkan nama untuk feed (misalnya, PingOne Advanced Identity Cloud).
  4. Pilih Amazon S3 V2 sebagai Jenis sumber.
  5. Pilih PingOne Advanced Identity Cloud sebagai Jenis log.
  6. Klik Berikutnya.
  7. Tentukan nilai untuk parameter input berikut:
    • URI S3: s3://pingone-aic-logs/pingone-aic/logs/
    • Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda.
    • Usia File Maksimum: Default 180 Hari.
    • ID Kunci Akses: Kunci akses pengguna dengan akses ke bucket S3.
    • Kunci Akses Rahasia: Kunci rahasia pengguna dengan akses ke bucket S3.
    • Namespace aset: Namespace aset.
    • Label penyerapan: Label yang akan diterapkan ke peristiwa dari feed ini.
  8. Klik Berikutnya.
  9. Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.

Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.