Zoom 작업 로그 수집
이 문서에서는 Amazon S3를 사용하여 Zoom 작업 로그를 Google Security Operations에 수집하는 방법을 설명합니다. 파서는 원시 로그를 통합 데이터 모델 (UDM)로 변환합니다. 원시 로그 메시지에서 필드를 추출하고, 데이터 정리 및 정규화를 실행하고, 추출된 정보를 해당 UDM 필드에 매핑하여 궁극적으로 SIEM 시스템 내에서 분석 및 상관관계를 위한 데이터를 보강합니다.
시작하기 전에
다음 기본 요건이 충족되었는지 확인합니다.
- Google SecOps 인스턴스
- Zoom에 대한 액세스 권한 관리
- AWS (S3, IAM, Lambda, EventBridge)에 대한 권한 액세스
Zoom Operation Logs 필수사항 (ID, API 키, 조직 ID, 토큰) 수집
- Zoom 앱 마켓플레이스에 로그인합니다.
- 개발 > 앱 빌드 > 서버 간 OAuth로 이동합니다.
- 앱을 만들고
report:read:operation_logs:admin
(또는report:read:admin
) 범위를 추가합니다. - 앱 사용자 인증 정보에서 다음 세부정보를 복사하여 안전한 위치에 저장합니다.
- 계정 ID.
- 클라이언트 ID.
- 클라이언트 보안 비밀번호
Google SecOps용 AWS S3 버킷 및 IAM 구성
- 이 사용자 가이드(버킷 만들기)에 따라 Amazon S3 버킷을 만듭니다.
- 나중에 참조할 수 있도록 버킷 이름과 리전을 저장합니다 (예:
zoom-operation-logs
). - 이 사용자 가이드(IAM 사용자 만들기)에 따라 사용자를 만듭니다.
- 생성된 사용자를 선택합니다.
- 보안 사용자 인증 정보 탭을 선택합니다.
- 액세스 키 섹션에서 액세스 키 만들기를 클릭합니다.
- 사용 사례로 서드 파티 서비스를 선택합니다.
- 다음을 클릭합니다.
- 선택사항: 설명 태그를 추가합니다.
- 액세스 키 만들기를 클릭합니다.
- CSV 파일 다운로드를 클릭하여 나중에 사용할 수 있도록 액세스 키와 비밀 액세스 키를 저장합니다.
- 완료를 클릭합니다.
- 권한 탭을 선택합니다.
- 권한 정책 섹션에서 권한 추가를 클릭합니다.
- 권한 추가를 선택합니다.
- 정책 직접 연결을 선택합니다.
- AmazonS3FullAccess 정책을 검색하여 선택합니다.
- 다음을 클릭합니다.
- 권한 추가를 클릭합니다.
S3 업로드용 IAM 정책 및 역할 구성
- AWS 콘솔에서 IAM > 정책 > 정책 만들기 > JSON 탭으로 이동합니다.
다음 정책을 입력합니다.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutZoomOperationLogs", "Effect": "Allow", "Action": ["s3:PutObject"], "Resource": "arn:aws:s3:::zoom-operation-logs/zoom/operationlogs/*" }, { "Sid": "AllowStateReadWrite", "Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject"], "Resource": "arn:aws:s3:::zoom-operation-logs/zoom/operationlogs/state.json" } ] }
- 다른 버킷 이름을 입력한 경우
zoom-operation-logs
을 해당 이름으로 바꿉니다.
- 다른 버킷 이름을 입력한 경우
다음 > 정책 만들기를 클릭합니다.
IAM > 역할 > 역할 생성 > AWS 서비스 > Lambda로 이동합니다.
새로 만든 정책을 연결합니다.
역할 이름을
WriteZoomOperationLogsToS3Role
로 지정하고 역할 만들기를 클릭합니다.
Lambda 함수 만들기
- AWS 콘솔에서 Lambda > 함수 > 함수 만들기로 이동합니다.
- 처음부터 작성을 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
설정 | 값 |
---|---|
이름 | zoom_operationlogs_to_s3 |
런타임 | Python 3.13 |
아키텍처 | x86_64 |
실행 역할 | WriteZoomOperationLogsToS3Role |
함수가 생성되면 코드 탭을 열고 스텁을 삭제하고 다음 코드를 입력합니다(
zoom_operationlogs_to_s3.py
).#!/usr/bin/env python3 import os, json, gzip, io, uuid, datetime as dt, base64, urllib.parse, urllib.request import boto3 # ---- Environment ---- S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "zoom/operationlogs/") STATE_KEY = os.environ.get("STATE_KEY", S3_PREFIX + "state.json") ZOOM_ACCOUNT_ID = os.environ["ZOOM_ACCOUNT_ID"] ZOOM_CLIENT_ID = os.environ["ZOOM_CLIENT_ID"] ZOOM_CLIENT_SECRET = os.environ["ZOOM_CLIENT_SECRET"] PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "300")) # API default 30; max may vary TIMEOUT = int(os.environ.get("TIMEOUT", "30")) TOKEN_URL = "https://zoom.us/oauth/token" REPORT_URL = "https://api.zoom.us/v2/report/operationlogs" s3 = boto3.client("s3") # ---- Helpers ---- def _http(req: urllib.request.Request): return urllib.request.urlopen(req, timeout=TIMEOUT) def get_token() -> str: params = urllib.parse.urlencode({ "grant_type": "account_credentials", "account_id": ZOOM_ACCOUNT_ID, }).encode() basic = base64.b64encode(f"{ZOOM_CLIENT_ID}:{ZOOM_CLIENT_SECRET}".encode()).decode() req = urllib.request.Request( TOKEN_URL, data=params, headers={ "Authorization": f"Basic {basic}", "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json", "Host": "zoom.us", }, method="POST", ) with _http(req) as r: body = json.loads(r.read()) return body["access_token"] def get_state() -> dict: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()) except Exception: # initial state: start today today = dt.date.today().isoformat() return {"cursor_date": today, "next_page_token": None} def put_state(state: dict): state["updated_at"] = dt.datetime.utcnow().isoformat() + "Z" s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state).encode()) def write_chunk(items: list[dict], ts: dt.datetime) -> str: key = f"{S3_PREFIX}{ts:%Y/%m/%d}/zoom-operationlogs-{uuid.uuid4()}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="w") as gz: for rec in items: gz.write((json.dumps(rec) + "n").encode()) buf.seek(0) s3.upload_fileobj(buf, S3_BUCKET, key) return key def fetch_page(token: str, from_date: str, to_date: str, next_page_token: str | None) -> dict: q = { "from": from_date, "to": to_date, "page_size": str(PAGE_SIZE), } if next_page_token: q["next_page_token"] = next_page_token url = REPORT_URL + "?" + urllib.parse.urlencode(q) req = urllib.request.Request(url, headers={ "Authorization": f"Bearer {token}", "Accept": "application/json", }) with _http(req) as r: return json.loads(r.read()) def lambda_handler(event=None, context=None): token = get_token() state = get_state() cursor_date = state.get("cursor_date") # YYYY-MM-DD # API requires from/to in yyyy-mm-dd, max one month per request from_date = cursor_date to_date = cursor_date total_written = 0 next_token = state.get("next_page_token") while True: page = fetch_page(token, from_date, to_date, next_token) items = page.get("operation_logs", []) or [] if items: write_chunk(items, dt.datetime.utcnow()) total_written += len(items) next_token = page.get("next_page_token") if not next_token: break # Advance to next day if we've finished this date today = dt.date.today().isoformat() if cursor_date < today: nxt = (dt.datetime.fromisoformat(cursor_date) + dt.timedelta(days=1)).date().isoformat() state["cursor_date"] = nxt state["next_page_token"] = None else: # stay on today; continue later with next_page_token=None state["next_page_token"] = None put_state(state) return {"ok": True, "written": total_written, "date": from_date} if __name__ == "__main__": print(lambda_handler())
구성 > 환경 변수 > 수정 > 새 환경 변수 추가로 이동합니다.
다음 환경 변수를 입력하고 값으로 바꿉니다.
키 예시 값 S3_BUCKET
zoom-operation-logs
S3_PREFIX
zoom/operationlogs/
STATE_KEY
zoom/operationlogs/state.json
ZOOM_ACCOUNT_ID
<your-zoom-account-id>
ZOOM_CLIENT_ID
<your-zoom-client-id>
ZOOM_CLIENT_SECRET
<your-zoom-client-secret>
PAGE_SIZE
300
TIMEOUT
30
함수가 생성되면 해당 페이지에 머무르거나 Lambda > 함수 > your-function을 엽니다.
구성 탭을 선택합니다.
일반 구성 패널에서 수정을 클릭합니다.
제한 시간을 5분 (300초)으로 변경하고 저장을 클릭합니다.
EventBridge 일정 만들기
- Amazon EventBridge > 스케줄러로 이동합니다.
- 일정 만들기를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 반복 일정: 요율 (
15 min
) - 타겟: Lambda 함수
zoom_operationlogs_to_s3
- 이름:
zoom-operationlogs-schedule-15min
.
- 반복 일정: 요율 (
- 일정 만들기를 클릭합니다.
선택사항: Google SecOps용 읽기 전용 IAM 사용자 및 키 만들기
- AWS 콘솔에서 IAM > 사용자 > 사용자 추가로 이동합니다.
- Add users를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 사용자:
secops-reader
- 액세스 유형: 액세스 키 — 프로그래매틱 액세스
- 사용자:
- 사용자 만들기를 클릭합니다.
- 최소 읽기 정책 (맞춤) 연결: 사용자 > secops-reader > 권한 > 권한 추가 > 정책 직접 연결 > 정책 만들기
JSON 편집기에서 다음 정책을 입력합니다.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::zoom-operation-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::zoom-operation-logs" } ] }
이름을
secops-reader-policy
로 설정합니다.정책 만들기 > 검색/선택 > 다음 > 권한 추가로 이동합니다.
보안 사용자 인증 정보> 액세스 키> 액세스 키 만들기로 이동합니다.
CSV를 다운로드합니다 (이러한 값은 피드에 입력됨).
Zoom Operation Logs를 수집하도록 Google SecOps에서 피드 구성
- SIEM 설정> 피드로 이동합니다.
- + 새 피드 추가를 클릭합니다.
- 피드 이름 필드에 피드 이름을 입력합니다 (예:
Zoom Operation Logs
). - 소스 유형으로 Amazon S3 V2를 선택합니다.
- 로그 유형으로 Zoom 작업 로그를 선택합니다.
- 다음을 클릭합니다.
- 다음 입력 파라미터의 값을 지정합니다.
- S3 URI:
s3://zoom-operation-logs/zoom/operationlogs/
- 소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.
- 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.
- 액세스 키 ID: S3 버킷에 대한 액세스 권한이 있는 사용자 액세스 키입니다.
- 보안 비밀 액세스 키: S3 버킷에 액세스할 수 있는 사용자 보안 비밀 키입니다.
- 애셋 네임스페이스: 애셋 네임스페이스입니다.
- 수집 라벨: 이 피드의 이벤트에 적용된 라벨입니다.
- S3 URI:
- 다음을 클릭합니다.
- 확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.
UDM 매핑 테이블
로그 필드 | UDM 매핑 | 논리 |
---|---|---|
action | metadata.product_event_type | 'action' 원시 로그 필드가 이 UDM 필드에 매핑됩니다. |
category_type | additional.fields.key | 원시 로그 필드 'category_type'이 이 UDM 필드에 매핑됩니다. |
category_type | additional.fields.value.string_value | 원시 로그 필드 'category_type'이 이 UDM 필드에 매핑됩니다. |
부서 | target.user.department | 'operation_detail' 필드에서 추출된 원시 로그 필드 'Department'가 이 UDM 필드에 매핑됩니다. |
설명 | target.user.role_description | 원시 로그 필드 'Description'('operation_detail' 필드에서 추출됨)이 이 UDM 필드에 매핑됩니다. |
표시 이름 | target.user.user_display_name | 원시 로그 필드 '표시 이름'('operation_detail' 필드에서 추출됨)이 이 UDM 필드에 매핑됩니다. |
이메일 주소 | target.user.email_addresses | 원시 로그 필드 '이메일 주소'('operation_detail' 필드에서 추출됨)가 이 UDM 필드에 매핑됩니다. |
이름 | target.user.first_name | 'operation_detail' 필드에서 추출된 원시 로그 필드 'First Name'이 이 UDM 필드에 매핑됩니다. |
직책 | target.user.title | 'Job Title'(직책) 원시 로그 필드('operation_detail' 필드에서 추출됨)가 이 UDM 필드에 매핑됩니다. |
성 | target.user.last_name | 'operation_detail' 필드에서 추출된 원시 로그 필드 'Last Name'이 이 UDM 필드에 매핑됩니다. |
위치 | target.location.name | 'operation_detail' 필드에서 추출된 원시 로그 필드 'Location'이 이 UDM 필드에 매핑됩니다. |
operation_detail | metadata.description | 원시 로그 필드 'operation_detail'이 이 UDM 필드에 매핑됩니다. |
연산자 | principal.user.email_addresses | 이메일 정규식과 일치하는 경우 원시 로그 필드 'operator'가 이 UDM 필드에 매핑됩니다. |
연산자 | principal.user.userid | 이메일 정규식과 일치하지 않는 경우 원시 로그 필드 'operator'가 이 UDM 필드에 매핑됩니다. |
채팅방 이름 | target.user.attribute.labels.value | 'operation_detail' 필드에서 추출된 원시 로그 필드 'Room Name'이 이 UDM 필드에 매핑됩니다. |
역할 이름 | target.user.attribute.roles.name | 'operation_detail' 필드에서 추출된 원시 로그 필드 'Role Name'이 이 UDM 필드에 매핑됩니다. |
시간 | metadata.event_timestamp.seconds | 원시 로그 필드 'time'이 파싱되어 이 UDM 필드에 매핑됩니다. |
유형 | target.user.attribute.labels.value | 'operation_detail' 필드에서 추출된 원시 로그 필드 'Type'이 이 UDM 필드에 매핑됩니다. |
사용자 역할 | target.user.attribute.roles.name | 원시 로그 필드 'User Role'('operation_detail' 필드에서 추출됨)이 이 UDM 필드에 매핑됩니다. |
사용자 유형 | target.user.attribute.labels.value | 'User Type' 원시 로그 필드('operation_detail' 필드에서 추출됨)가 이 UDM 필드에 매핑됩니다. |
metadata.log_type | 'ZOOM_OPERATION_LOGS' 값이 이 UDM 필드에 할당됩니다. | |
metadata.vendor_name | 'ZOOM' 값이 이 UDM 필드에 할당됩니다. | |
metadata.product_name | 'ZOOM_OPERATION_LOGS' 값이 이 UDM 필드에 할당됩니다. | |
metadata.event_type | 값은 다음 논리에 따라 결정됩니다. 1. 'event_type' 필드가 비어 있지 않으면 해당 값이 사용됩니다. 2. 'operator', 'email' 또는 'email2' 필드가 비어 있지 않으면 값이 'USER_UNCATEGORIZED'로 설정됩니다. 3. 그 외에는 값이 'GENERIC_EVENT'로 설정됩니다. |
|
json_data | about.user.attribute.labels.value | 원시 로그 필드 'json_data'('operation_detail' 필드에서 추출됨)가 JSON으로 파싱됩니다. 파싱된 JSON 배열의 각 요소에 있는 'assistant' 및 'options' 필드는 UDM의 'labels' 배열에 있는 'value' 필드에 매핑됩니다. |
json_data | about.user.userid | 원시 로그 필드 'json_data'('operation_detail' 필드에서 추출됨)가 JSON으로 파싱됩니다. 파싱된 JSON 배열의 각 요소 (첫 번째 요소 제외)의 'userId' 필드는 UDM의 'about.user' 객체의 'userid' 필드에 매핑됩니다. |
json_data | target.user.attribute.labels.value | 원시 로그 필드 'json_data'('operation_detail' 필드에서 추출됨)가 JSON으로 파싱됩니다. 파싱된 JSON 배열의 첫 번째 요소에 있는 'assistant' 및 'options' 필드가 UDM의 'labels' 배열에 있는 'value' 필드에 매핑됩니다. |
json_data | target.user.userid | 원시 로그 필드 'json_data'('operation_detail' 필드에서 추출됨)가 JSON으로 파싱됩니다. 파싱된 JSON 배열의 첫 번째 요소에 있는 'userId' 필드는 UDM의 'target.user' 객체의 'userid' 필드에 매핑됩니다. |
이메일 | target.user.email_addresses | 'operation_detail' 필드에서 추출된 원시 로그 필드 'email'이 이 UDM 필드에 매핑됩니다. |
email2 | target.user.email_addresses | 'operation_detail' 필드에서 추출된 원시 로그 필드 'email2'가 이 UDM 필드에 매핑됩니다. |
역할 | target.user.attribute.roles.name | 'operation_detail' 필드에서 추출된 원시 로그 필드 'role'이 이 UDM 필드에 매핑됩니다. |
도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가로부터 답변을 받으세요.