Coletar registros de monitoramento contínuo do Qualys
Esse código de analisador do Logstash extrai campos como IP de origem, usuário, método e protocolo do aplicativo de mensagens de registro bruto usando padrões grok. Em seguida, ele mapeia campos específicos dos dados brutos para os campos correspondentes no modelo de dados unificado (UDM, na sigla em inglês), realiza conversões de tipo de dados e enriquece os dados com rótulos e metadados adicionais antes de estruturar a saída no formato UDM desejado.
Antes de começar
- Verifique se você tem uma instância do Google Security Operations.
- Verifique se você tem acesso privilegiado a Google Cloud.
- Verifique se você tem acesso privilegiado aos Qualys.
Ative as APIs obrigatórias:
- Faça login no console do Google Cloud .
- Acesse APIs e serviços > Biblioteca.
- Pesquise e ative as seguintes APIs:
- API Cloud Functions
- API Cloud Scheduler
- Cloud Pub/Sub (necessário para que o Cloud Scheduler invoque funções)
Criar um bucket do Google Cloud Storage
- Faça login no console do Google Cloud .
Acesse a página Buckets do Cloud Storage.
Clique em Criar.
Configure o bucket:
- Nome: insira um nome exclusivo que atenda aos requisitos de nome de bucket (por exemplo, qualys-asset-bucket).
- Escolha onde armazenar seus dados: selecione um local.
- Escolha uma classe de armazenamento para seus dados: selecione uma classe de armazenamento padrão para o bucket ou selecione Classe automática para gerenciamento automático da classe de armazenamento.
- Escolha como controlar o acesso a objetos: selecione não para aplicar a prevenção de acesso público e selecione um modelo de controle de acesso para os objetos do bucket.
- Classe de armazenamento: escolha com base nas suas necessidades, por exemplo, Padrão.
Clique em Criar.
Criar uma conta de serviço do Google Cloud
- Faça login no console do Google Cloud .
- Acesse IAM e administrador > Contas de serviço.
- Crie uma nova conta de serviço.
- Dê um nome descritivo (por exemplo, qualys-user).
- Conceda à conta de serviço o papel de Administrador de objetos do Storage no bucket do GCS criado na etapa anterior.
- Conceda à conta de serviço o papel de Invocador do Cloud Functions.
- Crie uma chave SSH para a conta de serviço.
- Faça o download de um arquivo de chave JSON para a conta de serviço. Mantenha esse arquivo em segurança.
Opcional: crie um usuário de API dedicado no Qualys
- Faça login no console da Qualys.
- Acesse Usuários.
- Clique em Novo > Usuário.
- Insira as Informações gerais necessárias para o usuário.
- Selecione a guia Função do usuário.
- Verifique se a caixa de seleção Acesso à API está marcada.
- Clique em Salvar.
Identifique o URL específico da API Qualys
Opção 1
Identifique seus URLs conforme mencionado na identificação da plataforma.
Opção 2
- Faça login no console da Qualys.
- Acesse Ajuda > Sobre.
- Role a tela para conferir essas informações na Central de operações de segurança (SOC).
- Copie o URL da API Qualys.
Configurar a Função do Cloud
- Acesse o Cloud Functions no console Google Cloud .
- Clique em Criar função.
Configure a função:
- Nome: insira um nome para a função (por exemplo, fetch-qualys-cm-alerts).
- Região: selecione uma região próxima ao seu bucket.
- Ambiente de execução: Python 3.10 (ou o ambiente de execução de sua preferência).
- Gatilho: escolha o gatilho HTTP, se necessário, ou o Cloud Pub/Sub para execução programada.
- Autenticação: ofereça segurança com autenticação.
- Escrever o código com um editor inline:
```python from google.cloud import storage import requests import base64 import json # Google Cloud Storage Configuration BUCKET_NAME = "<bucket-name>" FILE_NAME = "qualys_cm_alerts.json" # Qualys API Credentials QUALYS_USERNAME = "<qualys-username>" QUALYS_PASSWORD = "<qualys-password>" QUALYS_BASE_URL = "https://<qualys_base_url>" def fetch_cm_alerts(): """Fetch alerts from Qualys Continuous Monitoring.""" auth = base64.b64encode(f"{QUALYS_USERNAME}:{QUALYS_PASSWORD}".encode()).decode() headers = { "Authorization": f"Basic {auth}", "Content-Type": "application/xml" } payload = """ <ServiceRequest> <filters> <Criteria field="alert.date" operator="GREATER">2024-01-01</Criteria> </filters> </ServiceRequest> """ response = requests.post(f"{QUALYS_BASE_URL}/qps/rest/2.0/search/cm/alert", headers=headers, data=payload) response.raise_for_status() return response.json() def upload_to_gcs(data): """Upload data to Google Cloud Storage.""" client = storage.Client() bucket = client.get_bucket(BUCKET_NAME) blob = bucket.blob(FILE_NAME) blob.upload_from_string(json.dumps(data, indent=2), content_type="application/json") def main(request): """Cloud Function entry point.""" try: alerts = fetch_cm_alerts() upload_to_gcs(alerts) return "Qualys CM alerts uploaded to Cloud Storage successfully!" except Exception as e: return f"An error occurred: {e}", 500 ```
Clique em Implantar depois de concluir a configuração.
Configurar o Cloud Scheduler
- Acesse o Cloud Scheduler no Google Cloud console.
- Clique em Criar job.
Configure o job:
- Nome: insira um nome para o job. Por exemplo, trigger-fetch-qualys-cm-alerts.
- Frequência: use a sintaxe cron para especificar a programação (por exemplo,
0 * * * *
para execução a cada hora). - Fuso horário: defina seu fuso horário preferido.
- Tipo de acionador: escolha HTTP.
- URL do gatilho: insira o URL da função do Cloud (encontrado nos detalhes da função após a implantação).
- Método: escolha POST.
Crie o job.
Configurar um feed no Google SecOps para processar os registros de monitoramento contínuo do Qualys
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo.
- No campo Nome do feed, insira um nome para o feed (por exemplo, Qualys Continuous Monitoring Logs).
- Selecione Google Cloud Storage como o Tipo de origem.
- Selecione Qualys Continuous Monitoring como o Tipo de registro.
- Clique em Próxima.
Especifique valores para os seguintes parâmetros de entrada:
- URI do bucket do Firebase Storage: o URI de origem do Google Cloud bucket do Firebase Storage.
- URI é um: selecione Arquivo único.
- Opções de exclusão da origem: selecione a opção de exclusão de acordo com sua preferência.
- Namespace de recursos: o namespace de recursos.
- Rótulos de ingestão: o rótulo a ser aplicado aos eventos desse feed.
Clique em Próxima.
Revise a configuração do novo feed na tela Finalizar e clique em Enviar.
Tabela de mapeamento do UDM
Campo de registro | Mapeamento do UDM | Lógica |
---|---|---|
Alert.alertInfo.appVersion | metadata.product_version | Mapeado diretamente de Alert.alertInfo.appVersion |
Alert.alertInfo.operatingSystem | principal.platform_version | Mapeado diretamente de Alert.alertInfo.operatingSystem |
Alert.alertInfo.port | additional.fields.value.string_value | Mapeado diretamente de Alert.alertInfo.port e adicionado como um par de chave-valor em additional.fields com a chave "Alert port" |
Alert.alertInfo.protocol | network.ip_protocol | Mapeado diretamente de Alert.alertInfo.protocol |
Alert.alertInfo.sslIssuer | network.tls.client.certificate.issuer | Mapeado diretamente de Alert.alertInfo.sslIssuer |
Alert.alertInfo.sslName | additional.fields.value.string_value | Mapeado diretamente de Alert.alertInfo.sslName e adicionado como um par de chave-valor em additional.fields com a chave "Nome do SSL" |
Alert.alertInfo.sslOrg | additional.fields.value.string_value | Mapeado diretamente de Alert.alertInfo.sslOrg e adicionado como um par de chave-valor em additional.fields com a chave "SSL Org" |
Alert.alertInfo.ticketId | additional.fields.value.string_value | Mapeado diretamente de Alert.alertInfo.ticketId e adicionado como um par de chave-valor em additional.fields com a chave "Ticket Id" |
Alert.alertInfo.vpeConfidence | additional.fields.value.string_value | Mapeado diretamente de Alert.alertInfo.vpeConfidence e adicionado como um par de chave-valor em additional.fields com a chave "VPE Confidence" |
Alert.alertInfo.vpeStatus | additional.fields.value.string_value | Mapeado diretamente de Alert.alertInfo.vpeStatus e adicionado como um par de chave-valor em additional.fields com a chave "VPE Confidence" |
Alert.eventType | additional.fields.value.string_value | Mapeado diretamente de Alert.eventType e adicionado como um par de chave-valor em additional.fields com a chave "Tipo de evento" |
Alert.hostname | principal.hostname | Mapeado diretamente de Alert.hostname |
Alert.id | security_result.threat_id | Mapeado diretamente de Alert.id |
Alert.ipAddress | principal.ip | Mapeado diretamente de Alert.ipAddress |
Alert.profile.id | additional.fields.value.string_value | Mapeado diretamente de Alert.profile.id e adicionado como um par de chave-valor em additional.fields com a chave "ID do perfil" |
Alert.profile.title | additional.fields.value.string_value | Mapeado diretamente de Alert.profile.title e adicionado como um par de chave-valor em additional.fields com a chave "Título do perfil" |
Alert.qid | vulnerability.name | Mapeado como "QID: Alert.qid |
Alert.source | additional.fields.value.string_value | Mapeado diretamente de Alert.source e adicionado como um par de chave-valor em additional.fields com a chave "Alert Source" |
Alert.triggerUuid | metadata.product_log_id | Mapeado diretamente de Alert.triggerUuid |
Alert.vulnCategory | additional.fields.value.string_value | Mapeado diretamente de Alert.vulnCategory e adicionado como um par de chave-valor em additional.fields com a chave "Categoria de vulnerabilidade" |
Alert.vulnSeverity | vulnerability.severity | Mapeado com base no valor de Alert.vulnSeverity : 1 a 3: BAIXO, 4 a 6: MÉDIO, 7 a 8: ALTO |
Alert.vulnTitle | vulnerability.description | Mapeado diretamente de Alert.vulnTitle |
Alert.vulnType | additional.fields.value.string_value | Mapeado diretamente de Alert.vulnType e adicionado como um par de chave-valor em additional.fields com a chave "Vulnerability Type" |
Host | principal.ip | Analisado da linha de registro "Host: |
edr.client.ip_addresses | Copiados do principal.ip |
|
edr.client.hostname | Copiados do principal.hostname |
|
edr.raw_event_name | Definido como "STATUS_UPDATE" se Alert.ipAddress , Alert.hostname ou src_ip estiverem presentes. Caso contrário, será definido como "GENERIC_EVENT". |
|
metadata.event_timestamp | Extraídos dos campos Alert.eventDate ou timestamp . Alert.eventDate será priorizado se existir. Caso contrário, timestamp será usado. O carimbo de data/hora é convertido para UTC. |
|
metadata.event_type | Mesma lógica que edr.raw_event_name |
|
metadata.log_type | Definido como "QUALYS_CONTINUOUS_MONITORING" | |
metadata.product_name | Definido como "QUALYS_CONTINUOUS_MONITORING" | |
metadata.vendor_name | Definido como "QUALYS_CONTINUOUS_MONITORING" | |
network.application_protocol | Analisado da linha de registro " |
|
network.http.method | Analisado da linha de registro " |
|
timestamp | event.timestamp | Extraídos dos campos Alert.eventDate ou timestamp . Alert.eventDate será priorizado se existir. Caso contrário, timestamp será usado. O carimbo de data/hora é convertido para UTC. |
Alterações
2022-08-30
- Parser recém-criado.