收集 Slack 审核日志
本文档介绍了如何使用 Amazon S3 将 Slack 审核日志注入到 Google Security Operations。解析器首先会对布尔值进行归一化处理,然后清除预定义的字段。然后,它将“message”字段解析为 JSON,并通过舍弃非 JSON 消息来处理这些消息。根据特定字段(date_create
和 user_id
)的存在情况,解析器会应用不同的逻辑将原始日志字段映射到 UDM,包括元数据、正文、网络、目标和相关信息,并构建安全结果。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例
- 对 Slack Enterprise Grid 租户和管理控制台的特权访问权限
- 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限
收集 Slack 前提条件(应用 ID、OAuth 令牌、组织 ID)
- 登录 Slack 管理控制台。
- 前往 https://api.slack.com/apps,然后依次点击创建新应用 > 从头开始。
- 输入唯一的应用名称,然后选择您的 Slack 工作区。
- 点击创建应用。
- 在左侧边栏中,前往 OAuth 和权限。
- 前往 Scopes 部分,然后添加以下用户令牌范围 - auditlogs:read
- 依次点击安装到 Workspace> 允许。
- 安装完成后,前往组织级应用。
- 点击安装到组织。
- 使用组织所有者/管理员账号授权该应用。
- 复制并安全地保存以
xoxp-
开头的用户 OAuth 令牌(这是您的 SLACK_AUDIT_TOKEN)。 - 记下组织 ID,该 ID 可在 Slack 管理控制台中的设置和权限 > 组织设置下找到。
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶
- 保存存储桶名称和区域以供日后参考(例如
slack-audit-logs
)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击 Download CSV file(下载 CSV 文件),保存访问密钥和不公开的访问密钥以供日后使用。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策
- 搜索并选择 AmazonS3FullAccess 政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传配置 IAM 政策和角色
- 在 AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页。
输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::slack-audit-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::slack-audit-logs/slack/audit/state.json" } ] }
- 如果您输入了其他存储桶名称,请替换
slack-audit-logs
。
- 如果您输入了其他存储桶名称,请替换
依次点击下一步 > 创建政策。
依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda。
附加新创建的政策。
将角色命名为
SlackAuditToS3Role
,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
- 提供以下配置详细信息:
设置 | 值 |
---|---|
名称 | slack_audit_to_s3 |
运行时 | Python 3.13 |
架构 | x86_64 |
执行角色 | SlackAuditToS3Role |
创建函数后,打开 Code 标签页,删除桩代码并输入以下内容 (
slack_audit_to_s3.py
):#!/usr/bin/env python3 # Lambda: Pull Slack Audit Logs (Enterprise Grid) to S3 (no transform) import os, json, time, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 BASE_URL = "https://api.slack.com/audit/v1/logs" TOKEN = os.environ["SLACK_AUDIT_TOKEN"] # org-level user token with auditlogs:read BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "slack/audit/") STATE_KEY = os.environ.get("STATE_KEY", "slack/audit/state.json") LIMIT = int(os.environ.get("LIMIT", "200")) # Slack recommends <= 200 MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) LOOKBACK_SEC = int(os.environ.get("LOOKBACK_SECONDS", "3600")) # First-run window HTTP_TIMEOUT = int(os.environ.get("HTTP_TIMEOUT", "60")) HTTP_RETRIES = int(os.environ.get("HTTP_RETRIES", "3")) RETRY_AFTER_DEFAULT = int(os.environ.get("RETRY_AFTER_DEFAULT", "2")) # Optional server-side filters (comma-separated "action" values), empty means no filter ACTIONS = os.environ.get("ACTIONS", "").strip() s3 = boto3.client("s3") def _get_state() -> dict: try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) st = json.loads(obj["Body"].read() or b"{}") return {"cursor": st.get("cursor")} except Exception: return {"cursor": None} def _put_state(state: dict) -> None: body = json.dumps(state, separators=(",", ":")).encode("utf-8") s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _http_get(params: dict) -> dict: qs = urllib.parse.urlencode(params, doseq=True) url = f"{BASE_URL}?{qs}" if qs else BASE_URL req = Request(url, method="GET") req.add_header("Authorization", f"Bearer {TOKEN}") req.add_header("Accept", "application/json") attempt = 0 while True: try: with urlopen(req, timeout=HTTP_TIMEOUT) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # Respect Retry-After on 429/5xx if e.code in (429, 500, 502, 503, 504) and attempt < HTTP_RETRIES: retry_after = 0 try: retry_after = int(e.headers.get("Retry-After", RETRY_AFTER_DEFAULT)) except Exception: retry_after = RETRY_AFTER_DEFAULT time.sleep(max(1, retry_after)) attempt += 1 continue # Re-raise other HTTP errors raise except URLError: if attempt < HTTP_RETRIES: time.sleep(RETRY_AFTER_DEFAULT) attempt += 1 continue raise def _write_page(payload: dict, page_idx: int) -> str: ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime()) key = f"{PREFIX}/{ts}-slack-audit-p{page_idx:05d}.json" body = json.dumps(payload, separators=(",", ":")).encode("utf-8") s3.put_object(Bucket=BUCKET, Key=key, Body=body, ContentType="application/json") return key def lambda_handler(event=None, context=None): state = _get_state() cursor = state.get("cursor") params = {"limit": LIMIT} if ACTIONS: params["action"] = [a.strip() for a in ACTIONS.split(",") if a.strip()] if cursor: params["cursor"] = cursor else: # First run (or reset): fetch a recent window by time params["oldest"] = int(time.time()) - LOOKBACK_SEC pages = 0 total = 0 last_cursor = None while pages < MAX_PAGES: data = _http_get(params) _write_page(data, pages) entries = data.get("entries") or [] total += len(entries) # Cursor for next page meta = data.get("response_metadata") or {} next_cursor = meta.get("next_cursor") or data.get("next_cursor") if next_cursor: params = {"limit": LIMIT, "cursor": next_cursor} if ACTIONS: params["action"] = [a.strip() for a in ACTIONS.split(",") if a.strip()] last_cursor = next_cursor pages += 1 continue break if last_cursor: _put_state({"cursor": last_cursor}) return {"ok": True, "pages": pages + (1 if total or last_cursor else 0), "entries": total, "cursor": last_cursor} if __name__ == "__main__": print(lambda_handler())
依次前往配置 > 环境变量 > 修改 > 添加新的环境变量。
输入以下环境变量,并替换为您的值:
键 示例值 S3_BUCKET
slack-audit-logs
S3_PREFIX
slack/audit/
STATE_KEY
slack/audit/state.json
SLACK_AUDIT_TOKEN
xoxp-***
(具有auditlogs:read
的组织级用户令牌)LIMIT
200
MAX_PAGES
20
LOOKBACK_SECONDS
3600
HTTP_TIMEOUT
60
HTTP_RETRIES
3
RETRY_AFTER_DEFAULT
2
ACTIONS
(可选,CSV) user_login,app_installed
创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。
选择配置标签页。
在常规配置面板中,点击修改。
将超时更改为 5 分钟(300 秒),然后点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度程序 > 创建计划。
- 提供以下配置详细信息:
- 周期性安排:费率 (
1 hour
)。 - 目标:您的 Lambda 函数
slack_audit_to_s3
。 - 名称:
slack-audit-1h
。
- 周期性安排:费率 (
- 点击创建时间表。
可选:为 Google SecOps 创建只读 IAM 用户和密钥
- 在 AWS 控制台中,依次前往 IAM > 用户 > 添加用户。
- 点击 Add users(添加用户)。
- 提供以下配置详细信息:
- 用户:
secops-reader
。 - 访问类型:访问密钥 - 以程序化方式访问。
- 用户:
- 点击创建用户。
- 附加最低限度的读取政策(自定义):用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策。
在 JSON 编辑器中,输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::slack-audit-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::slack-audit-logs" } ] }
将名称设置为
secops-reader-policy
。依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限。
依次前往安全凭据 > 访问密钥 > 创建访问密钥。
下载 CSV(这些值会输入到 Feed 中)。
在 Google SecOps 中配置 Feed 以注入 Slack 审核日志
- 依次前往 SIEM 设置> Feed。
- 点击 + 添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Slack Audit Logs
)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Slack 审核作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://slack-audit-logs/slack/audit/
- 来源删除选项:根据您的偏好设置选择删除选项。
- 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
- 资产命名空间:资产命名空间。
- 注入标签:应用于此 Feed 中事件的标签。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
日志字段 | UDM 映射 | 逻辑 |
---|---|---|
action |
metadata.product_event_type |
直接从原始日志中的 action 字段映射。 |
actor.type |
principal.labels.value |
直接从 actor.type 字段映射,并添加了键 actor.type 。 |
actor.user.email |
principal.user.email_addresses |
直接从 actor.user.email 字段映射。 |
actor.user.id |
principal.user.product_object_id |
直接从 actor.user.id 字段映射。 |
actor.user.id |
principal.user.userid |
直接从 actor.user.id 字段映射。 |
actor.user.name |
principal.user.user_display_name |
直接从 actor.user.name 字段映射。 |
actor.user.team |
principal.user.group_identifiers |
直接从 actor.user.team 字段映射。 |
context.ip_address |
principal.ip |
直接从 context.ip_address 字段映射。 |
context.location.domain |
about.resource.attribute.labels.value |
直接从 context.location.domain 字段映射,并添加了键 context.location.domain 。 |
context.location.id |
about.resource.id |
直接从 context.location.id 字段映射。 |
context.location.name |
about.resource.name |
直接从 context.location.name 字段映射。 |
context.location.name |
about.resource.attribute.labels.value |
直接从 context.location.name 字段映射,并添加了键 context.location.name 。 |
context.location.type |
about.resource.resource_subtype |
直接从 context.location.type 字段映射。 |
context.session_id |
network.session_id |
直接从 context.session_id 字段映射。 |
context.ua |
network.http.user_agent |
直接从 context.ua 字段映射。 |
context.ua |
network.http.parsed_user_agent |
使用 parseduseragent 过滤条件从 context.ua 字段派生的已解析的用户代理信息。 |
country |
principal.location.country_or_region |
直接从 country 字段映射。 |
date_create |
metadata.event_timestamp.seconds |
date_create 字段中的纪元时间戳会转换为时间戳对象。 |
details.inviter.email |
target.user.email_addresses |
直接从 details.inviter.email 字段映射。 |
details.inviter.id |
target.user.product_object_id |
直接从 details.inviter.id 字段映射。 |
details.inviter.name |
target.user.user_display_name |
直接从 details.inviter.name 字段映射。 |
details.inviter.team |
target.user.group_identifiers |
直接从 details.inviter.team 字段映射。 |
details.reason |
security_result.description |
直接从 details.reason 字段映射,如果是数组,则用英文逗号连接。 |
details.type |
about.resource.attribute.labels.value |
直接从 details.type 字段映射,并添加了键 details.type 。 |
details.type |
security_result.summary |
直接从 details.type 字段映射。 |
entity.app.id |
target.resource.id |
直接从 entity.app.id 字段映射。 |
entity.app.name |
target.resource.name |
直接从 entity.app.name 字段映射。 |
entity.channel.id |
target.resource.id |
直接从 entity.channel.id 字段映射。 |
entity.channel.name |
target.resource.name |
直接从 entity.channel.name 字段映射。 |
entity.channel.privacy |
target.resource.attribute.labels.value |
直接从 entity.channel.privacy 字段映射,并添加了键 entity.channel.privacy 。 |
entity.file.filetype |
target.resource.attribute.labels.value |
直接从 entity.file.filetype 字段映射,并添加了键 entity.file.filetype 。 |
entity.file.id |
target.resource.id |
直接从 entity.file.id 字段映射。 |
entity.file.name |
target.resource.name |
直接从 entity.file.name 字段映射。 |
entity.file.title |
target.resource.attribute.labels.value |
直接从 entity.file.title 字段映射,并添加了键 entity.file.title 。 |
entity.huddle.date_end |
about.resource.attribute.labels.value |
直接从 entity.huddle.date_end 字段映射,并添加了键 entity.huddle.date_end 。 |
entity.huddle.date_start |
about.resource.attribute.labels.value |
直接从 entity.huddle.date_start 字段映射,并添加了键 entity.huddle.date_start 。 |
entity.huddle.id |
about.resource.attribute.labels.value |
直接从 entity.huddle.id 字段映射,并添加了键 entity.huddle.id 。 |
entity.huddle.participants.0 |
about.resource.attribute.labels.value |
直接从 entity.huddle.participants.0 字段映射,并添加了键 entity.huddle.participants.0 。 |
entity.huddle.participants.1 |
about.resource.attribute.labels.value |
直接从 entity.huddle.participants.1 字段映射,并添加了键 entity.huddle.participants.1 。 |
entity.type |
target.resource.resource_subtype |
直接从 entity.type 字段映射。 |
entity.user.email |
target.user.email_addresses |
直接从 entity.user.email 字段映射。 |
entity.user.id |
target.user.product_object_id |
直接从 entity.user.id 字段映射。 |
entity.user.name |
target.user.user_display_name |
直接从 entity.user.name 字段映射。 |
entity.user.team |
target.user.group_identifiers |
直接从 entity.user.team 字段映射。 |
entity.workflow.id |
target.resource.id |
直接从 entity.workflow.id 字段映射。 |
entity.workflow.name |
target.resource.name |
直接从 entity.workflow.name 字段映射。 |
id |
metadata.product_log_id |
直接从 id 字段映射。 |
ip |
principal.ip |
直接从 ip 字段映射。由基于 action 字段的逻辑确定。默认值为 USER_COMMUNICATION ,但会根据 action 的值更改为其他值,例如 USER_CREATION 、USER_LOGIN 、USER_LOGOUT 、USER_RESOURCE_ACCESS 、USER_RESOURCE_UPDATE_PERMISSIONS 或 USER_CHANGE_PERMISSIONS 。硬编码为“SLACK_AUDIT”。如果存在 date_create ,则设置为“Enterprise Grid”;否则,如果存在 user_id ,则设置为“Audit Logs”。硬编码为“Slack”。硬编码为“REMOTE”。如果 action 包含“user_login”或“user_logout”,则设置为“SSO”。否则,设置为“MACHINE”。在提供的示例中未映射。默认为“ALLOW”,但如果 action 为“user_login_failed”,则设置为“BLOCK”。如果 date_create 存在,则设置为“Slack”;否则,如果 user_id 存在,则设置为“SLACK”。 |
user_agent |
network.http.user_agent |
直接从 user_agent 字段映射。 |
user_id |
principal.user.product_object_id |
直接从 user_id 字段映射。 |
username |
principal.user.product_object_id |
直接从 username 字段映射。 |
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。