收集 Bitwarden Enterprise 事件記錄

支援的國家/地區:

本文說明如何使用 Amazon S3,將 Bitwarden Enterprise 事件記錄擷取至 Google Security Operations。剖析器會將原始 JSON 格式的事件記錄轉換為符合 Chronicle UDM 的結構化格式。並擷取使用者詳細資料、IP 位址和事件類型等相關欄位,然後對應至相應的 UDM 欄位,以進行一致的安全性分析。

事前準備

  • Google SecOps 執行個體
  • Bitwarden 租戶的特殊存取權
  • AWS 的特殊存取權 (S3、IAM、Lambda、EventBridge)

取得 Bitwarden API 金鑰和網址

  1. 在 Bitwarden 管理控制台中。
  2. 依序前往「設定」>「機構資訊」>「查看 API 金鑰」
  3. 複製下列詳細資料並儲存到安全位置:
    • 用戶端 ID
    • 用戶端密碼
  4. 判斷 Bitwarden 端點 (依區域而定):
    • IDENTITY_URL: https://identity.bitwarden.com/connect/token (歐盟:https://identity.bitwarden.eu/connect/token)
    • API_BASE: https://api.bitwarden.com (歐盟:https://api.bitwarden.eu)

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照本使用指南建立 Amazon S3 值區建立值區
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 bitwarden-events)。
  3. 按照這份使用者指南建立使用者:建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」,以供日後使用。
  12. 按一下 [完成]
  13. 選取 [權限] 分頁標籤。
  14. 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋並選取 AmazonS3FullAccess 政策。
  18. 點選「下一步」
  19. 按一下「Add permissions」。

設定 S3 上傳的身分與存取權管理政策和角色

  1. 依序前往 AWS 管理控制台 > IAM > 政策 > 建立政策 > JSON 分頁
  2. 輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutBitwardenObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::bitwarden-events/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::bitwarden-events/bitwarden/events/state.json"
        }
      ]
    }
    
    
    • 如果您輸入的值區名稱不同,請替換 bitwarden-events
  3. 依序點選「Next」>「Create policy」

  4. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」

  5. 附加新建立的政策。

  6. 為角色命名 WriteBitwardenToS3Role,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:

    設定
    名稱 bitwarden_events_to_s3
    執行階段 Python 3.13
    架構 x86_64
    執行角色 WriteBitwardenToS3Role
  4. 建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (bitwarden_events_to_s3.py):

    #!/usr/bin/env python3
    
    import os, json, time, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    IDENTITY_URL = os.environ.get("IDENTITY_URL", "https://identity.bitwarden.com/connect/token")
    API_BASE = os.environ.get("API_BASE", "https://api.bitwarden.com").rstrip("/")
    CID = os.environ["BW_CLIENT_ID"]          # organization.ClientId
    CSECRET = os.environ["BW_CLIENT_SECRET"]  # organization.ClientSecret
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "bitwarden/events/").strip("/")
    STATE_KEY = os.environ.get("STATE_KEY", "bitwarden/events/state.json")
    MAX_PAGES = int(os.environ.get("MAX_PAGES", "10"))
    
    HEADERS_FORM = {"Content-Type": "application/x-www-form-urlencoded"}
    HEADERS_JSON = {"Accept": "application/json"}
    
    s3 = boto3.client("s3")
    
    def _read_state():
        try:
            obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            j = json.loads(obj["Body"].read())
            return j.get("continuationToken")
        except Exception:
            return None
    
    def _write_state(token):
        body = json.dumps({"continuationToken": token}).encode("utf-8")
        s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def _http(req: Request, timeout: int = 60, max_retries: int = 5):
        attempt, backoff = 0, 1.0
        while True:
            try:
                with urlopen(req, timeout=timeout) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                # Retry on 429 and 5xx
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def _get_token():
        body = urllib.parse.urlencode({
            "grant_type": "client_credentials",
            "scope": "api.organization",
            "client_id": CID,
            "client_secret": CSECRET,
        }).encode("utf-8")
        req = Request(IDENTITY_URL, data=body, method="POST", headers=HEADERS_FORM)
        data = _http(req, timeout=30)
        return data["access_token"], int(data.get("expires_in", 3600))
    
    def _fetch_events(bearer: str, cont: str | None):
        params = {}
        if cont:
            params["continuationToken"] = cont
        qs = ("?" + urllib.parse.urlencode(params)) if params else ""
        url = f"{API_BASE}/public/events{qs}"
        req = Request(url, method="GET", headers={"Authorization": f"Bearer {bearer}", **HEADERS_JSON})
        return _http(req, timeout=60)
    
    def _write_page(obj: dict, run_ts_s: int, page_index: int) -> str:
        # Make filename unique per page to avoid overwrites in the same second
        key = f"{PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts_s))}-page{page_index:05d}-bitwarden-events.json"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=json.dumps(obj, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def lambda_handler(event=None, context=None):
        bearer, _ttl = _get_token()
        cont = _read_state()
        run_ts_s = int(time.time())
    
        pages = 0
        written = 0
        while pages < MAX_PAGES:
            data = _fetch_events(bearer, cont)
            # write page
            _write_page(data, run_ts_s, pages)
            pages += 1
    
            # count entries (official shape: {"object":"list","data":[...], "continuationToken": "..."} )
            entries = []
            if isinstance(data.get("data"), list):
                entries = data["data"]
            elif isinstance(data.get("entries"), list):  # fallback if shape differs
                entries = data["entries"]
            written += len(entries)
    
            # next page token (official: "continuationToken")
            next_cont = data.get("continuationToken")
            if next_cont:
                cont = next_cont
                continue
            break
    
        # Save state only if there are more pages to continue in next run
        _write_state(cont if pages >= MAX_PAGES and cont else None)
        return {"ok": True, "pages": pages, "events_estimate": written, "nextContinuationToken": cont}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. 依序前往「Configuration」>「Environment variables」>「Edit」>「Add new environment variable」

  6. 輸入下列環境變數,並將 換成您的值:

    範例
    S3_BUCKET bitwarden-events
    S3_PREFIX bitwarden/events/
    STATE_KEY bitwarden/events/state.json
    BW_CLIENT_ID <organization client_id>
    BW_CLIENT_SECRET <organization client_secret>
    IDENTITY_URL https://identity.bitwarden.com/connect/token (歐盟:https://identity.bitwarden.eu/connect/token)
    API_BASE https://api.bitwarden.com (歐盟:https://api.bitwarden.eu)
    MAX_PAGES 10
  7. 建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。

  8. 選取「設定」分頁標籤。

  9. 在「一般設定」面板中,按一下「編輯」

  10. 將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)

建立 EventBridge 排程

  1. 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」
  2. 提供下列設定詳細資料:
    • 週期性時間表費率 (1 hour)。
    • 目標:您的 Lambda 函式。
    • 名稱bitwarden-events-1h
  3. 按一下「建立時間表」

選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰

  1. 在 AWS 控制台中,依序前往「IAM」>「Users」,然後按一下「Add users」
  2. 提供下列設定詳細資料:
    • 使用者:輸入不重複的名稱 (例如 secops-reader)
    • 存取權類型:選取「存取金鑰 - 程式輔助存取」
    • 按一下「建立使用者」
  3. 附加最低讀取權限政策 (自訂):依序選取「使用者」secops-reader「權限」「新增權限」「直接附加政策」「建立政策」
  4. 在 JSON 編輯器中輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. 將名稱設為 secops-reader-policy

  6. 依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」

  7. 依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」

  8. 下載 CSV (這些值會輸入至動態饋給)。

在 Google SecOps 中設定動態饋給,擷取 Bitwarden Enterprise 事件記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「+ 新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Bitwarden Events)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「Bitwarden 事件」做為「記錄類型」。
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URIs3://bitwarden-events/bitwarden/events/
    • 來源刪除選項:根據偏好選取刪除選項。
    • 檔案存在時間上限:預設為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「Finalize」畫面上檢查新的動態饋給設定,然後按一下「Submit」

UDM 對應表

記錄欄位 UDM 對應 邏輯
actingUserId target.user.userid 如果 enriched.actingUser.userId 為空或空值,系統會使用這個欄位填入 target.user.userid 欄位。
collectionID security_result.detection_fields.key security_result 中填入 detection_fields 內的 key 欄位。
collectionID security_result.detection_fields.value security_result 中填入 detection_fields 內的 value 欄位。
日期 metadata.event_timestamp 系統會剖析並轉換為時間戳記格式,然後對應至 event_timestamp
enriched.actingUser.accessAll security_result.rule_labels.key security_resultrule_labels 中,將值設為「Access_All」。
enriched.actingUser.accessAll security_result.rule_labels.value security_result 中,以從 enriched.actingUser.accessAll 轉換為字串的值,填入 rule_labels 內的 value 欄位。
enriched.actingUser.email target.user.email_addresses 填入 target.user 內的 email_addresses 欄位。
enriched.actingUser.id metadata.product_log_id 填入 metadata 內的 product_log_id 欄位。
enriched.actingUser.id target.labels.key 將值設為 target.labels 內的「ID」。
enriched.actingUser.id target.labels.value 使用 enriched.actingUser.id 中的值填入 target.labels 內的 value 欄位。
enriched.actingUser.name target.user.user_display_name 填入 target.user 內的 user_display_name 欄位。
enriched.actingUser.object target.labels.key target.labels 中將值設為「Object」。
enriched.actingUser.object target.labels.value 使用 enriched.actingUser.object 中的值填入 target.labels 內的 value 欄位。
enriched.actingUser.resetPasswordEnrolled target.labels.key target.labels 中將值設為「ResetPasswordEnrolled」。
enriched.actingUser.resetPasswordEnrolled target.labels.value target.labels 中填入 value 欄位,並將 enriched.actingUser.resetPasswordEnrolled 中的值轉換為字串。
enriched.actingUser.twoFactorEnabled security_result.rule_labels.key security_resultrule_labels 中,將值設為「Two Factor Enabled」。
enriched.actingUser.twoFactorEnabled security_result.rule_labels.value security_result 中,以從 enriched.actingUser.twoFactorEnabled 轉換為字串的值,填入 rule_labels 內的 value 欄位。
enriched.actingUser.userId target.user.userid 填入 target.user 內的 userid 欄位。
enriched.collection.id additional.fields.key 將值設為 additional.fields 內的「集合 ID」。
enriched.collection.id additional.fields.value.string_value 使用 enriched.collection.id 中的值填入 additional.fields 內的 string_value 欄位。
enriched.collection.object additional.fields.key additional.fields 中將值設為「Collection Object」。
enriched.collection.object additional.fields.value.string_value 使用 enriched.collection.object 中的值填入 additional.fields 內的 string_value 欄位。
enriched.type metadata.product_event_type 填入 metadata 內的 product_event_type 欄位。
groupId target.user.group_identifiers 將值新增至 target.user 內的 group_identifiers 陣列。
ipAddress principal.ip 從欄位擷取 IP 位址,並對應至 principal.ip
不適用 extensions.auth 剖析器會建立空物件。
不適用 metadata.event_type 系統會根據 enriched.type,以及 principaltarget 資訊是否存在,可能的值:USER_LOGIN、STATUS_UPDATE、GENERIC_EVENT。
不適用 security_result.action 根據 enriched.type判斷。可能的值:ALLOW、BLOCK。
物件 additional.fields.key additional.fields 中將值設為「Object」。
物件 additional.fields.value 使用 object 中的值填入 additional.fields 內的 value 欄位。

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。