收集 Slack 审核日志

支持的语言:

本文档介绍了如何使用 Amazon S3 将 Slack 审核日志注入到 Google Security Operations。解析器首先会对布尔值进行归一化处理,然后清除预定义的字段。然后,它将“message”字段解析为 JSON,并通过舍弃非 JSON 消息来处理这些消息。根据特定字段(date_createuser_id)的存在情况,解析器会应用不同的逻辑将原始日志字段映射到 UDM,包括元数据、正文、网络、目标和相关信息,并构建安全结果。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例
  • Slack Enterprise Grid 租户和管理控制台的特权访问权限
  • AWS(S3、IAM、Lambda、EventBridge)的特权访问权限

收集 Slack 前提条件(应用 ID、OAuth 令牌、组织 ID)

  1. 登录 Slack 管理控制台。
  2. 前往 https://api.slack.com/apps,然后依次点击创建新应用 > 从头开始
  3. 输入唯一的应用名称,然后选择您的 Slack 工作区
  4. 点击创建应用
  5. 在左侧边栏中,前往 OAuth 和权限
  6. 前往 Scopes 部分,然后添加以下用户令牌范围 - auditlogs:read
  7. 依次点击安装到 Workspace> 允许
  8. 安装完成后,前往组织级应用
  9. 点击安装到组织
  10. 使用组织所有者/管理员账号授权该应用。
  11. 复制并安全地保存以 xoxp- 开头的用户 OAuth 令牌(这是您的 SLACK_AUDIT_TOKEN)。
  12. 记下组织 ID,该 ID 可在 Slack 管理控制台中的设置和权限 > 组织设置下找到。

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 slack-audit-logs)。
  3. 按照以下用户指南创建用户:创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击 Download CSV file(下载 CSV 文件),保存访问密钥不公开的访问密钥以供日后使用。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索并选择 AmazonS3FullAccess 政策。
  18. 点击下一步
  19. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. 在 AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页
  2. 输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::slack-audit-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::slack-audit-logs/slack/audit/state.json"
        }
      ]
    }
    
    • 如果您输入了其他存储桶名称,请替换 slack-audit-logs
  3. 依次点击下一步 > 创建政策

  4. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  5. 附加新创建的政策。

  6. 将角色命名为 SlackAuditToS3Role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:
设置
名称 slack_audit_to_s3
运行时 Python 3.13
架构 x86_64
执行角色 SlackAuditToS3Role
  1. 创建函数后,打开 Code 标签页,删除桩代码并输入以下内容 (slack_audit_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull Slack Audit Logs (Enterprise Grid) to S3 (no transform)
    
    import os, json, time, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    BASE_URL = "https://api.slack.com/audit/v1/logs"
    
    TOKEN        = os.environ["SLACK_AUDIT_TOKEN"]  # org-level user token with auditlogs:read
    BUCKET       = os.environ["S3_BUCKET"]
    PREFIX       = os.environ.get("S3_PREFIX", "slack/audit/")
    STATE_KEY    = os.environ.get("STATE_KEY", "slack/audit/state.json")
    LIMIT        = int(os.environ.get("LIMIT", "200"))             # Slack recommends <= 200
    MAX_PAGES    = int(os.environ.get("MAX_PAGES", "20"))
    LOOKBACK_SEC = int(os.environ.get("LOOKBACK_SECONDS", "3600")) # First-run window
    HTTP_TIMEOUT = int(os.environ.get("HTTP_TIMEOUT", "60"))
    HTTP_RETRIES = int(os.environ.get("HTTP_RETRIES", "3"))
    RETRY_AFTER_DEFAULT = int(os.environ.get("RETRY_AFTER_DEFAULT", "2"))
    # Optional server-side filters (comma-separated "action" values), empty means no filter
    ACTIONS      = os.environ.get("ACTIONS", "").strip()
    
    s3 = boto3.client("s3")
    
    def _get_state() -> dict:
        try:
            obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            st = json.loads(obj["Body"].read() or b"{}")
            return {"cursor": st.get("cursor")}
        except Exception:
            return {"cursor": None}
    
    def _put_state(state: dict) -> None:
        body = json.dumps(state, separators=(",", ":")).encode("utf-8")
        s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def _http_get(params: dict) -> dict:
        qs  = urllib.parse.urlencode(params, doseq=True)
        url = f"{BASE_URL}?{qs}" if qs else BASE_URL
        req = Request(url, method="GET")
        req.add_header("Authorization", f"Bearer {TOKEN}")
        req.add_header("Accept", "application/json")
    
        attempt = 0
        while True:
            try:
                with urlopen(req, timeout=HTTP_TIMEOUT) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                # Respect Retry-After on 429/5xx
                if e.code in (429, 500, 502, 503, 504) and attempt < HTTP_RETRIES:
                    retry_after = 0
                    try:
                        retry_after = int(e.headers.get("Retry-After", RETRY_AFTER_DEFAULT))
                    except Exception:
                        retry_after = RETRY_AFTER_DEFAULT
                    time.sleep(max(1, retry_after))
                    attempt += 1
                    continue
                # Re-raise other HTTP errors
                raise
            except URLError:
                if attempt < HTTP_RETRIES:
                    time.sleep(RETRY_AFTER_DEFAULT)
                    attempt += 1
                    continue
                raise
    
    def _write_page(payload: dict, page_idx: int) -> str:
        ts  = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime())
        key = f"{PREFIX}/{ts}-slack-audit-p{page_idx:05d}.json"
        body = json.dumps(payload, separators=(",", ":")).encode("utf-8")
        s3.put_object(Bucket=BUCKET, Key=key, Body=body, ContentType="application/json")
        return key
    
    def lambda_handler(event=None, context=None):
        state  = _get_state()
        cursor = state.get("cursor")
    
        params = {"limit": LIMIT}
        if ACTIONS:
            params["action"] = [a.strip() for a in ACTIONS.split(",") if a.strip()]
        if cursor:
            params["cursor"] = cursor
        else:
            # First run (or reset): fetch a recent window by time
            params["oldest"] = int(time.time()) - LOOKBACK_SEC
    
        pages = 0
        total = 0
        last_cursor = None
    
        while pages < MAX_PAGES:
            data = _http_get(params)
            _write_page(data, pages)
    
            entries = data.get("entries") or []
            total += len(entries)
    
            # Cursor for next page
            meta = data.get("response_metadata") or {}
            next_cursor = meta.get("next_cursor") or data.get("next_cursor")
            if next_cursor:
                params = {"limit": LIMIT, "cursor": next_cursor}
                if ACTIONS:
                    params["action"] = [a.strip() for a in ACTIONS.split(",") if a.strip()]
                last_cursor = next_cursor
                pages += 1
                continue
            break
    
        if last_cursor:
            _put_state({"cursor": last_cursor})
    
        return {"ok": True, "pages": pages + (1 if total or last_cursor else 0), "entries": total, "cursor": last_cursor}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  2. 依次前往配置 > 环境变量 > 修改 > 添加新的环境变量

  3. 输入以下环境变量,并替换为您的值:

    示例值
    S3_BUCKET slack-audit-logs
    S3_PREFIX slack/audit/
    STATE_KEY slack/audit/state.json
    SLACK_AUDIT_TOKEN xoxp-***(具有 auditlogs:read 的组织级用户令牌)
    LIMIT 200
    MAX_PAGES 20
    LOOKBACK_SECONDS 3600
    HTTP_TIMEOUT 60
    HTTP_RETRIES 3
    RETRY_AFTER_DEFAULT 2
    ACTIONS (可选,CSV) user_login,app_installed
  4. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。

  5. 选择配置标签页。

  6. 常规配置面板中,点击修改

  7. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度程序 > 创建计划
  2. 提供以下配置详细信息:
    • 周期性安排费率 (1 hour)。
    • 目标:您的 Lambda 函数 slack_audit_to_s3
    • 名称slack-audit-1h
  3. 点击创建时间表

可选:为 Google SecOps 创建只读 IAM 用户和密钥

  1. AWS 控制台中,依次前往 IAM > 用户 > 添加用户
  2. 点击 Add users(添加用户)。
  3. 提供以下配置详细信息:
    • 用户secops-reader
    • 访问类型访问密钥 - 以程序化方式访问
  4. 点击创建用户
  5. 附加最低限度的读取政策(自定义):用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策
  6. 在 JSON 编辑器中,输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::slack-audit-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::slack-audit-logs"
        }
      ]
    }
    
  7. 将名称设置为 secops-reader-policy

  8. 依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限

  9. 依次前往安全凭据 > 访问密钥 > 创建访问密钥

  10. 下载 CSV(这些值会输入到 Feed 中)。

在 Google SecOps 中配置 Feed 以注入 Slack 审核日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击 + 添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 Slack Audit Logs)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Slack 审核作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://slack-audit-logs/slack/audit/
    • 来源删除选项:根据您的偏好设置选择删除选项。
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
    • 资产命名空间资产命名空间
    • 注入标签:应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
action metadata.product_event_type 直接从原始日志中的 action 字段映射。
actor.type principal.labels.value 直接从 actor.type 字段映射,并添加了键 actor.type
actor.user.email principal.user.email_addresses 直接从 actor.user.email 字段映射。
actor.user.id principal.user.product_object_id 直接从 actor.user.id 字段映射。
actor.user.id principal.user.userid 直接从 actor.user.id 字段映射。
actor.user.name principal.user.user_display_name 直接从 actor.user.name 字段映射。
actor.user.team principal.user.group_identifiers 直接从 actor.user.team 字段映射。
context.ip_address principal.ip 直接从 context.ip_address 字段映射。
context.location.domain about.resource.attribute.labels.value 直接从 context.location.domain 字段映射,并添加了键 context.location.domain
context.location.id about.resource.id 直接从 context.location.id 字段映射。
context.location.name about.resource.name 直接从 context.location.name 字段映射。
context.location.name about.resource.attribute.labels.value 直接从 context.location.name 字段映射,并添加了键 context.location.name
context.location.type about.resource.resource_subtype 直接从 context.location.type 字段映射。
context.session_id network.session_id 直接从 context.session_id 字段映射。
context.ua network.http.user_agent 直接从 context.ua 字段映射。
context.ua network.http.parsed_user_agent 使用 parseduseragent 过滤条件从 context.ua 字段派生的已解析的用户代理信息。
country principal.location.country_or_region 直接从 country 字段映射。
date_create metadata.event_timestamp.seconds date_create 字段中的纪元时间戳会转换为时间戳对象。
details.inviter.email target.user.email_addresses 直接从 details.inviter.email 字段映射。
details.inviter.id target.user.product_object_id 直接从 details.inviter.id 字段映射。
details.inviter.name target.user.user_display_name 直接从 details.inviter.name 字段映射。
details.inviter.team target.user.group_identifiers 直接从 details.inviter.team 字段映射。
details.reason security_result.description 直接从 details.reason 字段映射,如果是数组,则用英文逗号连接。
details.type about.resource.attribute.labels.value 直接从 details.type 字段映射,并添加了键 details.type
details.type security_result.summary 直接从 details.type 字段映射。
entity.app.id target.resource.id 直接从 entity.app.id 字段映射。
entity.app.name target.resource.name 直接从 entity.app.name 字段映射。
entity.channel.id target.resource.id 直接从 entity.channel.id 字段映射。
entity.channel.name target.resource.name 直接从 entity.channel.name 字段映射。
entity.channel.privacy target.resource.attribute.labels.value 直接从 entity.channel.privacy 字段映射,并添加了键 entity.channel.privacy
entity.file.filetype target.resource.attribute.labels.value 直接从 entity.file.filetype 字段映射,并添加了键 entity.file.filetype
entity.file.id target.resource.id 直接从 entity.file.id 字段映射。
entity.file.name target.resource.name 直接从 entity.file.name 字段映射。
entity.file.title target.resource.attribute.labels.value 直接从 entity.file.title 字段映射,并添加了键 entity.file.title
entity.huddle.date_end about.resource.attribute.labels.value 直接从 entity.huddle.date_end 字段映射,并添加了键 entity.huddle.date_end
entity.huddle.date_start about.resource.attribute.labels.value 直接从 entity.huddle.date_start 字段映射,并添加了键 entity.huddle.date_start
entity.huddle.id about.resource.attribute.labels.value 直接从 entity.huddle.id 字段映射,并添加了键 entity.huddle.id
entity.huddle.participants.0 about.resource.attribute.labels.value 直接从 entity.huddle.participants.0 字段映射,并添加了键 entity.huddle.participants.0
entity.huddle.participants.1 about.resource.attribute.labels.value 直接从 entity.huddle.participants.1 字段映射,并添加了键 entity.huddle.participants.1
entity.type target.resource.resource_subtype 直接从 entity.type 字段映射。
entity.user.email target.user.email_addresses 直接从 entity.user.email 字段映射。
entity.user.id target.user.product_object_id 直接从 entity.user.id 字段映射。
entity.user.name target.user.user_display_name 直接从 entity.user.name 字段映射。
entity.user.team target.user.group_identifiers 直接从 entity.user.team 字段映射。
entity.workflow.id target.resource.id 直接从 entity.workflow.id 字段映射。
entity.workflow.name target.resource.name 直接从 entity.workflow.name 字段映射。
id metadata.product_log_id 直接从 id 字段映射。
ip principal.ip 直接从 ip 字段映射。由基于 action 字段的逻辑确定。默认值为 USER_COMMUNICATION,但会根据 action 的值更改为其他值,例如 USER_CREATIONUSER_LOGINUSER_LOGOUTUSER_RESOURCE_ACCESSUSER_RESOURCE_UPDATE_PERMISSIONSUSER_CHANGE_PERMISSIONS。硬编码为“SLACK_AUDIT”。如果存在 date_create,则设置为“Enterprise Grid”;否则,如果存在 user_id,则设置为“Audit Logs”。硬编码为“Slack”。硬编码为“REMOTE”。如果 action 包含“user_login”或“user_logout”,则设置为“SSO”。否则,设置为“MACHINE”。在提供的示例中未映射。默认为“ALLOW”,但如果 action 为“user_login_failed”,则设置为“BLOCK”。如果 date_create 存在,则设置为“Slack”;否则,如果 user_id 存在,则设置为“SLACK”。
user_agent network.http.user_agent 直接从 user_agent 字段映射。
user_id principal.user.product_object_id 直接从 user_id 字段映射。
username principal.user.product_object_id 直接从 username 字段映射。

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。