#!/usr/bin/env python3
"""
Test schema validation and policy engine

Each ``test_*`` function builds a network model as a plain dict, runs it
through the schema validator and/or the policy engine, and prints a
human-readable report.  A scenario whose outcome is reported with a "✗"
raises ``AssertionError`` after printing, so the failure is visible both
to pytest and to the exit status of this script.
"""

import logging
import sys
import traceback

from agent.schema_validation import (
    validate_network_model,
    get_validation_errors,
    VLANModel,
    SubnetModel,
    DeviceModel
)
from agent.policy_engine import NetworkPolicy

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _print_items(items) -> None:
    """Print every item of *items* on its own line as a bullet."""
    for item in items:
        print(f" - {item}")


def _print_severity_counts(by_severity) -> None:
    """Print how many violations fall under ERROR, WARNING and INFO.

    *by_severity* is indexed with the keys ``'ERROR'``, ``'WARNING'`` and
    ``'INFO'``; each value must support ``len()``.
    """
    print(f" - Errors: {len(by_severity['ERROR'])}")
    print(f" - Warnings: {len(by_severity['WARNING'])}")
    print(f" - Info: {len(by_severity['INFO'])}")


def _expect_rejected(model, caught_message: str) -> None:
    """Check that ``get_validation_errors`` reports errors for *model*.

    Prints *caught_message* followed by the reported errors when the model
    is rejected.

    Raises:
        AssertionError: if no validation errors are reported.
    """
    errors = get_validation_errors(model)
    if not errors:
        print("✗ Should have failed validation")
        raise AssertionError("schema validation accepted a model that should be rejected")
    print(caught_message)
    _print_items(errors)


def test_valid_network() -> None:
    """Test validation with a valid network model

    Raises:
        AssertionError: if ``get_validation_errors`` reports any error.
    """
    print("\n=== Test 1: Valid Network Model ===")

    model = {
        "name": "test-network",
        "version": "1.0.0",
        "description": "Test network",
        "business_requirements": ["High availability"],
        "constraints": ["Budget friendly"],
        "vlans": [
            {"id": 10, "name": "Management", "purpose": "Network mgmt", "subnet": "10.0.10.0/24"},
            {"id": 20, "name": "Users", "purpose": "Employee workstations", "subnet": "10.0.20.0/24"},
        ],
        "subnets": [
            {
                "network": "10.0.10.0/24",
                "gateway": "10.0.10.1",
                "vlan": 10,
                "purpose": "Management",
                "dhcp_enabled": True,
                "dhcp_range_start": "10.0.10.100",
                "dhcp_range_end": "10.0.10.200",
            },
            {
                "network": "10.0.20.0/24",
                "gateway": "10.0.20.1",
                "vlan": 20,
                "purpose": "Users",
            },
        ],
        "devices": [
            {
                "name": "core-sw-01",
                "role": "core",
                "model": "Catalyst 9300",
                "vendor": "cisco",
                "mgmt_ip": "10.0.10.10",
                "location": "Main IDF",
                "interfaces": [],
            }
        ],
        "services": ["DHCP", "DNS", "NTP"],
    }

    errors = get_validation_errors(model)
    if errors:
        print("✗ Validation failed:")
        _print_items(errors)
        raise AssertionError("schema validation rejected the valid network model")

    validated = validate_network_model(model)
    print("✓ Network model validated successfully")
    print(f" - Name: {validated.name}")
    print(f" - VLANs: {len(validated.vlans)}")
    print(f" - Subnets: {len(validated.subnets)}")
    print(f" - Devices: {len(validated.devices)}")


def test_invalid_vlan() -> None:
    """Test VLAN validation

    Raises:
        AssertionError: if the model with VLAN ID 5000 is not rejected.
    """
    print("\n=== Test 2: Invalid VLAN ID ===")

    model = {
        "name": "test",
        "version": "1.0.0",
        "description": "Test",
        "vlans": [
            {"id": 5000, "name": "Invalid"}  # VLAN ID out of range
        ],
        "devices": [],
        "subnets": [],
        "services": [],
    }

    _expect_rejected(model, "✓ Correctly caught validation error:")


def test_invalid_subnet() -> None:
    """Test subnet validation

    Raises:
        AssertionError: if the model whose gateway lies outside its subnet
            is not rejected.
    """
    print("\n=== Test 3: Invalid Subnet ===")

    model = {
        "name": "test",
        "version": "1.0.0",
        "description": "Test",
        "vlans": [{"id": 10, "name": "Test"}],
        "subnets": [
            {
                "network": "10.0.10.0/24",
                "gateway": "192.168.1.1",  # Gateway not in subnet
                "vlan": 10,
            }
        ],
        "devices": [],
        "services": [],
    }

    _expect_rejected(model, "✓ Correctly caught validation error:")


def test_duplicate_vlan_ids() -> None:
    """Test duplicate VLAN detection

    Raises:
        AssertionError: if the model with two VLANs sharing ID 10 is not
            rejected.
    """
    print("\n=== Test 4: Duplicate VLAN IDs ===")

    model = {
        "name": "test",
        "version": "1.0.0",
        "description": "Test",
        "vlans": [
            {"id": 10, "name": "VLAN10"},
            {"id": 10, "name": "AlsoVLAN10"},  # Duplicate!
        ],
        "devices": [],
        "subnets": [],
        "services": [],
    }

    _expect_rejected(model, "✓ Correctly caught duplicate VLAN IDs:")


def test_policy_engine() -> None:
    """Test policy engine checks

    Runs ``NetworkPolicy.check_network_model`` on a model seeded with
    several questionable choices and prints the violations found plus a
    per-severity count.  This scenario only reports; it does not decide
    pass or fail.
    """
    print("\n=== Test 5: Policy Engine ===")

    model = {
        "name": "test-network",
        "version": "1.0.0",
        "description": "Test network",
        "vlans": [
            {"id": 1, "name": "Default"},  # VLAN 1 - should warn
            {"id": 10, "name": "Mgmt"},
            {"id": 20, "name": "Guest WiFi"},  # Space in name - should warn
        ],
        "subnets": [
            {"network": "10.0.10.0/24", "gateway": "10.0.10.1", "vlan": 10},
            {"network": "10.0.20.0/24", "gateway": "10.0.20.1", "vlan": 20},
        ],
        "devices": [
            {
                "name": "switch1",  # No role in name, no 2-digit suffix
                "role": "core",
                "model": "Test",
                "vendor": "cisco",
                "mgmt_ip": "10.0.10.10",
                "location": "Office",
            }
        ],
        "services": ["DHCP"],  # Missing DNS, NTP
    }

    policy = NetworkPolicy()
    violations = policy.check_network_model(model)

    print(f"Found {len(violations)} policy violations:")
    print(policy.format_violations())

    by_severity = policy.get_violations_by_severity()
    print("\n✓ Policy check complete:")
    _print_severity_counts(by_severity)


def test_overlapping_subnets() -> None:
    """Test overlapping subnet detection

    Raises:
        AssertionError: if no violation whose message contains
            "overlapping" (case-insensitive) is returned.
    """
    print("\n=== Test 6: Overlapping Subnets ===")

    model = {
        "name": "test",
        "version": "1.0.0",
        "description": "Test",
        "vlans": [
            {"id": 10, "name": "Network1"},
            {"id": 20, "name": "Network2"},
        ],
        "subnets": [
            {"network": "10.0.0.0/16", "gateway": "10.0.0.1", "vlan": 10},
            {"network": "10.0.10.0/24", "gateway": "10.0.10.1", "vlan": 20},  # Overlaps!
        ],
        "devices": [],
        "services": [],
    }

    policy = NetworkPolicy()
    violations = policy.check_network_model(model)

    overlap_errors = [v for v in violations if 'overlapping' in v.message.lower()]
    if not overlap_errors:
        print("✗ Should have detected overlapping subnets")
        raise AssertionError("policy engine did not report the overlapping subnets")

    print("✓ Correctly detected overlapping subnets:")
    _print_items(overlap_errors)


def test_complete_validation() -> None:
    """Test complete validation flow

    Runs schema validation first and, when it passes, the policy engine.
    Policy errors are reported ("Cannot proceed") but do not fail the
    scenario.

    Raises:
        AssertionError: if schema validation reports any error.
    """
    print("\n=== Test 7: Complete Validation Flow ===")

    model = {
        "name": "production-network",
        "version": "1.0.0",
        "description": "Production corporate network",
        "business_requirements": ["HA", "Secure guest access", "QoS for VoIP"],
        "constraints": ["Cisco only", "Budget $25K"],
        "vlans": [
            {"id": 10, "name": "Management", "purpose": "Network management"},
            {"id": 20, "name": "Users", "purpose": "Employee workstations"},
            {"id": 30, "name": "Guest", "purpose": "Guest WiFi"},
            {"id": 40, "name": "Voice", "purpose": "VoIP phones"},
        ],
        "subnets": [
            {"network": "10.0.10.0/24", "gateway": "10.0.10.1", "vlan": 10, "purpose": "Management"},
            {"network": "10.0.20.0/22", "gateway": "10.0.20.1", "vlan": 20, "purpose": "Users"},
            {"network": "10.0.30.0/24", "gateway": "10.0.30.1", "vlan": 30, "purpose": "Guest"},
            {"network": "10.0.40.0/24", "gateway": "10.0.40.1", "vlan": 40, "purpose": "Voice"},
        ],
        "devices": [
            {
                "name": "core-sw-01",
                "role": "core",
                "model": "Catalyst 9300",
                "vendor": "cisco",
                "mgmt_ip": "10.0.10.10",
                "location": "Main IDF",
            },
            {
                "name": "core-sw-02",
                "role": "core",
                "model": "Catalyst 9300",
                "vendor": "cisco",
                "mgmt_ip": "10.0.10.11",
                "location": "Main IDF",
            },
            {
                "name": "access-sw-01",
                "role": "access",
                "model": "Catalyst 2960X",
                "vendor": "cisco",
                "mgmt_ip": "10.0.10.20",
                "location": "Floor 1",
            },
        ],
        "routing": {
            "protocol": "ospf",
            "process_id": 1,
            "router_id": "10.0.0.1",
            "areas": ["0"],
        },
        "services": ["DHCP", "DNS", "NTP", "Syslog"],
    }

    # Schema validation
    errors = get_validation_errors(model)
    if errors:
        print("✗ Schema validation failed:")
        _print_items(errors)
        raise AssertionError("schema validation rejected the production network model")

    print("✓ Schema validation passed")
    validated = validate_network_model(model)
    print(f" - {len(validated.vlans)} VLANs")
    print(f" - {len(validated.subnets)} subnets")
    print(f" - {len(validated.devices)} devices")

    # Policy validation
    policy = NetworkPolicy()
    # The return value is not needed here: the results are read back from
    # the policy object through the calls below.
    policy.check_network_model(model)

    print("\n✓ Policy validation complete:")
    _print_severity_counts(policy.get_violations_by_severity())

    if policy.has_errors():
        print("\n✗ Cannot proceed - fix errors first")
    else:
        print("\n✓ Ready for deployment")


# Scenarios in the order they are run by main().
TESTS = (
    test_valid_network,
    test_invalid_vlan,
    test_invalid_subnet,
    test_duplicate_vlan_ids,
    test_policy_engine,
    test_overlapping_subnets,
    test_complete_validation,
)


def main() -> int:
    """Run every scenario and return a process exit code.

    A scenario that raises is reported with its traceback and recorded;
    the remaining scenarios still run.

    Returns:
        0 when no scenario raised, 1 otherwise.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("Schema Validation & Policy Engine Test Suite")
    print(banner)

    failed = []
    for test in TESTS:
        try:
            test()
        except Exception as e:  # top-level boundary: report and keep going
            print(f"\n✗ Test failed: {e}")
            traceback.print_exc()
            failed.append(test.__name__)

    print("\n" + banner)
    if failed:
        print(f"✗ {len(failed)} of {len(TESTS)} tests failed: {', '.join(failed)}")
        print(banner + "\n")
        return 1

    print("✓ All validation tests completed!")
    print(banner + "\n")
    return 0


if __name__ == "__main__":
    sys.exit(main())