收集 Code42 Incydr 核心資料集
支援的國家/地區:
Google SecOps
SIEM
本文說明如何使用 Amazon S3,將 Code42 Incydr 核心資料集 (使用者、工作階段、稽核、案件和檔案事件 (選用)) 擷取至 Google Security Operations。
事前準備
- Google SecOps 執行個體
- Code42 Incydr 的特殊存取權
- AWS 的特殊存取權 (S3、IAM、Lambda、EventBridge)
收集來源必要條件 (ID、API 金鑰、機構 ID、權杖)
- 登入 Code42 Incydr 網頁版 UI。
- 依序前往「Administration」(管理) >「Integrations」(整合) >「API Clients」(API 用戶端)。
- 建立新的用戶端。
- 複製下列詳細資料並儲存至安全位置:
- 用戶端 ID。
- 用戶端密鑰。
- 基準網址:(例如
https://api.us.code42.com
、https://api.us2.code42.com
、https://api.ie.code42.com
、https://api.gov.code42.com
)。
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南建立 Amazon S3 值區:建立值區。
- 儲存 bucket 的「名稱」和「區域」,稍後會用到。
- 按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」,以供日後使用。
- 按一下 [完成]。
- 選取 [權限] 分頁標籤。
- 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)。
- 選取「新增權限」。
- 選取「直接附加政策」
- 搜尋並選取 AmazonS3FullAccess 政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 AWS Lambda,輪詢 Code42 Incydr (不轉換)
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」。
- 按一下「從頭開始撰寫」。
- 提供下列設定詳細資料:
- 名稱:輸入有意義的不重複名稱 (例如
code42-incydr-pull
) - 執行階段:選取「Python 3.13」。
- 權限:選取具有 s3:PutObject 和 Cloudwatch 的角色。
- 名稱:輸入有意義的不重複名稱 (例如
- 按一下「建立函式」。
- 依序選取「設定」>「一般設定」>「編輯」。
- 設定 Timeout=5m 和 Memory=1024 MB。
- 按一下 [儲存]。
- 依序選取「Configuration」>「Environment variables」>「Edit」>「Add」。
INCYDR_BASE_URL
=https://api.us.code42.com
INCYDR_CLIENT_ID
=<Client ID>
INCYDR_CLIENT_SECRET
=<Client Secret>
S3_BUCKET
=code42-incydr
S3_PREFIX
=code42/
PAGE_SIZE
=500
LOOKBACK_MINUTES
=60
STREAMS
=users,sessions,audit,cases
- 選用:
FE_ADV_QUERY_JSON
= `` - 選用:
FE_PAGE_SIZE
=1000
- 按一下 [儲存]。
選取「程式碼」,然後輸入下列 Python 程式碼:
import base64, json, os, time from datetime import datetime, timedelta, timezone from urllib.parse import urlencode from urllib.request import Request, urlopen import boto3 BASE = os.environ["INCYDR_BASE_URL"].rstrip("/") CID = os.environ["INCYDR_CLIENT_ID"] CSECRET = os.environ["INCYDR_CLIENT_SECRET"] BUCKET = os.environ["S3_BUCKET"] PREFIX_BASE = os.environ.get("S3_PREFIX", "code42/") PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "500")) LOOKBACK_MINUTES = int(os.environ.get("LOOKBACK_MINUTES", "60")) STREAMS = [s.strip() for s in os.environ.get("STREAMS", "users").split(",") if s.strip()] FE_ADV_QUERY_JSON = os.environ.get("FE_ADV_QUERY_JSON", "").strip() FE_PAGE_SIZE = int(os.environ.get("FE_PAGE_SIZE", "1000")) s3 = boto3.client("s3") def now_utc(): return datetime.now(timezone.utc) def iso_minus(minutes: int): return (now_utc() - timedelta(minutes=minutes)).strftime("%Y-%m-%dT%H:%M:%SZ") def put_bytes(key: str, body: bytes): s3.put_object(Bucket=BUCKET, Key=key, Body=body) def put_json(prefix: str, page_label: str, data): ts = now_utc().strftime("%Y/%m/%d/%H%M%S") key = f"{PREFIX_BASE}{prefix}{ts}-{page_label}.json" put_bytes(key, json.dumps(data).encode("utf-8")) return key def auth_header(): auth = base64.b64encode(f"{CID}:{CSECRET}".encode()).decode() req = Request(f"{BASE}/v1/oauth", data=b"", method="POST") req.add_header("Authorization", f"Basic {auth}") req.add_header("Accept", "application/json") with urlopen(req, timeout=30) as r: data = json.loads(r.read().decode()) return {"Authorization": f"Bearer {data['access_token']}", "Accept": "application/json"} def http_get(path: str, params: dict | None = None, headers: dict | None = None): url = f"{BASE}{path}" if params: url += ("?" + urlencode(params)) req = Request(url, method="GET") for k, v in (headers or {}).items(): req.add_header(k, v) with urlopen(req, timeout=60) as r: return r.read() def http_post_json(path: str, body: dict, headers: dict | None = None): url = f"{BASE}{path}" req = Request(url, data=json.dumps(body).encode("utf-8"), method="POST") req.add_header("Content-Type", "application/json") for k, v in (headers or {}).items(): req.add_header(k, v) with urlopen(req, timeout=120) as r: return r.read() # USERS (/v1/users) def pull_users(hdrs): next_token = None pages = 0 while True: params = {"active": "true", "blocked": "false", "pageSize": PAGE_SIZE} if next_token: params["pgToken"] = next_token raw = http_get("/v1/users", params, hdrs) data = json.loads(raw.decode()) put_json("users/", f"users-page-{pages}", data) pages += 1 next_token = data.get("nextPgToken") or data.get("next_pg_token") if not next_token: break return pages # SESSIONS (/v1/sessions) — alerts live inside sessions def pull_sessions(hdrs): start_iso = iso_minus(LOOKBACK_MINUTES) next_token = None pages = 0 while True: params = { "hasAlerts": "true", "startTime": start_iso, "pgSize": PAGE_SIZE, } if next_token: params["pgToken"] = next_token raw = http_get("/v1/sessions", params, hdrs) data = json.loads(raw.decode()) put_json("sessions/", f"sessions-page-{pages}", data) pages += 1 next_token = data.get("nextPgToken") or data.get("next_page_token") if not next_token: break return pages # AUDIT LOG (/v1/audit) — CSV export or paged JSON; write as received def pull_audit(hdrs): start_iso = iso_minus(LOOKBACK_MINUTES) next_token = None pages = 0 while True: params = {"startTime": start_iso, "pgSize": PAGE_SIZE} if next_token: params["pgToken"] = next_token raw = http_get("/v1/audit", params, hdrs) try: data = json.loads(raw.decode()) put_json("audit/", f"audit-page-{pages}", data) next_token = data.get("nextPgToken") or data.get("next_page_token") pages += 1 if not next_token: break except Exception: ts = now_utc().strftime("%Y/%m/%d/%H%M%S") key = f"{PREFIX_BASE}audit/{ts}-audit-export.bin" put_bytes(key, raw) pages += 1 break return pages # CASES (/v1/cases) def pull_cases(hdrs): next_token = None pages = 0 while True: params = {"pgSize": PAGE_SIZE} if next_token: params["pgToken"] = next_token raw = http_get("/v1/cases", params, hdrs) data = json.loads(raw.decode()) put_json("cases/", f"cases-page-{pages}", data) pages += 1 next_token = data.get("nextPgToken") or data.get("next_page_token") if not next_token: break return pages # FILE EVENTS (/v2/file-events/search) — enabled only if you provide FE_ADV_QUERY_JSON def pull_file_events(hdrs): if not FE_ADV_QUERY_JSON: return 0 try: base_query = json.loads(FE_ADV_QUERY_JSON) except Exception: raise RuntimeError("FE_ADV_QUERY_JSON is not valid JSON") pages = 0 next_token = None while True: body = dict(base_query) body["pgSize"] = FE_PAGE_SIZE if next_token: body["pgToken"] = next_token raw = http_post_json("/v2/file-events/search", body, hdrs) data = json.loads(raw.decode()) put_json("file_events/", f"fileevents-page-{pages}", data) pages += 1 next_token = ( data.get("nextPgToken") or data.get("next_page_token") or (data.get("file_events") or {}).get("nextPgToken") ) if not next_token: break return pages def handler(event, context): hdrs = auth_header() report = {} if "users" in STREAMS: report["users_pages"] = pull_users(hdrs) if "sessions" in STREAMS: report["sessions_pages"] = pull_sessions(hdrs) if "audit" in STREAMS: report["audit_pages"] = pull_audit(hdrs) if "cases" in STREAMS: report["cases_pages"] = pull_cases(hdrs) if "file_events" in STREAMS: report["file_events_pages"] = pull_file_events(hdrs) return report def lambda_handler(event, context): return handler(event, context)
按一下 [Deploy] (部署)。
建立 EventBridge 排程
- 在 AWS 控制台中,前往「Amazon EventBridge」>「規則」。
- 按一下「建立規則」。
- 提供下列設定詳細資料:
- 排程模式:選取「固定費率 (每
1
小時)」。 - 名稱:輸入不重複且有意義的名稱 (例如
code42-incydr-hourly
)。 - 目標:選取「Lambda 函式」並選擇
code42-incydr-pull
。
- 排程模式:選取「固定費率 (每
- 按一下「建立規則」。
選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰
- 在 AWS 控制台中,依序前往「IAM」>「Users」,然後按一下「Add users」。
- 提供下列設定詳細資料:
- 使用者:輸入不重複的名稱 (例如
secops-reader
) - 存取權類型:選取「存取金鑰 - 程式輔助存取」
- 按一下「建立使用者」。
- 使用者:輸入不重複的名稱 (例如
- 附加最低讀取權限政策 (自訂):依序選取「使用者」
secops-reader
「權限」「新增權限」「直接附加政策」「建立政策」 在 JSON 編輯器中輸入下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": [ "s3:ListBucket" ], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
將名稱設為
secops-reader-policy
。依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」。
依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」。
下載 CSV (這些值會輸入至動態饋給)。
在 Google SecOps 中設定資訊提供,擷取 Code42 Incydr 記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Code42 Incydr Datasets
)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「Code42 Incydr」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:
s3://code42-incydr/code42/
- 來源刪除選項:根據偏好選取刪除選項。
- 檔案存在時間上限:預設為 180 天。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:資產命名空間。
- 擷取標籤:要套用至這個動態饋給事件的標籤。
- S3 URI:
- 點選「下一步」。
- 在「Finalize」畫面上檢查新的動態饋給設定,然後按一下「Submit」。
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。