Collect Cisco vManage SD-WAN logs

This document explains how to ingest Cisco vManage SD-WAN logs to Google Security Operations using Amazon S3.

Before you begin

Make sure you have the following prerequisites:

  • A Google SecOps instance
  • Privileged access to the Cisco vManage SD-WAN management console
  • Privileged access to AWS (S3, IAM, Lambda, EventBridge)

Collect Cisco vManage SD-WAN prerequisites (credentials and base URL)

  1. Sign in to the Cisco vManage management console.
  2. Go to Administration > Settings > Users.
  3. Create a new user or use an existing administrator user with API access.
  4. Copy the following details and save them in a secure location (you can optionally verify them with the sketch after this list):
    • Username
    • Password
    • vManage base URL (for example, https://your-vmanage-server:8443)
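
Optional: you can confirm the credentials and base URL before deploying anything in AWS. The following is a minimal sketch that mirrors the authentication flow used by the Lambda function later in this document; the host, username, and password values are placeholders to replace with your own.

    import urllib.parse
    import urllib3

    # vManage deployments often use self-signed certificates
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    VMANAGE_HOST = "https://your-vmanage-server:8443"  # placeholder base URL
    USERNAME = "your-vmanage-username"                 # placeholder
    PASSWORD = "your-vmanage-password"                 # placeholder

    http = urllib3.PoolManager(cert_reqs="CERT_NONE")

    # Step 1: form-based login; a JSESSIONID cookie is returned on success
    login = http.request(
        "POST",
        f"{VMANAGE_HOST}/j_security_check",
        body=urllib.parse.urlencode({"j_username": USERNAME, "j_password": PASSWORD}),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    if b"<html>" in login.data or login.status != 200:
        raise SystemExit("Authentication failed: check username, password, and base URL")

    cookie = login.headers.get("Set-Cookie", "")
    jsessionid = cookie.split("JSESSIONID=")[1].split(";")[0] if "JSESSIONID=" in cookie else None
    if not jsessionid:
        raise SystemExit("No JSESSIONID returned")

    # Step 2: the XSRF token endpoint confirms the session has API access
    token = http.request(
        "GET",
        f"{VMANAGE_HOST}/dataservice/client/token",
        headers={"Cookie": f"JSESSIONID={jsessionid}"},
    )
    print("API access OK" if token.status == 200 else f"Token request failed: {token.status}")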

Configure an AWS S3 bucket and IAM for Google SecOps

  1. Create an Amazon S3 bucket following this user guide: Create a bucket.
  2. Save the bucket Name and Region for later reference (for example, cisco-sdwan-logs-bucket).
  3. Create a user following this user guide: Creating an IAM user.
  4. Select the created User.
  5. Select the Security credentials tab.
  6. In the Access keys section, click Create access key.
  7. Select Third-party service as the Use case.
  8. Click Next.
  9. Optional: add a description tag.
  10. Click Create access key.
  11. Click Download CSV file and save the Access key and Secret access key for later use.
  12. Click Done.
  13. Select the Permissions tab.
  14. In the Permissions policies section, click Add permissions.
  15. Select Add permissions.
  16. Select Attach policies directly.
  17. Search for and select the AmazonS3FullAccess policy.
  18. Click Next.
  19. Click Add permissions. (You can optionally confirm that the new key pair can reach the bucket using the sketch after this list.)
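
Optional: a minimal boto3 sketch to confirm that the access key pair saved above can reach the bucket. The bucket name, region, and key values are placeholders; adjust them to your environment.

    import boto3

    # Session built from the access key pair downloaded in the CSV file
    session = boto3.Session(
        aws_access_key_id="YOUR_ACCESS_KEY_ID",
        aws_secret_access_key="YOUR_SECRET_ACCESS_KEY",
        region_name="us-east-1",  # the region you saved earlier
    )
    s3 = session.client("s3")

    # head_bucket succeeds only if the bucket exists and the credentials can access it
    s3.head_bucket(Bucket="cisco-sdwan-logs-bucket")
    print("Bucket is reachable with the new access keys")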

Configure the IAM policy and role for S3 uploads

  1. AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页
  2. 输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/cisco-sdwan/state.json"
        }
      ]
    }
    
    • Replace cisco-sdwan-logs-bucket if you entered a different bucket name.
  3. Click Next > Create policy.

  4. Go to IAM > Roles.

  5. Click Create role > AWS service > Lambda.

  6. Attach the newly created policy.

  7. Name the role cisco-sdwan-lambda-role, and then click Create role. (A scripted alternative to these console steps is sketched after this list.)
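
Optional: the same policy and role can be created with boto3 instead of the console. This is a hedged sketch; the custom policy name cisco-sdwan-lambda-policy is an illustrative choice, and the policy document matches the JSON shown above. The role may also need CloudWatch Logs permissions (for example, the managed AWSLambdaBasicExecutionRole policy) so the function can write logs.

    import json
    import boto3

    iam = boto3.client("iam")

    # Same statements as the JSON policy shown earlier in this section
    policy_doc = {
        "Version": "2012-10-17",
        "Statement": [
            {"Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject",
             "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/*"},
            {"Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject",
             "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/cisco-sdwan/state.json"},
        ],
    }
    policy = iam.create_policy(
        PolicyName="cisco-sdwan-lambda-policy",  # illustrative name
        PolicyDocument=json.dumps(policy_doc),
    )

    # Trust policy that lets the Lambda service assume the role
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{"Effect": "Allow",
                       "Principal": {"Service": "lambda.amazonaws.com"},
                       "Action": "sts:AssumeRole"}],
    }
    role = iam.create_role(
        RoleName="cisco-sdwan-lambda-role",
        AssumeRolePolicyDocument=json.dumps(trust_policy),
    )
    iam.attach_role_policy(
        RoleName="cisco-sdwan-lambda-role",
        PolicyArn=policy["Policy"]["Arn"],
    )
    print("Created role:", role["Role"]["Arn"])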

Create the Lambda function

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    设置
    名称 cisco-sdwan-log-collector
    运行时 Python 3.13
    架构 x86_64
    执行角色 cisco-sdwan-lambda-role
  4. 创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (cisco-sdwan-log-collector.py):

    import json
    import boto3
    import os
    import urllib3
    import urllib.parse
    from datetime import datetime, timezone
    from botocore.exceptions import ClientError
    
    # Disable SSL warnings for self-signed certificates
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    
    # Environment variables
    VMANAGE_HOST = os.environ['VMANAGE_HOST']
    VMANAGE_USERNAME = os.environ['VMANAGE_USERNAME']  
    VMANAGE_PASSWORD = os.environ['VMANAGE_PASSWORD']
    S3_BUCKET = os.environ['S3_BUCKET']
    S3_PREFIX = os.environ['S3_PREFIX']
    STATE_KEY = os.environ['STATE_KEY']
    
    s3_client = boto3.client('s3')
    http = urllib3.PoolManager(cert_reqs='CERT_NONE')
    
    class VManageAPI:
        def __init__(self, host, username, password):
            self.host = host.rstrip('/')
            self.username = username
            self.password = password
            self.cookies = None
            self.token = None
    
        def authenticate(self):
            """Authenticate with vManage and get session tokens"""
            try:
                # Login to get JSESSIONID
                login_url = f"{self.host}/j_security_check"
                login_data = urllib.parse.urlencode({
                    'j_username': self.username,
                    'j_password': self.password
                })
    
                response = http.request(
                    'POST',
                    login_url,
                    body=login_data,
                    headers={'Content-Type': 'application/x-www-form-urlencoded'},
                    timeout=30
                )
    
                # Check if login was successful (vManage returns HTML on failure)
                if b'<html>' in response.data or response.status != 200:
                    raise Exception("Authentication failed")
    
                # Extract cookies
                self.cookies = {}
                if 'Set-Cookie' in response.headers:
                    cookie_header = response.headers['Set-Cookie']
                    for cookie in cookie_header.split(';'):
                        if 'JSESSIONID=' in cookie:
                            self.cookies['JSESSIONID'] = cookie.split('JSESSIONID=')[1].split(';')[0]
                            break
    
                if not self.cookies.get('JSESSIONID'):
                    raise Exception("Failed to get JSESSIONID")
    
                # Get XSRF token
                token_url = f"{self.host}/dataservice/client/token"
                headers = {
                    'Content-Type': 'application/json',
                    'Cookie': f"JSESSIONID={self.cookies['JSESSIONID']}"
                }
    
                response = http.request('GET', token_url, headers=headers, timeout=30)
    
                if response.status == 200:
                    self.token = response.data.decode('utf-8')
                    return True
                else:
                    raise Exception(f"Failed to get XSRF token: {response.status}")
    
            except Exception as e:
                print(f"Authentication error: {e}")
                return False
    
        def get_headers(self):
            """Get headers for API requests"""
            return {
                'Content-Type': 'application/json',
                'Cookie': f"JSESSIONID={self.cookies['JSESSIONID']}",
                'X-XSRF-TOKEN': self.token
            }
    
        def get_audit_logs(self, last_timestamp=None):
            """Get audit logs from vManage"""
            try:
                url = f"{self.host}/dataservice/auditlog"
                headers = self.get_headers()
    
                # Build query for recent logs
                query = {
                    "query": {
                        "condition": "AND",
                        "rules": []
                    },
                    "size": 10000
                }
    
                # Add timestamp filter if provided
                if last_timestamp:
                    # Convert timestamp to epoch milliseconds for vManage API
                    if isinstance(last_timestamp, str):
                        try:
                            dt = datetime.fromisoformat(last_timestamp.replace('Z', '+00:00'))
                            epoch_ms = int(dt.timestamp() * 1000)
                        except:
                            epoch_ms = int(last_timestamp)
                    else:
                        epoch_ms = int(last_timestamp)
    
                    query["query"]["rules"].append({
                        "value": [str(epoch_ms)],
                        "field": "entry_time",
                        "type": "date",
                        "operator": "greater"
                    })
                else:
                    # Get last 1 hour of logs by default
                    query["query"]["rules"].append({
                        "value": ["1"],
                        "field": "entry_time", 
                        "type": "date",
                        "operator": "last_n_hours"
                    })
    
                response = http.request(
                    'POST',
                    url,
                    body=json.dumps(query),
                    headers=headers,
                    timeout=60
                )
    
                if response.status == 200:
                    return json.loads(response.data.decode('utf-8'))
                else:
                    print(f"Failed to get audit logs: {response.status}")
                    return None
    
            except Exception as e:
                print(f"Error getting audit logs: {e}")
                return None
    
        def get_alarms(self, last_timestamp=None):
            """Get alarms from vManage"""
            try:
                url = f"{self.host}/dataservice/alarms"
                headers = self.get_headers()
    
                # Build query for recent alarms
                query = {
                    "query": {
                        "condition": "AND",
                        "rules": []
                    },
                    "size": 10000
                }
    
                # Add timestamp filter if provided
                if last_timestamp:
                    # Convert timestamp to epoch milliseconds for vManage API
                    if isinstance(last_timestamp, str):
                        try:
                            dt = datetime.fromisoformat(last_timestamp.replace('Z', '+00:00'))
                            epoch_ms = int(dt.timestamp() * 1000)
                        except:
                            epoch_ms = int(last_timestamp)
                    else:
                        epoch_ms = int(last_timestamp)
    
                    query["query"]["rules"].append({
                        "value": [str(epoch_ms)],
                        "field": "entry_time",
                        "type": "date",
                        "operator": "greater"
                    })
                else:
                    # Get last 1 hour of alarms by default
                    query["query"]["rules"].append({
                        "value": ["1"],
                        "field": "entry_time",
                        "type": "date", 
                        "operator": "last_n_hours"
                    })
    
                response = http.request(
                    'POST',
                    url,
                    body=json.dumps(query),
                    headers=headers,
                    timeout=60
                )
    
                if response.status == 200:
                    return json.loads(response.data.decode('utf-8'))
                else:
                    print(f"Failed to get alarms: {response.status}")
                    return None
    
            except Exception as e:
                print(f"Error getting alarms: {e}")
                return None
    
        def get_events(self, last_timestamp=None):
            """Get events from vManage"""
            try:
                url = f"{self.host}/dataservice/events"
                headers = self.get_headers()
    
                # Build query for recent events
                query = {
                    "query": {
                        "condition": "AND",
                        "rules": []
                    },
                    "size": 10000
                }
    
                # Add timestamp filter if provided  
                if last_timestamp:
                    # Convert timestamp to epoch milliseconds for vManage API
                    if isinstance(last_timestamp, str):
                        try:
                            dt = datetime.fromisoformat(last_timestamp.replace('Z', '+00:00'))
                            epoch_ms = int(dt.timestamp() * 1000)
                        except:
                            epoch_ms = int(last_timestamp)
                    else:
                        epoch_ms = int(last_timestamp)
    
                    query["query"]["rules"].append({
                        "value": [str(epoch_ms)],
                        "field": "entry_time",
                        "type": "date",
                        "operator": "greater"
                    })
                else:
                    # Get last 1 hour of events by default
                    query["query"]["rules"].append({
                        "value": ["1"],
                        "field": "entry_time",
                        "type": "date",
                        "operator": "last_n_hours"
                    })
    
                response = http.request(
                    'POST',
                    url,
                    body=json.dumps(query),
                    headers=headers,
                    timeout=60
                )
    
                if response.status == 200:
                    return json.loads(response.data.decode('utf-8'))
                else:
                    print(f"Failed to get events: {response.status}")
                    return None
    
            except Exception as e:
                print(f"Error getting events: {e}")
                return None
    
    def get_last_run_time():
        """Get the last successful run timestamp from S3"""
        try:
            response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            state_data = json.loads(response['Body'].read())
            return state_data.get('last_run_time')
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchKey':
                print("No previous state found, collecting last hour of logs")
                return None
            else:
                print(f"Error reading state: {e}")
                return None
        except Exception as e:
            print(f"Error reading state: {e}")
            return None
    
    def update_last_run_time(timestamp):
        """Update the last successful run timestamp in S3"""
        try:
            state_data = {
                'last_run_time': timestamp,
                'updated_at': datetime.now(timezone.utc).isoformat()
            }
    
            s3_client.put_object(
                Bucket=S3_BUCKET,
                Key=STATE_KEY,
                Body=json.dumps(state_data),
                ContentType='application/json'
            )
    
            print(f"Updated state with timestamp: {timestamp}")
    
        except Exception as e:
            print(f"Error updating state: {e}")
    
    def upload_logs_to_s3(logs_data, log_type, timestamp):
        """Upload logs to S3 bucket"""
        try:
            if not logs_data or 'data' not in logs_data or not logs_data['data']:
                print(f"No {log_type} data to upload")
                return
    
            # Create filename with timestamp
            dt = datetime.now(timezone.utc)
            filename = f"{S3_PREFIX}{log_type}/{dt.strftime('%Y/%m/%d')}/{log_type}_{dt.strftime('%Y%m%d_%H%M%S')}.json"
    
            # Upload to S3
            s3_client.put_object(
                Bucket=S3_BUCKET,
                Key=filename,
                Body=json.dumps(logs_data),
                ContentType='application/json'
            )
    
            print(f"Uploaded {len(logs_data['data'])} {log_type} records to s3://{S3_BUCKET}/{filename}")
    
        except Exception as e:
            print(f"Error uploading {log_type} to S3: {e}")
    
    def lambda_handler(event, context):
        """Main Lambda handler function"""
        print(f"Starting Cisco vManage log collection at {datetime.now(timezone.utc)}")
    
        try:
            # Get last run time
            last_run_time = get_last_run_time()
    
            # Initialize vManage API client
            vmanage = VManageAPI(VMANAGE_HOST, VMANAGE_USERNAME, VMANAGE_PASSWORD)
    
            # Authenticate
            if not vmanage.authenticate():
                return {
                    'statusCode': 500,
                    'body': json.dumps('Failed to authenticate with vManage')
                }
    
            print("Successfully authenticated with vManage")
    
            # Current timestamp for state tracking (store as epoch milliseconds)
            current_time = int(datetime.now(timezone.utc).timestamp() * 1000)
    
            # Collect different types of logs
            log_types = [
                ('audit_logs', vmanage.get_audit_logs),
                ('alarms', vmanage.get_alarms), 
                ('events', vmanage.get_events)
            ]
    
            total_records = 0
    
            for log_type, get_function in log_types:
                try:
                    print(f"Collecting {log_type}...")
                    logs_data = get_function(last_run_time)
    
                    if logs_data:
                        upload_logs_to_s3(logs_data, log_type, current_time)
                        if 'data' in logs_data:
                            total_records += len(logs_data['data'])
    
                except Exception as e:
                    print(f"Error processing {log_type}: {e}")
                    continue
    
            # Update state with current timestamp
            update_last_run_time(current_time)
    
            print(f"Collection completed. Total records processed: {total_records}")
    
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Log collection completed successfully',
                    'total_records': total_records,
                    'timestamp': datetime.now(timezone.utc).isoformat()
                })
            }
    
        except Exception as e:
            print(f"Lambda execution error: {e}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
  5. Go to Configuration > Environment variables.

  6. Click Edit > Add new environment variable.

  7. Enter the following environment variables, replacing the example values with your own:

    Key               Example value
    S3_BUCKET         cisco-sdwan-logs-bucket
    S3_PREFIX         cisco-sdwan/
    STATE_KEY         cisco-sdwan/state.json
    VMANAGE_HOST      https://your-vmanage-server:8443
    VMANAGE_USERNAME  your-vmanage-username
    VMANAGE_PASSWORD  your-vmanage-password
  8. After the function is created, stay on its page (or open Lambda > Functions > cisco-sdwan-log-collector).

  9. Select the Configuration tab.

  10. In the General configuration panel, click Edit.

  11. Change Timeout to 5 minutes (300 seconds), and then click Save. (You can optionally trigger a test run with the sketch after this list.)
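
Optional: before creating the schedule, you can trigger a one-off run and confirm that objects appear under s3://cisco-sdwan-logs-bucket/cisco-sdwan/. This minimal sketch invokes the function synchronously with an empty event (the handler ignores the event contents) and assumes your local AWS credentials are allowed to invoke Lambda.

    import json
    import boto3

    lambda_client = boto3.client("lambda")

    # Invoke the collector once and wait for the result
    response = lambda_client.invoke(
        FunctionName="cisco-sdwan-log-collector",
        InvocationType="RequestResponse",
        Payload=json.dumps({}).encode("utf-8"),
    )
    print(json.loads(response["Payload"].read()))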

Create an EventBridge schedule

  1. Go to Amazon EventBridge > Scheduler > Create schedule.
  2. Provide the following configuration details:
    • Recurring schedule: Rate (1 hour).
    • Target: your Lambda function cisco-sdwan-log-collector.
    • Name: cisco-sdwan-log-collector-1h.
  3. Click Create schedule. (A scripted alternative is sketched after this list.)
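
Optional: the same hourly schedule can be created with the EventBridge Scheduler API. This is a hedged sketch; the Lambda function ARN and the scheduler execution role ARN (a role that EventBridge Scheduler can assume and that is allowed to invoke the function) are placeholders you must supply.

    import boto3

    scheduler = boto3.client("scheduler")

    scheduler.create_schedule(
        Name="cisco-sdwan-log-collector-1h",
        ScheduleExpression="rate(1 hour)",
        FlexibleTimeWindow={"Mode": "OFF"},
        Target={
            "Arn": "arn:aws:lambda:REGION:ACCOUNT_ID:function:cisco-sdwan-log-collector",
            "RoleArn": "arn:aws:iam::ACCOUNT_ID:role/scheduler-invoke-role",  # placeholder role
        },
    )
    print("Schedule created")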

Optional: Create a read-only IAM user and keys for Google SecOps

  1. AWS 控制台中,依次前往 IAM > 用户 > 添加用户
  2. 点击 Add users(添加用户)。
  3. 提供以下配置详细信息:
    • 用户secops-reader
    • 访问类型访问密钥 - 以程序化方式访问
  4. 点击创建用户
  5. 附加最低限度的读取政策(自定义):用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策
  6. 在 JSON 编辑器中,输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket",
          "Condition": {
            "StringLike": {
              "s3:prefix": ["cisco-sdwan/*"]
            }
          }
        },
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/cisco-sdwan/*"
        }
      ]
    }
    
  7. Name the policy secops-reader-policy.

  8. Click Create policy.

  9. Go back to the user creation flow, then search for and select secops-reader-policy.

  10. Click Next: Tags.

  11. Click Next: Review.

  12. Click Create user.

  13. Download the CSV (these values are entered into the feed). You can optionally verify the read-only keys with the sketch after this list.
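
Optional: a minimal sketch to confirm that the secops-reader keys can list and read objects under the cisco-sdwan/ prefix. The key values come from the CSV downloaded in the previous step; the bucket name is the one used throughout this guide.

    import boto3

    s3 = boto3.client(
        "s3",
        aws_access_key_id="SECOPS_READER_ACCESS_KEY_ID",
        aws_secret_access_key="SECOPS_READER_SECRET_ACCESS_KEY",
    )

    # List a few objects under the prefix the feed will read from
    resp = s3.list_objects_v2(Bucket="cisco-sdwan-logs-bucket", Prefix="cisco-sdwan/", MaxKeys=5)
    for obj in resp.get("Contents", []):
        print(obj["Key"])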

Configure a feed in Google SecOps to ingest Cisco vManage SD-WAN logs

  1. Go to SIEM Settings > Feeds.
  2. Click + Add New Feed.
  3. In the Feed name field, enter a name for the feed (for example, Cisco SD-WAN logs).
  4. Select Amazon S3 V2 as the Source type.
  5. Select Cisco vManage SD-WAN as the Log type.
  6. Click Next.
  7. Specify values for the following input parameters:
    • S3 URI: s3://cisco-sdwan-logs-bucket/cisco-sdwan/
    • Source deletion option: select the deletion option according to your preference.
    • Maximum File Age: include files modified within the last specified number of days. The default is 180 days.
    • Access Key ID: the user access key with access to the S3 bucket.
    • Secret Access Key: the user secret key with access to the S3 bucket.
    • Asset namespace: the asset namespace.
    • Ingestion labels: the label to be applied to the events from this feed.
  8. Click Next.
  9. Review the new feed configuration in the Finalize screen, and then click Submit.

Need more help? Get answers from Community members and Google SecOps professionals.