123 lines
4.1 KiB
Python
123 lines
4.1 KiB
Python
"""
|
|
Rules Configuration Schema and Loader
|
|
|
|
Provides Pydantic models for validating rules.yaml configuration.
|
|
"""
|
|
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
import yaml
|
|
from pydantic import BaseModel, Field, field_validator
|
|
|
|
|
|
class Rule(BaseModel):
|
|
"""Single rule configuration"""
|
|
id: str = Field(..., description="Unique rule identifier")
|
|
name: Optional[str] = Field(None, description="Optional human-readable name")
|
|
type: str = Field(..., description="Rule type (e.g., 'window_setback@1.0')")
|
|
enabled: bool = Field(default=True, description="Whether this rule is enabled")
|
|
objects: dict[str, Any] = Field(default_factory=dict, description="Objects this rule monitors or controls")
|
|
params: dict[str, Any] = Field(default_factory=dict, description="Rule-specific parameters")
|
|
|
|
@field_validator('id')
|
|
@classmethod
|
|
def validate_id(cls, v: str) -> str:
|
|
if not v or not v.strip():
|
|
raise ValueError("Rule ID cannot be empty")
|
|
return v.strip()
|
|
|
|
@field_validator('type')
|
|
@classmethod
|
|
def validate_type(cls, v: str) -> str:
|
|
if not v or not v.strip():
|
|
raise ValueError("Rule type cannot be empty")
|
|
if '@' not in v:
|
|
raise ValueError(f"Rule type must include version (e.g., 'window_setback@1.0'), got: {v}")
|
|
return v.strip()
|
|
|
|
|
|
class RulesConfig(BaseModel):
|
|
"""Root configuration object"""
|
|
rules: list[Rule] = Field(..., description="List of all rules")
|
|
|
|
@field_validator('rules')
|
|
@classmethod
|
|
def validate_unique_ids(cls, rules: list[Rule]) -> list[Rule]:
|
|
"""Ensure all rule IDs are unique"""
|
|
ids = [rule.id for rule in rules]
|
|
duplicates = [id for id in ids if ids.count(id) > 1]
|
|
if duplicates:
|
|
raise ValueError(f"Duplicate rule IDs found: {set(duplicates)}")
|
|
return rules
|
|
|
|
|
|
def load_rules_config(config_path: str | Path = "config/rules.yaml") -> RulesConfig:
|
|
"""
|
|
Load and validate rules configuration from YAML file.
|
|
|
|
Args:
|
|
config_path: Path to rules.yaml file
|
|
|
|
Returns:
|
|
Validated RulesConfig object
|
|
|
|
Raises:
|
|
FileNotFoundError: If config file doesn't exist
|
|
ValueError: If YAML is invalid or validation fails
|
|
"""
|
|
config_path = Path(config_path)
|
|
|
|
if not config_path.exists():
|
|
raise FileNotFoundError(f"Rules configuration not found: {config_path}")
|
|
|
|
with open(config_path, 'r', encoding='utf-8') as f:
|
|
try:
|
|
data = yaml.safe_load(f)
|
|
except yaml.YAMLError as e:
|
|
raise ValueError(f"Invalid YAML in {config_path}: {e}") from e
|
|
|
|
if not data:
|
|
raise ValueError(f"Empty configuration file: {config_path}")
|
|
|
|
if 'rules' not in data:
|
|
raise ValueError(
|
|
f"Missing 'rules:' key in {config_path}. "
|
|
"Configuration must start with 'rules:' followed by a list of rule definitions."
|
|
)
|
|
|
|
try:
|
|
return RulesConfig(**data)
|
|
except Exception as e:
|
|
raise ValueError(f"Configuration validation failed: {e}") from e
|
|
|
|
|
|
def get_rule_by_id(config: RulesConfig, rule_id: str) -> Rule | None:
|
|
"""Get a specific rule by ID"""
|
|
for rule in config.rules:
|
|
if rule.id == rule_id:
|
|
return rule
|
|
return None
|
|
|
|
|
|
def get_rules_by_type(config: RulesConfig, rule_type: str) -> list[Rule]:
|
|
"""Get all rules of a specific type"""
|
|
return [rule for rule in config.rules if rule.type == rule_type]
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Test configuration loading
|
|
try:
|
|
config = load_rules_config()
|
|
print(f"✅ Loaded {len(config.rules)} rules:")
|
|
for rule in config.rules:
|
|
name = f" ({rule.name})" if rule.name else ""
|
|
enabled = "✓" if rule.enabled else "✗"
|
|
print(f" [{enabled}] {rule.id}{name}: {rule.type}")
|
|
if rule.objects:
|
|
obj_summary = ", ".join(f"{k}: {len(v) if isinstance(v, list) else v}"
|
|
for k, v in rule.objects.items())
|
|
print(f" Objects: {obj_summary}")
|
|
except Exception as e:
|
|
print(f"❌ Configuration error: {e}")
|