Coletar registros do serviço Citrix Monitor
Este documento explica como ingerir registros do Citrix Monitor Service no Google Security Operations usando o Amazon S3. O analisador transforma registros brutos formatados em JSON em um formato estruturado de acordo com a UDM do Google SecOps. Ele extrai campos relevantes do registro bruto, mapeia-os para os campos correspondentes da UDM e enriquece os dados com contexto adicional, como informações do usuário, detalhes da máquina e atividade de rede.
Antes de começar
Verifique se você tem os pré-requisitos a seguir:
- Instância do Google SecOps
- Acesso privilegiado ao locatário do Citrix Cloud
- Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)
Coletar os pré-requisitos do serviço Citrix Monitor (IDs, chaves de API, IDs da organização, tokens)
- Faça login no Citrix Cloud Console (em inglês).
- Acesse Identity and Access Management > Acesso à API.
- Clique em Criar cliente.
- Copie e salve em um local seguro os seguintes detalhes:
- ID do cliente
- Client Secret
- ID do cliente (visível no console do Citrix Cloud)
- URL base da API:
- Global:
https://api.cloud.com
- Japão:
https://api.citrixcloud.jp
- Global:
Configurar o bucket do AWS S3 e o IAM para o Google SecOps
- Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket
- Salve o Nome e a Região do bucket para referência futura (por exemplo,
citrix-monitor-logs
). - Crie um usuário seguindo este guia: Como criar um usuário do IAM.
- Selecione o usuário criado.
- Selecione a guia Credenciais de segurança.
- Clique em Criar chave de acesso na seção Chaves de acesso.
- Selecione Serviço de terceiros como o Caso de uso.
- Clique em Próxima.
- Opcional: adicione uma tag de descrição.
- Clique em Criar chave de acesso.
- Clique em Fazer o download do arquivo CSV para salvar a chave de acesso e a chave de acesso secreta para uso posterior.
- Clique em Concluído.
- Selecione a guia Permissões.
- Clique em Adicionar permissões na seção Políticas de permissões.
- Selecione Adicionar permissões.
- Selecione Anexar políticas diretamente.
- Pesquise e selecione a política AmazonS3FullAccess.
- Clique em Próxima.
- Clique em Adicionar permissões
Configurar a política e o papel do IAM para uploads do S3
- No console da AWS, acesse IAM > Políticas > Criar política > guia JSON.
Insira a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::citrix-monitor-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::citrix-monitor-logs/citrix_monitor/state.json" } ] }
- Substitua
citrix-monitor-logs
se você tiver inserido um nome de bucket diferente.
- Substitua
Clique em Próxima > Criar política.
Acesse IAM > Funções > Criar função > Serviço da AWS > Lambda.
Anexe a política recém-criada e a política gerenciada AWSLambdaBasicExecutionRole.
Nomeie a função como
CitrixMonitorLambdaRole
e clique em Criar função.
Criar a função Lambda
- No console da AWS, acesse Lambda > Functions > Create function.
- Clique em Criar do zero.
Informe os seguintes detalhes de configuração:
Configuração Valor Nome CitrixMonitorCollector
Ambiente de execução Python 3.13 Arquitetura x86_64 Função de execução CitrixMonitorLambdaRole
Depois que a função for criada, abra a guia Código, exclua o stub e insira o seguinte código (
CitrixMonitorCollector.py
):import os import json import uuid import datetime import urllib.parse import urllib.request import urllib.error import boto3 import botocore # Citrix Cloud OAuth2 endpoint template TOKEN_URL_TMPL = "{api_base}/cctrustoauth2/{customerid}/tokens/clients" DEFAULT_API_BASE = "https://api.cloud.com" MONITOR_BASE_PATH = "/monitorodata" s3 = boto3.client("s3") def http_post_form(url, data_dict): """POST form data to get authentication token.""" data = urllib.parse.urlencode(data_dict).encode("utf-8") req = urllib.request.Request(url, data=data, headers={ "Accept": "application/json", "Content-Type": "application/x-www-form-urlencoded", }) with urllib.request.urlopen(req, timeout=45) as resp: return json.loads(resp.read().decode("utf-8")) def http_get_json(url, headers): """GET JSON data from API endpoint.""" req = urllib.request.Request(url, headers=headers) with urllib.request.urlopen(req, timeout=90) as resp: return json.loads(resp.read().decode("utf-8")) def get_citrix_token(api_base, customer_id, client_id, client_secret): """Get Citrix Cloud authentication token.""" url = TOKEN_URL_TMPL.format(api_base=api_base.rstrip("/"), customerid=customer_id) payload = { "grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret, } response = http_post_form(url, payload) return response["access_token"] def build_entity_url(api_base, entity, filter_query=None, top=None): """Build OData URL with optional filter and pagination.""" base = api_base.rstrip("/") + MONITOR_BASE_PATH + "/" + entity params = [] if filter_query: params.append("$filter=" + urllib.parse.quote(filter_query, safe="()= ':-TZ0123456789")) if top: params.append("$top=" + str(top)) return base + ("?" + "&".join(params) if params else "") def fetch_entity_rows(entity, start_iso=None, end_iso=None, page_size=1000, headers=None, api_base=DEFAULT_API_BASE): """Fetch entity data with optional time filtering and pagination.""" # Try ModifiedDate filter if timestamps are provided first_url = None if start_iso and end_iso: filter_query = f"(ModifiedDate ge {start_iso} and ModifiedDate lt {end_iso})" first_url = build_entity_url(api_base, entity, filter_query, page_size) else: first_url = build_entity_url(api_base, entity, None, page_size) url = first_url while url: try: data = http_get_json(url, headers) items = data.get("value", []) for item in items: yield item url = data.get("@odata.nextLink") except urllib.error.HTTPError as e: # If ModifiedDate filtering fails, fall back to unfiltered query if e.code == 400 and start_iso and end_iso: print(f"ModifiedDate filter not supported for {entity}, falling back to unfiltered query") url = build_entity_url(api_base, entity, None, page_size) continue else: raise def read_state_file(bucket, key): """Read the last processed timestamp from S3 state file.""" try: obj = s3.get_object(Bucket=bucket, Key=key) content = obj["Body"].read().decode("utf-8") state = json.loads(content) timestamp_str = state.get("last_hour_utc") if timestamp_str: return datetime.datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")).replace(tzinfo=None) except botocore.exceptions.ClientError as e: if e.response["Error"]["Code"] == "NoSuchKey": return None raise return None def write_state_file(bucket, key, dt_utc): """Write the current processed timestamp to S3 state file.""" state = {"last_hour_utc": dt_utc.isoformat() + "Z"} s3.put_object( Bucket=bucket, Key=key, Body=json.dumps(state, separators=(",", ":")), ContentType="application/json" ) def write_ndjson_to_s3(bucket, key, rows): """Write rows as NDJSON to S3.""" body_lines = [] for row in rows: json_line = json.dumps(row, separators=(",", ":"), ensure_ascii=False) body_lines.append(json_line) body = ("n".join(body_lines) + "n").encode("utf-8") s3.put_object( Bucket=bucket, Key=key, Body=body, ContentType="application/x-ndjson" ) def lambda_handler(event, context): """Main Lambda handler function.""" # Environment variables bucket = os.environ["S3_BUCKET"] prefix = os.environ.get("S3_PREFIX", "citrix_monitor").strip("/") state_key = os.environ.get("STATE_KEY") or f"{prefix}/state.json" customer_id = os.environ["CITRIX_CUSTOMER_ID"] client_id = os.environ["CITRIX_CLIENT_ID"] client_secret = os.environ["CITRIX_CLIENT_SECRET"] api_base = os.environ.get("API_BASE", DEFAULT_API_BASE) entities = [e.strip() for e in os.environ.get("ENTITIES", "Machines,Sessions,Connections,Applications,Users").split(",") if e.strip()] page_size = int(os.environ.get("PAGE_SIZE", "1000")) lookback_minutes = int(os.environ.get("LOOKBACK_MINUTES", "75")) use_time_filter = os.environ.get("USE_TIME_FILTER", "true").lower() == "true" # Time window calculation now = datetime.datetime.utcnow() fallback_hour = (now - datetime.timedelta(minutes=lookback_minutes)).replace(minute=0, second=0, microsecond=0) last_processed = read_state_file(bucket, state_key) target_hour = (last_processed + datetime.timedelta(hours=1)) if last_processed else fallback_hour start_iso = target_hour.isoformat() + "Z" end_iso = (target_hour + datetime.timedelta(hours=1)).isoformat() + "Z" # Authentication token = get_citrix_token(api_base, customer_id, client_id, client_secret) headers = { "Authorization": f"CWSAuth bearer={token}", "Citrix-CustomerId": customer_id, "Accept": "application/json", "Accept-Encoding": "gzip, deflate, br", "User-Agent": "citrix-monitor-s3-collector/1.0" } total_records = 0 # Process each entity type for entity in entities: rows_batch = [] try: entity_generator = fetch_entity_rows( entity=entity, start_iso=start_iso if use_time_filter else None, end_iso=end_iso if use_time_filter else None, page_size=page_size, headers=headers, api_base=api_base ) for row in entity_generator: # Store raw Citrix data directly for proper parser recognition rows_batch.append(row) # Write in batches to avoid memory issues if len(rows_batch) >= 1000: s3_key = f"{prefix}/{entity}/year={target_hour.year:04d}/month={target_hour.month:02d}/day={target_hour.day:02d}/hour={target_hour.hour:02d}/part-{uuid.uuid4().hex}.ndjson" write_ndjson_to_s3(bucket, s3_key, rows_batch) total_records += len(rows_batch) rows_batch = [] except Exception as ex: print(f"Error processing entity {entity}: {str(ex)}") continue # Write remaining records if rows_batch: s3_key = f"{prefix}/{entity}/year={target_hour.year:04d}/month={target_hour.month:02d}/day={target_hour.day:02d}/hour={target_hour.hour:02d}/part-{uuid.uuid4().hex}.ndjson" write_ndjson_to_s3(bucket, s3_key, rows_batch) total_records += len(rows_batch) # Update state file write_state_file(bucket, state_key, target_hour) return { "statusCode": 200, "body": json.dumps({ "success": True, "hour_collected": start_iso, "records_written": total_records, "entities_processed": entities }) }
Acesse Configuração > Variáveis de ambiente.
Clique em Editar > Adicionar nova variável de ambiente.
Insira as seguintes variáveis de ambiente, substituindo pelos seus valores:
Chave Valor de exemplo S3_BUCKET
citrix-monitor-logs
S3_PREFIX
citrix_monitor
STATE_KEY
citrix_monitor/state.json
CITRIX_CLIENT_ID
your-client-id
CITRIX_CLIENT_SECRET
your-client-secret
CITRIX_CUSTOMER_ID
your-customer-id
API_BASE
https://api.cloud.com
ENTITIES
Machines,Sessions,Connections,Applications,Users
PAGE_SIZE
1000
LOOKBACK_MINUTES
75
USE_TIME_FILTER
true
Depois que a função for criada, permaneça na página dela ou abra Lambda > Functions > CitrixMonitorCollector.
Selecione a guia Configuração.
No painel Configuração geral, clique em Editar.
Mude Tempo limite para 5 minutos (300 segundos) e clique em Salvar.
Criar uma programação do EventBridge
- Acesse Amazon EventBridge > Scheduler > Criar programação.
- Informe os seguintes detalhes de configuração:
- Programação recorrente: Taxa (
1 hour
) - Destino: sua função Lambda
CitrixMonitorCollector
- Nome:
CitrixMonitorCollector-1h
- Programação recorrente: Taxa (
- Clique em Criar programação.
Opcional: criar um usuário e chaves do IAM somente leitura para o Google SecOps
- No console da AWS, acesse IAM > Usuários > Adicionar usuários.
- Clique em Add users.
- Informe os seguintes detalhes de configuração:
- Usuário:
secops-reader
- Tipo de acesso: Chave de acesso — Acesso programático
- Usuário:
- Clique em Criar usuário.
- Anexe a política de leitura mínima (personalizada): Usuários > secops-reader > Permissões > Adicionar permissões > Anexar políticas diretamente > Criar política.
No editor JSON, insira a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::citrix-monitor-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::citrix-monitor-logs" } ] }
Defina o nome como
secops-reader-policy
.Acesse Criar política > pesquise/selecione > Próxima > Adicionar permissões.
Acesse Credenciais de segurança > Chaves de acesso > Criar chave de acesso.
Faça o download do CSV (esses valores são inseridos no feed).
Configurar um feed no Google SecOps para ingerir registros do Citrix Monitor Service
- Acesse Configurações do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Citrix Monitor Service logs
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Citrix Monitor como o Tipo de registro.
- Clique em Próxima.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://citrix-monitor-logs/citrix_monitor/
- Opções de exclusão de fontes: selecione a opção de exclusão de acordo com sua preferência.
- Idade máxima do arquivo: inclui arquivos modificados no último número de dias. Padrão de 180 dias.
- ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3.
- Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3.
- Namespace do recurso: o namespace do recurso.
- Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.
- URI do S3:
- Clique em Próxima.
- Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.