Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Test schema validation and policy engine | |
| """ | |
| import logging | |
| from agent.schema_validation import ( | |
| validate_network_model, | |
| get_validation_errors, | |
| VLANModel, | |
| SubnetModel, | |
| DeviceModel | |
| ) | |
| from agent.policy_engine import NetworkPolicy | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| def test_valid_network(): | |
| """Test validation with a valid network model""" | |
| print("\n=== Test 1: Valid Network Model ===") | |
| model = { | |
| "name": "test-network", | |
| "version": "1.0.0", | |
| "description": "Test network", | |
| "business_requirements": ["High availability"], | |
| "constraints": ["Budget friendly"], | |
| "vlans": [ | |
| {"id": 10, "name": "Management", "purpose": "Network mgmt", "subnet": "10.0.10.0/24"}, | |
| {"id": 20, "name": "Users", "purpose": "Employee workstations", "subnet": "10.0.20.0/24"} | |
| ], | |
| "subnets": [ | |
| { | |
| "network": "10.0.10.0/24", | |
| "gateway": "10.0.10.1", | |
| "vlan": 10, | |
| "purpose": "Management", | |
| "dhcp_enabled": True, | |
| "dhcp_range_start": "10.0.10.100", | |
| "dhcp_range_end": "10.0.10.200" | |
| }, | |
| { | |
| "network": "10.0.20.0/24", | |
| "gateway": "10.0.20.1", | |
| "vlan": 20, | |
| "purpose": "Users" | |
| } | |
| ], | |
| "devices": [ | |
| { | |
| "name": "core-sw-01", | |
| "role": "core", | |
| "model": "Catalyst 9300", | |
| "vendor": "cisco", | |
| "mgmt_ip": "10.0.10.10", | |
| "location": "Main IDF", | |
| "interfaces": [] | |
| } | |
| ], | |
| "services": ["DHCP", "DNS", "NTP"] | |
| } | |
| errors = get_validation_errors(model) | |
| if errors: | |
| print(f"β Validation failed:") | |
| for err in errors: | |
| print(f" - {err}") | |
| else: | |
| validated = validate_network_model(model) | |
| print(f"β Network model validated successfully") | |
| print(f" - Name: {validated.name}") | |
| print(f" - VLANs: {len(validated.vlans)}") | |
| print(f" - Subnets: {len(validated.subnets)}") | |
| print(f" - Devices: {len(validated.devices)}") | |
| def test_invalid_vlan(): | |
| """Test VLAN validation""" | |
| print("\n=== Test 2: Invalid VLAN ID ===") | |
| model = { | |
| "name": "test", | |
| "version": "1.0.0", | |
| "description": "Test", | |
| "vlans": [ | |
| {"id": 5000, "name": "Invalid"} # VLAN ID out of range | |
| ], | |
| "devices": [], | |
| "subnets": [], | |
| "services": [] | |
| } | |
| errors = get_validation_errors(model) | |
| if errors: | |
| print(f"β Correctly caught validation error:") | |
| for err in errors: | |
| print(f" - {err}") | |
| else: | |
| print("β Should have failed validation") | |
| def test_invalid_subnet(): | |
| """Test subnet validation""" | |
| print("\n=== Test 3: Invalid Subnet ===") | |
| model = { | |
| "name": "test", | |
| "version": "1.0.0", | |
| "description": "Test", | |
| "vlans": [{"id": 10, "name": "Test"}], | |
| "subnets": [ | |
| { | |
| "network": "10.0.10.0/24", | |
| "gateway": "192.168.1.1", # Gateway not in subnet | |
| "vlan": 10 | |
| } | |
| ], | |
| "devices": [], | |
| "services": [] | |
| } | |
| errors = get_validation_errors(model) | |
| if errors: | |
| print(f"β Correctly caught validation error:") | |
| for err in errors: | |
| print(f" - {err}") | |
| else: | |
| print("β Should have failed validation") | |
| def test_duplicate_vlan_ids(): | |
| """Test duplicate VLAN detection""" | |
| print("\n=== Test 4: Duplicate VLAN IDs ===") | |
| model = { | |
| "name": "test", | |
| "version": "1.0.0", | |
| "description": "Test", | |
| "vlans": [ | |
| {"id": 10, "name": "VLAN10"}, | |
| {"id": 10, "name": "AlsoVLAN10"} # Duplicate! | |
| ], | |
| "devices": [], | |
| "subnets": [], | |
| "services": [] | |
| } | |
| errors = get_validation_errors(model) | |
| if errors: | |
| print(f"β Correctly caught duplicate VLAN IDs:") | |
| for err in errors: | |
| print(f" - {err}") | |
| else: | |
| print("β Should have failed validation") | |
| def test_policy_engine(): | |
| """Test policy engine checks""" | |
| print("\n=== Test 5: Policy Engine ===") | |
| model = { | |
| "name": "test-network", | |
| "version": "1.0.0", | |
| "description": "Test network", | |
| "vlans": [ | |
| {"id": 1, "name": "Default"}, # VLAN 1 - should warn | |
| {"id": 10, "name": "Mgmt"}, | |
| {"id": 20, "name": "Guest WiFi"} # Space in name - should warn | |
| ], | |
| "subnets": [ | |
| {"network": "10.0.10.0/24", "gateway": "10.0.10.1", "vlan": 10}, | |
| {"network": "10.0.20.0/24", "gateway": "10.0.20.1", "vlan": 20} | |
| ], | |
| "devices": [ | |
| { | |
| "name": "switch1", # No role in name, no 2-digit suffix | |
| "role": "core", | |
| "model": "Test", | |
| "vendor": "cisco", | |
| "mgmt_ip": "10.0.10.10", | |
| "location": "Office" | |
| } | |
| ], | |
| "services": ["DHCP"] # Missing DNS, NTP | |
| } | |
| policy = NetworkPolicy() | |
| violations = policy.check_network_model(model) | |
| print(f"Found {len(violations)} policy violations:") | |
| print(policy.format_violations()) | |
| by_severity = policy.get_violations_by_severity() | |
| print(f"\nβ Policy check complete:") | |
| print(f" - Errors: {len(by_severity['ERROR'])}") | |
| print(f" - Warnings: {len(by_severity['WARNING'])}") | |
| print(f" - Info: {len(by_severity['INFO'])}") | |
| def test_overlapping_subnets(): | |
| """Test overlapping subnet detection""" | |
| print("\n=== Test 6: Overlapping Subnets ===") | |
| model = { | |
| "name": "test", | |
| "version": "1.0.0", | |
| "description": "Test", | |
| "vlans": [ | |
| {"id": 10, "name": "Network1"}, | |
| {"id": 20, "name": "Network2"} | |
| ], | |
| "subnets": [ | |
| {"network": "10.0.0.0/16", "gateway": "10.0.0.1", "vlan": 10}, | |
| {"network": "10.0.10.0/24", "gateway": "10.0.10.1", "vlan": 20} # Overlaps! | |
| ], | |
| "devices": [], | |
| "services": [] | |
| } | |
| policy = NetworkPolicy() | |
| violations = policy.check_network_model(model) | |
| overlap_errors = [v for v in violations if 'overlapping' in v.message.lower()] | |
| if overlap_errors: | |
| print(f"β Correctly detected overlapping subnets:") | |
| for err in overlap_errors: | |
| print(f" - {err}") | |
| else: | |
| print("β Should have detected overlapping subnets") | |
| def test_complete_validation(): | |
| """Test complete validation flow""" | |
| print("\n=== Test 7: Complete Validation Flow ===") | |
| model = { | |
| "name": "production-network", | |
| "version": "1.0.0", | |
| "description": "Production corporate network", | |
| "business_requirements": ["HA", "Secure guest access", "QoS for VoIP"], | |
| "constraints": ["Cisco only", "Budget $25K"], | |
| "vlans": [ | |
| {"id": 10, "name": "Management", "purpose": "Network management"}, | |
| {"id": 20, "name": "Users", "purpose": "Employee workstations"}, | |
| {"id": 30, "name": "Guest", "purpose": "Guest WiFi"}, | |
| {"id": 40, "name": "Voice", "purpose": "VoIP phones"} | |
| ], | |
| "subnets": [ | |
| {"network": "10.0.10.0/24", "gateway": "10.0.10.1", "vlan": 10, "purpose": "Management"}, | |
| {"network": "10.0.20.0/22", "gateway": "10.0.20.1", "vlan": 20, "purpose": "Users"}, | |
| {"network": "10.0.30.0/24", "gateway": "10.0.30.1", "vlan": 30, "purpose": "Guest"}, | |
| {"network": "10.0.40.0/24", "gateway": "10.0.40.1", "vlan": 40, "purpose": "Voice"} | |
| ], | |
| "devices": [ | |
| { | |
| "name": "core-sw-01", | |
| "role": "core", | |
| "model": "Catalyst 9300", | |
| "vendor": "cisco", | |
| "mgmt_ip": "10.0.10.10", | |
| "location": "Main IDF" | |
| }, | |
| { | |
| "name": "core-sw-02", | |
| "role": "core", | |
| "model": "Catalyst 9300", | |
| "vendor": "cisco", | |
| "mgmt_ip": "10.0.10.11", | |
| "location": "Main IDF" | |
| }, | |
| { | |
| "name": "access-sw-01", | |
| "role": "access", | |
| "model": "Catalyst 2960X", | |
| "vendor": "cisco", | |
| "mgmt_ip": "10.0.10.20", | |
| "location": "Floor 1" | |
| } | |
| ], | |
| "routing": { | |
| "protocol": "ospf", | |
| "process_id": 1, | |
| "router_id": "10.0.0.1", | |
| "areas": ["0"] | |
| }, | |
| "services": ["DHCP", "DNS", "NTP", "Syslog"] | |
| } | |
| # Schema validation | |
| errors = get_validation_errors(model) | |
| if errors: | |
| print(f"β Schema validation failed:") | |
| for err in errors: | |
| print(f" - {err}") | |
| return | |
| print("β Schema validation passed") | |
| validated = validate_network_model(model) | |
| print(f" - {len(validated.vlans)} VLANs") | |
| print(f" - {len(validated.subnets)} subnets") | |
| print(f" - {len(validated.devices)} devices") | |
| # Policy validation | |
| policy = NetworkPolicy() | |
| violations = policy.check_network_model(model) | |
| print(f"\nβ Policy validation complete:") | |
| by_severity = policy.get_violations_by_severity() | |
| print(f" - Errors: {len(by_severity['ERROR'])}") | |
| print(f" - Warnings: {len(by_severity['WARNING'])}") | |
| print(f" - Info: {len(by_severity['INFO'])}") | |
| if policy.has_errors(): | |
| print("\nβ Cannot proceed - fix errors first") | |
| else: | |
| print("\nβ Ready for deployment") | |
| if __name__ == "__main__": | |
| print("\n" + "="*60) | |
| print("Schema Validation & Policy Engine Test Suite") | |
| print("="*60) | |
| try: | |
| test_valid_network() | |
| test_invalid_vlan() | |
| test_invalid_subnet() | |
| test_duplicate_vlan_ids() | |
| test_policy_engine() | |
| test_overlapping_subnets() | |
| test_complete_validation() | |
| print("\n" + "="*60) | |
| print("β All validation tests completed!") | |
| print("="*60 + "\n") | |
| except Exception as e: | |
| print(f"\nβ Test failed: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| exit(1) | |