收集 MuleSoft Anypoint 記錄

支援的國家/地區:

本文說明如何使用 AWS S3,將 MuleSoft Anypoint 平台記錄檔中的稽核追蹤事件擷取至 Google Security Operations。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體
  • MuleSoft 的特殊存取權
  • AWS 的特殊存取權

取得 MuleSoft 機構 ID

  1. 登入 Anypoint Platform。
  2. 依序點選「選單」>「存取管理」
  3. 在「業務群組」表格中,按一下貴機構名稱。
  4. 複製「機構 ID」(例如 0a12b3c4-d5e6-789f-1021-1a2b34cd5e6f)。

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照這份使用者指南建立 Amazon S3 值區建立值區
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 mulesoft-audit-logs)。
  3. 請按照這份使用者指南建立使用者建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」,以供日後參考。
  12. 按一下 [完成]
  13. 選取「權限」分頁標籤。
  14. 在「權限政策」部分中,按一下「新增權限」
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋並選取 AmazonS3FullAccess 政策。
  18. 點選「下一步」
  19. 按一下「Add permissions」。

建立 MuleSoft 連線應用程式

  1. 登入 Anypoint Platform。
  2. 依序前往「存取權管理」>「已連結的應用程式」>「建立應用程式」
  3. 提供下列設定詳細資料:
    • 應用程式名稱:輸入不重複的名稱 (例如 Google SecOps export)。
    • 選取「應用程式代表自己執行動作 (用戶端憑證)」
    • 依序點按「新增範圍」→「稽核記錄檢視器」→「下一步」
    • 選取需要記錄的所有商家群組。
    • 依序點選「下一步」> 新增範圍
  4. 按一下「儲存」,然後複製「用戶端 ID」和「用戶端密鑰」

設定 S3 上傳的身分與存取權管理政策和角色

  1. 政策 JSON (將 mulesoft-audit-logs 替換為您的 bucket 名稱):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutAuditObjects",
          "Effect": "Allow",
          "Action": ["s3:PutObject"],
          "Resource": "arn:aws:s3:::mulesoft-audit-logs/*"
        }
      ]
    }
    
  2. 依序前往 AWS 管理控制台 > IAM > 政策 > 建立政策 > JSON 分頁

  3. 複製並貼上政策。

  4. 依序點選「Next」>「Create policy」

  5. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」

  6. 附加新建立的政策。

  7. 為角色命名 WriteMulesoftToS3Role,然後按一下「建立角色」

建立 Lambda 函式

設定
名稱 mulesoft_audit_to_s3
執行階段 Python 3.13
架構 x86_64
執行角色 使用現有 > WriteMulesoftToS3Role
  1. 建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (mulesoft_audit_to_s3.py)。

    #!/usr/bin/env python3
    
    import os, json, gzip, io, uuid, datetime as dt, urllib.request, urllib.error, urllib.parse
    import boto3
    
    ORG_ID        = os.environ["MULE_ORG_ID"]
    CLIENT_ID     = os.environ["CLIENT_ID"]
    CLIENT_SECRET = os.environ["CLIENT_SECRET"]
    S3_BUCKET     = os.environ["S3_BUCKET_NAME"]
    
    TOKEN_URL = "https://anypoint.mulesoft.com/accounts/api/v2/oauth2/token"
    QUERY_URL = f"https://anypoint.mulesoft.com/audit/v2/organizations/{ORG_ID}/query"
    
    def http_post(url, data, headers=None):
        raw = json.dumps(data).encode() if headers else urllib.parse.urlencode(data).encode()
        req = urllib.request.Request(url, raw, headers or {})
        try:
            with urllib.request.urlopen(req, timeout=30) as r:
                return json.loads(r.read())
        except urllib.error.HTTPError as e:
            print("MuleSoft error body →", e.read().decode())
            raise
    
    def get_token():
        return http_post(TOKEN_URL, {
            "grant_type": "client_credentials",
            "client_id":  CLIENT_ID,
            "client_secret": CLIENT_SECRET
        })["access_token"]
    
    def fetch_audit(token, start, end):
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type":  "application/json"
        }
        body = {
            "startDate": f"{start.isoformat(timespec='milliseconds')}Z",
            "endDate":   f"{end.isoformat(timespec='milliseconds')}Z",
            "limit": 200,
            "offset": 0,
            "ascending": False
        }
        while True:
            data = http_post(QUERY_URL, body, headers)
            if not data.get("data"):
                break
            yield from data["data"]
            body["offset"] += body["limit"]
    
    def upload(events, ts):
        key = f"{ts:%Y/%m/%d}/mulesoft-audit-{uuid.uuid4()}.json.gz"
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="w") as gz:
            for ev in events:
                gz.write((json.dumps(ev) + "\n").encode())
        buf.seek(0)
        boto3.client("s3").upload_fileobj(buf, S3_BUCKET, key)
    
    def lambda_handler(event=None, context=None):
        now   = dt.datetime.utcnow().replace(microsecond=0)
        start = now - dt.timedelta(days=1)
    
        token  = get_token()
        events = list(fetch_audit(token, start, now))
    
        if events:
            upload(events, start)
            print(f"Uploaded {len(events)} events")
        else:
            print("No events in the last 24 h")
    
    # For local testing
    if __name__ == "__main__":
        lambda_handler()
    
  2. 依序前往「Configuration」>「Environment variables」>「Edit」>「Add new environment variable」

  3. 輸入下列環境變數,並將 換成您的值。

    範例值
    MULE_ORG_ID your_org_id
    CLIENT_ID your_client_id
    CLIENT_SECRET your_client_secret
    S3_BUCKET_NAME mulesoft-audit-logs

排定 Lambda 函式 (EventBridge 排程器)

  1. 依序前往「Configuration」>「Triggers」>「Add trigger」>「EventBridge Scheduler」>「Create rule」
  2. 提供下列設定詳細資料:
    • 名稱daily-mulesoft-audit export
    • 排程模式Cron 運算式
    • 運算式0 2 * * * (每天世界標準時間 02:00 執行)。
  3. 其餘設定保留預設值,然後點選「建立」

在 Google SecOps 中設定動態饋給,擷取 MuleSoft 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 MuleSoft Logs)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「Mulesoft」做為「記錄類型」
  6. 點選「下一步」
  7. 指定下列輸入參數的值:

    • S3 URI:值區 URI
      • s3://mulesoft-audit-logs/
        • 請將 mulesoft-audit-logs 替換為實際值區名稱。
    • 來源刪除選項:根據偏好選取刪除選項。

    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。

    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。

    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。

    • 資產命名空間資產命名空間

    • 擷取標籤:要套用至這個動態饋給事件的標籤。

  8. 點選「下一步」

  9. 在「Finalize」畫面上檢查新的動態饋給設定,然後按一下「Submit」

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。