Citrix Analytics のログを収集する
このドキュメントでは、Amazon S3 を使用して Citrix Analytics ログを Google Security Operations に取り込む方法について説明します。
始める前に
次の前提条件を満たしていることを確認してください。
- Google SecOps インスタンス
- Citrix Analytics for Performance テナントへの特権アクセス
- AWS(S3、IAM、Lambda、EventBridge)への特権アクセス
Citrix Analytics の前提条件を収集する
- Citrix Cloud Console にログインします。
- [Identity and Access Management] > [API Access] に移動します。
- [Create Client] をクリックします。
- 次の詳細をコピーして安全な場所に保存します。
- クライアント ID
- Client Secret
- お客様 ID(Citrix Cloud URL または IAM ページにあります)
- API ベース URL:
https://api.cloud.com/casodata
Google SecOps 用に AWS S3 バケットと IAM を構成する
- バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
- 後で参照できるように、バケットの名前とリージョンを保存します(例:
citrix-analytics-logs
)。 - IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成したユーザーを選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] で [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- 省略可: 説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [CSV ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、後で使用できるようにします。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションで、[権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索して選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
S3 アップロードの IAM ポリシーとロールを構成する
- AWS コンソールで、[IAM] > [ポリシー] > [ポリシーの作成] > [JSON] タブに移動します。
次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::citrix-analytics-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::citrix-analytics-logs/citrix_analytics/state.json" } ] }
- 別のバケット名を入力した場合は、
citrix-analytics-logs
を置き換えます。
- 別のバケット名を入力した場合は、
[次へ] > [ポリシーを作成] をクリックします。
[IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。
新しく作成したポリシーを関連付けます。
ロールに「
CitrixAnalyticsLambdaRole
」という名前を付けて、[ロールを作成] をクリックします。
Lambda 関数を作成する
- AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
- [Author from scratch] をクリックします。
次の構成情報を提供してください。
設定 値 名前 CitrixAnalyticsCollector
ランタイム Python 3.13 アーキテクチャ x86_64 実行ロール CitrixAnalyticsLambdaRole
関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(
CitrixAnalyticsCollector.py
)を入力します。import os import json import uuid import datetime import urllib.parse import urllib.request import boto3 import botocore CITRIX_TOKEN_URL_TMPL = "https://api.cloud.com/cctrustoauth2/{customerid}/tokens/clients" DEFAULT_API_BASE = "https://api.cloud.com/casodata" s3 = boto3.client("s3") def _http_post_form(url, data_dict): """POST form data to get authentication token.""" data = urllib.parse.urlencode(data_dict).encode("utf-8") req = urllib.request.Request(url, data=data, headers={ "Accept": "application/json", "Content-Type": "application/x-www-form-urlencoded", }) with urllib.request.urlopen(req, timeout=30) as response: return json.loads(response.read().decode("utf-8")) def _http_get_json(url, headers): """GET JSON data from API endpoint.""" req = urllib.request.Request(url, headers=headers) with urllib.request.urlopen(req, timeout=60) as response: return json.loads(response.read().decode("utf-8")) def get_citrix_token(customer_id, client_id, client_secret): """Get Citrix Cloud authentication token.""" url = CITRIX_TOKEN_URL_TMPL.format(customerid=customer_id) payload = { "grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret, } token_response = _http_post_form(url, payload) return token_response["access_token"] def fetch_odata_entity(entity, when_utc, top, headers, api_base): """Fetch data from Citrix Analytics OData API with pagination.""" year = when_utc.year month = when_utc.month day = when_utc.day hour = when_utc.hour base_url = f"{api_base.rstrip('/')}/{entity}?year={year:04d}&month={month:02d}&day={day:02d}&hour={hour:02d}" skip = 0 while True: url = f"{base_url}&$top={top}&$skip={skip}" data = _http_get_json(url, headers) items = data.get("value", []) if not items: break for item in items: yield item if len(items) < top: break skip += top def read_state_file(bucket, state_key): """Read the last processed timestamp from S3 state file.""" try: obj = s3.get_object(Bucket=bucket, Key=state_key) content = obj["Body"].read().decode("utf-8") state = json.loads(content) timestamp_str = state.get("last_hour_utc") if timestamp_str: return datetime.datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")).replace(tzinfo=None) except botocore.exceptions.ClientError as e: if e.response["Error"]["Code"] == "NoSuchKey": return None raise return None def write_state_file(bucket, state_key, dt_utc): """Write the current processed timestamp to S3 state file.""" state_data = {"last_hour_utc": dt_utc.isoformat() + "Z"} s3.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data, separators=(",", ":")), ContentType="application/json" ) def write_ndjson_to_s3(bucket, key, records): """Write records as NDJSON to S3.""" body_lines = [] for record in records: json_line = json.dumps(record, separators=(",", ":"), ensure_ascii=False) body_lines.append(json_line) body = ("n".join(body_lines) + "n").encode("utf-8") s3.put_object( Bucket=bucket, Key=key, Body=body, ContentType="application/x-ndjson" ) def lambda_handler(event, context): """Main Lambda handler function.""" # Environment variables bucket = os.environ["S3_BUCKET"] prefix = os.environ.get("S3_PREFIX", "").strip("/") state_key = os.environ.get("STATE_KEY") or f"{prefix}/state.json" customer_id = os.environ["CITRIX_CUSTOMER_ID"] client_id = os.environ["CITRIX_CLIENT_ID"] client_secret = os.environ["CITRIX_CLIENT_SECRET"] api_base = os.environ.get("API_BASE", DEFAULT_API_BASE) entities = [e.strip() for e in os.environ.get("ENTITIES", "sessions,machines,users").split(",") if e.strip()] top_n = int(os.environ.get("TOP_N", "1000")) lookback_minutes = int(os.environ.get("LOOKBACK_MINUTES", "75")) # Determine target hour to collect now = datetime.datetime.utcnow() fallback_target = (now - datetime.timedelta(minutes=lookback_minutes)).replace(minute=0, second=0, microsecond=0) last_processed = read_state_file(bucket, state_key) if last_processed: target_hour = last_processed + datetime.timedelta(hours=1) else: target_hour = fallback_target # Get authentication token token = get_citrix_token(customer_id, client_id, client_secret) headers = { "Authorization": f"CwsAuth bearer={token}", "Citrix-CustomerId": customer_id, "Accept": "application/json", "Content-Type": "application/json", } total_records = 0 # Process each entity type for entity in entities: records = [] for row in fetch_odata_entity(entity, target_hour, top_n, headers, api_base): enriched_record = { "citrix_entity": entity, "citrix_hour_utc": target_hour.isoformat() + "Z", "collection_timestamp": datetime.datetime.utcnow().isoformat() + "Z", "raw": row } records.append(enriched_record) # Write in batches to avoid memory issues if len(records) >= 1000: s3_key = f"{prefix}/{entity}/year={target_hour.year:04d}/month={target_hour.month:02d}/day={target_hour.day:02d}/hour={target_hour.hour:02d}/part-{uuid.uuid4().hex}.ndjson" write_ndjson_to_s3(bucket, s3_key, records) total_records += len(records) records = [] # Write remaining records if records: s3_key = f"{prefix}/{entity}/year={target_hour.year:04d}/month={target_hour.month:02d}/day={target_hour.day:02d}/hour={target_hour.hour:02d}/part-{uuid.uuid4().hex}.ndjson" write_ndjson_to_s3(bucket, s3_key, records) total_records += len(records) # Update state file write_state_file(bucket, state_key, target_hour) return { "statusCode": 200, "body": json.dumps({ "success": True, "hour_collected": target_hour.isoformat() + "Z", "records_written": total_records, "entities_processed": entities }) }
[構成> 環境変数 > 編集 > 新しい環境変数を追加] に移動します。
次の環境変数を入力し、実際の値に置き換えます。
キー 値の例 S3_BUCKET
citrix-analytics-logs
S3_PREFIX
citrix_analytics
STATE_KEY
citrix_analytics/state.json
CITRIX_CLIENT_ID
your-client-id
CITRIX_CLIENT_SECRET
your-client-secret
API_BASE
https://api.cloud.com/casodata
CITRIX_CUSTOMER_ID
your-customer-id
ENTITIES
sessions,machines,users
TOP_N
1000
LOOKBACK_MINUTES
75
関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [CitrixAnalyticsCollector] を開きます。
[CONFIGURATION] タブを選択します。
[全般設定] パネルで、[編集] をクリックします。
[Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。
EventBridge スケジュールを作成する
- [Amazon EventBridge] > [Scheduler] > [スケジュールの作成] に移動します。
- 次の構成の詳細を入力します。
- 定期的なスケジュール: レート(
1 hour
) - ターゲット: Lambda 関数
CitrixAnalyticsCollector
- 名前:
CitrixAnalyticsCollector-1h
- 定期的なスケジュール: レート(
- [スケジュールを作成] をクリックします。
省略可: Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する
- AWS コンソールで、[IAM] > [Users] > [Add users] に移動します。
- [ユーザーを追加] をクリックします。
- 次の構成の詳細を入力します。
- ユーザー:
secops-reader
- アクセスタイプ: アクセスキー - プログラムによるアクセス
- ユーザー:
- [ユーザーを作成] をクリックします。
- 最小限の読み取りポリシー(カスタム)を関連付ける: [ユーザー] > [secops-reader] > [権限] > [権限を追加] > [ポリシーを直接関連付ける] > [ポリシーを作成]。
JSON エディタで、次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::citrix-analytics-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::citrix-analytics-logs" } ] }
名前を
secops-reader-policy
に設定します。[ポリシーの作成> 検索/選択> 次へ> 権限を追加] に移動します。
[セキュリティ認証情報] > [アクセスキー] > [アクセスキーを作成] に移動します。
CSV をダウンロードします(これらの値はフィードに入力されます)。
Citrix Analytics のログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [+ 新しいフィードを追加] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Citrix Analytics Performance logs
)。 - [ソースタイプ] として [Amazon S3 V2] を選択します。
- [ログタイプ] として [Citrix Analytics] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- S3 URI:
s3://citrix-analytics-logs/citrix_analytics/
- Source deletion options: 必要に応じて削除オプションを選択します。
- ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
- アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
- シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
- アセットの名前空間: アセットの名前空間。
- Ingestion labels: このフィードのイベントに適用されるラベル。
- S3 URI:
- [次へ] をクリックします。
- [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。