收集 Swimlane 平台記錄

支援的國家/地區:

本文說明如何使用 Amazon S3,將 Swimlane Platform 記錄檔擷取至 Google Security Operations。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體
  • Swimlane 的特殊存取權 (帳戶管理員可產生個人存取權杖)
  • AWS 的具備權限存取權 (S3、IAM、Lambda、EventBridge)

收集 Swimlane Platform 的必要條件 (ID、API 金鑰、機構 ID、權杖)

  1. 以「帳戶管理員」身分登入 Swimlane 平台
  2. 前往「設定檔選項」
  3. 按一下「個人資料」開啟個人資料編輯器。
  4. 前往「Personal Access Token」專區。
  5. 按一下「產生權杖」,建立新的個人存取權杖。
  6. 請立即複製並妥善儲存權杖 (系統不會再顯示)。
  7. 記錄整合的下列詳細資料:
    • 個人存取權杖 (PAT):用於 API 呼叫的 Private-Token 標頭。
    • 帳戶 ID:稽核記錄 API 路徑 /api/public/audit/account/{ACCOUNT_ID}/auditlogs 的必要欄位。如果您不知道帳戶 ID,請與 Swimlane 管理員聯絡。
    • 基準網址:您的 Swimlane 網域 (例如 https://eu.swimlane.apphttps://us.swimlane.app)。

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照這份使用者指南建立 Amazon S3 值區建立值區
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 swimlane-audit)。
  3. 按照這份使用者指南建立使用者:建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後使用。
  12. 按一下 [完成]
  13. 選取 [權限] 分頁標籤。
  14. 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋並選取 AmazonS3FullAccess 政策。
  18. 點選「下一步」
  19. 按一下「Add permissions」。

設定 S3 上傳的身分與存取權管理政策和角色

  1. AWS 控制台中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」分頁標籤
  2. 輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutSwimlaneAuditObjects",
          "Effect": "Allow",
          "Action": ["s3:PutObject"],
          "Resource": "arn:aws:s3:::swimlane-audit/swimlane/audit/*"
        },
        {
          "Sid": "AllowStateReadWrite",
          "Effect": "Allow",
          "Action": ["s3:GetObject", "s3:PutObject"],
          "Resource": "arn:aws:s3:::swimlane-audit/swimlane/audit/state.json"
        }
      ]
    }
    
    • 如果您輸入的值區名稱不同,請替換 swimlane-audit
  3. 依序點選「Next」>「Create policy」

  4. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。

  5. 附加新建立的政策和 AWS 代管政策:

    • 上述建立的自訂政策
    • service-role/AWSLambdaBasicExecutionRole (CloudWatch Logs)
  6. 為角色命名 WriteSwimlaneAuditToS3Role,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:

    設定
    名稱 swimlane_audit_to_s3
    執行階段 Python 3.13
    架構 x86_64
    執行角色 WriteSwimlaneAuditToS3Role
  4. 建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (swimlane_audit_to_s3.py):

    #!/usr/bin/env python3
    import os, json, gzip, io, uuid, datetime as dt, urllib.parse, urllib.request
    import boto3
    
    # ---- Environment ----
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "swimlane/audit/")
    STATE_KEY = os.environ.get("STATE_KEY", S3_PREFIX + "state.json")
    BASE_URL = os.environ["SWIMLANE_BASE_URL"].rstrip("/")  # e.g., https://eu.swimlane.app
    ACCOUNT_ID = os.environ["SWIMLANE_ACCOUNT_ID"]
    TENANT_LIST = os.environ.get("SWIMLANE_TENANT_LIST", "")  # comma-separated; optional
    INCLUDE_ACCOUNT = os.environ.get("INCLUDE_ACCOUNT", "true").lower() == "true"
    PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "100"))  # max 100
    WINDOW_MINUTES = int(os.environ.get("WINDOW_MINUTES", "15"))  # time range per run
    PAT_TOKEN = os.environ["SWIMLANE_PAT_TOKEN"]  # Personal Access Token
    TIMEOUT = int(os.environ.get("TIMEOUT", "30"))
    
    AUDIT_URL = f"{BASE_URL}/api/public/audit/account/{ACCOUNT_ID}/auditlogs"
    
    s3 = boto3.client("s3")
    
    # ---- Helpers ----
    
    def _http(req: urllib.request.Request):
        return urllib.request.urlopen(req, timeout=TIMEOUT)
    
    def _now():
        return dt.datetime.utcnow()
    
    def get_state() -> dict:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read())
        except Exception:
            return {}
    
    def put_state(state: dict) -> None:
        state["updated_at"] = _now().isoformat() + "Z"
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state).encode())
    
    def build_url(from_dt: dt.datetime, to_dt: dt.datetime, page: int) -> str:
        params = {
            "pageNumber": str(page),
            "pageSize": str(PAGE_SIZE),
            "includeAccount": str(INCLUDE_ACCOUNT).lower(),
            "fromdate": from_dt.replace(microsecond=0).isoformat() + "Z",
            "todate": to_dt.replace(microsecond=0).isoformat() + "Z",
        }
        if TENANT_LIST:
            params["tenantList"] = TENANT_LIST
        return AUDIT_URL + "?" + urllib.parse.urlencode(params)
    
    def fetch_page(url: str) -> dict:
        headers = {
            "Accept": "application/json",
            "Private-Token": PAT_TOKEN,
        }
        req = urllib.request.Request(url, headers=headers)
        with _http(req) as r:
            return json.loads(r.read())
    
    def write_chunk(items: list[dict], ts: dt.datetime) -> str:
        key = f"{S3_PREFIX}{ts:%Y/%m/%d}/swimlane-audit-{uuid.uuid4()}.json.gz"
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="w") as gz:
            for rec in items:
                gz.write((json.dumps(rec) + "n").encode())
        buf.seek(0)
        s3.upload_fileobj(buf, S3_BUCKET, key)
        return key
    
    def lambda_handler(event=None, context=None):
        state = get_state()
    
        # determine window
        to_dt = _now()
        from_dt = to_dt - dt.timedelta(minutes=WINDOW_MINUTES)
        if (prev := state.get("last_to_dt")):
            try:
                from_dt = dt.datetime.fromisoformat(prev.replace("Z", "+00:00"))
            except Exception:
                pass
    
        page = int(state.get("page", 1))
        total_written = 0
    
        while True:
            url = build_url(from_dt, to_dt, page)
            resp = fetch_page(url)
            items = resp.get("auditlogs", []) or []
            if items:
                write_chunk(items, _now())
                total_written += len(items)
            next_path = resp.get("next")
            if not next_path:
                break
            page += 1
            state["page"] = page
    
        # advance state window
        state["last_to_dt"] = to_dt.replace(microsecond=0).isoformat() + "Z"
        state["page"] = 1
        put_state(state)
    
        return {"ok": True, "written": total_written, "from": from_dt.isoformat() + "Z", "to": to_dt.isoformat() + "Z"}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  5. 依序前往「設定」>「環境變數」

  6. 依序點選「編輯」> 新增環境變數

  7. 輸入下列環境變數,並將 換成您的值。

    範例值
    S3_BUCKET swimlane-audit
    S3_PREFIX swimlane/audit/
    STATE_KEY swimlane/audit/state.json
    SWIMLANE_BASE_URL https://eu.swimlane.app
    SWIMLANE_ACCOUNT_ID xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    SWIMLANE_TENANT_LIST tenantA,tenantB (選填)
    INCLUDE_ACCOUNT true
    PAGE_SIZE 100
    WINDOW_MINUTES 15
    SWIMLANE_PAT_TOKEN <your-personal-access-token>
    TIMEOUT 30
  8. 建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。

  9. 選取「設定」分頁標籤。

  10. 在「一般設定」面板中,按一下「編輯」

  11. 將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)

建立 EventBridge 排程

  1. 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」
  2. 提供下列設定詳細資料:
    • 週期性時間表費率 (15 min)
    • 目標:您的 Lambda 函式 swimlane_audit_to_s3
    • Name (名稱):swimlane-audit-schedule-15min
  3. 按一下「建立時間表」

選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰

  1. AWS 控制台中,依序前往「IAM」>「Users」>「Add users」
  2. 點選 [Add users] (新增使用者)。
  3. 提供下列設定詳細資料:
    • 使用者secops-reader
    • 存取類型存取金鑰 - 程式輔助存取
  4. 按一下「建立使用者」
  5. 附加最低讀取權限政策 (自訂):依序選取「使用者」>「secops-reader」>「權限」>「新增權限」>「直接附加政策」>「建立政策」
  6. 在 JSON 編輯器中輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::swimlane-audit/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::swimlane-audit"
        }
      ]
    }
    
  7. 將名稱設為 secops-reader-policy

  8. 依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」

  9. 依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」

  10. 下載 CSV (這些值會輸入至動態饋給)。

在 Google SecOps 中設定資訊提供,擷取 Swimlane Platform 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「+ 新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Swimlane Platform logs)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「Swimlane Platform」做為「記錄類型」
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URIs3://swimlane-audit/swimlane/audit/
    • 來源刪除選項:根據偏好選取刪除選項。
    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。