Cisco vManage SD-WAN-Logs erfassen
In diesem Dokument wird beschrieben, wie Sie Cisco vManage SD-WAN-Logs mit Amazon S3 in Google Security Operations aufnehmen.
Hinweise
Prüfen Sie, ob folgende Voraussetzungen erfüllt sind:
- Eine Google SecOps-Instanz
- Privilegierter Zugriff auf die Cisco vManage SD-WAN-Verwaltungskonsole
- Privilegierter Zugriff auf AWS (S3, IAM, Lambda, EventBridge)
Cisco vManage SD-WAN-Voraussetzungen (Anmeldedaten und Basis-URL) erfassen
- Melden Sie sich in der Cisco vManage Management Console an.
- Klicken Sie auf Administration > Einstellungen > Nutzer.
- Erstellen Sie einen neuen Nutzer oder verwenden Sie einen vorhandenen Administratornutzer mit API-Zugriffsberechtigungen.
- Kopieren und speichern Sie die folgenden Details an einem sicheren Ort:
- Nutzername
- Passwort
- vManage-Basis-URL (z. B.
https://your-vmanage-server:8443
)
AWS S3-Bucket und IAM für Google SecOps konfigurieren
- Erstellen Sie einen Amazon S3-Bucket. Folgen Sie dazu dieser Anleitung: Bucket erstellen.
- Speichern Sie den Namen und die Region des Buckets zur späteren Verwendung (z. B.
cisco-sdwan-logs-bucket
). - Erstellen Sie einen Nutzer gemäß dieser Anleitung: IAM-Nutzer erstellen.
- Wählen Sie den erstellten Nutzer aus.
- Wählen Sie den Tab Sicherheitsanmeldedaten aus.
- Klicken Sie im Abschnitt Zugriffsschlüssel auf Zugriffsschlüssel erstellen.
- Wählen Sie als Anwendungsfall Drittanbieterdienst aus.
- Klicken Sie auf Weiter.
- Optional: Fügen Sie ein Beschreibungstag hinzu.
- Klicken Sie auf Zugriffsschlüssel erstellen.
- Klicken Sie auf CSV-Datei herunterladen, um den Access Key (Zugriffsschlüssel) und den Secret Access Key (geheimer Zugriffsschlüssel) zur späteren Verwendung zu speichern.
- Klicken Sie auf Fertig.
- Wählen Sie den Tab Berechtigungen aus.
- Klicken Sie im Bereich Berechtigungsrichtlinien auf Berechtigungen hinzufügen.
- Wählen Sie Berechtigungen hinzufügen aus.
- Wählen Sie Richtlinien direkt anhängen aus.
- Suchen Sie nach der Richtlinie AmazonS3FullAccess und wählen Sie sie aus.
- Klicken Sie auf Weiter.
- Klicken Sie auf Berechtigungen hinzufügen.
IAM-Richtlinie und ‑Rolle für S3-Uploads konfigurieren
- Rufen Sie in der AWS-Konsole IAM > Richtlinien > Richtlinie erstellen > JSON-Tab auf.
Geben Sie die folgende Richtlinie ein:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/cisco-sdwan/state.json" } ] }
- Ersetzen Sie
cisco-sdwan-logs-bucket
, wenn Sie einen anderen Bucket-Namen eingegeben haben.
- Ersetzen Sie
Klicken Sie auf Weiter > Richtlinie erstellen.
Rufen Sie IAM > Rollen auf.
Klicken Sie auf Rolle erstellen > AWS-Service > Lambda.
Hängen Sie die neu erstellte Richtlinie an.
Geben Sie der Rolle den Namen
cisco-sdwan-lambda-role
und klicken Sie auf Rolle erstellen.
Lambda-Funktion erstellen
- Rufen Sie in der AWS Console Lambda > Funktionen > Funktion erstellen auf.
- Klicken Sie auf Von Grund auf erstellen.
Geben Sie die folgenden Konfigurationsdetails an:
Einstellung Wert Name cisco-sdwan-log-collector
Laufzeit Python 3.13 Architektur x86_64 Ausführungsrolle cisco-sdwan-lambda-role
Nachdem die Funktion erstellt wurde, öffnen Sie den Tab Code, löschen Sie den Stub und geben Sie den folgenden Code ein (
cisco-sdwan-log-collector.py
):import json import boto3 import os import urllib3 import urllib.parse from datetime import datetime, timezone from botocore.exceptions import ClientError # Disable SSL warnings for self-signed certificates urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Environment variables VMANAGE_HOST = os.environ['VMANAGE_HOST'] VMANAGE_USERNAME = os.environ['VMANAGE_USERNAME'] VMANAGE_PASSWORD = os.environ['VMANAGE_PASSWORD'] S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] s3_client = boto3.client('s3') http = urllib3.PoolManager(cert_reqs='CERT_NONE') class VManageAPI: def __init__(self, host, username, password): self.host = host.rstrip('/') self.username = username self.password = password self.cookies = None self.token = None def authenticate(self): """Authenticate with vManage and get session tokens""" try: # Login to get JSESSIONID login_url = f"{self.host}/j_security_check" login_data = urllib.parse.urlencode({ 'j_username': self.username, 'j_password': self.password }) response = http.request( 'POST', login_url, body=login_data, headers={'Content-Type': 'application/x-www-form-urlencoded'}, timeout=30 ) # Check if login was successful (vManage returns HTML on failure) if b'<html>' in response.data or response.status != 200: raise Exception("Authentication failed") # Extract cookies self.cookies = {} if 'Set-Cookie' in response.headers: cookie_header = response.headers['Set-Cookie'] for cookie in cookie_header.split(';'): if 'JSESSIONID=' in cookie: self.cookies['JSESSIONID'] = cookie.split('JSESSIONID=')[1].split(';')[0] break if not self.cookies.get('JSESSIONID'): raise Exception("Failed to get JSESSIONID") # Get XSRF token token_url = f"{self.host}/dataservice/client/token" headers = { 'Content-Type': 'application/json', 'Cookie': f"JSESSIONID={self.cookies['JSESSIONID']}" } response = http.request('GET', token_url, headers=headers, timeout=30) if response.status == 200: self.token = response.data.decode('utf-8') return True else: raise Exception(f"Failed to get XSRF token: {response.status}") except Exception as e: print(f"Authentication error: {e}") return False def get_headers(self): """Get headers for API requests""" return { 'Content-Type': 'application/json', 'Cookie': f"JSESSIONID={self.cookies['JSESSIONID']}", 'X-XSRF-TOKEN': self.token } def get_audit_logs(self, last_timestamp=None): """Get audit logs from vManage""" try: url = f"{self.host}/dataservice/auditlog" headers = self.get_headers() # Build query for recent logs query = { "query": { "condition": "AND", "rules": [] }, "size": 10000 } # Add timestamp filter if provided if last_timestamp: # Convert timestamp to epoch milliseconds for vManage API if isinstance(last_timestamp, str): try: dt = datetime.fromisoformat(last_timestamp.replace('Z', '+00:00')) epoch_ms = int(dt.timestamp() * 1000) except: epoch_ms = int(last_timestamp) else: epoch_ms = int(last_timestamp) query["query"]["rules"].append({ "value": [str(epoch_ms)], "field": "entry_time", "type": "date", "operator": "greater" }) else: # Get last 1 hour of logs by default query["query"]["rules"].append({ "value": ["1"], "field": "entry_time", "type": "date", "operator": "last_n_hours" }) response = http.request( 'POST', url, body=json.dumps(query), headers=headers, timeout=60 ) if response.status == 200: return json.loads(response.data.decode('utf-8')) else: print(f"Failed to get audit logs: {response.status}") return None except Exception as e: print(f"Error getting audit logs: {e}") return None def get_alarms(self, last_timestamp=None): """Get alarms from vManage""" try: url = f"{self.host}/dataservice/alarms" headers = self.get_headers() # Build query for recent alarms query = { "query": { "condition": "AND", "rules": [] }, "size": 10000 } # Add timestamp filter if provided if last_timestamp: # Convert timestamp to epoch milliseconds for vManage API if isinstance(last_timestamp, str): try: dt = datetime.fromisoformat(last_timestamp.replace('Z', '+00:00')) epoch_ms = int(dt.timestamp() * 1000) except: epoch_ms = int(last_timestamp) else: epoch_ms = int(last_timestamp) query["query"]["rules"].append({ "value": [str(epoch_ms)], "field": "entry_time", "type": "date", "operator": "greater" }) else: # Get last 1 hour of alarms by default query["query"]["rules"].append({ "value": ["1"], "field": "entry_time", "type": "date", "operator": "last_n_hours" }) response = http.request( 'POST', url, body=json.dumps(query), headers=headers, timeout=60 ) if response.status == 200: return json.loads(response.data.decode('utf-8')) else: print(f"Failed to get alarms: {response.status}") return None except Exception as e: print(f"Error getting alarms: {e}") return None def get_events(self, last_timestamp=None): """Get events from vManage""" try: url = f"{self.host}/dataservice/events" headers = self.get_headers() # Build query for recent events query = { "query": { "condition": "AND", "rules": [] }, "size": 10000 } # Add timestamp filter if provided if last_timestamp: # Convert timestamp to epoch milliseconds for vManage API if isinstance(last_timestamp, str): try: dt = datetime.fromisoformat(last_timestamp.replace('Z', '+00:00')) epoch_ms = int(dt.timestamp() * 1000) except: epoch_ms = int(last_timestamp) else: epoch_ms = int(last_timestamp) query["query"]["rules"].append({ "value": [str(epoch_ms)], "field": "entry_time", "type": "date", "operator": "greater" }) else: # Get last 1 hour of events by default query["query"]["rules"].append({ "value": ["1"], "field": "entry_time", "type": "date", "operator": "last_n_hours" }) response = http.request( 'POST', url, body=json.dumps(query), headers=headers, timeout=60 ) if response.status == 200: return json.loads(response.data.decode('utf-8')) else: print(f"Failed to get events: {response.status}") return None except Exception as e: print(f"Error getting events: {e}") return None def get_last_run_time(): """Get the last successful run timestamp from S3""" try: response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) state_data = json.loads(response['Body'].read()) return state_data.get('last_run_time') except ClientError as e: if e.response['Error']['Code'] == 'NoSuchKey': print("No previous state found, collecting last hour of logs") return None else: print(f"Error reading state: {e}") return None except Exception as e: print(f"Error reading state: {e}") return None def update_last_run_time(timestamp): """Update the last successful run timestamp in S3""" try: state_data = { 'last_run_time': timestamp, 'updated_at': datetime.now(timezone.utc).isoformat() } s3_client.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state_data), ContentType='application/json' ) print(f"Updated state with timestamp: {timestamp}") except Exception as e: print(f"Error updating state: {e}") def upload_logs_to_s3(logs_data, log_type, timestamp): """Upload logs to S3 bucket""" try: if not logs_data or 'data' not in logs_data or not logs_data['data']: print(f"No {log_type} data to upload") return # Create filename with timestamp dt = datetime.now(timezone.utc) filename = f"{S3_PREFIX}{log_type}/{dt.strftime('%Y/%m/%d')}/{log_type}_{dt.strftime('%Y%m%d_%H%M%S')}.json" # Upload to S3 s3_client.put_object( Bucket=S3_BUCKET, Key=filename, Body=json.dumps(logs_data), ContentType='application/json' ) print(f"Uploaded {len(logs_data['data'])} {log_type} records to s3://{S3_BUCKET}/{filename}") except Exception as e: print(f"Error uploading {log_type} to S3: {e}") def lambda_handler(event, context): """Main Lambda handler function""" print(f"Starting Cisco vManage log collection at {datetime.now(timezone.utc)}") try: # Get last run time last_run_time = get_last_run_time() # Initialize vManage API client vmanage = VManageAPI(VMANAGE_HOST, VMANAGE_USERNAME, VMANAGE_PASSWORD) # Authenticate if not vmanage.authenticate(): return { 'statusCode': 500, 'body': json.dumps('Failed to authenticate with vManage') } print("Successfully authenticated with vManage") # Current timestamp for state tracking (store as epoch milliseconds) current_time = int(datetime.now(timezone.utc).timestamp() * 1000) # Collect different types of logs log_types = [ ('audit_logs', vmanage.get_audit_logs), ('alarms', vmanage.get_alarms), ('events', vmanage.get_events) ] total_records = 0 for log_type, get_function in log_types: try: print(f"Collecting {log_type}...") logs_data = get_function(last_run_time) if logs_data: upload_logs_to_s3(logs_data, log_type, current_time) if 'data' in logs_data: total_records += len(logs_data['data']) except Exception as e: print(f"Error processing {log_type}: {e}") continue # Update state with current timestamp update_last_run_time(current_time) print(f"Collection completed. Total records processed: {total_records}") return { 'statusCode': 200, 'body': json.dumps({ 'message': 'Log collection completed successfully', 'total_records': total_records, 'timestamp': datetime.now(timezone.utc).isoformat() }) } except Exception as e: print(f"Lambda execution error: {e}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') }
Rufen Sie Konfiguration > Umgebungsvariablen auf.
Klicken Sie auf Bearbeiten> Neue Umgebungsvariable hinzufügen.
Geben Sie die folgenden Umgebungsvariablen ein und ersetzen Sie die Platzhalter durch Ihre Werte:
Schlüssel Beispielwert S3_BUCKET
cisco-sdwan-logs-bucket
S3_PREFIX
cisco-sdwan/
STATE_KEY
cisco-sdwan/state.json
VMANAGE_HOST
https://your-vmanage-server:8443
VMANAGE_USERNAME
your-vmanage-username
VMANAGE_PASSWORD
your-vmanage-password
Bleiben Sie nach dem Erstellen der Funktion auf der zugehörigen Seite (oder öffnen Sie Lambda> Funktionen> cisco-sdwan-log-collector).
Wählen Sie den Tab Konfiguration aus.
Klicken Sie im Bereich Allgemeine Konfiguration auf Bearbeiten.
Ändern Sie Zeitlimit in 5 Minuten (300 Sekunden) und klicken Sie auf Speichern.
EventBridge-Zeitplan erstellen
- Gehen Sie zu Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Scheduler > Zeitplan erstellen).
- Geben Sie die folgenden Konfigurationsdetails an:
- Wiederkehrender Termin: Rate (
1 hour
) - Ziel: Ihre Lambda-Funktion
cisco-sdwan-log-collector
- Name:
cisco-sdwan-log-collector-1h
- Wiederkehrender Termin: Rate (
- Klicken Sie auf Zeitplan erstellen.
Optional: IAM-Nutzer mit Lesezugriff und Schlüssel für Google SecOps erstellen
- Rufen Sie in der AWS-Konsole IAM > Nutzer > Nutzer hinzufügen auf.
- Klicken Sie auf Add users (Nutzer hinzufügen).
- Geben Sie die folgenden Konfigurationsdetails an:
- Nutzer:
secops-reader
- Zugriffstyp: Zugriffsschlüssel – Programmatischer Zugriff
- Nutzer:
- Klicken Sie auf Nutzer erstellen.
- Minimale Leseberechtigung (benutzerdefiniert) anhängen: Nutzer > secops-reader > Berechtigungen > Berechtigungen hinzufügen > Richtlinien direkt anhängen > Richtlinie erstellen.
Geben Sie im JSON-Editor die folgende Richtlinie ein:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket", "Condition": { "StringLike": { "s3:prefix": ["cisco-sdwan/*"] } } }, { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/cisco-sdwan/*" } ] }
Nennen Sie die Richtlinie
secops-reader-policy
.Klicken Sie auf Richtlinie erstellen.
Kehren Sie zur Nutzererstellung zurück, suchen Sie nach
secops-reader-policy
und wählen Sie die Option aus.Klicken Sie auf Weiter: Tags.
Klicken Sie auf Weiter: Überprüfen.
Klicken Sie auf Nutzer erstellen.
Laden Sie die CSV herunter (diese Werte werden in den Feed eingegeben).
Feed in Google SecOps konfigurieren, um Cisco vManage SD-WAN-Logs aufzunehmen
- Rufen Sie die SIEM-Einstellungen > Feeds auf.
- Klicken Sie auf + Neuen Feed hinzufügen.
- Geben Sie im Feld Feed name (Feedname) einen Namen für den Feed ein, z. B.
Cisco SD-WAN logs
. - Wählen Sie Amazon S3 V2 als Quelltyp aus.
- Wählen Sie Cisco vManage SD-WAN als Logtyp aus.
- Klicken Sie auf Weiter.
- Geben Sie Werte für die folgenden Eingabeparameter an:
- S3-URI:
s3://cisco-sdwan-logs-bucket/cisco-sdwan/
- Optionen zum Löschen der Quelle: Wählen Sie die gewünschte Option zum Löschen aus.
- Maximales Dateialter: Dateien einschließen, die in den letzten Tagen geändert wurden. Standardmäßig 180 Tage.
- Zugriffsschlüssel-ID: Zugriffsschlüssel des Nutzers mit Zugriff auf den S3-Bucket.
- Geheimer Zugriffsschlüssel: Der geheime Schlüssel des Nutzers mit Zugriff auf den S3-Bucket.
- Asset-Namespace: der Asset-Namespace.
- Aufnahmelabels: Das Label, das auf die Ereignisse aus diesem Feed angewendet wird.
- S3-URI:
- Klicken Sie auf Weiter.
- Prüfen Sie die neue Feedkonfiguration auf dem Bildschirm Abschließen und klicken Sie dann auf Senden.
Benötigen Sie weitere Hilfe? Antworten von Community-Mitgliedern und Google SecOps-Experten erhalten