Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Overgrowth MCP Server - Network automation via GNS3 and SSH | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| from typing import Any, Dict, List, Optional | |
| from mcp.server import Server | |
| from mcp.types import Tool, TextContent | |
| import requests | |
| from netmiko import ConnectHandler | |
| from pathlib import Path | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger("overgrowth-mcp") | |
| # Configuration - support environment variable | |
| GNS3_SERVER = os.getenv("GNS3_SERVER", "http://localhost:3080") | |
| GNS3_API_BASE = f"{GNS3_SERVER}/v2" | |
| SSH_KEY_PATH = Path.home() / ".ssh" / "overgrowth_rsa" | |
| BACKUP_DIR = Path(__file__).parent / "backups" | |
| BACKUP_DIR.mkdir(exist_ok=True) | |
| # Initialize MCP server | |
| app = Server("overgrowth-mcp-server") | |
| class GNS3Client: | |
| """Client for GNS3 API interactions""" | |
| def __init__(self, base_url: str = GNS3_API_BASE): | |
| self.base_url = base_url | |
| def get_projects(self) -> List[Dict]: | |
| """Get all GNS3 projects""" | |
| try: | |
| response = requests.get(f"{self.base_url}/projects") | |
| response.raise_for_status() | |
| return response.json() | |
| except Exception as e: | |
| logger.error(f"Failed to get projects: {e}") | |
| return [] | |
| def get_project(self, project_id: str) -> Optional[Dict]: | |
| """Get specific project details""" | |
| try: | |
| response = requests.get(f"{self.base_url}/projects/{project_id}") | |
| response.raise_for_status() | |
| return response.json() | |
| except Exception as e: | |
| logger.error(f"Failed to get project {project_id}: {e}") | |
| return None | |
| def get_nodes(self, project_id: str) -> List[Dict]: | |
| """Get all nodes in a project""" | |
| try: | |
| response = requests.get(f"{self.base_url}/projects/{project_id}/nodes") | |
| response.raise_for_status() | |
| return response.json() | |
| except Exception as e: | |
| logger.error(f"Failed to get nodes for project {project_id}: {e}") | |
| return [] | |
| def get_links(self, project_id: str) -> List[Dict]: | |
| """Get all links in a project""" | |
| try: | |
| response = requests.get(f"{self.base_url}/projects/{project_id}/links") | |
| response.raise_for_status() | |
| return response.json() | |
| except Exception as e: | |
| logger.error(f"Failed to get links for project {project_id}: {e}") | |
| return [] | |
| def start_node(self, project_id: str, node_id: str) -> bool: | |
| """Start a node""" | |
| try: | |
| response = requests.post( | |
| f"{self.base_url}/projects/{project_id}/nodes/{node_id}/start" | |
| ) | |
| response.raise_for_status() | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to start node {node_id}: {e}") | |
| return False | |
| def stop_node(self, project_id: str, node_id: str) -> bool: | |
| """Stop a node""" | |
| try: | |
| response = requests.post( | |
| f"{self.base_url}/projects/{project_id}/nodes/{node_id}/stop" | |
| ) | |
| response.raise_for_status() | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to stop node {node_id}: {e}") | |
| return False | |
| class DeviceManager: | |
| """Manages SSH connections to network devices""" | |
| def __init__(self, ssh_key_path: Path = SSH_KEY_PATH): | |
| self.ssh_key_path = ssh_key_path | |
| def connect(self, host: str, username: str = "admin", port: int = 22) -> Optional[ConnectHandler]: | |
| """Connect to a device via SSH""" | |
| device_params = { | |
| 'device_type': 'cisco_ios', # Default, can be parameterized | |
| 'host': host, | |
| 'username': username, | |
| 'port': port, | |
| 'use_keys': True, | |
| 'key_file': str(self.ssh_key_path), | |
| 'timeout': 30, | |
| } | |
| try: | |
| connection = ConnectHandler(**device_params) | |
| return connection | |
| except Exception as e: | |
| logger.error(f"Failed to connect to {host}: {e}") | |
| return None | |
| def send_command(self, connection: ConnectHandler, command: str) -> str: | |
| """Send command to device""" | |
| try: | |
| output = connection.send_command(command) | |
| return output | |
| except Exception as e: | |
| logger.error(f"Failed to send command: {e}") | |
| return f"Error: {str(e)}" | |
| def send_config(self, connection: ConnectHandler, config_commands: List[str]) -> str: | |
| """Send configuration commands to device""" | |
| try: | |
| output = connection.send_config_set(config_commands) | |
| return output | |
| except Exception as e: | |
| logger.error(f"Failed to send config: {e}") | |
| return f"Error: {str(e)}" | |
| # Initialize clients | |
| gns3 = GNS3Client() | |
| device_mgr = DeviceManager() | |
| async def list_tools() -> List[Tool]: | |
| """List available MCP tools""" | |
| return [ | |
| Tool( | |
| name="get_projects", | |
| description="Get list of all GNS3 projects", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": {}, | |
| "required": [] | |
| } | |
| ), | |
| Tool( | |
| name="get_topology", | |
| description="Get topology information for a GNS3 project including nodes and links", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "project_name": { | |
| "type": "string", | |
| "description": "Name of the GNS3 project" | |
| } | |
| }, | |
| "required": ["project_name"] | |
| } | |
| ), | |
| Tool( | |
| name="start_device", | |
| description="Start a network device in GNS3", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "project_name": { | |
| "type": "string", | |
| "description": "Name of the GNS3 project" | |
| }, | |
| "device_name": { | |
| "type": "string", | |
| "description": "Name of the device to start" | |
| } | |
| }, | |
| "required": ["project_name", "device_name"] | |
| } | |
| ), | |
| Tool( | |
| name="stop_device", | |
| description="Stop a network device in GNS3", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "project_name": { | |
| "type": "string", | |
| "description": "Name of the GNS3 project" | |
| }, | |
| "device_name": { | |
| "type": "string", | |
| "description": "Name of the device to stop" | |
| } | |
| }, | |
| "required": ["project_name", "device_name"] | |
| } | |
| ), | |
| Tool( | |
| name="get_device_config", | |
| description="Get running configuration from a network device via SSH", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "host": { | |
| "type": "string", | |
| "description": "IP address or hostname of device" | |
| }, | |
| "username": { | |
| "type": "string", | |
| "description": "SSH username (default: admin)", | |
| "default": "admin" | |
| }, | |
| "port": { | |
| "type": "integer", | |
| "description": "SSH port (default: 22)", | |
| "default": 22 | |
| } | |
| }, | |
| "required": ["host"] | |
| } | |
| ), | |
| Tool( | |
| name="configure_device", | |
| description="Send configuration commands to a network device via SSH", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "host": { | |
| "type": "string", | |
| "description": "IP address or hostname of device" | |
| }, | |
| "commands": { | |
| "type": "array", | |
| "items": {"type": "string"}, | |
| "description": "List of configuration commands to send" | |
| }, | |
| "username": { | |
| "type": "string", | |
| "description": "SSH username (default: admin)", | |
| "default": "admin" | |
| }, | |
| "port": { | |
| "type": "integer", | |
| "description": "SSH port (default: 22)", | |
| "default": 22 | |
| } | |
| }, | |
| "required": ["host", "commands"] | |
| } | |
| ), | |
| Tool( | |
| name="backup_device_config", | |
| description="Backup device configuration to a file", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "host": { | |
| "type": "string", | |
| "description": "Device IP address or hostname" | |
| }, | |
| "device_name": { | |
| "type": "string", | |
| "description": "Device name for backup file" | |
| }, | |
| "username": { | |
| "type": "string", | |
| "description": "SSH username", | |
| "default": "admin" | |
| } | |
| }, | |
| "required": ["host", "device_name"] | |
| } | |
| ), | |
| Tool( | |
| name="build_network_from_description", | |
| description="Build a complete GNS3 network topology from a natural language description. Creates the living digital environment automatically.", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "description": { | |
| "type": "string", | |
| "description": "Natural language description of the network (e.g., 'A coffee shop chain with 3 locations, each with POS and WiFi')" | |
| }, | |
| "project_name": { | |
| "type": "string", | |
| "description": "Name for the GNS3 project", | |
| "default": "overgrowth" | |
| }, | |
| "auto_configure": { | |
| "type": "boolean", | |
| "description": "Automatically configure devices after creation", | |
| "default": True | |
| } | |
| }, | |
| "required": ["description"] | |
| } | |
| ) | |
| ] | |
| async def call_tool(name: str, arguments: Any) -> List[TextContent]: | |
| """Handle tool calls""" | |
| if name == "get_projects": | |
| projects = gns3.get_projects() | |
| project_list = [{"name": p["name"], "id": p["project_id"], "status": p.get("status", "unknown")} | |
| for p in projects] | |
| return [TextContent( | |
| type="text", | |
| text=json.dumps(project_list, indent=2) | |
| )] | |
| elif name == "get_topology": | |
| project_name = arguments["project_name"] | |
| # Find project by name | |
| projects = gns3.get_projects() | |
| project = next((p for p in projects if p["name"] == project_name), None) | |
| if not project: | |
| return [TextContent( | |
| type="text", | |
| text=f"Project '{project_name}' not found" | |
| )] | |
| project_id = project["project_id"] | |
| nodes = gns3.get_nodes(project_id) | |
| links = gns3.get_links(project_id) | |
| topology = { | |
| "project": project["name"], | |
| "status": project.get("status", "unknown"), | |
| "nodes": [ | |
| { | |
| "name": n["name"], | |
| "node_type": n["node_type"], | |
| "status": n.get("status", "stopped"), | |
| "console": n.get("console"), | |
| "console_type": n.get("console_type"), | |
| "properties": n.get("properties", {}) | |
| } | |
| for n in nodes | |
| ], | |
| "links": [ | |
| { | |
| "nodes": [ | |
| {"node": link["nodes"][0].get("label", "unknown"), | |
| "port": link["nodes"][0].get("adapter_number", 0)}, | |
| {"node": link["nodes"][1].get("label", "unknown"), | |
| "port": link["nodes"][1].get("adapter_number", 0)} | |
| ] if len(link["nodes"]) >= 2 else [] | |
| } | |
| for link in links | |
| ] | |
| } | |
| return [TextContent( | |
| type="text", | |
| text=json.dumps(topology, indent=2) | |
| )] | |
| elif name == "start_device": | |
| project_name = arguments["project_name"] | |
| device_name = arguments["device_name"] | |
| # Find project and node | |
| projects = gns3.get_projects() | |
| project = next((p for p in projects if p["name"] == project_name), None) | |
| if not project: | |
| return [TextContent(type="text", text=f"Project '{project_name}' not found")] | |
| nodes = gns3.get_nodes(project["project_id"]) | |
| node = next((n for n in nodes if n["name"] == device_name), None) | |
| if not node: | |
| return [TextContent(type="text", text=f"Device '{device_name}' not found")] | |
| success = gns3.start_node(project["project_id"], node["node_id"]) | |
| return [TextContent( | |
| type="text", | |
| text=f"Device '{device_name}' {'started successfully' if success else 'failed to start'}" | |
| )] | |
| elif name == "stop_device": | |
| project_name = arguments["project_name"] | |
| device_name = arguments["device_name"] | |
| # Find project and node | |
| projects = gns3.get_projects() | |
| project = next((p for p in projects if p["name"] == project_name), None) | |
| if not project: | |
| return [TextContent(type="text", text=f"Project '{project_name}' not found")] | |
| nodes = gns3.get_nodes(project["project_id"]) | |
| node = next((n for n in nodes if n["name"] == device_name), None) | |
| if not node: | |
| return [TextContent(type="text", text=f"Device '{device_name}' not found")] | |
| success = gns3.stop_node(project["project_id"], node["node_id"]) | |
| return [TextContent( | |
| type="text", | |
| text=f"Device '{device_name}' {'stopped successfully' if success else 'failed to stop'}" | |
| )] | |
| elif name == "get_device_config": | |
| host = arguments["host"] | |
| username = arguments.get("username", "admin") | |
| port = arguments.get("port", 22) | |
| connection = device_mgr.connect(host, username, port) | |
| if not connection: | |
| return [TextContent(type="text", text=f"Failed to connect to {host}")] | |
| config = device_mgr.send_command(connection, "show running-config") | |
| connection.disconnect() | |
| return [TextContent(type="text", text=config)] | |
| elif name == "configure_device": | |
| host = arguments["host"] | |
| commands = arguments["commands"] | |
| username = arguments.get("username", "admin") | |
| port = arguments.get("port", 22) | |
| connection = device_mgr.connect(host, username, port) | |
| if not connection: | |
| return [TextContent(type="text", text=f"Failed to connect to {host}")] | |
| output = device_mgr.send_config(connection, commands) | |
| connection.disconnect() | |
| return [TextContent(type="text", text=output)] | |
| elif name == "backup_device_config": | |
| host = arguments["host"] | |
| device_name = arguments["device_name"] | |
| username = arguments.get("username", "admin") | |
| connection = device_mgr.connect(host, username) | |
| if not connection: | |
| return [TextContent(type="text", text=f"Failed to connect to {host}")] | |
| config = device_mgr.send_command(connection, "show running-config") | |
| connection.disconnect() | |
| # Save to file | |
| from datetime import datetime | |
| timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") | |
| backup_file = BACKUP_DIR / f"{device_name}-{timestamp}.cfg" | |
| backup_file.write_text(config) | |
| return [TextContent( | |
| type="text", | |
| text=f"Configuration backed up to {backup_file}" | |
| )] | |
| elif name == "build_network_from_description": | |
| description = arguments.get("description", "") | |
| project_name = arguments.get("project_name", "overgrowth") | |
| auto_configure = arguments.get("auto_configure", True) | |
| # Import the network builder | |
| import subprocess | |
| import sys | |
| # Run the network builder script | |
| script_path = Path(__file__).parent / "build_retail_network.py" | |
| try: | |
| result = subprocess.run( | |
| [sys.executable, str(script_path)], | |
| capture_output=True, | |
| text=True, | |
| timeout=60 | |
| ) | |
| if result.returncode == 0: | |
| output = result.stdout | |
| # If auto_configure is True, run the configuration script | |
| if auto_configure: | |
| config_script = Path(__file__).parent / "auto_configure_network.py" | |
| config_result = subprocess.run( | |
| [sys.executable, str(config_script)], | |
| capture_output=True, | |
| text=True, | |
| timeout=120 | |
| ) | |
| if config_result.returncode == 0: | |
| output += "\n\n" + config_result.stdout | |
| return [TextContent( | |
| type="text", | |
| text=f"β Network built and configured!\n\n{output}\n\nUser Request: {description}" | |
| )] | |
| else: | |
| return [TextContent( | |
| type="text", | |
| text=f"β οΈ Network built but configuration failed:\n{config_result.stderr}" | |
| )] | |
| return [TextContent( | |
| type="text", | |
| text=f"β Network topology created!\n\n{output}" | |
| )] | |
| else: | |
| return [TextContent( | |
| type="text", | |
| text=f"β Failed to build network:\n{result.stderr}" | |
| )] | |
| except subprocess.TimeoutExpired: | |
| return [TextContent( | |
| type="text", | |
| text="β Network build timed out (>60s)" | |
| )] | |
| except Exception as e: | |
| return [TextContent( | |
| type="text", | |
| text=f"β Error building network: {str(e)}" | |
| )] | |
| else: | |
| return [TextContent(type="text", text=f"Unknown tool: {name}")] | |
| async def main(): | |
| """Run the MCP server""" | |
| from mcp.server.stdio import stdio_server | |
| async with stdio_server() as (read_stream, write_stream): | |
| logger.info("Overgrowth MCP Server starting...") | |
| await app.run( | |
| read_stream, | |
| write_stream, | |
| app.create_initialization_options() | |
| ) | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |