收集 Code42 Incydr 核心資料集

支援的國家/地區:

本文說明如何使用 Amazon S3,將 Code42 Incydr 核心資料集 (使用者、工作階段、稽核、案件和檔案事件 (選用)) 擷取至 Google Security Operations。

事前準備

  • Google SecOps 執行個體
  • Code42 Incydr 的特殊存取權
  • AWS 的特殊存取權 (S3、IAM、Lambda、EventBridge)

收集來源必要條件 (ID、API 金鑰、機構 ID、權杖)

  1. 登入 Code42 Incydr 網頁版 UI。
  2. 依序前往「Administration」(管理) >「Integrations」(整合) >「API Clients」(API 用戶端)
  3. 建立新的用戶端。
  4. 複製下列詳細資料並儲存至安全位置:
    1. 用戶端 ID
    2. 用戶端密鑰
    3. 基準網址:(例如 https://api.us.code42.comhttps://api.us2.code42.comhttps://api.ie.code42.comhttps://api.gov.code42.com)。

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照這份使用者指南建立 Amazon S3 值區建立值區
  2. 儲存 bucket 的「名稱」和「區域」,稍後會用到。
  3. 按照這份使用者指南建立使用者:建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」,以供日後使用。
  12. 按一下 [完成]
  13. 選取 [權限] 分頁標籤。
  14. 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋並選取 AmazonS3FullAccess 政策。
  18. 點選「下一步」
  19. 按一下「Add permissions」。

設定 AWS Lambda,輪詢 Code42 Incydr (不轉換)

  1. 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 提供下列設定詳細資料:
    • 名稱:輸入有意義的不重複名稱 (例如 code42-incydr-pull)
    • 執行階段:選取「Python 3.13」
    • 權限:選取具有 s3:PutObjectCloudwatch 的角色。
  4. 按一下「建立函式」
  5. 依序選取「設定」>「一般設定」>「編輯」
  6. 設定 Timeout=5mMemory=1024 MB
  7. 按一下 [儲存]
  8. 依序選取「Configuration」>「Environment variables」>「Edit」>「Add」
    1. INCYDR_BASE_URL = https://api.us.code42.com
    2. INCYDR_CLIENT_ID = <Client ID>
    3. INCYDR_CLIENT_SECRET = <Client Secret>
    4. S3_BUCKET = code42-incydr
    5. S3_PREFIX = code42/
    6. PAGE_SIZE = 500
    7. LOOKBACK_MINUTES = 60
    8. STREAMS = users,sessions,audit,cases
    9. 選用:FE_ADV_QUERY_JSON = ``
    10. 選用:FE_PAGE_SIZE = 1000
  9. 按一下 [儲存]
  10. 選取「程式碼」,然後輸入下列 Python 程式碼:

    import base64, json, os, time
    from datetime import datetime, timedelta, timezone
    from urllib.parse import urlencode
    from urllib.request import Request, urlopen
    import boto3
    
    BASE = os.environ["INCYDR_BASE_URL"].rstrip("/")
    CID = os.environ["INCYDR_CLIENT_ID"]
    CSECRET = os.environ["INCYDR_CLIENT_SECRET"]
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX_BASE = os.environ.get("S3_PREFIX", "code42/")
    PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "500"))
    LOOKBACK_MINUTES = int(os.environ.get("LOOKBACK_MINUTES", "60"))
    STREAMS = [s.strip() for s in os.environ.get("STREAMS", "users").split(",") if s.strip()]
    FE_ADV_QUERY_JSON = os.environ.get("FE_ADV_QUERY_JSON", "").strip()
    FE_PAGE_SIZE = int(os.environ.get("FE_PAGE_SIZE", "1000"))
    
    s3 = boto3.client("s3")
    
    def now_utc():
        return datetime.now(timezone.utc)
    
    def iso_minus(minutes: int):
        return (now_utc() - timedelta(minutes=minutes)).strftime("%Y-%m-%dT%H:%M:%SZ")
    
    def put_bytes(key: str, body: bytes):
        s3.put_object(Bucket=BUCKET, Key=key, Body=body)
    
    def put_json(prefix: str, page_label: str, data):
        ts = now_utc().strftime("%Y/%m/%d/%H%M%S")
        key = f"{PREFIX_BASE}{prefix}{ts}-{page_label}.json"
        put_bytes(key, json.dumps(data).encode("utf-8"))
        return key
    
    def auth_header():
        auth = base64.b64encode(f"{CID}:{CSECRET}".encode()).decode()
        req = Request(f"{BASE}/v1/oauth", data=b"", method="POST")
        req.add_header("Authorization", f"Basic {auth}")
        req.add_header("Accept", "application/json")
        with urlopen(req, timeout=30) as r:
            data = json.loads(r.read().decode())
        return {"Authorization": f"Bearer {data['access_token']}", "Accept": "application/json"}
    
    def http_get(path: str, params: dict | None = None, headers: dict | None = None):
        url = f"{BASE}{path}"
        if params:
            url += ("?" + urlencode(params))
        req = Request(url, method="GET")
        for k, v in (headers or {}).items():
            req.add_header(k, v)
        with urlopen(req, timeout=60) as r:
            return r.read()
    
    def http_post_json(path: str, body: dict, headers: dict | None = None):
        url = f"{BASE}{path}"
        req = Request(url, data=json.dumps(body).encode("utf-8"), method="POST")
        req.add_header("Content-Type", "application/json")
        for k, v in (headers or {}).items():
            req.add_header(k, v)
        with urlopen(req, timeout=120) as r:
            return r.read()
    
    # USERS (/v1/users)
    
    def pull_users(hdrs):
        next_token = None
        pages = 0
        while True:
            params = {"active": "true", "blocked": "false", "pageSize": PAGE_SIZE}
            if next_token:
                params["pgToken"] = next_token
            raw = http_get("/v1/users", params, hdrs)
            data = json.loads(raw.decode())
            put_json("users/", f"users-page-{pages}", data)
            pages += 1
            next_token = data.get("nextPgToken") or data.get("next_pg_token")
            if not next_token:
                break
        return pages
    
    # SESSIONS (/v1/sessions) — alerts live inside sessions
    
    def pull_sessions(hdrs):
        start_iso = iso_minus(LOOKBACK_MINUTES)
        next_token = None
        pages = 0
        while True:
            params = {
                "hasAlerts": "true",
                "startTime": start_iso,
                "pgSize": PAGE_SIZE,
            }
            if next_token:
                params["pgToken"] = next_token
            raw = http_get("/v1/sessions", params, hdrs)
            data = json.loads(raw.decode())
            put_json("sessions/", f"sessions-page-{pages}", data)
            pages += 1
            next_token = data.get("nextPgToken") or data.get("next_page_token")
            if not next_token:
                break
        return pages
    
    # AUDIT LOG (/v1/audit) — CSV export or paged JSON; write as received
    
    def pull_audit(hdrs):
        start_iso = iso_minus(LOOKBACK_MINUTES)
        next_token = None
        pages = 0
        while True:
            params = {"startTime": start_iso, "pgSize": PAGE_SIZE}
            if next_token:
                params["pgToken"] = next_token
            raw = http_get("/v1/audit", params, hdrs)
            try:
                data = json.loads(raw.decode())
                put_json("audit/", f"audit-page-{pages}", data)
                next_token = data.get("nextPgToken") or data.get("next_page_token")
                pages += 1
                if not next_token:
                    break
            except Exception:
                ts = now_utc().strftime("%Y/%m/%d/%H%M%S")
                key = f"{PREFIX_BASE}audit/{ts}-audit-export.bin"
                put_bytes(key, raw)
                pages += 1
                break
        return pages
    
    # CASES (/v1/cases)
    
    def pull_cases(hdrs):
        next_token = None
        pages = 0
        while True:
            params = {"pgSize": PAGE_SIZE}
            if next_token:
                params["pgToken"] = next_token
            raw = http_get("/v1/cases", params, hdrs)
            data = json.loads(raw.decode())
            put_json("cases/", f"cases-page-{pages}", data)
            pages += 1
            next_token = data.get("nextPgToken") or data.get("next_page_token")
            if not next_token:
                break
        return pages
    
    # FILE EVENTS (/v2/file-events/search) — enabled only if you provide FE_ADV_QUERY_JSON
    
    def pull_file_events(hdrs):
        if not FE_ADV_QUERY_JSON:
            return 0
        try:
            base_query = json.loads(FE_ADV_QUERY_JSON)
        except Exception:
            raise RuntimeError("FE_ADV_QUERY_JSON is not valid JSON")
    
        pages = 0
        next_token = None
        while True:
            body = dict(base_query)
            body["pgSize"] = FE_PAGE_SIZE
            if next_token:
                body["pgToken"] = next_token
            raw = http_post_json("/v2/file-events/search", body, hdrs)
            data = json.loads(raw.decode())
            put_json("file_events/", f"fileevents-page-{pages}", data)
            pages += 1
            next_token = (
                data.get("nextPgToken")
                or data.get("next_page_token")
                or (data.get("file_events") or {}).get("nextPgToken")
            )
            if not next_token:
                break
        return pages
    
    def handler(event, context):
        hdrs = auth_header()
        report = {}
        if "users" in STREAMS:
            report["users_pages"] = pull_users(hdrs)
        if "sessions" in STREAMS:
            report["sessions_pages"] = pull_sessions(hdrs)
        if "audit" in STREAMS:
            report["audit_pages"] = pull_audit(hdrs)
        if "cases" in STREAMS:
            report["cases_pages"] = pull_cases(hdrs)
        if "file_events" in STREAMS:
            report["file_events_pages"] = pull_file_events(hdrs)
        return report
    
    def lambda_handler(event, context):
        return handler(event, context)
    
  11. 按一下 [Deploy] (部署)

建立 EventBridge 排程

  1. 在 AWS 控制台中,前往「Amazon EventBridge」>「規則」
  2. 按一下「建立規則」
  3. 提供下列設定詳細資料:
    • 排程模式:選取「固定費率 (每 1 小時)」
    • 名稱:輸入不重複且有意義的名稱 (例如 code42-incydr-hourly)。
    • 目標:選取「Lambda 函式」並選擇 code42-incydr-pull
  4. 按一下「建立規則」

選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰

  1. 在 AWS 控制台中,依序前往「IAM」>「Users」,然後按一下「Add users」
  2. 提供下列設定詳細資料:
    • 使用者:輸入不重複的名稱 (例如 secops-reader)
    • 存取權類型:選取「存取金鑰 - 程式輔助存取」
    • 按一下「建立使用者」
  3. 附加最低讀取權限政策 (自訂):依序選取「使用者」secops-reader「權限」「新增權限」「直接附加政策」「建立政策」
  4. 在 JSON 編輯器中輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "s3:GetObject"
          ],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": [
            "s3:ListBucket"
          ],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. 將名稱設為 secops-reader-policy

  6. 依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」

  7. 依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」

  8. 下載 CSV (這些值會輸入至動態饋給)。

在 Google SecOps 中設定資訊提供,擷取 Code42 Incydr 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Code42 Incydr Datasets)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「Code42 Incydr」做為「記錄類型」
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URIs3://code42-incydr/code42/
    • 來源刪除選項:根據偏好選取刪除選項。
    • 檔案存在時間上限:預設為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:要套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「Finalize」畫面上檢查新的動態饋給設定,然後按一下「Submit」

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。