Raccogliere i log di Cisco vManage SD-WAN
Questo documento spiega come importare i log SD-WAN di Cisco vManage in Google Security Operations utilizzando Amazon S3.
Prima di iniziare
Assicurati di soddisfare i seguenti prerequisiti:
- Un'istanza Google SecOps
- Accesso privilegiato alla console di gestione Cisco vManage SD-WAN
- Accesso con privilegi ad AWS (S3, IAM, Lambda, EventBridge)
Raccogli i prerequisiti di Cisco vManage SD-WAN (credenziali e URL di base)
- Accedi alla console di gestione Cisco vManage.
- Vai ad Amministrazione > Impostazioni > Utenti.
- Crea un nuovo utente o utilizzane uno amministratore esistente con privilegi di accesso API.
- Copia e salva in una posizione sicura i seguenti dettagli:
- Nome utente
- Password
- URL di base di vManage (ad esempio,
https://your-vmanage-server:8443
)
Configura il bucket AWS S3 e IAM per Google SecOps
- Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
- Salva il nome e la regione del bucket per riferimento futuro (ad esempio,
cisco-sdwan-logs-bucket
). - Crea un utente seguendo questa guida: Creazione di un utente IAM.
- Seleziona l'utente creato.
- Seleziona la scheda Credenziali di sicurezza.
- Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
- Seleziona Servizio di terze parti come Caso d'uso.
- Fai clic su Avanti.
- (Facoltativo) Aggiungi un tag di descrizione.
- Fai clic su Crea chiave di accesso.
- Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per un utilizzo successivo.
- Fai clic su Fine.
- Seleziona la scheda Autorizzazioni.
- Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
- Seleziona Aggiungi autorizzazioni.
- Seleziona Allega direttamente i criteri.
- Cerca e seleziona il criterio AmazonS3FullAccess.
- Fai clic su Avanti.
- Fai clic su Aggiungi autorizzazioni.
Configura il ruolo e il criterio IAM per i caricamenti S3
- Nella console AWS, vai a IAM > Policy > Crea policy > scheda JSON.
Inserisci la seguente policy:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/cisco-sdwan/state.json" } ] }
- Sostituisci
cisco-sdwan-logs-bucket
se hai inserito un nome bucket diverso.
- Sostituisci
Fai clic su Avanti > Crea criterio.
Vai a IAM > Ruoli.
Fai clic su Crea ruolo > Servizio AWS > Lambda.
Allega il criterio appena creato.
Assegna al ruolo il nome
cisco-sdwan-lambda-role
e fai clic su Crea ruolo.
Crea la funzione Lambda
- Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
- Fai clic su Crea autore da zero.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome cisco-sdwan-log-collector
Tempo di esecuzione Python 3.13 Architettura x86_64 Ruolo di esecuzione cisco-sdwan-lambda-role
Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e inserisci il seguente codice (
cisco-sdwan-log-collector.py
):import json import boto3 import os import urllib3 import urllib.parse from datetime import datetime, timezone from botocore.exceptions import ClientError # Disable SSL warnings for self-signed certificates urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Environment variables VMANAGE_HOST = os.environ['VMANAGE_HOST'] VMANAGE_USERNAME = os.environ['VMANAGE_USERNAME'] VMANAGE_PASSWORD = os.environ['VMANAGE_PASSWORD'] S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] s3_client = boto3.client('s3') http = urllib3.PoolManager(cert_reqs='CERT_NONE') class VManageAPI: def __init__(self, host, username, password): self.host = host.rstrip('/') self.username = username self.password = password self.cookies = None self.token = None def authenticate(self): """Authenticate with vManage and get session tokens""" try: # Login to get JSESSIONID login_url = f"{self.host}/j_security_check" login_data = urllib.parse.urlencode({ 'j_username': self.username, 'j_password': self.password }) response = http.request( 'POST', login_url, body=login_data, headers={'Content-Type': 'application/x-www-form-urlencoded'}, timeout=30 ) # Check if login was successful (vManage returns HTML on failure) if b'<html>' in response.data or response.status != 200: raise Exception("Authentication failed") # Extract cookies self.cookies = {} if 'Set-Cookie' in response.headers: cookie_header = response.headers['Set-Cookie'] for cookie in cookie_header.split(';'): if 'JSESSIONID=' in cookie: self.cookies['JSESSIONID'] = cookie.split('JSESSIONID=')[1].split(';')[0] break if not self.cookies.get('JSESSIONID'): raise Exception("Failed to get JSESSIONID") # Get XSRF token token_url = f"{self.host}/dataservice/client/token" headers = { 'Content-Type': 'application/json', 'Cookie': f"JSESSIONID={self.cookies['JSESSIONID']}" } response = http.request('GET', token_url, headers=headers, timeout=30) if response.status == 200: self.token = response.data.decode('utf-8') return True else: raise Exception(f"Failed to get XSRF token: {response.status}") except Exception as e: print(f"Authentication error: {e}") return False def get_headers(self): """Get headers for API requests""" return { 'Content-Type': 'application/json', 'Cookie': f"JSESSIONID={self.cookies['JSESSIONID']}", 'X-XSRF-TOKEN': self.token } def get_audit_logs(self, last_timestamp=None): """Get audit logs from vManage""" try: url = f"{self.host}/dataservice/auditlog" headers = self.get_headers() # Build query for recent logs query = { "query": { "condition": "AND", "rules": [] }, "size": 10000 } # Add timestamp filter if provided if last_timestamp: # Convert timestamp to epoch milliseconds for vManage API if isinstance(last_timestamp, str): try: dt = datetime.fromisoformat(last_timestamp.replace('Z', '+00:00')) epoch_ms = int(dt.timestamp() * 1000) except: epoch_ms = int(last_timestamp) else: epoch_ms = int(last_timestamp) query["query"]["rules"].append({ "value": [str(epoch_ms)], "field": "entry_time", "type": "date", "operator": "greater" }) else: # Get last 1 hour of logs by default query["query"]["rules"].append({ "value": ["1"], "field": "entry_time", "type": "date", "operator": "last_n_hours" }) response = http.request( 'POST', url, body=json.dumps(query), headers=headers, timeout=60 ) if response.status == 200: return json.loads(response.data.decode('utf-8')) else: print(f"Failed to get audit logs: {response.status}") return None except Exception as e: print(f"Error getting audit logs: {e}") return None def get_alarms(self, last_timestamp=None): """Get alarms from vManage""" try: url = f"{self.host}/dataservice/alarms" headers = self.get_headers() # Build query for recent alarms query = { "query": { "condition": "AND", "rules": [] }, "size": 10000 } # Add timestamp filter if provided if last_timestamp: # Convert timestamp to epoch milliseconds for vManage API if isinstance(last_timestamp, str): try: dt = datetime.fromisoformat(last_timestamp.replace('Z', '+00:00')) epoch_ms = int(dt.timestamp() * 1000) except: epoch_ms = int(last_timestamp) else: epoch_ms = int(last_timestamp) query["query"]["rules"].append({ "value": [str(epoch_ms)], "field": "entry_time", "type": "date", "operator": "greater" }) else: # Get last 1 hour of alarms by default query["query"]["rules"].append({ "value": ["1"], "field": "entry_time", "type": "date", "operator": "last_n_hours" }) response = http.request( 'POST', url, body=json.dumps(query), headers=headers, timeout=60 ) if response.status == 200: return json.loads(response.data.decode('utf-8')) else: print(f"Failed to get alarms: {response.status}") return None except Exception as e: print(f"Error getting alarms: {e}") return None def get_events(self, last_timestamp=None): """Get events from vManage""" try: url = f"{self.host}/dataservice/events" headers = self.get_headers() # Build query for recent events query = { "query": { "condition": "AND", "rules": [] }, "size": 10000 } # Add timestamp filter if provided if last_timestamp: # Convert timestamp to epoch milliseconds for vManage API if isinstance(last_timestamp, str): try: dt = datetime.fromisoformat(last_timestamp.replace('Z', '+00:00')) epoch_ms = int(dt.timestamp() * 1000) except: epoch_ms = int(last_timestamp) else: epoch_ms = int(last_timestamp) query["query"]["rules"].append({ "value": [str(epoch_ms)], "field": "entry_time", "type": "date", "operator": "greater" }) else: # Get last 1 hour of events by default query["query"]["rules"].append({ "value": ["1"], "field": "entry_time", "type": "date", "operator": "last_n_hours" }) response = http.request( 'POST', url, body=json.dumps(query), headers=headers, timeout=60 ) if response.status == 200: return json.loads(response.data.decode('utf-8')) else: print(f"Failed to get events: {response.status}") return None except Exception as e: print(f"Error getting events: {e}") return None def get_last_run_time(): """Get the last successful run timestamp from S3""" try: response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) state_data = json.loads(response['Body'].read()) return state_data.get('last_run_time') except ClientError as e: if e.response['Error']['Code'] == 'NoSuchKey': print("No previous state found, collecting last hour of logs") return None else: print(f"Error reading state: {e}") return None except Exception as e: print(f"Error reading state: {e}") return None def update_last_run_time(timestamp): """Update the last successful run timestamp in S3""" try: state_data = { 'last_run_time': timestamp, 'updated_at': datetime.now(timezone.utc).isoformat() } s3_client.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state_data), ContentType='application/json' ) print(f"Updated state with timestamp: {timestamp}") except Exception as e: print(f"Error updating state: {e}") def upload_logs_to_s3(logs_data, log_type, timestamp): """Upload logs to S3 bucket""" try: if not logs_data or 'data' not in logs_data or not logs_data['data']: print(f"No {log_type} data to upload") return # Create filename with timestamp dt = datetime.now(timezone.utc) filename = f"{S3_PREFIX}{log_type}/{dt.strftime('%Y/%m/%d')}/{log_type}_{dt.strftime('%Y%m%d_%H%M%S')}.json" # Upload to S3 s3_client.put_object( Bucket=S3_BUCKET, Key=filename, Body=json.dumps(logs_data), ContentType='application/json' ) print(f"Uploaded {len(logs_data['data'])} {log_type} records to s3://{S3_BUCKET}/{filename}") except Exception as e: print(f"Error uploading {log_type} to S3: {e}") def lambda_handler(event, context): """Main Lambda handler function""" print(f"Starting Cisco vManage log collection at {datetime.now(timezone.utc)}") try: # Get last run time last_run_time = get_last_run_time() # Initialize vManage API client vmanage = VManageAPI(VMANAGE_HOST, VMANAGE_USERNAME, VMANAGE_PASSWORD) # Authenticate if not vmanage.authenticate(): return { 'statusCode': 500, 'body': json.dumps('Failed to authenticate with vManage') } print("Successfully authenticated with vManage") # Current timestamp for state tracking (store as epoch milliseconds) current_time = int(datetime.now(timezone.utc).timestamp() * 1000) # Collect different types of logs log_types = [ ('audit_logs', vmanage.get_audit_logs), ('alarms', vmanage.get_alarms), ('events', vmanage.get_events) ] total_records = 0 for log_type, get_function in log_types: try: print(f"Collecting {log_type}...") logs_data = get_function(last_run_time) if logs_data: upload_logs_to_s3(logs_data, log_type, current_time) if 'data' in logs_data: total_records += len(logs_data['data']) except Exception as e: print(f"Error processing {log_type}: {e}") continue # Update state with current timestamp update_last_run_time(current_time) print(f"Collection completed. Total records processed: {total_records}") return { 'statusCode': 200, 'body': json.dumps({ 'message': 'Log collection completed successfully', 'total_records': total_records, 'timestamp': datetime.now(timezone.utc).isoformat() }) } except Exception as e: print(f"Lambda execution error: {e}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') }
Vai a Configurazione > Variabili di ambiente.
Fai clic su Modifica > Aggiungi nuova variabile di ambiente.
Inserisci le seguenti variabili di ambiente, sostituendole con i tuoi valori:
Chiave Valore di esempio S3_BUCKET
cisco-sdwan-logs-bucket
S3_PREFIX
cisco-sdwan/
STATE_KEY
cisco-sdwan/state.json
VMANAGE_HOST
https://your-vmanage-server:8443
VMANAGE_USERNAME
your-vmanage-username
VMANAGE_PASSWORD
your-vmanage-password
Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Functions > cisco-sdwan-log-collector).
Seleziona la scheda Configurazione.
Nel riquadro Configurazione generale, fai clic su Modifica.
Modifica Timeout impostando 5 minuti (300 secondi) e fai clic su Salva.
Creare una pianificazione EventBridge
- Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
- Fornisci i seguenti dettagli di configurazione:
- Programma ricorrente: Tariffa (
1 hour
) - Target: la tua funzione Lambda
cisco-sdwan-log-collector
- Nome:
cisco-sdwan-log-collector-1h
- Programma ricorrente: Tariffa (
- Fai clic su Crea pianificazione.
(Facoltativo) Crea chiavi e utenti IAM di sola lettura per Google SecOps
- Nella console AWS, vai a IAM > Utenti > Aggiungi utenti.
- Fai clic su Add users (Aggiungi utenti).
- Fornisci i seguenti dettagli di configurazione:
- Utente:
secops-reader
- Tipo di accesso: Chiave di accesso - Accesso programmatico
- Utente:
- Fai clic su Crea utente.
- Collega la criterio per la lettura minima (personalizzata): Utenti > secops-reader > Autorizzazioni > Aggiungi autorizzazioni > Collega le norme direttamente > Crea norma.
Nell'editor JSON, inserisci la seguente policy:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket", "Condition": { "StringLike": { "s3:prefix": ["cisco-sdwan/*"] } } }, { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/cisco-sdwan/*" } ] }
Assegna al criterio il nome
secops-reader-policy
.Fai clic su Crea criterio.
Torna alla creazione dell'utente, cerca e seleziona
secops-reader-policy
.Fai clic su Avanti: tag.
Fai clic su Successivo: esamina.
Fai clic su Crea utente.
Scarica il file CSV (questi valori vengono inseriti nel feed).
Configura un feed in Google SecOps per importare i log SD-WAN di Cisco vManage
- Vai a Impostazioni SIEM > Feed.
- Fai clic su + Aggiungi nuovo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
Cisco SD-WAN logs
). - Seleziona Amazon S3 V2 come Tipo di origine.
- Seleziona Cisco vManage SD-WAN come Tipo di log.
- Fai clic su Avanti.
- Specifica i valori per i seguenti parametri di input:
- URI S3:
s3://cisco-sdwan-logs-bucket/cisco-sdwan/
- Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
- Età massima del file: includi i file modificati nell'ultimo numero di giorni. Valore predefinito 180 giorni.
- ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
- Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
- Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
- Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
- URI S3:
- Fai clic su Avanti.
- Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.