#!/usr/bin/env python3
"""MQTT Simulator for multiple test_lampe devices.

This simulator acts as virtual light devices that:
- Subscribe to vendor/test_lampe_*/set
- Maintain local state for each device
- Publish state changes to vendor/test_lampe_*/state (retained)
"""

import json
import logging
import os
import signal
import sys
import time

import paho.mqtt.client as mqtt

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Configuration (overridable through the MQTT_HOST / MQTT_PORT environment variables)
BROKER_HOST = os.environ.get("MQTT_HOST", "172.16.2.16")
BROKER_PORT = int(os.environ.get("MQTT_PORT", "1883"))

# Devices to simulate
DEVICES = ["test_lampe_1", "test_lampe_2", "test_lampe_3"]

# Device states (one independent dict per device), derived from DEVICES so the
# two can never drift apart.
device_states = {
    device_id: {"power": "off", "brightness": 50} for device_id in DEVICES
}

# Global client for signal handler
client_global = None


def on_connect(client, userdata, flags, rc, properties=None):
    """Callback when connected to MQTT broker.

    On success, subscribes to the SET topic of every simulated device and
    publishes each device's current state as a retained message.

    Args:
        client: MQTT client instance
        userdata: User data
        flags: Connection flags
        rc: Connection result code (compares equal to 0 on success)
        properties: Connection properties (MQTT v5)
    """
    if rc != 0:
        logger.error("Connection failed with code %s", rc)
        return

    logger.info("Connected to MQTT broker %s:%s", BROKER_HOST, BROKER_PORT)

    # Subscribe to SET topics for all devices
    for device_id in DEVICES:
        set_topic = f"vendor/{device_id}/set"
        client.subscribe(set_topic, qos=1)
        logger.info("Subscribed to %s", set_topic)

    # Publish initial states (retained)
    for device_id in DEVICES:
        publish_state(client, device_id)
        logger.info(
            "Simulator started for %s, initial state: %s",
            device_id,
            device_states[device_id],
        )


def on_message(client, userdata, msg):
    """Callback when message received on subscribed topic.

    Parses a JSON SET command for one device, validates it, applies it to the
    device's local state and republishes the state if anything changed.

    The command is validated in full *before* the state is touched, so a
    command with one bad field (e.g. a non-numeric brightness) leaves the
    device state exactly as it was instead of half-applied and unpublished.

    Args:
        client: MQTT client instance
        userdata: User data
        msg: MQTT message
    """
    # Extract device_id from topic (vendor/test_lampe_X/set)
    topic_parts = msg.topic.split("/")
    if len(topic_parts) != 3 or topic_parts[0] != "vendor" or topic_parts[2] != "set":
        logger.warning("Unexpected topic format: %s", msg.topic)
        return

    device_id = topic_parts[1]
    if device_id not in device_states:
        logger.warning("Unknown device: %s", device_id)
        return

    try:
        payload = json.loads(msg.payload.decode())
        logger.info("[%s] Received SET command: %s", device_id, payload)

        # Valid JSON that is not an object (list, number, string, null)
        # carries no fields to apply; reject it explicitly.
        if not isinstance(payload, dict):
            raise TypeError(
                f"SET payload must be a JSON object, got {type(payload).__name__}"
            )

        # Phase 1: validate/convert every requested field. Nothing is
        # written to the device state until all conversions have succeeded.
        changes = {}
        if "power" in payload:
            changes["power"] = payload["power"]
        if "brightness" in payload:
            changes["brightness"] = int(payload["brightness"])

        # Phase 2: apply the validated changes and note what actually changed.
        device_state = device_states[device_id]
        updated = False
        for key, label in (("power", "Power"), ("brightness", "Brightness")):
            if key not in changes:
                continue
            old_value = device_state[key]
            device_state[key] = changes[key]
            if old_value != device_state[key]:
                updated = True
                logger.info(
                    "[%s] %s changed: %s -> %s",
                    device_id,
                    label,
                    old_value,
                    device_state[key],
                )

        # Publish updated state if changed; only report success when the
        # client actually accepted the message.
        if updated and publish_state(client, device_id):
            logger.info("[%s] Published new state: %s", device_id, device_state)

    except json.JSONDecodeError as e:
        logger.error("[%s] Invalid JSON in message: %s", device_id, e)
    except Exception as e:
        # Callback boundary: a malformed command must not propagate out of
        # the handler and into the client's network loop.
        logger.error("[%s] Error processing message: %s", device_id, e)


def publish_state(client, device_id, state=None):
    """Publish a device state to the device's STATE topic (retained, QoS 1).

    Args:
        client: MQTT client instance
        device_id: Device identifier
        state: State dict to publish. Defaults to the device's current entry
            in ``device_states``.

    Returns:
        True if the client accepted the message (``MQTT_ERR_SUCCESS``),
        False otherwise.
    """
    if state is None:
        state = device_states[device_id]

    state_topic = f"vendor/{device_id}/state"
    state_json = json.dumps(state)

    result = client.publish(state_topic, state_json, qos=1, retain=True)

    if result.rc == mqtt.MQTT_ERR_SUCCESS:
        logger.debug("[%s] Published state to %s: %s", device_id, state_topic, state_json)
        return True

    logger.error("[%s] Failed to publish state: %s", device_id, result.rc)
    return False


def signal_handler(sig, frame):
    """Handle shutdown signals gracefully.

    Publishes a retained "power off" state for every device, disconnects the
    client and exits the process with status 0.

    Args:
        sig: Signal number
        frame: Current stack frame
    """
    logger.info("Received signal %s, shutting down...", sig)

    if client_global:
        # Publish offline state for all devices before disconnecting. A copy
        # is used so the in-memory state itself is left untouched.
        for device_id in DEVICES:
            offline_state = device_states[device_id].copy()
            offline_state["power"] = "off"
            if publish_state(client_global, device_id, offline_state):
                logger.info("[%s] Published offline state", device_id)

        client_global.disconnect()
        client_global.loop_stop()

    sys.exit(0)


def main():
    """Main entry point for the simulator.

    Installs SIGINT/SIGTERM handlers, creates the MQTT v5 client, connects to
    the configured broker and runs the network loop until interrupted.
    """
    global client_global

    # Setup signal handlers
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Create MQTT client
    client = mqtt.Client(
        client_id="simulator-test-lampes",
        protocol=mqtt.MQTTv5,
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2
    )

    # Expose the client to signal_handler, which cannot receive it as an argument.
    client_global = client

    # Set callbacks
    client.on_connect = on_connect
    client.on_message = on_message

    # Connect to broker
    logger.info("Connecting to MQTT broker %s:%s...", BROKER_HOST, BROKER_PORT)

    try:
        client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)

        # Start network loop (blocks until disconnect or an exception)
        client.loop_forever()

    except KeyboardInterrupt:
        logger.info("Interrupted by user")
    except Exception as e:
        logger.error("Error: %s", e)
    finally:
        if client.is_connected():
            client.disconnect()
            client.loop_stop()


if __name__ == "__main__":
    main()