收集 Jamf Pro 環境記錄
本文說明如何使用 AWS S3,透過 Lambda 和 EventBridge 排程,將 Jamf Pro 情境記錄 (裝置和使用者情境) 擷取至 Google Security Operations。
事前準備
- Google SecOps 執行個體
- Jamf Pro 租戶的特殊存取權
- AWS (S3、IAM、Lambda、EventBridge) 的具備權限存取權
設定 Jamf API 角色
- 登入 Jamf 網頁版使用者介面。
- 依序前往「設定」>「系統」部分 >「API 角色和用戶端」。
- 選取「API 角色」分頁標籤。
- 點按「New」(新增)。
- 輸入 API 角色顯示名稱 (例如
context_role
)。 在「Jamf Pro API role privileges」(Jamf Pro API 角色權限) 中輸入權限名稱,然後從選單中選取該權限。
- 電腦清單
- 行動裝置清單
按一下 [儲存]。
設定 Jamf API 用戶端
- 在 Jamf Pro 中,依序前往「設定」>「系統」部分 >「API 角色和用戶端」。
- 選取「API 用戶端」分頁標籤。
- 點按「New」(新增)。
- 輸入 API 用戶端的顯示名稱 (例如
context_client
)。 - 在「API Roles」欄位中,新增您先前建立的
context_role
角色。 - 在「存取權杖有效時間」下方,輸入存取權杖的有效時間 (以秒為單位)。
- 按一下 [儲存]。
- 按一下 [編輯]。
- 按一下「啟用 API 用戶端」。
- 按一下 [儲存]。
設定 Jamf 用戶端密鑰
- 在 Jamf Pro 中,前往新建立的 API 用戶端。
- 按一下「產生用戶端密鑰」。
- 在確認畫面中,按一下「建立密鑰」。
- 將下列參數儲存在安全的位置:
- 基本網址:
https://<your>.jamfcloud.com
- 用戶端 ID:UUID。
- 用戶端密鑰:這個值只會顯示一次。
- 基本網址:
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南建立 Amazon S3 值區:建立值區。
- 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如
jamfpro
)。 - 請按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」,以供日後參考。
- 按一下 [完成]。
- 選取「權限」分頁標籤。
- 在「權限政策」部分中,按一下「新增權限」。
- 選取「新增權限」。
- 選取「直接附加政策」。
- 搜尋並選取 AmazonS3FullAccess 政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 S3 上傳的身分與存取權管理政策和角色
政策 JSON (如果您輸入的 bucket 名稱不同,請替換
jamfpro
):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutJamfObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::jamfpro/*" } ] }
依序前往 AWS 管理控制台 > IAM > 政策 > 建立政策 > JSON 分頁。
複製並貼上政策。
依序點選「Next」>「Create policy」。
依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。
附加新建立的政策。
為角色命名
WriteJamfToS3Role
,然後按一下「建立角色」。
建立 Lambda 函式
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」。
- 按一下「從頭開始撰寫」。
- 請提供下列設定詳細資料:
設定 | 值 |
---|---|
名稱 | jamf_pro_to_s3 |
執行階段 | Python 3.13 |
架構 | x86_64 |
權限 | WriteJamfToS3Role |
建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (
jamf_pro_to_s3.py
):import os import io import json import gzip import time import logging from datetime import datetime, timezone import boto3 import requests log = logging.getLogger() log.setLevel(logging.INFO) BASE_URL = os.environ.get("JAMF_BASE_URL", "").rstrip("/") CLIENT_ID = os.environ.get("JAMF_CLIENT_ID") CLIENT_SECRET = os.environ.get("JAMF_CLIENT_SECRET") S3_BUCKET = os.environ.get("S3_BUCKET") S3_PREFIX = os.environ.get("S3_PREFIX", "jamf-pro/context/") PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "200")) SECTIONS = [ "GENERAL", "HARDWARE", "OPERATING_SYSTEM", "USER_AND_LOCATION", "DISK_ENCRYPTION", "SECURITY", "EXTENSION_ATTRIBUTES", "APPLICATIONS", "CONFIGURATION_PROFILES", "LOCAL_USER_ACCOUNTS", "CERTIFICATES", "SERVICES", "PRINTERS", "SOFTWARE_UPDATES", "GROUP_MEMBERSHIPS", "CONTENT_CACHING", "STORAGE", "FONTS", "PACKAGE_RECEIPTS", "PLUGINS", "ATTACHMENTS", "LICENSED_SOFTWARE", "IBEACONS", "PURCHASING", ] s3 = boto3.client("s3") def _now_iso(): return datetime.now(timezone.utc).isoformat() def get_token(): """OAuth2 client credentials > access_token""" url = f"{BASE_URL}/api/oauth/token" data = { "grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, } headers = {"Content-Type": "application/x-www-form-urlencoded"} r = requests.post(url, data=data, headers=headers, timeout=30) r.raise_for_status() j = r.json() return j["access_token"], int(j.get("expires_in", 1200)) def fetch_page(token: str, page: int): """GET /api/v1/computers-inventory with sections & pagination""" url = f"{BASE_URL}/api/v1/computers-inventory" params = [("page", page), ("page-size", PAGE_SIZE)] + [("section", s) for s in SECTIONS] hdrs = {"Authorization": f"Bearer {token}", "Accept": "application/json"} r = requests.get(url, params=params, headers=hdrs, timeout=60) r.raise_for_status() return r.json() def to_context_event(item: dict) -> dict: inv = item.get("inventory", {}) or {} general = inv.get("general", {}) or {} hardware = inv.get("hardware", {}) or {} osinfo = inv.get("operatingSystem", {}) or {} loc = inv.get("location", {}) or inv.get("userAndLocation", {}) or {} computer = { "udid": general.get("udid") or hardware.get("udid"), "deviceName": general.get("name") or general.get("deviceName"), "serialNumber": hardware.get("serialNumber") or general.get("serialNumber"), "model": hardware.get("model") or general.get("model"), "osVersion": osinfo.get("version") or general.get("osVersion"), "osBuild": osinfo.get("build") or general.get("osBuild"), "macAddress": hardware.get("macAddress"), "alternateMacAddress": hardware.get("wifiMacAddress"), "ipAddress": general.get("ipAddress"), "reportedIpV4Address": general.get("reportedIpV4Address"), "reportedIpV6Address": general.get("reportedIpV6Address"), "modelIdentifier": hardware.get("modelIdentifier"), "assetTag": general.get("assetTag"), } user_block = { "userDirectoryID": loc.get("username") or loc.get("userDirectoryId"), "emailAddress": loc.get("emailAddress"), "realName": loc.get("realName"), "phone": loc.get("phone") or loc.get("phoneNumber"), "position": loc.get("position"), "department": loc.get("department"), "building": loc.get("building"), "room": loc.get("room"), } return { "webhook": {"name": "api.inventory"}, "event_type": "ComputerInventory", "event_action": "snapshot", "event_timestamp": _now_iso(), "event_data": { "computer": {k: v for k, v in computer.items() if v not in (None, "")}, **{k: v for k, v in user_block.items() if v not in (None, "")}, }, "_jamf": { "id": item.get("id"), "inventory": inv, }, } def write_ndjson_gz(objs, when: datetime): buf = io.BytesIO() with gzip.GzipFile(filename="-", mode="wb", fileobj=buf, mtime=int(time.time())) as gz: for obj in objs: line = json.dumps(obj, separators=(",", ":")) + "\n" gz.write(line.encode("utf-8")) buf.seek(0) prefix = S3_PREFIX.strip("/") + "/" if S3_PREFIX else "" key = f"{prefix}{when:%Y/%m/%d}/jamf_pro_context_{int(when.timestamp())}.ndjson.gz" s3.put_object(Bucket=S3_BUCKET, Key=key, Body=buf.getvalue()) return key def lambda_handler(event, context): assert BASE_URL and CLIENT_ID and CLIENT_SECRET and S3_BUCKET, "Missing required env vars" token, _ttl = get_token() page = 0 total = 0 batch = [] now = datetime.now(timezone.utc) while True: payload = fetch_page(token, page) results = payload.get("results") or payload.get("computerInventoryList") or [] if not results: break for item in results: batch.append(to_context_event(item)) total += 1 if len(batch) >= 5000: key = write_ndjson_gz(batch, now) log.info("wrote %s records to s3://%s/%s", len(batch), S3_BUCKET, key) batch = [] if len(results) < PAGE_SIZE: break page += 1 if batch: key = write_ndjson_gz(batch, now) log.info("wrote %s records to s3://%s/%s", len(batch), S3_BUCKET, key) return {"ok": True, "count": total}
依序前往「Configuration」>「Environment variables」>「Edit」>「Add new environment variable」。
輸入下列環境變數,並將 換成您的值。
環境變數
鍵 範例 S3_BUCKET
jamfpro
S3_PREFIX
jamf-pro/context/
AWS_REGION
選取你的所在地區 JAMF_CLIENT_ID
輸入 Jamf 用戶端 ID JAMF_CLIENT_SECRET
輸入 Jamf 用戶端密鑰 JAMF_BASE_URL
輸入 Jamf 網址,將 https://<your>.jamfcloud.com
中的<your>
取代為PAGE_SIZE
200
建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。
選取「設定」分頁標籤。
在「一般設定」面板中,按一下「編輯」。
將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)。
建立 EventBridge 排程
- 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」。
- 提供下列設定詳細資料:
- 週期性時間表:費率 (
1 hour
)。 - 目標:您的 Lambda 函式。
- 名稱:
jamfpro-context-schedule-1h
。
- 週期性時間表:費率 (
- 按一下「建立時間表」。
在 Google SecOps 中設定資訊提供,擷取 Jamf Pro 內容記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Jamf Pro Context logs
)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「Jamf Pro 內容」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:值區 URI
s3://jamfpro/jamf-pro/context/
- 請將
jamfpro
替換為實際值區名稱。
- 請將
- 來源刪除選項:根據偏好選取刪除選項。
- 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:資產命名空間。
- 擷取標籤:要套用至這個動態饋給事件的標籤。
- S3 URI:值區 URI
- 點選「下一步」。
- 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」。
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。