Raccogliere i log di Elasticsearch

Supportato in:

Questo documento spiega come importare i log di Elasticsearch in Google Security Operations utilizzando Amazon S3. Il parser trasforma i log non elaborati in formato JSON in un modello unificato dei dati (UDM). Estrae i campi dalle strutture JSON nidificate, li mappa ai campi UDM e arricchisce i dati con il contesto pertinente per la sicurezza, come i livelli di gravità e i ruoli utente.

Prima di iniziare

  • Un'istanza Google SecOps
  • Accesso con privilegi all'amministrazione del cluster Elasticsearch
  • Accesso privilegiato ad AWS (S3, IAM, EC2)
  • Istanza EC2 o host permanente per eseguire Logstash

Recuperare i prerequisiti di Elasticsearch

  1. Accedi al tuo cluster Elasticsearch come amministratore.
  2. Verifica che il tuo abbonamento a Elasticsearch includa le funzionalità di sicurezza (necessarie per la registrazione degli audit).
  3. Prendi nota del nome e della versione del cluster Elasticsearch per riferimento.
  4. Identifica il percorso in cui verranno scritti i log di controllo (impostazione predefinita: $ES_HOME/logs/<clustername>_audit.json).

Abilita l'audit logging di Elasticsearch

  1. Su ogni nodo Elasticsearch, modifica il file di configurazione elasticsearch.yml.
  2. Aggiungi la seguente impostazione:

    xpack.security.audit.enabled: true
    
  3. Esegui un riavvio in sequenza del cluster per applicare le modifiche:

    • Disattiva l'allocazione degli shard: PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
    • Arresta e riavvia ogni nodo uno alla volta.
    • Riattiva l'allocazione degli shard: PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
  4. Verifica che gli audit log vengano generati in <clustername>_audit.json nella directory dei log.

Configura il bucket AWS S3 e IAM per Google SecOps

  1. Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
  2. Salva il nome e la regione del bucket per riferimento futuro (ad esempio, elastic-search-logs).
  3. Crea un utente seguendo questa guida utente: Creazione di un utente IAM.
  4. Seleziona l'utente creato.
  5. Seleziona la scheda Credenziali di sicurezza.
  6. Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
  7. Seleziona Servizio di terze parti come Caso d'uso.
  8. Fai clic su Avanti.
  9. (Facoltativo) Aggiungi un tag di descrizione.
  10. Fai clic su Crea chiave di accesso.
  11. Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per riferimento futuro.
  12. Fai clic su Fine.
  13. Seleziona la scheda Autorizzazioni.
  14. Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
  15. Seleziona Aggiungi autorizzazioni.
  16. Seleziona Allega direttamente i criteri.
  17. Cerca i criteri AmazonS3FullAccess.
  18. Seleziona la policy.
  19. Fai clic su Avanti.
  20. Fai clic su Aggiungi autorizzazioni.

Configura Logstash per inviare gli audit log a S3

  1. Installa Logstash su un'istanza EC2 o su un host permanente che possa accedere ai file di audit log di Elasticsearch.
  2. Installa il plug-in di output S3 se non è già presente:

    bin/logstash-plugin install logstash-output-s3
    
  3. Crea un file di configurazione Logstash (elastic-to-s3.conf):

    input {
      file {
        path => "/path/to/elasticsearch/logs/*_audit.json"
        start_position => "beginning"
        codec => "json"              # audit file: 1 JSON object per line
        sincedb_path => "/var/lib/logstash/sincedb_elastic_search"
        exclude => ["*.gz"]
      }
    }
    
    filter {
      # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects.
      # If you must add metadata for ops, put it under [@metadata] so it won't be written.
      # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" }
    }
    
    output {
      s3 {
        access_key_id => "YOUR_AWS_ACCESS_KEY"
        secret_access_key => "YOUR_AWS_SECRET_KEY"
        region => "us-east-1"
        bucket => "elastic-search-logs"
        prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/"
    
        codec => "json_lines"        # NDJSON output (1 JSON object per line)
        encoding => "gzip"           # compress objects
    
        server_side_encryption => true
        # Optionally for KMS:
        # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID"
    
        size_file => 104857600       # 100MB rotation
        time_file => 300             # 5 min rotation
      }
    }
    
  4. Avvia Logstash con la configurazione:

    bin/logstash -f elastic-to-s3.conf
    

(Facoltativo) Crea un utente IAM con autorizzazione di sola lettura per Google SecOps

  1. Vai alla console AWS > IAM > Utenti > Aggiungi utenti.
  2. Fai clic su Add users (Aggiungi utenti).
  3. Fornisci i seguenti dettagli di configurazione:
    • Utente: inserisci secops-reader.
    • Tipo di accesso: seleziona Chiave di accesso - Accesso programmatico.
  4. Fai clic su Crea utente.
  5. Collega la criterio per la lettura minima (personalizzata): Utenti > secops-reader > Autorizzazioni > Aggiungi autorizzazioni > Collega le norme direttamente > Crea norma.
  6. Nell'editor JSON, inserisci la seguente policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::elastic-search-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::elastic-search-logs"
        }
      ]
    }
    
  7. Imposta il nome su secops-reader-policy.

  8. Vai a Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.

  9. Vai a Credenziali di sicurezza > Chiavi di accesso > Crea chiave di accesso.

  10. Scarica il file CSV (questi valori vengono inseriti nel feed).

Configurare un feed in Google SecOps per importare i log Elasticsearch

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su + Aggiungi nuovo feed.
  3. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, Elasticsearch Logs).
  4. Seleziona Amazon S3 V2 come Tipo di origine.
  5. Seleziona Elastic Search come Tipo di log.
  6. Fai clic su Avanti.
  7. Specifica i valori per i seguenti parametri di input:
    • URI S3: s3://elastic-search-logs/logs/
    • Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
    • Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
    • ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
    • Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
    • Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
  8. Fai clic su Avanti.
  9. Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.

Tabella di mappatura UDM

Campo log Mappatura UDM Logic
Livello security_result.severity La logica controlla il valore del campo "Level" e lo mappa al livello di gravità UDM corrispondente:
- "INFO", "ALL", "OFF", "TRACE", "DEBUG" vengono mappati a "INFORMATIONAL".
- "WARN" è mappato su "LOW".
- "ERROR" è mappato su "ERROR".
- "FATAL" è mappato su "CRITICAL".
message.@timestamp timestamp Il timestamp viene analizzato dal campo "@timestamp" all'interno del campo "message" del log non elaborato, utilizzando il formato "yyyy-MM-ddTHH:mm:ss.SSS".
message.action security_result.action_details Il valore viene estratto dal campo "action" all'interno del campo "message" del log non elaborato.
message.event.action security_result.summary Il valore viene estratto dal campo "event.action" all'interno del campo "message" del log non elaborato.
message.event.type metadata.product_event_type Il valore viene estratto dal campo "event.type" all'interno del campo "message" del log non elaborato.
message.host.ip target.ip Il valore viene estratto dal campo "host.ip" all'interno del campo "message" del log non elaborato.
message.host.name target.hostname Il valore viene estratto dal campo "host.name" all'interno del campo "message" del log non elaborato.
message.indices target.labels.value Il valore viene estratto dal campo "indices" all'interno del campo "message" del log non elaborato.
message.mrId target.hostname Il valore viene estratto dal campo "mrId" all'interno del campo "message" del log non elaborato.
message.node.id principal.asset.product_object_id Il valore viene estratto dal campo "node.id" all'interno del campo "message" del log non elaborato.
message.node.name target.asset.hostname Il valore viene estratto dal campo "node.name" all'interno del campo "message" del log non elaborato.
message.origin.address principal.ip L'indirizzo IP viene estratto dal campo "origin.address" all'interno del campo "message" del log non elaborato rimuovendo il numero di porta.
message.origin.type principal.resource.resource_subtype Il valore viene estratto dal campo "origin.type" all'interno del campo "message" del log non elaborato.
message.properties.host_group principal.hostname Il valore viene estratto dal campo "properties.host_group" all'interno del campo "message" del log non elaborato.
message.properties.host_group target.group.group_display_name Il valore viene estratto dal campo "properties.host_group" all'interno del campo "message" del log non elaborato.
message.request.id target.resource.product_object_id Il valore viene estratto dal campo "request.id" all'interno del campo "message" del log non elaborato.
message.request.name target.resource.name Il valore viene estratto dal campo "request.name" all'interno del campo "message" del log non elaborato.
message.user.name principal.user.userid Il valore viene estratto dal campo "user.name" all'interno del campo "message" del log non elaborato.
message.user.realm principal.user.attribute.permissions.name Il valore viene estratto dal campo "user.realm" all'interno del campo "message" del log non elaborato.
message.user.roles about.user.attribute.roles.name Il valore viene estratto dal campo "user.roles" all'interno del campo "message" del log non elaborato.
metadata.event_type Valore hardcoded: "USER_RESOURCE_ACCESS"
metadata.log_type Valore hardcoded: "ELASTIC_SEARCH"
metadata.product_name Valore hardcoded: "ELASTICSEARCH"
metadata.vendor_name Valore hardcoded: "ELASTIC"
principal.port Il numero di porta viene estratto dal campo "origin.address" all'interno del campo "message" del log non elaborato.
target.labels.key Valore hardcoded: "Indice"

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.