收集 Swimlane 平台記錄
本文說明如何使用 Amazon S3,將 Swimlane Platform 記錄檔擷取至 Google Security Operations。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體
- Swimlane 的特殊存取權 (帳戶管理員可產生個人存取權杖)
- AWS 的具備權限存取權 (S3、IAM、Lambda、EventBridge)
收集 Swimlane Platform 的必要條件 (ID、API 金鑰、機構 ID、權杖)
- 以「帳戶管理員」身分登入 Swimlane 平台。
- 前往「設定檔選項」。
- 按一下「個人資料」開啟個人資料編輯器。
- 前往「Personal Access Token」專區。
- 按一下「產生權杖」,建立新的個人存取權杖。
- 請立即複製並妥善儲存權杖 (系統不會再顯示)。
- 記錄整合的下列詳細資料:
- 個人存取權杖 (PAT):用於 API 呼叫的
Private-Token
標頭。 - 帳戶 ID:稽核記錄 API 路徑
/api/public/audit/account/{ACCOUNT_ID}/auditlogs
的必要欄位。如果您不知道帳戶 ID,請與 Swimlane 管理員聯絡。 - 基準網址:您的 Swimlane 網域 (例如
https://eu.swimlane.app
、https://us.swimlane.app
)。
- 個人存取權杖 (PAT):用於 API 呼叫的
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南建立 Amazon S3 值區:建立值區
- 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如
swimlane-audit
)。 - 按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後使用。
- 按一下 [完成]。
- 選取 [權限] 分頁標籤。
- 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)。
- 選取「新增權限」。
- 選取「直接附加政策」
- 搜尋並選取 AmazonS3FullAccess 政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 S3 上傳的身分與存取權管理政策和角色
- 在 AWS 控制台中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」分頁標籤。
輸入下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutSwimlaneAuditObjects", "Effect": "Allow", "Action": ["s3:PutObject"], "Resource": "arn:aws:s3:::swimlane-audit/swimlane/audit/*" }, { "Sid": "AllowStateReadWrite", "Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject"], "Resource": "arn:aws:s3:::swimlane-audit/swimlane/audit/state.json" } ] }
- 如果您輸入的值區名稱不同,請替換
swimlane-audit
。
- 如果您輸入的值區名稱不同,請替換
依序點選「Next」>「Create policy」。
依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。
附加新建立的政策和 AWS 代管政策:
- 上述建立的自訂政策
service-role/AWSLambdaBasicExecutionRole
(CloudWatch Logs)
為角色命名
WriteSwimlaneAuditToS3Role
,然後按一下「建立角色」。
建立 Lambda 函式
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」。
- 按一下「從頭開始撰寫」。
請提供下列設定詳細資料:
設定 值 名稱 swimlane_audit_to_s3
執行階段 Python 3.13 架構 x86_64 執行角色 WriteSwimlaneAuditToS3Role
建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (
swimlane_audit_to_s3.py
):#!/usr/bin/env python3 import os, json, gzip, io, uuid, datetime as dt, urllib.parse, urllib.request import boto3 # ---- Environment ---- S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "swimlane/audit/") STATE_KEY = os.environ.get("STATE_KEY", S3_PREFIX + "state.json") BASE_URL = os.environ["SWIMLANE_BASE_URL"].rstrip("/") # e.g., https://eu.swimlane.app ACCOUNT_ID = os.environ["SWIMLANE_ACCOUNT_ID"] TENANT_LIST = os.environ.get("SWIMLANE_TENANT_LIST", "") # comma-separated; optional INCLUDE_ACCOUNT = os.environ.get("INCLUDE_ACCOUNT", "true").lower() == "true" PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "100")) # max 100 WINDOW_MINUTES = int(os.environ.get("WINDOW_MINUTES", "15")) # time range per run PAT_TOKEN = os.environ["SWIMLANE_PAT_TOKEN"] # Personal Access Token TIMEOUT = int(os.environ.get("TIMEOUT", "30")) AUDIT_URL = f"{BASE_URL}/api/public/audit/account/{ACCOUNT_ID}/auditlogs" s3 = boto3.client("s3") # ---- Helpers ---- def _http(req: urllib.request.Request): return urllib.request.urlopen(req, timeout=TIMEOUT) def _now(): return dt.datetime.utcnow() def get_state() -> dict: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()) except Exception: return {} def put_state(state: dict) -> None: state["updated_at"] = _now().isoformat() + "Z" s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state).encode()) def build_url(from_dt: dt.datetime, to_dt: dt.datetime, page: int) -> str: params = { "pageNumber": str(page), "pageSize": str(PAGE_SIZE), "includeAccount": str(INCLUDE_ACCOUNT).lower(), "fromdate": from_dt.replace(microsecond=0).isoformat() + "Z", "todate": to_dt.replace(microsecond=0).isoformat() + "Z", } if TENANT_LIST: params["tenantList"] = TENANT_LIST return AUDIT_URL + "?" + urllib.parse.urlencode(params) def fetch_page(url: str) -> dict: headers = { "Accept": "application/json", "Private-Token": PAT_TOKEN, } req = urllib.request.Request(url, headers=headers) with _http(req) as r: return json.loads(r.read()) def write_chunk(items: list[dict], ts: dt.datetime) -> str: key = f"{S3_PREFIX}{ts:%Y/%m/%d}/swimlane-audit-{uuid.uuid4()}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="w") as gz: for rec in items: gz.write((json.dumps(rec) + "n").encode()) buf.seek(0) s3.upload_fileobj(buf, S3_BUCKET, key) return key def lambda_handler(event=None, context=None): state = get_state() # determine window to_dt = _now() from_dt = to_dt - dt.timedelta(minutes=WINDOW_MINUTES) if (prev := state.get("last_to_dt")): try: from_dt = dt.datetime.fromisoformat(prev.replace("Z", "+00:00")) except Exception: pass page = int(state.get("page", 1)) total_written = 0 while True: url = build_url(from_dt, to_dt, page) resp = fetch_page(url) items = resp.get("auditlogs", []) or [] if items: write_chunk(items, _now()) total_written += len(items) next_path = resp.get("next") if not next_path: break page += 1 state["page"] = page # advance state window state["last_to_dt"] = to_dt.replace(microsecond=0).isoformat() + "Z" state["page"] = 1 put_state(state) return {"ok": True, "written": total_written, "from": from_dt.isoformat() + "Z", "to": to_dt.isoformat() + "Z"} if __name__ == "__main__": print(lambda_handler())
依序前往「設定」>「環境變數」。
依序點選「編輯」> 新增環境變數。
輸入下列環境變數,並將 換成您的值。
鍵 範例值 S3_BUCKET
swimlane-audit
S3_PREFIX
swimlane/audit/
STATE_KEY
swimlane/audit/state.json
SWIMLANE_BASE_URL
https://eu.swimlane.app
SWIMLANE_ACCOUNT_ID
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
SWIMLANE_TENANT_LIST
tenantA,tenantB
(選填)INCLUDE_ACCOUNT
true
PAGE_SIZE
100
WINDOW_MINUTES
15
SWIMLANE_PAT_TOKEN
<your-personal-access-token>
TIMEOUT
30
建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。
選取「設定」分頁標籤。
在「一般設定」面板中,按一下「編輯」。
將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)。
建立 EventBridge 排程
- 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」。
- 提供下列設定詳細資料:
- 週期性時間表:費率 (
15 min
) - 目標:您的 Lambda 函式
swimlane_audit_to_s3
- Name (名稱):
swimlane-audit-schedule-15min
- 週期性時間表:費率 (
- 按一下「建立時間表」。
選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰
- 在 AWS 控制台中,依序前往「IAM」>「Users」>「Add users」。
- 點選 [Add users] (新增使用者)。
- 提供下列設定詳細資料:
- 使用者:
secops-reader
- 存取類型:存取金鑰 - 程式輔助存取
- 使用者:
- 按一下「建立使用者」。
- 附加最低讀取權限政策 (自訂):依序選取「使用者」>「secops-reader」>「權限」>「新增權限」>「直接附加政策」>「建立政策」。
在 JSON 編輯器中輸入下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::swimlane-audit/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::swimlane-audit" } ] }
將名稱設為
secops-reader-policy
。依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」。
依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」。
下載 CSV (這些值會輸入至動態饋給)。
在 Google SecOps 中設定資訊提供,擷取 Swimlane Platform 記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Swimlane Platform logs
)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「Swimlane Platform」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:
s3://swimlane-audit/swimlane/audit/
- 來源刪除選項:根據偏好選取刪除選項。
- 檔案存在時間上限:包含在過去天數內修改的檔案。預設為 180 天。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:資產命名空間。
- 擷取標籤:套用至這個動態饋給事件的標籤。
- S3 URI:
- 點選「下一步」。
- 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」。
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。