"""Device Simulator Web Application. FastAPI application that runs the device simulator in the background and provides a web interface with real-time event streaming (SSE). """ import asyncio import json import logging import os import uuid from datetime import datetime from typing import Dict, Any, AsyncGenerator from queue import Queue from collections import deque from aiomqtt import Client, MqttError from fastapi import FastAPI, Request from fastapi.responses import HTMLResponse, StreamingResponse from fastapi.templating import Jinja2Templates from pathlib import Path # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # Configuration BROKER_HOST = os.getenv("MQTT_BROKER", "172.16.2.16") BROKER_PORT = int(os.getenv("MQTT_PORT", "1883")) DRIFT_INTERVAL = 5 # seconds for thermostat temperature drift # Device configurations LIGHT_DEVICES = ["test_lampe_1", "test_lampe_2", "test_lampe_3"] THERMOSTAT_DEVICES = ["test_thermo_1"] # Global event queue for SSE event_queue = deque(maxlen=100) # Keep last 100 events sse_queues = [] # List of queues for active SSE connections def add_event(event: dict): """Add event to global queue and notify all SSE clients.""" event_with_ts = { **event, "timestamp": datetime.utcnow().isoformat() + "Z" } event_queue.append(event_with_ts) # Notify all SSE clients for q in sse_queues: try: q.put_nowait(event_with_ts) except: pass class DeviceSimulator: """Unified simulator for lights and thermostats.""" def __init__(self): # Light states self.light_states: Dict[str, Dict[str, Any]] = { "test_lampe_1": {"power": "off", "brightness": 50}, "test_lampe_2": {"power": "off", "brightness": 50}, "test_lampe_3": {"power": "off", "brightness": 50} } # Thermostat states self.thermostat_states: Dict[str, Dict[str, Any]] = { "test_thermo_1": { "mode": "auto", "target": 21.0, "current": 20.5, "battery": 90, "window_open": False } } self.client = None self.running = True self.drift_task = None self.connected = False async def publish_state(self, device_id: str, device_type: str): """Publish device state to MQTT (retained, QoS 1).""" if not self.client: return if device_type == "light": state = self.light_states.get(device_id) elif device_type == "thermostat": state = self.thermostat_states.get(device_id) else: logger.warning(f"Unknown device type: {device_type}") return if not state: logger.warning(f"Unknown device: {device_id}") return state_topic = f"vendor/{device_id}/state" payload = json.dumps(state) await self.client.publish( state_topic, payload=payload, qos=1, retain=True ) add_event({ "type": "state_published", "device_id": device_id, "device_type": device_type, "topic": state_topic, "payload": state }) logger.info(f"[{device_id}] Published state: {payload}") async def handle_light_set(self, device_id: str, payload: dict): """Handle SET command for light device.""" if device_id not in self.light_states: logger.warning(f"Unknown light device: {device_id}") return state = self.light_states[device_id] changes = {} if "power" in payload: old_power = state["power"] state["power"] = payload["power"] if old_power != state["power"]: changes["power"] = {"old": old_power, "new": state["power"]} logger.info(f"[{device_id}] Power: {old_power} -> {state['power']}") if "brightness" in payload: old_brightness = state["brightness"] state["brightness"] = int(payload["brightness"]) if old_brightness != state["brightness"]: changes["brightness"] = {"old": old_brightness, "new": state["brightness"]} logger.info(f"[{device_id}] Brightness: {old_brightness} -> {state['brightness']}") if changes: add_event({ "type": "light_updated", "device_id": device_id, "changes": changes, "new_state": dict(state) }) await self.publish_state(device_id, "light") async def handle_thermostat_set(self, device_id: str, payload: dict): """Handle SET command for thermostat device.""" if device_id not in self.thermostat_states: logger.warning(f"Unknown thermostat device: {device_id}") return state = self.thermostat_states[device_id] changes = {} if "mode" in payload: new_mode = payload["mode"] if new_mode in ["off", "heat", "auto"]: old_mode = state["mode"] state["mode"] = new_mode if old_mode != new_mode: changes["mode"] = {"old": old_mode, "new": new_mode} logger.info(f"[{device_id}] Mode: {old_mode} -> {new_mode}") else: logger.warning(f"[{device_id}] Invalid mode: {new_mode}") if "target" in payload: try: new_target = float(payload["target"]) if 5.0 <= new_target <= 30.0: old_target = state["target"] state["target"] = new_target if old_target != new_target: changes["target"] = {"old": old_target, "new": new_target} logger.info(f"[{device_id}] Target: {old_target}°C -> {new_target}°C") else: logger.warning(f"[{device_id}] Target out of range: {new_target}") except (ValueError, TypeError): logger.warning(f"[{device_id}] Invalid target value: {payload['target']}") if changes: add_event({ "type": "thermostat_updated", "device_id": device_id, "changes": changes, "new_state": dict(state) }) await self.publish_state(device_id, "thermostat") def apply_temperature_drift(self, device_id: str): """ Simulate temperature drift for thermostat. Max change: ±0.2°C per interval. """ if device_id not in self.thermostat_states: return state = self.thermostat_states[device_id] old_current = state["current"] if state["mode"] == "off": # Drift towards ambient (18°C) ambient = 18.0 diff = ambient - state["current"] else: # Drift towards target diff = state["target"] - state["current"] # Apply max ±0.2°C drift if abs(diff) < 0.1: state["current"] = round(state["current"] + diff, 1) elif diff > 0: state["current"] = round(state["current"] + 0.2, 1) else: state["current"] = round(state["current"] - 0.2, 1) if old_current != state["current"]: add_event({ "type": "temperature_drift", "device_id": device_id, "old_temp": old_current, "new_temp": state["current"], "target": state["target"], "mode": state["mode"] }) logger.info(f"[{device_id}] Temperature drift: current={state['current']}°C (target={state['target']}°C, mode={state['mode']})") async def thermostat_drift_loop(self): """Background loop for thermostat temperature drift.""" while self.running: await asyncio.sleep(DRIFT_INTERVAL) for device_id in THERMOSTAT_DEVICES: self.apply_temperature_drift(device_id) await self.publish_state(device_id, "thermostat") async def handle_message(self, message): """Handle incoming MQTT message.""" try: # Extract device_id from topic (vendor/{device_id}/set) topic_parts = message.topic.value.split('/') if len(topic_parts) != 3 or topic_parts[0] != "vendor" or topic_parts[2] != "set": logger.warning(f"Unexpected topic format: {message.topic}") return device_id = topic_parts[1] payload = json.loads(message.payload.decode()) add_event({ "type": "command_received", "device_id": device_id, "topic": message.topic.value, "payload": payload }) logger.info(f"[{device_id}] Received SET: {payload}") # Determine device type and handle accordingly if device_id in self.light_states: await self.handle_light_set(device_id, payload) elif device_id in self.thermostat_states: await self.handle_thermostat_set(device_id, payload) else: logger.warning(f"Unknown device: {device_id}") except json.JSONDecodeError as e: logger.error(f"Invalid JSON: {e}") add_event({"type": "error", "message": f"Invalid JSON: {e}"}) except Exception as e: logger.error(f"Error handling message: {e}") add_event({"type": "error", "message": f"Error: {e}"}) async def run(self): """Main simulator loop.""" # Generate unique client ID to avoid collisions base_client_id = "device_simulator" client_suffix = os.environ.get("MQTT_CLIENT_ID_SUFFIX") or uuid.uuid4().hex[:6] unique_client_id = f"{base_client_id}-{client_suffix}" try: async with Client( hostname=BROKER_HOST, port=BROKER_PORT, identifier=unique_client_id ) as client: self.client = client self.connected = True add_event({ "type": "simulator_connected", "broker": f"{BROKER_HOST}:{BROKER_PORT}", "client_id": unique_client_id }) logger.info(f"✅ Connected to MQTT broker {BROKER_HOST}:{BROKER_PORT}") # Publish initial states for device_id in LIGHT_DEVICES: await self.publish_state(device_id, "light") logger.info(f"💡 Light simulator started: {device_id}") for device_id in THERMOSTAT_DEVICES: await self.publish_state(device_id, "thermostat") logger.info(f"🌡️ Thermostat simulator started: {device_id}") add_event({ "type": "devices_initialized", "lights": LIGHT_DEVICES, "thermostats": THERMOSTAT_DEVICES }) # Subscribe to all SET topics all_devices = LIGHT_DEVICES + THERMOSTAT_DEVICES for device_id in all_devices: set_topic = f"vendor/{device_id}/set" await client.subscribe(set_topic, qos=1) logger.info(f"👂 Subscribed to {set_topic}") add_event({ "type": "subscriptions_complete", "topics": [f"vendor/{d}/set" for d in all_devices] }) # Start thermostat drift loop self.drift_task = asyncio.create_task(self.thermostat_drift_loop()) # Listen for messages async for message in client.messages: await self.handle_message(message) # Cancel drift loop on disconnect if self.drift_task: self.drift_task.cancel() except MqttError as e: logger.error(f"❌ MQTT Error: {e}") add_event({"type": "error", "message": f"MQTT Error: {e}"}) self.connected = False except Exception as e: logger.error(f"❌ Error: {e}") add_event({"type": "error", "message": f"Error: {e}"}) self.connected = False finally: self.running = False self.connected = False if self.drift_task: self.drift_task.cancel() add_event({"type": "simulator_stopped"}) logger.info("👋 Simulator stopped") # Create FastAPI app app = FastAPI( title="Device Simulator", description="Web interface for device simulator with real-time event streaming", version="1.0.0" ) # Setup templates templates_dir = Path(__file__).parent / "templates" templates = Jinja2Templates(directory=str(templates_dir)) # Global simulator instance simulator = DeviceSimulator() simulator_task = None @app.on_event("startup") async def startup_event(): """Start simulator on application startup.""" global simulator_task simulator_task = asyncio.create_task(simulator.run()) add_event({"type": "app_started", "message": "Simulator web app started"}) logger.info("🚀 Simulator web app started") @app.on_event("shutdown") async def shutdown_event(): """Stop simulator on application shutdown.""" global simulator_task simulator.running = False if simulator_task: simulator_task.cancel() try: await simulator_task except asyncio.CancelledError: pass add_event({"type": "app_stopped", "message": "Simulator web app stopped"}) logger.info("🛑 Simulator web app stopped") @app.get("/", response_class=HTMLResponse) async def index(request: Request): """Render simulator dashboard.""" return templates.TemplateResponse("index.html", { "request": request, "broker": f"{BROKER_HOST}:{BROKER_PORT}", "light_devices": LIGHT_DEVICES, "thermostat_devices": THERMOSTAT_DEVICES, "connected": simulator.connected }) @app.get("/health") async def health(): """Health check endpoint.""" return { "status": "ok", "simulator_running": simulator.running, "mqtt_connected": simulator.connected } @app.get("/status") async def status(): """Get current simulator status.""" return { "connected": simulator.connected, "running": simulator.running, "light_states": simulator.light_states, "thermostat_states": simulator.thermostat_states, "broker": f"{BROKER_HOST}:{BROKER_PORT}" } @app.get("/events") async def get_events(): """Get recent events.""" return { "events": list(event_queue) } @app.get("/realtime") async def realtime(request: Request): """Server-Sent Events stream for real-time updates.""" async def event_generator() -> AsyncGenerator[str, None]: # Create a queue for this SSE connection q = asyncio.Queue() sse_queues.append(q) try: # Send recent events first for event in event_queue: yield f"data: {json.dumps(event)}\n\n" # Stream new events while True: # Check if client disconnected if await request.is_disconnected(): break try: # Wait for new event with timeout event = await asyncio.wait_for(q.get(), timeout=30.0) yield f"data: {json.dumps(event)}\n\n" except asyncio.TimeoutError: # Send heartbeat yield f"event: ping\ndata: {json.dumps({'type': 'ping'})}\n\n" finally: # Remove queue on disconnect if q in sse_queues: sse_queues.remove(q) return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } ) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8003)