From c3ec6e3fc479b8901e0babdeb9f5a17244d19d9d Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 31 Oct 2025 15:17:28 +0100 Subject: [PATCH] zwei Lampen und Test-Werkzeug --- apps/abstraction/main.py | 42 +++- apps/api/main.py | 114 ++++++++++- apps/ui/templates/index.html | 362 +++++++++++++++++++++++++++++++++-- config/devices.yaml | 34 ++-- pyproject.toml | 1 + tools/sim_test_lampe.py | 186 ++++++++++++++++++ tools/test_config_loader.py | 253 ++++++++++++++++++++++++ 7 files changed, 949 insertions(+), 43 deletions(-) create mode 100644 tools/sim_test_lampe.py create mode 100644 tools/test_config_loader.py diff --git a/apps/abstraction/main.py b/apps/abstraction/main.py index ba443d2..717b658 100644 --- a/apps/abstraction/main.py +++ b/apps/abstraction/main.py @@ -43,6 +43,11 @@ def load_config(config_path: Path) -> dict[str, Any]: with open(config_path, "r") as f: config = yaml.safe_load(f) + # Normalize device entries: accept both 'id' and 'device_id', use 'device_id' internally + devices = config.get("devices", []) + for device in devices: + device["device_id"] = device.pop("device_id", device.pop("id", None)) + logger.info(f"Loaded configuration from {config_path}") return config @@ -56,16 +61,33 @@ def validate_devices(devices: list[dict[str, Any]]) -> None: Raises: ValueError: If device configuration is invalid """ + required_fields = ["device_id", "type", "cap_version", "technology"] + for device in devices: - if "id" not in device: - raise ValueError(f"Device missing 'id': {device}") - if "type" not in device: - raise ValueError(f"Device {device['id']} missing 'type'") + # Check for device_id + if "device_id" not in device or device["device_id"] is None: + raise ValueError(f"Device entry requires 'id' or 'device_id': {device}") + + device_id = device["device_id"] + + # Check required top-level fields + for field in required_fields: + if field not in device: + raise ValueError(f"Device {device_id} missing '{field}'") + + # Check topics structure if "topics" not in device: - raise ValueError(f"Device {device['id']} missing 'topics'") - if "set" not in device["topics"] or "state" not in device["topics"]: - raise ValueError(f"Device {device['id']} missing 'topics.set' or 'topics.state'") - logger.info(f"Validated {len(devices)} device(s)") + raise ValueError(f"Device {device_id} missing 'topics'") + + if "set" not in device["topics"]: + raise ValueError(f"Device {device_id} missing 'topics.set'") + + if "state" not in device["topics"]: + raise ValueError(f"Device {device_id} missing 'topics.state'") + + # Log loaded devices + device_ids = [d["device_id"] for d in devices] + logger.info(f"Loaded {len(devices)} device(s): {', '.join(device_ids)}") async def get_redis_client(redis_url: str, max_retries: int = 5) -> aioredis.Redis: @@ -172,7 +194,7 @@ async def mqtt_worker(config: dict[str, Any], redis_client: aioredis.Redis) -> N redis_config = config.get("redis", {}) redis_channel = redis_config.get("channel", "ui:updates") - devices = {d["id"]: d for d in config.get("devices", [])} + devices = {d["device_id"]: d for d in config.get("devices", [])} retry_delay = 1 max_retry_delay = 60 @@ -191,7 +213,7 @@ async def mqtt_worker(config: dict[str, Any], redis_client: aioredis.Redis) -> N # Subscribe to abstract SET topics for all devices for device in devices.values(): - abstract_set_topic = f"home/{device['type']}/{device['id']}/set" + abstract_set_topic = f"home/{device['type']}/{device['device_id']}/set" await client.subscribe(abstract_set_topic) logger.info(f"Subscribed to abstract SET: {abstract_set_topic}") diff --git a/apps/api/main.py b/apps/api/main.py index be70d9d..242694e 100644 --- a/apps/api/main.py +++ b/apps/api/main.py @@ -1,14 +1,17 @@ """API main entry point.""" +import asyncio import json import os from pathlib import Path -from typing import Any +from typing import Any, AsyncGenerator +import redis.asyncio as aioredis import yaml from aiomqtt import Client -from fastapi import FastAPI, HTTPException, status +from fastapi import FastAPI, HTTPException, Request, status from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import StreamingResponse from pydantic import BaseModel, ValidationError from packages.home_capabilities import CAP_VERSION, LightState @@ -85,7 +88,12 @@ def load_devices() -> list[dict[str, Any]]: with open(config_path, "r") as f: config = yaml.safe_load(f) - return config.get("devices", []) + # Normalize device entries: accept both 'id' and 'device_id', use 'device_id' internally + devices = config.get("devices", []) + for device in devices: + device["device_id"] = device.pop("device_id", device.pop("id", None)) + + return devices def get_mqtt_settings() -> tuple[str, int]: @@ -99,6 +107,25 @@ def get_mqtt_settings() -> tuple[str, int]: return host, port +def get_redis_settings() -> tuple[str, str]: + """Get Redis settings from configuration. + + Returns: + tuple: (url, channel) + """ + config_path = Path(__file__).parent.parent.parent / "config" / "devices.yaml" + + if config_path.exists(): + with open(config_path, "r") as f: + config = yaml.safe_load(f) + redis_config = config.get("redis", {}) + url = redis_config.get("url", "redis://localhost:6379/0") + channel = redis_config.get("channel", "ui:updates") + return url, channel + + return "redis://localhost:6379/0", "ui:updates" + + async def publish_mqtt(topic: str, payload: dict[str, Any]) -> None: """Publish message to MQTT broker. @@ -123,9 +150,9 @@ async def get_devices() -> list[DeviceInfo]: devices = load_devices() return [ DeviceInfo( - device_id=device["id"], + device_id=device["device_id"], type=device["type"], - name=device.get("name", device["id"]) + name=device.get("name", device["device_id"]) ) for device in devices ] @@ -147,7 +174,7 @@ async def set_device(device_id: str, request: SetDeviceRequest) -> dict[str, str """ # Load devices and check if device exists devices = load_devices() - device = next((d for d in devices if d["id"] == device_id), None) + device = next((d for d in devices if d["device_id"] == device_id), None) if not device: raise HTTPException( @@ -182,6 +209,81 @@ async def set_device(device_id: str, request: SetDeviceRequest) -> dict[str, str return {"message": f"Command sent to {device_id}"} +async def event_generator(request: Request) -> AsyncGenerator[str, None]: + """Generate SSE events from Redis Pub/Sub. + + Args: + request: FastAPI request object for disconnect detection + + Yields: + str: SSE formatted event strings + """ + redis_url, redis_channel = get_redis_settings() + redis_client = await aioredis.from_url(redis_url, decode_responses=True) + pubsub = redis_client.pubsub() + + try: + await pubsub.subscribe(redis_channel) + + # Create heartbeat task + last_heartbeat = asyncio.get_event_loop().time() + + while True: + # Check if client disconnected + if await request.is_disconnected(): + break + + # Get message with timeout for heartbeat + try: + message = await asyncio.wait_for( + pubsub.get_message(ignore_subscribe_messages=True), + timeout=1.0 + ) + + if message and message["type"] == "message": + # Send data event + data = message["data"] + yield f"event: message\ndata: {data}\n\n" + last_heartbeat = asyncio.get_event_loop().time() + + except asyncio.TimeoutError: + pass + + # Send heartbeat every 25 seconds + current_time = asyncio.get_event_loop().time() + if current_time - last_heartbeat >= 25: + yield "event: ping\ndata: heartbeat\n\n" + last_heartbeat = current_time + + finally: + await pubsub.unsubscribe(redis_channel) + await pubsub.close() + await redis_client.close() + + +@app.get("/realtime") +async def realtime_events(request: Request) -> StreamingResponse: + """Server-Sent Events endpoint for real-time updates. + + Args: + request: FastAPI request object + + Returns: + StreamingResponse: SSE stream of Redis messages + """ + return StreamingResponse( + event_generator(request), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", # Disable nginx buffering + } + ) + + return {"message": f"Command sent to {device_id}"} + + def main() -> None: """Run the API application with uvicorn.""" import uvicorn diff --git a/apps/ui/templates/index.html b/apps/ui/templates/index.html index f0cf3ad..2288fc1 100644 --- a/apps/ui/templates/index.html +++ b/apps/ui/templates/index.html @@ -15,18 +15,20 @@ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); min-height: 100vh; - display: flex; - justify-content: center; - align-items: center; + padding: 2rem; } .container { + max-width: 1200px; + margin: 0 auto; + } + + header { background: white; border-radius: 16px; padding: 2rem; - box-shadow: 0 20px 60px rgba(0, 0, 0, 0.3); - max-width: 600px; - width: 90%; + box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1); + margin-bottom: 2rem; } h1 { @@ -34,17 +36,355 @@ margin-bottom: 0.5rem; } - p { + .status { + display: inline-block; + padding: 0.25rem 0.75rem; + border-radius: 12px; + font-size: 0.875rem; + font-weight: 500; + } + + .status.connected { + background: #d4edda; + color: #155724; + } + + .status.disconnected { + background: #f8d7da; + color: #721c24; + } + + .devices { + display: grid; + grid-template-columns: repeat(auto-fill, minmax(300px, 1fr)); + gap: 1.5rem; + } + + .device-card { + background: white; + border-radius: 16px; + padding: 1.5rem; + box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1); + } + + .device-header { + margin-bottom: 1.5rem; + } + + .device-name { + font-size: 1.5rem; + font-weight: 600; + color: #333; + margin-bottom: 0.25rem; + } + + .device-type { + font-size: 0.875rem; color: #666; - line-height: 1.6; + text-transform: uppercase; + } + + .device-state { + padding: 0.5rem 1rem; + background: #f8f9fa; + border-radius: 8px; + margin: 1rem 0; + font-family: 'Courier New', monospace; + font-size: 0.875rem; + } + + .state-label { + color: #666; + font-weight: 500; + } + + .state-value { + color: #333; + font-weight: 600; + } + + .state-value.on { + color: #28a745; + } + + .state-value.off { + color: #dc3545; + } + + .controls { + display: flex; + flex-direction: column; + gap: 1rem; + } + + .toggle-button { + padding: 0.75rem 1.5rem; + border: none; + border-radius: 8px; + font-size: 1rem; + font-weight: 600; + cursor: pointer; + transition: all 0.2s; + color: white; + } + + .toggle-button.on { + background: #28a745; + } + + .toggle-button.on:hover { + background: #218838; + } + + .toggle-button.off { + background: #6c757d; + } + + .toggle-button.off:hover { + background: #5a6268; + } + + .events { + margin-top: 2rem; + background: white; + border-radius: 16px; + padding: 1.5rem; + box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1); + } + + .events h2 { + color: #333; + margin-bottom: 1rem; + font-size: 1.25rem; + } + + .event-list { + max-height: 300px; + overflow-y: auto; + } + + .event-item { + padding: 0.75rem; + border-left: 3px solid #667eea; + background: #f8f9fa; + margin-bottom: 0.5rem; + border-radius: 4px; + font-size: 0.875rem; + } + + .event-time { + color: #666; + font-size: 0.75rem; + } + + .event-data { + color: #333; + margin-top: 0.25rem; + font-family: 'Courier New', monospace; }
-

🏠 Home Automation

-

UI wird geladen...

-

API erreichbar? API Health Check

+
+

🏠 Home Automation

+

Realtime Status: Verbinde...

+
+ +
+ +
+
+
💡 Test Lampe 1
+
Light
+
+ +
+ Status: + off +
+ +
+ +
+
+
+
+
💡 Test Lampe 2
+
Light
+
+ +
+ Status: + off +
+ +
+ +
+
+
+ +
+

📡 Realtime Events

+
+

Warte auf Events...

+
+
+ + diff --git a/config/devices.yaml b/config/devices.yaml index 22c0c4c..7fd0270 100644 --- a/config/devices.yaml +++ b/config/devices.yaml @@ -1,5 +1,4 @@ -# Device Configuration -# Configuration for home automation devices +version: 1 mqtt: broker: "172.16.2.16" @@ -14,18 +13,21 @@ redis: channel: "ui:updates" devices: - - id: "test_lampe" - type: "light" - name: "Test Lampe" + - device_id: test_lampe_1 + type: light + cap_version: "light@1.2.0" + technology: zigbee2mqtt + features: + power: true topics: - set: "vendor/test_lampe/set" - state: "vendor/test_lampe/state" - # - color - - # - id: "light_bedroom" - # type: "light" - # name: "Bedroom Light" - # mqtt_topic: "home/bedroom/light" - # capabilities: - # - power - # - brightness + set: "vendor/test_lampe_1/set" + state: "vendor/test_lampe_1/state" + - device_id: test_lampe_2 + type: light + cap_version: "light@1.2.0" + technology: zigbee2mqtt + features: + power: true + topics: + set: "vendor/test_lampe_2/set" + state: "vendor/test_lampe_2/state" diff --git a/pyproject.toml b/pyproject.toml index b42a19f..8c2b3db 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ aiomqtt = "^2.4.0" jinja2 = "^3.1.6" apscheduler = "^3.11.0" redis = "^7.0.1" +paho-mqtt = "^2.1.0" [tool.poetry.group.dev.dependencies] ruff = "^0.6.0" diff --git a/tools/sim_test_lampe.py b/tools/sim_test_lampe.py new file mode 100644 index 0000000..b889de3 --- /dev/null +++ b/tools/sim_test_lampe.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 +"""MQTT Simulator for test_lampe device. + +This simulator acts as a virtual light device that: +- Subscribes to vendor/test_lampe/set +- Maintains local state +- Publishes state changes to vendor/test_lampe/state (retained) +""" + +import json +import logging +import os +import signal +import sys +import time + +import paho.mqtt.client as mqtt + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + +# Configuration +BROKER_HOST = os.environ.get("MQTT_HOST", "172.16.2.16") +BROKER_PORT = int(os.environ.get("MQTT_PORT", "1883")) +DEVICE_ID = "test_lampe_1" +SET_TOPIC = f"vendor/{DEVICE_ID}/set" +STATE_TOPIC = f"vendor/{DEVICE_ID}/state" + +# Device state +device_state = { + "power": "off", + "brightness": 50 +} + +# Global client for signal handler +client_global = None + + +def on_connect(client, userdata, flags, rc, properties=None): + """Callback when connected to MQTT broker. + + Args: + client: MQTT client instance + userdata: User data + flags: Connection flags + rc: Connection result code + properties: Connection properties (MQTT v5) + """ + if rc == 0: + logger.info(f"Connected to MQTT broker {BROKER_HOST}:{BROKER_PORT}") + + # Subscribe to SET topic + client.subscribe(SET_TOPIC, qos=1) + logger.info(f"Subscribed to {SET_TOPIC}") + + # Publish initial state (retained) + publish_state(client) + logger.info(f"Simulator started, initial state published: {device_state}") + else: + logger.error(f"Connection failed with code {rc}") + + +def on_message(client, userdata, msg): + """Callback when message received on subscribed topic. + + Args: + client: MQTT client instance + userdata: User data + msg: MQTT message + """ + global device_state + + try: + payload = json.loads(msg.payload.decode()) + logger.info(f"Received SET command: {payload}") + + # Update device state + updated = False + + if "power" in payload: + old_power = device_state["power"] + device_state["power"] = payload["power"] + if old_power != device_state["power"]: + updated = True + logger.info(f"Power changed: {old_power} -> {device_state['power']}") + + if "brightness" in payload: + old_brightness = device_state["brightness"] + device_state["brightness"] = int(payload["brightness"]) + if old_brightness != device_state["brightness"]: + updated = True + logger.info(f"Brightness changed: {old_brightness} -> {device_state['brightness']}") + + # Publish updated state if changed + if updated: + publish_state(client) + logger.info(f"Published new state: {device_state}") + + except json.JSONDecodeError as e: + logger.error(f"Invalid JSON in message: {e}") + except Exception as e: + logger.error(f"Error processing message: {e}") + + +def publish_state(client): + """Publish current device state to STATE topic. + + Args: + client: MQTT client instance + """ + state_json = json.dumps(device_state) + result = client.publish(STATE_TOPIC, state_json, qos=1, retain=True) + + if result.rc == mqtt.MQTT_ERR_SUCCESS: + logger.debug(f"Published state to {STATE_TOPIC}: {state_json}") + else: + logger.error(f"Failed to publish state: {result.rc}") + + +def signal_handler(sig, frame): + """Handle shutdown signals gracefully. + + Args: + sig: Signal number + frame: Current stack frame + """ + logger.info(f"Received signal {sig}, shutting down...") + + if client_global: + # Publish offline state before disconnecting + offline_state = device_state.copy() + offline_state["power"] = "off" + client_global.publish(STATE_TOPIC, json.dumps(offline_state), qos=1, retain=True) + logger.info("Published offline state") + + client_global.disconnect() + client_global.loop_stop() + + sys.exit(0) + + +def main(): + """Main entry point for the simulator.""" + global client_global + + # Setup signal handlers + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + # Create MQTT client + client = mqtt.Client( + client_id=f"simulator-{DEVICE_ID}", + protocol=mqtt.MQTTv5, + callback_api_version=mqtt.CallbackAPIVersion.VERSION2 + ) + client_global = client + + # Set callbacks + client.on_connect = on_connect + client.on_message = on_message + + # Connect to broker + logger.info(f"Connecting to MQTT broker {BROKER_HOST}:{BROKER_PORT}...") + + try: + client.connect(BROKER_HOST, BROKER_PORT, keepalive=60) + + # Start network loop + client.loop_forever() + + except KeyboardInterrupt: + logger.info("Interrupted by user") + except Exception as e: + logger.error(f"Error: {e}") + finally: + if client.is_connected(): + client.disconnect() + client.loop_stop() + + +if __name__ == "__main__": + main() diff --git a/tools/test_config_loader.py b/tools/test_config_loader.py new file mode 100644 index 0000000..429c014 --- /dev/null +++ b/tools/test_config_loader.py @@ -0,0 +1,253 @@ +#!/usr/bin/env python3 +"""Test configuration loader with different YAML formats.""" + +import sys +import tempfile +from pathlib import Path + +# Add parent directory to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from apps.abstraction.main import load_config, validate_devices + + +def test_device_id_format(): + """Test YAML with 'device_id' key.""" + yaml_content = """version: 1 + +mqtt: + broker: "172.16.2.16" + port: 1883 + client_id: "test" + +redis: + url: "redis://localhost:6379/8" + channel: "ui:updates" + +devices: + - device_id: test_lampe + type: light + cap_version: "light@1.2.0" + technology: zigbee2mqtt + features: + power: true + topics: + set: "vendor/test_lampe/set" + state: "vendor/test_lampe/state" +""" + + with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: + f.write(yaml_content) + temp_path = Path(f.name) + + try: + config = load_config(temp_path) + devices = config.get("devices", []) + validate_devices(devices) + print("✓ Test 1 passed: YAML with 'device_id' loaded successfully") + print(f" Device ID: {devices[0]['device_id']}") + return True + except Exception as e: + print(f"✗ Test 1 failed: {e}") + return False + finally: + temp_path.unlink() + + +def test_id_format(): + """Test YAML with 'id' key (legacy format).""" + yaml_content = """version: 1 + +mqtt: + broker: "172.16.2.16" + port: 1883 + client_id: "test" + +redis: + url: "redis://localhost:6379/8" + channel: "ui:updates" + +devices: + - id: test_lampe + type: light + cap_version: "light@1.2.0" + technology: zigbee2mqtt + features: + power: true + topics: + set: "vendor/test_lampe/set" + state: "vendor/test_lampe/state" +""" + + with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: + f.write(yaml_content) + temp_path = Path(f.name) + + try: + config = load_config(temp_path) + devices = config.get("devices", []) + validate_devices(devices) + print("✓ Test 2 passed: YAML with 'id' loaded successfully") + print(f" Device ID: {devices[0]['device_id']}") + return True + except Exception as e: + print(f"✗ Test 2 failed: {e}") + return False + finally: + temp_path.unlink() + + +def test_missing_id(): + """Test YAML without 'id' or 'device_id' key.""" + yaml_content = """version: 1 + +mqtt: + broker: "172.16.2.16" + port: 1883 + +devices: + - type: light + cap_version: "light@1.2.0" + technology: zigbee2mqtt + topics: + set: "vendor/test_lampe/set" + state: "vendor/test_lampe/state" +""" + + with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: + f.write(yaml_content) + temp_path = Path(f.name) + + try: + config = load_config(temp_path) + devices = config.get("devices", []) + validate_devices(devices) + print("✗ Test 3 failed: Should have raised ValueError for missing device_id") + return False + except ValueError as e: + if "requires 'id' or 'device_id'" in str(e): + print(f"✓ Test 3 passed: Correct error for missing device_id") + print(f" Error: {e}") + return True + else: + print(f"✗ Test 3 failed: Wrong error message: {e}") + return False + except Exception as e: + print(f"✗ Test 3 failed: Unexpected error: {e}") + return False + finally: + temp_path.unlink() + + +def test_missing_topics_set(): + """Test YAML without 'topics.set'.""" + yaml_content = """version: 1 + +mqtt: + broker: "172.16.2.16" + port: 1883 + +devices: + - device_id: test_lampe + type: light + cap_version: "light@1.2.0" + technology: zigbee2mqtt + topics: + state: "vendor/test_lampe/state" +""" + + with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: + f.write(yaml_content) + temp_path = Path(f.name) + + try: + config = load_config(temp_path) + devices = config.get("devices", []) + validate_devices(devices) + print("✗ Test 4 failed: Should have raised ValueError for missing topics.set") + return False + except ValueError as e: + if "missing 'topics.set'" in str(e): + print(f"✓ Test 4 passed: Correct error for missing topics.set") + print(f" Error: {e}") + return True + else: + print(f"✗ Test 4 failed: Wrong error message: {e}") + return False + except Exception as e: + print(f"✗ Test 4 failed: Unexpected error: {e}") + return False + finally: + temp_path.unlink() + + +def test_missing_topics_state(): + """Test YAML without 'topics.state'.""" + yaml_content = """version: 1 + +mqtt: + broker: "172.16.2.16" + port: 1883 + +devices: + - device_id: test_lampe + type: light + cap_version: "light@1.2.0" + technology: zigbee2mqtt + topics: + set: "vendor/test_lampe/set" +""" + + with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: + f.write(yaml_content) + temp_path = Path(f.name) + + try: + config = load_config(temp_path) + devices = config.get("devices", []) + validate_devices(devices) + print("✗ Test 5 failed: Should have raised ValueError for missing topics.state") + return False + except ValueError as e: + if "missing 'topics.state'" in str(e): + print(f"✓ Test 5 passed: Correct error for missing topics.state") + print(f" Error: {e}") + return True + else: + print(f"✗ Test 5 failed: Wrong error message: {e}") + return False + except Exception as e: + print(f"✗ Test 5 failed: Unexpected error: {e}") + return False + finally: + temp_path.unlink() + + +def main(): + """Run all tests.""" + print("Testing Configuration Loader") + print("=" * 60) + + tests = [ + test_device_id_format, + test_id_format, + test_missing_id, + test_missing_topics_set, + test_missing_topics_state, + ] + + results = [] + for test in tests: + results.append(test()) + print() + + print("=" * 60) + passed = sum(results) + total = len(results) + print(f"Results: {passed}/{total} tests passed") + + return 0 if all(results) else 1 + + +if __name__ == "__main__": + sys.exit(main())