收集 Zoom 操作日志

支持的语言:

本文档介绍了如何使用 Amazon S3 将 Zoom 操作日志注入到 Google Security Operations。解析器会将原始日志转换为统一数据模型 (UDM)。它从原始日志消息中提取字段,执行数据清理和标准化,并将提取的信息映射到相应的 UDM 字段,最终丰富数据,以便在 SIEM 系统中进行分析和关联。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例
  • Zoom 的特权访问权限
  • AWS(S3、IAM、Lambda、EventBridge)的特权访问权限

收集 Zoom Operation Logs 的前提条件(ID、API 密钥、组织 ID、令牌)

  1. 登录 Zoom App Marketplace
  2. 依次前往开发 > 构建应用 > 服务器到服务器 OAuth
  3. 创建应用并添加以下范围:report:read:operation_logs:admin(或 report:read:admin)。
  4. 应用凭据中,复制以下详细信息并将其保存在安全的位置:
    • 账号 ID
    • 客户端 ID
    • 客户端密钥

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 zoom-operation-logs)。
  3. 按照以下用户指南创建用户:创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击 Download CSV file(下载 CSV 文件),保存访问密钥不公开的访问密钥以供日后使用。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索并选择 AmazonS3FullAccess 政策。
  18. 点击下一步
  19. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页
  2. 输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutZoomOperationLogs",
          "Effect": "Allow",
          "Action": ["s3:PutObject"],
          "Resource": "arn:aws:s3:::zoom-operation-logs/zoom/operationlogs/*"
        },
        {
          "Sid": "AllowStateReadWrite",
          "Effect": "Allow",
          "Action": ["s3:GetObject", "s3:PutObject"],
          "Resource": "arn:aws:s3:::zoom-operation-logs/zoom/operationlogs/state.json"
        }
      ]
    }
    
    • 如果您输入了其他存储桶名称,请替换 zoom-operation-logs
  3. 依次点击下一步 > 创建政策

  4. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  5. 附加新创建的政策。

  6. 将角色命名为 WriteZoomOperationLogsToS3Role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:
设置
名称 zoom_operationlogs_to_s3
运行时 Python 3.13
架构 x86_64
执行角色 WriteZoomOperationLogsToS3Role
  1. 创建函数后,打开 Code 标签页,删除桩代码并输入以下代码(zoom_operationlogs_to_s3.py):

    #!/usr/bin/env python3
    import os, json, gzip, io, uuid, datetime as dt, base64, urllib.parse, urllib.request
    import boto3
    
    # ---- Environment ----
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "zoom/operationlogs/")
    STATE_KEY = os.environ.get("STATE_KEY", S3_PREFIX + "state.json")
    ZOOM_ACCOUNT_ID = os.environ["ZOOM_ACCOUNT_ID"]
    ZOOM_CLIENT_ID = os.environ["ZOOM_CLIENT_ID"]
    ZOOM_CLIENT_SECRET = os.environ["ZOOM_CLIENT_SECRET"]
    PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "300"))  # API default 30; max may vary
    TIMEOUT = int(os.environ.get("TIMEOUT", "30"))
    
    TOKEN_URL = "https://zoom.us/oauth/token"
    REPORT_URL = "https://api.zoom.us/v2/report/operationlogs"
    
    s3 = boto3.client("s3")
    
    # ---- Helpers ----
    
    def _http(req: urllib.request.Request):
        return urllib.request.urlopen(req, timeout=TIMEOUT)
    
    def get_token() -> str:
        params = urllib.parse.urlencode({
            "grant_type": "account_credentials",
            "account_id": ZOOM_ACCOUNT_ID,
        }).encode()
        basic = base64.b64encode(f"{ZOOM_CLIENT_ID}:{ZOOM_CLIENT_SECRET}".encode()).decode()
        req = urllib.request.Request(
            TOKEN_URL,
            data=params,
            headers={
                "Authorization": f"Basic {basic}",
                "Content-Type": "application/x-www-form-urlencoded",
                "Accept": "application/json",
                "Host": "zoom.us",
            },
            method="POST",
        )
        with _http(req) as r:
            body = json.loads(r.read())
            return body["access_token"]
    
    def get_state() -> dict:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read())
        except Exception:
            # initial state: start today
            today = dt.date.today().isoformat()
            return {"cursor_date": today, "next_page_token": None}
    
    def put_state(state: dict):
        state["updated_at"] = dt.datetime.utcnow().isoformat() + "Z"
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state).encode())
    
    def write_chunk(items: list[dict], ts: dt.datetime) -> str:
        key = f"{S3_PREFIX}{ts:%Y/%m/%d}/zoom-operationlogs-{uuid.uuid4()}.json.gz"
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="w") as gz:
            for rec in items:
                gz.write((json.dumps(rec) + "n").encode())
        buf.seek(0)
        s3.upload_fileobj(buf, S3_BUCKET, key)
        return key
    
    def fetch_page(token: str, from_date: str, to_date: str, next_page_token: str | None) -> dict:
        q = {
            "from": from_date,
            "to": to_date,
            "page_size": str(PAGE_SIZE),
        }
        if next_page_token:
            q["next_page_token"] = next_page_token
        url = REPORT_URL + "?" + urllib.parse.urlencode(q)
        req = urllib.request.Request(url, headers={
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        })
        with _http(req) as r:
            return json.loads(r.read())
    
    def lambda_handler(event=None, context=None):
        token = get_token()
        state = get_state()
    
        cursor_date = state.get("cursor_date")  # YYYY-MM-DD
        # API requires from/to in yyyy-mm-dd, max one month per request
        from_date = cursor_date
        to_date = cursor_date
    
        total_written = 0
        next_token = state.get("next_page_token")
    
        while True:
            page = fetch_page(token, from_date, to_date, next_token)
            items = page.get("operation_logs", []) or []
            if items:
                write_chunk(items, dt.datetime.utcnow())
                total_written += len(items)
            next_token = page.get("next_page_token")
            if not next_token:
                break
    
        # Advance to next day if we've finished this date
        today = dt.date.today().isoformat()
        if cursor_date < today:
            nxt = (dt.datetime.fromisoformat(cursor_date) + dt.timedelta(days=1)).date().isoformat()
            state["cursor_date"] = nxt
            state["next_page_token"] = None
        else:
            # stay on today; continue later with next_page_token=None
            state["next_page_token"] = None
    
        put_state(state)
        return {"ok": True, "written": total_written, "date": from_date}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  2. 依次前往配置 > 环境变量 > 修改 > 添加新的环境变量

  3. 输入以下环境变量,并替换为您的值:

    示例值
    S3_BUCKET zoom-operation-logs
    S3_PREFIX zoom/operationlogs/
    STATE_KEY zoom/operationlogs/state.json
    ZOOM_ACCOUNT_ID <your-zoom-account-id>
    ZOOM_CLIENT_ID <your-zoom-client-id>
    ZOOM_CLIENT_SECRET <your-zoom-client-secret>
    PAGE_SIZE 300
    TIMEOUT 30
  4. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。

  5. 选择配置标签页。

  6. 常规配置面板中,点击修改

  7. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 前往 Amazon EventBridge > 调度程序
  2. 点击创建时间表
  3. 提供以下配置详细信息:
    • 周期性安排费率 (15 min)。
    • 目标:您的 Lambda 函数 zoom_operationlogs_to_s3
    • 名称zoom-operationlogs-schedule-15min
  4. 点击创建时间表

可选:为 Google SecOps 创建只读 IAM 用户和密钥

  1. AWS 控制台中,依次前往 IAM > 用户 > 添加用户
  2. 点击 Add users(添加用户)。
  3. 提供以下配置详细信息:
    • 用户secops-reader
    • 访问类型访问密钥 - 以程序化方式访问
  4. 点击创建用户
  5. 附加最低限度的读取政策(自定义):用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策
  6. 在 JSON 编辑器中,输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::zoom-operation-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::zoom-operation-logs"
        }
      ]
    }
    
  7. 将名称设置为 secops-reader-policy

  8. 依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限

  9. 依次前往安全凭据 > 访问密钥 > 创建访问密钥

  10. 下载 CSV(这些值会输入到 Feed 中)。

在 Google SecOps 中配置 Feed 以注入 Zoom Operation Logs

  1. 依次前往 SIEM 设置> Feed
  2. 点击 + 添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 Zoom Operation Logs)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Zoom Operation Logs 作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://zoom-operation-logs/zoom/operationlogs/
    • 来源删除选项:根据您的偏好设置选择删除选项。
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
    • 资产命名空间资产命名空间
    • 注入标签:应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
操作 metadata.product_event_type 原始日志字段“action”映射到此 UDM 字段。
category_type additional.fields.key 原始日志字段“category_type”映射到此 UDM 字段。
category_type additional.fields.value.string_value 原始日志字段“category_type”映射到此 UDM 字段。
部门 target.user.department 原始日志字段“Department”(从“operation_detail”字段中提取)会映射到此 UDM 字段。
说明 target.user.role_description 原始日志字段“Description”(从“operation_detail”字段中提取)会映射到此 UDM 字段。
显示名称 target.user.user_display_name 原始日志字段“显示名称”(从“operation_detail”字段中提取)会映射到此 UDM 字段。
电子邮件地址 target.user.email_addresses 原始日志字段“电子邮件地址”(从“operation_detail”字段中提取)会映射到此 UDM 字段。
名字 target.user.first_name 原始日志字段“名字”(从“operation_detail”字段中提取)映射到此 UDM 字段。
职位 target.user.title 原始日志字段“职位”(从“operation_detail”字段中提取)映射到此 UDM 字段。
姓氏 target.user.last_name 原始日志字段“Last Name”(从“operation_detail”字段中提取)映射到此 UDM 字段。
位置 target.location.name 原始日志字段“位置”(从“operation_detail”字段中提取)会映射到此 UDM 字段。
operation_detail metadata.description 原始日志字段“operation_detail”会映射到此 UDM 字段。
运算符 principal.user.email_addresses 如果原始日志字段“operator”与电子邮件正则表达式匹配,则会映射到此 UDM 字段。
运算符 principal.user.userid 如果原始日志字段“operator”与电子邮件正则表达式不匹配,则会将其映射到此 UDM 字段。
Room Name target.user.attribute.labels.value 原始日志字段“会议室名称”(从“operation_detail”字段中提取)映射到此 UDM 字段。
角色名称 target.user.attribute.roles.name 原始日志字段“Role Name”(从“operation_detail”字段中提取)会映射到此 UDM 字段。
时间 metadata.event_timestamp.seconds 原始日志字段“time”经过解析后映射到此 UDM 字段。
类型 target.user.attribute.labels.value 原始日志字段“Type”(从“operation_detail”字段中提取)映射到此 UDM 字段。
用户角色 target.user.attribute.roles.name 原始日志字段“用户角色”(从“operation_detail”字段中提取)会映射到此 UDM 字段。
用户类型 target.user.attribute.labels.value 原始日志字段“用户类型”(从“operation_detail”字段中提取)会映射到此 UDM 字段。
metadata.log_type 系统会为相应 UDM 字段分配值“ZOOM_OPERATION_LOGS”。
metadata.vendor_name 系统会为相应 UDM 字段分配“ZOOM”值。
metadata.product_name 系统会为相应 UDM 字段分配值“ZOOM_OPERATION_LOGS”。
metadata.event_type 该值根据以下逻辑确定:
1. 如果“event_type”字段不为空,则使用其值。
2. 如果“operator”“email”或“email2”字段不为空,则该值会设置为“USER_UNCATEGORIZED”。
3. 否则,该值设置为“GENERIC_EVENT”。
json_data about.user.attribute.labels.value 原始日志字段“json_data”(从“operation_detail”字段中提取)被解析为 JSON。已解析的 JSON 数组中每个元素的“assistant”和“options”字段会映射到 UDM 中“labels”数组的“value”字段。
json_data about.user.userid 原始日志字段“json_data”(从“operation_detail”字段中提取)被解析为 JSON。已解析的 JSON 数组中每个元素(第一个元素除外)的“userId”字段会映射到 UDM 中“about.user”对象的“userid”字段。
json_data target.user.attribute.labels.value 原始日志字段“json_data”(从“operation_detail”字段中提取)被解析为 JSON。解析后的 JSON 数组的第一个元素中的“assistant”和“options”字段会映射到 UDM 中“labels”数组的“value”字段。
json_data target.user.userid 原始日志字段“json_data”(从“operation_detail”字段中提取)被解析为 JSON。已解析的 JSON 数组的第一个元素中的“userId”字段会映射到 UDM 中“target.user”对象的“userid”字段。
电子邮件 target.user.email_addresses 原始日志字段“email”(从“operation_detail”字段中提取)映射到此 UDM 字段。
email2 target.user.email_addresses 原始日志字段“email2”(从“operation_detail”字段中提取)映射到此 UDM 字段。
角色 target.user.attribute.roles.name 原始日志字段“role”(从“operation_detail”字段中提取)映射到此 UDM 字段。

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。