收集 Rippling 活動記錄
本文說明如何使用 Amazon S3,將 Rippling 活動記錄擷取至 Google Security Operations。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體。
- Rippling 的特殊存取權 (API 權杖,可存取「公司活動」)。
- AWS 的特殊權限 (S3、Identity and Access Management (IAM)、Lambda、EventBridge)。
取得 Rippling 的必要條件
- 登入 Rippling 管理員。
- 開啟「Search」>「API Tokens」。
替代路徑:依序點選「設定」>「公司設定」>「API 權杖」。 - 按一下「Create API token」。
- 提供下列設定詳細資料:
- 名稱:提供有意義的不重複名稱 (例如
Google SecOps S3 Export
) - API 版本:Base API (v1)
- 範圍/權限:啟用
company:activity:read
(公司活動必須啟用這項權限)。
- 名稱:提供有意義的不重複名稱 (例如
- 點按「Create」(建立),並將權杖值儲存在安全的位置。(您會將其做為不記名憑證)。
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南建立 Amazon S3 bucket:建立 bucket
- 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如
rippling-activity-logs
)。 - 請按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
- 按一下 [完成]。
- 選取「權限」分頁標籤。
- 在「權限政策」部分中,按一下「新增權限」。
- 選取「新增權限」。
- 選取「直接附加政策」。
- 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
- 選取政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 S3 上傳的身分與存取權管理政策和角色
- 在 AWS 控制台中,依序前往「IAM」>「Policies」(政策)。
- 按一下「建立政策」>「JSON」分頁。
- 複製並貼上下列政策。
政策 JSON (如果您輸入不同的值區或前置字元,請替換值):
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::rippling-activity-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::rippling-activity-logs/rippling/activity/state.json" } ] } ````
依序點選「下一步」>「建立政策」。
依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。
附加新建立的政策。
為角色命名
WriteRipplingToS3Role
,然後按一下「建立角色」。
建立 Lambda 函式
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」。
- 按一下「從頭開始撰寫」。
請提供下列設定詳細資料:
設定 值 名稱 rippling_activity_to_s3
執行階段 Python 3.13 架構 x86_64 執行角色 WriteRipplingToS3Role
建立函式後,開啟「程式碼」分頁,刪除存根並貼上以下程式碼 (
rippling_activity_to_s3.py
)。#!/usr/bin/env python3 # Lambda: Pull Rippling Company Activity logs to S3 (raw JSON, no transforms) import os, json, time, urllib.parse from urllib.request import Request, urlopen from datetime import datetime, timezone, timedelta import boto3 API_TOKEN = os.environ["RIPPLING_API_TOKEN"] ACTIVITY_URL = os.environ.get("RIPPLING_ACTIVITY_URL", "https://api.rippling.com/platform/api/company_activity") S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "rippling/activity/") STATE_KEY = os.environ.get("STATE_KEY", "rippling/activity/state.json") LIMIT = int(os.environ.get("LIMIT", "1000")) MAX_PAGES = int(os.environ.get("MAX_PAGES", "10")) LOOKBACK_MINUTES = int(os.environ.get("LOOKBACK_MINUTES", "60")) END_LAG_SECONDS = int(os.environ.get("END_LAG_SECONDS", "120")) s3 = boto3.client("s3") def _headers(): return {"Authorization": f"Bearer {API_TOKEN}", "Accept": "application/json"} def _get_state(): try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) j = json.loads(obj["Body"].read()) return {"since": j.get("since"), "next": j.get("next")} except Exception: return {"since": None, "next": None} def _put_state(since_iso, next_cursor): body = json.dumps({"since": since_iso, "next": next_cursor}, separators=(",", ":")).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body) def _get(url): req = Request(url, method="GET") for k, v in _headers().items(): req.add_header(k, v) with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) def _build_url(base, params): qs = urllib.parse.urlencode(params) return f"{base}?{qs}" if qs else base def _parse_iso(ts): if ts.endswith("Z"): ts = ts[:-1] + "+00:00" return datetime.fromisoformat(ts) def _iso_from_epoch(sec): return datetime.fromtimestamp(sec, tz=timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") def _write(payload, run_ts_iso, page_index, source="company_activity"): day_path = _parse_iso(run_ts_iso).strftime("%Y/%m/%d") key = f"{S3_PREFIX.strip('/')}/{day_path}/{run_ts_iso.replace(':','').replace('-','')}-page{page_index:05d}-{source}.json" s3.put_object(Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8")) return key def lambda_handler(event=None, context=None): state = _get_state() run_end = datetime.now(timezone.utc) - timedelta(seconds=END_LAG_SECONDS) end_iso = run_end.replace(microsecond=0).isoformat().replace("+00:00", "Z") since_iso = state["since"] next_cursor = state["next"] if since_iso is None: since_iso = _iso_from_epoch(time.time() - LOOKBACK_MINUTES * 60) else: try: since_iso = (_parse_iso(since_iso) + timedelta(seconds=1)).replace(microsecond=0).isoformat().replace("+00:00", "Z") except Exception: since_iso = _iso_from_epoch(time.time() - LOOKBACK_MINUTES * 60) run_ts_iso = end_iso pages = 0 total = 0 newest_ts = None pending_next = None while pages < MAX_PAGES: params = {"limit": str(LIMIT)} if next_cursor: params["next"] = next_cursor else: params["startDate"] = since_iso params["endDate"] = end_iso url = _build_url(ACTIVITY_URL, params) data = _get(url) _write(data, run_ts_iso, pages) events = data.get("events") or data.get("items") or data.get("data") or [] total += len(events) if isinstance(events, list) else 0 if isinstance(events, list): for ev in events: t = ev.get("timestamp") or ev.get("time") or ev.get("event_time") if isinstance(t, str): try: dt_ts = _parse_iso(t) if newest_ts is None or dt_ts > newest_ts: newest_ts = dt_ts except Exception: pass nxt = data.get("next") or data.get("next_cursor") or None pages += 1 if nxt: next_cursor = nxt pending_next = nxt continue else: pending_next = None break new_since_iso = (newest_ts or run_end).replace(microsecond=0).isoformat().replace("+00:00", "Z") _put_state(new_since_iso, pending_next) return {"ok": True, "pages": pages, "events": total, "since": new_since_iso, "next": pending_next}
依序前往「設定」>「環境變數」。
依序點選「編輯」> 新增環境變數。
輸入下表提供的環境變數,並將範例值換成您的值。
環境變數
鍵 範例值 S3_BUCKET
rippling-activity-logs
S3_PREFIX
rippling/activity/
STATE_KEY
rippling/activity/state.json
RIPPLING_API_TOKEN
your-api-token
RIPPLING_ACTIVITY_URL
https://api.rippling.com/platform/api/company_activity
LIMIT
1000
MAX_PAGES
10
LOOKBACK_MINUTES
60
END_LAG_SECONDS
120
建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。
選取「設定」分頁標籤。
在「一般設定」面板中,按一下「編輯」。
將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)。
建立 EventBridge 排程
- 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」。
- 提供下列設定詳細資料:
- 週期性時間表:費率 (
1 hour
)。 - 目標:您的 Lambda 函式
rippling_activity_to_s3
。 - 名稱:
rippling-activity-logs-1h
。
- 週期性時間表:費率 (
- 按一下「建立時間表」。
(選用) 為 Google SecOps 建立唯讀 IAM 使用者和金鑰
- 在 AWS 控制台中,依序前往「IAM」>「Users」>「Add users」。
- 點選 [Add users] (新增使用者)。
- 提供下列設定詳細資料:
- 使用者:輸入
secops-reader
。 - 存取類型:選取「存取金鑰 - 程式輔助存取」。
- 使用者:輸入
- 按一下「建立使用者」。
- 附加最低讀取權限政策 (自訂):依序選取「Users」(使用者) >「secops-reader」>「Permissions」(權限) >「Add permissions」(新增權限) >「Attach policies directly」(直接附加政策) >「Create policy」(建立政策)。
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::rippling-activity-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::rippling-activity-logs" } ] }
Name =
secops-reader-policy
。依序點選「建立政策」> 搜尋/選取 >「下一步」>「新增權限」。
為
secops-reader
建立存取金鑰:依序點選「安全憑證」>「存取金鑰」。按一下「建立存取金鑰」。
下載
.CSV
。(您會將這些值貼到動態饋給中)。
在 Google SecOps 中設定動態消息,擷取 Rippling 活動記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Rippling Activity Logs
)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「Rippling 活動記錄」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:
s3://rippling-activity-logs/rippling/activity/
- 來源刪除選項:根據偏好設定選取刪除選項。
- 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:
rippling.activity
- 選用:擷取標籤:新增擷取標籤。
- S3 URI:
- 點選「下一步」。
- 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」。
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。