Recolha registos do Cisco vManage SD-WAN
Este documento explica como carregar registos de SD-WAN do Cisco vManage para o Google Security Operations através do Amazon S3.
Antes de começar
Certifique-se de que tem os seguintes pré-requisitos:
- Uma instância do Google SecOps
- Acesso privilegiado à consola de gestão do Cisco vManage SD-WAN
- Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)
Recolha os pré-requisitos do Cisco vManage SD-WAN (credenciais e URL base)
- Inicie sessão na consola de gestão do Cisco vManage.
- Aceda a Administração > Definições > Utilizadores.
- Crie um novo utilizador ou use um utilizador administrador existente com privilégios de acesso à API.
- Copie e guarde numa localização segura os seguintes detalhes:
- Nome de utilizador
- Palavra-passe
- URL base do vManage (por exemplo,
https://your-vmanage-server:8443
)
Configure o contentor do AWS S3 e o IAM para o Google SecOps
- Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor
- Guarde o nome e a região do contentor para referência futura (por exemplo,
cisco-sdwan-logs-bucket
). - Crie um utilizador seguindo este guia do utilizador: Criar um utilizador do IAM.
- Selecione o utilizador criado.
- Selecione o separador Credenciais de segurança.
- Clique em Criar chave de acesso na secção Chaves de acesso.
- Selecione Serviço de terceiros como o Exemplo de utilização.
- Clicar em Seguinte.
- Opcional: adicione uma etiqueta de descrição.
- Clique em Criar chave de acesso.
- Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para utilização posterior.
- Clique em Concluído.
- Selecione o separador Autorizações.
- Clique em Adicionar autorizações na secção Políticas de autorizações.
- Selecione Adicionar autorizações.
- Selecione Anexar políticas diretamente
- Pesquise e selecione a política AmazonS3FullAccess.
- Clicar em Seguinte.
- Clique em Adicionar autorizações.
Configure a política e a função de IAM para carregamentos do S3
- Na consola da AWS, aceda a IAM > Políticas > Criar política > separador JSON.
Introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/cisco-sdwan/state.json" } ] }
- Substitua
cisco-sdwan-logs-bucket
se tiver introduzido um nome de contentor diferente.
- Substitua
Clique em Seguinte > Criar política.
Aceda a IAM > Funções.
Clique em Criar função > Serviço AWS > Lambda.
Anexe a política criada recentemente.
Dê o nome
cisco-sdwan-lambda-role
à função e clique em Criar função.
Crie a função Lambda
- Na consola da AWS, aceda a Lambda > Functions > Create function.
- Clique em Criar do zero.
Faculte os seguintes detalhes de configuração:
Definição Valor Nome cisco-sdwan-log-collector
Runtime Python 3.13 Arquitetura x86_64 Função de execução cisco-sdwan-lambda-role
Depois de criar a função, abra o separador Código, elimine o fragmento e introduza o seguinte código (
cisco-sdwan-log-collector.py
):import json import boto3 import os import urllib3 import urllib.parse from datetime import datetime, timezone from botocore.exceptions import ClientError # Disable SSL warnings for self-signed certificates urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Environment variables VMANAGE_HOST = os.environ['VMANAGE_HOST'] VMANAGE_USERNAME = os.environ['VMANAGE_USERNAME'] VMANAGE_PASSWORD = os.environ['VMANAGE_PASSWORD'] S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] s3_client = boto3.client('s3') http = urllib3.PoolManager(cert_reqs='CERT_NONE') class VManageAPI: def __init__(self, host, username, password): self.host = host.rstrip('/') self.username = username self.password = password self.cookies = None self.token = None def authenticate(self): """Authenticate with vManage and get session tokens""" try: # Login to get JSESSIONID login_url = f"{self.host}/j_security_check" login_data = urllib.parse.urlencode({ 'j_username': self.username, 'j_password': self.password }) response = http.request( 'POST', login_url, body=login_data, headers={'Content-Type': 'application/x-www-form-urlencoded'}, timeout=30 ) # Check if login was successful (vManage returns HTML on failure) if b'<html>' in response.data or response.status != 200: raise Exception("Authentication failed") # Extract cookies self.cookies = {} if 'Set-Cookie' in response.headers: cookie_header = response.headers['Set-Cookie'] for cookie in cookie_header.split(';'): if 'JSESSIONID=' in cookie: self.cookies['JSESSIONID'] = cookie.split('JSESSIONID=')[1].split(';')[0] break if not self.cookies.get('JSESSIONID'): raise Exception("Failed to get JSESSIONID") # Get XSRF token token_url = f"{self.host}/dataservice/client/token" headers = { 'Content-Type': 'application/json', 'Cookie': f"JSESSIONID={self.cookies['JSESSIONID']}" } response = http.request('GET', token_url, headers=headers, timeout=30) if response.status == 200: self.token = response.data.decode('utf-8') return True else: raise Exception(f"Failed to get XSRF token: {response.status}") except Exception as e: print(f"Authentication error: {e}") return False def get_headers(self): """Get headers for API requests""" return { 'Content-Type': 'application/json', 'Cookie': f"JSESSIONID={self.cookies['JSESSIONID']}", 'X-XSRF-TOKEN': self.token } def get_audit_logs(self, last_timestamp=None): """Get audit logs from vManage""" try: url = f"{self.host}/dataservice/auditlog" headers = self.get_headers() # Build query for recent logs query = { "query": { "condition": "AND", "rules": [] }, "size": 10000 } # Add timestamp filter if provided if last_timestamp: # Convert timestamp to epoch milliseconds for vManage API if isinstance(last_timestamp, str): try: dt = datetime.fromisoformat(last_timestamp.replace('Z', '+00:00')) epoch_ms = int(dt.timestamp() * 1000) except: epoch_ms = int(last_timestamp) else: epoch_ms = int(last_timestamp) query["query"]["rules"].append({ "value": [str(epoch_ms)], "field": "entry_time", "type": "date", "operator": "greater" }) else: # Get last 1 hour of logs by default query["query"]["rules"].append({ "value": ["1"], "field": "entry_time", "type": "date", "operator": "last_n_hours" }) response = http.request( 'POST', url, body=json.dumps(query), headers=headers, timeout=60 ) if response.status == 200: return json.loads(response.data.decode('utf-8')) else: print(f"Failed to get audit logs: {response.status}") return None except Exception as e: print(f"Error getting audit logs: {e}") return None def get_alarms(self, last_timestamp=None): """Get alarms from vManage""" try: url = f"{self.host}/dataservice/alarms" headers = self.get_headers() # Build query for recent alarms query = { "query": { "condition": "AND", "rules": [] }, "size": 10000 } # Add timestamp filter if provided if last_timestamp: # Convert timestamp to epoch milliseconds for vManage API if isinstance(last_timestamp, str): try: dt = datetime.fromisoformat(last_timestamp.replace('Z', '+00:00')) epoch_ms = int(dt.timestamp() * 1000) except: epoch_ms = int(last_timestamp) else: epoch_ms = int(last_timestamp) query["query"]["rules"].append({ "value": [str(epoch_ms)], "field": "entry_time", "type": "date", "operator": "greater" }) else: # Get last 1 hour of alarms by default query["query"]["rules"].append({ "value": ["1"], "field": "entry_time", "type": "date", "operator": "last_n_hours" }) response = http.request( 'POST', url, body=json.dumps(query), headers=headers, timeout=60 ) if response.status == 200: return json.loads(response.data.decode('utf-8')) else: print(f"Failed to get alarms: {response.status}") return None except Exception as e: print(f"Error getting alarms: {e}") return None def get_events(self, last_timestamp=None): """Get events from vManage""" try: url = f"{self.host}/dataservice/events" headers = self.get_headers() # Build query for recent events query = { "query": { "condition": "AND", "rules": [] }, "size": 10000 } # Add timestamp filter if provided if last_timestamp: # Convert timestamp to epoch milliseconds for vManage API if isinstance(last_timestamp, str): try: dt = datetime.fromisoformat(last_timestamp.replace('Z', '+00:00')) epoch_ms = int(dt.timestamp() * 1000) except: epoch_ms = int(last_timestamp) else: epoch_ms = int(last_timestamp) query["query"]["rules"].append({ "value": [str(epoch_ms)], "field": "entry_time", "type": "date", "operator": "greater" }) else: # Get last 1 hour of events by default query["query"]["rules"].append({ "value": ["1"], "field": "entry_time", "type": "date", "operator": "last_n_hours" }) response = http.request( 'POST', url, body=json.dumps(query), headers=headers, timeout=60 ) if response.status == 200: return json.loads(response.data.decode('utf-8')) else: print(f"Failed to get events: {response.status}") return None except Exception as e: print(f"Error getting events: {e}") return None def get_last_run_time(): """Get the last successful run timestamp from S3""" try: response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) state_data = json.loads(response['Body'].read()) return state_data.get('last_run_time') except ClientError as e: if e.response['Error']['Code'] == 'NoSuchKey': print("No previous state found, collecting last hour of logs") return None else: print(f"Error reading state: {e}") return None except Exception as e: print(f"Error reading state: {e}") return None def update_last_run_time(timestamp): """Update the last successful run timestamp in S3""" try: state_data = { 'last_run_time': timestamp, 'updated_at': datetime.now(timezone.utc).isoformat() } s3_client.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state_data), ContentType='application/json' ) print(f"Updated state with timestamp: {timestamp}") except Exception as e: print(f"Error updating state: {e}") def upload_logs_to_s3(logs_data, log_type, timestamp): """Upload logs to S3 bucket""" try: if not logs_data or 'data' not in logs_data or not logs_data['data']: print(f"No {log_type} data to upload") return # Create filename with timestamp dt = datetime.now(timezone.utc) filename = f"{S3_PREFIX}{log_type}/{dt.strftime('%Y/%m/%d')}/{log_type}_{dt.strftime('%Y%m%d_%H%M%S')}.json" # Upload to S3 s3_client.put_object( Bucket=S3_BUCKET, Key=filename, Body=json.dumps(logs_data), ContentType='application/json' ) print(f"Uploaded {len(logs_data['data'])} {log_type} records to s3://{S3_BUCKET}/{filename}") except Exception as e: print(f"Error uploading {log_type} to S3: {e}") def lambda_handler(event, context): """Main Lambda handler function""" print(f"Starting Cisco vManage log collection at {datetime.now(timezone.utc)}") try: # Get last run time last_run_time = get_last_run_time() # Initialize vManage API client vmanage = VManageAPI(VMANAGE_HOST, VMANAGE_USERNAME, VMANAGE_PASSWORD) # Authenticate if not vmanage.authenticate(): return { 'statusCode': 500, 'body': json.dumps('Failed to authenticate with vManage') } print("Successfully authenticated with vManage") # Current timestamp for state tracking (store as epoch milliseconds) current_time = int(datetime.now(timezone.utc).timestamp() * 1000) # Collect different types of logs log_types = [ ('audit_logs', vmanage.get_audit_logs), ('alarms', vmanage.get_alarms), ('events', vmanage.get_events) ] total_records = 0 for log_type, get_function in log_types: try: print(f"Collecting {log_type}...") logs_data = get_function(last_run_time) if logs_data: upload_logs_to_s3(logs_data, log_type, current_time) if 'data' in logs_data: total_records += len(logs_data['data']) except Exception as e: print(f"Error processing {log_type}: {e}") continue # Update state with current timestamp update_last_run_time(current_time) print(f"Collection completed. Total records processed: {total_records}") return { 'statusCode': 200, 'body': json.dumps({ 'message': 'Log collection completed successfully', 'total_records': total_records, 'timestamp': datetime.now(timezone.utc).isoformat() }) } except Exception as e: print(f"Lambda execution error: {e}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') }
Aceda a Configuração > Variáveis de ambiente.
Clique em Editar > Adicionar nova variável de ambiente.
Introduza as seguintes variáveis de ambiente, substituindo-as pelos seus valores:
Chave Valor de exemplo S3_BUCKET
cisco-sdwan-logs-bucket
S3_PREFIX
cisco-sdwan/
STATE_KEY
cisco-sdwan/state.json
VMANAGE_HOST
https://your-vmanage-server:8443
VMANAGE_USERNAME
your-vmanage-username
VMANAGE_PASSWORD
your-vmanage-password
Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > cisco-sdwan-log-collector).
Selecione o separador Configuração.
No painel Configuração geral, clique em Editar.
Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.
Crie um horário do EventBridge
- Aceda a Amazon EventBridge > Scheduler > Create schedule.
- Indique os seguintes detalhes de configuração:
- Horário recorrente: Taxa (
1 hour
) - Alvo: a sua função Lambda
cisco-sdwan-log-collector
- Nome:
cisco-sdwan-log-collector-1h
- Horário recorrente: Taxa (
- Clique em Criar programação.
Opcional: crie um utilizador e chaves da IAM só de leitura para o Google SecOps
- Na consola da AWS, aceda a IAM > Utilizadores > Adicionar utilizadores.
- Clique em Adicionar utilizadores.
- Indique os seguintes detalhes de configuração:
- Utilizador:
secops-reader
- Tipo de acesso: chave de acesso – acesso programático
- Utilizador:
- Clique em Criar utilizador.
- Anexe a política de leitura mínima (personalizada): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
No editor JSON, introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket", "Condition": { "StringLike": { "s3:prefix": ["cisco-sdwan/*"] } } }, { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/cisco-sdwan/*" } ] }
Atribua um nome à política
secops-reader-policy
.Clique em Criar política.
Regresse à criação do utilizador, pesquise e selecione
secops-reader-policy
.Clique em Seguinte: Etiquetas.
Clique em Seguinte: rever.
Clique em Criar utilizador.
Transfira o CSV (estes valores são introduzidos no feed).
Configure um feed no Google SecOps para carregar registos do Cisco vManage SD-WAN
- Aceda a Definições do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, introduza um nome para o feed (por exemplo,
Cisco SD-WAN logs
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Cisco vManage SD-WAN como Tipo de registo.
- Clicar em Seguinte.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://cisco-sdwan-logs-bucket/cisco-sdwan/
- Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
- Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. Predefinição de 180 dias.
- ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
- Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
- Espaço de nomes do recurso: o espaço de nomes do recurso.
- Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
- URI do S3:
- Clicar em Seguinte.
- Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.