收集 PingOne Advanced Identity Cloud 記錄

支援的國家/地區:

本文說明如何使用 Amazon S3,將 PingOne Advanced Identity Cloud 記錄檔擷取至 Google Security Operations。

事前準備

  • Google SecOps 執行個體
  • PingOne Advanced Identity Cloud 租戶的特殊存取權
  • AWS 的特殊存取權 (S3、IAM、Lambda、EventBridge)

取得 PingOne API 金鑰和租戶 FQDN

  1. 登入 Advanced Identity Cloud 管理控制台。
  2. 按一下使用者圖示,然後點選「租戶設定」
  3. 在「全域設定」分頁中,按一下「記錄 API 金鑰」
  4. 按一下「New Log API Key」(新增記錄 API 金鑰),然後為金鑰命名。
  5. 按一下 [Create Key] (建立金鑰)
  6. 複製並儲存 api_key_idapi_key_secret 值,並妥善保存。系統不會再次顯示 api_key_secret 值。
  7. 按一下「完成」
  8. 依序前往「租戶設定」>「詳細資料」,找出租戶 FQDN (例如 example.tomcat.pingone.com)。

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照本使用指南建立 Amazon S3 值區建立值區
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 pingone-aic-logs)。
  3. 按照這份使用者指南建立使用者:建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」,以供日後使用。
  12. 按一下 [完成]
  13. 選取 [權限] 分頁標籤。
  14. 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋並選取 AmazonS3FullAccess 政策。
  18. 點選「下一步」
  19. 按一下「Add permissions」。

設定 S3 上傳的身分與存取權管理政策和角色

  1. AWS 管理控制台中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」分頁標籤
  2. 輸入下列政策。

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutPingOneAICObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::pingone-aic-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::pingone-aic-logs/pingone-aic/logs/state.json"
        }
      ]
    }
    
    • 如果您輸入的值區名稱不同,請替換 pingone-aic-logs
  3. 依序點選「Next」>「Create policy」

  4. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」

  5. 附加新建立的政策。

  6. 為角色命名 WritePingOneAICToS3Role,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:

    設定
    名稱 pingone_aic_to_s3
    執行階段 Python 3.13
    架構 x86_64
    執行角色 WritePingOneAICToS3Role
  4. 建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (pingone_aic_to_s3.py):

    #!/usr/bin/env python3
    
    import os, json, time, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    FQDN = os.environ["AIC_TENANT_FQDN"].strip("/")
    API_KEY_ID = os.environ["AIC_API_KEY_ID"]
    API_KEY_SECRET = os.environ["AIC_API_SECRET"]
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "pingone-aic/logs/").strip("/")
    SOURCES = [s.strip() for s in os.environ.get("SOURCES", "am-everything,idm-everything").split(",") if s.strip()]
    PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "500")), 1000)  # hard cap per docs
    MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))
    STATE_KEY = os.environ.get("STATE_KEY", "pingone-aic/logs/state.json")
    LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600"))
    
    s3 = boto3.client("s3")
    
    def _headers():
        return {"x-api-key": API_KEY_ID, "x-api-secret": API_KEY_SECRET}
    
    def _iso(ts: float) -> str:
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))
    
    def _http_get(url: str, timeout: int = 60, max_retries: int = 5) -> dict:
        attempt, backoff = 0, 1.0
        while True:
            req = Request(url, method="GET", headers=_headers())
            try:
                with urlopen(req, timeout=timeout) as r:
                    data = r.read()
                    return json.loads(data.decode("utf-8"))
            except HTTPError as e:
                # 429: respect X-RateLimit-Reset (epoch seconds) if present
                if e.code == 429 and attempt < max_retries:
                    reset = e.headers.get("X-RateLimit-Reset")
                    now = int(time.time())
                    delay = max(1, int(reset) - now) if (reset and reset.isdigit()) else int(backoff)
                    time.sleep(delay); attempt += 1; backoff *= 2; continue
                if 500 <= e.code <= 599 and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def _load_state() -> dict:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read())
        except Exception:
            return {"sources": {}}
    
    def _save_state(state: dict):
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY,
                      Body=json.dumps(state, separators=(",", ":")).encode("utf-8"),
                      ContentType="application/json")
    
    def _write_page(payload: dict, source: str) -> str:
        ts = time.gmtime()
        key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-pingone-aic-{source}.json"
        s3.put_object(Bucket=S3_BUCKET, Key=key,
                      Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
                      ContentType="application/json")
        return key
    
    def _bounded_begin_time(last_ts: str | None, now: float) -> str:
        # beginTime must be <= 24h before endTime (now if endTime omitted)
        # if last_ts older than 24h → cap to now-24h; else use last_ts; else lookback
        twenty_four_h_ago = now - 24*3600
        if last_ts:
            try:
                t_struct = time.strptime(last_ts[:19] + "Z", "%Y-%m-%dT%H:%M:%SZ")
                t_epoch = int(time.mktime(t_struct))
            except Exception:
                t_epoch = int(now - LOOKBACK_SECONDS)
            begin_epoch = max(t_epoch, int(twenty_four_h_ago))
        else:
            begin_epoch = max(int(now - LOOKBACK_SECONDS), int(twenty_four_h_ago))
        return _iso(begin_epoch)
    
    def fetch_source(source: str, last_ts: str | None):
        base = f"https://{FQDN}/monitoring/logs"
        now = time.time()
        params = {
            "source": source,
            "_pageSize": str(PAGE_SIZE),
            "_sortKeys": "timestamp",
            "beginTime": _bounded_begin_time(last_ts, now)
        }
    
        pages = 0
        written = 0
        newest_ts = last_ts
        cookie = None
    
        while pages < MAX_PAGES:
            if cookie:
                params["_pagedResultsCookie"] = cookie
            qs = urllib.parse.urlencode(params, quote_via=urllib.parse.quote)
            data = _http_get(f"{base}?{qs}")
            _write_page(data, source)
    
            results = data.get("result") or data.get("results") or []
            for item in results:
                t = item.get("timestamp") or item.get("payload", {}).get("timestamp")
                if t and (newest_ts is None or t > newest_ts):
                    newest_ts = t
    
            written += len(results)
            cookie = data.get("pagedResultsCookie")
            pages += 1
            if not cookie:
                break
    
        return {"source": source, "pages": pages, "written": written, "newest_ts": newest_ts}
    
    def lambda_handler(event=None, context=None):
        state = _load_state()
        state.setdefault("sources", {})
        summary = []
        for source in SOURCES:
            last_ts = state["sources"].get(source, {}).get("last_ts")
            res = fetch_source(source, last_ts)
            if res.get("newest_ts"):
                state["sources"][source] = {"last_ts": res["newest_ts"]}
            summary.append(res)
        _save_state(state)
        return {"ok": True, "summary": summary}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. 依序前往「Configuration」>「Environment variables」>「Edit」>「Add new environment variable」

  6. 輸入下列環境變數,並將 換成您的值:

    範例
    S3_BUCKET pingone-aic-logs
    S3_PREFIX pingone-aic/logs/
    STATE_KEY pingone-aic/logs/state.json
    AIC_TENANT_FQDN example.tomcat.pingone.com
    AIC_API_KEY_ID <api_key_id>
    AIC_API_SECRET <api_key_secret>
    SOURCES am-everything,idm-everything
    PAGE_SIZE 500
    MAX_PAGES 20
    LOOKBACK_SECONDS 3600
  7. 建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。

  8. 選取「設定」分頁標籤。

  9. 在「一般設定」面板中,按一下「編輯」

  10. 將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)

建立 EventBridge 排程

  1. 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」
  2. 提供下列設定詳細資料:
    • 週期性時間表費率 (1 hour)。
    • 目標:您的 Lambda 函式。
    • 名稱pingone-aic-1h
  3. 按一下「建立時間表」

選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰

  1. 在 AWS 控制台中,依序前往「IAM」>「Users」,然後按一下「Add users」
  2. 提供下列設定詳細資料:
    • 使用者:輸入不重複的名稱 (例如 secops-reader)
    • 存取權類型:選取「存取金鑰 - 程式輔助存取」
    • 按一下「建立使用者」
  3. 附加最低讀取權限政策 (自訂):依序選取「使用者」secops-reader「權限」「新增權限」「直接附加政策」「建立政策」
  4. 在 JSON 編輯器中輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. 將名稱設為 secops-reader-policy

  6. 依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」

  7. 依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」

  8. 下載 CSV (這些值會輸入至動態饋給)。

在 Google SecOps 中設定資訊提供,以便擷取 PingOne Advanced Identity Cloud 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 PingOne Advanced Identity Cloud)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「PingOne Advanced Identity Cloud」做為「記錄類型」
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URIs3://pingone-aic-logs/pingone-aic/logs/
    • 來源刪除選項:根據偏好選取刪除選項。
    • 檔案存在時間上限:預設為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:要套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「Finalize」畫面上檢查新的動態饋給設定,然後按一下「Submit」

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。