收集 Code42 Incydr 核心数据集
支持的语言:
Google SecOps
SIEM
本文档介绍了如何使用 Amazon S3 将 Code42 Incydr 核心数据集(用户、会话、审核、支持请求和可选的文件事件)注入到 Google Security Operations。
准备工作
- Google SecOps 实例
- 对 Code42 Incydr 的特权访问权限
- 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限
收集源前提条件(ID、API 密钥、组织 ID、令牌)
- 登录 Code42 Incydr 网页界面。
- 依次前往管理 > 集成 > API 客户端。
- 创建新客户端。
- 请复制以下详细信息并将其保存在安全的位置:
- 客户端 ID。
- 客户端密钥。
- 基础网址(例如
https://api.us.code42.com
、https://api.us2.code42.com
、https://api.ie.code42.com
、https://api.gov.code42.com
)。
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶。
- 保存存储桶名称和区域以备后用。
- 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击 Download CSV file(下载 CSV 文件),保存访问密钥和不公开的访问密钥以供日后使用。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策
- 搜索并选择 AmazonS3FullAccess 政策。
- 点击下一步。
- 点击添加权限。
设置 AWS Lambda 以轮询 Code42 Incydr(无转换)
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
- 提供以下配置详细信息:
- 名称:输入一个唯一且有意义的名称(例如
code42-incydr-pull
) - 运行时:选择 Python 3.13。
- 权限:选择具有 s3:PutObject 和 Cloudwatch 的角色。
- 名称:输入一个唯一且有意义的名称(例如
- 点击创建函数。
- 依次选择配置 > 常规配置 > 修改。
- 配置 Timeout=5m 和 Memory=1024 MB。
- 点击保存。
- 依次选择配置 > 环境变量 > 修改 > 添加。
INCYDR_BASE_URL
=https://api.us.code42.com
INCYDR_CLIENT_ID
=<Client ID>
INCYDR_CLIENT_SECRET
=<Client Secret>
S3_BUCKET
=code42-incydr
S3_PREFIX
=code42/
PAGE_SIZE
=500
LOOKBACK_MINUTES
=60
STREAMS
=users,sessions,audit,cases
- 可选:
FE_ADV_QUERY_JSON
= `` - 可选:
FE_PAGE_SIZE
=1000
- 点击保存。
选择 Code,然后输入以下 Python 代码:
import base64, json, os, time from datetime import datetime, timedelta, timezone from urllib.parse import urlencode from urllib.request import Request, urlopen import boto3 BASE = os.environ["INCYDR_BASE_URL"].rstrip("/") CID = os.environ["INCYDR_CLIENT_ID"] CSECRET = os.environ["INCYDR_CLIENT_SECRET"] BUCKET = os.environ["S3_BUCKET"] PREFIX_BASE = os.environ.get("S3_PREFIX", "code42/") PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "500")) LOOKBACK_MINUTES = int(os.environ.get("LOOKBACK_MINUTES", "60")) STREAMS = [s.strip() for s in os.environ.get("STREAMS", "users").split(",") if s.strip()] FE_ADV_QUERY_JSON = os.environ.get("FE_ADV_QUERY_JSON", "").strip() FE_PAGE_SIZE = int(os.environ.get("FE_PAGE_SIZE", "1000")) s3 = boto3.client("s3") def now_utc(): return datetime.now(timezone.utc) def iso_minus(minutes: int): return (now_utc() - timedelta(minutes=minutes)).strftime("%Y-%m-%dT%H:%M:%SZ") def put_bytes(key: str, body: bytes): s3.put_object(Bucket=BUCKET, Key=key, Body=body) def put_json(prefix: str, page_label: str, data): ts = now_utc().strftime("%Y/%m/%d/%H%M%S") key = f"{PREFIX_BASE}{prefix}{ts}-{page_label}.json" put_bytes(key, json.dumps(data).encode("utf-8")) return key def auth_header(): auth = base64.b64encode(f"{CID}:{CSECRET}".encode()).decode() req = Request(f"{BASE}/v1/oauth", data=b"", method="POST") req.add_header("Authorization", f"Basic {auth}") req.add_header("Accept", "application/json") with urlopen(req, timeout=30) as r: data = json.loads(r.read().decode()) return {"Authorization": f"Bearer {data['access_token']}", "Accept": "application/json"} def http_get(path: str, params: dict | None = None, headers: dict | None = None): url = f"{BASE}{path}" if params: url += ("?" + urlencode(params)) req = Request(url, method="GET") for k, v in (headers or {}).items(): req.add_header(k, v) with urlopen(req, timeout=60) as r: return r.read() def http_post_json(path: str, body: dict, headers: dict | None = None): url = f"{BASE}{path}" req = Request(url, data=json.dumps(body).encode("utf-8"), method="POST") req.add_header("Content-Type", "application/json") for k, v in (headers or {}).items(): req.add_header(k, v) with urlopen(req, timeout=120) as r: return r.read() # USERS (/v1/users) def pull_users(hdrs): next_token = None pages = 0 while True: params = {"active": "true", "blocked": "false", "pageSize": PAGE_SIZE} if next_token: params["pgToken"] = next_token raw = http_get("/v1/users", params, hdrs) data = json.loads(raw.decode()) put_json("users/", f"users-page-{pages}", data) pages += 1 next_token = data.get("nextPgToken") or data.get("next_pg_token") if not next_token: break return pages # SESSIONS (/v1/sessions) — alerts live inside sessions def pull_sessions(hdrs): start_iso = iso_minus(LOOKBACK_MINUTES) next_token = None pages = 0 while True: params = { "hasAlerts": "true", "startTime": start_iso, "pgSize": PAGE_SIZE, } if next_token: params["pgToken"] = next_token raw = http_get("/v1/sessions", params, hdrs) data = json.loads(raw.decode()) put_json("sessions/", f"sessions-page-{pages}", data) pages += 1 next_token = data.get("nextPgToken") or data.get("next_page_token") if not next_token: break return pages # AUDIT LOG (/v1/audit) — CSV export or paged JSON; write as received def pull_audit(hdrs): start_iso = iso_minus(LOOKBACK_MINUTES) next_token = None pages = 0 while True: params = {"startTime": start_iso, "pgSize": PAGE_SIZE} if next_token: params["pgToken"] = next_token raw = http_get("/v1/audit", params, hdrs) try: data = json.loads(raw.decode()) put_json("audit/", f"audit-page-{pages}", data) next_token = data.get("nextPgToken") or data.get("next_page_token") pages += 1 if not next_token: break except Exception: ts = now_utc().strftime("%Y/%m/%d/%H%M%S") key = f"{PREFIX_BASE}audit/{ts}-audit-export.bin" put_bytes(key, raw) pages += 1 break return pages # CASES (/v1/cases) def pull_cases(hdrs): next_token = None pages = 0 while True: params = {"pgSize": PAGE_SIZE} if next_token: params["pgToken"] = next_token raw = http_get("/v1/cases", params, hdrs) data = json.loads(raw.decode()) put_json("cases/", f"cases-page-{pages}", data) pages += 1 next_token = data.get("nextPgToken") or data.get("next_page_token") if not next_token: break return pages # FILE EVENTS (/v2/file-events/search) — enabled only if you provide FE_ADV_QUERY_JSON def pull_file_events(hdrs): if not FE_ADV_QUERY_JSON: return 0 try: base_query = json.loads(FE_ADV_QUERY_JSON) except Exception: raise RuntimeError("FE_ADV_QUERY_JSON is not valid JSON") pages = 0 next_token = None while True: body = dict(base_query) body["pgSize"] = FE_PAGE_SIZE if next_token: body["pgToken"] = next_token raw = http_post_json("/v2/file-events/search", body, hdrs) data = json.loads(raw.decode()) put_json("file_events/", f"fileevents-page-{pages}", data) pages += 1 next_token = ( data.get("nextPgToken") or data.get("next_page_token") or (data.get("file_events") or {}).get("nextPgToken") ) if not next_token: break return pages def handler(event, context): hdrs = auth_header() report = {} if "users" in STREAMS: report["users_pages"] = pull_users(hdrs) if "sessions" in STREAMS: report["sessions_pages"] = pull_sessions(hdrs) if "audit" in STREAMS: report["audit_pages"] = pull_audit(hdrs) if "cases" in STREAMS: report["cases_pages"] = pull_cases(hdrs) if "file_events" in STREAMS: report["file_events_pages"] = pull_file_events(hdrs) return report def lambda_handler(event, context): return handler(event, context)
点击部署。
创建 EventBridge 计划
- 在 AWS 控制台中,依次前往 Amazon EventBridge > 规则。
- 点击创建规则。
- 提供以下配置详细信息:
- 安排时间模式:选择固定费率(每
1
小时)。 - 名称:输入一个有意义的唯一名称(例如
code42-incydr-hourly
)。 - 目标:选择 Lambda 函数,然后选择
code42-incydr-pull
。
- 安排时间模式:选择固定费率(每
- 点击创建规则
可选:为 Google SecOps 创建只读 IAM 用户和密钥
- 在 AWS 控制台中,依次前往 IAM > Users,然后点击 Add users。
- 提供以下配置详细信息:
- 用户:输入唯一名称(例如
secops-reader
) - 访问类型:选择访问密钥 - 以程序化方式访问
- 点击创建用户。
- 用户:输入唯一名称(例如
- 附加最低限度的读取政策(自定义):用户 > 选择
secops-reader
> 权限 > 添加权限 > 直接附加政策 > 创建政策 在 JSON 编辑器中,输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": [ "s3:ListBucket" ], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
将名称设置为
secops-reader-policy
。依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限。
依次前往安全凭据 > 访问密钥 > 创建访问密钥。
下载 CSV(这些值将输入到 Feed 中)。
在 Google SecOps 中配置 Feed 以注入 Code42 Incydr 日志
- 依次前往 SIEM 设置> Feed。
- 点击添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Code42 Incydr Datasets
)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Code42 Incydr 作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://code42-incydr/code42/
- 来源删除选项:根据您的偏好设置选择删除选项。
- 文件最长保留时间:默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
- 资产命名空间:资产命名空间。
- 注入标签:要应用于此 Feed 中事件的标签。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。