""" Rules Configuration Schema and Loader Provides Pydantic models for validating rules.yaml configuration. """ from pathlib import Path from typing import Any, Optional import yaml from pydantic import BaseModel, Field, field_validator class Rule(BaseModel): """Single rule configuration""" id: str = Field(..., description="Unique rule identifier") name: Optional[str] = Field(None, description="Optional human-readable name") type: str = Field(..., description="Rule type (e.g., 'window_setback@1.0')") enabled: bool = Field(default=True, description="Whether this rule is enabled") objects: dict[str, Any] = Field(default_factory=dict, description="Objects this rule monitors or controls") params: dict[str, Any] = Field(default_factory=dict, description="Rule-specific parameters") @field_validator('id') @classmethod def validate_id(cls, v: str) -> str: if not v or not v.strip(): raise ValueError("Rule ID cannot be empty") return v.strip() @field_validator('type') @classmethod def validate_type(cls, v: str) -> str: if not v or not v.strip(): raise ValueError("Rule type cannot be empty") if '@' not in v: raise ValueError(f"Rule type must include version (e.g., 'window_setback@1.0'), got: {v}") return v.strip() class RulesConfig(BaseModel): """Root configuration object""" rules: list[Rule] = Field(..., description="List of all rules") @field_validator('rules') @classmethod def validate_unique_ids(cls, rules: list[Rule]) -> list[Rule]: """Ensure all rule IDs are unique""" ids = [rule.id for rule in rules] duplicates = [id for id in ids if ids.count(id) > 1] if duplicates: raise ValueError(f"Duplicate rule IDs found: {set(duplicates)}") return rules def load_rules_config(config_path: str | Path = "config/rules.yaml") -> RulesConfig: """ Load and validate rules configuration from YAML file. Args: config_path: Path to rules.yaml file Returns: Validated RulesConfig object Raises: FileNotFoundError: If config file doesn't exist ValueError: If YAML is invalid or validation fails """ config_path = Path(config_path) if not config_path.exists(): raise FileNotFoundError(f"Rules configuration not found: {config_path}") with open(config_path, 'r', encoding='utf-8') as f: try: data = yaml.safe_load(f) except yaml.YAMLError as e: raise ValueError(f"Invalid YAML in {config_path}: {e}") from e if not data: raise ValueError(f"Empty configuration file: {config_path}") if 'rules' not in data: raise ValueError( f"Missing 'rules:' key in {config_path}. " "Configuration must start with 'rules:' followed by a list of rule definitions." ) try: return RulesConfig(**data) except Exception as e: raise ValueError(f"Configuration validation failed: {e}") from e def get_rule_by_id(config: RulesConfig, rule_id: str) -> Rule | None: """Get a specific rule by ID""" for rule in config.rules: if rule.id == rule_id: return rule return None def get_rules_by_type(config: RulesConfig, rule_type: str) -> list[Rule]: """Get all rules of a specific type""" return [rule for rule in config.rules if rule.type == rule_type] if __name__ == "__main__": # Test configuration loading try: config = load_rules_config() print(f"✅ Loaded {len(config.rules)} rules:") for rule in config.rules: name = f" ({rule.name})" if rule.name else "" enabled = "✓" if rule.enabled else "✗" print(f" [{enabled}] {rule.id}{name}: {rule.type}") if rule.objects: obj_summary = ", ".join(f"{k}: {len(v) if isinstance(v, list) else v}" for k, v in rule.objects.items()) print(f" Objects: {obj_summary}") except Exception as e: print(f"❌ Configuration error: {e}")