收集 Swimlane Platform 日志
支持的语言:
Google SecOps
SIEM
本文档介绍了如何使用 Amazon S3 将 Swimlane 平台日志注入到 Google Security Operations。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例
- 对 Swimlane 的特权访问权限(账号管理员能够生成个人访问令牌)
- 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限
收集 Swimlane Platform 前提条件(ID、API 密钥、组织 ID、令牌)
- 以账号管理员身份登录 Swimlane 平台。
- 前往个人资料选项。
- 点击个人资料以打开个人资料编辑器。
- 前往个人访问令牌部分。
- 点击生成令牌以创建新的个人访问令牌。
- 立即复制令牌并妥善存储(系统不会再显示此令牌)。
- 记录以下集成详细信息:
- 个人访问令牌 (PAT):用于 API 调用的
Private-Token
标头中。 - 账号 ID:对于审核日志 API 路径
/api/public/audit/account/{ACCOUNT_ID}/auditlogs
,此参数是必需的。如果您不知道自己的账号 ID,请与 Swimlane 管理员联系。 - 基础网址:您的 Swimlane 网域(例如
https://eu.swimlane.app
、https://us.swimlane.app
)。
- 个人访问令牌 (PAT):用于 API 调用的
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶
- 保存存储桶名称和区域以供日后参考(例如
swimlane-audit
)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击 Download CSV file(下载 CSV 文件),保存访问密钥和不公开的访问密钥以供日后使用。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策
- 搜索并选择 AmazonS3FullAccess 政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传配置 IAM 政策和角色
- 在 AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页。
输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutSwimlaneAuditObjects", "Effect": "Allow", "Action": ["s3:PutObject"], "Resource": "arn:aws:s3:::swimlane-audit/swimlane/audit/*" }, { "Sid": "AllowStateReadWrite", "Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject"], "Resource": "arn:aws:s3:::swimlane-audit/swimlane/audit/state.json" } ] }
- 如果您输入了其他存储桶名称,请替换
swimlane-audit
。
- 如果您输入了其他存储桶名称,请替换
依次点击下一步 > 创建政策。
依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda。
附加新创建的政策和 AWS 托管政策:
- 上述创建的自定义政策
service-role/AWSLambdaBasicExecutionRole
(CloudWatch Logs)
将角色命名为
WriteSwimlaneAuditToS3Role
,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
提供以下配置详细信息:
设置 值 名称 swimlane_audit_to_s3
运行时 Python 3.13 架构 x86_64 执行角色 WriteSwimlaneAuditToS3Role
创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (
swimlane_audit_to_s3.py
):#!/usr/bin/env python3 import os, json, gzip, io, uuid, datetime as dt, urllib.parse, urllib.request import boto3 # ---- Environment ---- S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "swimlane/audit/") STATE_KEY = os.environ.get("STATE_KEY", S3_PREFIX + "state.json") BASE_URL = os.environ["SWIMLANE_BASE_URL"].rstrip("/") # e.g., https://eu.swimlane.app ACCOUNT_ID = os.environ["SWIMLANE_ACCOUNT_ID"] TENANT_LIST = os.environ.get("SWIMLANE_TENANT_LIST", "") # comma-separated; optional INCLUDE_ACCOUNT = os.environ.get("INCLUDE_ACCOUNT", "true").lower() == "true" PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "100")) # max 100 WINDOW_MINUTES = int(os.environ.get("WINDOW_MINUTES", "15")) # time range per run PAT_TOKEN = os.environ["SWIMLANE_PAT_TOKEN"] # Personal Access Token TIMEOUT = int(os.environ.get("TIMEOUT", "30")) AUDIT_URL = f"{BASE_URL}/api/public/audit/account/{ACCOUNT_ID}/auditlogs" s3 = boto3.client("s3") # ---- Helpers ---- def _http(req: urllib.request.Request): return urllib.request.urlopen(req, timeout=TIMEOUT) def _now(): return dt.datetime.utcnow() def get_state() -> dict: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()) except Exception: return {} def put_state(state: dict) -> None: state["updated_at"] = _now().isoformat() + "Z" s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state).encode()) def build_url(from_dt: dt.datetime, to_dt: dt.datetime, page: int) -> str: params = { "pageNumber": str(page), "pageSize": str(PAGE_SIZE), "includeAccount": str(INCLUDE_ACCOUNT).lower(), "fromdate": from_dt.replace(microsecond=0).isoformat() + "Z", "todate": to_dt.replace(microsecond=0).isoformat() + "Z", } if TENANT_LIST: params["tenantList"] = TENANT_LIST return AUDIT_URL + "?" + urllib.parse.urlencode(params) def fetch_page(url: str) -> dict: headers = { "Accept": "application/json", "Private-Token": PAT_TOKEN, } req = urllib.request.Request(url, headers=headers) with _http(req) as r: return json.loads(r.read()) def write_chunk(items: list[dict], ts: dt.datetime) -> str: key = f"{S3_PREFIX}{ts:%Y/%m/%d}/swimlane-audit-{uuid.uuid4()}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="w") as gz: for rec in items: gz.write((json.dumps(rec) + "n").encode()) buf.seek(0) s3.upload_fileobj(buf, S3_BUCKET, key) return key def lambda_handler(event=None, context=None): state = get_state() # determine window to_dt = _now() from_dt = to_dt - dt.timedelta(minutes=WINDOW_MINUTES) if (prev := state.get("last_to_dt")): try: from_dt = dt.datetime.fromisoformat(prev.replace("Z", "+00:00")) except Exception: pass page = int(state.get("page", 1)) total_written = 0 while True: url = build_url(from_dt, to_dt, page) resp = fetch_page(url) items = resp.get("auditlogs", []) or [] if items: write_chunk(items, _now()) total_written += len(items) next_path = resp.get("next") if not next_path: break page += 1 state["page"] = page # advance state window state["last_to_dt"] = to_dt.replace(microsecond=0).isoformat() + "Z" state["page"] = 1 put_state(state) return {"ok": True, "written": total_written, "from": from_dt.isoformat() + "Z", "to": to_dt.isoformat() + "Z"} if __name__ == "__main__": print(lambda_handler())
依次前往配置 > 环境变量。
依次点击修改 > 添加新的环境变量。
输入以下环境变量,并替换为您的值。
键 示例值 S3_BUCKET
swimlane-audit
S3_PREFIX
swimlane/audit/
STATE_KEY
swimlane/audit/state.json
SWIMLANE_BASE_URL
https://eu.swimlane.app
SWIMLANE_ACCOUNT_ID
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
SWIMLANE_TENANT_LIST
tenantA,tenantB
(可选)INCLUDE_ACCOUNT
true
PAGE_SIZE
100
WINDOW_MINUTES
15
SWIMLANE_PAT_TOKEN
<your-personal-access-token>
TIMEOUT
30
创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。
选择配置标签页。
在常规配置面板中,点击修改。
将超时更改为 5 分钟(300 秒),然后点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度程序 > 创建计划。
- 提供以下配置详细信息:
- 周期性安排:频率 (
15 min
) - 目标:您的 Lambda 函数
swimlane_audit_to_s3
- 名称:
swimlane-audit-schedule-15min
- 周期性安排:频率 (
- 点击创建时间表。
可选:为 Google SecOps 创建只读 IAM 用户和密钥
- 在 AWS 控制台中,依次前往 IAM > 用户 > 添加用户。
- 点击 Add users(添加用户)。
- 提供以下配置详细信息:
- 用户:
secops-reader
- 访问类型:访问密钥 - 以程序化方式访问
- 用户:
- 点击创建用户。
- 附加最低限度的读取政策(自定义):用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策。
在 JSON 编辑器中,输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::swimlane-audit/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::swimlane-audit" } ] }
将名称设置为
secops-reader-policy
。依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限。
依次前往安全凭据 > 访问密钥 > 创建访问密钥。
下载 CSV(这些值会输入到 Feed 中)。
在 Google SecOps 中配置 Feed 以注入 Swimlane Platform 日志
- 依次前往 SIEM 设置> Feed。
- 点击 + 添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Swimlane Platform logs
)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Swimlane Platform 作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://swimlane-audit/swimlane/audit/
- 来源删除选项:根据您的偏好设置选择删除选项。
- 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
- 资产命名空间:资产命名空间。
- 注入标签:应用于此 Feed 中事件的标签。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。