收集 Swimlane Platform 日志

支持的语言:

本文档介绍了如何使用 Amazon S3 将 Swimlane 平台日志注入到 Google Security Operations。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例
  • Swimlane 的特权访问权限(账号管理员能够生成个人访问令牌)
  • AWS(S3、IAM、Lambda、EventBridge)的特权访问权限

收集 Swimlane Platform 前提条件(ID、API 密钥、组织 ID、令牌)

  1. 账号管理员身份登录 Swimlane 平台
  2. 前往个人资料选项
  3. 点击个人资料以打开个人资料编辑器。
  4. 前往个人访问令牌部分。
  5. 点击生成令牌以创建新的个人访问令牌。
  6. 立即复制令牌并妥善存储(系统不会再显示此令牌)。
  7. 记录以下集成详细信息:
    • 个人访问令牌 (PAT):用于 API 调用的 Private-Token 标头中。
    • 账号 ID:对于审核日志 API 路径 /api/public/audit/account/{ACCOUNT_ID}/auditlogs,此参数是必需的。如果您不知道自己的账号 ID,请与 Swimlane 管理员联系。
    • 基础网址:您的 Swimlane 网域(例如 https://eu.swimlane.apphttps://us.swimlane.app)。

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 swimlane-audit)。
  3. 按照以下用户指南创建用户:创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击 Download CSV file(下载 CSV 文件),保存访问密钥不公开的访问密钥以供日后使用。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索并选择 AmazonS3FullAccess 政策。
  18. 点击下一步
  19. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页
  2. 输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutSwimlaneAuditObjects",
          "Effect": "Allow",
          "Action": ["s3:PutObject"],
          "Resource": "arn:aws:s3:::swimlane-audit/swimlane/audit/*"
        },
        {
          "Sid": "AllowStateReadWrite",
          "Effect": "Allow",
          "Action": ["s3:GetObject", "s3:PutObject"],
          "Resource": "arn:aws:s3:::swimlane-audit/swimlane/audit/state.json"
        }
      ]
    }
    
    • 如果您输入了其他存储桶名称,请替换 swimlane-audit
  3. 依次点击下一步 > 创建政策

  4. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  5. 附加新创建的政策和 AWS 托管政策:

    • 上述创建的自定义政策
    • service-role/AWSLambdaBasicExecutionRole (CloudWatch Logs)
  6. 将角色命名为 WriteSwimlaneAuditToS3Role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    设置
    名称 swimlane_audit_to_s3
    运行时 Python 3.13
    架构 x86_64
    执行角色 WriteSwimlaneAuditToS3Role
  4. 创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (swimlane_audit_to_s3.py):

    #!/usr/bin/env python3
    import os, json, gzip, io, uuid, datetime as dt, urllib.parse, urllib.request
    import boto3
    
    # ---- Environment ----
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "swimlane/audit/")
    STATE_KEY = os.environ.get("STATE_KEY", S3_PREFIX + "state.json")
    BASE_URL = os.environ["SWIMLANE_BASE_URL"].rstrip("/")  # e.g., https://eu.swimlane.app
    ACCOUNT_ID = os.environ["SWIMLANE_ACCOUNT_ID"]
    TENANT_LIST = os.environ.get("SWIMLANE_TENANT_LIST", "")  # comma-separated; optional
    INCLUDE_ACCOUNT = os.environ.get("INCLUDE_ACCOUNT", "true").lower() == "true"
    PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "100"))  # max 100
    WINDOW_MINUTES = int(os.environ.get("WINDOW_MINUTES", "15"))  # time range per run
    PAT_TOKEN = os.environ["SWIMLANE_PAT_TOKEN"]  # Personal Access Token
    TIMEOUT = int(os.environ.get("TIMEOUT", "30"))
    
    AUDIT_URL = f"{BASE_URL}/api/public/audit/account/{ACCOUNT_ID}/auditlogs"
    
    s3 = boto3.client("s3")
    
    # ---- Helpers ----
    
    def _http(req: urllib.request.Request):
        return urllib.request.urlopen(req, timeout=TIMEOUT)
    
    def _now():
        return dt.datetime.utcnow()
    
    def get_state() -> dict:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read())
        except Exception:
            return {}
    
    def put_state(state: dict) -> None:
        state["updated_at"] = _now().isoformat() + "Z"
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state).encode())
    
    def build_url(from_dt: dt.datetime, to_dt: dt.datetime, page: int) -> str:
        params = {
            "pageNumber": str(page),
            "pageSize": str(PAGE_SIZE),
            "includeAccount": str(INCLUDE_ACCOUNT).lower(),
            "fromdate": from_dt.replace(microsecond=0).isoformat() + "Z",
            "todate": to_dt.replace(microsecond=0).isoformat() + "Z",
        }
        if TENANT_LIST:
            params["tenantList"] = TENANT_LIST
        return AUDIT_URL + "?" + urllib.parse.urlencode(params)
    
    def fetch_page(url: str) -> dict:
        headers = {
            "Accept": "application/json",
            "Private-Token": PAT_TOKEN,
        }
        req = urllib.request.Request(url, headers=headers)
        with _http(req) as r:
            return json.loads(r.read())
    
    def write_chunk(items: list[dict], ts: dt.datetime) -> str:
        key = f"{S3_PREFIX}{ts:%Y/%m/%d}/swimlane-audit-{uuid.uuid4()}.json.gz"
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="w") as gz:
            for rec in items:
                gz.write((json.dumps(rec) + "n").encode())
        buf.seek(0)
        s3.upload_fileobj(buf, S3_BUCKET, key)
        return key
    
    def lambda_handler(event=None, context=None):
        state = get_state()
    
        # determine window
        to_dt = _now()
        from_dt = to_dt - dt.timedelta(minutes=WINDOW_MINUTES)
        if (prev := state.get("last_to_dt")):
            try:
                from_dt = dt.datetime.fromisoformat(prev.replace("Z", "+00:00"))
            except Exception:
                pass
    
        page = int(state.get("page", 1))
        total_written = 0
    
        while True:
            url = build_url(from_dt, to_dt, page)
            resp = fetch_page(url)
            items = resp.get("auditlogs", []) or []
            if items:
                write_chunk(items, _now())
                total_written += len(items)
            next_path = resp.get("next")
            if not next_path:
                break
            page += 1
            state["page"] = page
    
        # advance state window
        state["last_to_dt"] = to_dt.replace(microsecond=0).isoformat() + "Z"
        state["page"] = 1
        put_state(state)
    
        return {"ok": True, "written": total_written, "from": from_dt.isoformat() + "Z", "to": to_dt.isoformat() + "Z"}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  5. 依次前往配置 > 环境变量

  6. 依次点击修改 > 添加新的环境变量

  7. 输入以下环境变量,并替换为您的值。

    示例值
    S3_BUCKET swimlane-audit
    S3_PREFIX swimlane/audit/
    STATE_KEY swimlane/audit/state.json
    SWIMLANE_BASE_URL https://eu.swimlane.app
    SWIMLANE_ACCOUNT_ID xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    SWIMLANE_TENANT_LIST tenantA,tenantB(可选)
    INCLUDE_ACCOUNT true
    PAGE_SIZE 100
    WINDOW_MINUTES 15
    SWIMLANE_PAT_TOKEN <your-personal-access-token>
    TIMEOUT 30
  8. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。

  9. 选择配置标签页。

  10. 常规配置面板中,点击修改

  11. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度程序 > 创建计划
  2. 提供以下配置详细信息:
    • 周期性安排频率 (15 min)
    • 目标:您的 Lambda 函数 swimlane_audit_to_s3
    • 名称swimlane-audit-schedule-15min
  3. 点击创建时间表

可选:为 Google SecOps 创建只读 IAM 用户和密钥

  1. AWS 控制台中,依次前往 IAM > 用户 > 添加用户
  2. 点击 Add users(添加用户)。
  3. 提供以下配置详细信息:
    • 用户secops-reader
    • 访问类型访问密钥 - 以程序化方式访问
  4. 点击创建用户
  5. 附加最低限度的读取政策(自定义):用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策
  6. 在 JSON 编辑器中,输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::swimlane-audit/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::swimlane-audit"
        }
      ]
    }
    
  7. 将名称设置为 secops-reader-policy

  8. 依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限

  9. 依次前往安全凭据 > 访问密钥 > 创建访问密钥

  10. 下载 CSV(这些值会输入到 Feed 中)。

在 Google SecOps 中配置 Feed 以注入 Swimlane Platform 日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击 + 添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 Swimlane Platform logs)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Swimlane Platform 作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://swimlane-audit/swimlane/audit/
    • 来源删除选项:根据您的偏好设置选择删除选项。
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
    • 资产命名空间资产命名空间
    • 注入标签:应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。