收集 Slack 稽核記錄

支援的國家/地區:

本文說明如何使用 Amazon S3,將 Slack 稽核記錄擷取至 Google Security Operations。剖析器會先將布林值標準化,並清除預先定義的欄位。然後將「message」欄位剖析為 JSON,並捨棄非 JSON 訊息。視特定欄位 (date_createuser_id) 是否存在,剖析器會套用不同邏輯,將原始記錄檔欄位對應至 UDM,包括中繼資料、主體、網路、目標和相關資訊,並建構安全性結果。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體
  • Slack Enterprise Grid 租戶和管理控制台的專屬存取權
  • AWS (S3、IAM、Lambda、EventBridge) 的具備權限存取權

收集 Slack 必要條件 (應用程式 ID、OAuth 權杖、機構 ID)

  1. 登入 Slack 管理控制台。
  2. 前往 https://api.slack.com/apps,然後依序點選「Create New App」>「From scratch」
  3. 輸入不重複的應用程式名稱,然後選取 Slack 工作區
  4. 點選「建立應用程式」
  5. 前往左側邊欄的「OAuth 和權限」
  6. 前往「範圍」部分,並新增下列使用者權杖範圍 - auditlogs:read
  7. 依序點選「安裝到 Workspace」>「允許」
  8. 安裝完成後,請前往「機構層級應用程式」
  9. 按一下「安裝至機構」
  10. 使用機構擁有者/管理員帳戶授權應用程式。
  11. 複製並安全地儲存以 xoxp- 開頭的使用者 OAuth 權杖 (這是您的 SLACK_AUDIT_TOKEN)。
  12. 記下機構組織 ID,這項 ID 位於 Slack 管理主控台的「設定與權限」>「機構組織設定」下方。

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照這份使用者指南建立 Amazon S3 值區建立值區
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 slack-audit-logs)。
  3. 按照這份使用者指南建立使用者:建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後使用。
  12. 按一下 [完成]
  13. 選取 [權限] 分頁標籤。
  14. 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋並選取 AmazonS3FullAccess 政策。
  18. 點選「下一步」
  19. 按一下「Add permissions」。

設定 S3 上傳的身分與存取權管理政策和角色

  1. 在 AWS 控制台中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」分頁標籤
  2. 輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::slack-audit-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::slack-audit-logs/slack/audit/state.json"
        }
      ]
    }
    
    • 如果您輸入的值區名稱不同,請替換 slack-audit-logs
  3. 依序點選「Next」>「Create policy」

  4. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。

  5. 附加新建立的政策。

  6. 為角色命名 SlackAuditToS3Role,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:
設定
名稱 slack_audit_to_s3
執行階段 Python 3.13
架構 x86_64
執行角色 SlackAuditToS3Role
  1. 建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列內容 (slack_audit_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull Slack Audit Logs (Enterprise Grid) to S3 (no transform)
    
    import os, json, time, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    BASE_URL = "https://api.slack.com/audit/v1/logs"
    
    TOKEN        = os.environ["SLACK_AUDIT_TOKEN"]  # org-level user token with auditlogs:read
    BUCKET       = os.environ["S3_BUCKET"]
    PREFIX       = os.environ.get("S3_PREFIX", "slack/audit/")
    STATE_KEY    = os.environ.get("STATE_KEY", "slack/audit/state.json")
    LIMIT        = int(os.environ.get("LIMIT", "200"))             # Slack recommends <= 200
    MAX_PAGES    = int(os.environ.get("MAX_PAGES", "20"))
    LOOKBACK_SEC = int(os.environ.get("LOOKBACK_SECONDS", "3600")) # First-run window
    HTTP_TIMEOUT = int(os.environ.get("HTTP_TIMEOUT", "60"))
    HTTP_RETRIES = int(os.environ.get("HTTP_RETRIES", "3"))
    RETRY_AFTER_DEFAULT = int(os.environ.get("RETRY_AFTER_DEFAULT", "2"))
    # Optional server-side filters (comma-separated "action" values), empty means no filter
    ACTIONS      = os.environ.get("ACTIONS", "").strip()
    
    s3 = boto3.client("s3")
    
    def _get_state() -> dict:
        try:
            obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            st = json.loads(obj["Body"].read() or b"{}")
            return {"cursor": st.get("cursor")}
        except Exception:
            return {"cursor": None}
    
    def _put_state(state: dict) -> None:
        body = json.dumps(state, separators=(",", ":")).encode("utf-8")
        s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def _http_get(params: dict) -> dict:
        qs  = urllib.parse.urlencode(params, doseq=True)
        url = f"{BASE_URL}?{qs}" if qs else BASE_URL
        req = Request(url, method="GET")
        req.add_header("Authorization", f"Bearer {TOKEN}")
        req.add_header("Accept", "application/json")
    
        attempt = 0
        while True:
            try:
                with urlopen(req, timeout=HTTP_TIMEOUT) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                # Respect Retry-After on 429/5xx
                if e.code in (429, 500, 502, 503, 504) and attempt < HTTP_RETRIES:
                    retry_after = 0
                    try:
                        retry_after = int(e.headers.get("Retry-After", RETRY_AFTER_DEFAULT))
                    except Exception:
                        retry_after = RETRY_AFTER_DEFAULT
                    time.sleep(max(1, retry_after))
                    attempt += 1
                    continue
                # Re-raise other HTTP errors
                raise
            except URLError:
                if attempt < HTTP_RETRIES:
                    time.sleep(RETRY_AFTER_DEFAULT)
                    attempt += 1
                    continue
                raise
    
    def _write_page(payload: dict, page_idx: int) -> str:
        ts  = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime())
        key = f"{PREFIX}/{ts}-slack-audit-p{page_idx:05d}.json"
        body = json.dumps(payload, separators=(",", ":")).encode("utf-8")
        s3.put_object(Bucket=BUCKET, Key=key, Body=body, ContentType="application/json")
        return key
    
    def lambda_handler(event=None, context=None):
        state  = _get_state()
        cursor = state.get("cursor")
    
        params = {"limit": LIMIT}
        if ACTIONS:
            params["action"] = [a.strip() for a in ACTIONS.split(",") if a.strip()]
        if cursor:
            params["cursor"] = cursor
        else:
            # First run (or reset): fetch a recent window by time
            params["oldest"] = int(time.time()) - LOOKBACK_SEC
    
        pages = 0
        total = 0
        last_cursor = None
    
        while pages < MAX_PAGES:
            data = _http_get(params)
            _write_page(data, pages)
    
            entries = data.get("entries") or []
            total += len(entries)
    
            # Cursor for next page
            meta = data.get("response_metadata") or {}
            next_cursor = meta.get("next_cursor") or data.get("next_cursor")
            if next_cursor:
                params = {"limit": LIMIT, "cursor": next_cursor}
                if ACTIONS:
                    params["action"] = [a.strip() for a in ACTIONS.split(",") if a.strip()]
                last_cursor = next_cursor
                pages += 1
                continue
            break
    
        if last_cursor:
            _put_state({"cursor": last_cursor})
    
        return {"ok": True, "pages": pages + (1 if total or last_cursor else 0), "entries": total, "cursor": last_cursor}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  2. 依序前往「Configuration」>「Environment variables」>「Edit」>「Add new environment variable」

  3. 輸入下列環境變數,並將 換成您的值:

    範例值
    S3_BUCKET slack-audit-logs
    S3_PREFIX slack/audit/
    STATE_KEY slack/audit/state.json
    SLACK_AUDIT_TOKEN xoxp-*** (機構層級使用者權杖,附有 auditlogs:read)
    LIMIT 200
    MAX_PAGES 20
    LOOKBACK_SECONDS 3600
    HTTP_TIMEOUT 60
    HTTP_RETRIES 3
    RETRY_AFTER_DEFAULT 2
    ACTIONS (選填,CSV) user_login,app_installed
  4. 建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。

  5. 選取「設定」分頁標籤。

  6. 在「一般設定」面板中,按一下「編輯」

  7. 將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)

建立 EventBridge 排程

  1. 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」
  2. 提供下列設定詳細資料:
    • 週期性時間表費率 (1 hour)。
    • 目標:您的 Lambda 函式 slack_audit_to_s3
    • 名稱slack-audit-1h
  3. 按一下「建立時間表」

選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰

  1. AWS 控制台中,依序前往「IAM」>「Users」>「Add users」
  2. 點選 [Add users] (新增使用者)。
  3. 提供下列設定詳細資料:
    • 使用者secops-reader
    • 存取類型存取金鑰 - 程式輔助存取
  4. 按一下「建立使用者」
  5. 附加最低讀取權限政策 (自訂):依序選取「使用者」>「secops-reader」>「權限」>「新增權限」>「直接附加政策」>「建立政策」
  6. 在 JSON 編輯器中輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::slack-audit-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::slack-audit-logs"
        }
      ]
    }
    
  7. 將名稱設為 secops-reader-policy

  8. 依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」

  9. 依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」

  10. 下載 CSV (這些值會輸入至動態饋給)。

在 Google SecOps 中設定動態饋給,擷取 Slack 稽核記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「+ 新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Slack Audit Logs)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「Slack 稽核」做為「記錄類型」。
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URIs3://slack-audit-logs/slack/audit/
    • 來源刪除選項:根據偏好設定選取刪除選項。
    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」

UDM 對應表

記錄欄位 UDM 對應 邏輯
action metadata.product_event_type 直接從原始記錄中的 action 欄位對應。
actor.type principal.labels.value 直接從 actor.type 欄位對應,並新增 actor.type 鍵。
actor.user.email principal.user.email_addresses 直接對應 actor.user.email 欄位。
actor.user.id principal.user.product_object_id 直接對應 actor.user.id 欄位。
actor.user.id principal.user.userid 直接對應 actor.user.id 欄位。
actor.user.name principal.user.user_display_name 直接對應 actor.user.name 欄位。
actor.user.team principal.user.group_identifiers 直接對應 actor.user.team 欄位。
context.ip_address principal.ip 直接對應 context.ip_address 欄位。
context.location.domain about.resource.attribute.labels.value 直接從 context.location.domain 欄位對應,並新增 context.location.domain 鍵。
context.location.id about.resource.id 直接對應 context.location.id 欄位。
context.location.name about.resource.name 直接對應 context.location.name 欄位。
context.location.name about.resource.attribute.labels.value 直接從 context.location.name 欄位對應,並新增 context.location.name 鍵。
context.location.type about.resource.resource_subtype 直接對應 context.location.type 欄位。
context.session_id network.session_id 直接對應 context.session_id 欄位。
context.ua network.http.user_agent 直接對應 context.ua 欄位。
context.ua network.http.parsed_user_agent 使用 parseduseragent 篩選器從 context.ua 欄位衍生而來的已剖析使用者代理程式資訊。
country principal.location.country_or_region 直接對應 country 欄位。
date_create metadata.event_timestamp.seconds 系統會將 date_create 欄位的紀元時間戳記轉換為時間戳記物件。
details.inviter.email target.user.email_addresses 直接對應 details.inviter.email 欄位。
details.inviter.id target.user.product_object_id 直接對應 details.inviter.id 欄位。
details.inviter.name target.user.user_display_name 直接對應 details.inviter.name 欄位。
details.inviter.team target.user.group_identifiers 直接對應 details.inviter.team 欄位。
details.reason security_result.description 直接從 details.reason 欄位對應,如果是陣列,則以半形逗號串連。
details.type about.resource.attribute.labels.value 直接從 details.type 欄位對應,並新增 details.type 鍵。
details.type security_result.summary 直接對應 details.type 欄位。
entity.app.id target.resource.id 直接對應 entity.app.id 欄位。
entity.app.name target.resource.name 直接對應 entity.app.name 欄位。
entity.channel.id target.resource.id 直接對應 entity.channel.id 欄位。
entity.channel.name target.resource.name 直接對應 entity.channel.name 欄位。
entity.channel.privacy target.resource.attribute.labels.value 直接從 entity.channel.privacy 欄位對應,並新增 entity.channel.privacy 鍵。
entity.file.filetype target.resource.attribute.labels.value 直接從 entity.file.filetype 欄位對應,並新增 entity.file.filetype 鍵。
entity.file.id target.resource.id 直接對應 entity.file.id 欄位。
entity.file.name target.resource.name 直接對應 entity.file.name 欄位。
entity.file.title target.resource.attribute.labels.value 直接從 entity.file.title 欄位對應,並新增 entity.file.title 鍵。
entity.huddle.date_end about.resource.attribute.labels.value 直接從 entity.huddle.date_end 欄位對應,並新增 entity.huddle.date_end 鍵。
entity.huddle.date_start about.resource.attribute.labels.value 直接從 entity.huddle.date_start 欄位對應,並新增 entity.huddle.date_start 鍵。
entity.huddle.id about.resource.attribute.labels.value 直接從 entity.huddle.id 欄位對應,並新增 entity.huddle.id 鍵。
entity.huddle.participants.0 about.resource.attribute.labels.value 直接從 entity.huddle.participants.0 欄位對應,並新增 entity.huddle.participants.0 鍵。
entity.huddle.participants.1 about.resource.attribute.labels.value 直接從 entity.huddle.participants.1 欄位對應,並新增 entity.huddle.participants.1 鍵。
entity.type target.resource.resource_subtype 直接對應 entity.type 欄位。
entity.user.email target.user.email_addresses 直接對應 entity.user.email 欄位。
entity.user.id target.user.product_object_id 直接對應 entity.user.id 欄位。
entity.user.name target.user.user_display_name 直接對應 entity.user.name 欄位。
entity.user.team target.user.group_identifiers 直接對應 entity.user.team 欄位。
entity.workflow.id target.resource.id 直接對應 entity.workflow.id 欄位。
entity.workflow.name target.resource.name 直接對應 entity.workflow.name 欄位。
id metadata.product_log_id 直接對應 id 欄位。
ip principal.ip 直接對應至「ip」欄位。根據 action 欄位的邏輯判斷。預設為 USER_COMMUNICATION,但會根據 action 的值變更為其他值,例如 USER_CREATIONUSER_LOGINUSER_LOGOUTUSER_RESOURCE_ACCESSUSER_RESOURCE_UPDATE_PERMISSIONSUSER_CHANGE_PERMISSIONS。已硬式編碼為「SLACK_AUDIT」。如果存在 date_create,請設為「Enterprise Grid」,否則如果存在 user_id,請設為「稽核記錄」。已硬式編碼為「Slack」。已硬式編碼為「REMOTE」。如果 action 包含「user_login」或「user_logout」,請設為「SSO」。否則請設為「MACHINE」。提供的範例中未對應。預設值為「ALLOW」,但如果 action 為「user_login_failed」,則會設為「BLOCK」。如果 date_create 存在,請設為「Slack」,否則如果 user_id 存在,請設為「SLACK」。
user_agent network.http.user_agent 直接對應 user_agent 欄位。
user_id principal.user.product_object_id 直接對應 user_id 欄位。
username principal.user.product_object_id 直接對應 username 欄位。

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。