zwei Lampen und Test-Werkzeug

This commit is contained in:
2025-10-31 15:17:28 +01:00
parent ea17d048ad
commit c3ec6e3fc4
7 changed files with 949 additions and 43 deletions

View File

@@ -43,6 +43,11 @@ def load_config(config_path: Path) -> dict[str, Any]:
with open(config_path, "r") as f:
config = yaml.safe_load(f)
# Normalize device entries: accept both 'id' and 'device_id', use 'device_id' internally
devices = config.get("devices", [])
for device in devices:
device["device_id"] = device.pop("device_id", device.pop("id", None))
logger.info(f"Loaded configuration from {config_path}")
return config
@@ -56,16 +61,33 @@ def validate_devices(devices: list[dict[str, Any]]) -> None:
Raises:
ValueError: If device configuration is invalid
"""
required_fields = ["device_id", "type", "cap_version", "technology"]
for device in devices:
if "id" not in device:
raise ValueError(f"Device missing 'id': {device}")
if "type" not in device:
raise ValueError(f"Device {device['id']} missing 'type'")
# Check for device_id
if "device_id" not in device or device["device_id"] is None:
raise ValueError(f"Device entry requires 'id' or 'device_id': {device}")
device_id = device["device_id"]
# Check required top-level fields
for field in required_fields:
if field not in device:
raise ValueError(f"Device {device_id} missing '{field}'")
# Check topics structure
if "topics" not in device:
raise ValueError(f"Device {device['id']} missing 'topics'")
if "set" not in device["topics"] or "state" not in device["topics"]:
raise ValueError(f"Device {device['id']} missing 'topics.set' or 'topics.state'")
logger.info(f"Validated {len(devices)} device(s)")
raise ValueError(f"Device {device_id} missing 'topics'")
if "set" not in device["topics"]:
raise ValueError(f"Device {device_id} missing 'topics.set'")
if "state" not in device["topics"]:
raise ValueError(f"Device {device_id} missing 'topics.state'")
# Log loaded devices
device_ids = [d["device_id"] for d in devices]
logger.info(f"Loaded {len(devices)} device(s): {', '.join(device_ids)}")
async def get_redis_client(redis_url: str, max_retries: int = 5) -> aioredis.Redis:
@@ -172,7 +194,7 @@ async def mqtt_worker(config: dict[str, Any], redis_client: aioredis.Redis) -> N
redis_config = config.get("redis", {})
redis_channel = redis_config.get("channel", "ui:updates")
devices = {d["id"]: d for d in config.get("devices", [])}
devices = {d["device_id"]: d for d in config.get("devices", [])}
retry_delay = 1
max_retry_delay = 60
@@ -191,7 +213,7 @@ async def mqtt_worker(config: dict[str, Any], redis_client: aioredis.Redis) -> N
# Subscribe to abstract SET topics for all devices
for device in devices.values():
abstract_set_topic = f"home/{device['type']}/{device['id']}/set"
abstract_set_topic = f"home/{device['type']}/{device['device_id']}/set"
await client.subscribe(abstract_set_topic)
logger.info(f"Subscribed to abstract SET: {abstract_set_topic}")

View File

@@ -1,14 +1,17 @@
"""API main entry point."""
import asyncio
import json
import os
from pathlib import Path
from typing import Any
from typing import Any, AsyncGenerator
import redis.asyncio as aioredis
import yaml
from aiomqtt import Client
from fastapi import FastAPI, HTTPException, status
from fastapi import FastAPI, HTTPException, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, ValidationError
from packages.home_capabilities import CAP_VERSION, LightState
@@ -85,7 +88,12 @@ def load_devices() -> list[dict[str, Any]]:
with open(config_path, "r") as f:
config = yaml.safe_load(f)
return config.get("devices", [])
# Normalize device entries: accept both 'id' and 'device_id', use 'device_id' internally
devices = config.get("devices", [])
for device in devices:
device["device_id"] = device.pop("device_id", device.pop("id", None))
return devices
def get_mqtt_settings() -> tuple[str, int]:
@@ -99,6 +107,25 @@ def get_mqtt_settings() -> tuple[str, int]:
return host, port
def get_redis_settings() -> tuple[str, str]:
"""Get Redis settings from configuration.
Returns:
tuple: (url, channel)
"""
config_path = Path(__file__).parent.parent.parent / "config" / "devices.yaml"
if config_path.exists():
with open(config_path, "r") as f:
config = yaml.safe_load(f)
redis_config = config.get("redis", {})
url = redis_config.get("url", "redis://localhost:6379/0")
channel = redis_config.get("channel", "ui:updates")
return url, channel
return "redis://localhost:6379/0", "ui:updates"
async def publish_mqtt(topic: str, payload: dict[str, Any]) -> None:
"""Publish message to MQTT broker.
@@ -123,9 +150,9 @@ async def get_devices() -> list[DeviceInfo]:
devices = load_devices()
return [
DeviceInfo(
device_id=device["id"],
device_id=device["device_id"],
type=device["type"],
name=device.get("name", device["id"])
name=device.get("name", device["device_id"])
)
for device in devices
]
@@ -147,7 +174,7 @@ async def set_device(device_id: str, request: SetDeviceRequest) -> dict[str, str
"""
# Load devices and check if device exists
devices = load_devices()
device = next((d for d in devices if d["id"] == device_id), None)
device = next((d for d in devices if d["device_id"] == device_id), None)
if not device:
raise HTTPException(
@@ -182,6 +209,81 @@ async def set_device(device_id: str, request: SetDeviceRequest) -> dict[str, str
return {"message": f"Command sent to {device_id}"}
async def event_generator(request: Request) -> AsyncGenerator[str, None]:
"""Generate SSE events from Redis Pub/Sub.
Args:
request: FastAPI request object for disconnect detection
Yields:
str: SSE formatted event strings
"""
redis_url, redis_channel = get_redis_settings()
redis_client = await aioredis.from_url(redis_url, decode_responses=True)
pubsub = redis_client.pubsub()
try:
await pubsub.subscribe(redis_channel)
# Create heartbeat task
last_heartbeat = asyncio.get_event_loop().time()
while True:
# Check if client disconnected
if await request.is_disconnected():
break
# Get message with timeout for heartbeat
try:
message = await asyncio.wait_for(
pubsub.get_message(ignore_subscribe_messages=True),
timeout=1.0
)
if message and message["type"] == "message":
# Send data event
data = message["data"]
yield f"event: message\ndata: {data}\n\n"
last_heartbeat = asyncio.get_event_loop().time()
except asyncio.TimeoutError:
pass
# Send heartbeat every 25 seconds
current_time = asyncio.get_event_loop().time()
if current_time - last_heartbeat >= 25:
yield "event: ping\ndata: heartbeat\n\n"
last_heartbeat = current_time
finally:
await pubsub.unsubscribe(redis_channel)
await pubsub.close()
await redis_client.close()
@app.get("/realtime")
async def realtime_events(request: Request) -> StreamingResponse:
"""Server-Sent Events endpoint for real-time updates.
Args:
request: FastAPI request object
Returns:
StreamingResponse: SSE stream of Redis messages
"""
return StreamingResponse(
event_generator(request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
}
)
return {"message": f"Command sent to {device_id}"}
def main() -> None:
"""Run the API application with uvicorn."""
import uvicorn

View File

@@ -15,18 +15,20 @@
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
min-height: 100vh;
display: flex;
justify-content: center;
align-items: center;
padding: 2rem;
}
.container {
max-width: 1200px;
margin: 0 auto;
}
header {
background: white;
border-radius: 16px;
padding: 2rem;
box-shadow: 0 20px 60px rgba(0, 0, 0, 0.3);
max-width: 600px;
width: 90%;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1);
margin-bottom: 2rem;
}
h1 {
@@ -34,17 +36,355 @@
margin-bottom: 0.5rem;
}
p {
.status {
display: inline-block;
padding: 0.25rem 0.75rem;
border-radius: 12px;
font-size: 0.875rem;
font-weight: 500;
}
.status.connected {
background: #d4edda;
color: #155724;
}
.status.disconnected {
background: #f8d7da;
color: #721c24;
}
.devices {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(300px, 1fr));
gap: 1.5rem;
}
.device-card {
background: white;
border-radius: 16px;
padding: 1.5rem;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1);
}
.device-header {
margin-bottom: 1.5rem;
}
.device-name {
font-size: 1.5rem;
font-weight: 600;
color: #333;
margin-bottom: 0.25rem;
}
.device-type {
font-size: 0.875rem;
color: #666;
line-height: 1.6;
text-transform: uppercase;
}
.device-state {
padding: 0.5rem 1rem;
background: #f8f9fa;
border-radius: 8px;
margin: 1rem 0;
font-family: 'Courier New', monospace;
font-size: 0.875rem;
}
.state-label {
color: #666;
font-weight: 500;
}
.state-value {
color: #333;
font-weight: 600;
}
.state-value.on {
color: #28a745;
}
.state-value.off {
color: #dc3545;
}
.controls {
display: flex;
flex-direction: column;
gap: 1rem;
}
.toggle-button {
padding: 0.75rem 1.5rem;
border: none;
border-radius: 8px;
font-size: 1rem;
font-weight: 600;
cursor: pointer;
transition: all 0.2s;
color: white;
}
.toggle-button.on {
background: #28a745;
}
.toggle-button.on:hover {
background: #218838;
}
.toggle-button.off {
background: #6c757d;
}
.toggle-button.off:hover {
background: #5a6268;
}
.events {
margin-top: 2rem;
background: white;
border-radius: 16px;
padding: 1.5rem;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1);
}
.events h2 {
color: #333;
margin-bottom: 1rem;
font-size: 1.25rem;
}
.event-list {
max-height: 300px;
overflow-y: auto;
}
.event-item {
padding: 0.75rem;
border-left: 3px solid #667eea;
background: #f8f9fa;
margin-bottom: 0.5rem;
border-radius: 4px;
font-size: 0.875rem;
}
.event-time {
color: #666;
font-size: 0.75rem;
}
.event-data {
color: #333;
margin-top: 0.25rem;
font-family: 'Courier New', monospace;
}
</style>
</head>
<body>
<div class="container">
<header>
<h1>🏠 Home Automation</h1>
<p>UI wird geladen...</p>
<p>API erreichbar? <a href="http://localhost:8001/health" target="_blank">API Health Check</a></p>
<p>Realtime Status: <span class="status disconnected" id="connection-status">Verbinde...</span></p>
</header>
<div class="devices">
<!-- Test Lampe Card -->
<div class="device-card">
<div class="device-header">
<div class="device-name">💡 Test Lampe 1</div>
<div class="device-type">Light</div>
</div>
<div class="device-state">
<span class="state-label">Status:</span>
<span class="state-value off" id="state-test_lampe_1">off</span>
</div>
<div class="controls">
<button
class="toggle-button off"
id="toggle-test_lampe_1"
onclick="toggleDevice('test_lampe_1')">
Einschalten
</button>
</div>
</div>
<div class="device-card">
<div class="device-header">
<div class="device-name">💡 Test Lampe 2</div>
<div class="device-type">Light</div>
</div>
<div class="device-state">
<span class="state-label">Status:</span>
<span class="state-value off" id="state-test_lampe_2">off</span>
</div>
<div class="controls">
<button
class="toggle-button off"
id="toggle-test_lampe_2"
onclick="toggleDevice('test_lampe_2')">
Einschalten
</button>
</div>
</div>
</div>
<div class="events">
<h2>📡 Realtime Events</h2>
<div class="event-list" id="event-list">
<p style="color: #666; font-size: 0.875rem;">Warte auf Events...</p>
</div>
</div>
</div>
<script>
const API_BASE = 'http://localhost:8001';
let eventSource = null;
let currentState = {
'test_lampe_1': 'off',
'test_lampe_2': 'off'
};
// Toggle device state
async function toggleDevice(deviceId) {
const newState = currentState[deviceId] === 'on' ? 'off' : 'on';
try {
const response = await fetch(`${API_BASE}/devices/${deviceId}/set`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
type: 'light',
payload: {
power: newState
}
})
});
if (response.ok) {
console.log(`Sent ${newState} command to ${deviceId}`);
addEvent({
action: 'command_sent',
device_id: deviceId,
state: newState
});
}
} catch (error) {
console.error('Failed to toggle device:', error);
}
}
// Update device UI
function updateDeviceUI(deviceId, power) {
currentState[deviceId] = power;
const stateSpan = document.getElementById(`state-${deviceId}`);
const toggleButton = document.getElementById(`toggle-${deviceId}`);
if (stateSpan) {
stateSpan.textContent = power;
stateSpan.className = `state-value ${power}`;
}
if (toggleButton) {
if (power === 'on') {
toggleButton.textContent = 'Ausschalten';
toggleButton.className = 'toggle-button on';
} else {
toggleButton.textContent = 'Einschalten';
toggleButton.className = 'toggle-button off';
}
}
}
// Add event to list
function addEvent(event) {
const eventList = document.getElementById('event-list');
// Clear placeholder
if (eventList.children.length === 1 && eventList.children[0].tagName === 'P') {
eventList.innerHTML = '';
}
const eventItem = document.createElement('div');
eventItem.className = 'event-item';
const now = new Date().toLocaleTimeString('de-DE');
eventItem.innerHTML = `
<div class="event-time">${now}</div>
<div class="event-data">${JSON.stringify(event, null, 2)}</div>
`;
eventList.insertBefore(eventItem, eventList.firstChild);
// Keep only last 10 events
while (eventList.children.length > 10) {
eventList.removeChild(eventList.lastChild);
}
}
// Connect to SSE
function connectSSE() {
eventSource = new EventSource(`${API_BASE}/realtime`);
eventSource.onopen = () => {
console.log('SSE connected');
document.getElementById('connection-status').textContent = 'Verbunden';
document.getElementById('connection-status').className = 'status connected';
};
eventSource.addEventListener('message', (e) => {
const data = JSON.parse(e.data);
console.log('SSE message:', data);
addEvent(data);
// Update device state
if (data.type === 'state' && data.device_id) {
if (data.payload && data.payload.power) {
updateDeviceUI(data.device_id, data.payload.power);
}
}
});
eventSource.addEventListener('ping', (e) => {
console.log('Heartbeat received');
});
eventSource.onerror = (error) => {
console.error('SSE error:', error);
document.getElementById('connection-status').textContent = 'Getrennt';
document.getElementById('connection-status').className = 'status disconnected';
eventSource.close();
// Reconnect after 5 seconds
setTimeout(connectSSE, 5000);
};
}
// Initialize
connectSSE();
// Optional: Load initial state from API
async function loadDevices() {
try {
const response = await fetch(`${API_BASE}/devices`);
const devices = await response.json();
console.log('Loaded devices:', devices);
} catch (error) {
console.error('Failed to load devices:', error);
}
}
loadDevices();
</script>
</body>
</html>

View File

@@ -1,5 +1,4 @@
# Device Configuration
# Configuration for home automation devices
version: 1
mqtt:
broker: "172.16.2.16"
@@ -14,18 +13,21 @@ redis:
channel: "ui:updates"
devices:
- id: "test_lampe"
type: "light"
name: "Test Lampe"
- device_id: test_lampe_1
type: light
cap_version: "light@1.2.0"
technology: zigbee2mqtt
features:
power: true
topics:
set: "vendor/test_lampe/set"
state: "vendor/test_lampe/state"
# - color
# - id: "light_bedroom"
# type: "light"
# name: "Bedroom Light"
# mqtt_topic: "home/bedroom/light"
# capabilities:
# - power
# - brightness
set: "vendor/test_lampe_1/set"
state: "vendor/test_lampe_1/state"
- device_id: test_lampe_2
type: light
cap_version: "light@1.2.0"
technology: zigbee2mqtt
features:
power: true
topics:
set: "vendor/test_lampe_2/set"
state: "vendor/test_lampe_2/state"

View File

@@ -20,6 +20,7 @@ aiomqtt = "^2.4.0"
jinja2 = "^3.1.6"
apscheduler = "^3.11.0"
redis = "^7.0.1"
paho-mqtt = "^2.1.0"
[tool.poetry.group.dev.dependencies]
ruff = "^0.6.0"

186
tools/sim_test_lampe.py Normal file
View File

@@ -0,0 +1,186 @@
#!/usr/bin/env python3
"""MQTT Simulator for test_lampe device.
This simulator acts as a virtual light device that:
- Subscribes to vendor/test_lampe/set
- Maintains local state
- Publishes state changes to vendor/test_lampe/state (retained)
"""
import json
import logging
import os
import signal
import sys
import time
import paho.mqtt.client as mqtt
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Configuration
BROKER_HOST = os.environ.get("MQTT_HOST", "172.16.2.16")
BROKER_PORT = int(os.environ.get("MQTT_PORT", "1883"))
DEVICE_ID = "test_lampe_1"
SET_TOPIC = f"vendor/{DEVICE_ID}/set"
STATE_TOPIC = f"vendor/{DEVICE_ID}/state"
# Device state
device_state = {
"power": "off",
"brightness": 50
}
# Global client for signal handler
client_global = None
def on_connect(client, userdata, flags, rc, properties=None):
"""Callback when connected to MQTT broker.
Args:
client: MQTT client instance
userdata: User data
flags: Connection flags
rc: Connection result code
properties: Connection properties (MQTT v5)
"""
if rc == 0:
logger.info(f"Connected to MQTT broker {BROKER_HOST}:{BROKER_PORT}")
# Subscribe to SET topic
client.subscribe(SET_TOPIC, qos=1)
logger.info(f"Subscribed to {SET_TOPIC}")
# Publish initial state (retained)
publish_state(client)
logger.info(f"Simulator started, initial state published: {device_state}")
else:
logger.error(f"Connection failed with code {rc}")
def on_message(client, userdata, msg):
"""Callback when message received on subscribed topic.
Args:
client: MQTT client instance
userdata: User data
msg: MQTT message
"""
global device_state
try:
payload = json.loads(msg.payload.decode())
logger.info(f"Received SET command: {payload}")
# Update device state
updated = False
if "power" in payload:
old_power = device_state["power"]
device_state["power"] = payload["power"]
if old_power != device_state["power"]:
updated = True
logger.info(f"Power changed: {old_power} -> {device_state['power']}")
if "brightness" in payload:
old_brightness = device_state["brightness"]
device_state["brightness"] = int(payload["brightness"])
if old_brightness != device_state["brightness"]:
updated = True
logger.info(f"Brightness changed: {old_brightness} -> {device_state['brightness']}")
# Publish updated state if changed
if updated:
publish_state(client)
logger.info(f"Published new state: {device_state}")
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in message: {e}")
except Exception as e:
logger.error(f"Error processing message: {e}")
def publish_state(client):
"""Publish current device state to STATE topic.
Args:
client: MQTT client instance
"""
state_json = json.dumps(device_state)
result = client.publish(STATE_TOPIC, state_json, qos=1, retain=True)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
logger.debug(f"Published state to {STATE_TOPIC}: {state_json}")
else:
logger.error(f"Failed to publish state: {result.rc}")
def signal_handler(sig, frame):
"""Handle shutdown signals gracefully.
Args:
sig: Signal number
frame: Current stack frame
"""
logger.info(f"Received signal {sig}, shutting down...")
if client_global:
# Publish offline state before disconnecting
offline_state = device_state.copy()
offline_state["power"] = "off"
client_global.publish(STATE_TOPIC, json.dumps(offline_state), qos=1, retain=True)
logger.info("Published offline state")
client_global.disconnect()
client_global.loop_stop()
sys.exit(0)
def main():
"""Main entry point for the simulator."""
global client_global
# Setup signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Create MQTT client
client = mqtt.Client(
client_id=f"simulator-{DEVICE_ID}",
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
client_global = client
# Set callbacks
client.on_connect = on_connect
client.on_message = on_message
# Connect to broker
logger.info(f"Connecting to MQTT broker {BROKER_HOST}:{BROKER_PORT}...")
try:
client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
# Start network loop
client.loop_forever()
except KeyboardInterrupt:
logger.info("Interrupted by user")
except Exception as e:
logger.error(f"Error: {e}")
finally:
if client.is_connected():
client.disconnect()
client.loop_stop()
if __name__ == "__main__":
main()

253
tools/test_config_loader.py Normal file
View File

@@ -0,0 +1,253 @@
#!/usr/bin/env python3
"""Test configuration loader with different YAML formats."""
import sys
import tempfile
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from apps.abstraction.main import load_config, validate_devices
def test_device_id_format():
"""Test YAML with 'device_id' key."""
yaml_content = """version: 1
mqtt:
broker: "172.16.2.16"
port: 1883
client_id: "test"
redis:
url: "redis://localhost:6379/8"
channel: "ui:updates"
devices:
- device_id: test_lampe
type: light
cap_version: "light@1.2.0"
technology: zigbee2mqtt
features:
power: true
topics:
set: "vendor/test_lampe/set"
state: "vendor/test_lampe/state"
"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
f.write(yaml_content)
temp_path = Path(f.name)
try:
config = load_config(temp_path)
devices = config.get("devices", [])
validate_devices(devices)
print("✓ Test 1 passed: YAML with 'device_id' loaded successfully")
print(f" Device ID: {devices[0]['device_id']}")
return True
except Exception as e:
print(f"✗ Test 1 failed: {e}")
return False
finally:
temp_path.unlink()
def test_id_format():
"""Test YAML with 'id' key (legacy format)."""
yaml_content = """version: 1
mqtt:
broker: "172.16.2.16"
port: 1883
client_id: "test"
redis:
url: "redis://localhost:6379/8"
channel: "ui:updates"
devices:
- id: test_lampe
type: light
cap_version: "light@1.2.0"
technology: zigbee2mqtt
features:
power: true
topics:
set: "vendor/test_lampe/set"
state: "vendor/test_lampe/state"
"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
f.write(yaml_content)
temp_path = Path(f.name)
try:
config = load_config(temp_path)
devices = config.get("devices", [])
validate_devices(devices)
print("✓ Test 2 passed: YAML with 'id' loaded successfully")
print(f" Device ID: {devices[0]['device_id']}")
return True
except Exception as e:
print(f"✗ Test 2 failed: {e}")
return False
finally:
temp_path.unlink()
def test_missing_id():
"""Test YAML without 'id' or 'device_id' key."""
yaml_content = """version: 1
mqtt:
broker: "172.16.2.16"
port: 1883
devices:
- type: light
cap_version: "light@1.2.0"
technology: zigbee2mqtt
topics:
set: "vendor/test_lampe/set"
state: "vendor/test_lampe/state"
"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
f.write(yaml_content)
temp_path = Path(f.name)
try:
config = load_config(temp_path)
devices = config.get("devices", [])
validate_devices(devices)
print("✗ Test 3 failed: Should have raised ValueError for missing device_id")
return False
except ValueError as e:
if "requires 'id' or 'device_id'" in str(e):
print(f"✓ Test 3 passed: Correct error for missing device_id")
print(f" Error: {e}")
return True
else:
print(f"✗ Test 3 failed: Wrong error message: {e}")
return False
except Exception as e:
print(f"✗ Test 3 failed: Unexpected error: {e}")
return False
finally:
temp_path.unlink()
def test_missing_topics_set():
"""Test YAML without 'topics.set'."""
yaml_content = """version: 1
mqtt:
broker: "172.16.2.16"
port: 1883
devices:
- device_id: test_lampe
type: light
cap_version: "light@1.2.0"
technology: zigbee2mqtt
topics:
state: "vendor/test_lampe/state"
"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
f.write(yaml_content)
temp_path = Path(f.name)
try:
config = load_config(temp_path)
devices = config.get("devices", [])
validate_devices(devices)
print("✗ Test 4 failed: Should have raised ValueError for missing topics.set")
return False
except ValueError as e:
if "missing 'topics.set'" in str(e):
print(f"✓ Test 4 passed: Correct error for missing topics.set")
print(f" Error: {e}")
return True
else:
print(f"✗ Test 4 failed: Wrong error message: {e}")
return False
except Exception as e:
print(f"✗ Test 4 failed: Unexpected error: {e}")
return False
finally:
temp_path.unlink()
def test_missing_topics_state():
"""Test YAML without 'topics.state'."""
yaml_content = """version: 1
mqtt:
broker: "172.16.2.16"
port: 1883
devices:
- device_id: test_lampe
type: light
cap_version: "light@1.2.0"
technology: zigbee2mqtt
topics:
set: "vendor/test_lampe/set"
"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
f.write(yaml_content)
temp_path = Path(f.name)
try:
config = load_config(temp_path)
devices = config.get("devices", [])
validate_devices(devices)
print("✗ Test 5 failed: Should have raised ValueError for missing topics.state")
return False
except ValueError as e:
if "missing 'topics.state'" in str(e):
print(f"✓ Test 5 passed: Correct error for missing topics.state")
print(f" Error: {e}")
return True
else:
print(f"✗ Test 5 failed: Wrong error message: {e}")
return False
except Exception as e:
print(f"✗ Test 5 failed: Unexpected error: {e}")
return False
finally:
temp_path.unlink()
def main():
"""Run all tests."""
print("Testing Configuration Loader")
print("=" * 60)
tests = [
test_device_id_format,
test_id_format,
test_missing_id,
test_missing_topics_set,
test_missing_topics_state,
]
results = []
for test in tests:
results.append(test())
print()
print("=" * 60)
passed = sum(results)
total = len(results)
print(f"Results: {passed}/{total} tests passed")
return 0 if all(results) else 1
if __name__ == "__main__":
sys.exit(main())