收集 Zoom 操作日志
本文档介绍了如何使用 Amazon S3 将 Zoom 操作日志注入到 Google Security Operations。解析器会将原始日志转换为统一数据模型 (UDM)。它从原始日志消息中提取字段,执行数据清理和标准化,并将提取的信息映射到相应的 UDM 字段,最终丰富数据,以便在 SIEM 系统中进行分析和关联。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例
- 对 Zoom 的特权访问权限
- 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限
收集 Zoom Operation Logs 的前提条件(ID、API 密钥、组织 ID、令牌)
- 登录 Zoom App Marketplace。
- 依次前往开发 > 构建应用 > 服务器到服务器 OAuth。
- 创建应用并添加以下范围:
report:read:operation_logs:admin
(或report:read:admin
)。 - 在应用凭据中,复制以下详细信息并将其保存在安全的位置:
- 账号 ID。
- 客户端 ID。
- 客户端密钥。
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶
- 保存存储桶名称和区域以供日后参考(例如
zoom-operation-logs
)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击 Download CSV file(下载 CSV 文件),保存访问密钥和不公开的访问密钥以供日后使用。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策
- 搜索并选择 AmazonS3FullAccess 政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传配置 IAM 政策和角色
- 在 AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页。
输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutZoomOperationLogs", "Effect": "Allow", "Action": ["s3:PutObject"], "Resource": "arn:aws:s3:::zoom-operation-logs/zoom/operationlogs/*" }, { "Sid": "AllowStateReadWrite", "Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject"], "Resource": "arn:aws:s3:::zoom-operation-logs/zoom/operationlogs/state.json" } ] }
- 如果您输入了其他存储桶名称,请替换
zoom-operation-logs
。
- 如果您输入了其他存储桶名称,请替换
依次点击下一步 > 创建政策。
依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda。
附加新创建的政策。
将角色命名为
WriteZoomOperationLogsToS3Role
,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
- 提供以下配置详细信息:
设置 | 值 |
---|---|
名称 | zoom_operationlogs_to_s3 |
运行时 | Python 3.13 |
架构 | x86_64 |
执行角色 | WriteZoomOperationLogsToS3Role |
创建函数后,打开 Code 标签页,删除桩代码并输入以下代码(
zoom_operationlogs_to_s3.py
):#!/usr/bin/env python3 import os, json, gzip, io, uuid, datetime as dt, base64, urllib.parse, urllib.request import boto3 # ---- Environment ---- S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "zoom/operationlogs/") STATE_KEY = os.environ.get("STATE_KEY", S3_PREFIX + "state.json") ZOOM_ACCOUNT_ID = os.environ["ZOOM_ACCOUNT_ID"] ZOOM_CLIENT_ID = os.environ["ZOOM_CLIENT_ID"] ZOOM_CLIENT_SECRET = os.environ["ZOOM_CLIENT_SECRET"] PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "300")) # API default 30; max may vary TIMEOUT = int(os.environ.get("TIMEOUT", "30")) TOKEN_URL = "https://zoom.us/oauth/token" REPORT_URL = "https://api.zoom.us/v2/report/operationlogs" s3 = boto3.client("s3") # ---- Helpers ---- def _http(req: urllib.request.Request): return urllib.request.urlopen(req, timeout=TIMEOUT) def get_token() -> str: params = urllib.parse.urlencode({ "grant_type": "account_credentials", "account_id": ZOOM_ACCOUNT_ID, }).encode() basic = base64.b64encode(f"{ZOOM_CLIENT_ID}:{ZOOM_CLIENT_SECRET}".encode()).decode() req = urllib.request.Request( TOKEN_URL, data=params, headers={ "Authorization": f"Basic {basic}", "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json", "Host": "zoom.us", }, method="POST", ) with _http(req) as r: body = json.loads(r.read()) return body["access_token"] def get_state() -> dict: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()) except Exception: # initial state: start today today = dt.date.today().isoformat() return {"cursor_date": today, "next_page_token": None} def put_state(state: dict): state["updated_at"] = dt.datetime.utcnow().isoformat() + "Z" s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state).encode()) def write_chunk(items: list[dict], ts: dt.datetime) -> str: key = f"{S3_PREFIX}{ts:%Y/%m/%d}/zoom-operationlogs-{uuid.uuid4()}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="w") as gz: for rec in items: gz.write((json.dumps(rec) + "n").encode()) buf.seek(0) s3.upload_fileobj(buf, S3_BUCKET, key) return key def fetch_page(token: str, from_date: str, to_date: str, next_page_token: str | None) -> dict: q = { "from": from_date, "to": to_date, "page_size": str(PAGE_SIZE), } if next_page_token: q["next_page_token"] = next_page_token url = REPORT_URL + "?" + urllib.parse.urlencode(q) req = urllib.request.Request(url, headers={ "Authorization": f"Bearer {token}", "Accept": "application/json", }) with _http(req) as r: return json.loads(r.read()) def lambda_handler(event=None, context=None): token = get_token() state = get_state() cursor_date = state.get("cursor_date") # YYYY-MM-DD # API requires from/to in yyyy-mm-dd, max one month per request from_date = cursor_date to_date = cursor_date total_written = 0 next_token = state.get("next_page_token") while True: page = fetch_page(token, from_date, to_date, next_token) items = page.get("operation_logs", []) or [] if items: write_chunk(items, dt.datetime.utcnow()) total_written += len(items) next_token = page.get("next_page_token") if not next_token: break # Advance to next day if we've finished this date today = dt.date.today().isoformat() if cursor_date < today: nxt = (dt.datetime.fromisoformat(cursor_date) + dt.timedelta(days=1)).date().isoformat() state["cursor_date"] = nxt state["next_page_token"] = None else: # stay on today; continue later with next_page_token=None state["next_page_token"] = None put_state(state) return {"ok": True, "written": total_written, "date": from_date} if __name__ == "__main__": print(lambda_handler())
依次前往配置 > 环境变量 > 修改 > 添加新的环境变量。
输入以下环境变量,并替换为您的值:
键 示例值 S3_BUCKET
zoom-operation-logs
S3_PREFIX
zoom/operationlogs/
STATE_KEY
zoom/operationlogs/state.json
ZOOM_ACCOUNT_ID
<your-zoom-account-id>
ZOOM_CLIENT_ID
<your-zoom-client-id>
ZOOM_CLIENT_SECRET
<your-zoom-client-secret>
PAGE_SIZE
300
TIMEOUT
30
创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。
选择配置标签页。
在常规配置面板中,点击修改。
将超时更改为 5 分钟(300 秒),然后点击保存。
创建 EventBridge 计划
- 前往 Amazon EventBridge > 调度程序。
- 点击创建时间表。
- 提供以下配置详细信息:
- 周期性安排:费率 (
15 min
)。 - 目标:您的 Lambda 函数
zoom_operationlogs_to_s3
。 - 名称:
zoom-operationlogs-schedule-15min
。
- 周期性安排:费率 (
- 点击创建时间表。
可选:为 Google SecOps 创建只读 IAM 用户和密钥
- 在 AWS 控制台中,依次前往 IAM > 用户 > 添加用户。
- 点击 Add users(添加用户)。
- 提供以下配置详细信息:
- 用户:
secops-reader
。 - 访问类型:访问密钥 - 以程序化方式访问。
- 用户:
- 点击创建用户。
- 附加最低限度的读取政策(自定义):用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策。
在 JSON 编辑器中,输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::zoom-operation-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::zoom-operation-logs" } ] }
将名称设置为
secops-reader-policy
。依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限。
依次前往安全凭据 > 访问密钥 > 创建访问密钥。
下载 CSV(这些值会输入到 Feed 中)。
在 Google SecOps 中配置 Feed 以注入 Zoom Operation Logs
- 依次前往 SIEM 设置> Feed。
- 点击 + 添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Zoom Operation Logs
)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Zoom Operation Logs 作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://zoom-operation-logs/zoom/operationlogs/
- 来源删除选项:根据您的偏好设置选择删除选项。
- 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
- 资产命名空间:资产命名空间。
- 注入标签:应用于此 Feed 中事件的标签。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
日志字段 | UDM 映射 | 逻辑 |
---|---|---|
操作 | metadata.product_event_type | 原始日志字段“action”映射到此 UDM 字段。 |
category_type | additional.fields.key | 原始日志字段“category_type”映射到此 UDM 字段。 |
category_type | additional.fields.value.string_value | 原始日志字段“category_type”映射到此 UDM 字段。 |
部门 | target.user.department | 原始日志字段“Department”(从“operation_detail”字段中提取)会映射到此 UDM 字段。 |
说明 | target.user.role_description | 原始日志字段“Description”(从“operation_detail”字段中提取)会映射到此 UDM 字段。 |
显示名称 | target.user.user_display_name | 原始日志字段“显示名称”(从“operation_detail”字段中提取)会映射到此 UDM 字段。 |
电子邮件地址 | target.user.email_addresses | 原始日志字段“电子邮件地址”(从“operation_detail”字段中提取)会映射到此 UDM 字段。 |
名字 | target.user.first_name | 原始日志字段“名字”(从“operation_detail”字段中提取)映射到此 UDM 字段。 |
职位 | target.user.title | 原始日志字段“职位”(从“operation_detail”字段中提取)映射到此 UDM 字段。 |
姓氏 | target.user.last_name | 原始日志字段“Last Name”(从“operation_detail”字段中提取)映射到此 UDM 字段。 |
位置 | target.location.name | 原始日志字段“位置”(从“operation_detail”字段中提取)会映射到此 UDM 字段。 |
operation_detail | metadata.description | 原始日志字段“operation_detail”会映射到此 UDM 字段。 |
运算符 | principal.user.email_addresses | 如果原始日志字段“operator”与电子邮件正则表达式匹配,则会映射到此 UDM 字段。 |
运算符 | principal.user.userid | 如果原始日志字段“operator”与电子邮件正则表达式不匹配,则会将其映射到此 UDM 字段。 |
Room Name | target.user.attribute.labels.value | 原始日志字段“会议室名称”(从“operation_detail”字段中提取)映射到此 UDM 字段。 |
角色名称 | target.user.attribute.roles.name | 原始日志字段“Role Name”(从“operation_detail”字段中提取)会映射到此 UDM 字段。 |
时间 | metadata.event_timestamp.seconds | 原始日志字段“time”经过解析后映射到此 UDM 字段。 |
类型 | target.user.attribute.labels.value | 原始日志字段“Type”(从“operation_detail”字段中提取)映射到此 UDM 字段。 |
用户角色 | target.user.attribute.roles.name | 原始日志字段“用户角色”(从“operation_detail”字段中提取)会映射到此 UDM 字段。 |
用户类型 | target.user.attribute.labels.value | 原始日志字段“用户类型”(从“operation_detail”字段中提取)会映射到此 UDM 字段。 |
metadata.log_type | 系统会为相应 UDM 字段分配值“ZOOM_OPERATION_LOGS”。 | |
metadata.vendor_name | 系统会为相应 UDM 字段分配“ZOOM”值。 | |
metadata.product_name | 系统会为相应 UDM 字段分配值“ZOOM_OPERATION_LOGS”。 | |
metadata.event_type | 该值根据以下逻辑确定: 1. 如果“event_type”字段不为空,则使用其值。 2. 如果“operator”“email”或“email2”字段不为空,则该值会设置为“USER_UNCATEGORIZED”。 3. 否则,该值设置为“GENERIC_EVENT”。 |
|
json_data | about.user.attribute.labels.value | 原始日志字段“json_data”(从“operation_detail”字段中提取)被解析为 JSON。已解析的 JSON 数组中每个元素的“assistant”和“options”字段会映射到 UDM 中“labels”数组的“value”字段。 |
json_data | about.user.userid | 原始日志字段“json_data”(从“operation_detail”字段中提取)被解析为 JSON。已解析的 JSON 数组中每个元素(第一个元素除外)的“userId”字段会映射到 UDM 中“about.user”对象的“userid”字段。 |
json_data | target.user.attribute.labels.value | 原始日志字段“json_data”(从“operation_detail”字段中提取)被解析为 JSON。解析后的 JSON 数组的第一个元素中的“assistant”和“options”字段会映射到 UDM 中“labels”数组的“value”字段。 |
json_data | target.user.userid | 原始日志字段“json_data”(从“operation_detail”字段中提取)被解析为 JSON。已解析的 JSON 数组的第一个元素中的“userId”字段会映射到 UDM 中“target.user”对象的“userid”字段。 |
电子邮件 | target.user.email_addresses | 原始日志字段“email”(从“operation_detail”字段中提取)映射到此 UDM 字段。 |
email2 | target.user.email_addresses | 原始日志字段“email2”(从“operation_detail”字段中提取)映射到此 UDM 字段。 |
角色 | target.user.attribute.roles.name | 原始日志字段“role”(从“operation_detail”字段中提取)映射到此 UDM 字段。 |
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。