收集 Jamf Pro 環境記錄

支援的國家/地區:

本文說明如何使用 AWS S3,透過 LambdaEventBridge 排程,將 Jamf Pro 情境記錄 (裝置和使用者情境) 擷取至 Google Security Operations。

事前準備

  • Google SecOps 執行個體
  • Jamf Pro 租戶的特殊存取權
  • AWS (S3、IAM、Lambda、EventBridge) 的具備權限存取權

設定 Jamf API 角色

  1. 登入 Jamf 網頁版使用者介面。
  2. 依序前往「設定」>「系統」部分 >「API 角色和用戶端」
  3. 選取「API 角色」分頁標籤。
  4. 點按「New」(新增)
  5. 輸入 API 角色顯示名稱 (例如 context_role)。
  6. 在「Jamf Pro API role privileges」(Jamf Pro API 角色權限) 中輸入權限名稱,然後從選單中選取該權限。

    • 電腦清單
    • 行動裝置清單
  7. 按一下 [儲存]

設定 Jamf API 用戶端

  1. Jamf Pro 中,依序前往「設定」>「系統」部分 >「API 角色和用戶端」
  2. 選取「API 用戶端」分頁標籤。
  3. 點按「New」(新增)
  4. 輸入 API 用戶端的顯示名稱 (例如 context_client)。
  5. 在「API Roles」欄位中,新增您先前建立的 context_role 角色。
  6. 在「存取權杖有效時間」下方,輸入存取權杖的有效時間 (以秒為單位)。
  7. 按一下 [儲存]
  8. 按一下 [編輯]
  9. 按一下「啟用 API 用戶端」
  10. 按一下 [儲存]

設定 Jamf 用戶端密鑰

  1. Jamf Pro 中,前往新建立的 API 用戶端
  2. 按一下「產生用戶端密鑰」
  3. 在確認畫面中,按一下「建立密鑰」
  4. 將下列參數儲存在安全的位置:
    • 基本網址https://<your>.jamfcloud.com
    • 用戶端 ID:UUID。
    • 用戶端密鑰:這個值只會顯示一次。

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照這份使用者指南建立 Amazon S3 值區建立值區
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 jamfpro)。
  3. 請按照這份使用者指南建立使用者建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」,以供日後參考。
  12. 按一下 [完成]
  13. 選取「權限」分頁標籤。
  14. 在「權限政策」部分中,按一下「新增權限」
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋並選取 AmazonS3FullAccess 政策。
  18. 點選「下一步」
  19. 按一下「Add permissions」。

設定 S3 上傳的身分與存取權管理政策和角色

  1. 政策 JSON (如果您輸入的 bucket 名稱不同,請替換 jamfpro):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutJamfObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::jamfpro/*"
        }
      ]
    }
    
  2. 依序前往 AWS 管理控制台 > IAM > 政策 > 建立政策 > JSON 分頁

  3. 複製並貼上政策。

  4. 依序點選「Next」>「Create policy」

  5. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」

  6. 附加新建立的政策。

  7. 為角色命名 WriteJamfToS3Role,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:
設定
名稱 jamf_pro_to_s3
執行階段 Python 3.13
架構 x86_64
權限 WriteJamfToS3Role
  1. 建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (jamf_pro_to_s3.py):

    import os
    import io
    import json
    import gzip
    import time
    import logging
    from datetime import datetime, timezone
    
    import boto3
    import requests
    
    log = logging.getLogger()
    log.setLevel(logging.INFO)
    
    BASE_URL = os.environ.get("JAMF_BASE_URL", "").rstrip("/")
    CLIENT_ID = os.environ.get("JAMF_CLIENT_ID")
    CLIENT_SECRET = os.environ.get("JAMF_CLIENT_SECRET")
    S3_BUCKET = os.environ.get("S3_BUCKET")
    S3_PREFIX = os.environ.get("S3_PREFIX", "jamf-pro/context/")
    PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "200"))
    
    SECTIONS = [
        "GENERAL",
        "HARDWARE",
        "OPERATING_SYSTEM",
        "USER_AND_LOCATION",
        "DISK_ENCRYPTION",
        "SECURITY",
        "EXTENSION_ATTRIBUTES",
        "APPLICATIONS",
        "CONFIGURATION_PROFILES",
        "LOCAL_USER_ACCOUNTS",
        "CERTIFICATES",
        "SERVICES",
        "PRINTERS",
        "SOFTWARE_UPDATES",
        "GROUP_MEMBERSHIPS",
        "CONTENT_CACHING",
        "STORAGE",
        "FONTS",
        "PACKAGE_RECEIPTS",
        "PLUGINS",
        "ATTACHMENTS",
        "LICENSED_SOFTWARE",
        "IBEACONS",
        "PURCHASING",
    ]
    
    s3 = boto3.client("s3")
    
    def _now_iso():
        return datetime.now(timezone.utc).isoformat()
    
    def get_token():
        """OAuth2 client credentials > access_token"""
        url = f"{BASE_URL}/api/oauth/token"
        data = {
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        r = requests.post(url, data=data, headers=headers, timeout=30)
        r.raise_for_status()
        j = r.json()
        return j["access_token"], int(j.get("expires_in", 1200))
    
    def fetch_page(token: str, page: int):
        """GET /api/v1/computers-inventory with sections & pagination"""
        url = f"{BASE_URL}/api/v1/computers-inventory"
        params = [("page", page), ("page-size", PAGE_SIZE)] + [("section", s) for s in SECTIONS]
        hdrs = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
        r = requests.get(url, params=params, headers=hdrs, timeout=60)
        r.raise_for_status()
        return r.json()
    
    def to_context_event(item: dict) -> dict:
        inv = item.get("inventory", {}) or {}
        general = inv.get("general", {}) or {}
        hardware = inv.get("hardware", {}) or {}
        osinfo = inv.get("operatingSystem", {}) or {}
        loc = inv.get("location", {}) or inv.get("userAndLocation", {}) or {}
    
        computer = {
            "udid": general.get("udid") or hardware.get("udid"),
            "deviceName": general.get("name") or general.get("deviceName"),
            "serialNumber": hardware.get("serialNumber") or general.get("serialNumber"),
            "model": hardware.get("model") or general.get("model"),
            "osVersion": osinfo.get("version") or general.get("osVersion"),
            "osBuild": osinfo.get("build") or general.get("osBuild"),
            "macAddress": hardware.get("macAddress"),
            "alternateMacAddress": hardware.get("wifiMacAddress"),
            "ipAddress": general.get("ipAddress"),
            "reportedIpV4Address": general.get("reportedIpV4Address"),
            "reportedIpV6Address": general.get("reportedIpV6Address"),
            "modelIdentifier": hardware.get("modelIdentifier"),
            "assetTag": general.get("assetTag"),
        }
    
        user_block = {
            "userDirectoryID": loc.get("username") or loc.get("userDirectoryId"),
            "emailAddress": loc.get("emailAddress"),
            "realName": loc.get("realName"),
            "phone": loc.get("phone") or loc.get("phoneNumber"),
            "position": loc.get("position"),
            "department": loc.get("department"),
            "building": loc.get("building"),
            "room": loc.get("room"),
        }
    
        return {
            "webhook": {"name": "api.inventory"},
            "event_type": "ComputerInventory",
            "event_action": "snapshot",
            "event_timestamp": _now_iso(),
            "event_data": {
                "computer": {k: v for k, v in computer.items() if v not in (None, "")},
                **{k: v for k, v in user_block.items() if v not in (None, "")},
            },
            "_jamf": {
                "id": item.get("id"),
                "inventory": inv,
            },
        }
    
    def write_ndjson_gz(objs, when: datetime):
        buf = io.BytesIO()
        with gzip.GzipFile(filename="-", mode="wb", fileobj=buf, mtime=int(time.time())) as gz:
            for obj in objs:
                line = json.dumps(obj, separators=(",", ":")) + "\n"
                gz.write(line.encode("utf-8"))
        buf.seek(0)
    
        prefix = S3_PREFIX.strip("/") + "/" if S3_PREFIX else ""
        key = f"{prefix}{when:%Y/%m/%d}/jamf_pro_context_{int(when.timestamp())}.ndjson.gz"
        s3.put_object(Bucket=S3_BUCKET, Key=key, Body=buf.getvalue())
        return key
    
    def lambda_handler(event, context):
        assert BASE_URL and CLIENT_ID and CLIENT_SECRET and S3_BUCKET, "Missing required env vars"
    
        token, _ttl = get_token()
        page = 0
        total = 0
        batch = []
        now = datetime.now(timezone.utc)
    
        while True:
            payload = fetch_page(token, page)
            results = payload.get("results") or payload.get("computerInventoryList") or []
            if not results:
                break
    
            for item in results:
                batch.append(to_context_event(item))
                total += 1
    
            if len(batch) >= 5000:
                key = write_ndjson_gz(batch, now)
                log.info("wrote %s records to s3://%s/%s", len(batch), S3_BUCKET, key)
                batch = []
    
            if len(results) < PAGE_SIZE:
                break
            page += 1
    
        if batch:
            key = write_ndjson_gz(batch, now)
            log.info("wrote %s records to s3://%s/%s", len(batch), S3_BUCKET, key)
    
        return {"ok": True, "count": total}
    
  2. 依序前往「Configuration」>「Environment variables」>「Edit」>「Add new environment variable」

  3. 輸入下列環境變數,並將 換成您的值。

    環境變數

    範例
    S3_BUCKET jamfpro
    S3_PREFIX jamf-pro/context/
    AWS_REGION 選取你的所在地區
    JAMF_CLIENT_ID 輸入 Jamf 用戶端 ID
    JAMF_CLIENT_SECRET 輸入 Jamf 用戶端密鑰
    JAMF_BASE_URL 輸入 Jamf 網址,將 https://<your>.jamfcloud.com 中的 <your> 取代為
    PAGE_SIZE 200
  4. 建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。

  5. 選取「設定」分頁標籤。

  6. 在「一般設定」面板中,按一下「編輯」

  7. 將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)

建立 EventBridge 排程

  1. 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」
  2. 提供下列設定詳細資料:
    • 週期性時間表:費率 (1 hour)。
    • 目標:您的 Lambda 函式。
    • 名稱:jamfpro-context-schedule-1h
  3. 按一下「建立時間表」

在 Google SecOps 中設定資訊提供,擷取 Jamf Pro 內容記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「+ 新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Jamf Pro Context logs)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「Jamf Pro 內容」做為「記錄類型」
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URI:值區 URI
      • s3://jamfpro/jamf-pro/context/
        • 請將 jamfpro 替換為實際值區名稱。
    • 來源刪除選項:根據偏好選取刪除選項。
    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:要套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。