收集 Citrix Monitor Service 記錄

支援的國家/地區:

本文說明如何使用 Amazon S3,將 Citrix Monitor Service 記錄檔擷取至 Google Security Operations。剖析器會將原始 JSON 格式的記錄轉換為符合 Google SecOps UDM 的結構化格式。這項服務會從原始記錄中擷取相關欄位,將這些欄位對應至相應的 UDM 欄位,並使用額外情境資訊 (例如使用者資訊、電腦詳細資料和網路活動) 擴充資料。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體
  • Citrix Cloud 租戶的特殊存取權
  • AWS 的具備權限存取權 (S3、IAM、Lambda、EventBridge)

收集 Citrix Monitor Service 的必要條件 (ID、API 金鑰、機構 ID、權杖)

  1. 登入 Citrix Cloud 控制台
  2. 依序前往「Identity and Access Management」(身分與存取權管理)「API Access」(API 存取權)
  3. 按一下「建立用戶端」
  4. 複製下列詳細資料並儲存於安全位置:
    • 用戶端 ID
    • 用戶端密碼
    • 客戶 ID (顯示在 Citrix Cloud 控制台中)
    • API 基礎網址
      • 全域:https://api.cloud.com
      • 日本:https://api.citrixcloud.jp

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照這份使用者指南建立 Amazon S3 值區建立值區
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 citrix-monitor-logs)。
  3. 按照這份使用者指南建立使用者:建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後使用。
  12. 按一下 [完成]
  13. 選取 [權限] 分頁標籤。
  14. 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋並選取 AmazonS3FullAccess 政策。
  18. 點選「下一步」
  19. 按一下「Add permissions」。

設定 S3 上傳的身分與存取權管理政策和角色

  1. AWS 控制台中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」分頁標籤
  2. 輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::citrix-monitor-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::citrix-monitor-logs/citrix_monitor/state.json"
        }
      ]
    }
    
    • 如果您輸入的值區名稱不同,請替換 citrix-monitor-logs
  3. 依序點選「Next」>「Create policy」

  4. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。

  5. 附加新建立的政策和 AWSLambdaBasicExecutionRole 代管政策。

  6. 為角色命名 CitrixMonitorLambdaRole,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:

    設定
    名稱 CitrixMonitorCollector
    執行階段 Python 3.13
    架構 x86_64
    執行角色 CitrixMonitorLambdaRole
  4. 建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (CitrixMonitorCollector.py):

    import os
    import json
    import uuid
    import datetime
    import urllib.parse
    import urllib.request
    import urllib.error
    import boto3
    import botocore
    
    # Citrix Cloud OAuth2 endpoint template
    TOKEN_URL_TMPL = "{api_base}/cctrustoauth2/{customerid}/tokens/clients"
    DEFAULT_API_BASE = "https://api.cloud.com"
    MONITOR_BASE_PATH = "/monitorodata"
    
    s3 = boto3.client("s3")
    
    def http_post_form(url, data_dict):
        """POST form data to get authentication token."""
        data = urllib.parse.urlencode(data_dict).encode("utf-8")
        req = urllib.request.Request(url, data=data, headers={
            "Accept": "application/json",
            "Content-Type": "application/x-www-form-urlencoded",
        })
        with urllib.request.urlopen(req, timeout=45) as resp:
            return json.loads(resp.read().decode("utf-8"))
    
    def http_get_json(url, headers):
        """GET JSON data from API endpoint."""
        req = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(req, timeout=90) as resp:
            return json.loads(resp.read().decode("utf-8"))
    
    def get_citrix_token(api_base, customer_id, client_id, client_secret):
        """Get Citrix Cloud authentication token."""
        url = TOKEN_URL_TMPL.format(api_base=api_base.rstrip("/"), customerid=customer_id)
        payload = {
            "grant_type": "client_credentials",
            "client_id": client_id,
            "client_secret": client_secret,
        }
        response = http_post_form(url, payload)
        return response["access_token"]
    
    def build_entity_url(api_base, entity, filter_query=None, top=None):
        """Build OData URL with optional filter and pagination."""
        base = api_base.rstrip("/") + MONITOR_BASE_PATH + "/" + entity
        params = []
        if filter_query:
            params.append("$filter=" + urllib.parse.quote(filter_query, safe="()= ':-TZ0123456789"))
        if top:
            params.append("$top=" + str(top))
        return base + ("?" + "&".join(params) if params else "")
    
    def fetch_entity_rows(entity, start_iso=None, end_iso=None, page_size=1000, headers=None, api_base=DEFAULT_API_BASE):
        """Fetch entity data with optional time filtering and pagination."""
        # Try ModifiedDate filter if timestamps are provided
        first_url = None
        if start_iso and end_iso:
            filter_query = f"(ModifiedDate ge {start_iso} and ModifiedDate lt {end_iso})"
            first_url = build_entity_url(api_base, entity, filter_query, page_size)
        else:
            first_url = build_entity_url(api_base, entity, None, page_size)
    
        url = first_url
        while url:
            try:
                data = http_get_json(url, headers)
                items = data.get("value", [])
                for item in items:
                    yield item
                url = data.get("@odata.nextLink")
            except urllib.error.HTTPError as e:
                # If ModifiedDate filtering fails, fall back to unfiltered query
                if e.code == 400 and start_iso and end_iso:
                    print(f"ModifiedDate filter not supported for {entity}, falling back to unfiltered query")
                    url = build_entity_url(api_base, entity, None, page_size)
                    continue
                else:
                    raise
    
    def read_state_file(bucket, key):
        """Read the last processed timestamp from S3 state file."""
        try:
            obj = s3.get_object(Bucket=bucket, Key=key)
            content = obj["Body"].read().decode("utf-8")
            state = json.loads(content)
            timestamp_str = state.get("last_hour_utc")
            if timestamp_str:
                return datetime.datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")).replace(tzinfo=None)
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "NoSuchKey":
                return None
            raise
        return None
    
    def write_state_file(bucket, key, dt_utc):
        """Write the current processed timestamp to S3 state file."""
        state = {"last_hour_utc": dt_utc.isoformat() + "Z"}
        s3.put_object(
            Bucket=bucket, 
            Key=key, 
            Body=json.dumps(state, separators=(",", ":")), 
            ContentType="application/json"
        )
    
    def write_ndjson_to_s3(bucket, key, rows):
        """Write rows as NDJSON to S3."""
        body_lines = []
        for row in rows:
            json_line = json.dumps(row, separators=(",", ":"), ensure_ascii=False)
            body_lines.append(json_line)
    
        body = ("n".join(body_lines) + "n").encode("utf-8")
        s3.put_object(
            Bucket=bucket, 
            Key=key, 
            Body=body, 
            ContentType="application/x-ndjson"
        )
    
    def lambda_handler(event, context):
        """Main Lambda handler function."""
    
        # Environment variables
        bucket = os.environ["S3_BUCKET"]
        prefix = os.environ.get("S3_PREFIX", "citrix_monitor").strip("/")
        state_key = os.environ.get("STATE_KEY") or f"{prefix}/state.json"
        customer_id = os.environ["CITRIX_CUSTOMER_ID"]
        client_id = os.environ["CITRIX_CLIENT_ID"]
        client_secret = os.environ["CITRIX_CLIENT_SECRET"]
        api_base = os.environ.get("API_BASE", DEFAULT_API_BASE)
        entities = [e.strip() for e in os.environ.get("ENTITIES", "Machines,Sessions,Connections,Applications,Users").split(",") if e.strip()]
        page_size = int(os.environ.get("PAGE_SIZE", "1000"))
        lookback_minutes = int(os.environ.get("LOOKBACK_MINUTES", "75"))
        use_time_filter = os.environ.get("USE_TIME_FILTER", "true").lower() == "true"
    
        # Time window calculation
        now = datetime.datetime.utcnow()
        fallback_hour = (now - datetime.timedelta(minutes=lookback_minutes)).replace(minute=0, second=0, microsecond=0)
    
        last_processed = read_state_file(bucket, state_key)
        target_hour = (last_processed + datetime.timedelta(hours=1)) if last_processed else fallback_hour
        start_iso = target_hour.isoformat() + "Z"
        end_iso = (target_hour + datetime.timedelta(hours=1)).isoformat() + "Z"
    
        # Authentication
        token = get_citrix_token(api_base, customer_id, client_id, client_secret)
        headers = {
            "Authorization": f"CWSAuth bearer={token}",
            "Citrix-CustomerId": customer_id,
            "Accept": "application/json",
            "Accept-Encoding": "gzip, deflate, br",
            "User-Agent": "citrix-monitor-s3-collector/1.0"
        }
    
        total_records = 0
    
        # Process each entity type
        for entity in entities:
            rows_batch = []
            try:
                entity_generator = fetch_entity_rows(
                    entity=entity,
                    start_iso=start_iso if use_time_filter else None,
                    end_iso=end_iso if use_time_filter else None,
                    page_size=page_size,
                    headers=headers,
                    api_base=api_base
                )
    
                for row in entity_generator:
                    # Store raw Citrix data directly for proper parser recognition
                    rows_batch.append(row)
    
                    # Write in batches to avoid memory issues
                    if len(rows_batch) >= 1000:
                        s3_key = f"{prefix}/{entity}/year={target_hour.year:04d}/month={target_hour.month:02d}/day={target_hour.day:02d}/hour={target_hour.hour:02d}/part-{uuid.uuid4().hex}.ndjson"
                        write_ndjson_to_s3(bucket, s3_key, rows_batch)
                        total_records += len(rows_batch)
                        rows_batch = []
    
            except Exception as ex:
                print(f"Error processing entity {entity}: {str(ex)}")
                continue
    
            # Write remaining records
            if rows_batch:
                s3_key = f"{prefix}/{entity}/year={target_hour.year:04d}/month={target_hour.month:02d}/day={target_hour.day:02d}/hour={target_hour.hour:02d}/part-{uuid.uuid4().hex}.ndjson"
                write_ndjson_to_s3(bucket, s3_key, rows_batch)
                total_records += len(rows_batch)
    
        # Update state file
        write_state_file(bucket, state_key, target_hour)
    
        return {
            "statusCode": 200,
            "body": json.dumps({
                "success": True, 
                "hour_collected": start_iso, 
                "records_written": total_records, 
                "entities_processed": entities
            })
        }
    
  5. 依序前往「設定」>「環境變數」

  6. 依序點選「編輯」> 新增環境變數

  7. 輸入下列環境變數,並將 換成您的值:

    範例值
    S3_BUCKET citrix-monitor-logs
    S3_PREFIX citrix_monitor
    STATE_KEY citrix_monitor/state.json
    CITRIX_CLIENT_ID your-client-id
    CITRIX_CLIENT_SECRET your-client-secret
    CITRIX_CUSTOMER_ID your-customer-id
    API_BASE https://api.cloud.com
    ENTITIES Machines,Sessions,Connections,Applications,Users
    PAGE_SIZE 1000
    LOOKBACK_MINUTES 75
    USE_TIME_FILTER true
  8. 建立函式後,請留在該函式頁面 (或依序開啟 Lambda > Functions > CitrixMonitorCollector)。

  9. 選取「設定」分頁標籤。

  10. 在「一般設定」面板中,按一下「編輯」

  11. 將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)

建立 EventBridge 排程

  1. 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」
  2. 提供下列設定詳細資料:
    • 週期性時間表費率 (1 hour)
    • 目標:您的 Lambda 函式 CitrixMonitorCollector
    • Name (名稱):CitrixMonitorCollector-1h
  3. 按一下「建立時間表」

選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰

  1. AWS 控制台中,依序前往「IAM」>「Users」>「Add users」
  2. 點選 [Add users] (新增使用者)。
  3. 提供下列設定詳細資料:
    • 使用者secops-reader
    • 存取類型存取金鑰 - 程式輔助存取
  4. 按一下「建立使用者」
  5. 附加最低讀取權限政策 (自訂):依序選取「使用者」>「secops-reader」>「權限」>「新增權限」>「直接附加政策」>「建立政策」
  6. 在 JSON 編輯器中輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::citrix-monitor-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::citrix-monitor-logs"
        }
      ]
    }
    
  7. 將名稱設為 secops-reader-policy

  8. 依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」

  9. 依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」

  10. 下載 CSV (這些值會輸入至動態饋給)。

在 Google SecOps 中設定動態饋給,擷取 Citrix Monitor Service 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「+ 新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Citrix Monitor Service logs)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「Citrix Monitor」做為「記錄類型」
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URIs3://citrix-monitor-logs/citrix_monitor/
    • 來源刪除選項:根據偏好選取刪除選項。
    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。