Recolha registos do Apache Cassandra

Compatível com:

Este documento explica como carregar registos do Apache Cassandra para o Google Security Operations usando o Bindplane. O analisador extrai campos, convertendo-os no modelo de dados unificado (UDM). Usa padrões grok para analisar a mensagem inicial e, em seguida, usa um filtro JSON para dados aninhados e realiza transformações condicionais para mapear vários campos para os respetivos equivalentes da UDM, processando diferentes níveis de registo e enriquecendo o resultado com metadados.

Antes de começar

Certifique-se de que tem os seguintes pré-requisitos:

  • Instância do Google SecOps
  • Windows 2016 ou posterior, ou anfitrião Linux com systemd
  • Se estiver a ser executado através de um proxy, as portas da firewall estão abertas
  • Acesso privilegiado a uma instância do Apache Cassandra

Obtenha o ficheiro de autenticação de carregamento do Google SecOps

  1. Inicie sessão na consola Google SecOps.
  2. Aceda a Definições do SIEM > Agentes de recolha.
  3. Transfira o ficheiro de autenticação de carregamento. Guarde o ficheiro de forma segura no sistema onde o Bindplane vai ser instalado.

Obtenha o ID de cliente do Google SecOps

  1. Inicie sessão na consola Google SecOps.
  2. Aceda a Definições do SIEM > Perfil.
  3. Copie e guarde o ID do cliente da secção Detalhes da organização.

Instale o agente do Bindplane

Instalação do Windows

  1. Abra a Linha de comandos ou o PowerShell como administrador.
  2. Execute o seguinte comando:

    msiexec /i `https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi` /quiet
    

Instalação do Linux

  1. Abra um terminal com privilégios de raiz ou sudo.
  2. Execute o seguinte comando:

    sudo sh -c `$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)` install_unix.sh
    

Recursos de instalação adicionais

Para ver opções de instalação adicionais, consulte o guia de instalação.

Configure o agente Bindplane para carregar o Syslog e enviá-lo para o Google SecOps

  1. Aceda ao ficheiro de configuração:
    • Localize o ficheiro config.yaml. Normalmente, encontra-se no diretório /etc/bindplane-agent/ no Linux ou no diretório de instalação no Windows.
    • Abra o ficheiro com um editor de texto (por exemplo, nano, vi ou Bloco de notas).
  2. Edite o ficheiro config.yaml da seguinte forma:

    receivers:
        udplog:
            # Replace the port and IP address as required
            listen_address: `0.0.0.0:514`
    
    exporters:
        chronicle/chronicle_w_labels:
            compression: gzip
            # Adjust the path to the credentials file you downloaded in Step 1
            creds: '/path/to/ingestion-authentication-file.json'
            # Replace with your actual customer ID from Step 2
            customer_id: <customer_id>
            endpoint: malachiteingestion-pa.googleapis.com
            # Add optional ingestion labels for better organization
            ingestion_labels:
                log_type: 'CASSANDRA'
                raw_log_field: body
    
    service:
        pipelines:
            logs/source0__chronicle_w_labels-0:
                receivers:
                    - udplog
                exporters:
                    - chronicle/chronicle_w_labels
    
  3. Substitua a porta e o endereço IP conforme necessário na sua infraestrutura.

  4. Substitua <customer_id> pelo ID de cliente real.

  5. Atualize /path/to/ingestion-authentication-file.json para o caminho onde o ficheiro de autenticação foi guardado na secção Obtenha o ficheiro de autenticação de carregamento do Google SecOps.

Reinicie o agente do Bindplane para aplicar as alterações

  • Para reiniciar o agente do Bindplane no Linux, execute o seguinte comando:

    sudo systemctl restart bindplane-agent
    
  • Para reiniciar o agente do Bindplane no Windows, pode usar a consola Services ou introduzir o seguinte comando:

    net stop BindPlaneAgent && net start BindPlaneAgent
    

Configure a exportação do Syslog no Apache Cassandra

  1. Inicie sessão no anfitrião do Apache Cassandra através de SSH.
  2. Abra o ficheiro de configuração logback.xml e insira o seguinte código na linha 28:
    • Para a maioria das versões do Apache Cassandra, a localização seria $(CASSANDRA_HOME)/conf.
    • Se instalasse o pacote do Datastax Enterprise, a localização seria /etc/dse.
    • Para instalações de ficheiros TAR do DSE, a localização seria $(TARBALL_ROOT)/resources/cassandra/conf.
  3. Adicione a seguinte definição de Appender ao ficheiro logback.xml na linha 28:

    <appender name="SYSLOG" class="ch.qos.logback.classic.net.SyslogAppender">
        <syslogHost>bindplane-ip</syslogHost>
        <port>bindplane-port</port>
        <facility>LOCAL7</facility>
        <throwableExcluded>true</throwableExcluded>
        <suffixPattern>%thread:%level:%logger{36}:%msg</suffixPattern>
    </appender>
    
  4. Substitua bindplane-ip e bindplane-port pelo endereço IP e pela porta reais do agente do Bindplane.

  5. Adicione o seguinte código ao bloco do registador de raiz <root level=INFO> no ficheiro logback.xml:

    1. A localização onde esta linha é inserida depende da sua versão do Apache Cassandra:

      • Apache Cassandra 5.0.x, linha 123.
      • Apache Cassandra 4.0.x e 4.1.x, linha 115.
      • Apache Cassandra 3.11.x e 3.0.x, linha 92.
      • Datastax Enterprise (todas as versões), linha 121.
      <appender-ref ref=`SYSLOG` />
      

Tabela de mapeamento do UDM

Campo de registo Mapeamento de UDM Lógica
agent.ephemeral_id observer.labels.value Valor de agent.ephemeral_id da mensagem JSON interna.
agent.hostname observer.hostname Valor de agent.hostname da mensagem JSON interna.
agent.id observer.asset_id Concatenação de filebeat: e o valor de agent.id da mensagem JSON interna.
agent.name observer.user.userid Valor de agent.name da mensagem JSON interna.
agent.type observer.application Valor de agent.type da mensagem JSON interna.
agent.version observer.platform_version Valor de agent.version da mensagem JSON interna.
cloud.availability_zone principal.cloud.availability_zone Valor de cloud.availability_zone da mensagem JSON interna.
cloud.instance.id principal.resource.product_object_id Valor de cloud.instance.id da mensagem JSON interna.
cloud.instance.name principal.resource.name Valor de cloud.instance.name da mensagem JSON interna.
cloud.machine.type principal.resource.attribute.labels.value Valor de cloud.machine.type da mensagem JSON interna, em que o key correspondente é machine_type.
cloud.provider principal.resource.attribute.labels.value Valor de cloud.provider da mensagem JSON interna, em que o key correspondente é provider.
event_metadata._id metadata.product_log_id Valor de event_metadata._id da mensagem JSON interna.
event_metadata.version metadata.product_version Valor de event_metadata.version da mensagem JSON interna.
host.architecture target.asset.hardware.cpu_platform Valor de host.architecture da mensagem JSON interna.
host.fqdn target.administrative_domain Valor de host.fqdn da mensagem JSON interna.
host.hostname target.hostname Valor de host.hostname da mensagem JSON interna.
host.id target.asset.asset_id Concatenação de Host Id: e o valor de host.id da mensagem JSON interna.
host.ip target.asset.ip Matriz de endereços IP de host.ip na mensagem JSON interna.
host.mac target.mac Matriz de endereços MAC de host.mac na mensagem JSON interna.
host.os.kernel target.platform_patch_level Valor de host.os.kernel da mensagem JSON interna.
host.os.platform target.platform Definido como LINUX se host.os.platform for debian.
host.os.version target.platform_version Valor de host.os.version da mensagem JSON interna.
hostname principal.hostname Valor de hostname extraído do campo message através do grok.
key security_result.detection_fields.value Valor de key extraído do campo message através do grok, em que o key correspondente é key.
log.file.path principal.process.file.full_path Valor de log.file.path da mensagem JSON interna.
log_level security_result.severity Mapeado com base no valor de log_level: DEBUG, INFO, AUDIT mapeiam para INFORMATIONAL; ERROR mapeia para ERROR; WARNING mapeia para MEDIUM.
log_level security_result.severity_details Valor de log_level extraído do campo message através do grok.
log_type metadata.log_type Valor de log_type do registo não processado.
message security_result.description Descrição extraída do campo message através do grok.
message target.process.command_line Linha de comandos extraída do campo message através do grok.
now security_result.detection_fields.value Valor de now extraído do campo message através do grok, em que o key correspondente é now. Analisado a partir do campo event_time extraído do campo message através do grok. Definido como USER_RESOURCE_ACCESS se hostname e host.hostname estiverem presentes. Caso contrário, é definido como GENERIC_EVENT. Definido como CASSANDRA. Definido como CASSANDRA. Definido como ephemeral_id. Definido como VIRTUAL_MACHINE se cloud.instance.name estiver presente. Defina como key e now para os campos de deteção correspondentes.
timestamp timestamp Do campo create_time do registo não processado.

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.