Collecter les journaux du service Citrix Monitor

Compatible avec :

Ce document explique comment ingérer les journaux du service Citrix Monitor dans Google Security Operations à l'aide d'Amazon S3. L'analyseur transforme les journaux bruts au format JSON en un format structuré conforme à l'UDM Google SecOps. Il extrait les champs pertinents du journal brut, les mappe aux champs UDM correspondants et enrichit les données avec un contexte supplémentaire, comme les informations sur l'utilisateur, les détails de la machine et l'activité réseau.

Avant de commencer

Assurez-vous de remplir les conditions suivantes :

  • Instance Google SecOps
  • Accès privilégié au locataire Citrix Cloud
  • Accès privilégié à AWS (S3, IAM, Lambda, EventBridge)

Collecter les prérequis du service Citrix Monitor (ID, clés API, ID d'organisation, jetons)

  1. Connectez-vous à la console Citrix Cloud.
  2. Accédez à Identity and Access Management> Accès à l'API.
  3. Cliquez sur Créer un client.
  4. Copiez et enregistrez les informations suivantes dans un emplacement sécurisé :
    • ID client
    • Code secret du client
    • ID client (visible dans la console Citrix Cloud)
    • URL de base de l'API :
      • Champ d'application global : https://api.cloud.com
      • Japon : https://api.citrixcloud.jp

Configurer un bucket AWS S3 et IAM pour Google SecOps

  1. Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
  2. Enregistrez le Nom et la Région du bucket pour référence ultérieure (par exemple, citrix-monitor-logs).
  3. Créez un utilisateur en suivant ce guide : Créer un utilisateur IAM.
  4. Sélectionnez l'utilisateur créé.
  5. Sélectionnez l'onglet Informations d'identification de sécurité.
  6. Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
  7. Sélectionnez Service tiers comme Cas d'utilisation.
  8. Cliquez sur Suivant.
  9. Facultatif : ajoutez un tag de description.
  10. Cliquez sur Créer une clé d'accès.
  11. Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour une utilisation ultérieure.
  12. Cliquez sur OK.
  13. Sélectionnez l'onglet Autorisations.
  14. Cliquez sur Ajouter des autorisations dans la section Règles d'autorisation.
  15. Sélectionnez Ajouter des autorisations.
  16. Sélectionnez Joindre directement des règles.
  17. Recherchez et sélectionnez la règle AmazonS3FullAccess.
  18. Cliquez sur Suivant.
  19. Cliquez sur Ajouter des autorisations.

Configurer la stratégie et le rôle IAM pour les importations S3

  1. Dans la console AWS, accédez à IAM > Stratégies > Créer une stratégie > Onglet JSON.
  2. Saisissez la règle suivante :

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::citrix-monitor-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::citrix-monitor-logs/citrix_monitor/state.json"
        }
      ]
    }
    
    • Remplacez citrix-monitor-logs si vous avez saisi un autre nom de bucket.
  3. Cliquez sur Suivant > Créer une règle.

  4. Accédez à IAM > Rôles > Créer un rôle > Service AWS > Lambda.

  5. Associez la stratégie que vous venez de créer et la stratégie gérée AWSLambdaBasicExecutionRole.

  6. Nommez le rôle CitrixMonitorLambdaRole, puis cliquez sur Créer un rôle.

Créer la fonction Lambda

  1. Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
  2. Cliquez sur Créer à partir de zéro.
  3. Fournissez les informations de configuration suivantes :

    Paramètre Valeur
    Nom CitrixMonitorCollector
    Durée d'exécution Python 3.13
    Architecture x86_64
    Rôle d'exécution CitrixMonitorLambdaRole
  4. Une fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et saisissez le code suivant (CitrixMonitorCollector.py) :

    import os
    import json
    import uuid
    import datetime
    import urllib.parse
    import urllib.request
    import urllib.error
    import boto3
    import botocore
    
    # Citrix Cloud OAuth2 endpoint template
    TOKEN_URL_TMPL = "{api_base}/cctrustoauth2/{customerid}/tokens/clients"
    DEFAULT_API_BASE = "https://api.cloud.com"
    MONITOR_BASE_PATH = "/monitorodata"
    
    s3 = boto3.client("s3")
    
    def http_post_form(url, data_dict):
        """POST form data to get authentication token."""
        data = urllib.parse.urlencode(data_dict).encode("utf-8")
        req = urllib.request.Request(url, data=data, headers={
            "Accept": "application/json",
            "Content-Type": "application/x-www-form-urlencoded",
        })
        with urllib.request.urlopen(req, timeout=45) as resp:
            return json.loads(resp.read().decode("utf-8"))
    
    def http_get_json(url, headers):
        """GET JSON data from API endpoint."""
        req = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(req, timeout=90) as resp:
            return json.loads(resp.read().decode("utf-8"))
    
    def get_citrix_token(api_base, customer_id, client_id, client_secret):
        """Get Citrix Cloud authentication token."""
        url = TOKEN_URL_TMPL.format(api_base=api_base.rstrip("/"), customerid=customer_id)
        payload = {
            "grant_type": "client_credentials",
            "client_id": client_id,
            "client_secret": client_secret,
        }
        response = http_post_form(url, payload)
        return response["access_token"]
    
    def build_entity_url(api_base, entity, filter_query=None, top=None):
        """Build OData URL with optional filter and pagination."""
        base = api_base.rstrip("/") + MONITOR_BASE_PATH + "/" + entity
        params = []
        if filter_query:
            params.append("$filter=" + urllib.parse.quote(filter_query, safe="()= ':-TZ0123456789"))
        if top:
            params.append("$top=" + str(top))
        return base + ("?" + "&".join(params) if params else "")
    
    def fetch_entity_rows(entity, start_iso=None, end_iso=None, page_size=1000, headers=None, api_base=DEFAULT_API_BASE):
        """Fetch entity data with optional time filtering and pagination."""
        # Try ModifiedDate filter if timestamps are provided
        first_url = None
        if start_iso and end_iso:
            filter_query = f"(ModifiedDate ge {start_iso} and ModifiedDate lt {end_iso})"
            first_url = build_entity_url(api_base, entity, filter_query, page_size)
        else:
            first_url = build_entity_url(api_base, entity, None, page_size)
    
        url = first_url
        while url:
            try:
                data = http_get_json(url, headers)
                items = data.get("value", [])
                for item in items:
                    yield item
                url = data.get("@odata.nextLink")
            except urllib.error.HTTPError as e:
                # If ModifiedDate filtering fails, fall back to unfiltered query
                if e.code == 400 and start_iso and end_iso:
                    print(f"ModifiedDate filter not supported for {entity}, falling back to unfiltered query")
                    url = build_entity_url(api_base, entity, None, page_size)
                    continue
                else:
                    raise
    
    def read_state_file(bucket, key):
        """Read the last processed timestamp from S3 state file."""
        try:
            obj = s3.get_object(Bucket=bucket, Key=key)
            content = obj["Body"].read().decode("utf-8")
            state = json.loads(content)
            timestamp_str = state.get("last_hour_utc")
            if timestamp_str:
                return datetime.datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")).replace(tzinfo=None)
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "NoSuchKey":
                return None
            raise
        return None
    
    def write_state_file(bucket, key, dt_utc):
        """Write the current processed timestamp to S3 state file."""
        state = {"last_hour_utc": dt_utc.isoformat() + "Z"}
        s3.put_object(
            Bucket=bucket, 
            Key=key, 
            Body=json.dumps(state, separators=(",", ":")), 
            ContentType="application/json"
        )
    
    def write_ndjson_to_s3(bucket, key, rows):
        """Write rows as NDJSON to S3."""
        body_lines = []
        for row in rows:
            json_line = json.dumps(row, separators=(",", ":"), ensure_ascii=False)
            body_lines.append(json_line)
    
        body = ("n".join(body_lines) + "n").encode("utf-8")
        s3.put_object(
            Bucket=bucket, 
            Key=key, 
            Body=body, 
            ContentType="application/x-ndjson"
        )
    
    def lambda_handler(event, context):
        """Main Lambda handler function."""
    
        # Environment variables
        bucket = os.environ["S3_BUCKET"]
        prefix = os.environ.get("S3_PREFIX", "citrix_monitor").strip("/")
        state_key = os.environ.get("STATE_KEY") or f"{prefix}/state.json"
        customer_id = os.environ["CITRIX_CUSTOMER_ID"]
        client_id = os.environ["CITRIX_CLIENT_ID"]
        client_secret = os.environ["CITRIX_CLIENT_SECRET"]
        api_base = os.environ.get("API_BASE", DEFAULT_API_BASE)
        entities = [e.strip() for e in os.environ.get("ENTITIES", "Machines,Sessions,Connections,Applications,Users").split(",") if e.strip()]
        page_size = int(os.environ.get("PAGE_SIZE", "1000"))
        lookback_minutes = int(os.environ.get("LOOKBACK_MINUTES", "75"))
        use_time_filter = os.environ.get("USE_TIME_FILTER", "true").lower() == "true"
    
        # Time window calculation
        now = datetime.datetime.utcnow()
        fallback_hour = (now - datetime.timedelta(minutes=lookback_minutes)).replace(minute=0, second=0, microsecond=0)
    
        last_processed = read_state_file(bucket, state_key)
        target_hour = (last_processed + datetime.timedelta(hours=1)) if last_processed else fallback_hour
        start_iso = target_hour.isoformat() + "Z"
        end_iso = (target_hour + datetime.timedelta(hours=1)).isoformat() + "Z"
    
        # Authentication
        token = get_citrix_token(api_base, customer_id, client_id, client_secret)
        headers = {
            "Authorization": f"CWSAuth bearer={token}",
            "Citrix-CustomerId": customer_id,
            "Accept": "application/json",
            "Accept-Encoding": "gzip, deflate, br",
            "User-Agent": "citrix-monitor-s3-collector/1.0"
        }
    
        total_records = 0
    
        # Process each entity type
        for entity in entities:
            rows_batch = []
            try:
                entity_generator = fetch_entity_rows(
                    entity=entity,
                    start_iso=start_iso if use_time_filter else None,
                    end_iso=end_iso if use_time_filter else None,
                    page_size=page_size,
                    headers=headers,
                    api_base=api_base
                )
    
                for row in entity_generator:
                    # Store raw Citrix data directly for proper parser recognition
                    rows_batch.append(row)
    
                    # Write in batches to avoid memory issues
                    if len(rows_batch) >= 1000:
                        s3_key = f"{prefix}/{entity}/year={target_hour.year:04d}/month={target_hour.month:02d}/day={target_hour.day:02d}/hour={target_hour.hour:02d}/part-{uuid.uuid4().hex}.ndjson"
                        write_ndjson_to_s3(bucket, s3_key, rows_batch)
                        total_records += len(rows_batch)
                        rows_batch = []
    
            except Exception as ex:
                print(f"Error processing entity {entity}: {str(ex)}")
                continue
    
            # Write remaining records
            if rows_batch:
                s3_key = f"{prefix}/{entity}/year={target_hour.year:04d}/month={target_hour.month:02d}/day={target_hour.day:02d}/hour={target_hour.hour:02d}/part-{uuid.uuid4().hex}.ndjson"
                write_ndjson_to_s3(bucket, s3_key, rows_batch)
                total_records += len(rows_batch)
    
        # Update state file
        write_state_file(bucket, state_key, target_hour)
    
        return {
            "statusCode": 200,
            "body": json.dumps({
                "success": True, 
                "hour_collected": start_iso, 
                "records_written": total_records, 
                "entities_processed": entities
            })
        }
    
  5. Accédez à Configuration > Variables d'environnement.

  6. Cliquez sur Modifier > Ajouter une variable d'environnement.

  7. Saisissez les variables d'environnement suivantes en remplaçant les valeurs par les vôtres :

    Clé Exemple de valeur
    S3_BUCKET citrix-monitor-logs
    S3_PREFIX citrix_monitor
    STATE_KEY citrix_monitor/state.json
    CITRIX_CLIENT_ID your-client-id
    CITRIX_CLIENT_SECRET your-client-secret
    CITRIX_CUSTOMER_ID your-customer-id
    API_BASE https://api.cloud.com
    ENTITIES Machines,Sessions,Connections,Applications,Users
    PAGE_SIZE 1000
    LOOKBACK_MINUTES 75
    USE_TIME_FILTER true
  8. Une fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Functions > CitrixMonitorCollector).

  9. Accédez à l'onglet Configuration.

  10. Dans le panneau Configuration générale, cliquez sur Modifier.

  11. Définissez le délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.

Créer une programmation EventBridge

  1. Accédez à Amazon EventBridge> Scheduler> Create schedule.
  2. Fournissez les informations de configuration suivantes :
    • Calendrier récurrent : Taux (1 hour)
    • Cible : votre fonction Lambda CitrixMonitorCollector
    • Nom : CitrixMonitorCollector-1h
  3. Cliquez sur Créer la programmation.

Facultatif : Créez un utilisateur et des clés IAM en lecture seule pour Google SecOps

  1. Dans la console AWS, accédez à IAM > Utilisateurs > Ajouter des utilisateurs.
  2. Cliquez sur Add users (Ajouter des utilisateurs).
  3. Fournissez les informations de configuration suivantes :
    • Utilisateur : secops-reader
    • Type d'accès : Clé d'accès – Accès programmatique
  4. Cliquez sur Créer un utilisateur.
  5. Associez une stratégie de lecture minimale (personnalisée) : Utilisateurs > secops-reader > Autorisations > Ajouter des autorisations > Associer des stratégies directement > Créer une stratégie.
  6. Dans l'éditeur JSON, saisissez la stratégie suivante :

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::citrix-monitor-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::citrix-monitor-logs"
        }
      ]
    }
    
  7. Définissez le nom sur secops-reader-policy.

  8. Accédez à Créer une règle > recherchez/sélectionnez > Suivant > Ajouter des autorisations.

  9. Accédez à Identifiants de sécurité > Clés d'accès > Créer une clé d'accès.

  10. Téléchargez le CSV (ces valeurs sont saisies dans le flux).

Configurer un flux dans Google SecOps pour ingérer les journaux du service Citrix Monitor

  1. Accédez à Paramètres SIEM> Flux.
  2. Cliquez sur + Ajouter un flux.
  3. Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple, Citrix Monitor Service logs).
  4. Sélectionnez Amazon S3 V2 comme type de source.
  5. Sélectionnez Citrix Monitor comme Type de journal.
  6. Cliquez sur Suivant.
  7. Spécifiez les valeurs des paramètres d'entrée suivants :
    • URI S3 : s3://citrix-monitor-logs/citrix_monitor/
    • Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
    • Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. 180 jours par défaut.
    • ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
    • Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
    • Espace de noms de l'élément : espace de noms de l'élément.
    • Libellés d'ingestion : libellé appliqué aux événements de ce flux.
  8. Cliquez sur Suivant.
  9. Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.

Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.