""" API Client for HomeKit Bridge Handles all HTTP communication with the REST API backend. """ import logging from typing import Dict, List, Iterator, Optional import httpx import json import time logger = logging.getLogger(__name__) class ApiClient: """HTTP client for communicating with the home automation API.""" def __init__(self, base_url: str, token: Optional[str] = None, timeout: int = 5): """ Initialize API client. Args: base_url: Base URL of the API (e.g., "http://192.168.1.100:8001") token: Optional API token for authentication timeout: Request timeout in seconds """ self.base_url = base_url.rstrip('/') self.timeout = timeout self.headers = {} if token: self.headers['Authorization'] = f'Bearer {token}' def get_devices(self) -> List[Dict]: """ Get list of all devices. Returns: List of device dictionaries """ try: response = httpx.get( f'{self.base_url}/devices', headers=self.headers, timeout=self.timeout ) response.raise_for_status() return response.json() except Exception as e: logger.error(f"Failed to get devices: {e}") raise def get_layout(self) -> Dict: """ Get layout information (rooms and device assignments). Returns: Layout dictionary with room structure """ try: response = httpx.get( f'{self.base_url}/layout', headers=self.headers, timeout=self.timeout ) response.raise_for_status() return response.json() except Exception as e: logger.error(f"Failed to get layout: {e}") raise def get_device_state(self, device_id: str) -> Dict: """ Get current state of a specific device. Args: device_id: Device identifier Returns: Device state dictionary """ try: response = httpx.get( f'{self.base_url}/devices/{device_id}/state', headers=self.headers, timeout=self.timeout ) response.raise_for_status() return response.json() except Exception as e: logger.error(f"Failed to get state for {device_id}: {e}") raise def post_device_set(self, device_id: str, device_type: str, payload: Dict) -> None: """ Send command to a device. Args: device_id: Device identifier device_type: Device type (e.g., "light", "thermostat") payload: Command payload (e.g., {"power": "on", "brightness": 75}) """ try: data = { "type": device_type, "payload": payload } response = httpx.post( f'{self.base_url}/devices/{device_id}/set', headers=self.headers, json=data, timeout=self.timeout ) response.raise_for_status() logger.debug(f"Set {device_id}: {payload}") except Exception as e: logger.error(f"Failed to set {device_id}: {e}") raise def stream_realtime(self, reconnect_delay: int = 5) -> Iterator[Dict]: """ Stream real-time events from the API using Server-Sent Events (SSE). Automatically reconnects on connection loss. Args: reconnect_delay: Seconds to wait before reconnecting Yields: Event dictionaries: {"type": "state", "device_id": "...", "payload": {...}, "ts": ...} """ while True: try: logger.info("Connecting to realtime event stream...") with httpx.stream( 'GET', f'{self.base_url}/realtime', headers=self.headers, timeout=None # No timeout for streaming ) as response: response.raise_for_status() logger.info("Connected to realtime event stream") for line in response.iter_lines(): if line.startswith('data: '): data_str = line[6:] # Remove 'data: ' prefix try: event = json.loads(data_str) yield event except json.JSONDecodeError as e: logger.warning(f"Failed to parse SSE event: {e}") except httpx.HTTPError as e: logger.error(f"Realtime stream error: {e}") logger.info(f"Reconnecting in {reconnect_delay} seconds...") time.sleep(reconnect_delay) except Exception as e: logger.error(f"Unexpected error in realtime stream: {e}") logger.info(f"Reconnecting in {reconnect_delay} seconds...") time.sleep(reconnect_delay)