simulator
This commit is contained in:
489
apps/simulator/main.py
Normal file
489
apps/simulator/main.py
Normal file
@@ -0,0 +1,489 @@
|
||||
"""Device Simulator Web Application.
|
||||
|
||||
FastAPI application that runs the device simulator in the background
|
||||
and provides a web interface with real-time event streaming (SSE).
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from typing import Dict, Any, AsyncGenerator
|
||||
from queue import Queue
|
||||
from collections import deque
|
||||
|
||||
from aiomqtt import Client, MqttError
|
||||
from fastapi import FastAPI, Request
|
||||
from fastapi.responses import HTMLResponse, StreamingResponse
|
||||
from fastapi.templating import Jinja2Templates
|
||||
from pathlib import Path
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Configuration
|
||||
BROKER_HOST = os.getenv("MQTT_BROKER", "172.16.2.16")
|
||||
BROKER_PORT = int(os.getenv("MQTT_PORT", "1883"))
|
||||
DRIFT_INTERVAL = 5 # seconds for thermostat temperature drift
|
||||
|
||||
# Device configurations
|
||||
LIGHT_DEVICES = ["test_lampe_1", "test_lampe_2", "test_lampe_3"]
|
||||
THERMOSTAT_DEVICES = ["test_thermo_1"]
|
||||
|
||||
# Global event queue for SSE
|
||||
event_queue = deque(maxlen=100) # Keep last 100 events
|
||||
sse_queues = [] # List of queues for active SSE connections
|
||||
|
||||
|
||||
def add_event(event: dict):
|
||||
"""Add event to global queue and notify all SSE clients."""
|
||||
event_with_ts = {
|
||||
**event,
|
||||
"timestamp": datetime.utcnow().isoformat() + "Z"
|
||||
}
|
||||
event_queue.append(event_with_ts)
|
||||
|
||||
# Notify all SSE clients
|
||||
for q in sse_queues:
|
||||
try:
|
||||
q.put_nowait(event_with_ts)
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
class DeviceSimulator:
|
||||
"""Unified simulator for lights and thermostats."""
|
||||
|
||||
def __init__(self):
|
||||
# Light states
|
||||
self.light_states: Dict[str, Dict[str, Any]] = {
|
||||
"test_lampe_1": {"power": "off", "brightness": 50},
|
||||
"test_lampe_2": {"power": "off", "brightness": 50},
|
||||
"test_lampe_3": {"power": "off", "brightness": 50}
|
||||
}
|
||||
|
||||
# Thermostat states
|
||||
self.thermostat_states: Dict[str, Dict[str, Any]] = {
|
||||
"test_thermo_1": {
|
||||
"mode": "auto",
|
||||
"target": 21.0,
|
||||
"current": 20.5,
|
||||
"battery": 90,
|
||||
"window_open": False
|
||||
}
|
||||
}
|
||||
|
||||
self.client = None
|
||||
self.running = True
|
||||
self.drift_task = None
|
||||
self.connected = False
|
||||
|
||||
async def publish_state(self, device_id: str, device_type: str):
|
||||
"""Publish device state to MQTT (retained, QoS 1)."""
|
||||
if not self.client:
|
||||
return
|
||||
|
||||
if device_type == "light":
|
||||
state = self.light_states.get(device_id)
|
||||
elif device_type == "thermostat":
|
||||
state = self.thermostat_states.get(device_id)
|
||||
else:
|
||||
logger.warning(f"Unknown device type: {device_type}")
|
||||
return
|
||||
|
||||
if not state:
|
||||
logger.warning(f"Unknown device: {device_id}")
|
||||
return
|
||||
|
||||
state_topic = f"vendor/{device_id}/state"
|
||||
payload = json.dumps(state)
|
||||
|
||||
await self.client.publish(
|
||||
state_topic,
|
||||
payload=payload,
|
||||
qos=1,
|
||||
retain=True
|
||||
)
|
||||
|
||||
add_event({
|
||||
"type": "state_published",
|
||||
"device_id": device_id,
|
||||
"device_type": device_type,
|
||||
"topic": state_topic,
|
||||
"payload": state
|
||||
})
|
||||
|
||||
logger.info(f"[{device_id}] Published state: {payload}")
|
||||
|
||||
async def handle_light_set(self, device_id: str, payload: dict):
|
||||
"""Handle SET command for light device."""
|
||||
if device_id not in self.light_states:
|
||||
logger.warning(f"Unknown light device: {device_id}")
|
||||
return
|
||||
|
||||
state = self.light_states[device_id]
|
||||
changes = {}
|
||||
|
||||
if "power" in payload:
|
||||
old_power = state["power"]
|
||||
state["power"] = payload["power"]
|
||||
if old_power != state["power"]:
|
||||
changes["power"] = {"old": old_power, "new": state["power"]}
|
||||
logger.info(f"[{device_id}] Power: {old_power} -> {state['power']}")
|
||||
|
||||
if "brightness" in payload:
|
||||
old_brightness = state["brightness"]
|
||||
state["brightness"] = int(payload["brightness"])
|
||||
if old_brightness != state["brightness"]:
|
||||
changes["brightness"] = {"old": old_brightness, "new": state["brightness"]}
|
||||
logger.info(f"[{device_id}] Brightness: {old_brightness} -> {state['brightness']}")
|
||||
|
||||
if changes:
|
||||
add_event({
|
||||
"type": "light_updated",
|
||||
"device_id": device_id,
|
||||
"changes": changes,
|
||||
"new_state": dict(state)
|
||||
})
|
||||
await self.publish_state(device_id, "light")
|
||||
|
||||
async def handle_thermostat_set(self, device_id: str, payload: dict):
|
||||
"""Handle SET command for thermostat device."""
|
||||
if device_id not in self.thermostat_states:
|
||||
logger.warning(f"Unknown thermostat device: {device_id}")
|
||||
return
|
||||
|
||||
state = self.thermostat_states[device_id]
|
||||
changes = {}
|
||||
|
||||
if "mode" in payload:
|
||||
new_mode = payload["mode"]
|
||||
if new_mode in ["off", "heat", "auto"]:
|
||||
old_mode = state["mode"]
|
||||
state["mode"] = new_mode
|
||||
if old_mode != new_mode:
|
||||
changes["mode"] = {"old": old_mode, "new": new_mode}
|
||||
logger.info(f"[{device_id}] Mode: {old_mode} -> {new_mode}")
|
||||
else:
|
||||
logger.warning(f"[{device_id}] Invalid mode: {new_mode}")
|
||||
|
||||
if "target" in payload:
|
||||
try:
|
||||
new_target = float(payload["target"])
|
||||
if 5.0 <= new_target <= 30.0:
|
||||
old_target = state["target"]
|
||||
state["target"] = new_target
|
||||
if old_target != new_target:
|
||||
changes["target"] = {"old": old_target, "new": new_target}
|
||||
logger.info(f"[{device_id}] Target: {old_target}°C -> {new_target}°C")
|
||||
else:
|
||||
logger.warning(f"[{device_id}] Target out of range: {new_target}")
|
||||
except (ValueError, TypeError):
|
||||
logger.warning(f"[{device_id}] Invalid target value: {payload['target']}")
|
||||
|
||||
if changes:
|
||||
add_event({
|
||||
"type": "thermostat_updated",
|
||||
"device_id": device_id,
|
||||
"changes": changes,
|
||||
"new_state": dict(state)
|
||||
})
|
||||
await self.publish_state(device_id, "thermostat")
|
||||
|
||||
def apply_temperature_drift(self, device_id: str):
|
||||
"""
|
||||
Simulate temperature drift for thermostat.
|
||||
Max change: ±0.2°C per interval.
|
||||
"""
|
||||
if device_id not in self.thermostat_states:
|
||||
return
|
||||
|
||||
state = self.thermostat_states[device_id]
|
||||
old_current = state["current"]
|
||||
|
||||
if state["mode"] == "off":
|
||||
# Drift towards ambient (18°C)
|
||||
ambient = 18.0
|
||||
diff = ambient - state["current"]
|
||||
else:
|
||||
# Drift towards target
|
||||
diff = state["target"] - state["current"]
|
||||
|
||||
# Apply max ±0.2°C drift
|
||||
if abs(diff) < 0.1:
|
||||
state["current"] = round(state["current"] + diff, 1)
|
||||
elif diff > 0:
|
||||
state["current"] = round(state["current"] + 0.2, 1)
|
||||
else:
|
||||
state["current"] = round(state["current"] - 0.2, 1)
|
||||
|
||||
if old_current != state["current"]:
|
||||
add_event({
|
||||
"type": "temperature_drift",
|
||||
"device_id": device_id,
|
||||
"old_temp": old_current,
|
||||
"new_temp": state["current"],
|
||||
"target": state["target"],
|
||||
"mode": state["mode"]
|
||||
})
|
||||
|
||||
logger.info(f"[{device_id}] Temperature drift: current={state['current']}°C (target={state['target']}°C, mode={state['mode']})")
|
||||
|
||||
async def thermostat_drift_loop(self):
|
||||
"""Background loop for thermostat temperature drift."""
|
||||
while self.running:
|
||||
await asyncio.sleep(DRIFT_INTERVAL)
|
||||
|
||||
for device_id in THERMOSTAT_DEVICES:
|
||||
self.apply_temperature_drift(device_id)
|
||||
await self.publish_state(device_id, "thermostat")
|
||||
|
||||
async def handle_message(self, message):
|
||||
"""Handle incoming MQTT message."""
|
||||
try:
|
||||
# Extract device_id from topic (vendor/{device_id}/set)
|
||||
topic_parts = message.topic.value.split('/')
|
||||
if len(topic_parts) != 3 or topic_parts[0] != "vendor" or topic_parts[2] != "set":
|
||||
logger.warning(f"Unexpected topic format: {message.topic}")
|
||||
return
|
||||
|
||||
device_id = topic_parts[1]
|
||||
payload = json.loads(message.payload.decode())
|
||||
|
||||
add_event({
|
||||
"type": "command_received",
|
||||
"device_id": device_id,
|
||||
"topic": message.topic.value,
|
||||
"payload": payload
|
||||
})
|
||||
|
||||
logger.info(f"[{device_id}] Received SET: {payload}")
|
||||
|
||||
# Determine device type and handle accordingly
|
||||
if device_id in self.light_states:
|
||||
await self.handle_light_set(device_id, payload)
|
||||
elif device_id in self.thermostat_states:
|
||||
await self.handle_thermostat_set(device_id, payload)
|
||||
else:
|
||||
logger.warning(f"Unknown device: {device_id}")
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Invalid JSON: {e}")
|
||||
add_event({"type": "error", "message": f"Invalid JSON: {e}"})
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling message: {e}")
|
||||
add_event({"type": "error", "message": f"Error: {e}"})
|
||||
|
||||
async def run(self):
|
||||
"""Main simulator loop."""
|
||||
# Generate unique client ID to avoid collisions
|
||||
base_client_id = "device_simulator"
|
||||
client_suffix = os.environ.get("MQTT_CLIENT_ID_SUFFIX") or uuid.uuid4().hex[:6]
|
||||
unique_client_id = f"{base_client_id}-{client_suffix}"
|
||||
|
||||
try:
|
||||
async with Client(
|
||||
hostname=BROKER_HOST,
|
||||
port=BROKER_PORT,
|
||||
identifier=unique_client_id
|
||||
) as client:
|
||||
self.client = client
|
||||
self.connected = True
|
||||
|
||||
add_event({
|
||||
"type": "simulator_connected",
|
||||
"broker": f"{BROKER_HOST}:{BROKER_PORT}",
|
||||
"client_id": unique_client_id
|
||||
})
|
||||
|
||||
logger.info(f"✅ Connected to MQTT broker {BROKER_HOST}:{BROKER_PORT}")
|
||||
|
||||
# Publish initial states
|
||||
for device_id in LIGHT_DEVICES:
|
||||
await self.publish_state(device_id, "light")
|
||||
logger.info(f"💡 Light simulator started: {device_id}")
|
||||
|
||||
for device_id in THERMOSTAT_DEVICES:
|
||||
await self.publish_state(device_id, "thermostat")
|
||||
logger.info(f"🌡️ Thermostat simulator started: {device_id}")
|
||||
|
||||
add_event({
|
||||
"type": "devices_initialized",
|
||||
"lights": LIGHT_DEVICES,
|
||||
"thermostats": THERMOSTAT_DEVICES
|
||||
})
|
||||
|
||||
# Subscribe to all SET topics
|
||||
all_devices = LIGHT_DEVICES + THERMOSTAT_DEVICES
|
||||
for device_id in all_devices:
|
||||
set_topic = f"vendor/{device_id}/set"
|
||||
await client.subscribe(set_topic, qos=1)
|
||||
logger.info(f"👂 Subscribed to {set_topic}")
|
||||
|
||||
add_event({
|
||||
"type": "subscriptions_complete",
|
||||
"topics": [f"vendor/{d}/set" for d in all_devices]
|
||||
})
|
||||
|
||||
# Start thermostat drift loop
|
||||
self.drift_task = asyncio.create_task(self.thermostat_drift_loop())
|
||||
|
||||
# Listen for messages
|
||||
async for message in client.messages:
|
||||
await self.handle_message(message)
|
||||
|
||||
# Cancel drift loop on disconnect
|
||||
if self.drift_task:
|
||||
self.drift_task.cancel()
|
||||
|
||||
except MqttError as e:
|
||||
logger.error(f"❌ MQTT Error: {e}")
|
||||
add_event({"type": "error", "message": f"MQTT Error: {e}"})
|
||||
self.connected = False
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error: {e}")
|
||||
add_event({"type": "error", "message": f"Error: {e}"})
|
||||
self.connected = False
|
||||
finally:
|
||||
self.running = False
|
||||
self.connected = False
|
||||
if self.drift_task:
|
||||
self.drift_task.cancel()
|
||||
add_event({"type": "simulator_stopped"})
|
||||
logger.info("👋 Simulator stopped")
|
||||
|
||||
|
||||
# Create FastAPI app
|
||||
app = FastAPI(
|
||||
title="Device Simulator",
|
||||
description="Web interface for device simulator with real-time event streaming",
|
||||
version="1.0.0"
|
||||
)
|
||||
|
||||
# Setup templates
|
||||
templates_dir = Path(__file__).parent / "templates"
|
||||
templates = Jinja2Templates(directory=str(templates_dir))
|
||||
|
||||
# Global simulator instance
|
||||
simulator = DeviceSimulator()
|
||||
simulator_task = None
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
async def startup_event():
|
||||
"""Start simulator on application startup."""
|
||||
global simulator_task
|
||||
simulator_task = asyncio.create_task(simulator.run())
|
||||
add_event({"type": "app_started", "message": "Simulator web app started"})
|
||||
logger.info("🚀 Simulator web app started")
|
||||
|
||||
|
||||
@app.on_event("shutdown")
|
||||
async def shutdown_event():
|
||||
"""Stop simulator on application shutdown."""
|
||||
global simulator_task
|
||||
simulator.running = False
|
||||
if simulator_task:
|
||||
simulator_task.cancel()
|
||||
try:
|
||||
await simulator_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
add_event({"type": "app_stopped", "message": "Simulator web app stopped"})
|
||||
logger.info("🛑 Simulator web app stopped")
|
||||
|
||||
|
||||
@app.get("/", response_class=HTMLResponse)
|
||||
async def index(request: Request):
|
||||
"""Render simulator dashboard."""
|
||||
return templates.TemplateResponse("index.html", {
|
||||
"request": request,
|
||||
"broker": f"{BROKER_HOST}:{BROKER_PORT}",
|
||||
"light_devices": LIGHT_DEVICES,
|
||||
"thermostat_devices": THERMOSTAT_DEVICES,
|
||||
"connected": simulator.connected
|
||||
})
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
async def health():
|
||||
"""Health check endpoint."""
|
||||
return {
|
||||
"status": "ok",
|
||||
"simulator_running": simulator.running,
|
||||
"mqtt_connected": simulator.connected
|
||||
}
|
||||
|
||||
|
||||
@app.get("/status")
|
||||
async def status():
|
||||
"""Get current simulator status."""
|
||||
return {
|
||||
"connected": simulator.connected,
|
||||
"running": simulator.running,
|
||||
"light_states": simulator.light_states,
|
||||
"thermostat_states": simulator.thermostat_states,
|
||||
"broker": f"{BROKER_HOST}:{BROKER_PORT}"
|
||||
}
|
||||
|
||||
|
||||
@app.get("/events")
|
||||
async def get_events():
|
||||
"""Get recent events."""
|
||||
return {
|
||||
"events": list(event_queue)
|
||||
}
|
||||
|
||||
|
||||
@app.get("/realtime")
|
||||
async def realtime(request: Request):
|
||||
"""Server-Sent Events stream for real-time updates."""
|
||||
async def event_generator() -> AsyncGenerator[str, None]:
|
||||
# Create a queue for this SSE connection
|
||||
q = asyncio.Queue()
|
||||
sse_queues.append(q)
|
||||
|
||||
try:
|
||||
# Send recent events first
|
||||
for event in event_queue:
|
||||
yield f"data: {json.dumps(event)}\n\n"
|
||||
|
||||
# Stream new events
|
||||
while True:
|
||||
# Check if client disconnected
|
||||
if await request.is_disconnected():
|
||||
break
|
||||
|
||||
try:
|
||||
# Wait for new event with timeout
|
||||
event = await asyncio.wait_for(q.get(), timeout=30.0)
|
||||
yield f"data: {json.dumps(event)}\n\n"
|
||||
except asyncio.TimeoutError:
|
||||
# Send heartbeat
|
||||
yield f"event: ping\ndata: {json.dumps({'type': 'ping'})}\n\n"
|
||||
|
||||
finally:
|
||||
# Remove queue on disconnect
|
||||
if q in sse_queues:
|
||||
sse_queues.remove(q)
|
||||
|
||||
return StreamingResponse(
|
||||
event_generator(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"X-Accel-Buffering": "no"
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="0.0.0.0", port=8003)
|
||||
Reference in New Issue
Block a user