Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Overgrowth MCP Server - Network automation via GNS3 and SSH | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| import subprocess | |
| import sys | |
| from typing import Any, Dict, List, Optional | |
| from mcp.server import Server | |
| from mcp.types import Tool, TextContent | |
| import requests | |
| from netmiko import ConnectHandler | |
| from pathlib import Path | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger("overgrowth-mcp") | |
| # Configuration - support environment variable | |
| GNS3_SERVER = os.getenv("GNS3_SERVER", "http://localhost:3080") | |
| GNS3_API_BASE = f"{GNS3_SERVER}/v2" | |
| SSH_KEY_PATH = Path.home() / ".ssh" / "overgrowth_rsa" | |
| BACKUP_DIR = Path(__file__).parent / "backups" | |
| BACKUP_DIR.mkdir(exist_ok=True) | |
| # Initialize MCP server | |
| app = Server("overgrowth-mcp-server") | |
| class GNS3Client: | |
| """Client for GNS3 API interactions""" | |
| def __init__(self, base_url: str = GNS3_API_BASE): | |
| self.base_url = base_url | |
| def create_project(self, name: str, project_id: Optional[str] = None) -> Optional[Dict]: | |
| """Create a GNS3 project (idempotent if name already exists).""" | |
| try: | |
| # If project exists, return it | |
| existing = next((p for p in self.get_projects() if p.get("name") == name), None) | |
| if existing: | |
| return existing | |
| payload: Dict[str, Any] = {"name": name} | |
| if project_id: | |
| payload["project_id"] = project_id | |
| response = requests.post(f"{self.base_url}/projects", json=payload) | |
| response.raise_for_status() | |
| return response.json() | |
| except Exception as e: | |
| logger.error(f"Failed to create project {name}: {e}") | |
| return None | |
| def get_projects(self) -> List[Dict]: | |
| """Get all GNS3 projects""" | |
| try: | |
| response = requests.get(f"{self.base_url}/projects") | |
| response.raise_for_status() | |
| return response.json() | |
| except Exception as e: | |
| logger.error(f"Failed to get projects: {e}") | |
| return [] | |
| def get_project(self, project_id: str) -> Optional[Dict]: | |
| """Get specific project details""" | |
| try: | |
| response = requests.get(f"{self.base_url}/projects/{project_id}") | |
| response.raise_for_status() | |
| return response.json() | |
| except Exception as e: | |
| logger.error(f"Failed to get project {project_id}: {e}") | |
| return None | |
| def get_nodes(self, project_id: str) -> List[Dict]: | |
| """Get all nodes in a project""" | |
| try: | |
| response = requests.get(f"{self.base_url}/projects/{project_id}/nodes") | |
| response.raise_for_status() | |
| return response.json() | |
| except Exception as e: | |
| logger.error(f"Failed to get nodes for project {project_id}: {e}") | |
| return [] | |
| def get_links(self, project_id: str) -> List[Dict]: | |
| """Get all links in a project""" | |
| try: | |
| response = requests.get(f"{self.base_url}/projects/{project_id}/links") | |
| response.raise_for_status() | |
| return response.json() | |
| except Exception as e: | |
| logger.error(f"Failed to get links for project {project_id}: {e}") | |
| return [] | |
| def set_startup_config(self, project_id: str, node_id: str, config_text: str) -> bool: | |
| """ | |
| Set startup config for a node. Applies to platforms that honor startup-config (e.g., IOSvL2/IOSv). | |
| """ | |
| try: | |
| url = f"{self.base_url}/projects/{project_id}/nodes/{node_id}/startup-config" | |
| response = requests.put(url, data=config_text.encode("utf-8"), headers={"Content-Type": "text/plain"}) | |
| response.raise_for_status() | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to set startup config for node {node_id}: {e}") | |
| return False | |
| def start_node(self, project_id: str, node_id: str) -> bool: | |
| """Start a node""" | |
| try: | |
| response = requests.post( | |
| f"{self.base_url}/projects/{project_id}/nodes/{node_id}/start" | |
| ) | |
| response.raise_for_status() | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to start node {node_id}: {e}") | |
| return False | |
| def stop_node(self, project_id: str, node_id: str) -> bool: | |
| """Stop a node""" | |
| try: | |
| response = requests.post( | |
| f"{self.base_url}/projects/{project_id}/nodes/{node_id}/stop" | |
| ) | |
| response.raise_for_status() | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to stop node {node_id}: {e}") | |
| return False | |
| class DeviceManager: | |
| """Manages SSH connections to network devices""" | |
| def __init__(self, ssh_key_path: Path = SSH_KEY_PATH): | |
| self.ssh_key_path = ssh_key_path | |
| def connect(self, host: str, username: str = "admin", port: int = 22) -> Optional[ConnectHandler]: | |
| """Connect to a device via SSH""" | |
| device_params = { | |
| 'device_type': 'cisco_ios', # Default, can be parameterized | |
| 'host': host, | |
| 'username': username, | |
| 'port': port, | |
| 'use_keys': True, | |
| 'key_file': str(self.ssh_key_path), | |
| 'timeout': 30, | |
| } | |
| try: | |
| connection = ConnectHandler(**device_params) | |
| return connection | |
| except Exception as e: | |
| logger.error(f"Failed to connect to {host}: {e}") | |
| return None | |
| def send_command(self, connection: ConnectHandler, command: str) -> str: | |
| """Send command to device""" | |
| try: | |
| output = connection.send_command(command) | |
| return output | |
| except Exception as e: | |
| logger.error(f"Failed to send command: {e}") | |
| return f"Error: {str(e)}" | |
| def send_config(self, connection: ConnectHandler, config_commands: List[str]) -> str: | |
| """Send configuration commands to device""" | |
| try: | |
| output = connection.send_config_set(config_commands) | |
| return output | |
| except Exception as e: | |
| logger.error(f"Failed to send config: {e}") | |
| return f"Error: {str(e)}" | |
| # Initialize clients | |
| gns3 = GNS3Client() | |
| device_mgr = DeviceManager() | |
| async def list_tools() -> List[Tool]: | |
| """List available MCP tools""" | |
| return [ | |
| Tool( | |
| name="get_projects", | |
| description="Get list of all GNS3 projects", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": {}, | |
| "required": [] | |
| } | |
| ), | |
| Tool( | |
| name="get_topology", | |
| description="Get topology information for a GNS3 project including nodes and links", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "project_name": { | |
| "type": "string", | |
| "description": "Name of the GNS3 project" | |
| } | |
| }, | |
| "required": ["project_name"] | |
| } | |
| ), | |
| Tool( | |
| name="create_project", | |
| description="Create or return an existing GNS3 project", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "project_name": { | |
| "type": "string", | |
| "description": "Name for the project" | |
| }, | |
| "project_id": { | |
| "type": "string", | |
| "description": "Optional explicit project_id to use" | |
| } | |
| }, | |
| "required": ["project_name"] | |
| } | |
| ), | |
| Tool( | |
| name="start_device", | |
| description="Start a network device in GNS3", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "project_name": { | |
| "type": "string", | |
| "description": "Name of the GNS3 project" | |
| }, | |
| "device_name": { | |
| "type": "string", | |
| "description": "Name of the device to start" | |
| } | |
| }, | |
| "required": ["project_name", "device_name"] | |
| } | |
| ), | |
| Tool( | |
| name="stop_device", | |
| description="Stop a network device in GNS3", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "project_name": { | |
| "type": "string", | |
| "description": "Name of the GNS3 project" | |
| }, | |
| "device_name": { | |
| "type": "string", | |
| "description": "Name of the device to stop" | |
| } | |
| }, | |
| "required": ["project_name", "device_name"] | |
| } | |
| ), | |
| Tool( | |
| name="get_device_config", | |
| description="Get running configuration from a network device via SSH", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "host": { | |
| "type": "string", | |
| "description": "IP address or hostname of device" | |
| }, | |
| "username": { | |
| "type": "string", | |
| "description": "SSH username (default: admin)", | |
| "default": "admin" | |
| }, | |
| "port": { | |
| "type": "integer", | |
| "description": "SSH port (default: 22)", | |
| "default": 22 | |
| } | |
| }, | |
| "required": ["host"] | |
| } | |
| ), | |
| Tool( | |
| name="configure_device", | |
| description="Send configuration commands to a network device via SSH", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "host": { | |
| "type": "string", | |
| "description": "IP address or hostname of device" | |
| }, | |
| "commands": { | |
| "type": "array", | |
| "items": {"type": "string"}, | |
| "description": "List of configuration commands to send" | |
| }, | |
| "username": { | |
| "type": "string", | |
| "description": "SSH username (default: admin)", | |
| "default": "admin" | |
| }, | |
| "port": { | |
| "type": "integer", | |
| "description": "SSH port (default: 22)", | |
| "default": 22 | |
| } | |
| }, | |
| "required": ["host", "commands"] | |
| } | |
| ), | |
| Tool( | |
| name="backup_device_config", | |
| description="Backup device configuration to a file", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "host": { | |
| "type": "string", | |
| "description": "Device IP address or hostname" | |
| }, | |
| "device_name": { | |
| "type": "string", | |
| "description": "Device name for backup file" | |
| }, | |
| "username": { | |
| "type": "string", | |
| "description": "SSH username", | |
| "default": "admin" | |
| } | |
| }, | |
| "required": ["host", "device_name"] | |
| } | |
| ), | |
| Tool( | |
| name="build_network_from_description", | |
| description="Build a complete GNS3 network topology from a natural language description. Creates the living digital environment automatically.", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "description": { | |
| "type": "string", | |
| "description": "Natural language description of the network (e.g., 'A coffee shop chain with 3 locations, each with POS and WiFi')" | |
| }, | |
| "project_name": { | |
| "type": "string", | |
| "description": "Name for the GNS3 project", | |
| "default": "overgrowth" | |
| }, | |
| "site_count": { | |
| "type": "integer", | |
| "description": "Requested number of branch/store sites (excluding HQ)", | |
| "minimum": 1, | |
| "maximum": 10, | |
| "default": 3 | |
| }, | |
| "auto_configure": { | |
| "type": "boolean", | |
| "description": "Automatically configure devices after creation", | |
| "default": True | |
| } | |
| }, | |
| "required": ["description"] | |
| } | |
| ), | |
| Tool( | |
| name="seed_node_config", | |
| description="Set startup config for a node in a GNS3 project", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "project_name": {"type": "string"}, | |
| "device_name": {"type": "string"}, | |
| "config": {"type": "string"} | |
| }, | |
| "required": ["project_name", "device_name", "config"] | |
| } | |
| ) | |
| ] | |
| async def call_tool(name: str, arguments: Any) -> List[TextContent]: | |
| """Handle tool calls""" | |
| if name == "get_projects": | |
| projects = gns3.get_projects() | |
| project_list = [{"name": p["name"], "id": p["project_id"], "status": p.get("status", "unknown")} | |
| for p in projects] | |
| return [TextContent( | |
| type="text", | |
| text=json.dumps(project_list, indent=2) | |
| )] | |
| elif name == "create_project": | |
| project_name = arguments["project_name"] | |
| project_id = arguments.get("project_id") | |
| project = gns3.create_project(project_name, project_id=project_id) | |
| if not project: | |
| return [TextContent(type="text", text=json.dumps({ | |
| "success": False, | |
| "error": f"Failed to create project {project_name}" | |
| }))] | |
| return [TextContent(type="text", text=json.dumps({ | |
| "success": True, | |
| "project": { | |
| "name": project.get("name"), | |
| "id": project.get("project_id"), | |
| "status": project.get("status", "unknown") | |
| } | |
| }))] | |
| elif name == "seed_node_config": | |
| project_name = arguments["project_name"] | |
| device_name = arguments["device_name"] | |
| config = arguments["config"] | |
| projects = gns3.get_projects() | |
| project = next((p for p in projects if p.get("name") == project_name), None) | |
| if not project: | |
| return [TextContent(type="text", text=json.dumps({"success": False, "error": f"Project {project_name} not found"}))] | |
| project_id = project.get("project_id") | |
| nodes = gns3.get_nodes(project_id) | |
| node = next((n for n in nodes if n.get("name") == device_name), None) | |
| if not node: | |
| return [TextContent(type="text", text=json.dumps({"success": False, "error": f"Device {device_name} not found"}))] | |
| success = gns3.set_startup_config(project_id, node.get("node_id"), config) | |
| return [TextContent(type="text", text=json.dumps({ | |
| "success": success, | |
| "project_id": project_id, | |
| "project_name": project_name, | |
| "device": device_name | |
| }))] | |
| elif name == "get_topology": | |
| project_name = arguments["project_name"] | |
| # Find project by name | |
| projects = gns3.get_projects() | |
| project = next((p for p in projects if p["name"] == project_name), None) | |
| if not project: | |
| return [TextContent( | |
| type="text", | |
| text=f"Project '{project_name}' not found" | |
| )] | |
| project_id = project["project_id"] | |
| nodes = gns3.get_nodes(project_id) | |
| links = gns3.get_links(project_id) | |
| topology = { | |
| "project": project["name"], | |
| "status": project.get("status", "unknown"), | |
| "nodes": [ | |
| { | |
| "name": n["name"], | |
| "node_type": n["node_type"], | |
| "status": n.get("status", "stopped"), | |
| "console": n.get("console"), | |
| "console_type": n.get("console_type"), | |
| "properties": n.get("properties", {}) | |
| } | |
| for n in nodes | |
| ], | |
| "links": [ | |
| { | |
| "nodes": [ | |
| {"node": link["nodes"][0].get("label", "unknown"), | |
| "port": link["nodes"][0].get("adapter_number", 0)}, | |
| {"node": link["nodes"][1].get("label", "unknown"), | |
| "port": link["nodes"][1].get("adapter_number", 0)} | |
| ] if len(link["nodes"]) >= 2 else [] | |
| } | |
| for link in links | |
| ] | |
| } | |
| return [TextContent( | |
| type="text", | |
| text=json.dumps(topology, indent=2) | |
| )] | |
| elif name == "start_device": | |
| project_name = arguments["project_name"] | |
| device_name = arguments["device_name"] | |
| # Find project and node | |
| projects = gns3.get_projects() | |
| project = next((p for p in projects if p["name"] == project_name), None) | |
| if not project: | |
| return [TextContent(type="text", text=f"Project '{project_name}' not found")] | |
| nodes = gns3.get_nodes(project["project_id"]) | |
| node = next((n for n in nodes if n["name"] == device_name), None) | |
| if not node: | |
| return [TextContent(type="text", text=f"Device '{device_name}' not found")] | |
| success = gns3.start_node(project["project_id"], node["node_id"]) | |
| return [TextContent( | |
| type="text", | |
| text=f"Device '{device_name}' {'started successfully' if success else 'failed to start'}" | |
| )] | |
| elif name == "stop_device": | |
| project_name = arguments["project_name"] | |
| device_name = arguments["device_name"] | |
| # Find project and node | |
| projects = gns3.get_projects() | |
| project = next((p for p in projects if p["name"] == project_name), None) | |
| if not project: | |
| return [TextContent(type="text", text=f"Project '{project_name}' not found")] | |
| nodes = gns3.get_nodes(project["project_id"]) | |
| node = next((n for n in nodes if n["name"] == device_name), None) | |
| if not node: | |
| return [TextContent(type="text", text=f"Device '{device_name}' not found")] | |
| success = gns3.stop_node(project["project_id"], node["node_id"]) | |
| return [TextContent( | |
| type="text", | |
| text=f"Device '{device_name}' {'stopped successfully' if success else 'failed to stop'}" | |
| )] | |
| elif name == "get_device_config": | |
| host = arguments["host"] | |
| username = arguments.get("username", "admin") | |
| port = arguments.get("port", 22) | |
| connection = device_mgr.connect(host, username, port) | |
| if not connection: | |
| return [TextContent(type="text", text=f"Failed to connect to {host}")] | |
| config = device_mgr.send_command(connection, "show running-config") | |
| connection.disconnect() | |
| return [TextContent(type="text", text=config)] | |
| elif name == "configure_device": | |
| host = arguments["host"] | |
| commands = arguments["commands"] | |
| username = arguments.get("username", "admin") | |
| port = arguments.get("port", 22) | |
| connection = device_mgr.connect(host, username, port) | |
| if not connection: | |
| return [TextContent(type="text", text=f"Failed to connect to {host}")] | |
| output = device_mgr.send_config(connection, commands) | |
| connection.disconnect() | |
| return [TextContent(type="text", text=output)] | |
| elif name == "backup_device_config": | |
| host = arguments["host"] | |
| device_name = arguments["device_name"] | |
| username = arguments.get("username", "admin") | |
| connection = device_mgr.connect(host, username) | |
| if not connection: | |
| return [TextContent(type="text", text=f"Failed to connect to {host}")] | |
| config = device_mgr.send_command(connection, "show running-config") | |
| connection.disconnect() | |
| # Save to file | |
| from datetime import datetime | |
| timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") | |
| backup_file = BACKUP_DIR / f"{device_name}-{timestamp}.cfg" | |
| backup_file.write_text(config) | |
| return [TextContent( | |
| type="text", | |
| text=f"Configuration backed up to {backup_file}" | |
| )] | |
| elif name == "build_network_from_description": | |
| description = arguments.get("description", "") | |
| project_name = arguments.get("project_name", "overgrowth") | |
| auto_configure = arguments.get("auto_configure", True) | |
| site_count_raw = arguments.get("site_count", 3) | |
| try: | |
| site_count = int(site_count_raw) | |
| except Exception: | |
| site_count = 3 | |
| site_count = max(1, min(10, site_count)) | |
| project = gns3.create_project(project_name, project_id=arguments.get("project_id")) | |
| if not project: | |
| return [ | |
| TextContent( | |
| type="text", | |
| text=json.dumps( | |
| { | |
| "success": False, | |
| "error": f"Failed to create project {project_name}", | |
| "project_name": project_name, | |
| } | |
| ), | |
| ) | |
| ] | |
| response_data: Dict[str, Any] = { | |
| "success": True, | |
| "description": description, | |
| "project_name": project_name, | |
| "project_id": project.get("project_id") or project.get("id"), | |
| "site_count_requested": site_count, | |
| } | |
| try: | |
| from build_retail_network import build_retail_network | |
| build_summary = build_retail_network(GNS3_API_BASE, project_name, site_count) | |
| response_data.update(build_summary or {}) | |
| response_data["success"] = response_data.get("success", True) | |
| except Exception as e: | |
| logger.error(f"build_network_from_description failed: {e}") | |
| response_data["success"] = False | |
| response_data["error"] = str(e) | |
| if auto_configure and response_data.get("success"): | |
| try: | |
| config_script = Path(__file__).parent / "auto_configure_network.py" | |
| if config_script.exists(): | |
| config_result = subprocess.run( | |
| [sys.executable, str(config_script)], | |
| capture_output=True, | |
| text=True, | |
| timeout=180, | |
| ) | |
| response_data["auto_configure"] = { | |
| "success": config_result.returncode == 0, | |
| "stdout": config_result.stdout, | |
| "stderr": config_result.stderr, | |
| } | |
| else: | |
| response_data["auto_configure"] = { | |
| "success": False, | |
| "error": "auto_configure_network.py not found", | |
| } | |
| except Exception as e: | |
| response_data["auto_configure"] = {"success": False, "error": str(e)} | |
| return [TextContent(type="text", text=json.dumps(response_data, indent=2))] | |
| else: | |
| return [TextContent(type="text", text=f"Unknown tool: {name}")] | |
| async def main(): | |
| """Run the MCP server""" | |
| from mcp.server.stdio import stdio_server | |
| async with stdio_server() as (read_stream, write_stream): | |
| logger.info("Overgrowth MCP Server starting...") | |
| await app.run( | |
| read_stream, | |
| write_stream, | |
| app.create_initialization_options() | |
| ) | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |