Panoramica dell'analisi dei log

Supportato in:

Questo documento fornisce una panoramica di come Google Security Operations analizza i log non elaborati nel formato Unified Data Model (UDM).

Google Security Operations può ricevere i dati dei log provenienti dalle seguenti fonti di importazione:

  • Inoltro di Google Security Operations
  • Feed dell'API Google Security Operations
  • API Google Security Operations Ingestion
  • Partner tecnologico di terze parti

In genere, i clienti inviano i dati come log non elaborati originali. Google Security Operations identifica in modo univoco il dispositivo che ha generato i log utilizzando LogType. LogType identifica entrambi:

  • il fornitore e il dispositivo che ha generato il log, ad esempio Cisco Firewall, Linux DHCP Server o Bro DNS.
  • quale parser converte il log non elaborato in un modello di dati unificato (UDM) strutturato. Esiste una relazione uno a uno tra un parser e un LogType. Ogni parser converte i dati ricevuti da un singolo LogType.

Google Security Operations fornisce un insieme di analizzatori sintattici predefiniti che leggono i log non elaborati originali e generano record UDM strutturati utilizzando i dati nel log non elaborato originale. Operazioni di sicurezza di Google gestisce questi parser. I clienti possono anche definire istruzioni di mappatura dei dati personalizzate creando un parser specifico per il cliente. Contatta il tuo rappresentante di Google Security Operations per informazioni sulla creazione di un parser specifico per il cliente.

Flusso di lavoro di importazione e normalizzazione

Il parser contiene istruzioni di mappatura dei dati. Definisce il modo in cui i dati vengono mappati dal log non elaborato originale a uno o più campi nella struttura dei dati UDM.

Se non sono presenti errori di analisi, Google Security Operations crea un record strutturato UDM utilizzando i dati del log non elaborato. Il processo di conversione di un log non elaborato in un record UDM chiamata normalizzazione.

Un parser predefinito potrebbe mappare un sottoinsieme di valori principali dal log non elaborato. In genere, questi campi principali sono i più importanti per fornire approfondimenti sulla sicurezza in Google Security Operations. I valori non mappati rimangono nel log non elaborato, ma non vengono archiviati nel record UDM.

Un cliente può anche utilizzare l'API Ingestion, per inviare dati in formato UDM (Unified Data Model).

Personalizzare l'analisi dei dati importati

Google Security Operations fornisce le seguenti funzionalità che consentono ai clienti di personalizzare l'analisi dei dati di log originali in arrivo.

  • Parser specifici del cliente: i clienti creano un parser personalizzato configurazione per un tipo di log specifico che soddisfa le i tuoi requisiti. Un parser specifico per il cliente sostituisce il parser predefinito per il LogType specifico. Contatta il tuo team Google Security Operations per informazioni sulla creazione di un parser specifico del cliente.
  • Estensioni parser: i clienti possono aggiungere istruzioni di mappatura personalizzate in oltre alla configurazione predefinita del parser. Ogni cliente può creare un proprio insieme unico di istruzioni di mappatura personalizzata. Queste istruzioni di mappatura definiscono come estrarre e trasformare campi aggiuntivi dai log non elaborati originali ai campi UDM. L'estensione parser non sostituisce analizzatore sintattico predefinito o specifico per il cliente.

Un esempio che utilizza un log del proxy web Squid

Questa sezione fornisce un esempio di log del proxy web Squid e descrive come vengono mappati a un record UDM. Per la descrizione di tutti i campi nello schema UDM, consulta l'elenco dei campi del modello di dati unificato.

Il log del proxy web Squid di esempio contiene valori separati da spazi. Ogni record rappresenta un evento e memorizza i seguenti dati: timestamp, durata, cliente, codice/stato del risultato, byte trasmessi, metodo di richiesta, URL, utente, codice gerarchia e tipo di contenuto. In questo esempio, i campi seguenti sono estratti e mappati in un record UDM: ora, client, stato dei risultati, byte metodo di richiesta e URL.

1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg

Proxy web Squid di esempio

Quando confronti queste strutture, tieni presente che solo un sottoinsieme dei dati del log originale è incluso nel record UDM. Alcuni campi sono obbligatori, mentre altri sono facoltativi. Inoltre, solo un sottoinsieme delle sezioni del record UDM contengono dati. Se il parser non mappa i dati del log originale all'UDM non vedrai quella sezione del record UDM in Google Security Operations.

Valori di log mappati a UDM

La sezione metadata memorizza il timestamp dell'evento. Tieni presente che il valore è stato convertito dal formato EPOCH a RFC 3339. Questa conversione è facoltativa. Il timestamp può essere memorizzati in formato EPOCH, con pre-elaborazione per separare i secondi e di millisecondi in campi separati.

Il campo metadata.event_type memorizza il valore NETWORK_HTTP, che è un valore enumerato che identifica il tipo di evento. Il valore di metadata.event_type determina quali campi UDM aggiuntivi sono obbligatori o facoltativi. Le product_name e I valori vendor_name contengono descrizioni semplici del dispositivo che registrato il log originale.

metadata.event_type in un record di evento UDM non corrisponde a log_type definito durante l'importazione dei dati utilizzando l'API di importazione. Questi due attributi memorizzano informazioni diverse.

La sezione network contiene i valori dell'evento log originale. In questo esempio, tieni presente che il valore dello stato del log originale è stato analizzato dal campo "result code/status" prima di essere scritto nel record UDM. Solo il valore result_code è stato incluso nel record UDM.

Valori dei log mappati all'UDM

Nella sezione principal vengono archiviate le informazioni sul client del log originale. La sezione target memorizza sia l'URL completo sia l'indirizzo IP.

La sezione security_result memorizza uno dei valori enum per rappresentare l'azione registrata nel log originale.

Si tratta del record UDM formattato come JSON. Tieni presente che sono incluse solo le sezioni che contengono dati. Le sezioni src, observer, intermediary, about e extensions non sono incluse.

{
        "metadata": {
            "event_timestamp": "2020-04-28T07:40:48.129Z",
            "event_type": "NETWORK_HTTP",
            "product_name": "Squid Proxy",
            "vendor_name": "Squid"
        },
        "principal": {
            "ip": "192.168.23.4"
        },
        "target": {
            "url": "www.google.com/images/sunlogo.png",
            "ip": "203.0.113.52"
        },
        "network": {
            "http": {
                "method": "GET",
                "response_code": 200,
                "received_bytes": 904
            }
        },
        "security_result": {
            "action": "UNKNOWN_ACTION"
        }
}

Passaggi nelle istruzioni del parser

Le istruzioni di mappatura dei dati all'interno di un parser seguono un pattern comune, come segue:

  1. Analizza ed estrae i dati dal log originale.
  2. Manipolare i dati estratti. È incluso l'uso della logica condizionale per analizzare selettivamente i valori, convertire i tipi di dati, sostituire sottostringhe , convertire in maiuscolo o minuscolo e così via.
  3. Assegna valori ai campi UDM.
  4. Restituisci come output il record UDM mappato nella chiave @output.

Analizza ed estrae i dati dal log originale

Impostare l'istruzione di filtro

L'istruzione filter è la prima nell'insieme di istruzioni di analisi. Tutte le istruzioni di analisi aggiuntive sono contenute nell'istruzione filter.

filter {

}

Inizializza le variabili che memorizzeranno i valori estratti

All'interno dell'istruzione filter, inizializza le variabili intermedie che il parser utilizzerà per memorizzare i valori estratti dal log.

Queste variabili vengono utilizzate ogni volta che viene analizzato un singolo log. Il valore in ogni variabile intermedia verrà impostata su uno o più campi UDM più avanti nella le istruzioni di analisi.

  mutate {
    replace => {
      "event.idm.read_only_udm.metadata.product_name" => "Webproxy"
      "event.idm.read_only_udm.metadata.vendor_name" => "Squid"
      "not_valid_log" => "false"
      "when" => ""
      "srcip" => ""
      "action" => ""
      "username" => ""
      "url" => ""
      "tgtip" => ""
      "method" => ""
    }
  }

Estrarre singoli valori dal log

Google Security Operations fornisce un insieme di filtri, basati su Logstash, per estrarre i campi dai file di log originali. A seconda del formato del log, utilizza uno o più filtri di estrazione per estrarre tutti i dati dal log. Se la stringa è:

  • JSON nativo, la sintassi del parser è simile al filtro JSON che supporta i log in formato JSON. JSON nidificato non è supportato.
  • Formato XML, la sintassi dell'analizzatore sintattico è simile al filtro XML che supporta i log nel formato XML.
  • coppie chiave-valore, la sintassi del parser è simile al filtro Kv che supporta i messaggi con formato chiave-valore.
  • CSV, la sintassi del parser è simile a Filtro CSV che supporta i messaggi in formato CSV.
  • Per tutti gli altri formati, la sintassi del parser è simile al filtro GROK con i pattern GROK integrati. Vengono utilizzate le istruzioni per l'estrazione in stile regex.

Google Security Operations fornisce un sottoinsieme delle funzionalità disponibili in ogni filtro. Google Security Operations fornisce anche una sintassi di mappatura dei dati personalizzata non disponibile nei filtri. Consulta il riferimento alla sintassi dell'analizzatore per una descrizione delle funzionalità supportate e delle funzioni personalizzate.

Continuando con l'esempio del log del proxy web Squid, la seguente istruzione di estrazione dei dati include una combinazione di sintassi Logstash Grok ed espressioni regolari.

La seguente istruzione di estrazione archivia i valori nel seguente intermedio variabili:

  • when
  • srcip
  • action
  • returnCode
  • size
  • method
  • username
  • url
  • tgtip

Questa istruzione di esempio utilizza anche la parola chiave overwrite per archiviare i valori estratti in ogni variabile. Se il processo di estrazione restituisce un errore, l'istruzione on_error imposta not_valid_log su True.

grok {
   match => {
     "message" => [
       "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
     ]
   }
   overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
   on_error => "not_valid_log"
}

Manipolare e trasformare i valori estratti

Google Security Operations sfrutta le funzionalità del plug-in filtro mutate di Logstash per consentire la manipolazione dei valori estratti dal log originale. Google Security Operations fornisce un sottoinsieme delle funzionalità disponibili nel plug-in. Per una descrizione delle funzionalità, consulta la sintassi dell'analizzatore sintattico. supportate e personalizzate, ad esempio:

  • Trasformare i valori in un tipo di dati diverso
  • sostituisci i valori nella stringa
  • unire due array o aggiungere una stringa a un array. I valori delle stringhe sono in un array prima dell'unione.
  • Converti in lettere minuscole o maiuscole

Questa sezione fornisce esempi di trasformazione dei dati basati sul log del proxy web Squid presentato in precedenza.

Trasforma il timestamp dell'evento

Tutti gli eventi archiviati come record UDM devono avere un timestamp. Questo esempio controlla se è stato estratto un valore per i dati dal log. Poi utilizza la funzione data di Grok per abbinare il valore al formato orario UNIX.

if [when] != "" {
  date {
    match => [
      "when", "UNIX"
    ]
   }
 }

Trasforma il valore username

L'istruzione di esempio seguente converte il valore nella variabile username in minuscolo.

mutate {
   lowercase => [ "username"]
   }

Trasforma il valore action

L'esempio seguente valuta il valore nella variabile intermedia action e lo modifica in ALLOW, BLOCK o UNKNOWN_ACTION, che sono valori validi per il campo UDM action. L'UDM di security_result.action è un tipo enumerato che archivia solo valori specifici.

if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
      mutate {
        replace => {
          "action" => "BLOCK"
        }
      }
   } else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
     mutate {
        replace => {
          "action" => "ALLOW"
        }
     }
   } else {
      mutate {
        replace => {
          "action" => "UNKNOWN_ACTION" }
      }
   }

Trasforma l'indirizzo IP di destinazione

L'esempio seguente verifica la presenza di un valore nella variabile intermedia tgtip. Se trovato, il valore viene abbinato a un pattern di indirizzi IP utilizzando un elenco Grok predefinito pattern. Se si verifica un errore di corrispondenza del valore a un pattern di indirizzi IP, il valore La funzione on_error imposta la proprietà not_valid_tgtip su True. Se la corrispondenza è riuscita, la proprietà not_valid_tgtip non è impostata.

if [tgtip] not in [ "","-" ] {
   grok {
     match => {
       "tgtip" => [ "%{IP:tgtip}" ]
     }
     overwrite => ["tgtip"]
     on_error => "not_valid_tgtip"
   }

Modificare il tipo di dati di ReturnCode e la dimensione

L'esempio seguente esegue il casting del valore nella variabile size in uinteger e il valore nella variabile returnCode in integer. Questo è obbligatoria perché la variabile size verrà salvata nella network.received_bytes Campo UDM che memorizza un tipo di dati int64. La returnCode variabile verrà salvata nel campo UDM network.http.response_code che archivia un tipo di dati int32.

mutate {
  convert => {
    "returnCode" => "integer"
    "size" => "uinteger"
  }
}

Assegnare valori ai campi UDM in un evento

Dopo aver estratto e pre-elaborato i valori, assegnali ai campi di un record di evento UDM. A un campo UDM puoi assegnare sia valori estratti che valori statici.

Se compili event.disambiguation_key, assicurati che questo campo sia univoco per ogni evento generato per il log specificato. Se due eventi diversi hanno stesso disambiguation_key, questo comporterà un comportamento imprevisto nel di un sistema operativo completo.

Gli esempi di parser in questa sezione si basano sull'esempio di log del proxy web Squid in alto.

Salva il timestamp dell'evento

Per ogni record evento UDM deve essere impostato un valore per il metadata.event_timestamp campo UDM. L'esempio seguente salva il timestamp dell'evento estratto dal log nella variabile interna @timestamp. Google Security Operations salva questo valore nel metadata.event_timestamp campo UDM per impostazione predefinita.

mutate {
  rename => {
    "when" => "timestamp"
  }
}

Impostare il tipo di evento

Ogni record di eventi UDM deve avere un valore impostato per l'UDM metadata.event_type . Questo campo è di tipo enumerato. Il valore di questo campo determina quali campi UDM aggiuntivi devono essere compilati per salvare il record UDM. Il processo di analisi e normalizzazione non andrà a buon fine se uno dei campi obbligatori non contiene dati validi.

replace => {
    "event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
   }
}

Salva i valori username e method utilizzando l'istruzione replace

I valori nei campi intermedi username e method sono stringhe. L'esempio seguente verifica se esiste un valore valido e, in questo caso, memorizza il valore username nel campo UDM principal.user.userid e il valore method nel campo UDM network.http.method.

if [username] not in [ "-" ,"" ] {
  mutate {
    replace => {
      "event.idm.read_only_udm.principal.user.userid" => "%{username}"
    }
  }
}

if [method] != "" {
  mutate {
    replace => {
      "event.idm.read_only_udm.network.http.method" => "%{method}"
    }
  }
}

Salva action nel campo UDM security_result.action

Nella sezione precedente, il valore della variabile intermedia action era Vengono valutati e trasformati in uno dei valori standard per il campo UDM security_result.action.

Entrambi i campi UDM security_result e action memorizzano un array di elementi, il che significa che devi seguire un approccio leggermente diverso quando salvi questo valore.

Innanzitutto, salva il valore trasformato in un campo security_result.action intermediario. Il campo security_result è un padre del campo action.

mutate {
   merge => {
     "security_result.action" => "action"
   }
}

Successivamente, salva il campo intermedio security_result.action nel campo UDM security_result. Il campo UDM security_result archivia un array di , in modo che il valore venga aggiunto a questo campo.

# save the security_result field
mutate {
  merge => {
    "event.idm.read_only_udm.security_result" => "security_result"
  }
}

Archivia l'indirizzo IP di destinazione e l'indirizzo IP di origine utilizzando l'istruzione merge

Memorizza i seguenti valori nel record dell'evento UDM:

  • Valore nella variabile intermedia srcip nel campo UDM principal.ip.
  • Valore nella variabile intermedia tgtip al campo UDM target.ip.

Sia i campi UDM principal.ip che target.ip memorizzano un array di elementi, pertanto i valori vengono aggiunti a ciascun campo.

Gli esempi riportati di seguito mostrano diversi approcci per salvare questi valori. Durante la fase di trasformazione, la tgtipvariabile intermedia è stata abbinata a una Indirizzo IP con un pattern Grok predefinito. L'istruzione di esempio riportata di seguito controlla se la proprietà not_valid_tgtip è vera, indicando che non è stato possibile abbinare il valore tgtip a un pattern di indirizzi IP. Se è false, salva la tgtip al campo UDM target.ip.

if ![not_valid_tgtip] {
  mutate {
    merge => {
      "event.idm.read_only_udm.target.ip" => "tgtip"
    }
  }
 }

La variabile intermedia srcip non è stata trasformata. L'istruzione seguente controlla se è stato estratto un valore dal log originale e, in caso affermativo, salva il valore nel campo UDM principal.ip.

if [srcip] != "" {
  mutate {
    merge => {
      "event.idm.read_only_udm.principal.ip" => "srcip"
    }
  }
}

Salvare url, returnCode e size utilizzando l'istruzione rename

L'istruzione di esempio riportata di seguito memorizza i seguenti valori utilizzando l'istruzione rename.

  • Variabile url salvata nel campo UDM target.url.
  • La variabile intermedia returnCode salvata nel campo UDM network.http.response_code.
  • La variabile intermedia size salvata nel campo UDM network.received_bytes.
mutate {
  rename => {
     "url" => "event.idm.read_only_udm.target.url"
     "returnCode" => "event.idm.read_only_udm.network.http.response_code"
     "size" => "event.idm.read_only_udm.network.received_bytes"
  }
}

Collega il record UDM all'output

L'istruzione finale dell'istruzione di mappatura dei dati restituisce i dati elaborati in un record di eventi UDM.

mutate {
    merge => {
      "@output" => "event"
    }
  }

Il codice completo del parser

Questo è l'esempio di codice completo del parser. L'ordine delle istruzioni non segue lo stesso ordine delle sezioni precedenti di questo documento, ma produce lo stesso output.

filter {

# initialize variables
  mutate {
    replace => {
      "event.idm.read_only_udm.metadata.product_name" => "Webproxy"
      "event.idm.read_only_udm.metadata.vendor_name" => "Squid"
      "not_valid_log" => "false"
      "when" => ""
      "srcip" => ""
      "action" => ""
      "username" => ""
      "url" => ""
      "tgtip" => ""
      "method" => ""
    }
  }

  # Extract fields from the raw log.
    grok {
      match => {
        "message" => [
          "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
        ]
      }
      overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
      on_error => "not_valid_log"
    }

  # Parse event timestamp
  if [when] != "" {
    date {
      match => [
        "when", "UNIX"
      ]
     }
   }

   # Save the value in "when" to the event timestamp
   mutate {
     rename => {
       "when" => "timestamp"
     }
   }

   # Transform and save username
   if [username] not in [ "-" ,"" ] {
     mutate {
       lowercase => [ "username"]
        }
      }
     mutate {
       replace => {
         "event.idm.read_only_udm.principal.user.userid" => "%{username}"
       }
     }


if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
      mutate {
        replace => {
          "action" => "BLOCK"
        }
      }
   } else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
     mutate {
        replace => {
          "action" => "ALLOW"
        }
     }
   } else {
      mutate {
        replace => {
          "action" => "UNKNOWN_ACTION" }
      }
   }

  # save transformed value to an intermediary field
   mutate {
      merge => {
        "security_result.action" => "action"
      }
   }

    # save the security_result field
    mutate {
      merge => {
        "event.idm.read_only_udm.security_result" => "security_result"
      }
    }

   # check for presence of target ip. Extract and store target IP address.
   if [tgtip] not in [ "","-" ] {
     grok {
       match => {
         "tgtip" => [ "%{IP:tgtip}" ]
       }
       overwrite => ["tgtip"]
       on_error => "not_valid_tgtip"
     }

     # store  target IP address
     if ![not_valid_tgtip] {
       mutate {
         merge => {
           "event.idm.read_only_udm.target.ip" => "tgtip"
         }
       }
     }
   }

   # convert  the returnCode and size  to integer data type
   mutate {
     convert => {
       "returnCode" => "integer"
       "size" => "uinteger"
     }
   }

   # save  url, returnCode, and size
   mutate {
     rename => {
        "url" => "event.idm.read_only_udm.target.url"
        "returnCode" => "event.idm.read_only_udm.network.http.response_code"
        "size" => "event.idm.read_only_udm.network.received_bytes"
     }

     # set the event type to NETWORK_HTTP
     replace => {
        "event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
     }
   }

   # validate and set source IP address
   if [srcip] != "" {
     mutate {
       merge => {
         "event.idm.read_only_udm.principal.ip" => "srcip"
       }
     }
   }

  # save  event to @output
   mutate {
     merge => {
       "@output" => "event"
     }
   }

} #end of filter