overgrowth / test_validation.py
Graham Paasch
feat: Add Stage 0 pre-flight validation (Todo #2)
a2079ba
#!/usr/bin/env python3
"""
Test schema validation and policy engine
"""
import logging
from agent.schema_validation import (
validate_network_model,
get_validation_errors,
VLANModel,
SubnetModel,
DeviceModel
)
from agent.policy_engine import NetworkPolicy
# Configure the root logger at INFO level (presumably so log output from the
# agent modules exercised below is visible while the checks run).
logging.basicConfig(level=logging.INFO)
# Module-level logger; not referenced by the code visible in this file.
logger = logging.getLogger(__name__)
def test_valid_network():
    """Check that a well-formed network model passes schema validation.

    Builds a model with two VLANs, two subnets and one device, runs it
    through ``get_validation_errors`` and then ``validate_network_model``,
    and prints a short summary of the validated object.

    Raises:
        AssertionError: if the validator reports any error for this model.
    """
    print("\n=== Test 1: Valid Network Model ===")
    model = {
        "name": "test-network",
        "version": "1.0.0",
        "description": "Test network",
        "business_requirements": ["High availability"],
        "constraints": ["Budget friendly"],
        "vlans": [
            {"id": 10, "name": "Management", "purpose": "Network mgmt", "subnet": "10.0.10.0/24"},
            {"id": 20, "name": "Users", "purpose": "Employee workstations", "subnet": "10.0.20.0/24"}
        ],
        "subnets": [
            {
                "network": "10.0.10.0/24",
                "gateway": "10.0.10.1",
                "vlan": 10,
                "purpose": "Management",
                "dhcp_enabled": True,
                "dhcp_range_start": "10.0.10.100",
                "dhcp_range_end": "10.0.10.200"
            },
            {
                "network": "10.0.20.0/24",
                "gateway": "10.0.20.1",
                "vlan": 20,
                "purpose": "Users"
            }
        ],
        "devices": [
            {
                "name": "core-sw-01",
                "role": "core",
                "model": "Catalyst 9300",
                "vendor": "cisco",
                "mgmt_ip": "10.0.10.10",
                "location": "Main IDF",
                "interfaces": []
            }
        ],
        "services": ["DHCP", "DNS", "NTP"]
    }

    errors = get_validation_errors(model)
    if errors:
        # Show every reported problem before failing so the cause is visible.
        print("✗ Validation failed:")
        for err in errors:
            print(f" - {err}")
    # Previously a rejection was only printed, so the test could never fail.
    assert not errors, f"valid network model was rejected: {errors}"

    validated = validate_network_model(model)
    print("✓ Network model validated successfully")
    print(f" - Name: {validated.name}")
    print(f" - VLANs: {len(validated.vlans)}")
    print(f" - Subnets: {len(validated.subnets)}")
    print(f" - Devices: {len(validated.devices)}")
def test_invalid_vlan():
    """Check that a VLAN ID outside the valid range is rejected.

    The model contains a single VLAN with ID 5000; ``get_validation_errors``
    is expected to report at least one error for it.

    Raises:
        AssertionError: if the validator accepts the model.
    """
    print("\n=== Test 2: Invalid VLAN ID ===")
    model = {
        "name": "test",
        "version": "1.0.0",
        "description": "Test",
        "vlans": [
            {"id": 5000, "name": "Invalid"}  # VLAN ID out of range
        ],
        "devices": [],
        "subnets": [],
        "services": []
    }

    errors = get_validation_errors(model)
    if not errors:
        print("✗ Should have failed validation")
    # Fail the test instead of merely printing when nothing was caught.
    assert errors, "VLAN ID 5000 should have been rejected by validation"

    print("✓ Correctly caught validation error:")
    for err in errors:
        print(f" - {err}")
def test_invalid_subnet():
    """Check that a subnet whose gateway lies outside its network is rejected.

    The subnet is 10.0.10.0/24 but its gateway is 192.168.1.1;
    ``get_validation_errors`` is expected to report at least one error.

    Raises:
        AssertionError: if the validator accepts the model.
    """
    print("\n=== Test 3: Invalid Subnet ===")
    model = {
        "name": "test",
        "version": "1.0.0",
        "description": "Test",
        "vlans": [{"id": 10, "name": "Test"}],
        "subnets": [
            {
                "network": "10.0.10.0/24",
                "gateway": "192.168.1.1",  # Gateway not in subnet
                "vlan": 10
            }
        ],
        "devices": [],
        "services": []
    }

    errors = get_validation_errors(model)
    if not errors:
        print("✗ Should have failed validation")
    # Fail the test instead of merely printing when nothing was caught.
    assert errors, "gateway outside the subnet should have been rejected"

    print("✓ Correctly caught validation error:")
    for err in errors:
        print(f" - {err}")
def test_duplicate_vlan_ids():
    """Check that two VLANs sharing the same ID are rejected.

    Both VLAN entries use ID 10; ``get_validation_errors`` is expected to
    report at least one error.

    Raises:
        AssertionError: if the validator accepts the model.
    """
    print("\n=== Test 4: Duplicate VLAN IDs ===")
    model = {
        "name": "test",
        "version": "1.0.0",
        "description": "Test",
        "vlans": [
            {"id": 10, "name": "VLAN10"},
            {"id": 10, "name": "AlsoVLAN10"}  # Duplicate!
        ],
        "devices": [],
        "subnets": [],
        "services": []
    }

    errors = get_validation_errors(model)
    if not errors:
        print("✗ Should have failed validation")
    # Fail the test instead of merely printing when nothing was caught.
    assert errors, "duplicate VLAN IDs should have been rejected"

    print("✓ Correctly caught duplicate VLAN IDs:")
    for err in errors:
        print(f" - {err}")
def test_policy_engine():
    """Run the policy engine on a model built to trigger policy findings.

    The fixture uses VLAN 1, a VLAN name containing a space, a device name
    without a role or numeric suffix, and a services list lacking DNS and
    NTP. The violations are printed together with a per-severity count.

    Raises:
        AssertionError: if the policy engine reports no violations at all.
    """
    print("\n=== Test 5: Policy Engine ===")
    model = {
        "name": "test-network",
        "version": "1.0.0",
        "description": "Test network",
        "vlans": [
            {"id": 1, "name": "Default"},  # VLAN 1 - should warn
            {"id": 10, "name": "Mgmt"},
            {"id": 20, "name": "Guest WiFi"}  # Space in name - should warn
        ],
        "subnets": [
            {"network": "10.0.10.0/24", "gateway": "10.0.10.1", "vlan": 10},
            {"network": "10.0.20.0/24", "gateway": "10.0.20.1", "vlan": 20}
        ],
        "devices": [
            {
                "name": "switch1",  # No role in name, no 2-digit suffix
                "role": "core",
                "model": "Test",
                "vendor": "cisco",
                "mgmt_ip": "10.0.10.10",
                "location": "Office"
            }
        ],
        "services": ["DHCP"]  # Missing DNS, NTP
    }

    policy = NetworkPolicy()
    violations = policy.check_network_model(model)
    print(f"Found {len(violations)} policy violations:")
    print(policy.format_violations())

    by_severity = policy.get_violations_by_severity()
    print("\n✓ Policy check complete:")
    print(f" - Errors: {len(by_severity['ERROR'])}")
    print(f" - Warnings: {len(by_severity['WARNING'])}")
    print(f" - Info: {len(by_severity['INFO'])}")

    # The fixture is annotated with several expected findings; an empty
    # result means the policy checks did not run or did not match.
    assert violations, "policy engine reported no violations for a non-compliant model"
def test_overlapping_subnets():
    """Check that the policy engine flags overlapping subnets.

    10.0.10.0/24 is contained in 10.0.0.0/16, so at least one violation
    whose message mentions "overlapping" (case-insensitive) is expected.

    Raises:
        AssertionError: if no such violation is reported.
    """
    print("\n=== Test 6: Overlapping Subnets ===")
    model = {
        "name": "test",
        "version": "1.0.0",
        "description": "Test",
        "vlans": [
            {"id": 10, "name": "Network1"},
            {"id": 20, "name": "Network2"}
        ],
        "subnets": [
            {"network": "10.0.0.0/16", "gateway": "10.0.0.1", "vlan": 10},
            {"network": "10.0.10.0/24", "gateway": "10.0.10.1", "vlan": 20}  # Overlaps!
        ],
        "devices": [],
        "services": []
    }

    policy = NetworkPolicy()
    violations = policy.check_network_model(model)
    # Overlap findings are identified by their message text.
    overlap_errors = [v for v in violations if 'overlapping' in v.message.lower()]

    if not overlap_errors:
        print("✗ Should have detected overlapping subnets")
    # Fail the test instead of merely printing when the overlap is missed.
    assert overlap_errors, "overlapping subnets were not detected"

    print("✓ Correctly detected overlapping subnets:")
    for err in overlap_errors:
        print(f" - {err}")
def test_complete_validation():
    """Run the full flow: schema validation followed by policy checks.

    Uses a larger model (four VLANs, four subnets, three devices, OSPF
    routing). Schema validation must pass; the policy result is reported
    per severity along with a deploy/no-deploy verdict.

    Raises:
        AssertionError: if schema validation reports any error.
    """
    print("\n=== Test 7: Complete Validation Flow ===")
    model = {
        "name": "production-network",
        "version": "1.0.0",
        "description": "Production corporate network",
        "business_requirements": ["HA", "Secure guest access", "QoS for VoIP"],
        "constraints": ["Cisco only", "Budget $25K"],
        "vlans": [
            {"id": 10, "name": "Management", "purpose": "Network management"},
            {"id": 20, "name": "Users", "purpose": "Employee workstations"},
            {"id": 30, "name": "Guest", "purpose": "Guest WiFi"},
            {"id": 40, "name": "Voice", "purpose": "VoIP phones"}
        ],
        "subnets": [
            {"network": "10.0.10.0/24", "gateway": "10.0.10.1", "vlan": 10, "purpose": "Management"},
            {"network": "10.0.20.0/22", "gateway": "10.0.20.1", "vlan": 20, "purpose": "Users"},
            {"network": "10.0.30.0/24", "gateway": "10.0.30.1", "vlan": 30, "purpose": "Guest"},
            {"network": "10.0.40.0/24", "gateway": "10.0.40.1", "vlan": 40, "purpose": "Voice"}
        ],
        "devices": [
            {
                "name": "core-sw-01",
                "role": "core",
                "model": "Catalyst 9300",
                "vendor": "cisco",
                "mgmt_ip": "10.0.10.10",
                "location": "Main IDF"
            },
            {
                "name": "core-sw-02",
                "role": "core",
                "model": "Catalyst 9300",
                "vendor": "cisco",
                "mgmt_ip": "10.0.10.11",
                "location": "Main IDF"
            },
            {
                "name": "access-sw-01",
                "role": "access",
                "model": "Catalyst 2960X",
                "vendor": "cisco",
                "mgmt_ip": "10.0.10.20",
                "location": "Floor 1"
            }
        ],
        "routing": {
            "protocol": "ospf",
            "process_id": 1,
            "router_id": "10.0.0.1",
            "areas": ["0"]
        },
        "services": ["DHCP", "DNS", "NTP", "Syslog"]
    }

    # Schema validation
    errors = get_validation_errors(model)
    if errors:
        print("✗ Schema validation failed:")
        for err in errors:
            print(f" - {err}")
    # Previously a schema failure returned silently and the test still passed.
    assert not errors, f"schema validation failed: {errors}"

    print("✓ Schema validation passed")
    validated = validate_network_model(model)
    print(f" - {len(validated.vlans)} VLANs")
    print(f" - {len(validated.subnets)} subnets")
    print(f" - {len(validated.devices)} devices")

    # Policy validation: the call populates the policy object's state; its
    # return value is not needed because results are read back by severity.
    policy = NetworkPolicy()
    policy.check_network_model(model)
    print("\n✓ Policy validation complete:")
    by_severity = policy.get_violations_by_severity()
    print(f" - Errors: {len(by_severity['ERROR'])}")
    print(f" - Warnings: {len(by_severity['WARNING'])}")
    print(f" - Info: {len(by_severity['INFO'])}")

    # NOTE(review): the policy verdict is only reported, not asserted; whether
    # this model is expected to be error-free cannot be determined from here.
    if policy.has_errors():
        print("\n✗ Cannot proceed - fix errors first")
    else:
        print("\n✓ Ready for deployment")
if __name__ == "__main__":
    # Script entry point: run every check in order and exit non-zero on the
    # first exception (including AssertionError) raised by any of them.
    import sys
    import traceback

    print("\n" + "="*60)
    print("Schema Validation & Policy Engine Test Suite")
    print("="*60)
    try:
        test_valid_network()
        test_invalid_vlan()
        test_invalid_subnet()
        test_duplicate_vlan_ids()
        test_policy_engine()
        test_overlapping_subnets()
        test_complete_validation()
        print("\n" + "="*60)
        print("✓ All validation tests completed!")
        print("="*60 + "\n")
    except Exception as e:
        print(f"\n✗ Test failed: {e}")
        traceback.print_exc()
        # sys.exit is always available; the bare exit() builtin is injected by
        # the site module and is missing under `python -S` or in frozen apps.
        sys.exit(1)