Recolha registos da plataforma Swimlane

Compatível com:

Este documento explica como carregar registos da plataforma Swimlane para o Google Security Operations através do Amazon S3.

Antes de começar

Certifique-se de que tem os seguintes pré-requisitos:

  • Uma instância do Google SecOps
  • Acesso privilegiado ao Swimlane (administrador da conta capaz de gerar um token de acesso pessoal)
  • Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)

Recolha os pré-requisitos da plataforma Swimlane (IDs, chaves da API, IDs da organização, tokens)

  1. Inicie sessão na plataforma Swimlane como administrador da conta.
  2. Aceda a Opções do perfil.
  3. Clique em Perfil para abrir o editor de perfis.
  4. Navegue para a secção Token de acesso pessoal.
  5. Clique em Gerar token para criar um novo token de acesso pessoal.
  6. Copie o token imediatamente e armazene-o em segurança (não é apresentado novamente).
  7. Registe os seguintes detalhes para a integração:
    • Token de acesso pessoal (PAT): usado no cabeçalho Private-Token para chamadas API.
    • ID da conta: obrigatório para o caminho da API Audit Log /api/public/audit/account/{ACCOUNT_ID}/auditlogs. Contacte o administrador do Swimlane se não souber o ID da sua conta.
    • URL de base: o seu domínio do Swimlane (por exemplo, https://eu.swimlane.app, https://us.swimlane.app).

Configure o contentor do AWS S3 e o IAM para o Google SecOps

  1. Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor
  2. Guarde o nome e a região do contentor para referência futura (por exemplo, swimlane-audit).
  3. Crie um utilizador seguindo este guia do utilizador: Criar um utilizador do IAM.
  4. Selecione o utilizador criado.
  5. Selecione o separador Credenciais de segurança.
  6. Clique em Criar chave de acesso na secção Chaves de acesso.
  7. Selecione Serviço de terceiros como o Exemplo de utilização.
  8. Clicar em Seguinte.
  9. Opcional: adicione uma etiqueta de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para utilização posterior.
  12. Clique em Concluído.
  13. Selecione o separador Autorizações.
  14. Clique em Adicionar autorizações na secção Políticas de autorizações.
  15. Selecione Adicionar autorizações.
  16. Selecione Anexar políticas diretamente
  17. Pesquise e selecione a política AmazonS3FullAccess.
  18. Clicar em Seguinte.
  19. Clique em Adicionar autorizações.

Configure a política e a função de IAM para carregamentos do S3

  1. Na consola da AWS, aceda a IAM > Políticas > Criar política > separador JSON.
  2. Introduza a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutSwimlaneAuditObjects",
          "Effect": "Allow",
          "Action": ["s3:PutObject"],
          "Resource": "arn:aws:s3:::swimlane-audit/swimlane/audit/*"
        },
        {
          "Sid": "AllowStateReadWrite",
          "Effect": "Allow",
          "Action": ["s3:GetObject", "s3:PutObject"],
          "Resource": "arn:aws:s3:::swimlane-audit/swimlane/audit/state.json"
        }
      ]
    }
    
    • Substitua swimlane-audit se tiver introduzido um nome de contentor diferente.
  3. Clique em Seguinte > Criar política.

  4. Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.

  5. Anexe a política recém-criada e a política gerida pela AWS:

    • A política personalizada criada acima
    • service-role/AWSLambdaBasicExecutionRole (registos do CloudWatch)
  6. Dê o nome WriteSwimlaneAuditToS3Role à função e clique em Criar função.

Crie a função Lambda

  1. Na consola da AWS, aceda a Lambda > Functions > Create function.
  2. Clique em Criar do zero.
  3. Faculte os seguintes detalhes de configuração:

    Definição Valor
    Nome swimlane_audit_to_s3
    Runtime Python 3.13
    Arquitetura x86_64
    Função de execução WriteSwimlaneAuditToS3Role
  4. Depois de criar a função, abra o separador Código, elimine o fragmento e introduza o seguinte código (swimlane_audit_to_s3.py):

    #!/usr/bin/env python3
    import os, json, gzip, io, uuid, datetime as dt, urllib.parse, urllib.request
    import boto3
    
    # ---- Environment ----
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "swimlane/audit/")
    STATE_KEY = os.environ.get("STATE_KEY", S3_PREFIX + "state.json")
    BASE_URL = os.environ["SWIMLANE_BASE_URL"].rstrip("/")  # e.g., https://eu.swimlane.app
    ACCOUNT_ID = os.environ["SWIMLANE_ACCOUNT_ID"]
    TENANT_LIST = os.environ.get("SWIMLANE_TENANT_LIST", "")  # comma-separated; optional
    INCLUDE_ACCOUNT = os.environ.get("INCLUDE_ACCOUNT", "true").lower() == "true"
    PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "100"))  # max 100
    WINDOW_MINUTES = int(os.environ.get("WINDOW_MINUTES", "15"))  # time range per run
    PAT_TOKEN = os.environ["SWIMLANE_PAT_TOKEN"]  # Personal Access Token
    TIMEOUT = int(os.environ.get("TIMEOUT", "30"))
    
    AUDIT_URL = f"{BASE_URL}/api/public/audit/account/{ACCOUNT_ID}/auditlogs"
    
    s3 = boto3.client("s3")
    
    # ---- Helpers ----
    
    def _http(req: urllib.request.Request):
        return urllib.request.urlopen(req, timeout=TIMEOUT)
    
    def _now():
        return dt.datetime.utcnow()
    
    def get_state() -> dict:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read())
        except Exception:
            return {}
    
    def put_state(state: dict) -> None:
        state["updated_at"] = _now().isoformat() + "Z"
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state).encode())
    
    def build_url(from_dt: dt.datetime, to_dt: dt.datetime, page: int) -> str:
        params = {
            "pageNumber": str(page),
            "pageSize": str(PAGE_SIZE),
            "includeAccount": str(INCLUDE_ACCOUNT).lower(),
            "fromdate": from_dt.replace(microsecond=0).isoformat() + "Z",
            "todate": to_dt.replace(microsecond=0).isoformat() + "Z",
        }
        if TENANT_LIST:
            params["tenantList"] = TENANT_LIST
        return AUDIT_URL + "?" + urllib.parse.urlencode(params)
    
    def fetch_page(url: str) -> dict:
        headers = {
            "Accept": "application/json",
            "Private-Token": PAT_TOKEN,
        }
        req = urllib.request.Request(url, headers=headers)
        with _http(req) as r:
            return json.loads(r.read())
    
    def write_chunk(items: list[dict], ts: dt.datetime) -> str:
        key = f"{S3_PREFIX}{ts:%Y/%m/%d}/swimlane-audit-{uuid.uuid4()}.json.gz"
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="w") as gz:
            for rec in items:
                gz.write((json.dumps(rec) + "n").encode())
        buf.seek(0)
        s3.upload_fileobj(buf, S3_BUCKET, key)
        return key
    
    def lambda_handler(event=None, context=None):
        state = get_state()
    
        # determine window
        to_dt = _now()
        from_dt = to_dt - dt.timedelta(minutes=WINDOW_MINUTES)
        if (prev := state.get("last_to_dt")):
            try:
                from_dt = dt.datetime.fromisoformat(prev.replace("Z", "+00:00"))
            except Exception:
                pass
    
        page = int(state.get("page", 1))
        total_written = 0
    
        while True:
            url = build_url(from_dt, to_dt, page)
            resp = fetch_page(url)
            items = resp.get("auditlogs", []) or []
            if items:
                write_chunk(items, _now())
                total_written += len(items)
            next_path = resp.get("next")
            if not next_path:
                break
            page += 1
            state["page"] = page
    
        # advance state window
        state["last_to_dt"] = to_dt.replace(microsecond=0).isoformat() + "Z"
        state["page"] = 1
        put_state(state)
    
        return {"ok": True, "written": total_written, "from": from_dt.isoformat() + "Z", "to": to_dt.isoformat() + "Z"}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  5. Aceda a Configuração > Variáveis de ambiente.

  6. Clique em Editar > Adicionar nova variável de ambiente.

  7. Introduza as seguintes variáveis de ambiente, substituindo-as pelos seus valores.

    Chave Valor de exemplo
    S3_BUCKET swimlane-audit
    S3_PREFIX swimlane/audit/
    STATE_KEY swimlane/audit/state.json
    SWIMLANE_BASE_URL https://eu.swimlane.app
    SWIMLANE_ACCOUNT_ID xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    SWIMLANE_TENANT_LIST tenantA,tenantB (opcional)
    INCLUDE_ACCOUNT true
    PAGE_SIZE 100
    WINDOW_MINUTES 15
    SWIMLANE_PAT_TOKEN <your-personal-access-token>
    TIMEOUT 30
  8. Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > your-function).

  9. Selecione o separador Configuração.

  10. No painel Configuração geral, clique em Editar.

  11. Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.

Crie um horário do EventBridge

  1. Aceda a Amazon EventBridge > Scheduler > Create schedule.
  2. Indique os seguintes detalhes de configuração:
    • Horário recorrente: Taxa (15 min)
    • Alvo: a sua função Lambda swimlane_audit_to_s3
    • Nome: swimlane-audit-schedule-15min
  3. Clique em Criar programação.

Opcional: crie um utilizador e chaves da IAM só de leitura para o Google SecOps

  1. Na consola da AWS, aceda a IAM > Utilizadores > Adicionar utilizadores.
  2. Clique em Adicionar utilizadores.
  3. Indique os seguintes detalhes de configuração:
    • Utilizador: secops-reader
    • Tipo de acesso: chave de acesso – acesso programático
  4. Clique em Criar utilizador.
  5. Anexe a política de leitura mínima (personalizada): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  6. No editor JSON, introduza a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::swimlane-audit/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::swimlane-audit"
        }
      ]
    }
    
  7. Defina o nome como secops-reader-policy.

  8. Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.

  9. Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.

  10. Transfira o CSV (estes valores são introduzidos no feed).

Configure um feed no Google SecOps para carregar registos da Swimlane Platform

  1. Aceda a Definições do SIEM > Feeds.
  2. Clique em + Adicionar novo feed.
  3. No campo Nome do feed, introduza um nome para o feed (por exemplo, Swimlane Platform logs).
  4. Selecione Amazon S3 V2 como o Tipo de origem.
  5. Selecione Plataforma Swimlane como o Tipo de registo.
  6. Clicar em Seguinte.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://swimlane-audit/swimlane/audit/
    • Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
    • Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. Predefinição de 180 dias.
    • ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
    • Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
    • Espaço de nomes do recurso: o espaço de nomes do recurso.
    • Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
  8. Clicar em Seguinte.
  9. Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.