Elasticsearch-Logs erfassen
In diesem Dokument wird beschrieben, wie Sie Elasticsearch-Logs mit Amazon S3 in Google Security Operations aufnehmen. Der Parser wandelt Rohlogs im JSON-Format in ein einheitliches Datenmodell (Unified Data Model, UDM) um. Sie extrahiert Felder aus verschachtelten JSON-Strukturen, ordnet sie UDM-Feldern zu und reichert die Daten mit sicherheitsrelevantem Kontext wie Schweregraden und Nutzerrollen an.
Hinweise
- Eine Google SecOps-Instanz
- Berechtigter Zugriff auf die Elasticsearch-Clusterverwaltung
- Privilegierter Zugriff auf AWS (S3, IAM, EC2)
- EC2-Instanz oder dauerhafter Host zum Ausführen von Logstash
Elasticsearch-Voraussetzungen
- Melden Sie sich als Administrator in Ihrem Elasticsearch-Cluster an.
- Prüfen Sie, ob Ihr Elasticsearch-Abo Sicherheitsfunktionen enthält (für die Audit-Protokollierung erforderlich).
- Notieren Sie sich den Namen und die Version Ihres Elasticsearch-Clusters.
- Geben Sie den Pfad an, in den Audit-Logs geschrieben werden sollen (Standard:
$ES_HOME/logs/<clustername>_audit.json
).
Elasticsearch-Audit-Logging aktivieren
- Bearbeiten Sie auf jedem Elasticsearch-Knoten die Konfigurationsdatei elasticsearch.yml.
Fügen Sie die folgende Einstellung hinzu:
xpack.security.audit.enabled: true
Führen Sie einen rollierenden Neustart des Clusters durch, um die Änderungen zu übernehmen:
- Shard-Zuweisung deaktivieren:
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
- Beenden Sie jeden Knoten einzeln und starten Sie ihn dann neu.
- Aktivieren Sie die Shard-Zuweisung wieder:
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
- Shard-Zuweisung deaktivieren:
Prüfen Sie, ob Audit-Logs im Logverzeichnis unter
<clustername>_audit.json
generiert werden.
AWS S3-Bucket und IAM für Google SecOps konfigurieren
- Erstellen Sie einen Amazon S3-Bucket. Folgen Sie dazu der Anleitung unter Bucket erstellen.
- Speichern Sie den Namen und die Region des Buckets zur späteren Verwendung (z. B.
elastic-search-logs
). - Erstellen Sie einen Nutzer gemäß dieser Anleitung: IAM-Nutzer erstellen.
- Wählen Sie den erstellten Nutzer aus.
- Wählen Sie den Tab Sicherheitsanmeldedaten aus.
- Klicken Sie im Abschnitt Zugriffsschlüssel auf Zugriffsschlüssel erstellen.
- Wählen Sie Drittanbieterdienst als Anwendungsfall aus.
- Klicken Sie auf Weiter.
- Optional: Fügen Sie ein Beschreibungstag hinzu.
- Klicken Sie auf Zugriffsschlüssel erstellen.
- Klicken Sie auf CSV-Datei herunterladen, um den Access Key (Zugriffsschlüssel) und den Secret Access Key (geheimer Zugriffsschlüssel) für die zukünftige Verwendung zu speichern.
- Klicken Sie auf Fertig.
- Wählen Sie den Tab Berechtigungen aus.
- Klicken Sie im Bereich Berechtigungsrichtlinien auf Berechtigungen hinzufügen.
- Wählen Sie Berechtigungen hinzufügen aus.
- Wählen Sie Richtlinien direkt anhängen aus.
- Suchen Sie nach der Richtlinie AmazonS3FullAccess.
- Wählen Sie die Richtlinie aus.
- Klicken Sie auf Weiter.
- Klicken Sie auf Berechtigungen hinzufügen.
Logstash so konfigurieren, dass Audit-Logs an S3 gesendet werden
- Installieren Sie Logstash auf einer EC2-Instanz oder einem persistenten Host, der auf die Elasticsearch-Audit-Logdateien zugreifen kann.
Installieren Sie das S3-Ausgabe-Plug-in, falls es noch nicht vorhanden ist:
bin/logstash-plugin install logstash-output-s3
Erstellen Sie eine Logstash-Konfigurationsdatei (
elastic-to-s3.conf
):input { file { path => "/path/to/elasticsearch/logs/*_audit.json" start_position => "beginning" codec => "json" # audit file: 1 JSON object per line sincedb_path => "/var/lib/logstash/sincedb_elastic_search" exclude => ["*.gz"] } } filter { # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects. # If you must add metadata for ops, put it under [@metadata] so it won't be written. # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" } } output { s3 { access_key_id => "YOUR_AWS_ACCESS_KEY" secret_access_key => "YOUR_AWS_SECRET_KEY" region => "us-east-1" bucket => "elastic-search-logs" prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/" codec => "json_lines" # NDJSON output (1 JSON object per line) encoding => "gzip" # compress objects server_side_encryption => true # Optionally for KMS: # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID" size_file => 104857600 # 100MB rotation time_file => 300 # 5 min rotation } }
Starten Sie Logstash mit der Konfiguration:
bin/logstash -f elastic-to-s3.conf
Optional: IAM-Nutzer mit Lesezugriff für Google SecOps erstellen
- Rufen Sie die AWS-Konsole > IAM > Nutzer > Nutzer hinzufügen auf.
- Klicken Sie auf Add users (Nutzer hinzufügen).
- Geben Sie die folgenden Konfigurationsdetails an:
- Nutzer: Geben Sie
secops-reader
ein. - Zugriffstyp: Wählen Sie Zugriffsschlüssel – programmatischer Zugriff aus.
- Nutzer: Geben Sie
- Klicken Sie auf Nutzer erstellen.
- Minimale Leseberechtigung (benutzerdefiniert) anhängen: Nutzer > secops-reader > Berechtigungen > Berechtigungen hinzufügen > Richtlinien direkt anhängen > Richtlinie erstellen.
Geben Sie im JSON-Editor die folgende Richtlinie ein:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::elastic-search-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::elastic-search-logs" } ] }
Legen Sie
secops-reader-policy
als Name fest.Gehen Sie zu Richtlinie erstellen> suchen/auswählen > Weiter > Berechtigungen hinzufügen.
Rufen Sie Sicherheitsanmeldedaten > Zugriffsschlüssel > Zugriffsschlüssel erstellen auf.
Laden Sie die CSV herunter (diese Werte werden in den Feed eingegeben).
Feed in Google SecOps konfigurieren, um Elasticsearch-Logs aufzunehmen
- Rufen Sie die SIEM-Einstellungen > Feeds auf.
- Klicken Sie auf + Neuen Feed hinzufügen.
- Geben Sie im Feld Feedname einen Namen für den Feed ein, z. B.
Elasticsearch Logs
. - Wählen Sie Amazon S3 V2 als Quelltyp aus.
- Wählen Sie Elastic Search als Logtyp aus.
- Klicken Sie auf Weiter.
- Geben Sie Werte für die folgenden Eingabeparameter an:
- S3-URI:
s3://elastic-search-logs/logs/
- Optionen zum Löschen der Quelle: Wählen Sie die gewünschte Option zum Löschen aus.
- Maximales Dateialter: Dateien einschließen, die in den letzten Tagen geändert wurden. Der Standardwert ist 180 Tage.
- Zugriffsschlüssel-ID: Nutzerzugriffsschlüssel mit Zugriff auf den S3-Bucket.
- Geheimer Zugriffsschlüssel: Der geheime Schlüssel des Nutzers mit Zugriff auf den S3-Bucket.
- Asset-Namespace: Der Asset-Namespace.
- Aufnahmelabels: Das Label, das auf die Ereignisse aus diesem Feed angewendet wird.
- S3-URI:
- Klicken Sie auf Weiter.
- Prüfen Sie die neue Feedkonfiguration auf dem Bildschirm Abschließen und klicken Sie dann auf Senden.
UDM-Zuordnungstabelle
Logfeld | UDM-Zuordnung | Logik |
---|---|---|
Level | security_result.severity | Die Logik prüft den Wert des Felds „Level“ und ordnet ihn der entsprechenden UDM-Schweregradstufe zu: – „INFO“, „ALL“, „OFF“, „TRACE“, „DEBUG“ werden „INFORMATIONAL“ zugeordnet. – „WARN“ wird „LOW“ zugeordnet. – „ERROR“ wird „ERROR“ zugeordnet. – „FATAL“ wird „CRITICAL“ zugeordnet. |
message.@timestamp | timestamp | Der Zeitstempel wird aus dem Feld „@timestamp“ im Feld „message“ des Rohprotokolls im Format „JJJJ-MM-TTTHH:mm:ss.SSS“ geparst. |
message.action | security_result.action_details | Der Wert wird aus dem Feld „action“ im Feld „message“ des Rohlogs übernommen. |
message.event.action | security_result.summary | Der Wert wird aus dem Feld „event.action“ im Feld „message“ des Rohlogs übernommen. |
message.event.type | metadata.product_event_type | Der Wert wird aus dem Feld „event.type“ im Feld „message“ des Rohlogs übernommen. |
message.host.ip | target.ip | Der Wert wird aus dem Feld „host.ip“ im Feld „message“ des Rohlogs übernommen. |
message.host.name | target.hostname | Der Wert wird aus dem Feld „host.name“ im Feld „message“ des Rohlogs übernommen. |
message.indices | target.labels.value | Der Wert wird aus dem Feld „indices“ im Feld „message“ des Rohlogs übernommen. |
message.mrId | target.hostname | Der Wert wird aus dem Feld „mrId“ im Feld „message“ des Rohlogs übernommen. |
message.node.id | principal.asset.product_object_id | Der Wert wird aus dem Feld „node.id“ im Feld „message“ des Rohlogs übernommen. |
message.node.name | target.asset.hostname | Der Wert wird aus dem Feld „node.name“ im Feld „message“ des Rohlogs übernommen. |
message.origin.address | principal.ip | Die IP-Adresse wird aus dem Feld „origin.address“ im Feld „message“ des Rohlogs extrahiert, indem die Portnummer entfernt wird. |
message.origin.type | principal.resource.resource_subtype | Der Wert wird aus dem Feld „origin.type“ im Feld „message“ des Rohlogs übernommen. |
message.properties.host_group | principal.hostname | Der Wert wird aus dem Feld „properties.host_group“ im Feld „message“ des Rohlogs übernommen. |
message.properties.host_group | target.group.group_display_name | Der Wert wird aus dem Feld „properties.host_group“ im Feld „message“ des Rohlogs übernommen. |
message.request.id | target.resource.product_object_id | Der Wert wird aus dem Feld „request.id“ im Feld „message“ des Rohlogs übernommen. |
message.request.name | target.resource.name | Der Wert wird aus dem Feld „request.name“ im Feld „message“ des Rohlogs übernommen. |
message.user.name | principal.user.userid | Der Wert wird aus dem Feld „user.name“ im Feld „message“ des Rohlogs übernommen. |
message.user.realm | principal.user.attribute.permissions.name | Der Wert wird aus dem Feld „user.realm“ im Feld „message“ des Rohlogs übernommen. |
message.user.roles | about.user.attribute.roles.name | Der Wert wird aus dem Feld „user.roles“ im Feld „message“ des Rohlogs übernommen. |
metadata.event_type | Fest codierter Wert: „USER_RESOURCE_ACCESS“ | |
metadata.log_type | Fest codierter Wert: „ELASTIC_SEARCH“ | |
metadata.product_name | Fest codierter Wert: „ELASTICSEARCH“ | |
metadata.vendor_name | Fest codierter Wert: „ELASTIC“ | |
principal.port | Die Portnummer wird aus dem Feld „origin.address“ im Feld „message“ des Rohlogs extrahiert. | |
target.labels.key | Fest codierter Wert: „Indice“ |
Benötigen Sie weitere Hilfe? Antworten von Community-Mitgliedern und Google SecOps-Experten erhalten