Panoramica dell'analisi dei log

Supportato in:

Questo documento fornisce una panoramica di come Google Security Operations analizza i log non elaborati nel formato Unified Data Model (UDM).

Google Security Operations può ricevere i dati dei log provenienti dalle seguenti fonti di importazione:

  • Inoltro di Google Security Operations
  • Feed dell'API Google Security Operations
  • API Google Security Operations Ingestion
  • Partner tecnologico di terze parti

In genere, i clienti inviano i dati come log non elaborati originali. Google Security Operations identifica in modo univoco il dispositivo che ha generato i log utilizzando LogType. LogType identifica entrambi:

  • il fornitore e il dispositivo che ha generato il log, ad esempio il firewall Cisco, il server DHCP Linux o Bro DNS.
  • che converte il log non elaborato in UDM strutturato. Esiste una relazione uno a uno tra un parser e un LogType. Ogni parser converte i dati ricevuti da un singolo LogType.

Google Security Operations fornisce un insieme di analizzatori sintattici predefiniti che leggono i log non elaborati originali e generano record UDM strutturati utilizzando i dati nel log non elaborato originale. Google Security Operations gestisce questi analizzatori. I clienti possono anche definire istruzioni di mappatura dei dati personalizzate creando un parser specifico per il cliente.

Flusso di lavoro di importazione e normalizzazione

Il parser contiene istruzioni di mappatura dei dati. Definisce come vengono mappati i dati dal log non elaborato originale a uno o più campi nella struttura di dati UDM.

Se non ci sono errori di analisi, Google Security Operations crea un record strutturato UDM utilizzando i dati del log non elaborato. La procedura di conversione di un log non elaborato in un record UDM si chiama normalizzazione.

Un parser predefinito potrebbe mappare un sottoinsieme di valori di base dal log non elaborato. In genere, questi campi principali sono i più importanti per fornire approfondimenti sulla sicurezza in Google Security Operations. I valori non mappati rimangono nel log non elaborato, ma non vengono memorizzati nel record UDM.

Un cliente può anche utilizzare l'API di importazione per inviare i dati in formato UDM strutturato.

Personalizzare l'analisi dei dati importati

Google Security Operations fornisce le seguenti funzionalità che consentono ai clienti di personalizzare l'analisi dei dati sui dati di log originali in entrata.

  • Analizzatori specifici per il cliente: i clienti creano una configurazione di analizzatori personalizzati per un tipo di log specifico che soddisfa i loro requisiti specifici. Un parser specifico per il cliente sostituisce il parser predefinito per il LogType specifico. Per maggiori dettagli, vedi Gestire gli analizzatori predefiniti e personalizzati.
  • Estensioni dello scanner: i clienti possono aggiungere istruzioni di mappatura personalizzate oltre alla configurazione dello scanner predefinita. Ogni cliente può creare un proprio insieme unico di istruzioni di mappatura personalizzata. Queste istruzioni di mappatura definiscono come estrarre e trasformare campi aggiuntivi dai log non elaborati originali ai campi UDM. Un'estensione del parser non sostituisce il parser predefinito o specifico per il cliente.

Un esempio che utilizza un log del proxy web Squid

Questa sezione fornisce un esempio di log del proxy web Squid e descrive come i valori vengono mappati a un record UDM. Per la descrizione di tutti i campi nello schema UDM, consulta l'elenco dei campi del modello di dati unificato.

Il log del proxy web Squid di esempio contiene valori separati da spazi. Ogni record rappresenta un evento e memorizza i seguenti dati: timestamp, durata, cliente, codice/stato del risultato, byte trasmessi, metodo di richiesta, URL, utente, codice gerarchia e tipo di contenuto. In questo esempio, i seguenti campi vengono estratti e mappati in un record UDM: ora, client, stato del risultato, byte, metodo di richiesta e URL.

1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg

Proxy web squid di esempio

Quando confronti queste strutture, tieni presente che solo un sottoinsieme dei dati del log originale è incluso nel record UDM. Alcuni campi sono obbligatori, mentre altri sono facoltativi. Inoltre, solo un sottoinsieme delle sezioni del record UDM contiene dati. Se il parser non mappa i dati del log originale al record UDM, non vedrai questa sezione del record UDM in Google Security Operations.

Valori dei log mappati all'UDM

La sezione metadata memorizza il timestamp dell'evento. Tieni presente che il valore è stato convertito dal formato EPOCH a RFC 3339. Questa conversione è facoltativa. Il timestamp può essere memorizzato come formato EPOCH, con preelaborazione per separare le parti di secondi e millisecondi in campi distinti.

Il campo metadata.event_type memorizza il valore NETWORK_HTTP, che è un valore enumerato che identifica il tipo di evento. Il valore di metadata.event_type determina quali campi UDM aggiuntivi sono obbligatori o facoltativi. I valori product_name e vendor_name contengono descrizioni facili da comprendere del dispositivo che ha registrato il log originale.

metadata.event_type in un record di evento UDM non corrisponde a log_type definito durante l'importazione dei dati utilizzando l'API di importazione. Questi due attributi memorizzano informazioni diverse.

La sezione network contiene i valori dell'evento log originale. In questo esempio, tieni presente che il valore dello stato del log originale è stato analizzato dal campo "result code/status" prima di essere scritto nel record UDM. Solo il valore result_code è stato incluso nel record UDM.

Valori dei log mappati all'UDM

La sezione principal memorizza le informazioni del client dal log originale. La sezione target memorizza sia l'URL completo sia l'indirizzo IP.

La sezione security_result memorizza uno dei valori dell'enum per rappresentare l'azione registrata nel log originale.

Questo è il record UDM formattato come JSON. Tieni presente che sono incluse solo le sezioni che contengono dati. Le sezioni src, observer, intermediary, about e extensions non sono incluse.

{
        "metadata": {
            "event_timestamp": "2020-04-28T07:40:48.129Z",
            "event_type": "NETWORK_HTTP",
            "product_name": "Squid Proxy",
            "vendor_name": "Squid"
        },
        "principal": {
            "ip": "192.168.23.4"
        },
        "target": {
            "url": "www.google.com/images/sunlogo.png",
            "ip": "203.0.113.52"
        },
        "network": {
            "http": {
                "method": "GET",
                "response_code": 200,
                "received_bytes": 904
            }
        },
        "security_result": {
            "action": "UNKNOWN_ACTION"
        }
}

Passaggi all'interno delle istruzioni del parser

Le istruzioni di mappatura dei dati all'interno di un parser seguono un pattern comune, come segue:

  1. Analizza ed estrae i dati dal log originale.
  2. Manipolare i dati estratti. ad esempio l'utilizzo della logica condizionale per analizzare in modo selettivo i valori, convertire i tipi di dati, sostituire sottostringhe in un valore, convertire in lettere maiuscole o minuscole e così via.
  3. Assegna valori ai campi UDM.
  4. Mostra il record UDM mappato nella chiave @output.

Analizza ed estrae i dati dal log originale

Impostare l'istruzione di filtro

L'istruzione filter è la prima nell'insieme di istruzioni di analisi. Tutte le istruzioni di analisi aggiuntive sono contenute nell'istruzione filter.

filter {

}

Inizializza le variabili che memorizzeranno i valori estratti

All'interno dell'istruzione filter, inizializza le variabili intermedie che il parser utilizzerà per memorizzare i valori estratti dal log.

Queste variabili vengono utilizzate ogni volta che viene analizzato un singolo log. Il valore in ogni variabile intermedia verrà impostato su uno o più campi UDM più avanti nelle istruzioni di analisi.

  mutate {
    replace => {
      "event.idm.read_only_udm.metadata.product_name" => "Webproxy"
      "event.idm.read_only_udm.metadata.vendor_name" => "Squid"
      "not_valid_log" => "false"
      "when" => ""
      "srcip" => ""
      "action" => ""
      "username" => ""
      "url" => ""
      "tgtip" => ""
      "method" => ""
    }
  }

Estrai i singoli valori dal log

Google Security Operations fornisce un insieme di filtri, basati su Logstash, per estrarre i campi dai file log originali. A seconda del formato del log, utilizza uno o più filtri di estrazione per estrarre tutti i dati dal log. Se la stringa è:

  • JSON nativo, la sintassi del parser è simile al filtro JSON che supporta i log in formato JSON. JSON nidificato non è supportato.
  • Formato XML, la sintassi dell'analizzatore sintattico è simile al filtro XML che supporta i log nel formato XML.
  • coppie chiave-valore, la sintassi del parser è simile al filtro Kv che supporta i messaggi con formato chiave-valore.
  • Formato CSV, la sintassi del parser è simile al filtro CSV che supporta i messaggi formattati in CSV.
  • Per tutti gli altri formati, la sintassi del parser è simile al filtro GROK con i pattern GROK integrati . Vengono utilizzate istruzioni di estrazione in stile Regex.

Google Security Operations fornisce un sottoinsieme delle funzionalità disponibili in ogni filtro. Google Security Operations fornisce anche la sintassi di mappatura dei dati personalizzati non disponibile nei filtri. Consulta il riferimento alla sintassi dell'analizzatore per una descrizione delle funzionalità supportate e delle funzioni personalizzate.

Continuando con l'esempio del log del proxy web Squid, la seguente istruzione di estrazione dei dati include una combinazione di sintassi Grok di Logstash ed espressioni regolari.

L'istruzione di estrazione seguente memorizza i valori nelle seguenti variabili intermediarie:

  • when
  • srcip
  • action
  • returnCode
  • size
  • method
  • username
  • url
  • tgtip

Questa dichiarazione di esempio utilizza anche la parola chiave overwrite per memorizzare i valori estratti in ogni variabile. Se il processo di estrazione restituisce un errore, l'istruzione on_error imposta not_valid_log su True.

grok {
   match => {
     "message" => [
       "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
     ]
   }
   overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
   on_error => "not_valid_log"
}

Manipolare e trasformare i valori estratti

Google Security Operations sfrutta le funzionalità del plug-in di filtro di mutazione di Logstash per consentire la manipolazione dei valori estratti dal log originale. Google Security Operations fornisce un sottoinsieme delle funzionalità disponibili nel plug-in. Consulta la sintassi del parser per una descrizione delle funzionalità supportate e delle funzioni personalizzate, ad esempio:

  • Trasmette i valori a un tipo di dati diverso
  • sostituisci i valori nella stringa
  • unire due array o aggiungere una stringa a un array. I valori delle stringhe vengono convertiti in un array prima dell'unione.
  • Converti in lettere minuscole o maiuscole

Questa sezione fornisce esempi di trasformazione dei dati basati sul log del proxy web Squid presentato in precedenza.

Trasforma il timestamp dell'evento

Tutti gli eventi archiviati come record UDM devono avere un timestamp. Questo esempio controlla se è stato estratto un valore per i dati dal log. Poi utilizza la funzione data di Grok per abbinare il valore al formato orario UNIX.

if [when] != "" {
  date {
    match => [
      "when", "UNIX"
    ]
   }
 }

Trasforma il valore username

L'istruzione di esempio seguente converte il valore nella variabile username in minuscolo.

mutate {
   lowercase => [ "username"]
   }

Trasforma il valore action

L'esempio seguente valuta il valore nella variabile intermedia action e lo modifica in ALLOW, BLOCK o UNKNOWN_ACTION, che sono valori validi per il campo UDM action.security_result.action Il campo UDM security_result.action è un tipo enumerato che memorizza solo valori specifici.

if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
      mutate {
        replace => {
          "action" => "BLOCK"
        }
      }
   } else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
     mutate {
        replace => {
          "action" => "ALLOW"
        }
     }
   } else {
      mutate {
        replace => {
          "action" => "UNKNOWN_ACTION" }
      }
   }

Trasforma l'indirizzo IP di destinazione

L'esempio seguente verifica la presenza di un valore nella variabile intermedia tgtip. Se viene trovato, il valore viene associato a un pattern di indirizzo IP utilizzando un pattern Grok predefinito. Se si verifica un errore di corrispondenza del valore a un pattern di indirizzi IP, la funzione on_error imposta la proprietà not_valid_tgtip su True. Se la corrispondenza è riuscita, la proprietà not_valid_tgtip non è impostata.

if [tgtip] not in [ "","-" ] {
   grok {
     match => {
       "tgtip" => [ "%{IP:tgtip}" ]
     }
     overwrite => ["tgtip"]
     on_error => "not_valid_tgtip"
   }

Modificare il tipo di dati di returnCode e size

L'esempio seguente esegue il casting del valore nella variabile size in uinteger e il valore nella variabile returnCode in integer. Questo è obbligatorio perché la variabile size verrà salvata nel campo UDM network.received_bytes che memorizza un tipo di dati int64. La variabile returnCode verrà salvata nel campo UDM network.http.response_code che memorizza un tipo di dati int32.

mutate {
  convert => {
    "returnCode" => "integer"
    "size" => "uinteger"
  }
}

Assegnare valori ai campi UDM in un evento

Dopo aver estratto e pre-elaborato i valori, assegnali ai campi di un record di evento UDM. A un campo UDM puoi assegnare sia valori estratti che valori statici.

Se compili event.disambiguation_key, assicurati che questo campo sia univoco per ogni evento generato per il log specificato. Se due eventi diversi hanno lo stesso disambiguation_key, si verificherà un comportamento imprevisto nel sistema.

Gli esempi di parser in questa sezione si basano sull'esempio precedente del log del proxy web Squid.

Salva il timestamp dell'evento

Per ogni record evento UDM deve essere impostato un valore per il campo metadata.event_timestamp UDM. L'esempio seguente salva il timestamp dell'evento estratto dal log nella variabile interna @timestamp. Google Security Operations salva questo valore nel metadata.event_timestamp campo UDM per impostazione predefinita.

mutate {
  rename => {
    "when" => "timestamp"
  }
}

Impostare il tipo di evento

Per ogni record evento UDM deve essere impostato un valore per il metadata.event_type campo UDM. Questo campo è di tipo enumerato. Il valore di questo campo determina quali campi UDM aggiuntivi devono essere compilati per salvare il record UDM. Il processo di analisi e normalizzazione non andrà a buon fine se uno dei campi obbligatori non contiene dati validi.

replace => {
    "event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
   }
}

Salva i valori username e method utilizzando l'istruzione replace

I valori nei campi intermedi username e method sono stringhe. L'esempio seguente verifica se esiste un valore valido e, in questo caso, memorizza il valore username nel campo UDM principal.user.userid e il valore method nel campo UDM network.http.method.

if [username] not in [ "-" ,"" ] {
  mutate {
    replace => {
      "event.idm.read_only_udm.principal.user.userid" => "%{username}"
    }
  }
}

if [method] != "" {
  mutate {
    replace => {
      "event.idm.read_only_udm.network.http.method" => "%{method}"
    }
  }
}

Salva action nel campo UDM security_result.action

Nella sezione precedente, il valore nella variabile intermedia action è stato valutato e trasformato in uno dei valori standard per il campo UDM security_result.action.

Entrambi i campi UDM security_result e action memorizzano un array di elementi, il che significa che devi seguire un approccio leggermente diverso quando salvi questo valore.

Innanzitutto, salva il valore trasformato in un campo security_result.action intermediario. Il campo security_result è un campo principale del campo action.

mutate {
   merge => {
     "security_result.action" => "action"
   }
}

Successivamente, salva il campo intermedio security_result.action nel campo UDM security_result. Il campo UDM security_result memorizza un array di elementi, pertanto il valore viene aggiunto a questo campo.

 # save the security_result field
mutate {
  merge => {
    "event.idm.read_only_udm.security_result" => "security_result"
  }
}

Memorizza l'indirizzo IP di destinazione e l'indirizzo IP di origine utilizzando l'istruzione merge

Memorizza i seguenti valori nel record dell'evento UDM:

  • Valore nella variabile intermedia srcip al campo UDM principal.ip.
  • Valore nella variabile intermedia tgtip al campo UDM target.ip.

Sia i campi UDM principal.ip che target.ip memorizzano un array di elementi, pertanto i valori vengono aggiunti a ciascun campo.

I seguenti esempi mostrano approcci diversi per salvare questi valori. Durante il passaggio di trasformazione, la variabile intermedia tgtipè stata associata a un indirizzo IP utilizzando un pattern Grok predefinito. L'istruzione di esempio riportata di seguito controlla se la proprietà not_valid_tgtip è vera, indicando che non è stato possibile abbinare il valore tgtip a un pattern di indirizzi IP. Se è false, salva il valore tgtip nel campo UDM target.ip.

if ![not_valid_tgtip] {
  mutate {
    merge => {
      "event.idm.read_only_udm.target.ip" => "tgtip"
    }
  }
 }

La variabile intermedia srcip non è stata trasformata. L'istruzione seguente controlla se è stato estratto un valore dal log originale e, in caso affermativo, lo salva nel campo UDM principal.ip.

if [srcip] != "" {
  mutate {
    merge => {
      "event.idm.read_only_udm.principal.ip" => "srcip"
    }
  }
}

Salvare url, returnCode e size utilizzando l'istruzione rename

L'istruzione di esempio seguente memorizza i valori utilizzando l'istruzione rename:

  • La variabile url salvata nel campo UDM target.url.
  • La variabile intermedia returnCode salvata nel campo UDM network.http.response_code.
  • La variabile intermedia size salvata nel campo UDM network.received_bytes.
mutate {
  rename => {
     "url" => "event.idm.read_only_udm.target.url"
     "returnCode" => "event.idm.read_only_udm.network.http.response_code"
     "size" => "event.idm.read_only_udm.network.received_bytes"
  }
}

Collega il record UDM all'output

L'istruzione finale nell'istruzione di mappatura dei dati genera i dati elaborati in un record evento UDM.

mutate {
    merge => {
      "@output" => "event"
    }
  }

Il codice completo dell'analizzatore

Questo è l'esempio completo di codice del parser. L'ordine delle istruzioni non segue lo stesso ordine delle sezioni precedenti di questo documento, ma produce lo stesso output.

filter {

# initialize variables
  mutate {
    replace => {
      "event.idm.read_only_udm.metadata.product_name" => "Webproxy"
      "event.idm.read_only_udm.metadata.vendor_name" => "Squid"
      "not_valid_log" => "false"
      "when" => ""
      "srcip" => ""
      "action" => ""
      "username" => ""
      "url" => ""
      "tgtip" => ""
      "method" => ""
    }
  }

  # Extract fields from the raw log.
    grok {
      match => {
        "message" => [
          "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
        ]
      }
      overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
      on_error => "not_valid_log"
    }

  # Parse event timestamp
  if [when] != "" {
    date {
      match => [
        "when", "UNIX"
      ]
     }
   }

   # Save the value in "when" to the event timestamp
   mutate {
     rename => {
       "when" => "timestamp"
     }
   }

   # Transform and save username
   if [username] not in [ "-" ,"" ] {
     mutate {
       lowercase => [ "username"]
        }
      }
     mutate {
       replace => {
         "event.idm.read_only_udm.principal.user.userid" => "%{username}"
       }
     }


if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
      mutate {
        replace => {
          "action" => "BLOCK"
        }
      }
   } else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
     mutate {
        replace => {
          "action" => "ALLOW"
        }
     }
   } else {
      mutate {
        replace => {
          "action" => "UNKNOWN_ACTION" }
      }
   }

  # save transformed value to an intermediary field
   mutate {
      merge => {
        "security_result.action" => "action"
      }
   }

    # save the security_result field
    mutate {
      merge => {
        "event.idm.read_only_udm.security_result" => "security_result"
      }
    }

   # check for presence of target ip. Extract and store target IP address.
   if [tgtip] not in [ "","-" ] {
     grok {
       match => {
         "tgtip" => [ "%{IP:tgtip}" ]
       }
       overwrite => ["tgtip"]
       on_error => "not_valid_tgtip"
     }

     # store  target IP address
     if ![not_valid_tgtip] {
       mutate {
         merge => {
           "event.idm.read_only_udm.target.ip" => "tgtip"
         }
       }
     }
   }

   # convert  the returnCode and size  to integer data type
   mutate {
     convert => {
       "returnCode" => "integer"
       "size" => "uinteger"
     }
   }

   # save  url, returnCode, and size
   mutate {
     rename => {
        "url" => "event.idm.read_only_udm.target.url"
        "returnCode" => "event.idm.read_only_udm.network.http.response_code"
        "size" => "event.idm.read_only_udm.network.received_bytes"
     }

     # set the event type to NETWORK_HTTP
     replace => {
        "event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
     }
   }

   # validate and set source IP address
   if [srcip] != "" {
     mutate {
       merge => {
         "event.idm.read_only_udm.principal.ip" => "srcip"
       }
     }
   }

  # save  event to @output
   mutate {
     merge => {
       "@output" => "event"
     }
   }

} #end of filter

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.