收集 Box 協作 JSON 記錄

支援的國家/地區:

本文說明如何使用 AWS S3,透過 LambdaEventBridge 排程,將 Box Collaboration JSON 記錄擷取至 Google Security Operations。剖析器會處理 JSON 格式的 Box 事件記錄,並將其對應至統一資料模型 (UDM)。這項服務會從原始記錄中擷取相關欄位,執行重新命名和合併等資料轉換作業,並在輸出結構化事件資料前,以中繼資訊擴充資料。

事前準備

  • Google SecOps 執行個體
  • Box 的特殊存取權 (管理員和開發人員控制台)
  • 在您打算儲存記錄的相同區域中,具備 AWS (S3、IAM、Lambda、EventBridge) 的特殊權限

設定 Box 開發人員控制台 (用戶端憑證)

  1. 登入 Box 開發人員控制台
  2. 使用伺服器驗證 (用戶端憑證授權) 建立自訂應用程式
  3. 將「Application Access」設為「App + Enterprise Access」
  4. 在「應用程式範圍」中,啟用「管理企業資源」
  5. 管理控制台 > 應用程式 > 自訂應用程式管理工具中,使用用戶端 ID 授權應用程式。
  6. 複製並儲存「用戶端 ID」和「用戶端密鑰」*,並妥善保管。
  7. 依序前往「管理控制台」>「帳戶和帳單」>「帳戶資訊」
  8. 複製並儲存企業 ID,並妥善保管。

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照本使用指南建立 Amazon S3 值區建立值區
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 box-collaboration-logs)。
  3. 按照這份使用者指南建立使用者:建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」,以供日後使用。
  12. 按一下 [完成]
  13. 選取 [權限] 分頁標籤。
  14. 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋並選取 AmazonS3FullAccess 政策。
  18. 點選「下一步」
  19. 按一下「Add permissions」。

設定 S3 上傳的身分與存取權管理政策和角色

  1. AWS 管理控制台中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」分頁標籤
  2. 輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutBoxObjects",
          "Effect": "Allow",
          "Action": ["s3:PutObject"],
          "Resource": "arn:aws:s3:::box-collaboration-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::box-collaboration-logs/box/collaboration/state.json"
        }
      ]
    }
    
    
    • 如果您輸入的值區名稱不同,請替換 box-collaboration-logs
  3. 依序點選「Next」>「Create policy」

  4. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」

  5. 附加新建立的政策。

  6. 為角色命名 WriteBoxToS3Role,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:

    設定
    名稱 box_collaboration_to_s3
    執行階段 Python 3.13
    架構 x86_64
    執行角色 WriteBoxToS3Role
  4. 建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (box_collaboration_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull Box Enterprise Events to S3 (no transform)
    
    import os, json, time, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    TOKEN_URL = "https://api.box.com/oauth2/token"
    EVENTS_URL = "https://api.box.com/2.0/events"
    
    CID         = os.environ["BOX_CLIENT_ID"]
    CSECRET     = os.environ["BOX_CLIENT_SECRET"]
    ENT_ID      = os.environ["BOX_ENTERPRISE_ID"]
    STREAM_TYPE = os.environ.get("STREAM_TYPE", "admin_logs_streaming")
    LIMIT       = int(os.environ.get("LIMIT", "500"))
    BUCKET      = os.environ["S3_BUCKET"]
    PREFIX      = os.environ.get("S3_PREFIX", "box/collaboration/")
    STATE_KEY   = os.environ.get("STATE_KEY", "box/collaboration/state.json")
    
    s3 = boto3.client("s3")
    
    def get_state():
        try:
            obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            data = json.loads(obj["Body"].read())
            return data.get("stream_position")
        except Exception:
            return None
    
    def put_state(pos):
        body = json.dumps({"stream_position": pos}, separators=(",", ":")).encode("utf-8")
        s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def get_token():
        body = urllib.parse.urlencode({
            "grant_type": "client_credentials",
            "client_id": CID,
            "client_secret": CSECRET,
            "box_subject_type": "enterprise",
            "box_subject_id": ENT_ID,
        }).encode()
        req = Request(TOKEN_URL, data=body, method="POST")
        req.add_header("Content-Type", "application/x-www-form-urlencoded")
        with urlopen(req, timeout=30) as r:
            tok = json.loads(r.read().decode())
        return tok["access_token"]
    
    def fetch_events(token, stream_position=None, timeout=60, max_retries=5):
        params = {"stream_type": STREAM_TYPE, "limit": LIMIT, "stream_position": stream_position or "now"}
        qs = urllib.parse.urlencode(params)
        attempt, backoff = 0, 1.0
        while True:
            try:
                req = Request(f"{EVENTS_URL}?{qs}", method="GET")
                req.add_header("Authorization", f"Bearer {token}")
                with urlopen(req, timeout=timeout) as r:
                    return json.loads(r.read().decode())
            except HTTPError as e:
                if e.code == 429 and attempt < max_retries:
                    ra = e.headers.get("Retry-After")
                    delay = int(ra) if (ra and ra.isdigit()) else int(backoff)
                    time.sleep(max(1, delay)); attempt += 1; backoff *= 2; continue
                if 500 <= e.code <= 599 and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def write_chunk(data):
        ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime())
        key = f"{PREFIX}/{ts}-box-events.json"  
        s3.put_object(Bucket=BUCKET, Key=key,
                      Body=json.dumps(data, separators=(",", ":")).encode("utf-8"),
                      ContentType="application/json")  
        return key
    
    def lambda_handler(event=None, context=None):
        token = get_token()
        pos = get_state()
        total, idx = 0, 0
        while True:
            page = fetch_events(token, pos)
            entries = page.get("entries") or []
            if not entries:
                next_pos = page.get("next_stream_position") or pos
                if next_pos and next_pos != pos:
                    put_state(next_pos)
                break
    
            # уникальный ключ
            ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime())
            key = f"{PREFIX}/{ts}-box-events-{idx:03d}.json"
            s3.put_object(Bucket=BUCKET, Key=key,
                          Body=json.dumps(page, separators=(",", ":")).encode("utf-8"),
                          ContentType="application/json")
            idx += 1
            total += len(entries)
    
            pos = page.get("next_stream_position") or pos
            if pos:
                put_state(pos)
    
            if len(entries) < LIMIT:
                break
    
        return {"ok": True, "written": total, "next_stream_position": pos}
    
    
  5. 依序前往「Configuration」>「Environment variables」>「Edit」>「Add new environment variable」

  6. 輸入下列環境變數,並將 換成您的值:

    範例
    S3_BUCKET box-collaboration-logs
    S3_PREFIX box/collaboration/
    STATE_KEY box/collaboration/state.json
    BOX_CLIENT_ID 輸入 Box 用戶端 ID
    BOX_CLIENT_SECRET 輸入 Box 用戶端密鑰
    BOX_ENTERPRISE_ID 輸入 Box 企業 ID
    STREAM_TYPE admin_logs_streaming
    LIMIT 500
  7. 建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。

  8. 選取「設定」分頁標籤。

  9. 在「一般設定」面板中,按一下「編輯」

  10. 將「Timeout」(逾時) 變更為「10 minutes (600 seconds)」(10 分鐘 (600 秒)),然後按一下「Save」(儲存)

排定 Lambda 函式 (EventBridge 排程器)

  1. 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」
  2. 提供下列設定詳細資料:
    • 週期性時間表費率 (15 min)。
    • 目標:您的 Lambda 函式。
    • 名稱box-collaboration-schedule-15min
  3. 按一下「建立時間表」

在 Google SecOps 中設定動態饋給,擷取 Box 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Box Collaboration)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「Box」做為「記錄類型」
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URI:值區 URI (格式應為 s3://box-collaboration-logs/box/collaboration/)。 取代 box-collaboration-logs:使用值區的實際名稱。
    • 來源刪除選項:根據偏好選取刪除選項。
    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:要套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「Finalize」畫面上檢查新的動態饋給設定,然後按一下「Submit」

UDM 對應表

記錄欄位 UDM 對應 邏輯
additional_details.ekm_id additional.fields 取自 additional_details.ekm_id 的值
additional_details.service_id additional.fields 取自 additional_details.service_id 的值
additional_details.service_name additional.fields 取自 additional_details.service_name 的值
additional_details.shared_link_id additional.fields 取自 additional_details.shared_link_id 的值
additional_details.size target.file.size 取自 additional_details.size 的值
additional_details.version_id additional.fields 取自 additional_details.version_id 的值
created_at metadata.event_timestamp 取自 created_at 的值
created_by.id principal.user.userid 取自 created_by.id 的值
created_by.login principal.user.email_addresses 取自 created_by.login 的值
created_by.name principal.user.user_display_name 取自 created_by.name 的值
event_id metadata.product_log_id 取自 event_id 的值
event_type metadata.product_event_type 從 event_type 取得的值
ip_address principal.ip 取自 ip_address 的值
source.item_id target.file.product_object_id 取自 source.item_id 的值
source.item_name target.file.full_path 取自 source.item_name 的值
source.item_type 未對應
source.login target.user.email_addresses 取自 source.login 的值
source.name target.user.user_display_name 取自 source.name 的值
source.owned_by.id target.user.userid 取自 source.owned_by.id 的值
source.owned_by.login target.user.email_addresses 取自 source.owned_by.login 的值
source.owned_by.name target.user.user_display_name 取自 source.owned_by.name 的值
source.parent.id 未對應
source.parent.name 未對應
source.parent.type 未對應
source.type 未對應
類型 metadata.log_type 從類型取得的值
metadata.vendor_name 硬式編碼值
metadata.product_name 硬式編碼值
security_result.action 衍生自 event_type。如果 event_type 為 FAILED_LOGIN,則為 BLOCK;如果 event_type 為 USER_LOGIN,則為 ALLOW;否則為 UNSPECIFIED。
extensions.auth.type 衍生自 event_type。如果 event_type 為 USER_LOGIN 或 ADMIN_LOGIN,則為 MACHINE,否則為 UNSPECIFIED。
extensions.auth.mechanism 衍生自 event_type。如果 event_type 為 USER_LOGIN 或 ADMIN_LOGIN,則為 USERNAME_PASSWORD,否則為 UNSPECIFIED。

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。