Collecter les journaux de surveillance continue Qualys
Ce code d'analyseur Logstash extrait d'abord des champs tels que l'adresse IP source, l'utilisateur, la méthode et le protocole d'application à partir des messages de journal bruts à l'aide de modèles Grok. Il mappe ensuite des champs spécifiques des données de journal brutes sur leurs champs correspondants dans le modèle de données unifié (UDM), effectue des conversions de types de données et enrichit les données avec des libellés et des métadonnées supplémentaires, avant de structurer enfin la sortie au format UDM souhaité.
Avant de commencer
- Assurez-vous de disposer d'une instance Google Security Operations.
- Assurez-vous de disposer d'un accès privilégié à Google Cloud.
- Assurez-vous de disposer d'un accès privilégié à Qualys.
Activez les API requises:
- Connectez-vous à la console Google Cloud .
- Accédez à API et services > Bibliothèque.
- Recherchez les API suivantes et activez-les :
- API Cloud Functions
- API Cloud Scheduler
- Cloud Pub/Sub (obligatoire pour que Cloud Scheduler puisse appeler des fonctions)
Créer un Google Cloud bucket de stockage
- Connectez-vous à la console Google Cloud .
Accédez à la page Buckets Cloud Storage.
Cliquez sur Créer.
Configurez le bucket:
- Nom: saisissez un nom unique qui répond aux exigences de dénomination des buckets (par exemple, qualys-asset-bucket).
- Choisissez où stocker vos données: sélectionnez un emplacement.
- Choisissez une classe de stockage pour vos données: sélectionnez une classe de stockage par défaut pour le bucket ou classe automatique pour une gestion automatique des classes de stockage.
- Choisir comment contrôler l'accès aux objets: sélectionnez non pour appliquer la protection contre l'accès public, puis sélectionnez un modèle de contrôle des accès pour les objets de votre bucket.
- Classe de stockage: choisissez en fonction de vos besoins (par exemple, Standard).
Cliquez sur Créer.
Créer un compte de service Google Cloud
- Connectez-vous à la console Google Cloud .
- Accédez à IAM et administration > Comptes de service.
- Créez un compte de service.
- Attribuez-lui un nom descriptif (par exemple, qualys-user).
- Accordez au compte de service le rôle Administrateur des objets de l'espace de stockage sur le bucket GCS que vous avez créé à l'étape précédente.
- Attribuez au compte de service le rôle Demandeur Cloud Functions.
- Créez une clé SSH pour le compte de service.
- Téléchargez un fichier de clé JSON pour le compte de service. Conservez ce fichier dans un endroit sécurisé.
Facultatif: Créer un utilisateur d'API dédié dans Qualys
- Connectez-vous à la console Qualys.
- Accédez à Utilisateurs.
- Cliquez sur Nouveau > Utilisateur.
- Saisissez les informations générales requises pour l'utilisateur.
- Sélectionnez l'onglet Rôle utilisateur.
- Assurez-vous que la case Accès aux API est cochée pour le rôle.
- Cliquez sur Enregistrer.
Identifier votre URL d'API Qualys spécifique
Option 1
Identifiez vos URL comme indiqué dans la section Identification de la plate-forme.
Option 2
- Connectez-vous à la console Qualys.
- Accédez à Aide > À propos.
- Faites défiler la page pour afficher ces informations sous "Centre des opérations de sécurité (SOC)".
- Copiez l'URL de l'API Qualys.
Configurer la fonction Cloud
- Accédez à Cloud Functions dans la console Google Cloud .
- Cliquez sur Créer une fonction.
Configurez la fonction:
- Nom: saisissez un nom pour votre fonction (par exemple, fetch-qualys-cm-alerts).
- Région: sélectionnez une région proche de votre bucket.
- Environnement d'exécution: Python 3.10 (ou l'environnement d'exécution de votre choix).
- Déclencheur: choisissez un déclencheur HTTP si nécessaire ou Cloud Pub/Sub pour l'exécution planifiée.
- Authentification: sécurisez votre application avec l'authentification.
- Écrivez le code à l'aide d'un éditeur intégré:
```python from google.cloud import storage import requests import base64 import json # Google Cloud Storage Configuration BUCKET_NAME = "<bucket-name>" FILE_NAME = "qualys_cm_alerts.json" # Qualys API Credentials QUALYS_USERNAME = "<qualys-username>" QUALYS_PASSWORD = "<qualys-password>" QUALYS_BASE_URL = "https://<qualys_base_url>" def fetch_cm_alerts(): """Fetch alerts from Qualys Continuous Monitoring.""" auth = base64.b64encode(f"{QUALYS_USERNAME}:{QUALYS_PASSWORD}".encode()).decode() headers = { "Authorization": f"Basic {auth}", "Content-Type": "application/xml" } payload = """ <ServiceRequest> <filters> <Criteria field="alert.date" operator="GREATER">2024-01-01</Criteria> </filters> </ServiceRequest> """ response = requests.post(f"{QUALYS_BASE_URL}/qps/rest/2.0/search/cm/alert", headers=headers, data=payload) response.raise_for_status() return response.json() def upload_to_gcs(data): """Upload data to Google Cloud Storage.""" client = storage.Client() bucket = client.get_bucket(BUCKET_NAME) blob = bucket.blob(FILE_NAME) blob.upload_from_string(json.dumps(data, indent=2), content_type="application/json") def main(request): """Cloud Function entry point.""" try: alerts = fetch_cm_alerts() upload_to_gcs(alerts) return "Qualys CM alerts uploaded to Cloud Storage successfully!" except Exception as e: return f"An error occurred: {e}", 500 ```
Une fois la configuration terminée, cliquez sur Déployer.
Configurer Cloud Scheduler
- Accédez à Cloud Scheduler dans la console Google Cloud .
- Cliquez sur Créer une tâche.
Configurez la tâche:
- Nom: saisissez un nom pour votre tâche (par exemple, trigger-fetch-qualys-cm-alerts).
- Fréquence: utilisez la syntaxe cron pour spécifier la planification (par exemple,
0 * * * *
pour une exécution toutes les heures). - Fuseau horaire: définissez votre fuseau horaire préféré.
- Type de déclencheur: sélectionnez HTTP.
- URL du déclencheur: saisissez l'URL de la fonction Cloud (disponible dans les informations sur la fonction après le déploiement).
- Méthode: sélectionnez POST.
Créez la tâche.
Configurer un flux dans Google SecOps pour ingérer les journaux de surveillance continue Qualys
- Accédez à Paramètres du SIEM > Flux.
- Cliquez sur Ajouter.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple, "Journaux de surveillance continue Qualys").
- Sélectionnez Google Cloud Storage comme Type de source.
- Sélectionnez Qualys Continuous Monitoring comme Type de journal.
- Cliquez sur Suivant.
Spécifiez les valeurs des paramètres d'entrée suivants:
- URI du bucket de stockage: URI source du bucket de stockage Google Cloud .
- Un URI est: sélectionnez Single file (Fichiers uniques).
- Options de suppression de la source: sélectionnez l'option de suppression en fonction de vos préférences.
- Espace de noms des éléments: espace de noms des éléments.
- Libellés d'ingestion: libellé à appliquer aux événements de ce flux.
Cliquez sur Suivant.
Vérifiez la configuration de votre nouveau flux dans l'écran Finaliser, puis cliquez sur Envoyer.
Tableau de mappage UDM
Champ de journal | Mappage UDM | Logique |
---|---|---|
Alert.alertInfo.appVersion | metadata.product_version | Mappé directement à partir de Alert.alertInfo.appVersion |
Alert.alertInfo.operatingSystem | principal.platform_version | Mappé directement à partir de Alert.alertInfo.operatingSystem |
Alert.alertInfo.port | additional.fields.value.string_value | Mappé directement à partir de Alert.alertInfo.port et ajouté en tant que paire clé-valeur dans additional.fields avec la clé "Port d'alerte" |
Alert.alertInfo.protocol | network.ip_protocol | Mappé directement à partir de Alert.alertInfo.protocol |
Alert.alertInfo.sslIssuer | network.tls.client.certificate.issuer | Mappé directement à partir de Alert.alertInfo.sslIssuer |
Alert.alertInfo.sslName | additional.fields.value.string_value | Mappé directement à partir de Alert.alertInfo.sslName et ajouté en tant que paire clé-valeur dans additional.fields avec la clé "Nom SSL" |
Alert.alertInfo.sslOrg | additional.fields.value.string_value | Mappé directement à partir de Alert.alertInfo.sslOrg et ajouté en tant que paire clé-valeur dans additional.fields avec la clé "SSL Org" |
Alert.alertInfo.ticketId | additional.fields.value.string_value | Mappé directement à partir de Alert.alertInfo.ticketId et ajouté en tant que paire clé-valeur dans additional.fields avec la clé "ID de demande" |
Alert.alertInfo.vpeConfidence | additional.fields.value.string_value | Mappé directement à partir de Alert.alertInfo.vpeConfidence et ajouté en tant que paire clé-valeur dans additional.fields avec la clé "VPE Confidence" |
Alert.alertInfo.vpeStatus | additional.fields.value.string_value | Mappé directement à partir de Alert.alertInfo.vpeStatus et ajouté en tant que paire clé-valeur dans additional.fields avec la clé "VPE Confidence" |
Alert.eventType | additional.fields.value.string_value | Mappé directement à partir de Alert.eventType et ajouté en tant que paire clé-valeur dans additional.fields avec la clé "Type d'événement" |
Alert.hostname | principal.hostname | Mappé directement à partir de Alert.hostname |
Alert.id | security_result.threat_id | Mappé directement à partir de Alert.id |
Alert.ipAddress | principal.ip | Mappé directement à partir de Alert.ipAddress |
Alert.profile.id | additional.fields.value.string_value | Mappé directement à partir de Alert.profile.id et ajouté en tant que paire clé-valeur dans additional.fields avec la clé "ID de profil" |
Alert.profile.title | additional.fields.value.string_value | Mappé directement à partir de Alert.profile.title et ajouté en tant que paire clé-valeur dans additional.fields avec la clé "Titre du profil" |
Alert.qid | vulnerability.name | Mappé en tant que "QID: Alert.qid |
Alert.source | additional.fields.value.string_value | Mappé directement à partir de Alert.source et ajouté en tant que paire clé-valeur dans additional.fields avec la clé "Source d'alerte" |
Alert.triggerUuid | metadata.product_log_id | Mappé directement à partir de Alert.triggerUuid |
Alert.vulnCategory | additional.fields.value.string_value | Mappé directement à partir de Alert.vulnCategory et ajouté en tant que paire clé-valeur dans additional.fields avec la clé "Catégorie de faille" |
Alert.vulnSeverity | vulnerability.severity | Mappage en fonction de la valeur de Alert.vulnSeverity : 1 à 3: LOW, 4 à 6: MEDIUM, 7 à 8: HIGH |
Alert.vulnTitle | vulnerability.description | Mappé directement à partir de Alert.vulnTitle |
Alert.vulnType | additional.fields.value.string_value | Mappé directement à partir de Alert.vulnType et ajouté en tant que paire clé-valeur dans additional.fields avec la clé "Type de faille" |
Hôte | principal.ip | Analyse de la ligne de journal "Host: |
edr.client.ip_addresses | Copiés depuis l'appareil principal.ip |
|
edr.client.hostname | Copiés depuis l'appareil principal.hostname |
|
edr.raw_event_name | Défini sur "STATUS_UPDATE" si Alert.ipAddress , Alert.hostname ou src_ip sont présents, sinon défini sur "GENERIC_EVENT" |
|
metadata.event_timestamp | Extrait des champs Alert.eventDate ou timestamp . Alert.eventDate est prioritaire s'il existe, sinon timestamp est utilisé. Le code temporel est converti au format UTC. |
|
metadata.event_type | Même logique que pour edr.raw_event_name |
|
metadata.log_type | Défini sur "QUALYS_CONTINUOUS_MONITORING" | |
metadata.product_name | Défini sur "QUALYS_CONTINUOUS_MONITORING" | |
metadata.vendor_name | Défini sur "QUALYS_CONTINUOUS_MONITORING" | |
network.application_protocol | Analyse de la ligne de journal " |
|
network.http.method | Analyse de la ligne de journal " |
|
timestamp | event.timestamp | Extrait des champs Alert.eventDate ou timestamp . Alert.eventDate est prioritaire s'il existe, sinon timestamp est utilisé. Le code temporel est converti au format UTC. |
Modifications
2022-08-30
- Analyseur nouvellement créé.