收集 Citrix Analytics 日志
支持的语言:
Google SecOps
SIEM
本文档介绍了如何使用 Amazon S3 将 Citrix Analytics 日志注入到 Google Security Operations。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例
- 对 Citrix Analytics for Performance 租户的特权访问权限
- 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限
收集 Citrix Analytics 前提条件
- 登录 Citrix Cloud 控制台。
- 前往 Identity and Access Management > API Access。
- 点击创建客户端。
- 复制以下详细信息并将其保存在安全的位置:
- 客户端 ID (Client ID)
- 客户端密钥 (Client Secret)
- 客户 ID(位于 Citrix Cloud 网址或 IAM 页面中)
- API 基本网址:
https://api.cloud.com/casodata
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶
- 保存存储桶名称和区域以供日后参考(例如
citrix-analytics-logs
)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击 Download CSV file(下载 CSV 文件),保存访问密钥和不公开的访问密钥以供日后使用。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策
- 搜索并选择 AmazonS3FullAccess 政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传配置 IAM 政策和角色
- 在 AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页。
输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::citrix-analytics-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::citrix-analytics-logs/citrix_analytics/state.json" } ] }
- 如果您输入了其他存储桶名称,请替换
citrix-analytics-logs
。
- 如果您输入了其他存储桶名称,请替换
依次点击下一步 > 创建政策。
依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda。
附加新创建的政策。
将角色命名为
CitrixAnalyticsLambdaRole
,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
提供以下配置详细信息:
设置 值 名称 CitrixAnalyticsCollector
运行时 Python 3.13 架构 x86_64 执行角色 CitrixAnalyticsLambdaRole
创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (
CitrixAnalyticsCollector.py
):import os import json import uuid import datetime import urllib.parse import urllib.request import boto3 import botocore CITRIX_TOKEN_URL_TMPL = "https://api.cloud.com/cctrustoauth2/{customerid}/tokens/clients" DEFAULT_API_BASE = "https://api.cloud.com/casodata" s3 = boto3.client("s3") def _http_post_form(url, data_dict): """POST form data to get authentication token.""" data = urllib.parse.urlencode(data_dict).encode("utf-8") req = urllib.request.Request(url, data=data, headers={ "Accept": "application/json", "Content-Type": "application/x-www-form-urlencoded", }) with urllib.request.urlopen(req, timeout=30) as response: return json.loads(response.read().decode("utf-8")) def _http_get_json(url, headers): """GET JSON data from API endpoint.""" req = urllib.request.Request(url, headers=headers) with urllib.request.urlopen(req, timeout=60) as response: return json.loads(response.read().decode("utf-8")) def get_citrix_token(customer_id, client_id, client_secret): """Get Citrix Cloud authentication token.""" url = CITRIX_TOKEN_URL_TMPL.format(customerid=customer_id) payload = { "grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret, } token_response = _http_post_form(url, payload) return token_response["access_token"] def fetch_odata_entity(entity, when_utc, top, headers, api_base): """Fetch data from Citrix Analytics OData API with pagination.""" year = when_utc.year month = when_utc.month day = when_utc.day hour = when_utc.hour base_url = f"{api_base.rstrip('/')}/{entity}?year={year:04d}&month={month:02d}&day={day:02d}&hour={hour:02d}" skip = 0 while True: url = f"{base_url}&$top={top}&$skip={skip}" data = _http_get_json(url, headers) items = data.get("value", []) if not items: break for item in items: yield item if len(items) < top: break skip += top def read_state_file(bucket, state_key): """Read the last processed timestamp from S3 state file.""" try: obj = s3.get_object(Bucket=bucket, Key=state_key) content = obj["Body"].read().decode("utf-8") state = json.loads(content) timestamp_str = state.get("last_hour_utc") if timestamp_str: return datetime.datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")).replace(tzinfo=None) except botocore.exceptions.ClientError as e: if e.response["Error"]["Code"] == "NoSuchKey": return None raise return None def write_state_file(bucket, state_key, dt_utc): """Write the current processed timestamp to S3 state file.""" state_data = {"last_hour_utc": dt_utc.isoformat() + "Z"} s3.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data, separators=(",", ":")), ContentType="application/json" ) def write_ndjson_to_s3(bucket, key, records): """Write records as NDJSON to S3.""" body_lines = [] for record in records: json_line = json.dumps(record, separators=(",", ":"), ensure_ascii=False) body_lines.append(json_line) body = ("n".join(body_lines) + "n").encode("utf-8") s3.put_object( Bucket=bucket, Key=key, Body=body, ContentType="application/x-ndjson" ) def lambda_handler(event, context): """Main Lambda handler function.""" # Environment variables bucket = os.environ["S3_BUCKET"] prefix = os.environ.get("S3_PREFIX", "").strip("/") state_key = os.environ.get("STATE_KEY") or f"{prefix}/state.json" customer_id = os.environ["CITRIX_CUSTOMER_ID"] client_id = os.environ["CITRIX_CLIENT_ID"] client_secret = os.environ["CITRIX_CLIENT_SECRET"] api_base = os.environ.get("API_BASE", DEFAULT_API_BASE) entities = [e.strip() for e in os.environ.get("ENTITIES", "sessions,machines,users").split(",") if e.strip()] top_n = int(os.environ.get("TOP_N", "1000")) lookback_minutes = int(os.environ.get("LOOKBACK_MINUTES", "75")) # Determine target hour to collect now = datetime.datetime.utcnow() fallback_target = (now - datetime.timedelta(minutes=lookback_minutes)).replace(minute=0, second=0, microsecond=0) last_processed = read_state_file(bucket, state_key) if last_processed: target_hour = last_processed + datetime.timedelta(hours=1) else: target_hour = fallback_target # Get authentication token token = get_citrix_token(customer_id, client_id, client_secret) headers = { "Authorization": f"CwsAuth bearer={token}", "Citrix-CustomerId": customer_id, "Accept": "application/json", "Content-Type": "application/json", } total_records = 0 # Process each entity type for entity in entities: records = [] for row in fetch_odata_entity(entity, target_hour, top_n, headers, api_base): enriched_record = { "citrix_entity": entity, "citrix_hour_utc": target_hour.isoformat() + "Z", "collection_timestamp": datetime.datetime.utcnow().isoformat() + "Z", "raw": row } records.append(enriched_record) # Write in batches to avoid memory issues if len(records) >= 1000: s3_key = f"{prefix}/{entity}/year={target_hour.year:04d}/month={target_hour.month:02d}/day={target_hour.day:02d}/hour={target_hour.hour:02d}/part-{uuid.uuid4().hex}.ndjson" write_ndjson_to_s3(bucket, s3_key, records) total_records += len(records) records = [] # Write remaining records if records: s3_key = f"{prefix}/{entity}/year={target_hour.year:04d}/month={target_hour.month:02d}/day={target_hour.day:02d}/hour={target_hour.hour:02d}/part-{uuid.uuid4().hex}.ndjson" write_ndjson_to_s3(bucket, s3_key, records) total_records += len(records) # Update state file write_state_file(bucket, state_key, target_hour) return { "statusCode": 200, "body": json.dumps({ "success": True, "hour_collected": target_hour.isoformat() + "Z", "records_written": total_records, "entities_processed": entities }) }
依次前往配置 > 环境变量 > 修改 > 添加新的环境变量。
输入以下环境变量,并替换为您的值:
键 示例值 S3_BUCKET
citrix-analytics-logs
S3_PREFIX
citrix_analytics
STATE_KEY
citrix_analytics/state.json
CITRIX_CLIENT_ID
your-client-id
CITRIX_CLIENT_SECRET
your-client-secret
API_BASE
https://api.cloud.com/casodata
CITRIX_CUSTOMER_ID
your-customer-id
ENTITIES
sessions,machines,users
TOP_N
1000
LOOKBACK_MINUTES
75
创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > CitrixAnalyticsCollector)。
选择配置标签页。
在常规配置面板中,点击修改。
将超时更改为 5 分钟(300 秒),然后点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度程序 > 创建计划。
- 提供以下配置详细信息:
- 周期性安排:频率 (
1 hour
) - 目标:您的 Lambda 函数
CitrixAnalyticsCollector
- 名称:
CitrixAnalyticsCollector-1h
- 周期性安排:频率 (
- 点击创建时间表。
可选:为 Google SecOps 创建只读 IAM 用户和密钥
- 在 AWS 控制台中,依次前往 IAM > 用户 > 添加用户。
- 点击 Add users(添加用户)。
- 提供以下配置详细信息:
- 用户:
secops-reader
- 访问类型:访问密钥 - 以程序化方式访问
- 用户:
- 点击创建用户。
- 附加最低限度的读取政策(自定义):用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策。
在 JSON 编辑器中,输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::citrix-analytics-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::citrix-analytics-logs" } ] }
将名称设置为
secops-reader-policy
。依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限。
依次前往安全凭据 > 访问密钥 > 创建访问密钥。
下载 CSV(这些值会输入到 Feed 中)。
在 Google SecOps 中配置 Feed 以注入 Citrix Analytics 日志
- 依次前往 SIEM 设置> Feed。
- 点击 + 添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Citrix Analytics Performance logs
)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Citrix Analytics 作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://citrix-analytics-logs/citrix_analytics/
- 来源删除选项:根据您的偏好设置选择删除选项。
- 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
- 资产命名空间:资产命名空间。
- 注入标签:应用于此 Feed 中事件的标签。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。