Mengumpulkan log Layanan Pemantauan Citrix
Dokumen ini menjelaskan cara menyerap log Citrix Monitor Service ke Google Security Operations menggunakan Amazon S3. Parser mengubah log berformat JSON mentah menjadi format terstruktur yang sesuai dengan UDM Google SecOps. Proses ini mengekstrak kolom yang relevan dari log mentah, memetakannya ke kolom UDM yang sesuai, dan memperkaya data dengan konteks tambahan seperti informasi pengguna, detail mesin, dan aktivitas jaringan.
Sebelum memulai
Pastikan Anda memenuhi prasyarat berikut:
- Instance Google SecOps
- Akses istimewa ke tenant Citrix Cloud
- Akses istimewa ke AWS (S3, IAM, Lambda, EventBridge)
Kumpulkan prasyarat Layanan Monitor Citrix (ID, kunci API, ID organisasi, token)
- Login ke Citrix Cloud Console.
- Buka Identity and Access Management > API Access.
- Klik Buat Klien.
- Salin dan simpan detail berikut di lokasi yang aman:
- Client ID
- Rahasia Klien
- Customer ID (terlihat di konsol Citrix Cloud)
- URL Dasar API:
- Global:
https://api.cloud.com
- Jepang:
https://api.citrixcloud.jp
- Global:
Mengonfigurasi bucket AWS S3 dan IAM untuk Google SecOps
- Buat bucket Amazon S3 dengan mengikuti panduan pengguna ini: Membuat bucket
- Simpan Name dan Region bucket untuk referensi di masa mendatang (misalnya,
citrix-monitor-logs
). - Buat pengguna dengan mengikuti panduan pengguna ini: Membuat pengguna IAM.
- Pilih Pengguna yang dibuat.
- Pilih tab Kredensial keamanan.
- Klik Create Access Key di bagian Access Keys.
- Pilih Layanan pihak ketiga sebagai Kasus penggunaan.
- Klik Berikutnya.
- Opsional: tambahkan tag deskripsi.
- Klik Create access key.
- Klik Download CSV file untuk menyimpan Access Key dan Secret Access Key untuk digunakan nanti.
- Klik Selesai.
- Pilih tab Izin.
- Klik Tambahkan izin di bagian Kebijakan izin.
- Pilih Tambahkan izin.
- Pilih Lampirkan kebijakan secara langsung
- Telusuri dan pilih kebijakan AmazonS3FullAccess.
- Klik Berikutnya.
- Klik Add permissions.
Mengonfigurasi kebijakan dan peran IAM untuk upload S3
- Di konsol AWS, buka IAM > Policies > Create policy > JSON tab.
Masukkan kebijakan berikut:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::citrix-monitor-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::citrix-monitor-logs/citrix_monitor/state.json" } ] }
- Ganti
citrix-monitor-logs
jika Anda memasukkan nama bucket yang berbeda.
- Ganti
Klik Berikutnya > Buat kebijakan.
Buka IAM > Roles > Create role > AWS service > Lambda.
Lampirkan kebijakan yang baru dibuat dan kebijakan terkelola AWSLambdaBasicExecutionRole.
Beri nama peran
CitrixMonitorLambdaRole
, lalu klik Buat peran.
Buat fungsi Lambda
- Di Konsol AWS, buka Lambda > Functions > Create function.
- Klik Buat dari awal.
Berikan detail konfigurasi berikut:
Setelan Nilai Nama CitrixMonitorCollector
Runtime Python 3.13 Arsitektur x86_64 Peran eksekusi CitrixMonitorLambdaRole
Setelah fungsi dibuat, buka tab Code, hapus stub, dan masukkan kode berikut (
CitrixMonitorCollector.py
):import os import json import uuid import datetime import urllib.parse import urllib.request import urllib.error import boto3 import botocore # Citrix Cloud OAuth2 endpoint template TOKEN_URL_TMPL = "{api_base}/cctrustoauth2/{customerid}/tokens/clients" DEFAULT_API_BASE = "https://api.cloud.com" MONITOR_BASE_PATH = "/monitorodata" s3 = boto3.client("s3") def http_post_form(url, data_dict): """POST form data to get authentication token.""" data = urllib.parse.urlencode(data_dict).encode("utf-8") req = urllib.request.Request(url, data=data, headers={ "Accept": "application/json", "Content-Type": "application/x-www-form-urlencoded", }) with urllib.request.urlopen(req, timeout=45) as resp: return json.loads(resp.read().decode("utf-8")) def http_get_json(url, headers): """GET JSON data from API endpoint.""" req = urllib.request.Request(url, headers=headers) with urllib.request.urlopen(req, timeout=90) as resp: return json.loads(resp.read().decode("utf-8")) def get_citrix_token(api_base, customer_id, client_id, client_secret): """Get Citrix Cloud authentication token.""" url = TOKEN_URL_TMPL.format(api_base=api_base.rstrip("/"), customerid=customer_id) payload = { "grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret, } response = http_post_form(url, payload) return response["access_token"] def build_entity_url(api_base, entity, filter_query=None, top=None): """Build OData URL with optional filter and pagination.""" base = api_base.rstrip("/") + MONITOR_BASE_PATH + "/" + entity params = [] if filter_query: params.append("$filter=" + urllib.parse.quote(filter_query, safe="()= ':-TZ0123456789")) if top: params.append("$top=" + str(top)) return base + ("?" + "&".join(params) if params else "") def fetch_entity_rows(entity, start_iso=None, end_iso=None, page_size=1000, headers=None, api_base=DEFAULT_API_BASE): """Fetch entity data with optional time filtering and pagination.""" # Try ModifiedDate filter if timestamps are provided first_url = None if start_iso and end_iso: filter_query = f"(ModifiedDate ge {start_iso} and ModifiedDate lt {end_iso})" first_url = build_entity_url(api_base, entity, filter_query, page_size) else: first_url = build_entity_url(api_base, entity, None, page_size) url = first_url while url: try: data = http_get_json(url, headers) items = data.get("value", []) for item in items: yield item url = data.get("@odata.nextLink") except urllib.error.HTTPError as e: # If ModifiedDate filtering fails, fall back to unfiltered query if e.code == 400 and start_iso and end_iso: print(f"ModifiedDate filter not supported for {entity}, falling back to unfiltered query") url = build_entity_url(api_base, entity, None, page_size) continue else: raise def read_state_file(bucket, key): """Read the last processed timestamp from S3 state file.""" try: obj = s3.get_object(Bucket=bucket, Key=key) content = obj["Body"].read().decode("utf-8") state = json.loads(content) timestamp_str = state.get("last_hour_utc") if timestamp_str: return datetime.datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")).replace(tzinfo=None) except botocore.exceptions.ClientError as e: if e.response["Error"]["Code"] == "NoSuchKey": return None raise return None def write_state_file(bucket, key, dt_utc): """Write the current processed timestamp to S3 state file.""" state = {"last_hour_utc": dt_utc.isoformat() + "Z"} s3.put_object( Bucket=bucket, Key=key, Body=json.dumps(state, separators=(",", ":")), ContentType="application/json" ) def write_ndjson_to_s3(bucket, key, rows): """Write rows as NDJSON to S3.""" body_lines = [] for row in rows: json_line = json.dumps(row, separators=(",", ":"), ensure_ascii=False) body_lines.append(json_line) body = ("n".join(body_lines) + "n").encode("utf-8") s3.put_object( Bucket=bucket, Key=key, Body=body, ContentType="application/x-ndjson" ) def lambda_handler(event, context): """Main Lambda handler function.""" # Environment variables bucket = os.environ["S3_BUCKET"] prefix = os.environ.get("S3_PREFIX", "citrix_monitor").strip("/") state_key = os.environ.get("STATE_KEY") or f"{prefix}/state.json" customer_id = os.environ["CITRIX_CUSTOMER_ID"] client_id = os.environ["CITRIX_CLIENT_ID"] client_secret = os.environ["CITRIX_CLIENT_SECRET"] api_base = os.environ.get("API_BASE", DEFAULT_API_BASE) entities = [e.strip() for e in os.environ.get("ENTITIES", "Machines,Sessions,Connections,Applications,Users").split(",") if e.strip()] page_size = int(os.environ.get("PAGE_SIZE", "1000")) lookback_minutes = int(os.environ.get("LOOKBACK_MINUTES", "75")) use_time_filter = os.environ.get("USE_TIME_FILTER", "true").lower() == "true" # Time window calculation now = datetime.datetime.utcnow() fallback_hour = (now - datetime.timedelta(minutes=lookback_minutes)).replace(minute=0, second=0, microsecond=0) last_processed = read_state_file(bucket, state_key) target_hour = (last_processed + datetime.timedelta(hours=1)) if last_processed else fallback_hour start_iso = target_hour.isoformat() + "Z" end_iso = (target_hour + datetime.timedelta(hours=1)).isoformat() + "Z" # Authentication token = get_citrix_token(api_base, customer_id, client_id, client_secret) headers = { "Authorization": f"CWSAuth bearer={token}", "Citrix-CustomerId": customer_id, "Accept": "application/json", "Accept-Encoding": "gzip, deflate, br", "User-Agent": "citrix-monitor-s3-collector/1.0" } total_records = 0 # Process each entity type for entity in entities: rows_batch = [] try: entity_generator = fetch_entity_rows( entity=entity, start_iso=start_iso if use_time_filter else None, end_iso=end_iso if use_time_filter else None, page_size=page_size, headers=headers, api_base=api_base ) for row in entity_generator: # Store raw Citrix data directly for proper parser recognition rows_batch.append(row) # Write in batches to avoid memory issues if len(rows_batch) >= 1000: s3_key = f"{prefix}/{entity}/year={target_hour.year:04d}/month={target_hour.month:02d}/day={target_hour.day:02d}/hour={target_hour.hour:02d}/part-{uuid.uuid4().hex}.ndjson" write_ndjson_to_s3(bucket, s3_key, rows_batch) total_records += len(rows_batch) rows_batch = [] except Exception as ex: print(f"Error processing entity {entity}: {str(ex)}") continue # Write remaining records if rows_batch: s3_key = f"{prefix}/{entity}/year={target_hour.year:04d}/month={target_hour.month:02d}/day={target_hour.day:02d}/hour={target_hour.hour:02d}/part-{uuid.uuid4().hex}.ndjson" write_ndjson_to_s3(bucket, s3_key, rows_batch) total_records += len(rows_batch) # Update state file write_state_file(bucket, state_key, target_hour) return { "statusCode": 200, "body": json.dumps({ "success": True, "hour_collected": start_iso, "records_written": total_records, "entities_processed": entities }) }
Buka Configuration > Environment variables.
Klik Edit > Tambahkan variabel lingkungan baru.
Masukkan variabel lingkungan berikut, ganti dengan nilai Anda:
Kunci Nilai contoh S3_BUCKET
citrix-monitor-logs
S3_PREFIX
citrix_monitor
STATE_KEY
citrix_monitor/state.json
CITRIX_CLIENT_ID
your-client-id
CITRIX_CLIENT_SECRET
your-client-secret
CITRIX_CUSTOMER_ID
your-customer-id
API_BASE
https://api.cloud.com
ENTITIES
Machines,Sessions,Connections,Applications,Users
PAGE_SIZE
1000
LOOKBACK_MINUTES
75
USE_TIME_FILTER
true
Setelah fungsi dibuat, tetap buka halamannya (atau buka Lambda > Functions > CitrixMonitorCollector).
Pilih tab Configuration
Di panel General configuration, klik Edit.
Ubah Waktu tunggu menjadi 5 menit (300 detik), lalu klik Simpan.
Membuat jadwal EventBridge
- Buka Amazon EventBridge > Scheduler > Create schedule.
- Berikan detail konfigurasi berikut:
- Jadwal berulang: Tarif (
1 hour
) - Target: fungsi Lambda Anda
CitrixMonitorCollector
- Nama:
CitrixMonitorCollector-1h
- Jadwal berulang: Tarif (
- Klik Buat jadwal.
Opsional: Buat pengguna & kunci IAM hanya baca untuk Google SecOps
- Di Konsol AWS, buka IAM > Pengguna > Tambahkan pengguna.
- Klik Add users.
- Berikan detail konfigurasi berikut:
- Pengguna:
secops-reader
- Jenis akses: Kunci akses — Akses terprogram
- Pengguna:
- Klik Buat pengguna.
- Lampirkan kebijakan baca minimal (kustom): Pengguna > secops-reader > Izin > Tambahkan izin > Lampirkan kebijakan secara langsung > Buat kebijakan.
Di editor JSON, masukkan kebijakan berikut:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::citrix-monitor-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::citrix-monitor-logs" } ] }
Tetapkan nama ke
secops-reader-policy
.Buka Buat kebijakan > cari/pilih > Berikutnya > Tambahkan izin.
Buka Kredensial keamanan > Kunci akses > Buat kunci akses.
Download CSV (nilai ini dimasukkan ke dalam feed).
Mengonfigurasi feed di Google SecOps untuk memproses log Layanan Monitor Citrix
- Buka Setelan SIEM > Feed.
- Klik + Tambahkan Feed Baru.
- Di kolom Nama feed, masukkan nama untuk feed (misalnya,
Citrix Monitor Service logs
). - Pilih Amazon S3 V2 sebagai Jenis sumber.
- Pilih Citrix Monitor sebagai Jenis log.
- Klik Berikutnya.
- Tentukan nilai untuk parameter input berikut:
- URI S3:
s3://citrix-monitor-logs/citrix_monitor/
- Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda.
- Usia File Maksimum: Menyertakan file yang diubah dalam jumlah hari terakhir. Default 180 Hari.
- ID Kunci Akses: Kunci akses pengguna dengan akses ke bucket S3.
- Kunci Akses Rahasia: Kunci rahasia pengguna dengan akses ke bucket S3.
- Namespace aset: Namespace aset.
- Label penyerapan: Label yang diterapkan ke peristiwa dari feed ini.
- URI S3:
- Klik Berikutnya.
- Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.
Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.