Graham Paasch commited on
Commit
a2079ba
Β·
1 Parent(s): 74f2bea

feat: Add Stage 0 pre-flight validation (Todo #2)

Browse files

- Created schema_validation.py with Pydantic models for VLANs, subnets, devices, routing
- Validates VLAN IDs (1-4094), subnet CIDR notation, gateway within network, no duplicates
- Created policy_engine.py enforcing industry best practices:
* RFC1918 private addressing
* No VLAN 1 in production
* Naming conventions (hostnames, VLANs)
* Security (guest isolation, management VLAN)
* Design (redundancy, DHCP ranges, routing protocols)
- Integrated stage0_preflight() into pipeline - runs AFTER SoT generation, BEFORE deployment
- Pipeline blocks deployment if validation errors exist
- Updated UI to show pre-flight results (errors, warnings, info)
- Added pydantic>=2.0.0 to requirements
- Comprehensive test suites (test_validation.py, test_preflight.py) - all passing

Pre-flight prevents bad configs from ever touching devices. Schema catches typos/ranges,
policy enforces security/design standards. Batfish integration pending for static analysis.

Phase 2 of research recommendations - 2 of 6 todos complete.

agent/pipeline_engine.py CHANGED
@@ -180,6 +180,73 @@ class OvergrowthPipeline:
180
  logger.warning("NetBox not available - falling back to YAML files")
181
  self.use_netbox = False
182
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
183
  def stage1_consultation(self, user_input: str) -> NetworkIntent:
184
  """
185
  Stage 1: Capture user intent from natural language
@@ -571,6 +638,25 @@ Be specific and practical. Use RFC1918 addressing. Consider scalability and secu
571
  model = self.stage2_generate_sot(intent)
572
  results['model'] = model.to_dict()
573
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
574
  # Stage 3: Diagrams
575
  diagrams = self.stage3_generate_diagram(model)
576
  results['diagrams'] = diagrams
 
180
  logger.warning("NetBox not available - falling back to YAML files")
181
  self.use_netbox = False
182
 
183
+ def stage0_preflight(self, model: NetworkModel) -> Dict[str, Any]:
184
+ """
185
+ Stage 0: Pre-flight validation (BEFORE deployment)
186
+ Schema validation, policy checks, and eventually Batfish analysis
187
+ """
188
+ logger.info("Stage 0: Running pre-flight validation")
189
+
190
+ from agent.schema_validation import get_validation_errors, validate_network_model
191
+ from agent.policy_engine import NetworkPolicy
192
+
193
+ results = {
194
+ 'schema_valid': False,
195
+ 'policy_passed': False,
196
+ 'ready_to_deploy': False,
197
+ 'errors': [],
198
+ 'warnings': [],
199
+ 'info': []
200
+ }
201
+
202
+ # Convert NetworkModel to dict for validation
203
+ model_dict = model.to_dict()
204
+
205
+ # 1. Schema Validation (Pydantic)
206
+ logger.info("Running schema validation...")
207
+ schema_errors = get_validation_errors(model_dict)
208
+
209
+ if schema_errors:
210
+ results['errors'].extend([f"Schema: {e}" for e in schema_errors])
211
+ logger.error(f"Schema validation failed with {len(schema_errors)} errors")
212
+ else:
213
+ results['schema_valid'] = True
214
+ logger.info("βœ“ Schema validation passed")
215
+
216
+ # 2. Policy Engine
217
+ logger.info("Running policy checks...")
218
+ policy = NetworkPolicy()
219
+ violations = policy.check_network_model(model_dict)
220
+
221
+ by_severity = policy.get_violations_by_severity()
222
+ results['errors'].extend([str(v) for v in by_severity['ERROR']])
223
+ results['warnings'].extend([str(v) for v in by_severity['WARNING']])
224
+ results['info'].extend([str(v) for v in by_severity['INFO']])
225
+
226
+ if policy.has_errors():
227
+ logger.error(f"Policy validation failed with {len(by_severity['ERROR'])} errors")
228
+ else:
229
+ results['policy_passed'] = True
230
+ logger.info(f"βœ“ Policy validation passed ({len(by_severity['WARNING'])} warnings, {len(by_severity['INFO'])} info)")
231
+
232
+ # 3. Batfish Static Analysis (TODO)
233
+ # TODO: Implement Batfish integration
234
+ # - Snapshot network configs
235
+ # - Validate reachability
236
+ # - Check ACL behavior
237
+ # - Find routing loops
238
+ results['batfish_status'] = 'not_implemented'
239
+
240
+ # Overall result
241
+ results['ready_to_deploy'] = results['schema_valid'] and results['policy_passed']
242
+
243
+ if results['ready_to_deploy']:
244
+ logger.info("βœ“ Pre-flight validation PASSED - ready to deploy")
245
+ else:
246
+ logger.warning(f"βœ— Pre-flight validation FAILED - {len(results['errors'])} errors must be fixed")
247
+
248
+ return results
249
+
250
  def stage1_consultation(self, user_input: str) -> NetworkIntent:
251
  """
252
  Stage 1: Capture user intent from natural language
 
638
  model = self.stage2_generate_sot(intent)
639
  results['model'] = model.to_dict()
640
 
641
+ # Stage 0: Pre-flight Validation (runs AFTER SoT generation but BEFORE deployment)
642
+ preflight = self.stage0_preflight(model)
643
+ results['preflight'] = preflight
644
+
645
+ # Only proceed with deployment if pre-flight passed
646
+ if not preflight['ready_to_deploy']:
647
+ logger.warning("Pre-flight validation failed - stopping before deployment")
648
+ results['deployment_status'] = 'blocked'
649
+ results['deployment_reason'] = f"{len(preflight['errors'])} validation errors"
650
+
651
+ # Still generate diagrams and BOM for review
652
+ diagrams = self.stage3_generate_diagram(model)
653
+ results['diagrams'] = diagrams
654
+
655
+ bom = self.stage4_generate_bom(model)
656
+ results['bom'] = asdict(bom)
657
+
658
+ return results
659
+
660
  # Stage 3: Diagrams
661
  diagrams = self.stage3_generate_diagram(model)
662
  results['diagrams'] = diagrams
agent/policy_engine.py ADDED
@@ -0,0 +1,353 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Network Policy Engine
3
+ Enforces security, naming, and design best practices
4
+ """
5
+
6
+ from typing import List, Dict, Any, Tuple
7
+ from ipaddress import ip_network, ip_address
8
+ import re
9
+
10
+
11
+ class PolicyViolation:
12
+ """Represents a policy violation"""
13
+ def __init__(self, severity: str, category: str, message: str, location: str = ""):
14
+ self.severity = severity # ERROR, WARNING, INFO
15
+ self.category = category # security, naming, design, addressing
16
+ self.message = message
17
+ self.location = location
18
+
19
+ def __str__(self):
20
+ loc = f" [{self.location}]" if self.location else ""
21
+ return f"{self.severity} ({self.category}){loc}: {self.message}"
22
+
23
+
24
+ class NetworkPolicy:
25
+ """
26
+ Enforces network design policies
27
+ Checks against industry best practices and organizational standards
28
+ """
29
+
30
+ def __init__(self, config: Dict[str, Any] = None):
31
+ """
32
+ Initialize policy engine with configuration
33
+
34
+ Default policies:
35
+ - RFC1918 private addressing required
36
+ - VLAN 1 not allowed for production
37
+ - Management VLAN required
38
+ - Secure naming conventions
39
+ - Gateway is first usable IP in subnet
40
+ """
41
+ self.config = config or {}
42
+ self.violations: List[PolicyViolation] = []
43
+
44
+ def check_network_model(self, model: Dict[str, Any]) -> List[PolicyViolation]:
45
+ """
46
+ Run all policy checks on network model
47
+ Returns list of violations
48
+ """
49
+ self.violations = []
50
+
51
+ # Naming policies
52
+ self._check_naming_conventions(model)
53
+
54
+ # Addressing policies
55
+ self._check_addressing_policies(model)
56
+
57
+ # VLAN policies
58
+ self._check_vlan_policies(model)
59
+
60
+ # Security policies
61
+ self._check_security_policies(model)
62
+
63
+ # Design best practices
64
+ self._check_design_practices(model)
65
+
66
+ return self.violations
67
+
68
+ def _check_naming_conventions(self, model: Dict[str, Any]):
69
+ """Check naming follows standards"""
70
+ # Device naming
71
+ for device in model.get('devices', []):
72
+ name = device.get('name', '')
73
+ role = device.get('role', '')
74
+
75
+ # Should include role in name
76
+ if role and role not in name.lower():
77
+ self.violations.append(PolicyViolation(
78
+ severity="WARNING",
79
+ category="naming",
80
+ message=f"Device name should indicate role '{role}'",
81
+ location=f"device:{name}"
82
+ ))
83
+
84
+ # Should use hyphens not underscores
85
+ if '_' in name:
86
+ self.violations.append(PolicyViolation(
87
+ severity="INFO",
88
+ category="naming",
89
+ message="Consider using hyphens instead of underscores in device names",
90
+ location=f"device:{name}"
91
+ ))
92
+
93
+ # Check for sequential numbering
94
+ if not re.search(r'\d{2}$', name):
95
+ self.violations.append(PolicyViolation(
96
+ severity="INFO",
97
+ category="naming",
98
+ message="Device name should end with 2-digit number for scalability",
99
+ location=f"device:{name}"
100
+ ))
101
+
102
+ # VLAN naming
103
+ for vlan in model.get('vlans', []):
104
+ name = vlan.get('name', '')
105
+
106
+ # No spaces in VLAN names
107
+ if ' ' in name:
108
+ self.violations.append(PolicyViolation(
109
+ severity="WARNING",
110
+ category="naming",
111
+ message="VLAN names should not contain spaces",
112
+ location=f"vlan:{vlan.get('id')}"
113
+ ))
114
+
115
+ # Should be descriptive
116
+ if len(name) < 3:
117
+ self.violations.append(PolicyViolation(
118
+ severity="INFO",
119
+ category="naming",
120
+ message="VLAN name should be descriptive (3+ characters)",
121
+ location=f"vlan:{vlan.get('id')}"
122
+ ))
123
+
124
+ def _check_addressing_policies(self, model: Dict[str, Any]):
125
+ """Check IP addressing follows best practices"""
126
+
127
+ # Check for RFC1918 private addressing
128
+ require_private = self.config.get('require_rfc1918', True)
129
+
130
+ for subnet in model.get('subnets', []):
131
+ network = subnet.get('network', '')
132
+ gateway = subnet.get('gateway', '')
133
+
134
+ try:
135
+ net = ip_network(network, strict=False)
136
+
137
+ # Check if using private addressing
138
+ if require_private and not net.is_private:
139
+ self.violations.append(PolicyViolation(
140
+ severity="ERROR",
141
+ category="addressing",
142
+ message=f"Non-private address space detected: {network} (use RFC1918: 10/8, 172.16/12, 192.168/16)",
143
+ location=f"subnet:{network}"
144
+ ))
145
+
146
+ # Check gateway is first usable IP
147
+ if gateway:
148
+ gw = ip_address(gateway)
149
+ first_usable = list(net.hosts())[0] if net.num_addresses > 2 else None
150
+
151
+ if first_usable and gw != first_usable:
152
+ self.violations.append(PolicyViolation(
153
+ severity="INFO",
154
+ category="addressing",
155
+ message=f"Gateway {gateway} is not first usable IP ({first_usable})",
156
+ location=f"subnet:{network}"
157
+ ))
158
+
159
+ # Warn on wasteful subnets (e.g., /24 for 2 devices)
160
+ if net.num_addresses > 256:
161
+ self.violations.append(PolicyViolation(
162
+ severity="INFO",
163
+ category="addressing",
164
+ message=f"Large subnet ({net.num_addresses} IPs) - consider smaller subnets for better security segmentation",
165
+ location=f"subnet:{network}"
166
+ ))
167
+
168
+ except Exception as e:
169
+ self.violations.append(PolicyViolation(
170
+ severity="ERROR",
171
+ category="addressing",
172
+ message=f"Invalid subnet: {e}",
173
+ location=f"subnet:{network}"
174
+ ))
175
+
176
+ # Check for overlapping subnets
177
+ subnets_list = [s.get('network') for s in model.get('subnets', [])]
178
+ for i, subnet1 in enumerate(subnets_list):
179
+ for subnet2 in subnets_list[i+1:]:
180
+ try:
181
+ net1 = ip_network(subnet1, strict=False)
182
+ net2 = ip_network(subnet2, strict=False)
183
+ if net1.overlaps(net2):
184
+ self.violations.append(PolicyViolation(
185
+ severity="ERROR",
186
+ category="addressing",
187
+ message=f"Overlapping subnets: {subnet1} and {subnet2}",
188
+ location="subnets"
189
+ ))
190
+ except Exception:
191
+ pass
192
+
193
+ def _check_vlan_policies(self, model: Dict[str, Any]):
194
+ """Check VLAN configuration policies"""
195
+
196
+ vlan_ids = [v.get('id') for v in model.get('vlans', [])]
197
+ vlan_names = {v.get('id'): v.get('name', '') for v in model.get('vlans', [])}
198
+
199
+ # VLAN 1 should not be used
200
+ if 1 in vlan_ids:
201
+ self.violations.append(PolicyViolation(
202
+ severity="WARNING",
203
+ category="security",
204
+ message="VLAN 1 detected - default VLAN should not be used for production",
205
+ location="vlan:1"
206
+ ))
207
+
208
+ # Check for management VLAN
209
+ mgmt_vlans = [v for v in model.get('vlans', [])
210
+ if 'mgmt' in v.get('name', '').lower() or 'management' in v.get('name', '').lower()]
211
+
212
+ if not mgmt_vlans and len(model.get('devices', [])) > 0:
213
+ self.violations.append(PolicyViolation(
214
+ severity="WARNING",
215
+ category="design",
216
+ message="No dedicated management VLAN found - consider creating one for device management",
217
+ location="vlans"
218
+ ))
219
+
220
+ # Check VLAN ID ranges (common practice: 10-99 infrastructure, 100+ user)
221
+ for vlan in model.get('vlans', []):
222
+ vid = vlan.get('id')
223
+ name = vlan.get('name', '')
224
+
225
+ if vid and vid >= 1006 and vid <= 1024:
226
+ self.violations.append(PolicyViolation(
227
+ severity="WARNING",
228
+ category="design",
229
+ message=f"VLAN {vid} is in extended range reserved range - may not be supported on all devices",
230
+ location=f"vlan:{vid}"
231
+ ))
232
+
233
+ def _check_security_policies(self, model: Dict[str, Any]):
234
+ """Check security best practices"""
235
+
236
+ # Check for guest network isolation
237
+ guest_vlans = [v for v in model.get('vlans', [])
238
+ if 'guest' in v.get('name', '').lower() or 'visitor' in v.get('name', '').lower()]
239
+
240
+ if guest_vlans:
241
+ # Guest networks should be isolated (different subnet range)
242
+ guest_subnets = [s for s in model.get('subnets', [])
243
+ if any(s.get('vlan') == gv.get('id') for gv in guest_vlans)]
244
+
245
+ if not guest_subnets:
246
+ self.violations.append(PolicyViolation(
247
+ severity="WARNING",
248
+ category="security",
249
+ message="Guest VLAN exists but no dedicated subnet configured",
250
+ location="guest network"
251
+ ))
252
+
253
+ # Check for DHCP snooping (implied by DHCP configuration)
254
+ for subnet in model.get('subnets', []):
255
+ if subnet.get('dhcp_enabled'):
256
+ # Should have DHCP range defined
257
+ if not subnet.get('dhcp_range_start') or not subnet.get('dhcp_range_end'):
258
+ self.violations.append(PolicyViolation(
259
+ severity="WARNING",
260
+ category="design",
261
+ message="DHCP enabled but no pool range defined",
262
+ location=f"subnet:{subnet.get('network')}"
263
+ ))
264
+
265
+ # Warn if no redundancy in design
266
+ core_devices = [d for d in model.get('devices', []) if d.get('role') == 'core']
267
+ if len(core_devices) < 2 and len(model.get('devices', [])) > 3:
268
+ self.violations.append(PolicyViolation(
269
+ severity="WARNING",
270
+ category="design",
271
+ message="No redundant core devices - consider adding redundancy for HA",
272
+ location="devices"
273
+ ))
274
+
275
+ def _check_design_practices(self, model: Dict[str, Any]):
276
+ """Check general design best practices"""
277
+
278
+ # Every subnet should have a VLAN
279
+ vlans_with_subnets = {s.get('vlan') for s in model.get('subnets', []) if s.get('vlan')}
280
+ all_vlans = {v.get('id') for v in model.get('vlans', [])}
281
+
282
+ vlans_without_subnets = all_vlans - vlans_with_subnets
283
+ if vlans_without_subnets:
284
+ self.violations.append(PolicyViolation(
285
+ severity="INFO",
286
+ category="design",
287
+ message=f"VLANs without subnets: {sorted(vlans_without_subnets)}",
288
+ location="vlans/subnets"
289
+ ))
290
+
291
+ # Check routing protocol makes sense for network size
292
+ routing = model.get('routing')
293
+ device_count = len(model.get('devices', []))
294
+
295
+ if routing:
296
+ protocol = routing.get('protocol')
297
+
298
+ if protocol == 'ospf' and device_count < 3:
299
+ self.violations.append(PolicyViolation(
300
+ severity="INFO",
301
+ category="design",
302
+ message="OSPF may be overkill for small network (<3 devices) - consider static routing",
303
+ location="routing"
304
+ ))
305
+
306
+ if protocol == 'static' and device_count > 10:
307
+ self.violations.append(PolicyViolation(
308
+ severity="WARNING",
309
+ category="design",
310
+ message="Static routing challenging for large networks - consider dynamic protocol",
311
+ location="routing"
312
+ ))
313
+
314
+ # Check for services configuration
315
+ services = model.get('services', [])
316
+ recommended_services = {'DHCP', 'DNS', 'NTP'}
317
+ missing_services = recommended_services - set(services)
318
+
319
+ if missing_services:
320
+ self.violations.append(PolicyViolation(
321
+ severity="INFO",
322
+ category="design",
323
+ message=f"Consider adding services: {', '.join(missing_services)}",
324
+ location="services"
325
+ ))
326
+
327
+ def get_violations_by_severity(self) -> Dict[str, List[PolicyViolation]]:
328
+ """Group violations by severity"""
329
+ result = {'ERROR': [], 'WARNING': [], 'INFO': []}
330
+ for v in self.violations:
331
+ result[v.severity].append(v)
332
+ return result
333
+
334
+ def has_errors(self) -> bool:
335
+ """Check if there are any ERROR-level violations"""
336
+ return any(v.severity == 'ERROR' for v in self.violations)
337
+
338
+ def format_violations(self) -> str:
339
+ """Format violations as readable text"""
340
+ if not self.violations:
341
+ return "βœ“ No policy violations detected"
342
+
343
+ lines = []
344
+ by_severity = self.get_violations_by_severity()
345
+
346
+ for severity in ['ERROR', 'WARNING', 'INFO']:
347
+ viols = by_severity[severity]
348
+ if viols:
349
+ lines.append(f"\n{severity}S ({len(viols)}):")
350
+ for v in viols:
351
+ lines.append(f" β€’ {v}")
352
+
353
+ return "\n".join(lines)
agent/schema_validation.py ADDED
@@ -0,0 +1,319 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Schema Validation for Network Models
3
+ Uses Pydantic for type checking and data validation
4
+ """
5
+
6
+ from typing import List, Optional, Dict, Any, Literal
7
+ from pydantic import BaseModel, Field, validator, IPvAnyAddress, IPvAnyNetwork
8
+ from ipaddress import ip_network, ip_address
9
+ import re
10
+
11
+
12
+ class VLANModel(BaseModel):
13
+ """VLAN configuration schema"""
14
+ id: int = Field(..., ge=1, le=4094, description="VLAN ID (1-4094)")
15
+ name: str = Field(..., min_length=1, max_length=32, description="VLAN name")
16
+ purpose: Optional[str] = Field(None, description="VLAN purpose/description")
17
+ subnet: Optional[str] = Field(None, description="Associated subnet in CIDR notation")
18
+
19
+ @validator('name')
20
+ def validate_vlan_name(cls, v):
21
+ """Ensure VLAN name follows naming conventions"""
22
+ # No spaces, special characters
23
+ if not re.match(r'^[a-zA-Z0-9_-]+$', v):
24
+ raise ValueError("VLAN name must contain only letters, numbers, underscore, or hyphen")
25
+ return v
26
+
27
+ @validator('subnet')
28
+ def validate_subnet_format(cls, v):
29
+ """Validate CIDR notation"""
30
+ if v:
31
+ try:
32
+ ip_network(v, strict=False)
33
+ except ValueError as e:
34
+ raise ValueError(f"Invalid subnet format: {e}")
35
+ return v
36
+
37
+ class Config:
38
+ schema_extra = {
39
+ "example": {
40
+ "id": 10,
41
+ "name": "Management",
42
+ "purpose": "Network management and monitoring",
43
+ "subnet": "10.0.10.0/24"
44
+ }
45
+ }
46
+
47
+
48
+ class SubnetModel(BaseModel):
49
+ """IP Subnet schema"""
50
+ network: str = Field(..., description="Network address in CIDR notation")
51
+ gateway: str = Field(..., description="Default gateway IP address")
52
+ vlan: Optional[int] = Field(None, ge=1, le=4094, description="Associated VLAN ID")
53
+ purpose: Optional[str] = Field(None, description="Subnet purpose")
54
+ dhcp_enabled: bool = Field(False, description="DHCP server enabled")
55
+ dhcp_range_start: Optional[str] = Field(None, description="DHCP pool start IP")
56
+ dhcp_range_end: Optional[str] = Field(None, description="DHCP pool end IP")
57
+
58
+ @validator('network')
59
+ def validate_network(cls, v):
60
+ """Ensure valid CIDR notation"""
61
+ try:
62
+ net = ip_network(v, strict=False)
63
+ # Ensure it's a private network (RFC1918) unless explicitly public
64
+ # if not net.is_private:
65
+ # raise ValueError("Use RFC1918 private addressing (10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16)")
66
+ return str(net)
67
+ except ValueError as e:
68
+ raise ValueError(f"Invalid network: {e}")
69
+
70
+ @validator('gateway')
71
+ def validate_gateway(cls, v, values):
72
+ """Ensure gateway is within the network"""
73
+ try:
74
+ gw = ip_address(v)
75
+ if 'network' in values:
76
+ net = ip_network(values['network'], strict=False)
77
+ if gw not in net:
78
+ raise ValueError(f"Gateway {v} not in network {values['network']}")
79
+ except ValueError as e:
80
+ raise ValueError(f"Invalid gateway: {e}")
81
+ return v
82
+
83
+ @validator('dhcp_range_end')
84
+ def validate_dhcp_range(cls, v, values):
85
+ """Ensure DHCP range is valid"""
86
+ if v and 'dhcp_range_start' in values and values['dhcp_range_start']:
87
+ start = ip_address(values['dhcp_range_start'])
88
+ end = ip_address(v)
89
+ if end <= start:
90
+ raise ValueError("DHCP range end must be greater than start")
91
+
92
+ # Ensure both are in the network
93
+ if 'network' in values:
94
+ net = ip_network(values['network'], strict=False)
95
+ if start not in net or end not in net:
96
+ raise ValueError("DHCP range must be within subnet")
97
+ return v
98
+
99
+ class Config:
100
+ schema_extra = {
101
+ "example": {
102
+ "network": "10.0.10.0/24",
103
+ "gateway": "10.0.10.1",
104
+ "vlan": 10,
105
+ "purpose": "Management network",
106
+ "dhcp_enabled": True,
107
+ "dhcp_range_start": "10.0.10.100",
108
+ "dhcp_range_end": "10.0.10.200"
109
+ }
110
+ }
111
+
112
+
113
+ class InterfaceModel(BaseModel):
114
+ """Network interface schema"""
115
+ name: str = Field(..., description="Interface name (e.g., GigabitEthernet1/0/1)")
116
+ type: Literal["ethernet", "management", "loopback", "vlan", "port-channel"] = Field(..., description="Interface type")
117
+ speed: Optional[str] = Field(None, description="Interface speed (e.g., 1000, 10G)")
118
+ mode: Optional[Literal["access", "trunk"]] = Field(None, description="Switchport mode")
119
+ vlan: Optional[int] = Field(None, description="Access VLAN or native VLAN for trunk")
120
+ allowed_vlans: Optional[List[int]] = Field(None, description="Allowed VLANs for trunk ports")
121
+ ip_address: Optional[str] = Field(None, description="IP address if L3 interface")
122
+ description: Optional[str] = Field(None, max_length=240, description="Interface description")
123
+ enabled: bool = Field(True, description="Interface administratively up")
124
+
125
+ @validator('allowed_vlans')
126
+ def validate_allowed_vlans(cls, v):
127
+ """Ensure VLAN IDs are valid"""
128
+ if v:
129
+ for vlan_id in v:
130
+ if vlan_id < 1 or vlan_id > 4094:
131
+ raise ValueError(f"Invalid VLAN ID {vlan_id} (must be 1-4094)")
132
+ return v
133
+
134
+ class Config:
135
+ schema_extra = {
136
+ "example": {
137
+ "name": "GigabitEthernet1/0/1",
138
+ "type": "ethernet",
139
+ "speed": "1000",
140
+ "mode": "trunk",
141
+ "vlan": 1,
142
+ "allowed_vlans": [10, 20, 30],
143
+ "description": "Uplink to core switch",
144
+ "enabled": True
145
+ }
146
+ }
147
+
148
+
149
+ class DeviceModel(BaseModel):
150
+ """Network device schema"""
151
+ name: str = Field(..., min_length=1, max_length=64, description="Device hostname")
152
+ role: Literal["core", "distribution", "access", "edge", "firewall", "router", "wireless"] = Field(..., description="Device role in network")
153
+ model: str = Field(..., description="Hardware model")
154
+ vendor: Literal["cisco", "juniper", "arista", "hp", "dell", "ubiquiti", "mikrotik", "other"] = Field(..., description="Vendor")
155
+ mgmt_ip: str = Field(..., description="Management IP address")
156
+ location: str = Field(..., description="Physical location")
157
+ interfaces: List[InterfaceModel] = Field(default_factory=list, description="Network interfaces")
158
+ os_version: Optional[str] = Field(None, description="OS version")
159
+ serial_number: Optional[str] = Field(None, description="Device serial number")
160
+
161
+ @validator('name')
162
+ def validate_hostname(cls, v):
163
+ """Ensure hostname follows RFC1123"""
164
+ if not re.match(r'^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$', v):
165
+ raise ValueError("Invalid hostname format (RFC1123)")
166
+ return v
167
+
168
+ @validator('mgmt_ip')
169
+ def validate_mgmt_ip(cls, v):
170
+ """Ensure valid IP address"""
171
+ try:
172
+ ip_address(v)
173
+ except ValueError as e:
174
+ raise ValueError(f"Invalid management IP: {e}")
175
+ return v
176
+
177
+ class Config:
178
+ schema_extra = {
179
+ "example": {
180
+ "name": "core-sw-01",
181
+ "role": "core",
182
+ "model": "Catalyst 9300",
183
+ "vendor": "cisco",
184
+ "mgmt_ip": "10.0.10.10",
185
+ "location": "Main Office - IDF1",
186
+ "os_version": "17.9.4",
187
+ "interfaces": []
188
+ }
189
+ }
190
+
191
+
192
+ class RoutingModel(BaseModel):
193
+ """Routing protocol configuration"""
194
+ protocol: Literal["static", "ospf", "bgp", "eigrp", "rip", "is-is"] = Field(..., description="Routing protocol")
195
+ autonomous_system: Optional[int] = Field(None, ge=1, le=4294967295, description="AS number for BGP/EIGRP")
196
+ process_id: Optional[int] = Field(None, ge=1, le=65535, description="Process ID for OSPF/EIGRP")
197
+ router_id: Optional[str] = Field(None, description="Router ID")
198
+ areas: Optional[List[str]] = Field(None, description="OSPF areas or IS-IS levels")
199
+ networks: Optional[List[str]] = Field(None, description="Networks to advertise")
200
+ neighbors: Optional[List[str]] = Field(None, description="BGP neighbors or static routes")
201
+
202
+ @validator('router_id')
203
+ def validate_router_id(cls, v):
204
+ """Ensure router ID is valid IP format"""
205
+ if v:
206
+ try:
207
+ ip_address(v)
208
+ except ValueError as e:
209
+ raise ValueError(f"Invalid router ID format: {e}")
210
+ return v
211
+
212
+ class Config:
213
+ schema_extra = {
214
+ "example": {
215
+ "protocol": "ospf",
216
+ "process_id": 1,
217
+ "router_id": "10.0.0.1",
218
+ "areas": ["0"],
219
+ "networks": ["10.0.0.0/8"]
220
+ }
221
+ }
222
+
223
+
224
+ class NetworkModelSchema(BaseModel):
225
+ """Complete network data model with validation"""
226
+ name: str = Field(..., min_length=1, max_length=64, description="Network name")
227
+ version: str = Field(..., pattern=r'^\d+\.\d+\.\d+$', description="Schema version (semantic versioning)")
228
+ description: Optional[str] = Field(None, description="Network description")
229
+ business_requirements: Optional[List[str]] = Field(default_factory=list, description="Business requirements")
230
+ constraints: Optional[List[str]] = Field(default_factory=list, description="Design constraints")
231
+ intent: Optional[Dict[str, Any]] = Field(None, description="Original network intent")
232
+
233
+ devices: List[DeviceModel] = Field(default_factory=list, description="Network devices")
234
+ vlans: List[VLANModel] = Field(default_factory=list, description="VLANs")
235
+ subnets: List[SubnetModel] = Field(default_factory=list, description="IP subnets")
236
+ routing: Optional[Dict[str, Any]] = Field(None, description="Routing configuration")
237
+ services: List[str] = Field(default_factory=lambda: ["DHCP", "DNS", "NTP"], description="Network services")
238
+
239
+ @validator('vlans')
240
+ def check_unique_vlan_ids(cls, v):
241
+ """Ensure no duplicate VLAN IDs"""
242
+ vlan_ids = [vlan.id for vlan in v]
243
+ if len(vlan_ids) != len(set(vlan_ids)):
244
+ duplicates = [vid for vid in vlan_ids if vlan_ids.count(vid) > 1]
245
+ raise ValueError(f"Duplicate VLAN IDs found: {duplicates}")
246
+ return v
247
+
248
+ @validator('devices')
249
+ def check_unique_device_names(cls, v):
250
+ """Ensure no duplicate device names"""
251
+ device_names = [d.name for d in v]
252
+ if len(device_names) != len(set(device_names)):
253
+ duplicates = [name for name in device_names if device_names.count(name) > 1]
254
+ raise ValueError(f"Duplicate device names found: {duplicates}")
255
+ return v
256
+
257
+ @validator('devices')
258
+ def check_unique_mgmt_ips(cls, v):
259
+ """Ensure no duplicate management IPs"""
260
+ mgmt_ips = [d.mgmt_ip for d in v]
261
+ if len(mgmt_ips) != len(set(mgmt_ips)):
262
+ duplicates = [ip for ip in mgmt_ips if mgmt_ips.count(ip) > 1]
263
+ raise ValueError(f"Duplicate management IPs found: {duplicates}")
264
+ return v
265
+
266
+ @validator('subnets')
267
+ def check_subnet_vlan_references(cls, v, values):
268
+ """Ensure referenced VLANs exist"""
269
+ if 'vlans' in values:
270
+ valid_vlan_ids = {vlan.id for vlan in values['vlans']}
271
+ for subnet in v:
272
+ if subnet.vlan and subnet.vlan not in valid_vlan_ids:
273
+ raise ValueError(f"Subnet {subnet.network} references non-existent VLAN {subnet.vlan}")
274
+ return v
275
+
276
+ class Config:
277
+ schema_extra = {
278
+ "example": {
279
+ "name": "corporate-network",
280
+ "version": "1.0.0",
281
+ "description": "Corporate office network",
282
+ "business_requirements": ["High availability", "Secure guest access"],
283
+ "constraints": ["Budget under $10K", "Cisco preferred"],
284
+ "devices": [],
285
+ "vlans": [],
286
+ "subnets": [],
287
+ "routing": None,
288
+ "services": ["DHCP", "DNS", "NTP"]
289
+ }
290
+ }
291
+
292
+
293
+ def validate_network_model(data: Dict[str, Any]) -> NetworkModelSchema:
294
+ """
295
+ Validate network model data against schema
296
+ Raises ValidationError if invalid
297
+ """
298
+ return NetworkModelSchema(**data)
299
+
300
+
301
+ def get_validation_errors(data: Dict[str, Any]) -> List[str]:
302
+ """
303
+ Get human-readable validation errors
304
+ Returns empty list if valid
305
+ """
306
+ try:
307
+ validate_network_model(data)
308
+ return []
309
+ except Exception as e:
310
+ # Parse pydantic validation errors
311
+ errors = []
312
+ if hasattr(e, 'errors'):
313
+ for error in e.errors():
314
+ loc = ' -> '.join(str(l) for l in error['loc'])
315
+ msg = error['msg']
316
+ errors.append(f"{loc}: {msg}")
317
+ else:
318
+ errors.append(str(e))
319
+ return errors
app.py CHANGED
@@ -265,17 +265,53 @@ def build_ui():
265
  # Run the pipeline
266
  results = pipeline.run_full_pipeline(user_input)
267
 
 
 
 
 
268
  # Format status
269
  status = "## πŸš€ Pipeline Execution Complete!\n\n"
270
- status += "### βœ… Completed Stages:\n"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
271
  status += "1. βœ… Consultation - Intent captured\n"
272
  status += "2. βœ… Source of Truth - Network model generated\n"
273
  status += "3. βœ… Diagrams - Visualizations created\n"
274
  status += "4. βœ… Bill of Materials - Shopping list ready\n"
275
  status += "5. βœ… Setup Guide - Deployment instructions generated\n"
276
- status += "6. ⏳ Autonomous Deploy - Ready for execution\n"
277
- status += "7. ⏳ Observability - Ready for setup\n"
278
- status += "8. ⏳ Validation - Ready for verification\n\n"
 
 
 
 
 
 
 
279
  status += "### πŸ“ Files Created:\n"
280
  status += "- `infra/network_model.yaml` - Source of Truth\n"
281
  status += "- `infra/bill_of_materials.json` - BOM data\n"
@@ -294,6 +330,8 @@ def build_ui():
294
 
295
  except Exception as e:
296
  error = f"## ❌ Pipeline Error\n\n{str(e)}"
 
 
297
  return error, "", "", "", ""
298
 
299
  # Event Handlers
 
265
  # Run the pipeline
266
  results = pipeline.run_full_pipeline(user_input)
267
 
268
+ # Check pre-flight validation
269
+ preflight = results.get('preflight', {})
270
+ ready_to_deploy = preflight.get('ready_to_deploy', False)
271
+
272
  # Format status
273
  status = "## πŸš€ Pipeline Execution Complete!\n\n"
274
+
275
+ # Pre-flight validation section
276
+ if ready_to_deploy:
277
+ status += "### βœ… Pre-flight Validation PASSED\n"
278
+ status += f"- Schema validation: βœ“ Passed\n"
279
+ status += f"- Policy checks: βœ“ Passed\n"
280
+ if preflight.get('warnings'):
281
+ status += f"- Warnings: {len(preflight['warnings'])}\n"
282
+ if preflight.get('info'):
283
+ status += f"- Info: {len(preflight['info'])}\n"
284
+ status += "\n"
285
+ else:
286
+ status += "### ❌ Pre-flight Validation FAILED\n"
287
+ status += "**Deployment blocked until errors are fixed:**\n\n"
288
+ for error in preflight.get('errors', []):
289
+ status += f"- ❌ {error}\n"
290
+ status += "\n"
291
+ if preflight.get('warnings'):
292
+ status += "**Warnings:**\n"
293
+ for warning in preflight.get('warnings', []):
294
+ status += f"- ⚠️ {warning}\n"
295
+ status += "\n"
296
+
297
+ # Completed stages
298
+ status += "### Completed Stages:\n"
299
+ status += "0. " + ("βœ…" if ready_to_deploy else "❌") + " Pre-flight Validation\n"
300
  status += "1. βœ… Consultation - Intent captured\n"
301
  status += "2. βœ… Source of Truth - Network model generated\n"
302
  status += "3. βœ… Diagrams - Visualizations created\n"
303
  status += "4. βœ… Bill of Materials - Shopping list ready\n"
304
  status += "5. βœ… Setup Guide - Deployment instructions generated\n"
305
+
306
+ if ready_to_deploy:
307
+ status += "6. ⏳ Autonomous Deploy - Ready for execution\n"
308
+ status += "7. ⏳ Observability - Ready for setup\n"
309
+ status += "8. ⏳ Validation - Ready for verification\n\n"
310
+ else:
311
+ status += "6. 🚫 Autonomous Deploy - BLOCKED (fix validation errors)\n"
312
+ status += "7. 🚫 Observability - BLOCKED\n"
313
+ status += "8. 🚫 Validation - BLOCKED\n\n"
314
+
315
  status += "### πŸ“ Files Created:\n"
316
  status += "- `infra/network_model.yaml` - Source of Truth\n"
317
  status += "- `infra/bill_of_materials.json` - BOM data\n"
 
330
 
331
  except Exception as e:
332
  error = f"## ❌ Pipeline Error\n\n{str(e)}"
333
+ import traceback
334
+ error += f"\n\n```\n{traceback.format_exc()}\n```"
335
  return error, "", "", "", ""
336
 
337
  # Event Handlers
requirements.txt CHANGED
@@ -8,3 +8,4 @@ python-dotenv
8
  anthropic>=0.40.0
9
  pyyaml
10
  pynetbox>=7.0.0
 
 
8
  anthropic>=0.40.0
9
  pyyaml
10
  pynetbox>=7.0.0
11
+ pydantic>=2.0.0
test_preflight.py ADDED
@@ -0,0 +1,117 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ Test Stage 0: Pre-flight validation integration
4
+ """
5
+
6
+ import logging
7
+ from pathlib import Path
8
+ from agent.pipeline_engine import OvergrowthPipeline, NetworkIntent
9
+
10
+ logging.basicConfig(level=logging.INFO)
11
+ logger = logging.getLogger(__name__)
12
+
13
+
14
+ def test_preflight_pass():
15
+ """Test pre-flight with valid network"""
16
+ print("\n=== Test 1: Pre-flight Validation - PASS ===")
17
+
18
+ pipeline = OvergrowthPipeline()
19
+
20
+ # Create a good network intent
21
+ intent = NetworkIntent(
22
+ description="Corporate office network with guest WiFi",
23
+ business_requirements=["Secure guest access", "High availability", "QoS for VoIP"],
24
+ constraints=["Budget under $15K", "Cisco preferred"],
25
+ budget="$15000",
26
+ timeline="2 weeks"
27
+ )
28
+
29
+ # Generate source of truth
30
+ model = pipeline.stage2_generate_sot(intent)
31
+
32
+ # Run pre-flight validation
33
+ results = pipeline.stage0_preflight(model)
34
+
35
+ print(f"\nValidation Results:")
36
+ print(f" Schema Valid: {results['schema_valid']}")
37
+ print(f" Policy Passed: {results['policy_passed']}")
38
+ print(f" Ready to Deploy: {results['ready_to_deploy']}")
39
+ print(f" Errors: {len(results['errors'])}")
40
+ print(f" Warnings: {len(results['warnings'])}")
41
+ print(f" Info: {len(results['info'])}")
42
+
43
+ if results['errors']:
44
+ print("\nErrors:")
45
+ for err in results['errors']:
46
+ print(f" - {err}")
47
+
48
+ if results['warnings']:
49
+ print("\nWarnings:")
50
+ for warn in results['warnings'][:5]: # Show first 5
51
+ print(f" - {warn}")
52
+
53
+ if results['ready_to_deploy']:
54
+ print("\nβœ“ Pre-flight validation PASSED - ready to deploy")
55
+ else:
56
+ print("\nβœ— Pre-flight validation FAILED")
57
+
58
+
59
+ def test_full_pipeline_with_validation():
60
+ """Test complete pipeline including pre-flight"""
61
+ print("\n=== Test 2: Full Pipeline with Validation ===")
62
+
63
+ pipeline = OvergrowthPipeline()
64
+
65
+ consultation = "I need a network for a small retail store with guest WiFi, employee WiFi, POS systems, and security cameras. Budget is $5000."
66
+
67
+ results = pipeline.run_full_pipeline(consultation)
68
+
69
+ print(f"\nPipeline Results:")
70
+ print(f" Intent captured: {'intent' in results}")
71
+ print(f" Model generated: {'model' in results}")
72
+ print(f" Pre-flight complete: {'preflight' in results}")
73
+
74
+ if 'preflight' in results:
75
+ pf = results['preflight']
76
+ print(f"\nPre-flight:")
77
+ print(f" Ready to deploy: {pf['ready_to_deploy']}")
78
+ print(f" Errors: {len(pf['errors'])}")
79
+ print(f" Warnings: {len(pf['warnings'])}")
80
+
81
+ if 'deployment_status' in results:
82
+ print(f"\nDeployment Status: {results['deployment_status']}")
83
+ if 'deployment_reason' in results:
84
+ print(f" Reason: {results['deployment_reason']}")
85
+
86
+ print(f"\nOutputs:")
87
+ print(f" Diagrams: {'diagrams' in results}")
88
+ print(f" BOM: {'bom' in results}")
89
+ print(f" Setup Guide: {'setup_guide' in results}")
90
+
91
+ if 'bom' in results:
92
+ bom = results['bom']
93
+ print(f"\nBill of Materials:")
94
+ print(f" Total Cost: ${bom.get('total_estimated_cost', 0):,.2f}")
95
+ print(f" Devices: {len(bom.get('devices', []))}")
96
+
97
+ print("\nβœ“ Full pipeline test complete")
98
+
99
+
100
+ if __name__ == "__main__":
101
+ print("\n" + "="*60)
102
+ print("Stage 0: Pre-flight Validation Test")
103
+ print("="*60)
104
+
105
+ try:
106
+ test_preflight_pass()
107
+ test_full_pipeline_with_validation()
108
+
109
+ print("\n" + "="*60)
110
+ print("βœ“ All pre-flight tests passed!")
111
+ print("="*60 + "\n")
112
+
113
+ except Exception as e:
114
+ print(f"\nβœ— Test failed: {e}")
115
+ import traceback
116
+ traceback.print_exc()
117
+ exit(1)
test_validation.py ADDED
@@ -0,0 +1,344 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ Test schema validation and policy engine
4
+ """
5
+
6
+ import logging
7
+ from agent.schema_validation import (
8
+ validate_network_model,
9
+ get_validation_errors,
10
+ VLANModel,
11
+ SubnetModel,
12
+ DeviceModel
13
+ )
14
+ from agent.policy_engine import NetworkPolicy
15
+
16
+ logging.basicConfig(level=logging.INFO)
17
+ logger = logging.getLogger(__name__)
18
+
19
+
20
+ def test_valid_network():
21
+ """Test validation with a valid network model"""
22
+ print("\n=== Test 1: Valid Network Model ===")
23
+
24
+ model = {
25
+ "name": "test-network",
26
+ "version": "1.0.0",
27
+ "description": "Test network",
28
+ "business_requirements": ["High availability"],
29
+ "constraints": ["Budget friendly"],
30
+ "vlans": [
31
+ {"id": 10, "name": "Management", "purpose": "Network mgmt", "subnet": "10.0.10.0/24"},
32
+ {"id": 20, "name": "Users", "purpose": "Employee workstations", "subnet": "10.0.20.0/24"}
33
+ ],
34
+ "subnets": [
35
+ {
36
+ "network": "10.0.10.0/24",
37
+ "gateway": "10.0.10.1",
38
+ "vlan": 10,
39
+ "purpose": "Management",
40
+ "dhcp_enabled": True,
41
+ "dhcp_range_start": "10.0.10.100",
42
+ "dhcp_range_end": "10.0.10.200"
43
+ },
44
+ {
45
+ "network": "10.0.20.0/24",
46
+ "gateway": "10.0.20.1",
47
+ "vlan": 20,
48
+ "purpose": "Users"
49
+ }
50
+ ],
51
+ "devices": [
52
+ {
53
+ "name": "core-sw-01",
54
+ "role": "core",
55
+ "model": "Catalyst 9300",
56
+ "vendor": "cisco",
57
+ "mgmt_ip": "10.0.10.10",
58
+ "location": "Main IDF",
59
+ "interfaces": []
60
+ }
61
+ ],
62
+ "services": ["DHCP", "DNS", "NTP"]
63
+ }
64
+
65
+ errors = get_validation_errors(model)
66
+ if errors:
67
+ print(f"βœ— Validation failed:")
68
+ for err in errors:
69
+ print(f" - {err}")
70
+ else:
71
+ validated = validate_network_model(model)
72
+ print(f"βœ“ Network model validated successfully")
73
+ print(f" - Name: {validated.name}")
74
+ print(f" - VLANs: {len(validated.vlans)}")
75
+ print(f" - Subnets: {len(validated.subnets)}")
76
+ print(f" - Devices: {len(validated.devices)}")
77
+
78
+
79
+ def test_invalid_vlan():
80
+ """Test VLAN validation"""
81
+ print("\n=== Test 2: Invalid VLAN ID ===")
82
+
83
+ model = {
84
+ "name": "test",
85
+ "version": "1.0.0",
86
+ "description": "Test",
87
+ "vlans": [
88
+ {"id": 5000, "name": "Invalid"} # VLAN ID out of range
89
+ ],
90
+ "devices": [],
91
+ "subnets": [],
92
+ "services": []
93
+ }
94
+
95
+ errors = get_validation_errors(model)
96
+ if errors:
97
+ print(f"βœ“ Correctly caught validation error:")
98
+ for err in errors:
99
+ print(f" - {err}")
100
+ else:
101
+ print("βœ— Should have failed validation")
102
+
103
+
104
+ def test_invalid_subnet():
105
+ """Test subnet validation"""
106
+ print("\n=== Test 3: Invalid Subnet ===")
107
+
108
+ model = {
109
+ "name": "test",
110
+ "version": "1.0.0",
111
+ "description": "Test",
112
+ "vlans": [{"id": 10, "name": "Test"}],
113
+ "subnets": [
114
+ {
115
+ "network": "10.0.10.0/24",
116
+ "gateway": "192.168.1.1", # Gateway not in subnet
117
+ "vlan": 10
118
+ }
119
+ ],
120
+ "devices": [],
121
+ "services": []
122
+ }
123
+
124
+ errors = get_validation_errors(model)
125
+ if errors:
126
+ print(f"βœ“ Correctly caught validation error:")
127
+ for err in errors:
128
+ print(f" - {err}")
129
+ else:
130
+ print("βœ— Should have failed validation")
131
+
132
+
133
+ def test_duplicate_vlan_ids():
134
+ """Test duplicate VLAN detection"""
135
+ print("\n=== Test 4: Duplicate VLAN IDs ===")
136
+
137
+ model = {
138
+ "name": "test",
139
+ "version": "1.0.0",
140
+ "description": "Test",
141
+ "vlans": [
142
+ {"id": 10, "name": "VLAN10"},
143
+ {"id": 10, "name": "AlsoVLAN10"} # Duplicate!
144
+ ],
145
+ "devices": [],
146
+ "subnets": [],
147
+ "services": []
148
+ }
149
+
150
+ errors = get_validation_errors(model)
151
+ if errors:
152
+ print(f"βœ“ Correctly caught duplicate VLAN IDs:")
153
+ for err in errors:
154
+ print(f" - {err}")
155
+ else:
156
+ print("βœ— Should have failed validation")
157
+
158
+
159
+ def test_policy_engine():
160
+ """Test policy engine checks"""
161
+ print("\n=== Test 5: Policy Engine ===")
162
+
163
+ model = {
164
+ "name": "test-network",
165
+ "version": "1.0.0",
166
+ "description": "Test network",
167
+ "vlans": [
168
+ {"id": 1, "name": "Default"}, # VLAN 1 - should warn
169
+ {"id": 10, "name": "Mgmt"},
170
+ {"id": 20, "name": "Guest WiFi"} # Space in name - should warn
171
+ ],
172
+ "subnets": [
173
+ {"network": "10.0.10.0/24", "gateway": "10.0.10.1", "vlan": 10},
174
+ {"network": "10.0.20.0/24", "gateway": "10.0.20.1", "vlan": 20}
175
+ ],
176
+ "devices": [
177
+ {
178
+ "name": "switch1", # No role in name, no 2-digit suffix
179
+ "role": "core",
180
+ "model": "Test",
181
+ "vendor": "cisco",
182
+ "mgmt_ip": "10.0.10.10",
183
+ "location": "Office"
184
+ }
185
+ ],
186
+ "services": ["DHCP"] # Missing DNS, NTP
187
+ }
188
+
189
+ policy = NetworkPolicy()
190
+ violations = policy.check_network_model(model)
191
+
192
+ print(f"Found {len(violations)} policy violations:")
193
+ print(policy.format_violations())
194
+
195
+ by_severity = policy.get_violations_by_severity()
196
+ print(f"\nβœ“ Policy check complete:")
197
+ print(f" - Errors: {len(by_severity['ERROR'])}")
198
+ print(f" - Warnings: {len(by_severity['WARNING'])}")
199
+ print(f" - Info: {len(by_severity['INFO'])}")
200
+
201
+
202
+ def test_overlapping_subnets():
203
+ """Test overlapping subnet detection"""
204
+ print("\n=== Test 6: Overlapping Subnets ===")
205
+
206
+ model = {
207
+ "name": "test",
208
+ "version": "1.0.0",
209
+ "description": "Test",
210
+ "vlans": [
211
+ {"id": 10, "name": "Network1"},
212
+ {"id": 20, "name": "Network2"}
213
+ ],
214
+ "subnets": [
215
+ {"network": "10.0.0.0/16", "gateway": "10.0.0.1", "vlan": 10},
216
+ {"network": "10.0.10.0/24", "gateway": "10.0.10.1", "vlan": 20} # Overlaps!
217
+ ],
218
+ "devices": [],
219
+ "services": []
220
+ }
221
+
222
+ policy = NetworkPolicy()
223
+ violations = policy.check_network_model(model)
224
+
225
+ overlap_errors = [v for v in violations if 'overlapping' in v.message.lower()]
226
+
227
+ if overlap_errors:
228
+ print(f"βœ“ Correctly detected overlapping subnets:")
229
+ for err in overlap_errors:
230
+ print(f" - {err}")
231
+ else:
232
+ print("βœ— Should have detected overlapping subnets")
233
+
234
+
235
+ def test_complete_validation():
236
+ """Test complete validation flow"""
237
+ print("\n=== Test 7: Complete Validation Flow ===")
238
+
239
+ model = {
240
+ "name": "production-network",
241
+ "version": "1.0.0",
242
+ "description": "Production corporate network",
243
+ "business_requirements": ["HA", "Secure guest access", "QoS for VoIP"],
244
+ "constraints": ["Cisco only", "Budget $25K"],
245
+ "vlans": [
246
+ {"id": 10, "name": "Management", "purpose": "Network management"},
247
+ {"id": 20, "name": "Users", "purpose": "Employee workstations"},
248
+ {"id": 30, "name": "Guest", "purpose": "Guest WiFi"},
249
+ {"id": 40, "name": "Voice", "purpose": "VoIP phones"}
250
+ ],
251
+ "subnets": [
252
+ {"network": "10.0.10.0/24", "gateway": "10.0.10.1", "vlan": 10, "purpose": "Management"},
253
+ {"network": "10.0.20.0/22", "gateway": "10.0.20.1", "vlan": 20, "purpose": "Users"},
254
+ {"network": "10.0.30.0/24", "gateway": "10.0.30.1", "vlan": 30, "purpose": "Guest"},
255
+ {"network": "10.0.40.0/24", "gateway": "10.0.40.1", "vlan": 40, "purpose": "Voice"}
256
+ ],
257
+ "devices": [
258
+ {
259
+ "name": "core-sw-01",
260
+ "role": "core",
261
+ "model": "Catalyst 9300",
262
+ "vendor": "cisco",
263
+ "mgmt_ip": "10.0.10.10",
264
+ "location": "Main IDF"
265
+ },
266
+ {
267
+ "name": "core-sw-02",
268
+ "role": "core",
269
+ "model": "Catalyst 9300",
270
+ "vendor": "cisco",
271
+ "mgmt_ip": "10.0.10.11",
272
+ "location": "Main IDF"
273
+ },
274
+ {
275
+ "name": "access-sw-01",
276
+ "role": "access",
277
+ "model": "Catalyst 2960X",
278
+ "vendor": "cisco",
279
+ "mgmt_ip": "10.0.10.20",
280
+ "location": "Floor 1"
281
+ }
282
+ ],
283
+ "routing": {
284
+ "protocol": "ospf",
285
+ "process_id": 1,
286
+ "router_id": "10.0.0.1",
287
+ "areas": ["0"]
288
+ },
289
+ "services": ["DHCP", "DNS", "NTP", "Syslog"]
290
+ }
291
+
292
+ # Schema validation
293
+ errors = get_validation_errors(model)
294
+ if errors:
295
+ print(f"βœ— Schema validation failed:")
296
+ for err in errors:
297
+ print(f" - {err}")
298
+ return
299
+
300
+ print("βœ“ Schema validation passed")
301
+ validated = validate_network_model(model)
302
+ print(f" - {len(validated.vlans)} VLANs")
303
+ print(f" - {len(validated.subnets)} subnets")
304
+ print(f" - {len(validated.devices)} devices")
305
+
306
+ # Policy validation
307
+ policy = NetworkPolicy()
308
+ violations = policy.check_network_model(model)
309
+
310
+ print(f"\nβœ“ Policy validation complete:")
311
+ by_severity = policy.get_violations_by_severity()
312
+ print(f" - Errors: {len(by_severity['ERROR'])}")
313
+ print(f" - Warnings: {len(by_severity['WARNING'])}")
314
+ print(f" - Info: {len(by_severity['INFO'])}")
315
+
316
+ if policy.has_errors():
317
+ print("\nβœ— Cannot proceed - fix errors first")
318
+ else:
319
+ print("\nβœ“ Ready for deployment")
320
+
321
+
322
+ if __name__ == "__main__":
323
+ print("\n" + "="*60)
324
+ print("Schema Validation & Policy Engine Test Suite")
325
+ print("="*60)
326
+
327
+ try:
328
+ test_valid_network()
329
+ test_invalid_vlan()
330
+ test_invalid_subnet()
331
+ test_duplicate_vlan_ids()
332
+ test_policy_engine()
333
+ test_overlapping_subnets()
334
+ test_complete_validation()
335
+
336
+ print("\n" + "="*60)
337
+ print("βœ“ All validation tests completed!")
338
+ print("="*60 + "\n")
339
+
340
+ except Exception as e:
341
+ print(f"\nβœ— Test failed: {e}")
342
+ import traceback
343
+ traceback.print_exc()
344
+ exit(1)