292 lines
10 KiB
Python
Executable File
292 lines
10 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Unified Device Simulator for Home Automation.
|
|
|
|
Simulates multiple device types:
|
|
- Lights (test_lampe_1, test_lampe_2, test_lampe_3)
|
|
- Thermostats (test_thermo_1)
|
|
|
|
Each device:
|
|
- Subscribes to vendor/{device_id}/set
|
|
- Maintains local state
|
|
- Publishes state changes to vendor/{device_id}/state (retained, QoS 1)
|
|
- Thermostats simulate temperature drift every 5 seconds
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import signal
|
|
import sys
|
|
import uuid
|
|
from datetime import datetime
|
|
from typing import Dict, Any
|
|
|
|
from aiomqtt import Client, MqttError
|
|
|
|
# --- Logging ---------------------------------------------------------------
# Module-level logger. The root configuration below emits every record at
# INFO or above as "timestamp - logger name - level - message".
logger = logging.getLogger(__name__)
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# --- Broker connection -----------------------------------------------------
# Both values can be overridden through the environment (MQTT_BROKER and
# MQTT_PORT); the literals are the fallbacks used when the variables are unset.
BROKER_HOST = os.environ.get("MQTT_BROKER", "172.16.2.16")
BROKER_PORT = int(os.environ.get("MQTT_PORT", "1883"))

# Seconds between two thermostat temperature-drift ticks.
DRIFT_INTERVAL = 5

# --- Simulated devices -----------------------------------------------------
# Device ids double as the middle segment of the vendor/{device_id}/... topics.
LIGHT_DEVICES = [f"test_lampe_{index}" for index in range(1, 4)]
THERMOSTAT_DEVICES = ["test_thermo_1"]
|
|
|
|
|
|
class DeviceSimulator:
    """Unified simulator for lights and thermostats.

    Each simulated device keeps its state in memory, reacts to SET commands
    received on ``vendor/{device_id}/set`` and publishes its state as JSON to
    ``vendor/{device_id}/state`` (retained, QoS 1).  Thermostats additionally
    drift their current temperature every ``DRIFT_INTERVAL`` seconds.
    """

    # Temperature (°C) a thermostat reading drifts towards while its mode is "off".
    AMBIENT_TEMPERATURE = 18.0
    # Largest change (°C) applied to a thermostat reading per drift tick.
    MAX_DRIFT_STEP = 0.2
    # Modes accepted by thermostat SET commands.
    THERMOSTAT_MODES = ("off", "heat", "auto")
    # Inclusive bounds (°C) accepted for a thermostat target temperature.
    MIN_TARGET = 5.0
    MAX_TARGET = 30.0

    def __init__(self):
        """Create the in-memory device states; no network activity happens here."""
        # Light states, keyed by device id.
        self.light_states: Dict[str, Dict[str, Any]] = {
            "test_lampe_1": {"power": "off", "brightness": 50},
            "test_lampe_2": {"power": "off", "brightness": 50},
            "test_lampe_3": {"power": "off", "brightness": 50},
        }

        # Thermostat states, keyed by device id.
        self.thermostat_states: Dict[str, Dict[str, Any]] = {
            "test_thermo_1": {
                "mode": "auto",
                "target": 21.0,
                "current": 20.5,
                "battery": 90,
                "window_open": False,
            },
        }

        # MQTT client; only set while run() holds an open connection.
        self.client = None
        # Flag polled by thermostat_drift_loop(); cleared when run() ends.
        self.running = True
        # Background task executing thermostat_drift_loop(), if started.
        self.drift_task = None

    async def publish_state(self, device_id: str, device_type: str):
        """Publish a device's current state to MQTT (retained, QoS 1).

        Args:
            device_id: Id of the device whose state is published.
            device_type: Either ``"light"`` or ``"thermostat"``; selects the
                state table that is consulted.

        Does nothing when no client is connected.  Unknown device types or
        ids are logged as warnings and skipped.
        """
        if not self.client:
            return

        if device_type == "light":
            state = self.light_states.get(device_id)
        elif device_type == "thermostat":
            state = self.thermostat_states.get(device_id)
        else:
            logger.warning(f"Unknown device type: {device_type}")
            return

        if not state:
            logger.warning(f"Unknown device: {device_id}")
            return

        state_topic = f"vendor/{device_id}/state"
        payload = json.dumps(state)

        await self.client.publish(
            state_topic,
            payload=payload,
            qos=1,
            retain=True,
        )
        logger.info(f"[{device_id}] Published state: {payload}")

    async def handle_light_set(self, device_id: str, payload: dict):
        """Apply a SET command to a light and publish the state if it changed.

        Args:
            device_id: Id of the targeted light.
            payload: Decoded command; the optional keys ``"power"`` and
                ``"brightness"`` are honoured, everything else is ignored.

        A brightness value that cannot be converted to ``int`` is logged and
        skipped, so a valid ``power`` change in the same command is still
        applied and published.
        """
        if device_id not in self.light_states:
            logger.warning(f"Unknown light device: {device_id}")
            return

        state = self.light_states[device_id]
        updated = False

        if "power" in payload:
            old_power = state["power"]
            state["power"] = payload["power"]
            if old_power != state["power"]:
                updated = True
                logger.info(f"[{device_id}] Power: {old_power} -> {state['power']}")

        if "brightness" in payload:
            try:
                new_brightness = int(payload["brightness"])
            except (ValueError, TypeError, OverflowError):
                # Mirrors the thermostat target handling: reject the bad value
                # instead of aborting the whole command half-applied.
                logger.warning(
                    f"[{device_id}] Invalid brightness value: {payload['brightness']}"
                )
            else:
                old_brightness = state["brightness"]
                state["brightness"] = new_brightness
                if old_brightness != new_brightness:
                    updated = True
                    logger.info(
                        f"[{device_id}] Brightness: {old_brightness} -> {new_brightness}"
                    )

        if updated:
            await self.publish_state(device_id, "light")

    async def handle_thermostat_set(self, device_id: str, payload: dict):
        """Apply a SET command to a thermostat and publish the state if it changed.

        Args:
            device_id: Id of the targeted thermostat.
            payload: Decoded command; the optional keys ``"mode"`` (one of
                ``THERMOSTAT_MODES``) and ``"target"`` (a number between
                ``MIN_TARGET`` and ``MAX_TARGET`` inclusive) are honoured.

        Invalid modes and invalid or out-of-range targets are logged as
        warnings and leave the corresponding field untouched.
        """
        if device_id not in self.thermostat_states:
            logger.warning(f"Unknown thermostat device: {device_id}")
            return

        state = self.thermostat_states[device_id]
        updated = False

        if "mode" in payload:
            new_mode = payload["mode"]
            if new_mode in self.THERMOSTAT_MODES:
                old_mode = state["mode"]
                state["mode"] = new_mode
                if old_mode != new_mode:
                    updated = True
                    logger.info(f"[{device_id}] Mode: {old_mode} -> {new_mode}")
            else:
                logger.warning(f"[{device_id}] Invalid mode: {new_mode}")

        if "target" in payload:
            try:
                new_target = float(payload["target"])
            except (ValueError, TypeError):
                logger.warning(f"[{device_id}] Invalid target value: {payload['target']}")
            else:
                if self.MIN_TARGET <= new_target <= self.MAX_TARGET:
                    old_target = state["target"]
                    state["target"] = new_target
                    if old_target != new_target:
                        updated = True
                        logger.info(
                            f"[{device_id}] Target: {old_target}°C -> {new_target}°C"
                        )
                else:
                    logger.warning(f"[{device_id}] Target out of range: {new_target}")

        if updated:
            await self.publish_state(device_id, "thermostat")

    def apply_temperature_drift(self, device_id: str):
        """Move a thermostat's current temperature one step towards its goal.

        The goal is ``AMBIENT_TEMPERATURE`` while the mode is ``"off"`` and
        the target temperature otherwise.  The change per call is limited to
        ±``MAX_DRIFT_STEP`` and the result is rounded to one decimal.

        Unknown device ids are ignored.
        """
        if device_id not in self.thermostat_states:
            return

        state = self.thermostat_states[device_id]

        if state["mode"] == "off":
            goal = self.AMBIENT_TEMPERATURE
        else:
            goal = state["target"]

        # Clamp the step instead of always moving a full MAX_DRIFT_STEP: a
        # fixed step overshoots when the remaining gap is 0.1°C and makes the
        # reading oscillate around the goal forever instead of settling.
        diff = goal - state["current"]
        step = max(-self.MAX_DRIFT_STEP, min(self.MAX_DRIFT_STEP, diff))
        state["current"] = round(state["current"] + step, 1)

        logger.info(f"[{device_id}] Temperature drift: current={state['current']}°C (target={state['target']}°C, mode={state['mode']})")

    async def thermostat_drift_loop(self):
        """Background loop: drift and republish every thermostat periodically.

        Sleeps ``DRIFT_INTERVAL`` seconds between ticks and runs until
        ``self.running`` is cleared or the task is cancelled.
        """
        while self.running:
            await asyncio.sleep(DRIFT_INTERVAL)

            for device_id in THERMOSTAT_DEVICES:
                self.apply_temperature_drift(device_id)
                try:
                    await self.publish_state(device_id, "thermostat")
                except MqttError as e:
                    # An exception escaping a background task is never
                    # retrieved and would stop the drift silently.
                    logger.error(f"[{device_id}] Failed to publish drifted state: {e}")

    async def handle_message(self, message):
        """Dispatch one incoming MQTT message to the matching device handler.

        Expects the topic ``vendor/{device_id}/set`` and a JSON object as
        payload.  Malformed topics, undecodable or non-object payloads and
        unknown devices are logged and dropped; this method never raises for
        a bad message, so the receive loop keeps running.
        """
        try:
            # Extract device_id from topic (vendor/{device_id}/set)
            topic_parts = message.topic.value.split('/')
            if len(topic_parts) != 3 or topic_parts[0] != "vendor" or topic_parts[2] != "set":
                logger.warning(f"Unexpected topic format: {message.topic}")
                return

            device_id = topic_parts[1]
            payload = json.loads(message.payload.decode())

            logger.info(f"[{device_id}] Received SET: {payload}")

            # The handlers index the payload by key; any other JSON value
            # (string, list, number, null) cannot be a valid command.
            if not isinstance(payload, dict):
                logger.warning(
                    f"[{device_id}] Ignoring SET payload that is not a JSON object: {payload!r}"
                )
                return

            # Determine device type and handle accordingly
            if device_id in self.light_states:
                await self.handle_light_set(device_id, payload)
            elif device_id in self.thermostat_states:
                await self.handle_thermostat_set(device_id, payload)
            else:
                logger.warning(f"Unknown device: {device_id}")

        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            logger.error(f"Invalid JSON: {e}")
        except Exception as e:
            # Boundary of the receive loop: log with traceback, keep serving.
            logger.exception(f"Error handling message: {e}")

    async def run(self):
        """Connect to the broker and serve SET commands until disconnected.

        Publishes the initial state of every device, subscribes to all SET
        topics, starts the thermostat drift task and then processes incoming
        messages.  MQTT errors are logged rather than raised.  On exit the
        drift task is cancelled and the client reference is cleared.
        """
        # Generate unique client ID to avoid collisions
        base_client_id = "device_simulator"
        client_suffix = os.environ.get("MQTT_CLIENT_ID_SUFFIX") or uuid.uuid4().hex[:6]
        unique_client_id = f"{base_client_id}-{client_suffix}"

        # Re-arm the flag so the drift loop also works if run() is called again.
        self.running = True

        try:
            async with Client(
                hostname=BROKER_HOST,
                port=BROKER_PORT,
                identifier=unique_client_id,
            ) as client:
                self.client = client
                logger.info(f"✅ Connected to MQTT broker {BROKER_HOST}:{BROKER_PORT}")

                # Publish initial states
                for device_id in LIGHT_DEVICES:
                    await self.publish_state(device_id, "light")
                    logger.info(f"💡 Light simulator started: {device_id}")

                for device_id in THERMOSTAT_DEVICES:
                    await self.publish_state(device_id, "thermostat")
                    logger.info(f"🌡️ Thermostat simulator started: {device_id}")

                # Subscribe to all SET topics
                for device_id in LIGHT_DEVICES + THERMOSTAT_DEVICES:
                    set_topic = f"vendor/{device_id}/set"
                    await client.subscribe(set_topic, qos=1)
                    logger.info(f"👂 Subscribed to {set_topic}")

                # Start thermostat drift loop
                self.drift_task = asyncio.create_task(self.thermostat_drift_loop())

                try:
                    # Listen for messages
                    async for message in client.messages:
                        await self.handle_message(message)
                finally:
                    # Stop drifting before the connection is torn down, on
                    # both normal exit and error, so the task never publishes
                    # through a closing client.
                    self.drift_task.cancel()

        except MqttError as e:
            logger.error(f"❌ MQTT Error: {e}")
        except KeyboardInterrupt:
            logger.info("⚠️ Interrupted by user")
        finally:
            self.running = False
            # The client is closed once the context manager exits; drop the
            # reference so publish_state() becomes a no-op again.
            self.client = None
            if self.drift_task is not None:
                self.drift_task.cancel()
                self.drift_task = None
            logger.info("👋 Simulator stopped")
|
|
|
|
|
|
async def main():
    """Build a DeviceSimulator and run it until its main loop returns."""
    await DeviceSimulator().run()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: drive main() on a fresh event loop and turn Ctrl-C
    # into a clean exit with status 0 instead of a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n👋 Simulator terminated")
        raise SystemExit(0)
|