收集 Citrix Monitor Service 記錄
本文說明如何使用 Amazon S3,將 Citrix Monitor Service 記錄檔擷取至 Google Security Operations。剖析器會將原始 JSON 格式的記錄轉換為符合 Google SecOps UDM 的結構化格式。這項服務會從原始記錄中擷取相關欄位,將這些欄位對應至相應的 UDM 欄位,並使用額外情境資訊 (例如使用者資訊、電腦詳細資料和網路活動) 擴充資料。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體
- Citrix Cloud 租戶的特殊存取權
- AWS 的具備權限存取權 (S3、IAM、Lambda、EventBridge)
收集 Citrix Monitor Service 的必要條件 (ID、API 金鑰、機構 ID、權杖)
- 登入 Citrix Cloud 控制台。
- 依序前往「Identity and Access Management」(身分與存取權管理)「API Access」(API 存取權)。
- 按一下「建立用戶端」。
- 複製下列詳細資料並儲存於安全位置:
- 用戶端 ID
- 用戶端密碼
- 客戶 ID (顯示在 Citrix Cloud 控制台中)
- API 基礎網址:
- 全域:
https://api.cloud.com
- 日本:
https://api.citrixcloud.jp
- 全域:
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南建立 Amazon S3 值區:建立值區
- 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如
citrix-monitor-logs
)。 - 按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後使用。
- 按一下 [完成]。
- 選取 [權限] 分頁標籤。
- 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)。
- 選取「新增權限」。
- 選取「直接附加政策」
- 搜尋並選取 AmazonS3FullAccess 政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 S3 上傳的身分與存取權管理政策和角色
- 在 AWS 控制台中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」分頁標籤。
輸入下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::citrix-monitor-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::citrix-monitor-logs/citrix_monitor/state.json" } ] }
- 如果您輸入的值區名稱不同,請替換
citrix-monitor-logs
。
- 如果您輸入的值區名稱不同,請替換
依序點選「Next」>「Create policy」。
依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。
附加新建立的政策和 AWSLambdaBasicExecutionRole 代管政策。
為角色命名
CitrixMonitorLambdaRole
,然後按一下「建立角色」。
建立 Lambda 函式
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」。
- 按一下「從頭開始撰寫」。
請提供下列設定詳細資料:
設定 值 名稱 CitrixMonitorCollector
執行階段 Python 3.13 架構 x86_64 執行角色 CitrixMonitorLambdaRole
建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (
CitrixMonitorCollector.py
):import os import json import uuid import datetime import urllib.parse import urllib.request import urllib.error import boto3 import botocore # Citrix Cloud OAuth2 endpoint template TOKEN_URL_TMPL = "{api_base}/cctrustoauth2/{customerid}/tokens/clients" DEFAULT_API_BASE = "https://api.cloud.com" MONITOR_BASE_PATH = "/monitorodata" s3 = boto3.client("s3") def http_post_form(url, data_dict): """POST form data to get authentication token.""" data = urllib.parse.urlencode(data_dict).encode("utf-8") req = urllib.request.Request(url, data=data, headers={ "Accept": "application/json", "Content-Type": "application/x-www-form-urlencoded", }) with urllib.request.urlopen(req, timeout=45) as resp: return json.loads(resp.read().decode("utf-8")) def http_get_json(url, headers): """GET JSON data from API endpoint.""" req = urllib.request.Request(url, headers=headers) with urllib.request.urlopen(req, timeout=90) as resp: return json.loads(resp.read().decode("utf-8")) def get_citrix_token(api_base, customer_id, client_id, client_secret): """Get Citrix Cloud authentication token.""" url = TOKEN_URL_TMPL.format(api_base=api_base.rstrip("/"), customerid=customer_id) payload = { "grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret, } response = http_post_form(url, payload) return response["access_token"] def build_entity_url(api_base, entity, filter_query=None, top=None): """Build OData URL with optional filter and pagination.""" base = api_base.rstrip("/") + MONITOR_BASE_PATH + "/" + entity params = [] if filter_query: params.append("$filter=" + urllib.parse.quote(filter_query, safe="()= ':-TZ0123456789")) if top: params.append("$top=" + str(top)) return base + ("?" + "&".join(params) if params else "") def fetch_entity_rows(entity, start_iso=None, end_iso=None, page_size=1000, headers=None, api_base=DEFAULT_API_BASE): """Fetch entity data with optional time filtering and pagination.""" # Try ModifiedDate filter if timestamps are provided first_url = None if start_iso and end_iso: filter_query = f"(ModifiedDate ge {start_iso} and ModifiedDate lt {end_iso})" first_url = build_entity_url(api_base, entity, filter_query, page_size) else: first_url = build_entity_url(api_base, entity, None, page_size) url = first_url while url: try: data = http_get_json(url, headers) items = data.get("value", []) for item in items: yield item url = data.get("@odata.nextLink") except urllib.error.HTTPError as e: # If ModifiedDate filtering fails, fall back to unfiltered query if e.code == 400 and start_iso and end_iso: print(f"ModifiedDate filter not supported for {entity}, falling back to unfiltered query") url = build_entity_url(api_base, entity, None, page_size) continue else: raise def read_state_file(bucket, key): """Read the last processed timestamp from S3 state file.""" try: obj = s3.get_object(Bucket=bucket, Key=key) content = obj["Body"].read().decode("utf-8") state = json.loads(content) timestamp_str = state.get("last_hour_utc") if timestamp_str: return datetime.datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")).replace(tzinfo=None) except botocore.exceptions.ClientError as e: if e.response["Error"]["Code"] == "NoSuchKey": return None raise return None def write_state_file(bucket, key, dt_utc): """Write the current processed timestamp to S3 state file.""" state = {"last_hour_utc": dt_utc.isoformat() + "Z"} s3.put_object( Bucket=bucket, Key=key, Body=json.dumps(state, separators=(",", ":")), ContentType="application/json" ) def write_ndjson_to_s3(bucket, key, rows): """Write rows as NDJSON to S3.""" body_lines = [] for row in rows: json_line = json.dumps(row, separators=(",", ":"), ensure_ascii=False) body_lines.append(json_line) body = ("n".join(body_lines) + "n").encode("utf-8") s3.put_object( Bucket=bucket, Key=key, Body=body, ContentType="application/x-ndjson" ) def lambda_handler(event, context): """Main Lambda handler function.""" # Environment variables bucket = os.environ["S3_BUCKET"] prefix = os.environ.get("S3_PREFIX", "citrix_monitor").strip("/") state_key = os.environ.get("STATE_KEY") or f"{prefix}/state.json" customer_id = os.environ["CITRIX_CUSTOMER_ID"] client_id = os.environ["CITRIX_CLIENT_ID"] client_secret = os.environ["CITRIX_CLIENT_SECRET"] api_base = os.environ.get("API_BASE", DEFAULT_API_BASE) entities = [e.strip() for e in os.environ.get("ENTITIES", "Machines,Sessions,Connections,Applications,Users").split(",") if e.strip()] page_size = int(os.environ.get("PAGE_SIZE", "1000")) lookback_minutes = int(os.environ.get("LOOKBACK_MINUTES", "75")) use_time_filter = os.environ.get("USE_TIME_FILTER", "true").lower() == "true" # Time window calculation now = datetime.datetime.utcnow() fallback_hour = (now - datetime.timedelta(minutes=lookback_minutes)).replace(minute=0, second=0, microsecond=0) last_processed = read_state_file(bucket, state_key) target_hour = (last_processed + datetime.timedelta(hours=1)) if last_processed else fallback_hour start_iso = target_hour.isoformat() + "Z" end_iso = (target_hour + datetime.timedelta(hours=1)).isoformat() + "Z" # Authentication token = get_citrix_token(api_base, customer_id, client_id, client_secret) headers = { "Authorization": f"CWSAuth bearer={token}", "Citrix-CustomerId": customer_id, "Accept": "application/json", "Accept-Encoding": "gzip, deflate, br", "User-Agent": "citrix-monitor-s3-collector/1.0" } total_records = 0 # Process each entity type for entity in entities: rows_batch = [] try: entity_generator = fetch_entity_rows( entity=entity, start_iso=start_iso if use_time_filter else None, end_iso=end_iso if use_time_filter else None, page_size=page_size, headers=headers, api_base=api_base ) for row in entity_generator: # Store raw Citrix data directly for proper parser recognition rows_batch.append(row) # Write in batches to avoid memory issues if len(rows_batch) >= 1000: s3_key = f"{prefix}/{entity}/year={target_hour.year:04d}/month={target_hour.month:02d}/day={target_hour.day:02d}/hour={target_hour.hour:02d}/part-{uuid.uuid4().hex}.ndjson" write_ndjson_to_s3(bucket, s3_key, rows_batch) total_records += len(rows_batch) rows_batch = [] except Exception as ex: print(f"Error processing entity {entity}: {str(ex)}") continue # Write remaining records if rows_batch: s3_key = f"{prefix}/{entity}/year={target_hour.year:04d}/month={target_hour.month:02d}/day={target_hour.day:02d}/hour={target_hour.hour:02d}/part-{uuid.uuid4().hex}.ndjson" write_ndjson_to_s3(bucket, s3_key, rows_batch) total_records += len(rows_batch) # Update state file write_state_file(bucket, state_key, target_hour) return { "statusCode": 200, "body": json.dumps({ "success": True, "hour_collected": start_iso, "records_written": total_records, "entities_processed": entities }) }
依序前往「設定」>「環境變數」。
依序點選「編輯」> 新增環境變數。
輸入下列環境變數,並將 換成您的值:
鍵 範例值 S3_BUCKET
citrix-monitor-logs
S3_PREFIX
citrix_monitor
STATE_KEY
citrix_monitor/state.json
CITRIX_CLIENT_ID
your-client-id
CITRIX_CLIENT_SECRET
your-client-secret
CITRIX_CUSTOMER_ID
your-customer-id
API_BASE
https://api.cloud.com
ENTITIES
Machines,Sessions,Connections,Applications,Users
PAGE_SIZE
1000
LOOKBACK_MINUTES
75
USE_TIME_FILTER
true
建立函式後,請留在該函式頁面 (或依序開啟 Lambda > Functions > CitrixMonitorCollector)。
選取「設定」分頁標籤。
在「一般設定」面板中,按一下「編輯」。
將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)。
建立 EventBridge 排程
- 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」。
- 提供下列設定詳細資料:
- 週期性時間表:費率 (
1 hour
) - 目標:您的 Lambda 函式
CitrixMonitorCollector
- Name (名稱):
CitrixMonitorCollector-1h
- 週期性時間表:費率 (
- 按一下「建立時間表」。
選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰
- 在 AWS 控制台中,依序前往「IAM」>「Users」>「Add users」。
- 點選 [Add users] (新增使用者)。
- 提供下列設定詳細資料:
- 使用者:
secops-reader
- 存取類型:存取金鑰 - 程式輔助存取
- 使用者:
- 按一下「建立使用者」。
- 附加最低讀取權限政策 (自訂):依序選取「使用者」>「secops-reader」>「權限」>「新增權限」>「直接附加政策」>「建立政策」。
在 JSON 編輯器中輸入下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::citrix-monitor-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::citrix-monitor-logs" } ] }
將名稱設為
secops-reader-policy
。依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」。
依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」。
下載 CSV (這些值會輸入至動態饋給)。
在 Google SecOps 中設定動態饋給,擷取 Citrix Monitor Service 記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Citrix Monitor Service logs
)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「Citrix Monitor」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:
s3://citrix-monitor-logs/citrix_monitor/
- 來源刪除選項:根據偏好選取刪除選項。
- 檔案存在時間上限:包含在過去天數內修改的檔案。預設為 180 天。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:資產命名空間。
- 擷取標籤:套用至這個動態饋給事件的標籤。
- S3 URI:
- 點選「下一步」。
- 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」。
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。