收集 Qualys Continuous Monitoring 日志
支持的平台:
Google SecOps
SIEM
此 Logstash 解析器代码首先使用 Grok 模式从原始日志消息中提取来源 IP、用户、方法和应用协议等字段。然后,它会将原始日志数据中的特定字段映射到统一数据模型 (UDM) 中的对应字段,执行数据类型转换,并使用其他标签和元数据丰富数据,最后以所需的 UDM 格式构建输出。
准备工作
- 确保您有一个 Google Security Operations 实例。
- 确保您拥有对 Google Cloud的特权访问权限。
- 确保您拥有对 Qualys 的特权访问权限。
启用必需的 API:
- 登录 Google Cloud 控制台。
- 依次前往 API 和服务 > 库。
- 搜索并启用以下 API:
- Cloud Functions API
- Cloud Scheduler API
- Cloud Pub/Sub(Cloud Scheduler 必须具备此服务才能调用函数)
创建 Google Cloud 存储分区
- 登录 Google Cloud 控制台。
前往 Cloud Storage 存储分区页面。
点击创建。
配置存储分区:
- 名称:输入符合存储分区名称要求的唯一名称(例如 qualys-asset-bucket)。
- 选择数据存储位置:选择一个位置。
- 为数据选择一个存储类别:为存储分区选择默认存储类别,或者选择 Autoclass 以自动管理存储类别。
- 选择如何控制对象的访问权限:选择否以强制执行禁止公开访问,然后为存储分区对象选择访问权限控制模型。
- 存储类别:根据您的需求进行选择(例如标准)。
点击创建。
创建 Google Cloud 服务账号
- 登录 Google Cloud 控制台。
- 依次前往 IAM 和管理 > 服务账号。
- 创建新的服务账号。
- 为其指定一个描述性名称(例如 qualys-user)。
- 向服务账号授予您在上一步中创建的 GCS 存储分区的 Storage Object Admin 角色。
- 向服务账号授予 Cloud Functions Invoker 角色。
- 为服务账号创建 SSH 密钥。
- 下载服务账号的 JSON 密钥文件。请妥善保管此文件。
可选:在 Qualys 中创建专用 API 用户
- 登录 Qualys 控制台。
- 前往用户。
- 依次点击新建 > 用户。
- 输入用户所需的一般信息。
- 选择用户角色标签页。
- 确保该角色的 API 访问权限复选框处于选中状态。
- 点击保存。
确定您的具体 Qualys API 网址
选项 1
按照平台识别中所述的方法识别您的网址。
选项 2
- 登录 Qualys 控制台。
- 依次前往帮助 > 关于。
- 滚动到“安全运维中心 (SOC)”下方查看此信息。
- 复制 Qualys API 网址。
配置 Cloud Functions 函数
- 在 Google Cloud 控制台中,前往 Cloud Functions。
- 点击创建函数。
配置函数:
- 名称:为函数输入一个名称(例如 fetch-qualys-cm-alerts)。
- 区域:选择靠近您存储分区的区域。
- 运行时:Python 3.10(或首选运行时)。
- 触发器:根据需要选择 HTTP 触发器,或选择 Cloud Pub/Sub 以进行定期执行。
- 身份验证:通过身份验证实现安全。
- 使用内嵌编辑器编写代码:
```python from google.cloud import storage import requests import base64 import json # Google Cloud Storage Configuration BUCKET_NAME = "<bucket-name>" FILE_NAME = "qualys_cm_alerts.json" # Qualys API Credentials QUALYS_USERNAME = "<qualys-username>" QUALYS_PASSWORD = "<qualys-password>" QUALYS_BASE_URL = "https://<qualys_base_url>" def fetch_cm_alerts(): """Fetch alerts from Qualys Continuous Monitoring.""" auth = base64.b64encode(f"{QUALYS_USERNAME}:{QUALYS_PASSWORD}".encode()).decode() headers = { "Authorization": f"Basic {auth}", "Content-Type": "application/xml" } payload = """ <ServiceRequest> <filters> <Criteria field="alert.date" operator="GREATER">2024-01-01</Criteria> </filters> </ServiceRequest> """ response = requests.post(f"{QUALYS_BASE_URL}/qps/rest/2.0/search/cm/alert", headers=headers, data=payload) response.raise_for_status() return response.json() def upload_to_gcs(data): """Upload data to Google Cloud Storage.""" client = storage.Client() bucket = client.get_bucket(BUCKET_NAME) blob = bucket.blob(FILE_NAME) blob.upload_from_string(json.dumps(data, indent=2), content_type="application/json") def main(request): """Cloud Function entry point.""" try: alerts = fetch_cm_alerts() upload_to_gcs(alerts) return "Qualys CM alerts uploaded to Cloud Storage successfully!" except Exception as e: return f"An error occurred: {e}", 500 ```
完成配置后,点击部署。
配置 Cloud Scheduler
- 在 Google Cloud 控制台中,前往 Cloud Scheduler。
- 点击创建作业。
配置作业:
- 名称:输入作业的名称(例如 trigger-fetch-qualys-cm-alerts)。
- 频率:使用 cron 语法指定时间表(例如,
0 * * * *
表示每小时运行一次)。 - 时区:设置您的首选时区。
- 触发器类型:选择 HTTP。
- 触发器网址:输入 Cloud Functions 函数的网址(可在部署后的函数详情中找到)。
- 方法:选择 POST。
创建作业。
在 Google SecOps 中配置 Feed 以提取 Qualys 持续监控日志
- 依次前往 SIEM 设置 > Feed。
- 点击新增。
- 在Feed 名称字段中,输入 Feed 的名称(例如 Qualys 持续监控日志)。
- 选择 Google Cloud Storage 作为来源类型。
- 选择 Qualys 持续监控作为日志类型。
- 点击下一步。
为以下输入参数指定值:
- 存储分区 URI: Google Cloud 存储分区来源 URI。
- URI 是:选择单个文件。
- 来源删除选项:根据您的偏好选择删除选项。
- 资源命名空间:资源命名空间。
- 提取标签:要应用于此 Feed 中的事件的标签。
点击下一步。
在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
日志字段 | UDM 映射 | 逻辑 |
---|---|---|
Alert.alertInfo.appVersion | metadata.product_version | 直接从 Alert.alertInfo.appVersion 映射 |
Alert.alertInfo.operatingSystem | principal.platform_version | 直接从 Alert.alertInfo.operatingSystem 映射 |
Alert.alertInfo.port | additional.fields.value.string_value | 直接从 Alert.alertInfo.port 映射,并在 additional.fields 中作为键值对添加,键为“Alert port”(提醒端口) |
Alert.alertInfo.protocol | network.ip_protocol | 直接从 Alert.alertInfo.protocol 映射 |
Alert.alertInfo.sslIssuer | network.tls.client.certificate.issuer | 直接从 Alert.alertInfo.sslIssuer 映射 |
Alert.alertInfo.sslName | additional.fields.value.string_value | 直接从 Alert.alertInfo.sslName 映射,并作为键值对添加到 additional.fields 中,键为“SSL 名称” |
Alert.alertInfo.sslOrg | additional.fields.value.string_value | 直接从 Alert.alertInfo.sslOrg 映射,并在 additional.fields 中作为键值对添加,键为“SSL Org” |
Alert.alertInfo.ticketId | additional.fields.value.string_value | 直接从 Alert.alertInfo.ticketId 映射,并作为键值对添加到 additional.fields 中,键为“Ticket Id” |
Alert.alertInfo.vpeConfidence | additional.fields.value.string_value | 直接从 Alert.alertInfo.vpeConfidence 映射,并作为键值对添加到 additional.fields 中,键为“VPE 置信度” |
Alert.alertInfo.vpeStatus | additional.fields.value.string_value | 直接从 Alert.alertInfo.vpeStatus 映射,并作为键值对添加到 additional.fields 中,键为“VPE 置信度” |
Alert.eventType | additional.fields.value.string_value | 直接从 Alert.eventType 映射,并在 additional.fields 中作为键值对添加(键为“事件类型”) |
Alert.hostname | principal.hostname | 直接从 Alert.hostname 映射 |
Alert.id | security_result.threat_id | 直接从 Alert.id 映射 |
Alert.ipAddress | principal.ip | 直接从 Alert.ipAddress 映射 |
Alert.profile.id | additional.fields.value.string_value | 直接从 Alert.profile.id 映射,并在 additional.fields 中以键值对的形式添加(键为“Profile Id”) |
Alert.profile.title | additional.fields.value.string_value | 直接从 Alert.profile.title 映射,并作为键值对添加到 additional.fields 中,键为“Profile Title”(个人资料标题) |
Alert.qid | vulnerability.name | 从 Alert.qid 映射为“QID: |
Alert.source | additional.fields.value.string_value | 直接从 Alert.source 映射,并在 additional.fields 中作为键值对添加,键为“Alert Source”(提醒来源) |
Alert.triggerUuid | metadata.product_log_id | 直接从 Alert.triggerUuid 映射 |
Alert.vulnCategory | additional.fields.value.string_value | 直接从 Alert.vulnCategory 映射,并在 additional.fields 中作为键值对添加,键为“漏洞类别” |
Alert.vulnSeverity | vulnerability.severity | 根据 Alert.vulnSeverity 的值进行映射:1-3:低、4-6:中、7-8:高 |
Alert.vulnTitle | vulnerability.description | 直接从 Alert.vulnTitle 映射 |
Alert.vulnType | additional.fields.value.string_value | 直接从 Alert.vulnType 映射,并在 additional.fields 中以键值对的形式添加,键为“Vulnerability Type”(漏洞类型) |
主机 | principal.ip | 从日志行“Host: |
edr.client.ip_addresses | 从 principal.ip 复制的联系人 |
|
edr.client.hostname | 从 principal.hostname 复制的联系人 |
|
edr.raw_event_name | 如果存在 Alert.ipAddress 、Alert.hostname 或 src_ip ,则设置为“STATUS_UPDATE”,否则设置为“GENERIC_EVENT” |
|
metadata.event_timestamp | 从 Alert.eventDate 或 timestamp 字段中提取。系统会优先使用 Alert.eventDate (如果存在),否则使用 timestamp 。时间戳会转换为世界协调时间 (UTC)。 |
|
metadata.event_type | 与 edr.raw_event_name 相同的逻辑 |
|
metadata.log_type | 设置为“QUALYS_CONTINUOUS_MONITORING” | |
metadata.product_name | 设置为“QUALYS_CONTINUOUS_MONITORING” | |
metadata.vendor_name | 设置为“QUALYS_CONTINUOUS_MONITORING” | |
network.application_protocol | 从日志行“ |
|
network.http.method | 从日志行“ |
|
时间戳 | event.timestamp | 从 Alert.eventDate 或 timestamp 字段中提取。系统会优先使用 Alert.eventDate (如果存在),否则使用 timestamp 。时间戳会转换为世界协调时间 (UTC)。 |
更改
2022-08-30
- 新创建的解析器。