收集 Zoom 作業記錄

支援的國家/地區:

本文說明如何使用 Amazon S3,將 Zoom 作業記錄擷取至 Google Security Operations。剖析器會將原始記錄轉換為統一資料模型 (UDM)。這項功能會從原始記錄訊息中擷取欄位、執行資料清理和正規化作業,並將擷取的資訊對應至相應的 UDM 欄位,最終在 SIEM 系統中擴充資料,以利分析和關聯。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體
  • Zoom 的特殊存取權
  • AWS (S3、IAM、Lambda、EventBridge) 的具備權限存取權

收集 Zoom 作業記錄的必要條件 (ID、API 金鑰、機構 ID、權杖)

  1. 登入 Zoom 應用程式市集
  2. 依序前往「Develop」(開發) >「Build App」(建立應用程式) >「Server-to-Server OAuth」(伺服器對伺服器 OAuth)
  3. 建立應用程式並新增下列範圍:report:read:operation_logs:admin (或 report:read:admin)。
  4. 在「應用程式憑證」中,複製並將下列詳細資料儲存在安全位置:
    • 帳戶 ID
    • 用戶端 ID
    • 用戶端密鑰

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照這份使用者指南建立 Amazon S3 值區建立值區
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 zoom-operation-logs)。
  3. 按照這份使用者指南建立使用者:建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後使用。
  12. 按一下 [完成]
  13. 選取 [權限] 分頁標籤。
  14. 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋並選取 AmazonS3FullAccess 政策。
  18. 點選「下一步」
  19. 按一下「Add permissions」。

設定 S3 上傳的身分與存取權管理政策和角色

  1. AWS 控制台中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」分頁標籤
  2. 輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutZoomOperationLogs",
          "Effect": "Allow",
          "Action": ["s3:PutObject"],
          "Resource": "arn:aws:s3:::zoom-operation-logs/zoom/operationlogs/*"
        },
        {
          "Sid": "AllowStateReadWrite",
          "Effect": "Allow",
          "Action": ["s3:GetObject", "s3:PutObject"],
          "Resource": "arn:aws:s3:::zoom-operation-logs/zoom/operationlogs/state.json"
        }
      ]
    }
    
    • 如果您輸入的值區名稱不同,請替換 zoom-operation-logs
  3. 依序點選「Next」>「Create policy」

  4. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。

  5. 附加新建立的政策。

  6. 為角色命名 WriteZoomOperationLogsToS3Role,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:
設定
名稱 zoom_operationlogs_to_s3
執行階段 Python 3.13
架構 x86_64
執行角色 WriteZoomOperationLogsToS3Role
  1. 建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼(zoom_operationlogs_to_s3.py):

    #!/usr/bin/env python3
    import os, json, gzip, io, uuid, datetime as dt, base64, urllib.parse, urllib.request
    import boto3
    
    # ---- Environment ----
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "zoom/operationlogs/")
    STATE_KEY = os.environ.get("STATE_KEY", S3_PREFIX + "state.json")
    ZOOM_ACCOUNT_ID = os.environ["ZOOM_ACCOUNT_ID"]
    ZOOM_CLIENT_ID = os.environ["ZOOM_CLIENT_ID"]
    ZOOM_CLIENT_SECRET = os.environ["ZOOM_CLIENT_SECRET"]
    PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "300"))  # API default 30; max may vary
    TIMEOUT = int(os.environ.get("TIMEOUT", "30"))
    
    TOKEN_URL = "https://zoom.us/oauth/token"
    REPORT_URL = "https://api.zoom.us/v2/report/operationlogs"
    
    s3 = boto3.client("s3")
    
    # ---- Helpers ----
    
    def _http(req: urllib.request.Request):
        return urllib.request.urlopen(req, timeout=TIMEOUT)
    
    def get_token() -> str:
        params = urllib.parse.urlencode({
            "grant_type": "account_credentials",
            "account_id": ZOOM_ACCOUNT_ID,
        }).encode()
        basic = base64.b64encode(f"{ZOOM_CLIENT_ID}:{ZOOM_CLIENT_SECRET}".encode()).decode()
        req = urllib.request.Request(
            TOKEN_URL,
            data=params,
            headers={
                "Authorization": f"Basic {basic}",
                "Content-Type": "application/x-www-form-urlencoded",
                "Accept": "application/json",
                "Host": "zoom.us",
            },
            method="POST",
        )
        with _http(req) as r:
            body = json.loads(r.read())
            return body["access_token"]
    
    def get_state() -> dict:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read())
        except Exception:
            # initial state: start today
            today = dt.date.today().isoformat()
            return {"cursor_date": today, "next_page_token": None}
    
    def put_state(state: dict):
        state["updated_at"] = dt.datetime.utcnow().isoformat() + "Z"
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state).encode())
    
    def write_chunk(items: list[dict], ts: dt.datetime) -> str:
        key = f"{S3_PREFIX}{ts:%Y/%m/%d}/zoom-operationlogs-{uuid.uuid4()}.json.gz"
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="w") as gz:
            for rec in items:
                gz.write((json.dumps(rec) + "n").encode())
        buf.seek(0)
        s3.upload_fileobj(buf, S3_BUCKET, key)
        return key
    
    def fetch_page(token: str, from_date: str, to_date: str, next_page_token: str | None) -> dict:
        q = {
            "from": from_date,
            "to": to_date,
            "page_size": str(PAGE_SIZE),
        }
        if next_page_token:
            q["next_page_token"] = next_page_token
        url = REPORT_URL + "?" + urllib.parse.urlencode(q)
        req = urllib.request.Request(url, headers={
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        })
        with _http(req) as r:
            return json.loads(r.read())
    
    def lambda_handler(event=None, context=None):
        token = get_token()
        state = get_state()
    
        cursor_date = state.get("cursor_date")  # YYYY-MM-DD
        # API requires from/to in yyyy-mm-dd, max one month per request
        from_date = cursor_date
        to_date = cursor_date
    
        total_written = 0
        next_token = state.get("next_page_token")
    
        while True:
            page = fetch_page(token, from_date, to_date, next_token)
            items = page.get("operation_logs", []) or []
            if items:
                write_chunk(items, dt.datetime.utcnow())
                total_written += len(items)
            next_token = page.get("next_page_token")
            if not next_token:
                break
    
        # Advance to next day if we've finished this date
        today = dt.date.today().isoformat()
        if cursor_date < today:
            nxt = (dt.datetime.fromisoformat(cursor_date) + dt.timedelta(days=1)).date().isoformat()
            state["cursor_date"] = nxt
            state["next_page_token"] = None
        else:
            # stay on today; continue later with next_page_token=None
            state["next_page_token"] = None
    
        put_state(state)
        return {"ok": True, "written": total_written, "date": from_date}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  2. 依序前往「Configuration」>「Environment variables」>「Edit」>「Add new environment variable」

  3. 輸入下列環境變數,並將 換成您的值:

    範例值
    S3_BUCKET zoom-operation-logs
    S3_PREFIX zoom/operationlogs/
    STATE_KEY zoom/operationlogs/state.json
    ZOOM_ACCOUNT_ID <your-zoom-account-id>
    ZOOM_CLIENT_ID <your-zoom-client-id>
    ZOOM_CLIENT_SECRET <your-zoom-client-secret>
    PAGE_SIZE 300
    TIMEOUT 30
  4. 建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。

  5. 選取「設定」分頁標籤。

  6. 在「一般設定」面板中,按一下「編輯」

  7. 將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)

建立 EventBridge 排程

  1. 前往 Amazon EventBridge > Scheduler
  2. 按一下「建立時間表」
  3. 提供下列設定詳細資料:
    • 週期性時間表費率 (15 min)。
    • 目標:您的 Lambda 函式 zoom_operationlogs_to_s3
    • 名稱zoom-operationlogs-schedule-15min
  4. 按一下「建立時間表」

選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰

  1. AWS 控制台中,依序前往「IAM」>「Users」>「Add users」
  2. 點選 [Add users] (新增使用者)。
  3. 提供下列設定詳細資料:
    • 使用者secops-reader
    • 存取類型存取金鑰 - 程式輔助存取
  4. 按一下「建立使用者」
  5. 附加最低讀取權限政策 (自訂):依序選取「使用者」>「secops-reader」>「權限」>「新增權限」>「直接附加政策」>「建立政策」
  6. 在 JSON 編輯器中輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::zoom-operation-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::zoom-operation-logs"
        }
      ]
    }
    
  7. 將名稱設為 secops-reader-policy

  8. 依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」

  9. 依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」

  10. 下載 CSV (這些值會輸入至動態饋給)。

在 Google SecOps 中設定動態饋給,擷取 Zoom 作業記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「+ 新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Zoom Operation Logs)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「Zoom Operation Logs」(Zoom 作業記錄) 做為「Log type」(記錄類型)。
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URIs3://zoom-operation-logs/zoom/operationlogs/
    • 來源刪除選項:根據偏好設定選取刪除選項。
    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」

UDM 對應表

記錄欄位 UDM 對應 邏輯
動作 metadata.product_event_type 原始記錄欄位「action」會對應至這個 UDM 欄位。
category_type additional.fields.key 原始記錄欄位「category_type」會對應至這個 UDM 欄位。
category_type additional.fields.value.string_value 原始記錄欄位「category_type」會對應至這個 UDM 欄位。
部門 target.user.department 原始記錄欄位「Department」(從「operation_detail」欄位擷取) 會對應至這個 UDM 欄位。
說明 target.user.role_description 原始記錄欄位「Description」(從「operation_detail」欄位擷取) 會對應至這個 UDM 欄位。
顯示名稱 target.user.user_display_name 原始記錄欄位「顯示名稱」(從「operation_detail」欄位擷取) 會對應至這個 UDM 欄位。
電子郵件地址 target.user.email_addresses 原始記錄欄位「電子郵件地址」(從「operation_detail」欄位擷取) 會對應至這個 UDM 欄位。
名字 target.user.first_name 原始記錄欄位「名字」(從「operation_detail」欄位擷取) 會對應至這個 UDM 欄位。
職稱 target.user.title 原始記錄欄位「職稱」(從「operation_detail」欄位擷取) 會對應至這個 UDM 欄位。
姓氏 target.user.last_name 原始記錄欄位「Last Name」(從「operation_detail」欄位擷取) 會對應至這個 UDM 欄位。
位置 target.location.name 原始記錄欄位「Location」(從「operation_detail」欄位擷取) 會對應至這個 UDM 欄位。
operation_detail metadata.description 原始記錄欄位「operation_detail」會對應至這個 UDM 欄位。
運算子 principal.user.email_addresses 如果原始記錄欄位「operator」符合電子郵件規則運算式,就會對應至這個 UDM 欄位。
運算子 principal.user.userid 如果原始記錄欄位「operator」不符合電子郵件規則運算式,就會對應至這個 UDM 欄位。
Room Name target.user.attribute.labels.value 原始記錄欄位「Room Name」(從「operation_detail」欄位擷取) 會對應至這個 UDM 欄位。
角色名稱 target.user.attribute.roles.name 原始記錄欄位「角色名稱」(從「operation_detail」欄位擷取) 會對應至這個 UDM 欄位。
時間 metadata.event_timestamp.seconds 系統會剖析原始記錄欄位「time」,並對應至這個 UDM 欄位。
類型 target.user.attribute.labels.value 原始記錄欄位「Type」(從「operation_detail」欄位擷取) 會對應至這個 UDM 欄位。
使用者角色 target.user.attribute.roles.name 原始記錄欄位「使用者角色」(從「operation_detail」欄位擷取) 會對應至這個 UDM 欄位。
使用者類型 target.user.attribute.labels.value 原始記錄欄位「使用者類型」(從「operation_detail」欄位擷取) 會對應至這個 UDM 欄位。
metadata.log_type 「ZOOM_OPERATION_LOGS」值會指派給這個 UDM 欄位。
metadata.vendor_name 系統會將「ZOOM」值指派給這個 UDM 欄位。
metadata.product_name 「ZOOM_OPERATION_LOGS」值會指派給這個 UDM 欄位。
metadata.event_type 系統會根據下列邏輯判斷值:
1. 如果「event_type」欄位不為空白,系統會使用該欄位的值。
2. 如果「operator」、「email」或「email2」欄位不是空白,系統會將值設為「USER_UNCATEGORIZED」。
3. 否則,值會設為「GENERIC_EVENT」。
json_data about.user.attribute.labels.value 系統會將原始記錄檔欄位「json_data」(從「operation_detail」欄位擷取) 剖析為 JSON。剖析 JSON 陣列的每個元素時,「assistant」和「options」欄位會對應至 UDM 中「labels」陣列的「value」欄位。
json_data about.user.userid 系統會將原始記錄檔欄位「json_data」(從「operation_detail」欄位擷取) 剖析為 JSON。剖析 JSON 陣列時,每個元素 (第一個除外) 的「userId」欄位會對應至 UDM 中「about.user」物件的「userid」欄位。
json_data target.user.attribute.labels.value 系統會將原始記錄檔欄位「json_data」(從「operation_detail」欄位擷取) 剖析為 JSON。剖析的 JSON 陣列第一個元素的「assistant」和「options」欄位,會對應至 UDM 中「labels」陣列的「value」欄位。
json_data target.user.userid 系統會將原始記錄檔欄位「json_data」(從「operation_detail」欄位擷取) 剖析為 JSON。剖析 JSON 陣列的第一個元素後,「userId」欄位會對應至 UDM 中「target.user」物件的「userid」欄位。
電子郵件 target.user.email_addresses 原始記錄欄位「email」(從「operation_detail」欄位擷取) 會對應至這個 UDM 欄位。
email2 target.user.email_addresses 原始記錄欄位「email2」(從「operation_detail」欄位擷取) 會對應至這個 UDM 欄位。
角色 target.user.attribute.roles.name 原始記錄欄位「role」(從「operation_detail」欄位擷取) 會對應至這個 UDM 欄位。

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。