Übersicht über das Parsen von Protokollen

Unterstützt in:

Dieses Dokument bietet einen Überblick darüber, wie Google Security Operations Rohprotokolle in das Unified Data Model (UDM)-Format parst.

Google Security Operations kann Logdaten aus der folgenden Aufnahme empfangen Quellen:

  • Google Security Operations-Forwarder
  • Google Security Operations API-Feed
  • Google Security Operations Ingestion API
  • Technologiepartner eines Drittanbieters

Im Allgemeinen senden Kunden Daten als Original-Rohprotokolle. Einzigartige Sicherheit bei Google Security Operations das Gerät identifiziert, das die Protokolle mithilfe des LogType generiert hat. LogType steht für beide:

  • den Anbieter und das Gerät, von denen das Log generiert wurde, z. B. Cisco Firewall, Linux DHCP-Server oder Bro DNS.
  • Der Parser konvertiert das Rohlog in ein strukturiertes Unified Data Model (UDM). Es besteht eine 1:1-Beziehung zwischen einem Parser und einem LogType. Jeder Parser wandelt Daten um, die von einem einzelnen LogType empfangen wurden.

Google Security Operations bietet eine Reihe von Standardparsern, die die ursprünglichen Rohprotokolle lesen und Strukturierte UDM-Datensätze mithilfe von Daten im ursprünglichen Rohlog generieren Google Security Operations verwaltet diese Parser. Kunden können auch benutzerdefinierte Anweisungen für die Datenzuordnung definieren indem ein kundenspezifischer Parser erstellt wird. Google Security Operations kontaktieren für Informationen zum Erstellen eines kundenspezifischen Parsers.

Aufnahme- und Normalisierungsworkflow

Der Parser enthält Anweisungen zur Datenzuordnung. Sie definiert, wie Daten zugeordnet werden. aus dem ursprünglichen Rohprotokoll in ein oder mehrere Felder in der UDM-Datenstruktur.

Wenn keine Parsing-Fehler auftreten, erstellt Google Security Operations einen UDM-strukturierten Datensatz mithilfe von Daten aus dem Rohprotokoll. Der Prozess zum Konvertieren eines Rohlogs in einen UDM-Eintrag wird als Normalisierung bezeichnet.

Ein Standardparser könnte eine Teilmenge von Kernwerten aus dem Rohlog zuordnen. Normalerweise Diese Kernfelder sind für die Bereitstellung von Sicherheitsinformationen in Google Security Operations Nicht zugeordnete Werte verbleiben im Rohlog, werden aber nicht die im UDM-Eintrag gespeichert sind.

Kunden können auch die Ingestion API verwenden, um Daten im strukturierten Unified Data Model-Format (UDM) zu senden.

Parsen von aufgenommenen Daten anpassen

Google Security Operations bietet Kunden folgende Möglichkeiten: Passen Sie das Parsen von Daten für eingehende Original-Protokolldaten an.

  • Kundenspezifische Parser: Kunden erstellen einen benutzerdefinierten Parser für einen bestimmten Logtyp konfigurieren, der ihren spezifischen Anforderungen entspricht, Anforderungen. Ein kundenspezifischer Parser ersetzt Den Standardparser für den jeweiligen LogType. Google Security Operations kontaktieren für Informationen zum Erstellen eines kundenspezifischen Parsers.
  • Parser-Erweiterungen: Kunden können benutzerdefinierte Zuordnungsanweisungen hinzufügen in zur Parser-Standardkonfiguration hinzu. Jeder Kunde kann eigene Anweisungen für die Zuordnung erstellen. Diese Zuordnung Anweisungen zum Extrahieren und Transformieren zusätzlicher Felder aus ursprünglichen Rohlogs in UDM-Feldern. Eine Parsererweiterung ersetzt nicht den Standard- oder kundenspezifischen Parser.

Beispiel für die Verwendung eines Squid-Web-Proxy-Protokolls

Dieser Abschnitt enthält ein Beispiel für ein Squid-Web-Proxy-Protokoll und beschreibt, wie die werden einem UDM-Eintrag zugeordnet. Eine Beschreibung aller Felder im UDM-Schema Siehe Liste der Felder für einheitliche Datenmodell

Das Squid-Web-Proxy-Beispielprotokoll enthält durch Leerzeichen getrennte Werte. Jeder Datensatz stellt ein Ereignis dar und speichert die folgenden Daten: Zeitstempel, Dauer, Client, Ergebniscode/Ergebnisstatus, übertragene Byte, Anfragemethode, URL, Nutzer Hierarchiecode und Inhaltstyp. In diesem Beispiel sind die folgenden Felder extrahiert und einem UDM-Datensatz zugeordnet: Zeit, Client, Ergebnisstatus, Byte, und die URL angeben.

1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg

Beispiel für einen Squid-Web-Proxy

Beachten Sie beim Vergleich dieser Strukturen, dass nur ein Teil der ursprünglichen Logdaten sind im UDM-Eintrag enthalten. Bestimmte Felder sind Pflichtfelder, andere sind optional. Außerdem ist nur ein Teil der Abschnitte im UDM-Eintrag Daten enthalten. Wenn der Parser der UDM keine Daten aus dem ursprünglichen Log zuordnet wird dieser Abschnitt des UDM-Eintrags in Google Security Operations nicht angezeigt.

UDM zugeordnete Logwerte

Im Abschnitt metadata wird der Ereigniszeitstempel gespeichert. Der Wert wurde umgerechnet, von EPOCH in RFC 3339-Format ändern. Diese Konvertierung ist optional. Der Zeitstempel kann im EPOCH-Format gespeichert, mit Vorverarbeitung zur Trennung von Sekunden und Millisekunden-Anteile in separate Felder ein.

Die metadata.event_type enthält den Wert NETWORK_HTTP, bei dem es sich um einen Aufzählungswert handelt. der den Ereignistyp identifiziert. Der Wert von metadata.event_type bestimmt, welche zusätzlichen UDM-Felder erforderlich und optional sind. Die product_name und vendor_name-Werte enthalten benutzerfreundliche Beschreibungen des Geräts, das das ursprüngliche Protokoll aufgezeichnet hat.

metadata.event_type in einem UDM-Ereigniseintrag ist nicht mit „log_type“ identisch definiert, wenn Daten mit der Ingestion API aufgenommen werden. Diese beiden Attribute speichern unterschiedliche Informationen.

Der Abschnitt network enthält Werte aus dem ursprünglichen Logereignis. Hinweis in dieser Beispiel, dass der Statuswert aus dem ursprünglichen Protokoll aus dem Feld „Ergebnis“ Code/Status bevor in den UDM-Eintrag geschrieben werden. Nur der Ergebniscode im UDM-Eintrag enthalten ist.

UDM zugeordnete Logwerte

Im Abschnitt principal werden die Clientinformationen aus dem ursprünglichen Log gespeichert. Die Im Abschnitt target werden sowohl die voll qualifizierte URL als auch die IP-Adresse gespeichert.

Im Abschnitt security_result wird einer der Aufzählungswerte für die Aktion gespeichert, die im ursprünglichen Log aufgezeichnet wurde.

Dies ist der im JSON-Format formatierte UDM-Eintrag. Beachten Sie, dass nur Abschnitte, enthalten sind. src, observer, intermediary, about, und extensions-Abschnitte sind nicht enthalten.

{
        "metadata": {
            "event_timestamp": "2020-04-28T07:40:48.129Z",
            "event_type": "NETWORK_HTTP",
            "product_name": "Squid Proxy",
            "vendor_name": "Squid"
        },
        "principal": {
            "ip": "192.168.23.4"
        },
        "target": {
            "url": "www.google.com/images/sunlogo.png",
            "ip": "203.0.113.52"
        },
        "network": {
            "http": {
                "method": "GET",
                "response_code": 200,
                "received_bytes": 904
            }
        },
        "security_result": {
            "action": "UNKNOWN_ACTION"
        }
}

Schritte in Parser-Anweisungen

Anweisungen zur Datenzuordnung innerhalb eines Parsers folgen einem gemeinsamen Muster: folgt:

  1. Daten werden aus dem ursprünglichen Log geparst und extrahiert.
  2. Die extrahierten Daten bearbeiten. Dazu gehört auch die Verwendung bedingter Logik, Werte selektiv analysieren, Datentypen konvertieren, Teilzeichenfolgen in einer in Groß- oder Kleinbuchstaben umwandeln usw.
  3. Weisen Sie UDM-Feldern Werte zu.
  4. Geben Sie den zugeordneten UDM-Eintrag in den @output-Schlüssel aus.

Daten im ursprünglichen Log parsen und extrahieren

Filteranweisung festlegen

Die filter-Anweisung ist die erste Anweisung in der Gruppe der Parsing-Anweisungen. Alle zusätzlichen Parsing-Anweisungen sind in der Anweisung filter enthalten.

filter {

}

Variablen initialisieren, die extrahierte Werte speichern

Initialisieren Sie in der Anweisung filter Zwischenvariablen, die vom verwendet der Parser zum Speichern aus dem Log extrahierte Werte.

Diese Variablen werden jedes Mal verwendet, wenn ein einzelnes Protokoll geparst wird. Der Wert in wird jede Zwischenvariable später im Parsing-Anweisungen.

  mutate {
    replace => {
      "event.idm.read_only_udm.metadata.product_name" => "Webproxy"
      "event.idm.read_only_udm.metadata.vendor_name" => "Squid"
      "not_valid_log" => "false"
      "when" => ""
      "srcip" => ""
      "action" => ""
      "username" => ""
      "url" => ""
      "tgtip" => ""
      "method" => ""
    }
  }

Einzelne Werte aus dem Protokoll extrahieren

Google Security Operations bietet eine Reihe von auf Logstash basierenden Filtern zum Extrahieren von Feldern aus den Original-Protokolldateien. Je nach Format des Protokolls verwenden Sie ein oder mehrere Extraktionsfilter, um alle Daten aus dem Log zu extrahieren. Wenn der String wie folgt lautet:

  • JSON verwendet, ähnelt die Parsersyntax der JSON-Filter, der JSON-formatierte Logs unterstützt. Verschachtelte JSON-Daten werden nicht unterstützt.
  • XML-Format, ähnelt die Parsersyntax der XML-Filter, der XML-formatierte Logs unterstützt.
  • Schlüssel/Wert-Paare verwendet, ähnelt die Parsersyntax der Kv-Filter, der Nachrichten im Schlüssel/Wert-Paar-Format unterstützt.
  • CSV-Format hat, ähnelt die Parsersyntax der CSV-Filter, der Nachrichten im CSV-Format unterstützt.
  • allen anderen Formaten verwenden, ähnelt die Parsersyntax der GROK-Filter mit integrierten GROK-Mustern . Hier werden Extraktionsanleitungen im Regex-Stil verwendet.

Google Security Operations bietet einen Teil der Funktionen, die im jeweiligen Filter verfügbar sind. Google Security Operations bietet auch eine benutzerdefinierte Datenzuordnungssyntax, die nicht verfügbar ist in den Filtern ein. Siehe Parser-Syntaxreferenz. finden Sie eine Beschreibung unterstützter Funktionen und benutzerdefinierter Funktionen.

Wir bleiben bei dem Beispiel mit dem Squid-Web-Proxy-Protokoll und der folgenden Datenextraktion -Anweisung umfasst eine Kombination aus Logstash Grok-Syntax und regulären Ausdrücke.

Die folgende Extraktionsanweisung speichert Werte in der Variablen:

  • when
  • srcip
  • action
  • returnCode
  • size
  • method
  • username
  • url
  • tgtip

In dieser Beispielanweisung wird auch das Schlüsselwort overwrite verwendet, um die extrahierten Werte zu speichern in jeder Variablen an. Wenn der Extraktionsprozess einen Fehler zurückgibt, ist der Fehler on_error legt not_valid_log auf True fest.

grok {
   match => {
     "message" => [
       "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
     ]
   }
   overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
   on_error => "not_valid_log"
}

Extrahierte Werte bearbeiten und transformieren

Google Security Operations nutzt die Funktionen des mutate-Filter-Plug-ins von Logstash, um die Bearbeitung von Werten zu ermöglichen, die aus dem ursprünglichen Protokolls. Google Security Operations stellt einen Teil der Funktionen des Plug-ins bereit. Eine Beschreibung der Funktionen finden Sie unter Parser-Syntax. unterstützte und benutzerdefinierte Funktionen wie:

  • Werte in einen anderen Datentyp umwandeln
  • Werte im String ersetzen
  • zwei Arrays zusammenführen oder einen String an ein Array anhängen. Zeichenfolgenwerte sind vor der Zusammenführung in ein Array konvertiert.
  • in Kleinbuchstaben oder Großbuchstaben konvertieren

Dieser Abschnitt enthält Beispiele für die Datentransformation, die auf dem Squid-Web-Proxy-Protokoll basieren. die ich zuvor vorgestellt habe.

Ereigniszeitstempel transformieren

Alle als UDM-Einträge gespeicherten Ereignisse müssen einen Ereigniszeitstempel haben. Dieses Beispiel Prüft, ob ein Wert für die Daten aus dem Protokoll extrahiert wurde. Dann wird die Methode Grok-Datumsfunktion um den Wert mit dem Zeitformat UNIX abzugleichen.

if [when] != "" {
  date {
    match => [
      "when", "UNIX"
    ]
   }
 }

username-Wert transformieren

Mit der folgenden Beispielanweisung wird der Wert in der Variablen username konvertiert in Kleinbuchstaben.

mutate {
   lowercase => [ "username"]
   }

action-Wert transformieren

Im folgenden Beispiel wird der Wert in der Zwischenvariablen action ausgewertet und ändert den Wert entweder in ALLOW, BLOCK oder UNKNOWN_ACTION, gültige Werte für das UDM-Feld security_result.action. Das UDM von security_result.action ist ein Enum-Typ, der nur bestimmte Werte speichert.

if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
      mutate {
        replace => {
          "action" => "BLOCK"
        }
      }
   } else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
     mutate {
        replace => {
          "action" => "ALLOW"
        }
     }
   } else {
      mutate {
        replace => {
          "action" => "UNKNOWN_ACTION" }
      }
   }

Ziel-IP-Adresse transformieren

Im folgenden Beispiel wird in der Zwischenvariablen tgtip nach einem Wert gesucht. Wenn der Wert gefunden wird, wird er mithilfe eines vordefinierten Groks mit einem IP-Adressmuster abgeglichen. Muster zu ändern. Wenn beim Abgleich des Werts mit einem IP-Adressmuster ein Fehler auftritt, Die Funktion on_error setzt die Eigenschaft not_valid_tgtip auf True. Wenn die Übereinstimmung erfolgreich ist, ist das Attribut not_valid_tgtip nicht festgelegt.

if [tgtip] not in [ "","-" ] {
   grok {
     match => {
       "tgtip" => [ "%{IP:tgtip}" ]
     }
     overwrite => ["tgtip"]
     on_error => "not_valid_tgtip"
   }

Datentyp von „returnCode“ und „size“ ändern

Im folgenden Beispiel wird der Wert in der Variablen size in uinteger und den Wert in der Variablen returnCode auf integer. Dies ist erforderlich, weil die Variable size im network.received_bytes-UDM-Feld, in dem der Datentyp int64 gespeichert wird. Die Variable „returnCode“ wird im UDM-Feld „network.http.response_code“ gespeichert mit dem der Datentyp int32 gespeichert wird.

mutate {
  convert => {
    "returnCode" => "integer"
    "size" => "uinteger"
  }
}

UDM-Feldern in einem Ereignis Werte zuweisen

Nachdem die Werte extrahiert und vorverarbeitet wurden, weisen Sie sie Feldern in einer UDM zu. Ereignisdatensatz. Sie können einem UDM-Feld sowohl extrahierte als auch statische Werte zuweisen.

Wenn Sie event.disambiguation_key ausfüllen, muss dieses Feld eindeutig sein jedes Ereignis, das für das jeweilige Protokoll generiert wird. Wenn zwei Ereignisse die disambiguation_key identisch, führt dies zu unerwartetem Verhalten im System.

Die Parserbeispiele in diesem Abschnitt basieren auf dem Squid-Webproxy-Logbeispiel. oben.

Ereigniszeitstempel speichern

Für jeden UDM-Ereigniseintrag muss ein Wert für metadata.event_timestamp festgelegt sein UDM ein. Im folgenden Beispiel wird der aus dem Log extrahierte Ereigniszeitstempel im @timestamp. Google Security Operations speichert dies im Standardmäßig das UDM-Feld metadata.event_timestamp.

mutate {
  rename => {
    "when" => "timestamp"
  }
}

Ereignistyp festlegen

Für jeden UDM-Ereigniseintrag muss ein Wert für die UDM-Datei „metadata.event_type“ festgelegt sein ein. Dieses Feld ist ein Aufzählungstyp. Der Wert dieses Feldes bestimmt, welche zusätzlichen UDM-Felder ausgefüllt werden müssen, damit der UDM-Eintrag gespeichert wird. Der Parsing- und Normalisierungsprozess schlägt fehl, wenn eines der Pflichtfelder die keine gültigen Daten enthalten.

replace => {
    "event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
   }
}

Speichern Sie die Werte username und method mit der replace-Anweisung

Die Werte in den Zwischenfeldern username und method sind Strings. Die folgendes Beispiel prüft, ob ein gültiger Wert vorhanden ist, und speichert, falls dies der Fall ist, den Wert username in das UDM-Feld principal.user.userid und den Wert method in das UDM-Feld network.http.method ein.

if [username] not in [ "-" ,"" ] {
  mutate {
    replace => {
      "event.idm.read_only_udm.principal.user.userid" => "%{username}"
    }
  }
}

if [method] != "" {
  mutate {
    replace => {
      "event.idm.read_only_udm.network.http.method" => "%{method}"
    }
  }
}

Speichern Sie action im UDM-Feld security_result.action.

Im vorherigen Abschnitt war der Wert in der Zwischenvariablen action ausgewertet und in einen der Standardwerte für das UDM-Feld security_result.action umgewandelt wurde.

Die UDM-Felder security_result und action speichern ein Array von Elementen. Sie müssen also beim Speichern dieser Daten einen etwas anderen Ansatz verfolgen. Wert.

Speichern Sie zuerst den transformierten Wert in einer Zwischen-security_result.action. ein. Das Feld security_result ist dem Feld action übergeordnet.

mutate {
   merge => {
     "security_result.action" => "action"
   }
}

Speichern Sie als Nächstes das Zwischenfeld security_result.action im security_result-UDM-Feld. Im UDM-Feld security_result ist ein Array von Elemente, sodass der Wert an dieses Feld angehängt wird.

# save the security_result field
mutate {
  merge => {
    "event.idm.read_only_udm.security_result" => "security_result"
  }
}

Ziel-IP-Adresse und Quell-IP-Adresse mit der merge-Anweisung speichern

Speichern Sie die folgenden Werte im UDM-Ereigniseintrag:

  • Wert in der Zwischenvariablen srcip für das UDM-Feld principal.ip.
  • Wert in der Zwischenvariablen tgtip für das UDM-Feld target.ip.

Die UDM-Felder principal.ip und target.ip speichern ein Array von Elementen, sodass werden an jedes Feld angehängt.

Die folgenden Beispiele zeigen verschiedene Ansätze zum Speichern dieser Werte. Während des Transformationsschritts wurde die Zwischenvariable tgtip mit einem IP-Adresse mit einem vordefinierten Grok-Muster. Die folgende Beispielanweisung Prüft, ob das Attribut not_valid_tgtip wahr ist, was bedeutet, dass tgtip Der Wert konnte keinem IP-Adressmuster zugeordnet werden. Ist er „false“, wird die tgtip-Wert in das UDM-Feld target.ip.

if ![not_valid_tgtip] {
  mutate {
    merge => {
      "event.idm.read_only_udm.target.ip" => "tgtip"
    }
  }
 }

Die Zwischenvariable srcip wurde nicht transformiert. Die folgende Anweisung Prüft, ob ein Wert aus dem ursprünglichen Protokoll extrahiert wurde, und speichert, ob dies der Fall ist den Wert in das UDM-Feld principal.ip ein.

if [srcip] != "" {
  mutate {
    merge => {
      "event.idm.read_only_udm.principal.ip" => "srcip"
    }
  }
}

Speichern Sie url, returnCode und size mit der rename-Anweisung

In der folgenden Beispielanweisung werden die folgenden Werte mithilfe der rename-Anweisung gespeichert.

  • Die Variable url, die im UDM-Feld target.url gespeichert wurde.
  • Die Zwischenvariable returnCode, die im UDM-Feld network.http.response_code gespeichert wurde.
  • Die Zwischenvariable size, die im UDM-Feld network.received_bytes gespeichert wurde.
mutate {
  rename => {
     "url" => "event.idm.read_only_udm.target.url"
     "returnCode" => "event.idm.read_only_udm.network.http.response_code"
     "size" => "event.idm.read_only_udm.network.received_bytes"
  }
}

UDM-Eintrag an die Ausgabe binden

Mit der letzten Anweisung in der Datenzuordnungsanweisung werden die verarbeiteten Daten ausgegeben. in einen UDM-Ereignisdatensatz.

mutate {
    merge => {
      "@output" => "event"
    }
  }

Vollständiger Parsercode

Dies ist das vollständige Codebeispiel für den Parser. Die Reihenfolge der Anweisungen entspricht nicht den in der gleichen Reihenfolge wie in den vorherigen Abschnitten dieses Dokuments, führt aber zu derselben Ausgabe.

filter {

# initialize variables
  mutate {
    replace => {
      "event.idm.read_only_udm.metadata.product_name" => "Webproxy"
      "event.idm.read_only_udm.metadata.vendor_name" => "Squid"
      "not_valid_log" => "false"
      "when" => ""
      "srcip" => ""
      "action" => ""
      "username" => ""
      "url" => ""
      "tgtip" => ""
      "method" => ""
    }
  }

  # Extract fields from the raw log.
    grok {
      match => {
        "message" => [
          "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
        ]
      }
      overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
      on_error => "not_valid_log"
    }

  # Parse event timestamp
  if [when] != "" {
    date {
      match => [
        "when", "UNIX"
      ]
     }
   }

   # Save the value in "when" to the event timestamp
   mutate {
     rename => {
       "when" => "timestamp"
     }
   }

   # Transform and save username
   if [username] not in [ "-" ,"" ] {
     mutate {
       lowercase => [ "username"]
        }
      }
     mutate {
       replace => {
         "event.idm.read_only_udm.principal.user.userid" => "%{username}"
       }
     }


if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
      mutate {
        replace => {
          "action" => "BLOCK"
        }
      }
   } else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
     mutate {
        replace => {
          "action" => "ALLOW"
        }
     }
   } else {
      mutate {
        replace => {
          "action" => "UNKNOWN_ACTION" }
      }
   }

  # save transformed value to an intermediary field
   mutate {
      merge => {
        "security_result.action" => "action"
      }
   }

    # save the security_result field
    mutate {
      merge => {
        "event.idm.read_only_udm.security_result" => "security_result"
      }
    }

   # check for presence of target ip. Extract and store target IP address.
   if [tgtip] not in [ "","-" ] {
     grok {
       match => {
         "tgtip" => [ "%{IP:tgtip}" ]
       }
       overwrite => ["tgtip"]
       on_error => "not_valid_tgtip"
     }

     # store  target IP address
     if ![not_valid_tgtip] {
       mutate {
         merge => {
           "event.idm.read_only_udm.target.ip" => "tgtip"
         }
       }
     }
   }

   # convert  the returnCode and size  to integer data type
   mutate {
     convert => {
       "returnCode" => "integer"
       "size" => "uinteger"
     }
   }

   # save  url, returnCode, and size
   mutate {
     rename => {
        "url" => "event.idm.read_only_udm.target.url"
        "returnCode" => "event.idm.read_only_udm.network.http.response_code"
        "size" => "event.idm.read_only_udm.network.received_bytes"
     }

     # set the event type to NETWORK_HTTP
     replace => {
        "event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
     }
   }

   # validate and set source IP address
   if [srcip] != "" {
     mutate {
       merge => {
         "event.idm.read_only_udm.principal.ip" => "srcip"
       }
     }
   }

  # save  event to @output
   mutate {
     merge => {
       "@output" => "event"
     }
   }

} #end of filter